-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathRemoteRC28Controller.py
More file actions
58 lines (52 loc) · 1.98 KB
/
RemoteRC28Controller.py
File metadata and controls
58 lines (52 loc) · 1.98 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import time
import socket
import threading
class RemoteRC28Controller:
def __init__(self, host='localhost', port=5100):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
self.sock.connect((host, port))
print(f"Connected to server at {host}:{port}")
except Exception as e:
raise Exception(f"Failed to connect: {e}")
self.callbacks = {
'button_press': [],
'button_release': [],
'wheel': []
}
self.running = True
self.thread = threading.Thread(target=self._read_loop, daemon=True)
self.thread.start()
def set_lights(self, transmit, left_orange, right_orange):
command = f"SET_LIGHTS {int(transmit)} {int(left_orange)} {int(right_orange)}\n"
self.sock.sendall(command.encode())
def register_callback(self, event_type, callback):
if event_type in self.callbacks:
self.callbacks[event_type].append(callback)
def _read_loop(self):
buffer = ""
while self.running:
try:
data = self.sock.recv(1024).decode()
if not data:
break
buffer += data
while '\n' in buffer:
line, buffer = buffer.split('\n', 1)
parts = line.strip().split()
if parts[0] == 'button_press':
for cb in self.callbacks['button_press']:
cb(parts[1])
elif parts[0] == 'button_release':
for cb in self.callbacks['button_release']:
cb()
elif parts[0] == 'wheel':
val = int(parts[1])
for cb in self.callbacks['wheel']:
cb(val)
except:
break
def close(self):
self.running = False
self.thread.join()
self.sock.close()