Great topic. A few thoughts on kinetics and streaming.
Firmware kinetics matters for fast reads.
Calcium flux, ion channel, and fast enzyme assays need millisecond-resolution reads on a single well right after injection. The firmware parks on one well and runs a tight internal loop, capturing hundreds of reads per second. I don't think a software loop can do that. Same principle as thermocyclers, where the full protocol is uploaded to the device.
For slow plate-mode kinetics, a software loop is fine. But the backend should expose firmware kinetics where available.
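To make the "firmware where available, software loop otherwise" idea concrete, here is a minimal sketch. Everything in it is hypothetical: the two backend classes, `read_plate`, `read_kinetic_firmware`, and the fake 96-well data are stand-ins I made up for illustration, not existing PLR APIs.

```python
import asyncio


class SoftwareOnlyBackend:
    """Hypothetical backend without firmware kinetics: one read per call."""

    def __init__(self):
        self.reads = 0

    async def read_plate(self, wavelength):
        self.reads += 1
        return [0.1 * self.reads] * 96  # fake 96-well OD values


class FirmwareBackend(SoftwareOnlyBackend):
    """Hypothetical backend whose firmware runs the whole kinetic loop."""

    async def read_kinetic_firmware(self, n_cycles, interval, wavelength):
        # Stand-in for a single command; the instrument handles the timing
        return [[0.1 * (i + 1)] * 96 for i in range(n_cycles)]


async def read_kinetic(backend, n_cycles, interval, wavelength):
    """Prefer firmware kinetics when the backend exposes it."""
    firmware = getattr(backend, "read_kinetic_firmware", None)
    if firmware is not None:
        return await firmware(n_cycles, interval, wavelength)
    # Software fallback: acceptable for slow plate-mode kinetics
    cycles = []
    for i in range(n_cycles):
        cycles.append(await backend.read_plate(wavelength))
        if i < n_cycles - 1:
            await asyncio.sleep(interval)
    return cycles
```

The caller sees the same function either way; only the timing guarantees differ, which is why the fast single-well case really wants the firmware path.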
On streaming, a signal-based pattern could work nicely.
Related to my comment in the PLR architecture thread (3. Signals, alongside Protocols).
The idea: a driver declares what it can emit as typed signals, and consumers subscribe before the run starts. Data flows through a buffered queue so nothing is lost.
Here’s a worked-out example. A driver declares its signals as class attributes:
    import asyncio

    class SynergyH1Driver(CanReadAbsorbance):
        absorbance = SignalR(unit="OD", dtype="array")
        temperature = SignalR(unit="°C", dtype="number")

        async def start_kinetic(self, n_cycles, interval, wavelength):
            await self._send("start_kinetic", n_cycles, interval, wavelength)
            # Firmware runs the loop; each cycle, data arrives over the wire.
            # Keep a reference so the task is not garbage-collected mid-run.
            self._consume_task = asyncio.create_task(self._consume_cycles(n_cycles))

        async def _consume_cycles(self, n_cycles):
            for i in range(n_cycles):
                raw = await self._read_cycle_result()
                self.absorbance.emit(raw)
                self.temperature.emit(await self._read_temp())
The capability opens a stream before telling the firmware to start — subscribe first, then go:
    class AbsorbanceCapability:
        async def read_kinetic(self, plate, n_cycles, interval, wavelength):
            positions = self._plate_to_positions(plate)
            # Open the gate BEFORE firmware starts so nothing is missed
            stream = self.backend.absorbance.stream()
            await self.backend.start_kinetic(n_cycles, interval, wavelength)
            # Each cycle arrives as it's measured; stop after the last one,
            # since the stream itself never ends
            received = 0
            async for data in stream:
                yield data
                received += 1
                if received >= n_cycles:
                    break
User code:
    async for reading in reader.absorbance.read_kinetic(
            plate, n_cycles=10, interval=60, wavelength=600):
        print(f"Mean OD: {reading.mean():.3f}")
        # Arrives every 60 s, not batched at the end
The SignalR is simple — just a typed channel with a subscriber queue:
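    import asyncio

    class SignalR:
        def __init__(self, unit, dtype="number"):
            self.unit = unit
            self.dtype = dtype
            self._subscribers = []

        def emit(self, value):
            for queue in self._subscribers:
                queue.put_nowait(value)

        def stream(self):
            # Register the queue right away. An async generator body does not
            # run until its first iteration, so subscribing inside it would
            # drop anything emitted between stream() and the first read.
            queue = asyncio.Queue()
            self._subscribers.append(queue)
            return self._iterate(queue)

        async def _iterate(self, queue):
            try:
                while True:
                    yield await queue.get()
            finally:
                self._subscribers.remove(queue)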
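Here is a small self-contained check of the subscribe-first behaviour. It uses a stripped-down variant of the channel with two assumptions of mine that are not part of the proposal: the queue is registered as soon as `stream()` is called, and a hypothetical `close()` pushes a sentinel so consumers can finish. `ClosableSignal` and `_DONE` are illustrative names only.

```python
import asyncio

_DONE = object()  # hypothetical end-of-stream sentinel


class ClosableSignal:
    def __init__(self):
        self._subscribers = []

    def emit(self, value):
        for queue in self._subscribers:
            queue.put_nowait(value)

    def close(self):
        # Tell every subscriber that no more values are coming
        self.emit(_DONE)

    def stream(self):
        queue = asyncio.Queue()
        self._subscribers.append(queue)  # registered now, not on first iteration
        return self._iterate(queue)

    async def _iterate(self, queue):
        try:
            while True:
                value = await queue.get()
                if value is _DONE:
                    return
                yield value
        finally:
            self._subscribers.remove(queue)


async def demo():
    signal = ClosableSignal()
    first = signal.stream()   # both subscribers attach before any emit
    second = signal.stream()
    for od in (0.10, 0.25, 0.40):
        signal.emit(od)
    signal.close()
    got_first = [v async for v in first]
    got_second = [v async for v in second]
    return got_first, got_second, len(signal._subscribers)
```

Both consumers only start reading after everything has been emitted, and each still receives every value because its queue was already attached and buffering.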
This same pattern works across dimensions: the signal doesn’t care what’s advancing. It could be a time series (each cycle emits one plate array), a spatial scan (each position emits one measurement), or a wavelength sweep (each λ emits one plate array).
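A rough sketch of that axis-agnostic idea, using a bare `asyncio.Queue` as the channel to keep it short. The `sweep` and `collect` helpers, the `None` end marker, and the placeholder readings are all made up for illustration.

```python
import asyncio


async def sweep(queue, axis_name, axis_values, measure):
    """Emit one tagged measurement per step of whichever axis is advancing."""
    for value in axis_values:
        queue.put_nowait({axis_name: value, "data": await measure(value)})
    queue.put_nowait(None)  # hypothetical end-of-sweep marker


async def collect(queue):
    """Consume measurements until the end marker arrives."""
    results = []
    while (item := await queue.get()) is not None:
        results.append(item)
    return results


async def demo():
    async def fake_measure(step):
        return f"reading@{step}"  # placeholder for real instrument data

    axes = {
        "cycle": [0, 1, 2],            # time series
        "well": ["A1", "A2"],          # spatial scan
        "wavelength_nm": [450, 600],   # wavelength sweep
    }
    out = {}
    for axis_name, axis_values in axes.items():
        queue = asyncio.Queue()
        producer = asyncio.create_task(
            sweep(queue, axis_name, axis_values, fake_measure)
        )
        out[axis_name] = await collect(queue)
        await producer
    return out
```

The producer and consumer code is identical for all three axes; only the tag on each emitted value changes.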
This signal pattern is borrowed from bluesky/ophyd, the hardware abstraction layer used at NSLS-II synchrotron beamlines. Their SignalR / SignalRW / SignalX hierarchy solves the same problem: typed, subscribable channels between hardware drivers and experiment orchestration. We’d be adapting it for lab instruments instead of beamline components. In ophyd-async these are primarily used for continuously updating sensors, but the same subscribe-and-iterate mechanism likely works for discrete measurement sequences like kinetic reads: the signal just emits one value per cycle instead of continuously.