Networked interface for PLR

I was checking out the codebase and noticed there is a websocket backend for liquid handling, but I couldn’t find HTTP backends for any of the other devices. For example, in pylabrobot/heating_shaking, I don’t see any HTTP/websocket communication capabilities.

If I have a bunch of devices making up a workcell, is the idea that each one is its own API? Or should they be combined into one universal API (since you probably want things controlled by a central service, because they’re all on one deck and need to be coordinated)? Is this in scope for PLR?

I’m thinking right now that the way to go would be to create a Python API on top of PLR to expose a full workcell, where each individual control we want to expose is put into that API. This would largely not be PLR, just basically a connector between PLR and an API layer. Thoughts?

they don’t exist yet but they should. chat will be able to write all of them very easily

yes, one central server is the goal

routes like /lh/<device_id>/<command>

maybe we can use one flask blueprint per PLR module, and dynamically register those using a small pylabrobot.server master cli (with nice options and mappings)

I would like for some kind of server capability to exist in PLR.

when I think about open source vs internal the question is mostly “is everyone gonna be writing the same thing?” with drivers and simple abstractions like this the answer is obviously yes. so then you realize other people are gonna collaborate and make an open source version regardless, so you might as well lead that project and get on the train sooner rather than later

in terms of whether “this would largely be PLR”, I guess in some sense it’s an HTTP <> PLR* bridge, where PLR* means the interface layer (the current “PLR” python api). But since the HTTP part is pretty small and still tied 1:1 to all PLR interfaces (like when we make a change in the front end), I think it should live in the same repo. Maintenance will be easiest this way. This will be the “PLR HTTP api”.

As you wrote in your post, I already have a small start at this system with the LH server. But (1) we need more servers, (2) a way to nicely connect them, and (3) (most work) a nice api for the resource model. for (3) I have done almost no work, and it requires more architecture thinking. (1) and (2) I can see clearly.

@koeng and I decided in call on:

  • HTTP (we can use default tools like curl) is best because it’s universal. we will send POST requests for actions and maintain an action queue (the current implementation has a basic prototype version of this)
    • in addition we will have a websocket to pull commands, which is faster
  • pure websockets were considered. the downside is we can’t send commands using simple tools like curl
  • gRPC was considered. the benefits of code generation are not super useful to us because we will still need custom code on the client side to implement the unavoidable async nature of PLR. the downside is it needs custom tools and doesn’t have support in every language (e.g. Zig)

next we are discussing how to deal with the resource model. the backend requires this, and it might change in between commands or even as commands are being sent

unfortunately we have to pass resources as part of the standard.py objects. one example: the STAR backend has a “minimum search height for liquid level detection”, which can’t be part of the universal command and does require getting the absolute location of a resource. in short: the resource model + its current state have to be part of our universal standard (raw numbers are not possible because robots aren’t the same at that level of abstraction)
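To make the point concrete, here is a hedged sketch of a command that references a container rather than carrying raw numbers, so that a backend-specific parameter can be derived from the resource model. The class names and the derivation are made up for illustration, not the actual standard.py definitions.

```python
# Sketch: a universal command that references a resource, not raw xyz.
# Backend-specific parameters (like STAR's minimum search height for liquid
# level detection) can then be derived from the resource model.
# All names and the derivation below are illustrative.
from dataclasses import dataclass


@dataclass
class Coordinate:
  x: float
  y: float
  z: float


@dataclass
class Container:
  name: str
  location: Coordinate  # absolute location, resolved from the resource tree
  depth: float          # inner depth in mm


@dataclass
class Aspirate:
  container: Container  # the command references the resource, not raw xyz
  volume: float         # ul


def star_minimum_search_height(op: Aspirate) -> float:
  # Made-up derivation: a STAR-specific parameter that only the resource
  # model can provide -- it needs the container's absolute z, which a
  # "dumbed down" raw-numbers command would not carry.
  return op.container.location.z + 1.0  # 1 mm above the container bottom


op = Aspirate(
  Container("well_A1", Coordinate(100.0, 200.0, 50.0), depth=10.0),
  volume=100.0,
)
print(star_minimum_search_height(op))  # 51.0
```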

very machine-specific servers

STARBackend has two levels of commands: (1) implementations of universal PLR commands (conforming to LiquidHandlerBackend spec) and (2) one python function per firmware command. the second category only uses fundamental types

commands extremely specific to a backend will work when we have a STAR-specific client and server, where the client overrides the (2) commands and the server calls the (2) commands directly

however, it doesn’t scale to other machines: it will double the work needed. if we can avoid this we should

there are around a dozen universal commands for LHBackend: pylabrobot/pylabrobot/liquid_handling/backends/backend.py at 22f7d05490349a5b5bf0a8331d613a6ed9aa9f41 · PyLabRobot/pylabrobot · GitHub. star implements all of them

we have hundreds of firmware commands implemented and growing (slowly)

my main concern is with expanding to different machines

sharing resources

the client and server both need a model of the current resources on deck

  • how do we get it there?
  • how do we communicate changes?

approach 1

send a diff alongside every client/server interaction, i.e. only communicate about resources when actually doing hardware stuff

we can efficiently compute the diff through a tree + hash model. but the client and server will both have to have code to update their states based on the diff they get
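A sketch of the tree + hash idea: a Merkle-style hash per node (own fields plus child hashes), so an unchanged subtree compares equal in one hash comparison and only changed branches need descending. The dict-based resource representation is an assumption for illustration.

```python
# Sketch: Merkle-style hashing over a resource tree to find changed subtrees.
# The dict shape is illustrative, not PLR's actual serialization format.
import hashlib
import json


def node_hash(node: dict) -> str:
  child_hashes = [node_hash(c) for c in node.get("children", [])]
  payload = json.dumps(
    {k: v for k, v in node.items() if k != "children"}, sort_keys=True
  ) + "".join(child_hashes)
  return hashlib.sha256(payload.encode()).hexdigest()


def diff(a: dict, b: dict, path: str = "") -> list:
  """Return paths of nodes whose own fields changed (assumes same tree shape)."""
  if node_hash(a) == node_hash(b):
    return []  # whole subtree unchanged, skip it in O(1)
  changed = []
  for ca, cb in zip(a.get("children", []), b.get("children", [])):
    changed += diff(ca, cb, f"{path}/{ca['name']}")
  own_a = {k: v for k, v in a.items() if k != "children"}
  own_b = {k: v for k, v in b.items() if k != "children"}
  if own_a != own_b:
    changed.append(path or "/")
  return changed


deck_v1 = {"name": "deck", "children": [
  {"name": "plate", "x": 10, "children": [{"name": "A1", "volume": 0}]}]}
deck_v2 = {"name": "deck", "children": [
  {"name": "plate", "x": 10, "children": [{"name": "A1", "volume": 100}]}]}
print(diff(deck_v1, deck_v2))  # ['/plate/A1']
```

Applying the diff on the other side is the part that needs extra code on both client and server, as noted above.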

approach 2

we could use the existing callback architecture in the resource model to push resource state updates. server->client updates will require a websocket. client->server could be over http but probably should not be, so we can be uniform.

resource model updates are sync whereas machine commands are async

(this implies send_websocket_message is a sync function because state updates are sync)
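One way that sync/async boundary could look, sketched with a stand-in `Resource` class: the callback itself stays synchronous and merely schedules the async websocket send on the running loop. The callback names and the send function are illustrative, not the actual PLR API.

```python
# Sketch: a synchronous resource-model callback that schedules an async
# websocket push without awaiting it. Resource and callback names are
# illustrative stand-ins, not the real PLR resource model.
import asyncio

sent_messages = []  # stand-in for messages that went out over the websocket


async def send_over_websocket(msg: dict):
  # in a real server this would be websocket.send_text(...)
  sent_messages.append(msg)


class Resource:
  def __init__(self, name: str):
    self.name = name
    self._callbacks = []

  def register_state_update_callback(self, cb):
    self._callbacks.append(cb)

  def set_state(self, **state):
    # synchronous, like resource model updates
    for cb in self._callbacks:
      cb(self.name, state)


def push_update(name: str, state: dict):
  # sync bridge: schedule the async send, don't await it
  asyncio.get_running_loop().create_task(
    send_over_websocket({"resource": name, "state": state})
  )


async def main():
  plate = Resource("plate_1")
  plate.register_state_update_callback(push_update)
  plate.set_state(volume=100)  # sync call; the push is scheduled, not awaited
  await asyncio.sleep(0)       # give the scheduled send a chance to run
  return sent_messages


print(asyncio.run(main()))  # [{'resource': 'plate_1', 'state': {'volume': 100}}]
```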

approach ?

there might be other ideas to consider

approach 3

go through the io layer. very easy to implement remote-robot calls, but very little control/manipulation possibility and very difficult to create good clients (would need a full re-implementation of PLR if not using PLR itself as the client :sweat_smile:)

I think I’ll want full control, so we can implement the general commands + the specific commands, but I am also curious if there is anyone who only uses the universal commands.

So basically over the websocket the server could send either task completion OR resource updates? The basic problem that solves is that, unlike simply intercepting calls, the API client is only updated once a task has been successfully completed (like a plate movement or something).

I actually considered this for a bit but I think it actually super sucks if you aren’t using PLR as a client

you would only do the specific commands, the level (2) ones. for example, no aspirate, just aspirate_pip (the C0AS... firmware command)

yes task updates AND resource updates would go over WS.

client is always only updated once the machine operation completes, because the deck model is only updated after a machine command completes. for example, when dropping a resource at a specific location we send the command but only assign the resource to the new location on a successful drop

the problem this solves is we don’t have to compare resource states at the time a new command is sent (complicated to implement), but we can do so as soon as the model updates (easy to implement with callbacks), either client or server side.

yes it would be horrible when not using PLR as a front end … the only thing this would enable is “PLR-io over network”

How are you thinking about multiple commands at once? For example, if you send two commands at once.

In one implementation, we could lock the robot: only one command runs at a time, and anything else fails until that command completes. The other way is that we could create a queue of commands as they come in.

Personally, I think just failing when a command is currently running makes more sense - mostly, we don’t have to deal with the queue and the complications that can come from one. Just slap a lock on the bot and call it a day (this is how my opentronsfastapi worked)

Thoughts?

typically concurrency of operations is managed/locked by the backend

I would be happy with the server locking one task per machine, at least initially. we can always expand to proper concurrency later. in v1 we definitely do need to support concurrent tasks on different machines (e.g. read a plate while doing liquid handling).
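A sketch of that locking model (all names made up): one asyncio.Lock per machine, fail fast when the same machine is already busy, while different machines run concurrently.

```python
# Sketch: per-machine locks with fail-fast rejection. Concurrent commands on
# different machines both run; a second command to the same machine is
# rejected immediately rather than queued. Names are illustrative.
import asyncio


class MachineBusyError(Exception):
  pass


class WorkcellServer:
  def __init__(self, machine_ids):
    self._locks = {m: asyncio.Lock() for m in machine_ids}

  async def run(self, machine_id: str, coro_fn):
    lock = self._locks[machine_id]
    if lock.locked():
      # reject immediately instead of queueing
      raise MachineBusyError(f"{machine_id} is busy")
    async with lock:
      return await coro_fn()


async def main():
  server = WorkcellServer(["star", "plate_reader"])

  async def slow_op():
    await asyncio.sleep(0.05)
    return "done"

  # different machines: both commands run concurrently
  r1, r2 = await asyncio.gather(
    server.run("star", slow_op), server.run("plate_reader", slow_op)
  )

  # same machine: the second command fails fast while the first is running
  first = asyncio.create_task(server.run("star", slow_op))
  await asyncio.sleep(0)  # let the first command acquire the lock
  busy = None
  try:
    await server.run("star", slow_op)
  except MachineBusyError as e:
    busy = str(e)
  await first
  return r1, r2, busy


results = asyncio.run(main())
print(results)  # ('done', 'done', 'star is busy')
```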

I would not like a queue in the sense that we can stack commands from the client onto the server; stuff should be done in real time. I shouldn’t have used the term queue above, and let’s not use it hereafter, because it’s not really a queue. it’s more of a task list that contains currently running commands server side (in case a machine can support two commands being executed at the same time, e.g. a STAR has a couple of those)

Decided to prototype some code. Let me get your thoughts @Rick

It is AI generated, which I am just using to think about ideas. It gives a scratchpad to actually talk about implementation in a system that actually works. But don’t feel the need to read deeply (other than knowing the concept works and approximately what it is going to look like).

The concept is this: each command that is sent gets acknowledged with a task ID. You can check up on the task status. Upon completion, you get a new_lock, which allows you to run another command. This forces a sync between both systems: you must have gotten a successful completion on the last task to run another. Therefore, we can return any resource changes in that TaskStatusResponse, and ensure the client at least got the updates.

import asyncio
import uuid
from datetime import datetime
from enum import Enum
from typing import Dict, Any, Optional, List

from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect
from pydantic import BaseModel


class TaskStatus(str, Enum):
    RUNNING = "running"
    COMPLETE = "complete"
    FAILED = "failed"


class CommandRequest(BaseModel):
    kwargs: Dict[str, Any] = {}
    last_lock: Optional[str] = None


class CommandResponse(BaseModel):
    cmd_running: str


class TaskStatusResponse(BaseModel):
    status: TaskStatus
    new_lock: Optional[str] = None
    result: Optional[Any] = None
    error: Optional[str] = None


class WebSocketMessage(BaseModel):
    task_id: str
    status: TaskStatus
    new_lock: Optional[str] = None
    result: Optional[Any] = None
    error: Optional[str] = None


class Task:
    def __init__(self, task_id: str, command: str, kwargs: Dict[str, Any]):
        self.task_id = task_id
        self.command = command
        self.kwargs = kwargs
        self.status = TaskStatus.RUNNING
        self.created_at = datetime.now()
        self.completed_at = None
        self.result = None
        self.error = None
        self.lock = str(uuid.uuid4())


class TaskManager:
    def __init__(self):
        self.tasks: Dict[str, Task] = {}
        self.task_lock = asyncio.Lock()
        self.websocket_connections: List[WebSocket] = []
        self.last_completed_lock: Optional[str] = None
        # References to running tasks, so they are not garbage collected
        self._background_tasks: set = set()

    async def create_task(self, command: str, kwargs: Dict[str, Any], last_lock: Optional[str] = None) -> str:
        if last_lock is not None and last_lock != self.last_completed_lock:
            raise HTTPException(status_code=400, detail="Invalid lock provided")

        task_id = str(uuid.uuid4())
        task = Task(task_id, command, kwargs)
        self.tasks[task_id] = task

        # Schedule execution in the background; keep a reference so the task
        # is not garbage collected before it finishes
        task_coro = asyncio.create_task(self._execute_task(task))
        self._background_tasks.add(task_coro)
        task_coro.add_done_callback(self._background_tasks.discard)
        return task_id

    async def get_task_status(self, task_id: str) -> TaskStatusResponse:
        if task_id not in self.tasks:
            raise HTTPException(status_code=404, detail="Task not found")

        task = self.tasks[task_id]
        return TaskStatusResponse(
            status=task.status,
            new_lock=task.lock if task.status == TaskStatus.COMPLETE else None,
            result=task.result,
            error=task.error
        )

    async def _execute_task(self, task: Task):
        async with self.task_lock:
            try:
                if task.command == "sleep":
                    duration = task.kwargs.get("duration", 1.0)
                    await asyncio.sleep(duration)
                    task.result = f"Slept for {duration} seconds"
                    task.status = TaskStatus.COMPLETE
                else:
                    task.error = f"Unknown command: {task.command}"
                    task.status = TaskStatus.FAILED

            except Exception as e:
                task.error = str(e)
                task.status = TaskStatus.FAILED

            task.completed_at = datetime.now()
            if task.status == TaskStatus.COMPLETE:
                self.last_completed_lock = task.lock

            await self._notify_websockets(task)

    async def _notify_websockets(self, task: Task):
        if self.websocket_connections:
            message = WebSocketMessage(
                task_id=task.task_id,
                status=task.status,
                new_lock=task.lock if task.status == TaskStatus.COMPLETE else None,
                result=task.result,
                error=task.error
            )

            disconnected = []
            for websocket in self.websocket_connections:
                try:
                    await websocket.send_text(message.model_dump_json())
                except Exception:
                    disconnected.append(websocket)

            for ws in disconnected:
                self.websocket_connections.remove(ws)

    def add_websocket(self, websocket: WebSocket):
        self.websocket_connections.append(websocket)

    def remove_websocket(self, websocket: WebSocket):
        if websocket in self.websocket_connections:
            self.websocket_connections.remove(websocket)


app = FastAPI(title="PLR API", description="PyLabRobot API for command execution")
task_manager = TaskManager()


@app.post("/plr_api/{cmd}", response_model=CommandResponse)
async def submit_command(cmd: str, request: CommandRequest):
    task_id = await task_manager.create_task(cmd, request.kwargs, request.last_lock)
    return CommandResponse(cmd_running=task_id)


@app.get("/plr_api/tasks/{task_id}", response_model=TaskStatusResponse)
async def get_task_status(task_id: str):
    return await task_manager.get_task_status(task_id)


@app.websocket("/plr_api/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    task_manager.add_websocket(websocket)

    try:
        while True:
            await websocket.receive_text()
    except WebSocketDisconnect:
        task_manager.remove_websocket(websocket)


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8080)

seems more complicated than it needs to be

the server can keep track of what’s busy. the client shouldn’t have to prove this
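A sketch of that simpler contract, framework-free for clarity (names and status codes are illustrative): the server tracks busyness itself and rejects a second command outright, so the client never has to echo a lock token back with each request.

```python
# Sketch: server-side busy tracking without client lock tokens. Contrast with
# the prototype above, where `last_lock` travels with every request. All
# names and status codes are illustrative.
import itertools


class Server:
  def __init__(self):
    self._busy = False
    self._ids = itertools.count(1)
    self._results = {}

  def submit(self, command: str) -> dict:
    if self._busy:
      # the server knows it's busy; the client proved nothing
      return {"error": "busy", "status_code": 409}
    self._busy = True
    task_id = next(self._ids)
    return {"task_id": task_id, "status_code": 202}

  def complete(self, task_id: int, result):
    # called when the hardware operation finishes
    self._results[task_id] = result
    self._busy = False


server = Server()
first = server.submit("aspirate")
second = server.submit("dispense")   # rejected: machine busy
server.complete(first["task_id"], "ok")
third = server.submit("dispense")    # accepted after completion
print(first, second, third)
```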

also I was chatting with @CamilloMoschner and we spoke briefly about this thread

we discussed whether it’s a failure that the client needs this complicated data model to work with a plr server. we formalized my intuition that replicating the ‘resource management system’ on the client side is unavoidable.

we have:

  1. we need a lot of information to do automation correctly
    • you can’t just say “aspirate at xyz” without having a model for arriving at those numbers. this would only work for very simple prototypes, not for a powerful language
  2. we can’t create new info on the server

thus:

  1. client needs a lot of information (this means: a client of this new api needs to be as complex as the PLR interface layer in our python API (the resource model in particular))

Hmmm, are you saying that it would be better to have an API that is simplified from the PLR models and presents an interface that doesn’t require that level of information?

Let me get this right:

  1. In order to present a friendly API, PLR creates an internal representation of the lab system (“resource model”)
  2. XYZ offsets can be created from this resource model system, which are then passed directly to the firmware
  3. The firmware executes off of basically raw XYZ

Thus, the difficulty is in the complexity of the resource models

“better” depends on your objective

in general the answer for this question is “no”

exactly

So if the API is only passing messages, this seems eminently solvable: the resource model stays pretty much in PLR on both sides. The stuff above just makes sure the two stay in sync.

Ie, the complexity of resource models is already solved by PLR, the challenge is just syncing it

adding:

one reason is to have a nice API

the other is that the resource model is the way we abstract between different machines with different capabilities

you could dumb down every liquid handler to aspirate u ul at xyz, but then you have to discard fancy things like the STAR’s LLD. the “aspirate” command needs to have a reference to the container

at that point we have a universal api between liquid handlers: aspirate u ul from container C

so the reasons for having a resource model (and passing it in the standard) are:

  1. friendly user API
  2. it’s the only abstraction between machines that doesn’t dumb everything down (massively)

when using PLR as a client, yes syncing is the main challenge

when using another client, the complexity is also parsing and (re-)implementing the resource management system (RMS)

sure, the actual commands such as

aspirate u ul from container C

are easy to transmit if the client and server already agreed what C is

Forcing the client on the other side to be PLR doesn’t negate the usefulness of having a network interface between the two, though it is a little annoying.

Read a little more of the source code. This is actually something I worked on implementing at Trilobio, so I understand the problem.

Basically, you have a tree of positions. There is the relative position from the parent, and then you can walk up the tree to get an absolute position in the overall space. You also have a little matrix math to apply rotations (don’t use quaternions because you don’t need full 3d orientation, good on you). Simple enough, if I am reading it right.
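A sketch of that walk, under the assumption of per-node offsets plus a rotation about z only (class and field names are made up, not PLR’s actual resource/Coordinate API): at each step the accumulated offset is rotated by the parent’s rotation, then translated by the parent’s offset.

```python
# Sketch: relative-position tree with rotation about z. Walking up the tree
# yields the absolute position. Class and field names are illustrative.
import math


class Node:
  def __init__(self, name, x, y, z, rotation_deg=0.0, parent=None):
    self.name, self.x, self.y, self.z = name, x, y, z
    self.rotation_deg = rotation_deg
    self.parent = parent

  def absolute_location(self):
    x, y, z = self.x, self.y, self.z
    node = self.parent
    while node is not None:
      # rotate the accumulated offset by the parent's rotation, then translate
      r = math.radians(node.rotation_deg)
      x, y = (x * math.cos(r) - y * math.sin(r),
              x * math.sin(r) + y * math.cos(r))
      x, y, z = x + node.x, y + node.y, z + node.z
      node = node.parent
    return (round(x, 6), round(y, 6), round(z, 6))


deck = Node("deck", 0, 0, 0)
carrier = Node("carrier", 100, 0, 0, rotation_deg=90, parent=deck)
well = Node("well", 10, 0, 0, parent=carrier)
print(well.absolute_location())  # (100.0, 10.0, 0.0)
```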

This seems simple enough to implement on a client, other than the fact that everything is stored as initialized python objects (why no JSON :frowning: )

Probably not that important of a problem. Another thought - you could just query the server for the xyz positions of a given labware

ie, just have the client ask the server about this