# Source code for fusion_hat.modules.ultrasonic

import time
from ..pin import Pin
import threading

class Ultrasonic():
    """
    Ultrasonic sensor module.

    A distance can be read either synchronously with :meth:`read` or
    continuously by a background thread started with :meth:`start_thread`.

    Args:
        trig (Pin): Trigger pin object.
        echo (Pin): Echo pin object.
        timeout (float, optional): Timeout duration in seconds. Default is 0.02.

    Raises:
        TypeError: If trig or echo is not a Pin object.
    """

    SOUND_SPEED = 343.3  # m/s
    """Sound speed in meters per second."""

    def __init__(self, trig: "Pin", echo: "Pin", timeout: float = 0.02):
        """
        Initialize Ultrasonic sensor module.

        The pins passed in are closed and re-opened on the same pin numbers:
        the trigger with the default ``Pin`` configuration, the echo as an
        input with pull-down.

        :param trig: Trigger pin object.
        :type trig: fusion_hat.Pin
        :param echo: Echo pin object.
        :type echo: fusion_hat.Pin
        :param timeout: Timeout duration in seconds. Default is 0.02.
        :type timeout: float
        :raises TypeError: If trig or echo is not a Pin object.
        """
        if not isinstance(trig, Pin):
            raise TypeError("trig must be fusion_hat.Pin object")
        if not isinstance(echo, Pin):
            raise TypeError("echo must be fusion_hat.Pin object")

        self.timeout = timeout

        # Release the caller's pin objects before re-creating them with the
        # configuration this module needs.
        trig.close()
        echo.close()
        self.trig = Pin(trig._pin_num)
        self.echo = Pin(echo._pin_num, mode=Pin.IN, pull=Pin.PULL_DOWN)

        # State of the optional background reader.
        self.thread_read_interval = 0.02
        self.thread = None
        self.thread_started = False
        self.thread_value = -1

    def read_raw(self) -> float:
        """
        Perform a single distance measurement.

        A trigger pulse is emitted, then the echo pin is polled: the last
        instant it was seen low is taken as the pulse start and the last
        instant it was seen high as the pulse end. ``timeout`` bounds the
        whole measurement, counted from the end of the trigger pulse.

        Returns:
            float: Distance in centimeters, rounded to two decimals.
            Returns -1 if the timeout elapses, or -2 if the pulse start or
            the pulse end was never captured (e.g. the echo pin was already
            high when polling began).
        """
        # Force the line low first so the trigger pulse has a clean edge.
        self.trig.off()
        time.sleep(0.001)
        self.trig.on()
        time.sleep(0.00001)
        self.trig.off()

        # perf_counter() is monotonic: unlike time.time() it cannot jump
        # when the system clock is adjusted, so neither the pulse width nor
        # the timeout can be corrupted by a clock step.
        now = time.perf_counter
        pulse_start = None
        pulse_end = None
        wait_began = now()

        # Wait for the rising edge, remembering the last "still low" instant.
        while self.echo.value() == 0:
            pulse_start = now()
            if pulse_start - wait_began > self.timeout:
                return -1

        # Wait for the falling edge, remembering the last "still high" instant.
        while self.echo.value() == 1:
            pulse_end = now()
            if pulse_end - wait_began > self.timeout:
                return -1

        if pulse_start is None or pulse_end is None:
            return -2

        duration = pulse_end - pulse_start
        # Sound travels to the obstacle and back, hence the division by 2;
        # the factor 100 converts meters to centimeters.
        return round(duration * self.SOUND_SPEED / 2 * 100, 2)

    def read_with_retry(self, times: int = 10) -> float:
        """
        Read distance value with retry mechanism.

        Args:
            times (int, optional): Maximum number of measurement attempts.
                Default is 10.

        Returns:
            float: The first strictly positive distance in centimeters.
            Returns -1 if all attempts fail.
        """
        for _ in range(times):
            value = self.read_raw()
            if value > 0:
                return value
        return -1

    def read(self) -> float:
        """
        Read distance value.

        Returns:
            float: Distance in centimeters. While the background thread is
            running, this is the most recent value it stored (-1 until a
            measurement has been stored). Otherwise a blocking measurement
            with retries is performed, which returns -1 if every attempt
            fails.
        """
        if self.thread is not None and self.thread_started:
            return self.thread_value
        return self.read_with_retry()

    def thread_read_loop(self) -> None:
        """
        Thread loop for reading distance value periodically.

        Runs until ``thread_started`` is cleared, storing each result in
        ``thread_value`` and sleeping ``thread_read_interval`` seconds
        between measurements.
        """
        while self.thread_started:
            self.thread_value = self.read_with_retry()
            time.sleep(self.thread_read_interval)

    def start_thread(self, interval: float = 0.01) -> None:
        """
        Start the thread for reading distance value periodically.

        Does nothing if a reader thread is already running. A reader thread
        that has died without :meth:`stop_thread` being called is replaced.

        Args:
            interval (float, optional): Interval duration in seconds.
                Default is 0.01.
        """
        # A dead worker would otherwise block restarts forever and leave
        # read() returning a stale value.
        if self.thread is None or not self.thread.is_alive():
            self.thread_started = True
            self.thread_read_interval = interval
            self.thread = threading.Thread(target=self.thread_read_loop, daemon=True)
            self.thread.start()

    def stop_thread(self) -> None:
        """
        Stop the thread for reading distance value.

        Clears the run flag and waits for the reader thread to finish its
        current iteration before returning.
        """
        self.thread_started = False
        if self.thread is not None:
            self.thread.join()
            self.thread = None