diff --git a/pslab/connection/__init__.py b/pslab/connection/__init__.py index fe7b50a..619e420 100644 --- a/pslab/connection/__init__.py +++ b/pslab/connection/__init__.py @@ -5,6 +5,15 @@ from .connection import ConnectionHandler from ._serial import SerialHandler from .wlan import WLANHandler +from .mock import MockHandler + +__all__ = [ + "ConnectionHandler", + "SerialHandler", + "WLANHandler", + "autoconnect", + "MockHandler", +] def detect() -> list[ConnectionHandler]: diff --git a/pslab/connection/mock.py b/pslab/connection/mock.py new file mode 100644 index 0000000..3269d94 --- /dev/null +++ b/pslab/connection/mock.py @@ -0,0 +1,123 @@ +"""Mock connection handler for PSLab. + +This module provides a minimal in-memory `ConnectionHandler` implementation for +use in tests and development without physical PSLab hardware. +""" + +from __future__ import annotations +from collections import deque +import pslab.protocol as CP +from pslab.connection.connection import ConnectionHandler + + +class MockHandler(ConnectionHandler): + """In-memory mock implementation of `ConnectionHandler`. + + The handler queues deterministic responses based on bytes written via + `write()` so higher-level code can be exercised without an actual device. + """ + + def __init__(self, version: str = "PSLab V6 ", fw=(3, 0, 0)) -> None: + """Initialize MockHandler. + + Parameters + ---------- + version : str, optional + Version string to return when queried. Default is "PSLab V6 ". + fw : tuple of int, optional + Firmware version as a (major, minor, patch) tuple. Default is (3, 0, 0). + """ + self._rx = deque() # bytes to be read + self._tx = bytearray() # bytes written by client + self.version = version # convenient attribute for callers + self._fw = fw + self._connected = False + + def connect(self) -> None: + """Mark the handler as connected.""" + self._connected = True + # Optional: validate the mock “device” by answering get_version + # self.version = self.get_version() + + def disconnect(self) -> None: + """Mark the handler as disconnected.""" + self._connected = False + + def read(self, numbytes: int) -> bytes: + """Read bytes from the internal receive buffer. + + Parameters + ---------- + numbytes : int + Number of bytes to read. + + Returns + ------- + bytes + Bytes read from the receive buffer (may be shorter if insufficient data + is available). + """ + out = bytearray() + while len(out) < numbytes and self._rx: + out.append(self._rx.popleft()) + return bytes(out) + + def write(self, data: bytes) -> int: + """Write bytes to the handler and queue any corresponding responses. + + Parameters + ---------- + data : bytes + Bytes written by the caller. + + Returns + ------- + int + Number of bytes written. + """ + self._tx.extend(data) + self._maybe_respond() + return len(data) + + def _queue(self, payload: bytes) -> None: + """Append bytes to the internal receive buffer. + + Parameters + ---------- + payload : bytes + Bytes to enqueue so they can be returned by `read()`. + """ + self._rx.extend(payload) + + def _maybe_respond(self) -> None: + """Inspect written bytes and enqueue protocol responses. + + This method implements minimal protocol handling for mock mode. + When known command patterns are detected in the transmit buffer, + corresponding response bytes are queued for later reads. + """ + # Detect “CP.COMMON, ” patterns + while len(self._tx) >= 2: + if self._tx[0] != CP.COMMON[0]: + # Drop unknown leading bytes + self._tx.pop(0) + continue + + cmd = self._tx[1] + + # GET_VERSION: ConnectionHandler.get_version reads 9 bytes + # and checks b"PSLab" + if cmd == CP.GET_VERSION[0]: + self._tx = self._tx[2:] + self._queue(self.version.encode("utf-8")[:9].ljust(9, b" ")) + continue + + # GET_FW_VERSION: reads 3 bytes (major, minor, patch) + if cmd == CP.GET_FW_VERSION[0]: + self._tx = self._tx[2:] + major, minor, patch = self._fw + self._queue(bytes([major, minor, patch])) + continue + + # Unknown command under CP.COMMON: drop and stop + break diff --git a/pslab/sciencelab.py b/pslab/sciencelab.py index 1cbc81e..de7908f 100644 --- a/pslab/sciencelab.py +++ b/pslab/sciencelab.py @@ -11,7 +11,7 @@ from typing import Iterable, List import pslab.protocol as CP -from pslab.connection import ConnectionHandler, SerialHandler, autoconnect +from pslab.connection import ConnectionHandler, SerialHandler, MockHandler, autoconnect from pslab.instrument.logic_analyzer import LogicAnalyzer from pslab.instrument.multimeter import Multimeter from pslab.instrument.oscilloscope import Oscilloscope @@ -22,27 +22,57 @@ class ScienceLab: """Aggregate interface for the PSLab's instruments. + Parameters + ---------- + device : ConnectionHandler, optional + Connection handler for communicating with the PSLab device. If not + provided, a new one will be created via autoconnect. If both *device* + and *mock* are provided, *device* takes precedence and *mock* is + ignored. + mock : bool, optional + If True, use a MockHandler instead of connecting to physical hardware. + Instruments will not be instantiated in mock mode. The default is + False. + Attributes ---------- - logic_analyzer : pslab.LogicAnalyzer - oscilloscope : pslab.Oscilloscope - waveform_generator : pslab.WaveformGenerator - pwm_generator : pslab.PWMGenerator - multimeter : pslab.Multimeter - power_supply : pslab.PowerSupply + logic_analyzer : pslab.LogicAnalyzer or None + oscilloscope : pslab.Oscilloscope or None + waveform_generator : pslab.WaveformGenerator or None + pwm_generator : pslab.PWMGenerator or None + multimeter : pslab.Multimeter or None + power_supply : pslab.PowerSupply or None i2c : pslab.I2CMaster nrf : pslab.peripherals.NRF24L01 + + Notes + ----- + Instrument attributes are None when initialized with mock=True. """ - def __init__(self, device: ConnectionHandler | None = None): - self.device = device if device is not None else autoconnect() + def __init__(self, device: ConnectionHandler | None = None, mock: bool = False): + if device is not None: + self.device = device + elif mock: + self.device = MockHandler() + else: + self.device = autoconnect() self.firmware = self.device.get_firmware_version() - self.logic_analyzer = LogicAnalyzer(device=self.device) - self.oscilloscope = Oscilloscope(device=self.device) - self.waveform_generator = WaveformGenerator(device=self.device) - self.pwm_generator = PWMGenerator(device=self.device) - self.multimeter = Multimeter(device=self.device) - self.power_supply = PowerSupply(device=self.device) + + if not mock: # In mock mode, skip instrument initialization to avoid hardware dependencies + self.logic_analyzer = LogicAnalyzer(device=self.device) + self.oscilloscope = Oscilloscope(device=self.device) + self.waveform_generator = WaveformGenerator(device=self.device) + self.pwm_generator = PWMGenerator(device=self.device) + self.multimeter = Multimeter(device=self.device) + self.power_supply = PowerSupply(device=self.device) + else: + self.logic_analyzer = None + self.oscilloscope = None + self.waveform_generator = None + self.pwm_generator = None + self.multimeter = None + self.power_supply = None @property def temperature(self): diff --git a/tests/test_sciencelab_mock.py b/tests/test_sciencelab_mock.py new file mode 100644 index 0000000..a0fbf40 --- /dev/null +++ b/tests/test_sciencelab_mock.py @@ -0,0 +1,25 @@ +from unittest.mock import patch + +from pslab.sciencelab import ScienceLab + + +def test_sciencelab_mock_does_not_autoconnect(): + # If autoconnect is called, the test should fail immediately. + with patch( + "pslab.sciencelab.autoconnect", + side_effect=AssertionError("autoconnect should not be called"), + ): + psl = ScienceLab(mock=True) + + # It should initialize and provide the expected mock firmware version object. + assert psl.firmware.major == 3 + assert psl.firmware.minor == 0 + assert psl.firmware.patch == 0 + + # In mock mode, instruments should not be instantiated (no hardware required). + assert psl.logic_analyzer is None + assert psl.oscilloscope is None + assert psl.waveform_generator is None + assert psl.pwm_generator is None + assert psl.multimeter is None + assert psl.power_supply is None