Quellcode für fusion_hat.user_button

import time
import threading
import warnings
from typing import Callable, Optional
import evdev
from evdev import InputDevice, ecodes
from .device import raise_if_fusion_hat_not_ready

[Doku] class UserButton: """ User button class using evdev for Linux input events """ def __init__(self) -> None: raise_if_fusion_hat_not_ready() self.pressed = False self.pressed_for = 0 self.pressed_at = time.time() self._device: Optional[InputDevice] = None self._device_path: Optional[str] = None self.__on_click__ = None self.__on_press__ = None self.__on_release__ = None self.__on_press_released__ = None # 尝试自动查找按钮设备 self._find_button_device() # 如果找到设备,设置事件监听 if self._device_path: self._setup_event_listener()
[Doku] def set_on_click(self, callback: Callable[[], None]) -> None: """ Set the callback function when the user button is clicked Args: callback (Callable[[], None]): callback function """ self.__on_click__ = callback
[Doku] def set_on_press(self, callback: Callable[[], None]) -> None: """ Set the callback function when the user button is pressed Args: callback (Callable[[], None]): callback function """ self.__on_press__ = callback
[Doku] def set_on_release(self, callback: Callable[[], None]) -> None: """ Set the callback function when the user button is released Args: callback (Callable[[], None]): callback function """ self.__on_release__ = callback
[Doku] def set_on_press_released(self, callback: Callable[[], None]) -> None: """ Set the callback function when the user button is pressed and released Args: callback (Callable[[], None]): callback function """ self.__on_press_released__ = callback
[Doku] def set_on_long_press(self, callback: Callable[[], None], duration: float=2.0) -> None: """ [Deprecated] Set the callback function when the user button is pressed for a long time Args: callback (Callable[[], None]): callback function """ pass
[Doku] def set_on_long_press_released(self, callback: Callable[[], None], duration: float=2.0) -> None: """ [Deprecated] Set the callback function when the user button is pressed for a long time and released Args: callback (Callable[[], None]): callback function duration (float, optional): long press duration(2.0~5.0) """ pass
[Doku] def _find_button_device(self) -> None: """ Find the Fusion HAT USR button device """ try: # 查找名称包含"Fusion HAT USR"的输入设备 devices = [InputDevice(path) for path in evdev.list_devices()] for device in devices: if "Fusion HAT USR" in device.name: self._device = device self._device_path = device.path break except Exception: # 如果找不到设备,后续通过get_state时会尝试再次查找 pass
[Doku] def get_state(self) -> bool: """ Get the state of the user button Returns: bool: True if pressed, False if released """ # 如果设备未找到,尝试再次查找 if not self._device and not self._device_path: self._find_button_device() # 如果仍然没有设备,返回默认状态(未按下) if not self._device and not self._device_path: return False try: # 如果设备已经初始化,直接返回当前状态 if self._is_task_running: return self.pressed # 否则尝试通过sysfs文件直接读取状态 # 通常在/sys/class/input下可以找到对应的event设备 # 这里我们尝试读取sysfs接口(如果可用) # 作为备选方案,我们使用轮询检查最新状态 # 临时打开设备并读取最新事件 if self._device_path: temp_device = InputDevice(self._device_path) # 非阻塞方式检查是否有新事件 for event in temp_device.read_loop(): if event.type == ecodes.EV_KEY and event.code == ecodes.BTN_0: return event.value == 1 # 只读取少量事件就退出 break temp_device.close() return self.pressed except Exception: return False
[Doku] def is_pressed(self) -> bool: """ Check if the user button is pressed Returns: bool: True if pressed, False if released """ if self._is_task_running: return self.pressed return self.get_state()
[Doku] def get_pressed_for(self) -> float: """ Get the time the user button has been pressed for Returns: float: time in seconds """ if self.is_pressed(): return time.time() - self.pressed_at return self.pressed_for
[Doku] def _setup_event_listener(self) -> None: """ 设置事件监听器,在独立线程中处理输入事件 """ def event_handler(): while True: try: # 确保设备已打开 if not self._device and self._device_path: self._device = InputDevice(self._device_path) # 读取输入事件 - 这是一个阻塞调用,直到有事件发生 for event in self._device.read_loop(): # 处理按键事件 if event.type == ecodes.EV_KEY and event.code == ecodes.BTN_0: pressed = event.value == 1 if pressed: if not self.pressed: self.pressed = True self.pressed_at = time.time() # 触发按下回调 if self.__on_press__ is not None: try: self.__on_press__() except Exception: pass # 触发press_released回调(按下状态) if self.__on_press_released__ is not None: try: self.__on_press_released__(True) except Exception: pass else: if self.pressed: self.pressed = False self.pressed_for = time.time() - self.pressed_at # 触发释放回调 if self.__on_release__ is not None: try: self.__on_release__() except Exception: pass # 触发press_released回调(释放状态) if self.__on_press_released__ is not None: try: self.__on_press_released__(False) except Exception: pass # 触发点击回调 if self.__on_click__ is not None: try: self.__on_click__() except Exception: pass except Exception: # 如果发生错误,关闭设备并尝试重新打开 if self._device: try: self._device.close() except Exception: pass self._device = None # 短暂等待后重试 time.sleep(0.1) # 在守护线程中启动事件处理器 listener_thread = threading.Thread(target=event_handler, daemon=True) listener_thread.start()
[Doku] def start(self) -> None: """ 此方法已弃用,不再需要调用 由于使用了Linux输入事件系统,按钮事件会自动被监听和处理,无需手动启动轮询循环。 """ warnings.warn( "UserButton.start() 方法已弃用。由于使用了Linux输入事件系统,按钮事件会自动被监听和处理。", DeprecationWarning, stacklevel=2 )
[Doku] def stop(self) -> None: """ 关闭按钮设备连接 """ # 关闭设备 if self._device: try: self._device.close() except Exception: pass self._device = None