|
| 1 | +from adafruit_bus_device.i2c_device import I2CDevice |
| 2 | +from .transceiver_v1 import I2cTransceiverV1 |
| 3 | +import time |
| 4 | + |
| 5 | +class CircuitPythonI2cTransceiver(I2cTransceiverV1): |
| 6 | + def __init__(self, i2c, device_address): |
| 7 | + super(CircuitPythonI2cTransceiver, self).__init__() |
| 8 | + self._device = I2CDevice(i2c, device_address) |
| 9 | + |
| 10 | + @property |
| 11 | + def description(self): |
| 12 | + return "CircuitPython I2C Transceiver" |
| 13 | + |
| 14 | + def transceive(self, slave_address, tx_data, rx_length, read_delay, timeout): |
| 15 | + error = None |
| 16 | + status = self.STATUS_OK |
| 17 | + try: |
| 18 | + if tx_data is not None: |
| 19 | + self.write(tx_data) |
| 20 | + except OSError as e: |
| 21 | + status = self.STATUS_UNSPECIFIED_ERROR |
| 22 | + error = e |
| 23 | + |
| 24 | + if read_delay > 0: |
| 25 | + time.sleep(read_delay) |
| 26 | + if(status == self.STATUS_OK): |
| 27 | + try: |
| 28 | + if rx_length is not None: |
| 29 | + return status, error, self.read(rx_length) |
| 30 | + except OSError as e: |
| 31 | + status = self.STATUS_UNSPECIFIED_ERROR |
| 32 | + error = e |
| 33 | + |
| 34 | + return status, error, bytearray(b"") |
| 35 | + |
| 36 | + |
| 37 | + def write(self, data): |
| 38 | + data_bytes = bytearray(data) if isinstance(data, (bytes, bytearray, list, tuple)) else bytearray() |
| 39 | + if(len(data_bytes) == 0): |
| 40 | + return |
| 41 | + with self._device as i2c_device: |
| 42 | + i2c_device.write(data_bytes) |
| 43 | + |
| 44 | + def read(self, count): |
| 45 | + read_buffer = bytearray(count) |
| 46 | + with self._device as i2c_device: |
| 47 | + i2c_device.readinto(read_buffer) |
| 48 | + return read_buffer |
0 commit comments