Skip to content
49 changes: 32 additions & 17 deletions docs/source/public_api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -229,29 +229,44 @@ This page summarises the parts of the LabThings API that should be most frequent
:return: When used as intended, the result is an `.EndpointDescriptor`.


.. py:class:: ThingServer(things: config_model.ThingsConfig, settings_folder: Optional[str] = None, application_config: Optional[collections.abc.Mapping[str, Any]] = None, debug: bool = False)
.. py:class:: ThingServer(config: ThingServerConfig, debug: bool = False)

The `ThingServer` sets up a `fastapi.FastAPI` application and uses it
to expose the capabilities of `Thing` instances over HTTP.

Full documentation of how the class works is available at `labthings_fastpi.server.ThingServer`\ . Most of the attributes of `ThingServer` should not be accessed directly by `Thing` subclasses - instead they should use the `ThingServerInterface` for a cleaner way to access the server.

:param things: A mapping of Thing names to `~lt.Thing` subclasses, or
`ThingConfig` objects specifying the subclass, its initialisation
arguments, and any connections to other `~lt.Thing`\ s.
:param settings_folder: the location on disk where `~lt.Thing`
settings will be saved.
:param application_config: A mapping containing custom configuration for the
application. This is not processed by LabThings. Each `~lt.Thing` can access
application. This is not processed by LabThings. Each `~lt.Thing` can access
this via the Thing-Server interface.
:param debug: If ``True``, set the log level for `~lt.Thing` instances to
DEBUG.

Full documentation of how the class works is available at `labthings_fastpi.server.ThingServer`\ .
Most of the attributes of `ThingServer` should not be accessed directly by `Thing` subclasses - instead they should use the `ThingServerInterface` for a cleaner way to access the server.

.. automethod:: labthings_fastapi.server.ThingServer.from_config
:no-index:
:param config: a `ThingServerConfig` instance (or compatible dictionary) setting the server's configuration.
:param debug: sets the log level for `~lt.Thing` instances to DEBUG if it is set to `True`\ .
:param \**kwargs: for backwards compatibility, keyword arguments are used to create the server configuration if `config` is missing. This raises a `DeprecationWarning`\ .

.. py:method:: from_config(config: ThingServerConfig, debug: bool = False) -> ThingServer

This method of creating a `ThingServer` is deprecated, as it is equivalent to the constructor.

:return: a `ThingServer` with the supplied configuration.

.. py:classmethod:: from_things(things: Mapping[str: ThingConfig | type[Thing] | str]) -> ThingServer

This class method allows a server to be created by passing in a mapping of Thing names to Thing configurations.
Thing configurations may be either a `Thing` subclass, or an import string (e.g. ``my.module:MyClass``), or a `ThingConfig` instance (or compatible dictionary).

.. code-block:: python

server = lt.ThingServer.from_things(
{
"my_thing": MyThingSubclass,
"their_thing": "their.module:TheirThing",
"configured_thing": {
"cls": MyThingSubclass,
"kwargs": {"a": 10},
},
}
)

:param things: a mapping of Thing names to Thing configurations.
:param \**kwargs: additional keyword arguments are passed to `ThingServerConfig`\ .


.. py:class:: ThingServerInterface(server: ThingServer, name: str)
Expand Down
2 changes: 1 addition & 1 deletion docs/source/quickstart/counter.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def slowly_increase_counter(self) -> None:
if __name__ == "__main__":
import uvicorn

server = lt.ThingServer({"counter": TestThing})
server = lt.ThingServer.from_things({"counter": TestThing})

# We run the server using `uvicorn`:
uvicorn.run(server.app, port=5000, ws="websockets-sansio")
2 changes: 1 addition & 1 deletion docs/source/thing_slots.rst
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ The following example shows the use of a `~lt.thing_slot`:


things = {"thing_a": ThingA, "thing_b": ThingB}
server = lt.ThingServer(things)
server = lt.ThingServer.from_things(things)


In this example, ``ThingB.thing_a`` is the simplest form of `~lt.thing_slot`: it
Expand Down
2 changes: 1 addition & 1 deletion docs/source/tutorial/writing_a_thing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ Our first Thing will pretend to be a light: we can set its brightness and turn i
self.is_on = not self.is_on


server = lt.ThingServer({"light": Light})
server = lt.ThingServer.from_things({"light": Light})

