""" Fusion Hat device related functions
Example:
Import device module
>>> from fusion_hat import device
Enable speaker
>>> device.enable_speaker()
>>> device.get_speaker_state()
True
Disable speaker
>>> device.disable_speaker()
>>> device.get_speaker_state()
False
Get user button state
>>> device.get_usr_btn()
False
Get shutdown request state
>>> device.get_shutdown_request()
0
Set user LED
>>> device.set_led(True)
>>> device.set_led(False)
>>> device.set_led(1)
>>> device.set_led(0)
Get firmware version
>>> device.get_firmware_version()
'1.1.4'
Set volume
>>> device.set_volume(50)
Get battery voltage
>>> device.get_battery_voltage()
8.4
"""
# Public API of this module: the names exported by a star-import.
__all__ = [
    # Board identification constants
    'NAME',
    'ID',
    'UUID',
    'PRODUCT_ID',
    'PRODUCT_VER',
    'VENDOR',
    'DTOVERLAY_NAME',
    # Detection and diagnostics
    'is_detected',
    'is_driver_loaded',
    'doctor',
    # Deprecated detection aliases
    'is_installed',
    'is_connected',
    # Device attribute accessors
    'enable_speaker',
    'disable_speaker',
    'get_speaker_state',
    'get_usr_btn',
    'set_led',
    'get_firmware_version',
    'set_volume',
]
import functools
import os
from typing import Any, Callable, Union
# ── board identification ─────────────────────────────────────────────────────
NAME = "Fusion Hat"
""" Name of the board """
ID = "fusion_hat"
""" ID of the board """
UUID = "9daeea78-0000-0774-000a-582369ac3e02"
""" UUID of the board """
PRODUCT_ID = 0x0774
""" Product ID of the board """
PRODUCT_VER = 0x000a
""" Product version of the board """
VENDOR = "SunFounder"
""" Vendor of the board """
# sysfs directory holding the driver attribute files used below
# (speaker, button, led, firmware_version, version). Keeps its trailing slash
# because attribute names are appended by plain string concatenation.
DEVICE_PATH = "/sys/class/fusion_hat/fusion_hat/"
# Overlay name written to / searched for in config.txt as ``dtoverlay=<name>``.
DTOVERLAY_NAME = "sunfounder-fusionhat"
[ドキュメント]
def is_detected() -> bool:
    """Report whether the Fusion Hat is detected.

    Detection is delegated to :func:`is_driver_loaded`, so the board counts
    as detected when the driver's sysfs class directory is present.

    Returns:
        bool: True if /sys/class/fusion_hat/ exists
    """
    detected = is_driver_loaded()
    return detected
[ドキュメント]
def is_driver_loaded() -> bool:
    """Tell whether the Fusion Hat driver is loaded.

    The driver is considered loaded when the sysfs class directory
    /sys/class/fusion_hat/ exists.

    Returns:
        bool: True if driver is loaded, False otherwise
    """
    sysfs_class_dir = "/sys/class/fusion_hat/"
    loaded = os.path.exists(sysfs_class_dir)
    return loaded
[ドキュメント]
def is_installed() -> bool:
    """Deprecated alias of :func:`is_detected`.

    .. deprecated::
        Use :func:`is_detected` instead. This function will be removed in a future version.

    Emits a ``DeprecationWarning`` attributed to the caller, then forwards
    to :func:`is_detected`.

    Returns:
        bool: True if installed, False otherwise
    """
    import warnings

    message = "is_installed is deprecated, use is_detected instead."
    # stacklevel=2 attributes the warning to the caller, not to this shim.
    warnings.warn(message, DeprecationWarning, stacklevel=2)
    return is_detected()
[ドキュメント]
def is_connected():
    """Deprecated alias of :func:`is_driver_loaded`.

    .. deprecated::
        Use :func:`is_driver_loaded` instead. This function will be removed in a future version.

    Emits a ``DeprecationWarning`` attributed to the caller, then forwards
    to :func:`is_driver_loaded`.

    Returns:
        bool: True if connected
    """
    import warnings

    message = "is_connected is deprecated, use is_driver_loaded instead."
    # stacklevel=2 attributes the warning to the caller, not to this shim.
    warnings.warn(message, DeprecationWarning, stacklevel=2)
    return is_driver_loaded()
def raise_if_fusion_hat_not_ready() -> bool:
    """Ensure the Fusion HAT driver is ready, raising otherwise.

    Checks whether the sysfs interface exists (driver loaded and working).
    If not, the raised error tells the user to run ``fusion_hat doctor`` to
    diagnose and fix.

    Returns:
        bool: True if ready

    Raises:
        IOError: if the driver's sysfs interface is missing
    """
    if not is_driver_loaded():
        raise IOError(
            "Fusion Hat driver not loaded (sysfs interface missing). "
            "Run 'fusion_hat doctor' to diagnose and fix."
        )
    # The signature and docstring promise True on success; previously the
    # function fell off the end and returned None.
    return True
[ドキュメント]
def _find_config_txt() -> str:
"""Locate the active Raspberry Pi config.txt file.
Returns:
str: path to config.txt (may not exist)
"""
candidates = [
"/boot/firmware/config.txt",
"/boot/config.txt",
]
for p in candidates:
if os.path.isfile(p):
return p
return candidates[0] # default for Bookworm
[ドキュメント]
def _has_dtoverlay() -> bool:
    """Tell whether config.txt carries an active Fusion Hat dtoverlay line.

    A line counts when, after stripping surrounding whitespace, it contains
    ``dtoverlay=<DTOVERLAY_NAME>`` and does not start with ``#``. A missing
    or unreadable config.txt yields False.

    Returns:
        bool: True if the uncommented dtoverlay line exists
    """
    config_path = _find_config_txt()
    if not os.path.isfile(config_path):
        return False

    needle = f"dtoverlay={DTOVERLAY_NAME}"
    try:
        with open(config_path, "r") as handle:
            return any(
                needle in entry and not entry.startswith("#")
                for entry in map(str.strip, handle)
            )
    except Exception:
        # Best effort: any read/decoding problem is treated as "not present".
        return False
