init
This commit is contained in:
commit
cf0af3737d
2 changed files with 115 additions and 0 deletions
102
main.py
Normal file
102
main.py
Normal file
|
@ -0,0 +1,102 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
import socket
|
||||
import struct
|
||||
import sys
|
||||
import libevdev
|
||||
#from libevdev import ecodes
|
||||
|
||||
def main(device_path, dest_ip, dest_port,username,password):
|
||||
"""
|
||||
Reads absolute X and Y coordinates from an evdev device and sends them
|
||||
over UDP to a specified address and port.
|
||||
|
||||
Args:
|
||||
device_path (str): The path to the input event device
|
||||
(e.g., /dev/input/event0).
|
||||
dest_ip (str): The destination IP address.
|
||||
dest_port (int): The destination port.
|
||||
"""
|
||||
try:
|
||||
# Open the evdev device in a non-blocking way
|
||||
fd = open(device_path, 'rb')
|
||||
d = libevdev.Device(fd)
|
||||
print(f"Successfully opened device: {d.name}")
|
||||
|
||||
|
||||
print("Listening for ABS_X and ABS_Y events. Press Ctrl+C to exit.")
|
||||
|
||||
# Create a UDP socket
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
print(f"Sending coordinates to {dest_ip}:{dest_port}")
|
||||
|
||||
# Variables to store the latest coordinates
|
||||
x_coor = None
|
||||
y_coor = None
|
||||
|
||||
# Loop indefinitely to read events
|
||||
while True:
|
||||
for e in d.events():
|
||||
# We are only interested in absolute axis events
|
||||
if e.type == libevdev.EV_ABS:
|
||||
if e.code == libevdev.EV_ABS.ABS_X:
|
||||
x_coor = e.value
|
||||
elif e.code == libevdev.EV_ABS.ABS_Y:
|
||||
y_coor = e.value
|
||||
|
||||
# When we have both coordinates, send them
|
||||
if x_coor is not None and y_coor is not None:
|
||||
print(f"Read coordinates: X={x_coor}, Y={y_coor}")
|
||||
|
||||
# Pack the two 16-bit unsigned integers into a buffer.
|
||||
# The format string 'HH' specifies two unsigned short integers (u16).
|
||||
# The '<' character ensures little-endian byte order.
|
||||
try:
|
||||
packet = struct.pack('<HH', x_coor, y_coor)
|
||||
|
||||
# Send the packet over UDP
|
||||
sock.sendto(packet, (dest_ip, dest_port))
|
||||
print(f"Sent {len(packet)} bytes.")
|
||||
|
||||
# Reset coordinates to wait for the next pair of events
|
||||
x_coor = None
|
||||
y_coor = None
|
||||
|
||||
except struct.error as se:
|
||||
print(f"Error packing data: {se}. Coordinates might be out of range for u16 (0-65535).")
|
||||
except socket.error as soe:
|
||||
print(f"Socket error: {soe}")
|
||||
|
||||
|
||||
except FileNotFoundError:
|
||||
print(f"Error: Device not found at {device_path}")
|
||||
print("Please ensure you have the correct event device path.")
|
||||
except PermissionError:
|
||||
print(f"Error: Permission denied for {device_path}.")
|
||||
print("This script usually needs to be run with root privileges (e.g., 'sudo python3 script.py')")
|
||||
except KeyboardInterrupt:
|
||||
print("\nExiting.")
|
||||
finally:
|
||||
if 'fd' in locals() and not fd.closed:
|
||||
fd.close()
|
||||
if 'sock' in locals():
|
||||
sock.close()
|
||||
|
||||
if __name__ == "__main__":
|
||||
if len(sys.argv) != 4:
|
||||
print("Usage: python3 evdev_udp_sender.py <device_path> <dest_ip> <dest_port>")
|
||||
print("Example: sudo python3 evdev_udp_sender.py /dev/input/event4 127.0.0.1 12345")
|
||||
sys.exit(1)
|
||||
|
||||
device = sys.argv[1]
|
||||
ip = sys.argv[2]
|
||||
try:
|
||||
port = int(sys.argv[3])
|
||||
if not (0 < port < 65536):
|
||||
raise ValueError
|
||||
except ValueError:
|
||||
print("Error: Port must be an integer between 1 and 65535.")
|
||||
sys.exit(1)
|
||||
|
||||
main(device, ip, port)
|
||||
|
13
shell.nix
Normal file
13
shell.nix
Normal file
|
@ -0,0 +1,13 @@
|
|||
let
|
||||
pkgs = import <nixpkgs> {};
|
||||
in pkgs.mkShell {
|
||||
packages = [ pkgs.libinput
|
||||
(pkgs.python3.withPackages (python-pkgs: [
|
||||
python-pkgs.paho-mqtt
|
||||
pkgs.libinput
|
||||
python-pkgs.libevdev
|
||||
]))
|
||||
];
|
||||
}
|
||||
|
||||
|
Loading…
Add table
Add a link
Reference in a new issue