OneClientReadLoopServer

class lsst.ts.tcpip.OneClientReadLoopServer(*, port: int | None, host: str | None = '127.0.0.1', log: Logger, connect_callback: Callable[[BaseClientOrServer], Awaitable[None]] | None = None, name: str = '', encoding: str = 'utf-8', terminator: bytes = b'\r\n', **kwargs: Any)

Bases: OneClientServer

A OneClientServer that reads and processes data in a loop.

Parameters:
port : int

IP port for this server. If 0 then randomly pick an available port (or ports, if listening on multiple sockets). 0 is strongly recommended for unit tests.

host : str or None

IP address for this server. The default is DEFAULT_LOCALHOST. Specify LOCALHOST_IPV4 to force IPv4 or LOCALHOST_IPV6 to force IPv6. If None then bind to all network interfaces (e.g. listen on an IPv4 socket and an IPv6 socket). Warning: None can cause trouble with port=0; see port in the Attributes section for more information.

log : logging.Logger

Logger.

connect_callback : callable or None, optional

Asynchronous or (deprecated) synchronous function to call when a client connects or disconnects. If the other end (client) closes the connection, it may take monitor_connection_interval seconds or longer to notice. The function receives one argument: this OneClientServer.

name : str, optional

Name used for log messages, e.g. “Commands” or “Telemetry”.

encoding : str

The encoding used by read_str, write_str, read_json, and write_json.

terminator : bytes

The terminator used by read_str, write_str, read_json, and write_json.

**kwargs : dict [str, typing.Any]

Additional keyword arguments for asyncio.start_server, beyond host and port.
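The advice to use port=0 in unit tests can be illustrated with the standard library alone. The sketch below is an assumption-laden illustration, not this class's implementation: it calls asyncio.start_server directly (the function that **kwargs are forwarded to), and handle_client and start_on_free_port are hypothetical names used only here.

```python
import asyncio


async def handle_client(
    reader: asyncio.StreamReader, writer: asyncio.StreamWriter
) -> None:
    """Hypothetical client handler that closes the connection immediately."""
    writer.close()
    await writer.wait_closed()


async def start_on_free_port() -> int:
    # port=0 asks the operating system to pick any free port.
    server = await asyncio.start_server(handle_client, host="127.0.0.1", port=0)
    try:
        # The assigned port is only known once the server is listening.
        return server.sockets[0].getsockname()[1]
    finally:
        server.close()
        await server.wait_closed()


assigned_port = asyncio.run(start_on_free_port())
```

Because the port is chosen at start-up, a test has to read it back from the running server before a client can connect, which also avoids collisions between tests that run in parallel.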

Attributes:
read_loop_task : asyncio.Future

A task that reads data from the reader in a loop.

plus…

Attributes provided by the parent class OneClientServer.

Attributes Summary

connected

Return True if self._reader and self._writer are connected.

Methods Summary

basic_close_client()

Close the connected client socket, if any.

call_connect_callback()

A client has connected or disconnected.

close()

Close socket server and client socket, and set done_task done.

close_client([cancel_read_loop_task])

Stop the read loop and close the client.

read(n)

Read up to n bytes.

read_and_dispatch()

Read, parse, and dispatch one item of data.

read_into(struct)

Read binary data from a stream reader into a ctypes.Structure.

read_json()

Read JSON data.

read_loop()

Read incoming data and handle them.

read_str()

Read and decode a terminated str; strip the terminator.

readexactly(n)

Read exactly n bytes.

readline()

Read a sequence of bytes ending with \n.

readuntil([separator])

Read one line, where “line” is a sequence of bytes ending with separator.

start(**kwargs)

Start the TCP/IP server.

write(data)

Write data and call drain.

write_from(*structs)

Write binary data from one or more ctypes.Structures.

write_json(data)

Write data in JSON format.

write_str(line)

Encode, terminate, and write a str.

writelines(lines)

Write an iterable of bytes and call drain.

Attributes Documentation

connected

Return True if self._reader and self._writer are connected.

Note: if the other end drops the connection and you are not trying to read data (e.g. in a background loop), then it takes the operating system a while to realize the connection is lost. So this can return True for some unknown time after the connection has been dropped.

Methods Documentation

async basic_close_client() -> None

Close the connected client socket, if any.

Also:

  • Reset self.connected_task to a new Future.

  • Call connect_callback, if a client was connected.

Unlike close_client, this does not touch self.should_be_connected.

Always safe to call.

async call_connect_callback() -> None

A client has connected or disconnected.

async close() -> None

Close socket server and client socket, and set done_task done.

Call connect_callback if a client was connected.

Always safe to call.

async close_client(cancel_read_loop_task: bool = True) -> None

Stop the read loop and close the client.

Parameters:
cancel_read_loop_task : bool

Whether to cancel the read loop task. Defaults to True; pass False when calling this method from the read loop task itself.

async read(n: int) -> bytes

Read up to n bytes.

Parameters:
n : int

The number of bytes to read. If -1 then block until the other end closes its writer, then return all data seen.

Raises:
ConnectionError

If the connection is lost before, or while, reading.

abstract async read_and_dispatch() -> None

Read, parse, and dispatch one item of data.

Subclasses must implement this method so that it reads and parses data, then dispatches the data to a handler suitable for the subclass. Methods that might be helpful include read_json, read_str, and read_into.
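A minimal sketch of what an implementation of read_and_dispatch might look like. Everything here is an assumption made for illustration: CommandDispatcher is a stand-in class built on a bare asyncio.StreamReader rather than a real subclass of OneClientReadLoopServer, its read_json only imitates the documented method, and the message format with a "command" key is hypothetical.

```python
import asyncio
import json


class CommandDispatcher:
    """Hypothetical stand-in; a real subclass would derive from
    OneClientReadLoopServer and use the inherited read_json."""

    def __init__(self, reader: asyncio.StreamReader) -> None:
        self.reader = reader
        self.handled = []

    async def read_json(self):
        # Imitates the documented read_json: read one terminated line, decode it.
        data = await self.reader.readuntil(b"\r\n")
        return json.loads(data[:-2].decode("utf-8"))

    async def read_and_dispatch(self) -> None:
        # Read and parse exactly one item, then pass it to a matching handler.
        item = await self.read_json()
        handler = getattr(self, f"do_{item['command']}", self.do_unknown)
        await handler(item)

    async def do_start(self, item: dict) -> None:
        self.handled.append(f"start:{item['value']}")

    async def do_unknown(self, item: dict) -> None:
        self.handled.append(f"unknown:{item['command']}")


async def demo() -> list:
    reader = asyncio.StreamReader()
    reader.feed_data(b'{"command": "start", "value": 3}\r\n{"command": "bogus"}\r\n')
    reader.feed_eof()
    dispatcher = CommandDispatcher(reader)
    await dispatcher.read_and_dispatch()
    await dispatcher.read_and_dispatch()
    return dispatcher.handled


handled = asyncio.run(demo())
```

Each call handles exactly one item; repeating the call is left to the read loop.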

async read_into(struct: Structure) -> None

Read binary data from a stream reader into a ctypes.Structure.

Parameters:
struct : ctypes.Structure

Structure to set.

Raises:
ConnectionError

If the connection is lost before, or while, reading.

asyncio.IncompleteReadError

If EOF is reached before enough bytes to fill struct can be read. Use the IncompleteReadError.partial attribute to get the partially read data.
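One way to read binary data into a ctypes.Structure is to read exactly ctypes.sizeof(struct) bytes and copy them over the structure's memory. The sketch below shows that idea with the standard library only; it is an assumption about the general technique, not the library's code, and Sample and read_into_struct are hypothetical names.

```python
import asyncio
import ctypes


class Sample(ctypes.Structure):
    """Hypothetical binary message layout, used only for this example."""

    _fields_ = [("counter", ctypes.c_int32), ("status", ctypes.c_int32)]


async def read_into_struct(
    reader: asyncio.StreamReader, struct: ctypes.Structure
) -> None:
    # Read exactly as many bytes as the structure occupies in memory...
    nbytes = ctypes.sizeof(struct)
    data = await reader.readexactly(nbytes)
    # ...and copy them over the structure, which sets all of its fields.
    ctypes.memmove(ctypes.addressof(struct), data, nbytes)


async def demo() -> tuple:
    reader = asyncio.StreamReader()
    # bytes(struct) yields the raw memory of the structure, as a sender might write it.
    reader.feed_data(bytes(Sample(counter=7, status=-1)))
    reader.feed_eof()
    received = Sample()
    await read_into_struct(reader, received)
    return received.counter, received.status


counter, status = asyncio.run(demo())
```

Both ends must agree on the structure layout, including field order, sizes, and padding.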

async read_json() -> Any

Read JSON data.

Read the data with read_str and return the json-decoded result.

Returns:
data : typing.Any

Data decoded from JSON.

Raises:
ConnectionError

If the connection is lost before, or while, reading.

asyncio.IncompleteReadError

If EOF is reached before the complete separator is found. The internal buffer is reset.

asyncio.LimitOverrunError

If the amount of data read exceeds the configured stream limit. The data is left in the internal buffer and can be read again.

TypeError

If the data are of a type that cannot be decoded from JSON.

json.JSONDecodeError

If the data cannot be decoded from JSON.
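The read-then-decode sequence described for read_json, and its json.JSONDecodeError failure mode, can be sketched with a bare asyncio.StreamReader. This is an illustration under stated assumptions rather than the library's implementation: read_json_line is a hypothetical helper, and the terminator and encoding defaults are copied from the constructor signature.

```python
import asyncio
import json


async def read_json_line(reader, terminator=b"\r\n", encoding="utf-8"):
    # Read one terminated line, strip the terminator, decode, then parse as JSON.
    data = await reader.readuntil(terminator)
    text = data[: -len(terminator)].decode(encoding)
    return json.loads(text)


async def demo():
    reader = asyncio.StreamReader()
    reader.feed_data(b'{"id": 1, "ok": true}\r\nnot json\r\n')
    reader.feed_eof()
    first = await read_json_line(reader)
    try:
        await read_json_line(reader)
        error = None
    except json.JSONDecodeError as e:
        # The line was read and consumed, but it is not valid JSON.
        error = type(e).__name__
    return first, error


first, error = asyncio.run(demo())
```

Note that a decode failure consumes the offending line, so the next read starts at the following message.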

async read_loop() -> None

Read incoming data and handle them.

The actual reading is deferred to the read_and_dispatch method, which must be implemented by subclasses.
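The relationship between read_loop, read_and_dispatch, and close_client can be sketched as follows. ReadLoopSketch is a hypothetical stand-in written for this example and is not the library's implementation; in particular, the exception handling and the stubbed close_client are assumptions.

```python
import asyncio


class ReadLoopSketch:
    """Hypothetical stand-in, not the library's implementation."""

    def __init__(self, reader: asyncio.StreamReader) -> None:
        self.reader = reader
        self.lines = []
        self.closed_by_loop = False

    async def read_and_dispatch(self) -> None:
        # Handle one item: here, simply record each terminated line.
        data = await self.reader.readuntil(b"\r\n")
        self.lines.append(data[:-2].decode("utf-8"))

    async def close_client(self, cancel_read_loop_task: bool = True) -> None:
        # A real implementation would close the socket; this only records the call.
        self.closed_by_loop = not cancel_read_loop_task

    async def read_loop(self) -> None:
        try:
            while True:
                await self.read_and_dispatch()
                # Yield control so the loop cannot monopolize the event loop.
                await asyncio.sleep(0)
        except (ConnectionError, asyncio.IncompleteReadError):
            # The client went away. This code runs inside the read loop task,
            # so it must not ask close_client to cancel that task.
            await self.close_client(cancel_read_loop_task=False)


async def demo():
    reader = asyncio.StreamReader()
    reader.feed_data(b"first\r\nsecond\r\n")
    reader.feed_eof()
    server = ReadLoopSketch(reader)
    read_loop_task = asyncio.create_task(server.read_loop())
    await read_loop_task
    return server.lines, server.closed_by_loop


lines, closed_by_loop = asyncio.run(demo())
```

The loop ends when the stream reaches EOF, at which point readuntil raises asyncio.IncompleteReadError.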

async read_str() -> str

Read and decode a terminated str; strip the terminator.

Read until self.terminator, strip the terminator, and decode the data as self.encoding with strict error handling.

Returns:
line : str

Line of data, as a str with the terminator stripped.

Raises:
ConnectionError

If the connection is lost before, or while, reading.

asyncio.IncompleteReadError

If EOF is reached before the complete separator is found. The internal buffer is reset.

asyncio.LimitOverrunError

If the amount of data read exceeds the configured stream limit. The data is left in the internal buffer and can be read again.

UnicodeError

If decoding fails.
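To make "strict error handling" concrete, the sketch below imitates the described read, strip, and decode steps on a bare asyncio.StreamReader and shows a UnicodeError for bytes that are not valid UTF-8. read_terminated_str is a hypothetical helper, not a library function.

```python
import asyncio


async def read_terminated_str(reader, terminator=b"\r\n", encoding="utf-8"):
    # Read through the terminator, strip it, and decode with strict errors.
    data = await reader.readuntil(terminator)
    return data[: -len(terminator)].decode(encoding, errors="strict")


async def demo():
    reader = asyncio.StreamReader()
    reader.feed_data("21.5 \u00b0C\r\n".encode("utf-8"))
    # 0xff can never appear in valid UTF-8, so this line cannot be decoded.
    reader.feed_data(b"\xff\xfe\r\n")
    reader.feed_eof()
    good = await read_terminated_str(reader)
    try:
        await read_terminated_str(reader)
        failed = False
    except UnicodeError:
        failed = True
    return good, failed


good, failed = asyncio.run(demo())
```

With strict handling, undecodable bytes raise an error instead of being silently replaced or dropped.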

async readexactly(n: int) -> bytes

Read exactly n bytes.

Parameters:
n : int

The number of bytes to read.

Raises:
ConnectionError

If the connection is lost before, or while, reading.

asyncio.IncompleteReadError

If EOF is reached before n bytes can be read. Use the IncompleteReadError.partial attribute to get the partially read data.
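The partial attribute mentioned above can be seen with the standard library's asyncio.StreamReader.readexactly, whose name and exception this method shares. This sketch uses the standard-library reader directly and assumes, without confirmation from this page, that the wrapper propagates the exception unchanged.

```python
import asyncio


async def demo():
    reader = asyncio.StreamReader()
    # Only three bytes arrive before the stream ends.
    reader.feed_data(b"abc")
    reader.feed_eof()
    try:
        await reader.readexactly(5)
    except asyncio.IncompleteReadError as e:
        # partial holds what was read; expected is the requested byte count.
        return e.partial, e.expected
    return None, None


partial, expected = asyncio.run(demo())
```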

async readline() -> bytes

Read a sequence of bytes ending with \n.

If EOF is received and \n was not found, the method returns partially read data.

Raises:
ConnectionError

If the connection is lost before, or while, reading.
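The EOF behavior described for readline matches the standard library's asyncio.StreamReader.readline, which the sketch below calls directly. Whether this class's wrapper behaves identically in every edge case is an assumption.

```python
import asyncio


async def demo():
    reader = asyncio.StreamReader()
    # The final line has no trailing newline before the stream ends.
    reader.feed_data(b"one\ntwo")
    reader.feed_eof()
    # Third call happens after all data has been consumed.
    return [await reader.readline() for _ in range(3)]


lines = asyncio.run(demo())
```

Here the first call returns the complete line including its newline, the second returns the unterminated remainder, and the third returns empty bytes, which is how a caller can detect EOF.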

async readuntil(separator: bytes = b'\n') -> bytes

Read one line, where “line” is a sequence of bytes ending with separator.

Read data from the stream until separator is found.

On success, the data and separator will be removed from the internal buffer (consumed). Returned data will include the separator at the end.

See also read_str, which is more convenient for most use cases.

Parameters:
separator : bytes

The desired separator. The default matches the standard library, rather than using terminator.

Raises:
ConnectionError

If the connection is lost before, or while, reading.

asyncio.IncompleteReadError

If EOF is reached before the complete separator is found. The internal buffer is reset.

asyncio.LimitOverrunError

If the amount of data read exceeds the configured stream limit. The data is left in the internal buffer and can be read again.
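The statement that data stays in the buffer after a limit overrun can be checked against the standard library's asyncio.StreamReader.readuntil. The sketch uses the standard-library reader directly with a deliberately tiny limit; that this class exposes the same behavior is an assumption.

```python
import asyncio


async def demo():
    # A deliberately tiny limit, chosen only to trigger the error.
    reader = asyncio.StreamReader(limit=8)
    reader.feed_data(b"0123456789abcdef\n")
    reader.feed_eof()
    try:
        await reader.readuntil(b"\n")
        overrun = False
    except asyncio.LimitOverrunError:
        overrun = True
    # Nothing was consumed, so a plain read can still recover the data.
    leftover = await reader.read(100)
    return overrun, leftover


overrun, leftover = asyncio.run(demo())
```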

async start(**kwargs: Any) -> None

Start the TCP/IP server.

This is called automatically by the constructor, and is not intended to be called by the user. It is a public method so that subclasses can override it.

Parameters:
**kwargs : dict [str, typing.Any]

Additional keyword arguments for asyncio.start_server, beyond host and port.

Raises:
RuntimeError

If start has already been called and has successfully constructed a server.

async write(data: bytes) -> None

Write data and call drain.

Parameters:
data : bytes

The data to write.

Raises:
ConnectionError

If self.connected is False before writing begins.

async write_from(*structs: Structure) -> None

Write binary data from one or more ctypes.Structures.

Parameters:
structs : list [ctypes.Structure]

Structures to write.

Raises:
ConnectionError

If self.connected is False before writing begins.

async write_json(data: Any) -> None

Write data in JSON format.

Encode the data as json and write the result with write_str.

Parameters:
data : any

The data to be written. Typically a dict, but any json-encodable data is acceptable.

Raises:
ConnectionError

If the connection is lost before, or while, writing.

UnicodeError

If encoding fails.

TypeError

If the data cannot be json-encoded.
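The encoding step described for write_json can be sketched as a pure function. encode_json_line is a hypothetical helper that only imitates the documented "encode as json, then write with write_str" sequence; it is not the library's code. The standard library's json.dumps raises TypeError for objects it cannot serialize.

```python
import json


def encode_json_line(data, encoding="utf-8", terminator=b"\r\n"):
    # Serialize to JSON text, encode it, and append the terminator.
    return json.dumps(data).encode(encoding) + terminator


payload = encode_json_line({"command": "start", "value": 3})

try:
    # A plain object() has no JSON representation.
    encode_json_line({"when": object()})
    encode_error = None
except TypeError as e:
    encode_error = type(e).__name__
```

The resulting bytes are one terminated line, which is the shape that read_json on the other end expects.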

async write_str(line: str) -> None

Encode, terminate, and write a str.

Encode the str as self.encoding with strict error handling, and append self.terminator.

Parameters:
line : str

The line of data to be written.

Raises:
ConnectionError

If the connection is lost before, or while, writing.

UnicodeError

If encoding fails.

async writelines(lines: Iterable) -> None

Write an iterable of bytes and call drain.

Parameters:
lines : collections.abc.Iterable [bytes]

The data to write, as an iterable collection of bytes.

Raises:
ConnectionError

If self.connected is False before writing begins.