[ドキュメント]
def _add_dtoverlay() -> bool:
    """Make sure the Fusion Hat dtoverlay line is active in config.txt.

    - Already active → no-op
    - Commented out → uncomment (via ``sudo sed -i``)
    - Not present → append (via ``sudo tee -a``)

    The result is verified by re-reading config.txt afterwards.

    Returns:
        bool: True if the line is active after the call; False if
        config.txt does not exist or any step raised.
    """
    from ._utils import run_command

    config_path = _find_config_txt()
    if not os.path.isfile(config_path):
        return False
    if _has_dtoverlay():
        return True

    try:
        entry = f"dtoverlay={DTOVERLAY_NAME}"
        # grep prints "yes"/"no" so the decision can be read from stdout.
        probe_cmd = f"grep -q '^# *{entry}' {config_path} 2>/dev/null && echo yes || echo no"
        _, probe_out = run_command(probe_cmd, timeout=5)

        if probe_out.strip() == "yes":
            # A commented-out entry exists: rewrite it as an active line.
            edit_cmd = f"sudo sed -i 's/^# *{entry}.*/{entry}/' {config_path}"
        else:
            # No entry at all: append one (the file requires root to write).
            edit_cmd = f"echo '{entry}' | sudo tee -a {config_path} > /dev/null 2>&1"
        run_command(edit_cmd, timeout=10)

        return _has_dtoverlay()
    except Exception:
        return False
[ドキュメント]
def _remove_dtoverlay() -> bool:
    """Delete the Fusion Hat dtoverlay line from config.txt.

    Uses ``sudo sed`` since the file requires root. The outcome is verified
    by re-reading config.txt.

    Returns:
        bool: True if the line was removed or was not present; False if
        config.txt does not exist, the line is still there afterwards, or
        the command raised.
    """
    from ._utils import run_command

    config_path = _find_config_txt()
    if not os.path.isfile(config_path):
        return False
    if not _has_dtoverlay():
        return True

    delete_cmd = f"sudo sed -i '/^dtoverlay={DTOVERLAY_NAME}/d' {config_path}"
    try:
        run_command(delete_cmd, timeout=10)
        still_present = _has_dtoverlay()
        return not still_present
    except Exception:
        return False
I2C_SCAN_TIMEOUT = 5  # timeout in seconds passed to run_command for i2cdetect
# ── doctor helpers ───────────────────────────────────────────────────────────
# ANSI escape sequences used to colour the doctor's terminal output.
GREEN = "\033[32m"   # passed check / success message
RED = "\033[31m"     # failed check
CYAN = "\033[36m"
YELLOW = "\033[33m"  # warnings and hints
BOLD = "\033[1m"     # section titles and commands
RESET = "\033[0m"    # restore default attributes
def _icon(ok: bool) -> str:
    """Return a green check mark when *ok* is truthy, else a red cross."""
    if ok:
        return f"{GREEN}✓{RESET}"
    return f"{RED}✗{RESET}"
[ドキュメント]
def _print_check(name: str, ok: bool, detail: str = "", indent: int = 2):
    """Write one check result line, overwriting the pending spinner line.

    Args:
        name: label of the check
        ok: outcome, rendered through :func:`_icon`
        detail: optional explanation appended in parentheses
        indent: number of leading spaces
    """
    import sys

    suffix = f" ({detail})" if detail else ""
    margin = " " * indent
    # "\r" returns to column 0 and "\033[K" erases to end of line, so the
    # result cleanly replaces the "... name" placeholder printed earlier.
    sys.stdout.write(f"\r{margin}{_icon(ok)} {name}{suffix}\033[K\n")
    sys.stdout.flush()
def _print_section(title: str):
    """Print a bold section title followed by a 40-character rule."""
    heading = f"\n {BOLD}{title}{RESET}"
    rule = f" {'─' * 40}"
    print(heading)
    print(rule)
# ── driver checks ────────────────────────────────────────────────────────────
def _check_sysfs() -> tuple:
ok = os.path.exists("/sys/class/fusion_hat/")
return ok, "" if ok else "/sys/class/fusion_hat not found"
def _check_module_loaded() -> tuple:
ok = os.path.exists("/sys/module/fusion_hat")
return ok, "" if ok else "module not loaded"
def _check_i2c_mcu() -> tuple:
    """Probe I2C bus 1 for the on-board MCU at address 0x17.

    Runs ``i2cdetect`` over 0x10–0x1f and inspects the ``10:`` row; the
    eighth cell must read ``17`` or ``UU``.

    Returns:
        tuple: ``(ok, detail)`` where detail is empty on success.
    """
    from ._utils import run_command

    try:
        _, scan = run_command("sudo i2cdetect -y 1 0x10 0x1f 2>/dev/null",
                              timeout=I2C_SCAN_TIMEOUT)
        rows_0x10 = (row for row in scan.strip().split("\n")
                     if row.startswith("10:"))
        for row in rows_0x10:
            cells = row[3:].strip().split()
            # Index 7 within the 0x10 row corresponds to address 0x17.
            if len(cells) > 7 and cells[7] in ("17", "UU"):
                return True, ""
        return False, "MCU not responding at 0x17"
    except Exception:
        return False, "i2cdetect failed"
def _check_dtoverlay_driver() -> tuple:
    """Check that the Fusion Hat dtoverlay line is active in config.txt.

    Returns:
        tuple: ``(ok, detail)`` where detail is empty on success.
    """
    if _has_dtoverlay():
        return True, ""
    return False, f"dtoverlay={DTOVERLAY_NAME} not in config.txt"
def _check_module_file() -> tuple:
import platform
kv = platform.uname().release
ko_paths = [
f"/lib/modules/{kv}/extra/fusion_hat.ko",
f"/lib/modules/{kv}/updates/fusion_hat.ko",
f"/lib/modules/{kv}/extra/fusion_hat.ko.xz",
f"/lib/modules/{kv}/updates/fusion_hat.ko.xz",
f"/lib/modules/{kv}/updates/dkms/fusion_hat.ko.xz",
]
ok = any(os.path.exists(p) for p in ko_paths)
return ok, "" if ok else "fusion_hat.ko not installed"
# ── audio checks ─────────────────────────────────────────────────────────────
# Card name as it appears in ``aplay -l`` / ``arecord -l`` output; also the
# string looked for in /etc/asound.conf and passed to ``amixer -c``.
AUDIO_CARD_NAME = "sndrpigooglevoi"
# PulseAudio uses the full DT name (up to 31 chars), while aplay -l
# truncates it. Both variants identify the Fusion Hat sound card.
AUDIO_CARD_NAMES = (AUDIO_CARD_NAME, "snd_rpi_googlevoicehat_soundcar")
[ドキュメント]
def _check_sound_card() -> tuple:
    """Check that the Fusion HAT sound card (speaker) is listed by ``aplay -l``.

    Returns:
        tuple: ``(ok, detail)`` where detail is empty on success.
    """
    from ._utils import run_command

    _, playback_devices = run_command("aplay -l 2>/dev/null")
    if AUDIO_CARD_NAME not in playback_devices:
        return False, "sound card not found"
    return True, ""