if __name__ == "__main__":
import uvicorn
Expand Down
2 changes: 1 addition & 1 deletion src/labthings_fastapi/outputs/mjpeg_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,15 +192,15 @@
:raise ValueError: if the frame is not available.
"""
if i < 0:
raise ValueError("i must be >= 0")

Check warning on line 195 in src/labthings_fastapi/outputs/mjpeg_stream.py

View workflow job for this annotation

GitHub Actions / coverage

195 line is not covered with tests
if i < self.last_frame_i - len(self._ringbuffer) + 2:
raise ValueError("the ith frame has been overwritten")

Check warning on line 197 in src/labthings_fastapi/outputs/mjpeg_stream.py

View workflow job for this annotation

GitHub Actions / coverage

197 line is not covered with tests
if i > self.last_frame_i:
# TODO: await the ith frame
raise ValueError("the ith frame has not yet been acquired")

Check warning on line 200 in src/labthings_fastapi/outputs/mjpeg_stream.py

View workflow job for this annotation

GitHub Actions / coverage

200 line is not covered with tests
entry = self._ringbuffer[i % len(self._ringbuffer)]
if entry.index != i:
raise ValueError("the ith frame has been overwritten")

Check warning on line 203 in src/labthings_fastapi/outputs/mjpeg_stream.py

View workflow job for this annotation

GitHub Actions / coverage

203 line is not covered with tests
return entry

@asynccontextmanager
Expand Down Expand Up @@ -252,9 +252,9 @@

:return: The next JPEG frame, as a `bytes` object.
"""
i = await self.next_frame()
async with self.buffer_for_reading(i) as frame:
return copy(frame)

Check warning on line 257 in src/labthings_fastapi/outputs/mjpeg_stream.py

View workflow job for this annotation

GitHub Actions / coverage

255-257 lines are not covered with tests

async def next_frame_size(self) -> int:
"""Wait for the next frame and return its size.
Expand All @@ -263,9 +263,9 @@

:return: The size of the next JPEG frame, in bytes.
"""
i = await self.next_frame()
async with self.buffer_for_reading(i) as frame:
return len(frame)

Check warning on line 268 in src/labthings_fastapi/outputs/mjpeg_stream.py

View workflow job for this annotation

GitHub Actions / coverage

266-268 lines are not covered with tests

async def frame_async_generator(self) -> AsyncGenerator[bytes, None]:
"""Yield frames as bytes objects.
Expand All @@ -292,12 +292,12 @@
yield frame
except StopAsyncIteration:
break
except Exception as e: # noqa: BLE001

Check warning on line 295 in src/labthings_fastapi/outputs/mjpeg_stream.py

View workflow job for this annotation

GitHub Actions / coverage

295 line is not covered with tests
# It's important that errors in the stream don't crash the server.
# This may be something we can remove in the future, now streams stop
# more elegantly. However, it will require careful testing.f
logging.exception(f"Error in stream: {e}, stream stopped")
return

Check warning on line 300 in src/labthings_fastapi/outputs/mjpeg_stream.py

View workflow job for this annotation

GitHub Actions / coverage

299-300 lines are not covered with tests

async def mjpeg_stream_response(self) -> MJPEGStreamResponse:
"""Return a StreamingResponse that streams an MJPEG stream.
Expand Down Expand Up @@ -331,7 +331,7 @@
and frame[-2] == 0xFF
and frame[-1] == 0xD9
):
raise ValueError("Invalid JPEG")

Check warning on line 334 in src/labthings_fastapi/outputs/mjpeg_stream.py

View workflow job for this annotation

GitHub Actions / coverage

334 line is not covered with tests
with self._lock:
entry = self._ringbuffer[(self.last_frame_i + 1) % len(self._ringbuffer)]
entry.timestamp = datetime.now()
Expand Down Expand Up @@ -359,7 +359,7 @@
:raises RuntimeError: if the stream is still streaming.
"""
if self._streaming is True:
raise RuntimeError(

Check warning on line 362 in src/labthings_fastapi/outputs/mjpeg_stream.py

View workflow job for this annotation

GitHub Actions / coverage

362 line is not covered with tests
"This function should only be called when the stream is stopped."
)
async with self.condition:
Expand Down Expand Up @@ -456,7 +456,7 @@
stream = MJPEGStreamDescriptor()


server = lt.ThingServer({"camera": Camera})
server = lt.ThingServer.from_things({"camera": Camera})

:param app: the `fastapi.FastAPI` application to which we are being added.
:param thing: the host `~lt.Thing` instance.
Expand Down
195 changes: 162 additions & 33 deletions src/labthings_fastapi/server/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,21 @@
See the :ref:`tutorial` for examples of how to set up a `~lt.ThingServer`.
"""

