from fusion_hat.modules.mpu6050 import MPU6050
from smbus2 import SMBus
from enum import Enum
import time
# I2C bus ID
I2C_BUS = 1
[Doku]
class MagnetometerType(Enum):
mag_QMC6310 = 1
mag_QMC5883P = 2
mag_QMC5883L = 3
mag_HMC5883L = 4
[Doku]
def convert_2_int16(value):
if value > 32767:
value = -(65536 - value)
return value
[Doku]
def i2c_ack(bus, addr):
"""
Check if a device responds at the specified I2C address
Parameters:
bus: SMBus instance
addr: I2C address to check
Returns:
bool: True if the device responds, False otherwise
"""
try:
bus.read_byte(addr)
return True
except Exception:
return False
[Doku]
class QMC6310:
# https://www.qstcorp.com/upload/pdf/202202/%EF%BC%88%E5%B7%B2%E4%BC%A0%EF%BC%8913-52-17%20QMC6310%20Datasheet%20Rev.C(1).pdf
# I2C address of QMC6310 compass sensor
DEF_ADDR = 0x1C
'''
# Define register address of QMC6310 compass sensor
'''
QMC6310_REG_DATA_START = 0x01
QMC6310_REG_DTAT_X= 0x01 # 0x01, 0x02
QMC6310_REG_DTAT_Y= 0x03 # 0x03, 0x04
QMC6310_REG_DTAT_Z= 0x05 # 0x05, 0x06
QMC6310_REG_STATUS = 0x09
QMC6310_REG_CONTROL_1 = 0x0A
QMC6310_REG_CONTROL_2 = 0x0B
'''
# Define the register parameter configuration of the sensor
'''
QMC6310_VAL_MODE_SUSPEND = 0 << 0
QMC6310_VAL_MODE_NORMAL = 1 << 0
QMC6310_VAL_MODE_SINGLE = 2 << 0
QMC6310_VAL_MODE_CONTINUOUS = 3 << 0
QMC6310_VAL_ODR_10HZ = 0 << 2
QMC6310_VAL_ODR_50HZ = 1 << 2
QMC6310_VAL_ODR_100HZ = 2 << 2
QMC6310_VAL_ODR_200HZ = 3 << 2
QMC6310_VAL_OSR1_8 = 0 << 4
QMC6310_VAL_OSR1_4 = 1 << 4
QMC6310_VAL_OSR1_2 = 2 << 4
QMC6310_VAL_OSR1_1 = 3 << 4
QMC6310_VAL_OSR2_1 = 0 << 6
QMC6310_VAL_OSR2_2 = 1 << 6
QMC6310_VAL_OSR2_4 = 2 << 6
QMC6310_VAL_OSR2_8 = 3 << 6
QMC6310_VAL_MODE_SET_RESET_ON = 0 << 0
QMC6310_VAL_MODE_SET_ON = 1 << 0
QMC6310_VAL_MODE_SET_RESET_OFF = 2 << 0
RANGE = {
"30G":0 << 2, # 1000 LSB/Gauss
"12G":1 << 2, # 2500 LSB/Gauss
"8G":2 << 2, # 3750 LSB/Gauss
"2G":3 << 2, # 15000 LSB/Gauss
}
LSB = {
"30G":1, # 1 LSB/mGauss
"12G":2.5, # 2.5 LSB/mGauss
"8G":3.75, # 3.75 LSB/mGauss
"2G":15, # 15 LSB/mGauss
}
QMC6310_VAL_SELF_TEST_ON = 1 << 6
QMC6310_VAL_SELF_TEST_OFF = 0 << 6
QMC6310_VAL_SOFT_RST_ON = 1 << 7
QMC6310_VAL_SOFT_RST_OFF = 0 << 7
def __init__(self, bus: SMBus, addr: int = DEF_ADDR, field_range="8G"):
"""
Initialize QMC6310 magnetometer
Parameters:
bus: I2C bus
addr: Device I2C address, default is DEF_ADDR (0x1C)
field_range: Magnetic field range ("30G", "12G", "8G", "2G")
"""
self.bus = bus
self.ADDR = addr
# Check if device is available
try:
self.bus.read_byte(self.ADDR)
except Exception:
raise IOError("QMC6310 is not available")
if field_range in ["30G", "12G", "8G", "2G"]:
self.field_range = field_range
else:
self.field_range = "8G"
raise ValueError("field_range must be one of ['30G', '12G', '8G', '2G']")
# Initialize
self.bus.write_byte_data(self.ADDR, 0x29, 0x06)
self.bus.write_byte_data(self.ADDR, self.QMC6310_REG_CONTROL_2, self.RANGE[field_range])
self.bus.write_byte_data(self.ADDR, self.QMC6310_REG_CONTROL_1,
self.QMC6310_VAL_MODE_NORMAL | self.QMC6310_VAL_ODR_200HZ | self.QMC6310_VAL_OSR1_8 | self.QMC6310_VAL_OSR2_8)
[Doku]
def read_raw(self):
"""
Read raw magnetometer data
Returns:
list: [x, y, z] raw magnetic field values
"""
# Read 6 bytes starting from data register
result = self.bus.read_i2c_block_data(self.ADDR, self.QMC6310_REG_DATA_START, 6)
x = convert_2_int16(result[1] << 8 | result[0])
y = convert_2_int16(result[3] << 8 | result[2])
z = convert_2_int16(result[5] << 8 | result[4])
return [x, y, z]
[Doku]
def read_magnet(self):
"""
Read magnetometer data in Gauss units (consistent with other magnetometer classes)
Returns:
tuple: (x, y, z) magnetic field values in Gauss
"""
raw_data = self.read_raw()
lsb_value = self.LSB[self.field_range]
# Convert to Gauss (divide by 1000 since LSB is in mGauss)
return (raw_data[0] / (lsb_value * 1000), raw_data[1] / (lsb_value * 1000), raw_data[2] / (lsb_value * 1000))
[Doku]
class HMC5883L:
"""
HMC5883L 3-axis magnetometer sensor driver class
Used to measure Earth's magnetic field strength, suitable for direction detection and heading angle calculation
"""
DEF_ADDR = 0x1E
REG_CONFIG_A = 0x00
REG_CONFIG_B = 0x01
REG_MODE = 0x02
REG_OUT_X_H = 0x03
def __init__(self, bus: SMBus, addr: int = DEF_ADDR):
self.bus = bus
self.ADDR = addr
self.bus.write_byte_data(self.ADDR, self.REG_CONFIG_A, 0b01110000) # average 8 samples, 15Hz output rate
self.bus.write_byte_data(self.ADDR, self.REG_CONFIG_B, 0x20) # gain 1.3 Ga
self.bus.write_byte_data(self.ADDR, self.REG_MODE, 0x00) # continuous measurement mode
time.sleep(0.006)
self.scale = 1090.0
[Doku]
def _read_word(self, reg_h):
"""
Read a signed 16-bit integer from the sensor
Parameters:
reg_h: high-byte register address
Returns:
val: converted signed 16-bit integer
"""
high = self.bus.read_byte_data(self.ADDR, reg_h)
low = self.bus.read_byte_data(self.ADDR, reg_h + 1)
val = (high << 8) | low
if val >= 0x8000:
val = -((65535 - val) + 1)
return val
[Doku]
def read_magnet(self):
"""
Read magnetometer data
Returns:
x, y, z: X, Y, Z axis magnetic field strength values (unit: Gauss)
Note: HMC5883L data register order is X, Z, Y
"""
x = self._read_word(self.REG_OUT_X_H)
z = self._read_word(self.REG_OUT_X_H + 4) # X Z Y
y = self._read_word(self.REG_OUT_X_H + 2)
return (x / self.scale, y / self.scale, z / self.scale)
[Doku]
class QMC5883L:
"""
QMC5883L 3-axis magnetometer sensor driver class
Used to measure Earth's magnetic field strength, is a common alternative to HMC5883L
"""
# Class constant definition
DEF_ADDR = 0x0D
REG_CONTROL = 0x09
REG_SET_RESET = 0x0B
REG_OUT_X_L = 0x00
def __init__(self, bus: SMBus, addr: int = DEF_ADDR):
"""
Initialize QMC5883L magnetometer
Parameters:
bus: SMBus instance for I2C communication
addr: Device I2C address, default is DEF_ADDR (0x0D)
"""
self.bus = bus
self.ADDR = addr
try:
self.bus.write_byte_data(self.ADDR, self.REG_SET_RESET, 0x01)
except OSError:
pass
# Configure control register: 0b11110101
# OSR2,OSR1=11 (512-sample averaging) | RNG1,RNG0=11 (±8 Gauss range)
# ODR1,ODR0=10 (50 Hz output data rate) | MODE1,MODE0=01 (continuous measurement mode)
self.bus.write_byte_data(self.ADDR, self.REG_CONTROL, 0b11110101)
time.sleep(0.01)
self.scale = 12000.0
[Doku]
def _read_word_le(self, reg_l):
"""
Read a signed 16-bit integer from the sensor in little-endian format
Note: QMC5883L uses little-endian format to store data
Parameters:
reg_l: Low byte register address
Returns:
val: Converted signed 16-bit integer
"""
# First read the low byte
low = self.bus.read_byte_data(self.ADDR, reg_l)
# Then read the high byte
high = self.bus.read_byte_data(self.ADDR, reg_l + 1)
# Combine the two bytes into a 16-bit integer (note little-endian format)
val = (high << 8) | low
# Convert to signed integer
if val >= 0x8000:
val = -((65535 - val) + 1)
return val
[Doku]
def read_magnet(self):
"""
Read magnetometer data
Returns:
x, y, z: X, Y, Z axis magnetic field strength values (unit: Gauss)
Note: QMC5883L data register order is X, Y, Z (different from HMC5883L)
"""
x = self._read_word_le(self.REG_OUT_X_L + 0)
y = self._read_word_le(self.REG_OUT_X_L + 2)
z = self._read_word_le(self.REG_OUT_X_L + 4)
return (x / self.scale, y / self.scale, z / self.scale)
[Doku]
class QMC5883P:
"""
QMC5883P 3-axis magnetometer sensor driver class
Another version of the QMC5883 series with I2C address 0x2C
"""
DEF_ADDR = 0x2C
REG_X_LSB = 0x01
REG_STATUS = 0x09
REG_MODE = 0x0A
REG_CONFIG = 0x0B
def __init__(self, bus: SMBus, addr: int = DEF_ADDR):
"""
Initialize QMC5883P magnetometer
Parameters:
bus: SMBus instance for I2C communication
addr: Device I2C address, default is DEF_ADDR (0x2C)
"""
self.bus = bus
self.ADDR = addr
try:
self.bus.write_byte_data(self.ADDR, self.REG_MODE, 0xCF)
time.sleep(0.003)
self.bus.write_byte_data(self.ADDR, self.REG_CONFIG, 0x08)
time.sleep(0.003)
except Exception:
pass
self.scale = 12000.0
[Doku]
@staticmethod
def _to_i16(lsb, msb):
"""
Convert two bytes to a 16-bit signed integer
Parameters:
lsb: Low byte value
msb: High byte value
Returns:
v: Converted 16-bit signed integer
"""
v = (msb << 8) | lsb
if v >= 0x8000:
v -= 0x10000
return v
[Doku]
def read_magnet(self):
"""
Read magnetometer data
Reads 6 bytes starting from 0x01 register (little-endian signed format)
Features: Returns zero on error, does not interrupt main loop
Returns:
x, y, z: X, Y, Z axis magnetic field strength values (unit: Gauss)
"""
try:
raw = self.bus.read_i2c_block_data(self.ADDR, self.REG_X_LSB, 6)
# Data order: X_LSB, X_MSB, Y_LSB, Y_MSB, Z_LSB, Z_MSB
x = self._to_i16(raw[0], raw[1]) # X axis data
y = self._to_i16(raw[2], raw[3]) # Y axis data
z = self._to_i16(raw[4], raw[5]) # Z axis data
# Convert to Gauss unit and return
return (x / self.scale, y / self.scale, z / self.scale)
except Exception:
return (0.0, 0.0, 0.0)
#---------------- magnetometer ----------------
[Doku]
class Magnetometer:
"""
Magnetometer sensor driver class
Integrates QMC6310, HMC5883L, QMC5883L, and QMC5883P magnetometers
"""
def __init__(self, mag_type=None, field_range="8G"):
"""
Initialize the magnetometer based on the specified type
Parameters:
mag_type: The type of magnetometer to initialize (use None for auto-detection)
field_range: Magnetic field range for QMC6310 ("30G", "12G", "8G", "2G")
"""
self.active_magnetometer = None
self.mag_type = None
self.mpu = None
try:
self.bus = SMBus(I2C_BUS)
if mag_type == MagnetometerType.mag_QMC6310:
try:
self.active_magnetometer = QMC6310(self.bus, addr=0x1C, field_range=field_range)
self.mag_type = "QMC6310"
print(f"Magnetometer: {self.mag_type} @0x1C")
except Exception as e:
print(f"Failed to initialize QMC6310: {e}")
# If QMC6310 failed but was explicitly requested, don't try others
if mag_type == MagnetometerType.mag_QMC6310:
return
else:
try:
self.mpu = MPU6050(bus=I2C_BUS)
self.mpu.enable_bypass()
print("MPU6050 bypass enabled")
except Exception as e:
print(f"MPU6050 bypass mode failed: {e}")
if mag_type == MagnetometerType.mag_QMC5883P or (mag_type is None and i2c_ack(self.bus, 0x2C)):
try:
self.active_magnetometer = QMC5883P(self.bus, addr=0x2C)
self.mag_type = "QMC5883P"
print(f"Magnetometer: {self.mag_type} @0x2C")
except Exception as e:
print(f"Failed to initialize QMC5883P: {e}")
if mag_type == MagnetometerType.mag_QMC5883L or (mag_type is None and i2c_ack(self.bus, 0x0D)):
try:
self.active_magnetometer = QMC5883L(self.bus, addr=0x0D)
self.mag_type = "QMC5883L"
print(f"Magnetometer: {self.mag_type} @0x0D")
except Exception as e:
print(f"Failed to initialize QMC5883L: {e}")
if mag_type == MagnetometerType.mag_HMC5883L or (mag_type is None and i2c_ack(self.bus, 0x1E)):
try:
self.active_magnetometer = HMC5883L(self.bus, addr=0x1E)
self.mag_type = "HMC5883L"
print(f"Magnetometer: {self.mag_type} @0x1E")
except Exception as e:
print(f"Failed to initialize HMC5883L: {e}")
if self.active_magnetometer is None:
print("No magnetometer found on 0x1C/0x2C/0x0D/0x1E; running without active_magnetometer.")
except Exception as e:
print(f"Error during magnetometer initialization: {e}")
self.active_magnetometer = None
[Doku]
def read(self):
"""
Read magnetometer data
Returns:
tuple: (x, y, z) magnetic field values in appropriate units based on sensor type
None: if no magnetometer is initialized
"""
if self.active_magnetometer is None:
return None
try:
# Now all magnetometer classes have a consistent read_magnet method
return self.active_magnetometer.read_magnet()
except Exception as e:
print(f"Error reading magnetometer: {e}")
return None
[Doku]
def get_type(self):
"""
Get the type of initialized magnetometer
Returns:
str: Magnetometer type
"""
return self.mag_type
# Example usage
if __name__ == "__main__":
# Auto-detect magnetometer (default behavior)
mag = Magnetometer()
# Alternative: Specify magnetometer type using enum
# mag = Magnetometer(mag_type=MagnetometerType.mag_QMC6310)
# mag = Magnetometer(mag_type=MagnetometerType.mag_QMC5883P)
# mag = Magnetometer(mag_type=MagnetometerType.mag_QMC5883L)
# mag = Magnetometer(mag_type=MagnetometerType.mag_HMC5883L)
print(f"Initialized magnetometer type: {mag.get_type()}")
# Read and print data if magnetometer is initialized
if mag.get_type():
try:
while True:
data = mag.read()
if data:
print(f"Magnetic field: X={data[0]:.2f}, Y={data[1]:.2f}, Z={data[2]:.2f}")
time.sleep(0.5)
except KeyboardInterrupt:
print("\nMeasurement stopped by user")
else:
print("No magnetometer available")