[ドキュメント]
def _check_capture_device() -> tuple:
    """Check that the Fusion HAT mic is listed by ``arecord -l``.

    Returns:
        tuple: ``(ok, detail)`` where detail is empty on success.
    """
    from ._utils import run_command

    _, capture_devices = run_command("arecord -l 2>/dev/null")
    if AUDIO_CARD_NAME not in capture_devices:
        return False, "capture device not found"
    return True, ""
# ── I2S clock health check ───────────────────────────────────────────────────
# debugfs clock summary; _read_pcm_enable_count() scans it for the "pcm" row.
PCM_CLK_PATH = "/sys/kernel/debug/clk/clk_summary"
[ドキュメント]
def _check_asound_conf() -> tuple:
"""Check that /etc/asound.conf routes to the Fusion Hat card."""
asound_path = "/etc/asound.conf"
if not os.path.isfile(asound_path):
return False, "/etc/asound.conf not found"
try:
with open(asound_path, "r") as f:
content = f.read()
if AUDIO_CARD_NAME in content:
return True, ""
return False, f"{AUDIO_CARD_NAME} not referenced in asound.conf"
except Exception:
return False, "cannot read /etc/asound.conf"
[ドキュメント]
def _check_pa_default_sink() -> tuple:
    """Check that the PulseAudio default sink is the Fusion Hat card.

    ``pactl`` is run via ``sudo -u`` as the first account whose UID is in
    the range 1000–65533, with ``XDG_RUNTIME_DIR`` and
    ``DBUS_SESSION_BUS_ADDRESS`` pointing at that user's runtime directory.
    Whenever the check cannot be carried out (no such user, pactl missing
    or failing, no default sink reported, unexpected error) it is reported
    as passed with a "skipped" detail so that it never blocks the doctor.

    Returns:
        tuple: ``(ok, detail)``. ``ok`` is False only when a default sink
        was identified and it is not a Fusion Hat sink, or when it cannot be
        found in the sink list.
    """
    import json
    import pwd
    import subprocess

    # First regular (non-system) account.
    account = next(
        (user for user in pwd.getpwall() if 1000 <= user.pw_uid < 65534),
        None,
    )
    if account is None:
        return True, "no user session — skipped"

    # Shared command prefix: run pactl inside that user's session
    # environment. (The original built an `env` dict that was never used and
    # spelled this prefix out twice.)
    runtime_dir = f"/run/user/{account.pw_uid}"
    pactl_as_user = [
        "sudo", "-u", account.pw_name, "env",
        f"XDG_RUNTIME_DIR={runtime_dir}",
        f"DBUS_SESSION_BUS_ADDRESS=unix:path={runtime_dir}/bus",
        "pactl",
    ]

    try:
        info = subprocess.run(
            pactl_as_user + ["info"],
            capture_output=True, text=True, timeout=5,
        )
        if info.returncode != 0:
            return True, "pulseaudio not running — skipped"

        # Find default sink name
        default_sink = None
        for line in info.stdout.split("\n"):
            if line.startswith("Default Sink:"):
                default_sink = line.split(":", 1)[1].strip()
                break
        if not default_sink:
            return True, "no default sink — skipped"

        # Check if it belongs to Fusion Hat
        listing = subprocess.run(
            pactl_as_user + ["-f", "json", "list", "sinks"],
            capture_output=True, text=True, timeout=5,
        )
        for sink in json.loads(listing.stdout):
            if sink.get("name") != default_sink:
                continue
            card = sink.get("properties", {}).get("alsa.card_name", "")
            if any(variant in card for variant in AUDIO_CARD_NAMES):
                return True, ""
            return False, f"default sink is '{card}' not Fusion Hat"
        return False, "default sink not found in sink list"
    except FileNotFoundError:
        return True, "pactl not available — skipped"
    except Exception:
        # Includes timeouts and unparsable JSON output.
        return True, "pulseaudio check failed — skipped"
[ドキュメント]
def _check_alsa_volume() -> tuple:
    """Check that the ALSA speaker volume is not zero or muted.

    Several candidate mixer control names are queried with ``amixer sget``
    on the Fusion Hat card; the first one whose output mentions "Playback"
    decides the result. If none responds, the check is skipped (reported
    as passed).

    Returns:
        tuple: ``(ok, detail)``.
    """
    from ._utils import run_command

    candidate_controls = (
        "Fusion Hat",
        "Fusion Hat Playback Volume",
        "Playback",
        "Master",
    )
    # NOTE(review): "[0dB]" is treated as silent here, although 0 dB is
    # normally unity gain — confirm this marker is intended.
    silent_markers = ("[0%]", "[off]", "[0dB]")

    for ctrl in candidate_controls:
        _, out = run_command(
            f"amixer -c {AUDIO_CARD_NAME} sget '{ctrl}' 2>/dev/null",
            timeout=5,
        )
        if not out or "Playback" not in out:
            continue  # control missing on this card, try the next name
        if any(marker in out for marker in silent_markers):
            return False, f"speaker volume is 0% or muted (control: {ctrl})"
        return True, ""
    return True, "volume control not found — skipped"