from __future__ import annotations
from typing import Any, AsyncGenerator, Optional, TypeVar
import warnings
from fastapi.testclient import TestClient
from pydantic import ValidationError
from typing import Any, AsyncGenerator, Optional, TypeVar, overload
from typing_extensions import Self
import os
import logging

from fastapi import APIRouter, FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from anyio.from_thread import BlockingPortal
from contextlib import asynccontextmanager, AsyncExitStack
from collections.abc import Mapping, Sequence
from contextlib import asynccontextmanager, AsyncExitStack, contextmanager
from collections.abc import Iterator, Mapping, Sequence
from types import MappingProxyType
import uvicorn

from ..middleware.url_for import url_for_middleware
from ..thing_slots import ThingSlot
Expand Down Expand Up @@ -61,50 +64,82 @@ class ThingServer:
an `anyio.from_thread.BlockingPortal`.
"""

@overload
def __init__(self, config: ThingServerConfig, *, debug: bool = False) -> None: ...

@overload
def __init__(self, *, debug: bool = False, **kwargs: Any) -> None: ...

def __init__(
self,
things: ThingsConfig,
settings_folder: Optional[str] = None,
api_prefix: str = "",
application_config: Optional[Mapping[str, Any]] = None,
config: ThingServerConfig | None = None,
*,
debug: bool = False,
**kwargs: Any,
) -> None:
r"""Initialise a LabThings server.

The `~lt.ThingServer` is responsible for running the code in `~lt.Thing`
instances, and making them available over the network. It should be configured
by passing a `~lt.ThingServerConfig` object (or a dictionary that can
be validated as a `~lt.ThingServerConfig` object).

For convenience and backwards compatibility, if `config` is `None` the keyword
arguments will be passed to `~lt.ThingServerConfig` instead. Keyword arguments
may not be used if the `config` argument is used, and may be removed in the
future.

Setting up the `~lt.ThingServer` involves creating the underlying
`fastapi.FastAPI` app, setting its lifespan function (used to
set up and shut down the `~lt.Thing` instances), and configuring it
to allow cross-origin requests.

We also create the `.ActionManager` to manage :ref:`actions` and the
`.BlobManager` to manage the downloading of :ref:`blobs`.

:param things: A mapping of Thing names to `~lt.Thing` subclasses, or
`~lt.ThingConfig` objects specifying the subclass, its initialisation
arguments, and any connections to other `~lt.Thing`\ s.
:param settings_folder: the location on disk where `~lt.Thing`
settings will be saved.
:param api_prefix: A prefix for all API routes. This must either
be empty, or start with a slash and not end with a slash.
:param application_config: A mapping containing custom configuration for the
application. This is not processed by LabThings. Each `~lt.Thing` can
access this via the Thing-Server interface.
:param debug: If ``True``, set the log level for `~lt.Thing` instances to
DEBUG.
:param config: a `~lt.ThingServerConfig` object that configures the server,
or something that may be converted to one.
:param debug: ff ``True``, set the log level for `~lt.Thing` instances to
DEBUG.
:param \**kwargs: ff keyword arguments are supplied, they will be passed
to the constructor of `~lt.ThingServerConfig`\ . This is not allowed
if `config` is a `~lt.ThingServerConfig` object.

:raises TypeError: if the value of `config` cannot be parsed as a
`~lt.ThingServerConfig`\ .
:raises ValueError: if keyword arguments are supplied together with `config`\ .
"""
self.startup_failure: dict | None = None
self._debug = debug
# Note: this is safe to call multiple times.
configure_thing_logger(logging.DEBUG if debug else None)
self._config = ThingServerConfig(
things=things,
settings_folder=settings_folder,
api_prefix=api_prefix,
application_config=application_config,
)
configure_thing_logger(logging.DEBUG if self._debug else None)
if config is not None:
try:
self._config = ThingServerConfig.model_validate(config)
except ValidationError as e:
raise TypeError(
"The value passed to `ThingServer()` could not be validated as "
"a server configuration. If you are passing a dictionary of "
"Things, this must be done using `ThingServer.from_things` instead."
) from e
if kwargs != {}:
raise ValueError(
f"Extra keyword arguments supplied to `ThingServer()`: {kwargs}. "
"When a `ThingServerConfig` object is specified, no extra keyword "
"arguments may be supplied."
)
else:
warnings.warn(
DeprecationWarning(
"`ThingServer` should be initialised with the `config` parameter. "
"Taking configuration options from keyword arguments will be "
"removed in a future release."
),
stacklevel=2,
)
self._config = ThingServerConfig(**kwargs)
if self._config.settings_folder is None:
self._config.settings_folder = "./settings"
self.app = FastAPI(lifespan=self.lifespan)
self._set_cors_middleware()
self._set_url_for_middleware()
self.settings_folder = settings_folder or "./settings"
self.action_manager = ActionManager()
self.app.include_router(self.action_manager.router(), prefix=self._api_prefix)
self.app.include_router(blob.router, prefix=self._api_prefix)
Expand All @@ -117,18 +152,56 @@ def __init__(
self._connect_things()
self._attach_things_to_server()

@classmethod
def from_things(
cls,
things: ThingsConfig,
debug: bool = False,
**kwargs: Any,
) -> Self:
r"""Create a ThingServer using a dictionary of `~lt.Thing` subclasses.

