#!/usr/bin/env python3
import os
import time
from typing import Callable, Any
[Doku]
def retry(times: int = 5):
""" Retry decorator retry specified times if any error occurs
Args:
times (int, optional): number of times to retry. Defaults to 5.
Returns:
function: wrapper function
"""
def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
def wrapper(*arg, **kwargs):
for _ in range(times):
try:
return func(*arg, **kwargs)
except OSError:
continue
else:
return False
return wrapper
return decorator
[Doku]
def command_exists(cmd: str) -> bool:
""" Check if command exists
Args:
cmd (str): command to check
Returns:
bool: True if exists
"""
import subprocess
try:
subprocess.check_output(['which', cmd], stderr=subprocess.STDOUT)
return True
except subprocess.CalledProcessError:
return False
[Doku]
def run_command(cmd: str, timeout: float | None = None) -> tuple:
""" Run command and return status and output
Args:
cmd (str): command to run
timeout (float | None): timeout in seconds, None for no timeout
Returns:
tuple: status, output. On timeout, returns (124, "timed out after Ns")
"""
import subprocess
if timeout is not None:
try:
r = subprocess.run(
cmd, shell=True, capture_output=True, timeout=timeout)
return r.returncode, r.stdout.decode('utf-8', errors='replace')
except subprocess.TimeoutExpired:
return 124, f"timed out after {timeout}s"
p = subprocess.Popen(
cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
result = p.stdout.read().decode('utf-8')
status = p.poll()
return status, result
[Doku]
def is_installed(cmd: str) -> bool:
""" Check if command is installed
Args:
cmd (str): command to check
Returns:
bool: True if installed
"""
status, _ = run_command(f"which {cmd}")
if status in [0, 127]:
return True
else:
return False
[Doku]
def mapping(x: float, in_min: float, in_max: float, out_min: float, out_max: float) -> float:
""" Map value from one range to another range
Args:
x (float): value to map
in_min (float): input minimum
in_max (float): input maximum
out_min (float): output minimum
out_max (float): output maximum
Returns:
float: mapped value
"""
return (x - in_min) * (out_max - out_min) / (in_max - in_min) + out_min
[Doku]
def get_ip(ifaces: list=['wlan0', 'eth0']) -> str:
""" Get IP address
Args:
ifaces (list, optional): interfaces to check, defaults to ['wlan0', 'eth0']
Returns:
str/False: IP address or False if not found
"""
import re
if isinstance(ifaces, str):
ifaces = [ifaces]
for iface in list(ifaces):
search_str = 'ip addr show {}'.format(iface)
result = os.popen(search_str).read()
com = re.compile(r'(?<=inet )(.*)(?=\/)', re.M)
ipv4 = re.search(com, result)
if ipv4:
ipv4 = ipv4.groups()[0]
return ipv4
return False
[Doku]
def get_username() -> str:
""" Get username
Returns:
str: username
"""
return os.popen('echo ${SUDO_USER:-$LOGNAME}').readline().strip()
[Doku]
def constrain(value: float, min_value: float, max_value: float) -> float:
""" Constrain value to a range
Args:
value (float): value to constrain
min_value (float): minimum value
max_value (float): maximum value
Returns:
float: constrained value
"""
return min(max(value, min_value), max_value)
class LazyReader():
""" Lazy reader
Read something in a given interval,
even if you read it multiple times in a short time.
For those who don't need to read it too frequently.
"""
def __init__(self, read_function: Callable, interval: int=10) -> None:
""" Initialize the lazy reader.
Args:
read_function (Callable): The function to read.
interval (int, optional): The interval to read. Defaults to 10.
"""
self.read_function = read_function
self.interval = interval
self.value = None
self.last_read_time = 0
def read(self) -> Any:
""" Read the value.
Returns:
Any: The value.
"""
if time.time() - self.last_read_time > self.interval:
self.value = self.read_function()
self.last_read_time = time.time()
return self.value
__all__ = [
'retry',
'command_exists',
'run_command',
'is_installed',
'mapping',
'get_ip',
'get_username',
'constrain',
]