[ドキュメント]
def _read_pcm_enable_count() -> int:
"""Read the current PCM clock enable count from debugfs.
Returns:
int: enable count (first numeric column for 'pcm' row),
or -1 if the clock summary is not accessible.
"""
try:
if not os.path.isfile(PCM_CLK_PATH):
return -1
with open(PCM_CLK_PATH, "r") as f:
for line in f:
if " pcm " in line or line.strip().startswith("pcm "):
parts = line.strip().split()
# Format: name enable prepare protect rate ...
# The pcm line is indented, first numeric col is enable count
for p in parts:
if p.isdigit():
return int(p)
return -1
except Exception:
return -1
[ドキュメント]
def _check_i2s_clock() -> tuple:
    """Check if the I2S PCM clock actually starts during playback.

    Generates a 0.5 s, 440 Hz test tone with ``sox``, plays it on the Fusion
    Hat card via ``aplay -D hw:<card>,0`` and verifies that the PCM clock's
    enable count (see :func:`_read_pcm_enable_count`) rises while playback
    is running. A stuck count (especially 0 both before and during
    playback) means the I2S peripheral is not responding.

    The check is skipped (reported as passed) when it cannot be performed:
    sound card missing, card index not parsable, debugfs unavailable, or
    sox/aplay missing or failing.

    Returns:
        (bool, str): True if the clock started, False with a diagnostic
        message otherwise.
    """
    import subprocess
    import tempfile
    import time

    from ._utils import run_command

    # Only run if the sound card exists — otherwise skip with an ok result
    # because the missing-card check already catches that case.
    _, aplay_out = run_command("aplay -l 2>/dev/null")
    if AUDIO_CARD_NAME not in aplay_out:
        return True, "sound card not available — skipped"

    # Get card index for hw:X,0 (second token of the matching aplay line).
    card_index = None
    for line in aplay_out.split("\n"):
        if AUDIO_CARD_NAME in line:
            parts = line.split()
            # Guard the index access: a one-token line must not raise.
            if len(parts) > 1:
                idx = parts[1].rstrip(":")
                if idx.isdigit():
                    card_index = idx
                    break
    if card_index is None:
        return True, "cannot determine card index — skipped"

    before = _read_pcm_enable_count()
    if before < 0:
        # debugfs not available — skip the check
        return True, "debugfs not mounted — skipped"

    wav_path = None
    try:
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
            wav_path = tmp.name

        # Generate 0.5s tone at 48 kHz / S32_LE / stereo (long enough to
        # reliably catch the PCM clock enable before aplay exits).
        synth = subprocess.run(
            ["sox", "-n", "-r", "48000", "-b", "32", "-c", "2",
             wav_path, "synth", "0.5", "sin", "440"],
            capture_output=True, timeout=5
        )
        # The temp file always exists (NamedTemporaryFile created it), so
        # judge sox by its exit status and by whether it wrote any data;
        # otherwise a broken sox would be misreported as a stuck clock.
        if synth.returncode != 0 or os.path.getsize(wav_path) == 0:
            return True, "sox not available — skipped"

        proc = subprocess.Popen(
            ["aplay", "-D", f"hw:{card_index},0", wav_path],
            stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
        )
        try:
            time.sleep(0.2)  # give hw_params a moment to set up the clock
            during = _read_pcm_enable_count()
        finally:
            proc.terminate()
            try:
                proc.wait(timeout=2)
            except subprocess.TimeoutExpired:
                proc.kill()
    except FileNotFoundError:
        return True, "sox/aplay not available — skipped"
    except Exception as e:
        return True, f"check failed: {e} — skipped"
    finally:
        # Clean up the temp file on every path, including early returns and
        # failures to launch sox/aplay (previously those paths leaked it).
        if wav_path is not None:
            try:
                os.unlink(wav_path)
            except OSError:
                pass

    if during > before:
        return True, ""
    if during == 0 and before == 0:
        return False, "I2S clock stuck — PCM enable count stayed 0 during playback"
    return False, f"I2S clock anomaly — before={before} during={during}"
[ドキュメント]
def _get_fusion_hat_pa_sink() -> str:
    """Look up the PulseAudio sink that belongs to the Fusion Hat card.

    ``pactl -f json list sinks`` is run via ``sudo -u`` as the first account
    whose UID is in the range 1000–65533; the first sink whose
    ``alsa.card_name`` property contains one of ``AUDIO_CARD_NAMES`` wins.

    Returns:
        str: sink name like ``alsa_output.platform-soc_sound.stereo-fallback``,
        or empty string if not found.
    """
    import json
    import pwd
    import subprocess

    # Find a non-root user to run pactl as
    account = None
    for entry in pwd.getpwall():
        if 1000 <= entry.pw_uid < 65534:
            account = entry
            break
    if account is None:
        return ""

    list_sinks_cmd = [
        "sudo", "-u", account.pw_name, "env",
        f"XDG_RUNTIME_DIR=/run/user/{account.pw_uid}",
        f"DBUS_SESSION_BUS_ADDRESS=unix:path=/run/user/{account.pw_uid}/bus",
        "pactl", "-f", "json", "list", "sinks",
    ]
    try:
        listing = subprocess.run(
            list_sinks_cmd, capture_output=True, text=True, timeout=5
        )
        if listing.returncode != 0:
            return ""
        for sink in json.loads(listing.stdout):
            card_name = sink.get("properties", {}).get("alsa.card_name", "")
            if any(variant in card_name for variant in AUDIO_CARD_NAMES):
                return sink.get("name", "")
    except Exception:
        # Best effort: any failure means "no sink found".
        pass
    return ""
[ドキュメント]
def _set_pa_default_sink(sink_name: str) -> bool:
    """Set the PulseAudio default sink.

    Runs ``pactl set-default-sink`` via ``sudo -u`` as the first account
    whose UID is in the range 1000–65533 (typically 'pi').

    Args:
        sink_name: name of the sink to make the default

    Returns:
        bool: True only if pactl ran and exited with status 0; False if no
        suitable user exists, pactl could not be run, or it reported failure.
    """
    import pwd
    import subprocess

    # Find a non-root user to run pactl as
    account = next(
        (user for user in pwd.getpwall() if 1000 <= user.pw_uid < 65534),
        None,
    )
    if account is None:
        return False

    try:
        result = subprocess.run(
            ["sudo", "-u", account.pw_name, "env",
             f"XDG_RUNTIME_DIR=/run/user/{account.pw_uid}",
             f"DBUS_SESSION_BUS_ADDRESS=unix:path=/run/user/{account.pw_uid}/bus",
             "pactl", "set-default-sink", sink_name],
            capture_output=True, timeout=5,
        )
    except Exception:
        return False
    # Previously True was returned regardless of pactl's exit status, so a
    # failed switch was reported to callers as a successful fix.
    return result.returncode == 0