In test and example code, it's convenient to be able to pass server and
`Thing` configurations as keyword arguments rather than a config model.

This convenience method will turn its keyword arguments into a server
configuration and create a server based on it.

:param things: A mapping of names to `Thing` configurations. These may
be specified as a `~lt.ThingConfig` object, a `~lt.Thing` subclass,
or an import string referencing a `~lt.Thing` subclass.
:param debug: Whether to start the server in debug mode.
:param \**kwargs: Additional keyword arguments are passed to
`~lt.ThingServerConfig`\ .
:return: a `~lt.ThingServer` instance.
"""
return cls(
ThingServerConfig(
things=things,
**kwargs,
),
debug=debug,
)

@classmethod
def from_config(cls, config: ThingServerConfig, debug: bool = False) -> Self:
r"""Create a ThingServer from a configuration model.

This is equivalent to ``ThingServer(**dict(config))``\ .
This is equivalent to ``ThingServer(config, debug=debug)``\ .

:param config: The configuration parameters for the server.
:param debug: If ``True``, set the log level for `~lt.Thing` instances to
DEBUG.
:return: A `~lt.ThingServer` configured as per the model.
"""
return cls(**dict(config), debug=debug)
warnings.warn(
DeprecationWarning(
"`ThingServer.from_config()` is redundant and will be removed in "
"a future release. Use `ThingServer()` instead."
),
stacklevel=2,
)
return cls(config, debug=debug)

def _set_cors_middleware(self) -> None:
"""Configure the server to allow requests from other origins.
Expand Down Expand Up @@ -157,6 +230,26 @@ def _set_url_for_middleware(self) -> None:
"""
self.app.middleware("http")(url_for_middleware)

@property
def debug(self) -> bool:
"""Whether the server is in debug mode."""
return self._debug

@property
def settings_folder(self) -> str:
"""The folder in which we will store `Thing` settings.

:raises RuntimeError: if there is no settings folder set.
This should never happen, as it's set in `__init__`.
"""
if self._config.settings_folder is None:
raise RuntimeError(
"The settings folder should be set during initialisation. "
"This may indicate a LabThings bug, or incorrect subclassing "
"of `ThingServer`."
)
return self._config.settings_folder

@property
def things(self) -> Mapping[str, Thing]:
"""Return a dictionary of all the things.
Expand Down Expand Up @@ -384,3 +477,39 @@ def thing_paths(request: Request) -> Mapping[str, str]:
}

return router

def serve(self, host: str = "localhost", port: int = 5000) -> None:
r"""Run the server in `uvicorn`\ .

This method will run the server from Python, using `uvicorn.run`\ .
This is the most convenient way to run a LabThings server from Python, and
is identical to what happens when it is run from the command line.

:param host: The IP address or hostname on which to serve. By default, this
is ``localhost`` which is only accessible from your computer. To serve
over a network on all available IPv4 addresses, use ``"0.0.0.0"``.
:param port: The port on which to serve. This defaults to 5000.
"""
uvicorn.run(self.app, host=host, port=port, ws="websockets-sansio")

@contextmanager
def test_client(self) -> Iterator[TestClient]:
"""A context manager to test out a server without binding to a port.

This context manager will start up the server and run an event loop, but
instead of responding to requests on a network port, it uses
`fastapi.testclient.TestClient` to simulate HTTP requests.

This is provided to simplify test code, and should not be used in production.

:yields: a `fastapi.testclient.TestClient` to simulate HTTP requests.

.. warning::

Usually, a server is only started up and shut down once. Calling this
method multiple times may have unexpected results.

As a rule, only ever use this method in your test suite.
"""
with TestClient(self.app) as client:
yield client
Loading
Loading