[ドキュメント]
def _fix_i2s_stuck() -> bool:
    """Attempt to fix a stuck I2S peripheral.

    Fix for the "BCLK + LRCLK stuck HIGH / speaker hot" issue:

    1. **PulseAudio routing** — Ensure the default sink is the Fusion Hat
       card, not the built-in headphone jack. When Pipewire/PulseAudio
       routes audio to the wrong card, the I2S clock never starts.
    2. **Enable speaker + trigger hw_params** — Write ``1`` to the driver's
       ``speaker`` attribute (best effort), then play a brief tone via
       ``play`` (sox) to force the ALSA ``hw_params`` call which configures
       the I2S registers correctly.
    3. **Fallback** — only if ``play`` could not be run: generate a WAV with
       ``sox`` and push it straight to ``hw:<card>,0`` with ``aplay``.

    Returns:
        bool: True if PCM clock enable count increased after the fix.
    """
    import subprocess
    import tempfile
    import time

    # ── Step 1: fix PulseAudio default sink ──────────────────────────
    sink = _get_fusion_hat_pa_sink()
    if sink:
        _set_pa_default_sink(sink)

    # ── Step 2: enable speaker + trigger hw_params ───────────────────
    speaker_path = os.path.join(DEVICE_PATH, "speaker")
    if os.path.exists(speaker_path):
        try:
            with open(speaker_path, "w") as f:
                f.write("1")
        except Exception:
            # Best effort: a failed write must not abort the recovery attempt.
            pass

    # Play a tone long enough to be caught by the clock check
    before = _read_pcm_enable_count()
    try:
        proc = subprocess.Popen(
            ["play", "-n", "synth", "0.5", "sin", "440"],
            stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
        )
        # NOTE(review): the count is sampled 0.6 s after launching a 0.5 s
        # tone, which may be after playback has ended — confirm timing.
        time.sleep(0.6)
        during = _read_pcm_enable_count()
        proc.terminate()
        try:
            proc.wait(timeout=2)
        except subprocess.TimeoutExpired:
            proc.kill()
        return during > before
    except Exception:
        # `play` is missing or failed to run: fall through to the fallback.
        pass

    # Fallback: direct hw access via aplay
    wav_path = None
    try:
        result = subprocess.run(
            ["aplay", "-l"], capture_output=True, text=True, timeout=5
        )
        card_index = None
        for line in (result.stdout + result.stderr).split("\n"):
            if AUDIO_CARD_NAME in line:
                parts = line.split()
                if len(parts) > 1:
                    idx = parts[1].rstrip(":")
                    if idx.isdigit():
                        card_index = idx
                        break
        if card_index is None:
            return False

        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
            wav_path = tmp.name
        synth = subprocess.run(
            ["sox", "-n", "-r", "48000", "-b", "32", "-c", "2",
             wav_path, "synth", "0.5", "sin", "440"],
            capture_output=True, timeout=5
        )
        # The temp file always exists, so check that sox really produced
        # audio instead of testing for the file's presence.
        if synth.returncode != 0 or os.path.getsize(wav_path) == 0:
            return False

        before2 = _read_pcm_enable_count()
        proc = subprocess.Popen(
            ["aplay", "-D", f"hw:{card_index},0", wav_path],
            stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
        )
        time.sleep(0.6)
        during2 = _read_pcm_enable_count()
        proc.terminate()
        try:
            proc.wait(timeout=2)
        except subprocess.TimeoutExpired:
            proc.kill()
        return during2 > before2
    except Exception:
        return False
    finally:
        # Remove the temp WAV on every path; previously it was leaked when
        # sox or aplay could not be started.
        if wav_path is not None:
            try:
                os.unlink(wav_path)
            except OSError:
                pass
# TODO: re-enable when confirmed needed
# def _check_audio_modules() -> tuple:
# """Check if WM8960 sound modules are loaded."""
# from ._utils import run_command
# _, out = run_command("lsmod 2>/dev/null")
# missing = []
# for mod in ["snd_soc_wm8960", "snd_soc_rpi_googlevoicehat"]:
# if mod not in out:
# missing.append(mod)
# if missing:
# return False, "missing: " + ", ".join(missing)
# return True, ""
[ドキュメント]
def doctor(fix_mode: bool = False) -> dict:
    """Live hardware health check — prints results as each check runs.

    Sections:
        Driver — sysfs, module, I2C MCU, dtoverlay, module file
        Audio — sound card, capture device, asound.conf, PulseAudio default
        sink, ALSA volume, I2S clock health

    Args:
        fix_mode: If True, summary messages adapt for ``--fix`` mode
            (don't suggest running ``--fix`` again).

    Returns:
        dict: one boolean per check (keyed by the check's display name)
        plus the aggregate keys ``driver_ok``, ``audio_ok`` and ``overall``.
    """
    import sys

    results = {}

    def run_section(checks) -> bool:
        """Run each (name, func) check, record and print it; return True if all passed."""
        section_ok = True
        for name, func in checks:
            # Placeholder line; _print_check overwrites it with the result.
            sys.stdout.write(f" ... {name}\r")
            sys.stdout.flush()
            ok, detail = func()
            results[name] = ok
            if not ok:
                section_ok = False
            _print_check(name, ok, detail)
        return section_ok

    print("")
    print("=" * 50)
    print(" Fusion Hat Doctor")
    print("=" * 50)

    # ── Driver ──
    _print_section("Driver")
    driver_ok = run_section([
        ("sysfs interface", _check_sysfs),
        ("kernel module loaded", _check_module_loaded),
        ("I2C MCU (0x17)", _check_i2c_mcu),
        ("dtoverlay in config.txt", _check_dtoverlay_driver),
        ("kernel module file", _check_module_file),
    ])
    results["driver_ok"] = driver_ok

    # ── Audio ──
    _print_section("Audio")
    # A failed I2S clock check already clears audio_ok through run_section,
    # so no separate re-check of that entry is needed.
    audio_ok = run_section([
        ("sound card (speaker)", _check_sound_card),
        ("capture device (mic)", _check_capture_device),
        ("asound.conf", _check_asound_conf),
        ("PulseAudio default sink", _check_pa_default_sink),
        ("ALSA speaker volume", _check_alsa_volume),
        ("I2S clock (PCM)", _check_i2s_clock),
    ])
    results["audio_ok"] = audio_ok

    # ── Summary ──
    dtoverlay_ok = results.get("dtoverlay in config.txt", False)
    sysfs_ok = results.get("sysfs interface", False)
    overall = driver_ok and audio_ok
    results["overall"] = overall

    print("")
    if not dtoverlay_ok:
        print(f" {YELLOW}dtoverlay not configured{RESET}")
        _print_fix_hint(fix_mode)
    elif not sysfs_ok:
        print(f" {YELLOW}dtoverlay configured but reboot needed{RESET}")
        _print_fix_hint(fix_mode)
    elif overall:
        print(f" {GREEN}All checks passed.{RESET}")
    else:
        if not driver_ok:
            print(f" {YELLOW}Driver issues found.{RESET}")
        if not audio_ok:
            print(f" {YELLOW}Audio issues found.{RESET}")
        _print_fix_hint(fix_mode)
    print("")
    print("=" * 50)
    print("")
    return results
[ドキュメント]
def _print_fix_hint(fix_mode: bool = False):
"""Print a hint to run ``--fix``, or skip if already in fix mode."""
if not fix_mode:
print(f" → Run: {BOLD}fusion_hat doctor --fix{RESET}")
[ドキュメント]
def _find_driver_src() -> str:
"""Find the Fusion Hat driver source directory."""
import platform
candidates = []
try:
import fusion_hat
pkg_dir = os.path.dirname(fusion_hat.__file__)
repo = os.path.dirname(pkg_dir)
candidates.append(os.path.join(repo, "driver"))
except Exception:
pass
candidates += [
"/home/pi/fusion-hat/driver",
os.path.expanduser("~/fusion-hat/driver"),
]
try:
kv = platform.uname().release
candidates.append(f"/usr/src/fusion_hat-{kv}")
except Exception:
pass
for p in candidates:
if os.path.isdir(p) and os.path.isfile(os.path.join(p, "Makefile")):
return os.path.realpath(p)
return ""
def doctor_fix() -> dict:
    """Run doctor and attempt to fix the issues found.

    Handles: PulseAudio default sink, stuck I2S clock, I2C enable, driver
    install, dtoverlay, modprobe / module reload.

    Returns:
        dict: with keys

        - ``before``: result dict of the initial :func:`doctor` run
        - ``fixes``: list of human-readable actions taken (or failed)
        - ``after``: quick re-check of sysfs, module, I2C MCU and dtoverlay,
          plus ``overall`` (all of those passing)
        - ``fixed``: True if everything was fine to begin with, else
          ``after["overall"]``
        - ``reboot``: True if a reboot is needed to complete the fix
    """
    import shlex
    import time

    from ._utils import run_command

    before = doctor(fix_mode=True)
    fixes = []
    reboot = False

    # ── audio auto-fix: PulseAudio default sink wrong ────────────────────
    if before.get("PulseAudio default sink") is False:
        print(f"\n {YELLOW}PulseAudio default sink is wrong — fixing...{RESET}")
        sink = _get_fusion_hat_pa_sink()
        if sink and _set_pa_default_sink(sink):
            fixes.append(f"set PA default sink to {sink}")
            time.sleep(0.3)
            pa_after, _ = _check_pa_default_sink()
            if pa_after:
                print(f" {GREEN}PulseAudio default sink fixed{RESET}")
            else:
                print(f" {YELLOW}PA sink still wrong after retry{RESET}")
        else:
            fixes.append("failed to set PA default sink")

    # ── audio auto-fix: I2S clock stuck ──────────────────────────────────
    if before.get("I2S clock (PCM)") is False:
        print(f"\n {YELLOW}I2S clock stuck — trying auto-fix (trigger hw_params)...{RESET}")
        if _fix_i2s_stuck():
            fixes.append("triggered I2S hw_params via playback")
            time.sleep(0.3)
            after_i2s = _check_i2s_clock()
            if after_i2s[0]:
                print(f" {GREEN}I2S clock recovered{RESET}")
            else:
                print(f" {YELLOW}I2S still stuck after retry — try 'fusion_hat speaker setup'{RESET}")
        else:
            fixes.append("failed to trigger I2S hw_params")

    # Nothing was wrong in the first place: report success without re-checks.
    if before["overall"]:
        return {"before": before, "fixes": fixes, "after": before, "fixed": True, "reboot": False}

    # I2C not enabled
    if not os.path.exists("/dev/i2c-1"):
        fixes.append("enable I2C")
        run_command("sudo raspi-config nonint do_i2c 0 2>/dev/null")
        run_command("sudo modprobe i2c-dev 2>/dev/null")

    # Module file missing
    if not before.get("kernel module file", True):
        driver_dir = _find_driver_src()
        if driver_dir:
            fixes.append(f"install driver from {driver_dir}")
            # Quote the path: it goes through a shell and may contain
            # spaces or other special characters.
            run_command(f"cd {shlex.quote(driver_dir)} && sudo make modules_install 2>/dev/null")
            run_command("sudo depmod -a 2>/dev/null")
        else:
            fixes.append("driver source not found — cannot auto-install")

    dtoverlay_ok = before.get("dtoverlay in config.txt", False)
    sysfs_ok = before.get("sysfs interface", False)
    module_loaded = before.get("kernel module loaded", False)

    # dtoverlay
    if not dtoverlay_ok:
        if _add_dtoverlay():
            fixes.append(f"dtoverlay={DTOVERLAY_NAME} added to config.txt")
            reboot = True
        else:
            fixes.append("failed to add dtoverlay to config.txt")

    # dtoverlay configured but sysfs not working → reboot needed
    if dtoverlay_ok and not sysfs_ok:
        fixes.append("dtoverlay configured, reboot required to activate")
        reboot = True
    else:
        # Module not loaded
        if not module_loaded:
            fixes.append("modprobe fusion_hat")
            run_command("sudo modprobe fusion_hat 2>/dev/null")
            if not os.path.exists("/sys/module/fusion_hat"):
                reboot = True
        # Module loaded but sysfs missing
        if module_loaded and not sysfs_ok:
            fixes.append("reload fusion_hat module")
            run_command("sudo rmmod fusion_hat 2>/dev/null")
            run_command("sudo modprobe fusion_hat 2>/dev/null")

    print("\n --- Fixes ---")
    for action in fixes:
        print(f" → {action}")

    # Quick re-check after fixes. The I2C probe reuses _check_i2c_mcu(),
    # which performs the same i2cdetect scan and parsing that used to be
    # duplicated inline here.
    after = {
        "sysfs interface": os.path.exists("/sys/class/fusion_hat/"),
        "kernel module loaded": os.path.exists("/sys/module/fusion_hat"),
        "I2C MCU (0x17)": _check_i2c_mcu()[0],
        "dtoverlay in config.txt": _has_dtoverlay(),
    }
    after["overall"] = all(after.values())

    # When a reboot is pending and the re-check fails, this naturally yields
    # fixed=False / reboot=True, so no separate return is needed.
    return {"before": before, "fixes": fixes, "after": after, "fixed": after["overall"], "reboot": reboot}
def force_dt_overlay() -> bool:
    """Add dtoverlay=sunfounder-fusionhat to config.txt and offer a reboot.

    This is the primary way to configure the Fusion HAT device-tree overlay.
    Required for the kernel driver to load and detect the HAT. After the
    line has been added the user is asked interactively whether to reboot
    now; declining (or interrupting the prompt) only prints a reminder.

    Returns:
        bool: True if the line was added or already present
    """
    from ._utils import run_command

    if _has_dtoverlay():
        print("dtoverlay=sunfounder-fusionhat is already in config.txt.")
        return True

    added = _add_dtoverlay()
    if not added:
        print("Failed to write config.txt. Check permissions.")
        return False
    print("Added dtoverlay=sunfounder-fusionhat to config.txt.")

    try:
        reply = input(" Reboot now to apply? (y/N): ")
        wants_reboot = reply.strip().lower() in ("y", "yes")
        if not wants_reboot:
            print(" Reboot later with: sudo reboot")
        else:
            print(" Rebooting...")
            run_command("sudo reboot 2>&1")
    except (KeyboardInterrupt, EOFError):
        # Ctrl-C / closed stdin: finish the prompt line and remind the user.
        print("")
        print(" Reboot later with: sudo reboot")
    return True
def remove_dt_overlay() -> bool:
    """Remove dtoverlay=sunfounder-fusionhat from config.txt, reporting progress.

    Returns:
        bool: True if the line was removed or not present
    """
    if not _has_dtoverlay():
        print("dtoverlay=sunfounder-fusionhat is not in config.txt.")
        return True

    removed = _remove_dtoverlay()
    if not removed:
        print("Failed to update config.txt. Check permissions.")
        return False

    print("Removed dtoverlay=sunfounder-fusionhat from config.txt.")
    print("Run 'sudo reboot' if the HAT was previously working via this overlay.")
    return True
def uninstall() -> bool:
    """Uninstall Fusion HAT: driver, DKMS, overlay, config, Python package.

    Removes: loaded module, DKMS registration + source, .ko files,
    .dtbo overlay, dtoverlay from config.txt, and the Python package.
    Progress is printed step by step.

    Returns:
        bool: True if uninstall succeeded (or nothing to do); False if the
        module could not be unloaded, the .dtbo could not be deleted, or the
        dtoverlay line could not be removed from config.txt.
    """
    import glob
    import platform
    import shlex

    from ._utils import run_command

    print("")
    print("=" * 60)
    print(" Fusion HAT Uninstall")
    print("=" * 60)
    print("")

    kv = platform.uname().release
    ok = True

    # 1. Unload the module
    print(" [1/6] Unloading kernel module...")
    if os.path.exists("/sys/module/fusion_hat"):
        _, out = run_command("sudo rmmod fusion_hat 2>&1")
        if os.path.exists("/sys/module/fusion_hat"):
            print(f" [FAIL] Could not unload fusion_hat: {out.strip()}")
            ok = False
        else:
            print(" [OK] fusion_hat module unloaded")
    else:
        print(" [OK] fusion_hat not loaded")

    # 2. DKMS uninstall
    print(" [2/6] Removing DKMS registration...")
    _, dkms_status = run_command("dkms status fusion_hat 2>/dev/null || true")
    if dkms_status.strip():
        for line in dkms_status.strip().split("\n"):
            # Lines look like "fusion_hat/<version>, ..."; take <version>.
            ver = line.split("/")[1].split(",")[0].strip() if "/" in line else ""
            if ver:
                run_command(f"sudo dkms remove -m fusion_hat -v {ver} --all 2>/dev/null")
        _, dkms_after = run_command("dkms status fusion_hat 2>/dev/null || true")
        if not dkms_after.strip():
            print(" [OK] DKMS registration removed")
        else:
            print(f" [!] DKMS may still have entries: {dkms_after.strip()}")
    else:
        print(" [OK] Not registered with DKMS")
    for dkms_dir in glob.glob("/usr/src/fusion_hat-*"):
        # Quote the globbed path before handing it to a shell "rm -rf".
        run_command(f"sudo rm -rf {shlex.quote(dkms_dir)} 2>/dev/null")

    # 3. Remove module files
    print(" [3/6] Removing kernel module files...")
    ko_paths = [
        f"/lib/modules/{kv}/extra/fusion_hat.ko",
        f"/lib/modules/{kv}/updates/fusion_hat.ko",
        f"/lib/modules/{kv}/extra/fusion_hat.ko.xz",
        f"/lib/modules/{kv}/updates/fusion_hat.ko.xz",
        # Also counted as "installed" by _check_module_file(); remove it too
        # in case the DKMS removal above left it behind.
        f"/lib/modules/{kv}/updates/dkms/fusion_hat.ko.xz",
    ]
    removed = 0
    for p in ko_paths:
        if os.path.isfile(p):
            run_command(f"sudo rm -f {p} 2>/dev/null")
            if not os.path.isfile(p):
                removed += 1
    run_command("sudo depmod -a 2>/dev/null")
    print(f" [OK] Removed {removed} module file(s)")

    # 4. Remove dtbo from overlays
    print(" [4/6] Removing device-tree overlay (.dtbo)...")
    dtbo_name = "sunfounder-fusionhat.dtbo"
    overlay_dirs = [
        "/boot/firmware/overlays",
        "/boot/overlays",
    ]
    dtbo_found = False
    dtbo_left_behind = False
    for d in overlay_dirs:
        p = os.path.join(d, dtbo_name)
        if os.path.isfile(p):
            dtbo_found = True
            run_command(f"sudo rm -f {p} 2>/dev/null")
            # Verify the deletion instead of assuming it worked.
            if os.path.isfile(p):
                dtbo_left_behind = True
    if dtbo_left_behind:
        print(f" [FAIL] Could not remove {dtbo_name}")
        ok = False
    else:
        print(f" [OK] {'Removed' if dtbo_found else 'Not found'}")

    # 5. Remove dtoverlay from config.txt
    print(" [5/6] Removing dtoverlay from config.txt...")
    if _has_dtoverlay():
        if _remove_dtoverlay():
            print(" [OK] dtoverlay removed from config.txt")
        else:
            print(" [FAIL] Could not remove dtoverlay from config.txt")
            ok = False
    else:
        print(" [OK] No dtoverlay in config.txt")

    # 6. Uninstall Python package
    print(" [6/6] Uninstall Python package...")
    _, pip_out = run_command(
        "pip show fusion-hat 2>/dev/null | grep -i location",
        timeout=10,
    )
    if pip_out.strip():
        _, out = run_command(
            "sudo pip uninstall -y fusion-hat --break-system-packages 2>&1",
            timeout=30,
        )
        if "Successfully uninstalled" in out:
            print(" [OK] Python package uninstalled")
        else:
            # Show only the tail of pip's output to keep the report short.
            print(f" [!] pip uninstall returned: {out.strip()[-120:]}")
    else:
        print(" [OK] Python package not found")

    print("")
    if ok:
        print(" Uninstall complete. Reboot to fully clean up.")
    else:
        print(" Uninstall completed with some issues. Check output above.")
    print("")
    print("=" * 60)
    print("")
    return ok
def require_fusion_hat(func: Callable[..., Any]) -> Callable[..., Any]:
    """Decorator to require Fusion HAT.

    The returned wrapper calls :func:`raise_if_fusion_hat_not_ready` before
    every invocation, so the decorated function raises ``IOError`` instead
    of running when the driver is not loaded.

    Args:
        func (Callable[..., Any]): function to decorate

    Returns:
        Callable[..., Any]: decorated function
    """
    # functools.wraps keeps the wrapped function's __name__, __doc__ and
    # signature metadata, which would otherwise all read as "wrapper".
    @functools.wraps(func)
    def wrapper(*arg, **kwargs):
        raise_if_fusion_hat_not_ready()
        return func(*arg, **kwargs)
    return wrapper
[ドキュメント]
@require_fusion_hat
def enable_speaker() -> None:
    """Enable the speaker by writing ``1`` to the driver's ``speaker`` attribute."""
    speaker_attr = f"{DEVICE_PATH}speaker"
    with open(speaker_attr, "w") as attr:
        attr.write("1")
[ドキュメント]
@require_fusion_hat
def disable_speaker() -> None:
    """Disable the speaker by writing ``0`` to the driver's ``speaker`` attribute."""
    speaker_attr = f"{DEVICE_PATH}speaker"
    with open(speaker_attr, "w") as attr:
        attr.write("0")
[ドキュメント]
@require_fusion_hat
def get_speaker_state() -> bool:
    """Get speaker state.

    Reads the driver's ``speaker`` attribute.

    Returns:
        bool: True if enabled
    """
    path = DEVICE_PATH + "speaker"
    with open(path, "r") as f:
        # strip() rather than [:-1]: slicing assumed a trailing newline and
        # would drop the value itself when there is none. This also matches
        # how get_led() parses its attribute.
        state = f.read().strip()
    return state == "1"
[ドキュメント]
@require_fusion_hat
def get_usr_btn() -> bool:
    """Get user button state.

    Reads the driver's ``button`` attribute.

    Returns:
        bool: True if pressed
    """
    path = DEVICE_PATH + "button"
    with open(path, "r") as f:
        # strip() rather than [:-1]: slicing assumed a trailing newline and
        # would drop the value itself when there is none.
        state = f.read().strip()
    return state == "1"
@require_fusion_hat
def get_charge_state() -> bool:
    """[Deprecated] Get charge state.

    Prints a deprecation notice, then reads
    ``/sys/class/power_supply/fusion-hat/charge_state``.

    Returns:
        bool: True if charging
    """
    # The old notice told users to "use get_charge_state instead", i.e. the
    # very function being deprecated; no replacement is named in this module,
    # so the circular hint is dropped.
    print("Warning: get_charge_state is deprecated.")
    path = "/sys/class/power_supply/fusion-hat/charge_state"
    with open(path, "r") as f:
        # strip() rather than [:-1], so a value without a trailing newline
        # is still parsed correctly.
        state = f.read().strip()
    return state == "1"
@require_fusion_hat
def get_battery_voltage() -> float:
    """[Deprecated] Get battery voltage.

    Prints a deprecation notice, then reads
    ``/sys/class/power_supply/fusion-hat/voltage_now`` and divides the value
    by 1000.

    Returns:
        float: battery voltage(V)
    """
    # The old notice told users to "use get_battery_voltage instead", i.e.
    # the very function being deprecated; no replacement is named in this
    # module, so the circular hint is dropped.
    print("Warning: get_battery_voltage is deprecated.")
    path = "/sys/class/power_supply/fusion-hat/voltage_now"
    with open(path, "r") as f:
        raw = f.read().strip()
    # NOTE(review): the division by 1000 implies the attribute reports
    # millivolts — confirm against the driver.
    return float(raw) / 1000
def get_shutdown_request() -> None:
    """[Deprecated] Get shutdown request.

    Raises:
        NotImplementedError: always.
    """
    message = "get_shutdown_request is deprecated."
    raise NotImplementedError(message)
[ドキュメント]
@require_fusion_hat
def set_led(state: Union[int, bool]) -> None:
    """Set led state.

    Writes ``int(state)`` as text to the driver's ``led`` attribute.

    Args:
        state (int or bool): 0:off, 1:on, True:on, False:off
    """
    # The annotation was previously the list literal ``[int, bool]``, which
    # is not a valid type hint; Union[int, bool] expresses the intent.
    path = f"{DEVICE_PATH}led"
    with open(path, "w") as f:
        f.write(str(int(state)))
@require_fusion_hat
def get_led() -> bool:
    """Read the LED state from the driver's ``led`` attribute.

    Returns:
        bool: True if on
    """
    led_attr = DEVICE_PATH + "led"
    with open(led_attr, "r") as attr:
        raw = attr.read()
    return raw.strip() == "1"
[ドキュメント]
@require_fusion_hat
def get_firmware_version() -> str:
    """Read the firmware version from the driver's ``firmware_version`` attribute.

    Returns:
        str: firmware version
    """
    version_attr = DEVICE_PATH + "firmware_version"
    with open(version_attr, "r") as attr:
        raw = attr.read()
    return raw.strip()
@require_fusion_hat
def get_driver_version() -> str:
    """Read the driver version from the driver's ``version`` attribute.

    Returns:
        str: driver version
    """
    version_attr = DEVICE_PATH + "version"
    with open(version_attr, "r") as attr:
        raw = attr.read()
    return raw.strip()
[ドキュメント]
def set_volume(value: int) -> None:
    """Set the speaker volume through ``amixer``.

    The value is clamped to the 0–100 range and applied to the
    ``'fusion_hat speaker'`` mixer control via ``sudo amixer -M sset``.

    Args:
        value (int): volume(0~100)
    """
    at_least_zero = max(0, value)
    clamped = min(100, at_least_zero)
    amixer_cmd = "sudo amixer -M sset 'fusion_hat speaker' %d%%" % clamped
    os.system(amixer_cmd)