BaseClientOrServer

class lsst.ts.tcpip.BaseClientOrServer(*, log: Logger, connect_callback: Callable[[BaseClientOrServer], None | Awaitable[None]] | None = None, monitor_connection_interval: float = 0.1, name: str = '', do_start: bool = True, encoding: str = 'utf-8', terminator: bytes = b'\r\n', **kwargs: Any)

Bases: ABC

Abstract base class for a TCP/IP client or server.

Manage a stream reader and writer:

  • Provide high-level methods for reading and writing.

  • Optionally call a callback function when the connection state changes.

  • Optionally monitor the connection in the background.

Parameters:
loglogging.Logger

Logger.

connect_callbackcallable or None, optional

Asynchronous function to call when the connection state changes. The function receives one argument: this BaseClientOrServer.

Note: if the connection is unexpectedly lost and you are not reading from the socket, it may take monitor_connection_interval seconds or longer to notice.

monitor_connection_intervalfloat, optional

Interval between checking if the connection is still alive (seconds). Defaults to DEFAULT_MONITOR_CONNECTION_INTERVAL. If ≤ 0 then do not monitor the connection at all.

Monitoring is only useful if you do not regularly read from the reader using the read methods of this class.

namestr

Optional name used for log messages.

do_startbool, optional

Call start in the constructor? Normally True (the default), but set False by Client when creating an already-closed Client.

encodingstr
The encoding used by read_str and write_str, read_json,

and write_json.

terminatorbytes
The terminator used by read_str and write_str, read_json,

and write_json.

**kwargsdict [str, typing.Any]

Keyword arguments for start_task.

Raises:
TypeError

If connect_callback is synchronous.

Notes

Always wait for start_task after constructing an instance, before using the instance.

This class provides high-level read and write methods that monitor the connection (to call connect_callback as needed) and reject any attempt to read or write if not connected. Please use them.

This class can be used as an async context manager, which may be useful for unit tests.

Subclasses should call _start_monitoring_connection from the start method, if monitoring is needed.

Attributes:
loglogging.Logger

A child of the log constructor argument.

namestr

The name constructor argument.

encodingstr

The encoding constructor argument.

terminatorbytes

The terminator constructor argument.

readerasyncio.StreamReader or None

Stream reader to read data from the server. This will be a stream reader (not None) if connected is True.

writerasyncio.StreamWriter or None

Stream writer to write data to the server. This will be a stream writer (not None) if connected is True.

start_taskasyncio.Future

Future that is set done when:

  • The connection is made, for the Client subclass.

  • The server is ready to receive connections, for Server subclasses.

done_taskasyncio.Future

Future that is set done when this instance is closed, at which point the instance is no longer usable.

should_be_connectedbool

This flag helps you determine if you unexpectedly lost the connection (e.g. if the other end hung up). It is set true when the connection is made and false when you call close, or OneClientServer.close_client. The connection was unexpectedy lost if connected is false and should_be_connected is true.

If your CSC unexpectedly loses its connection to a low-level controller, you should send the CSC to fault state.

Attributes Summary

connected

Return True if self._reader and self._writer are connected.

Methods Summary

call_connect_callback()

Call self.__connect_callback.

close()

Close the client or server, making it unusable.

read(n)

Read up to n bytes.

read_into(struct)

Read binary data from a stream reader into a ctypes.Structure.

read_json()

Read JSON data.

read_str()

Read and decode a terminated str; strip the terminator.

readexactly(n)

Read exactly n bytes.

readline()

Read a sequence of bytes ending with \n.

readuntil([separator])

Read one line, where “line” is a sequence of bytes ending with separator.

start(**kwargs)

Start asynchronous processes.

write(data)

Write data and call drain.

write_from(*structs)

Write binary data from one or more ctypes.Structures.

write_json(data)

Write data in JSON format.

write_str(line)

Encode, terminate, and write a str.

writelines(lines)

Write an iterable of bytes and call drain.

Attributes Documentation

connected

Return True if self._reader and self._writer are connected.

Note: if the other end drops the connection and if you are not trying to read data (e.g. in a background loop), then it takes the operating system awhile to realize the connection is lost. So this can return true for some unknown time after the connection has been dropped.

Methods Documentation

async call_connect_callback() None

Call self.__connect_callback.

This is always safe to call. It only calls the callback function if that function is not None and if the connection state has changed since the last time this method was called.

abstract async close() None

Close the client or server, making it unusable.

async read(n: int) bytes

Read up to n bytes.

Parameters:
nint

The number of bytes to read. If -1 then block until the other end closes its writer, then return all data seen.

Raises:
ConnectionError

If the connection is lost before, or while, reading.

async read_into(struct: Structure) None

Read binary data from a stream reader into a ctypes.Structure.

Parameters:
structctypes.Structure

Structure to set.

Raises:
ConnectionError

If the connection is lost before, or while, reading.

asyncio.IncompleteReadError

If EOF is reached before n bytes can be read. Use the IncompleteReadError.partial attribute to get the partially read data.

async read_json() Any

Read JSON data.

Read the data with read_str and return the json-decoded result.

Returns:
datatyping.Any

Data decoded from JSON.

Raises:
ConnectionError

If the connection is lost before, or while, reading.

asyncio.IncompleteReadError

If EOF is reached before the complete separator is found and the internal buffer is reset.

LimitOverrunError

If the amount of data read exceeds the configured stream lmit. The data is left in the internal buffer and can be read again.

TypeError

If the data are of a type that cannot be decoded from JSON.

json.JSONDecodeError

If the data cannot be decoded from JSON.

async read_str() str

Read and decode a terminated str; strip the terminator.

Read until self.terminator, strip the terminator, and decode the data as self.encoding with strict error handling.

Returns:
linestr

Line of data, as a str with the terminator stripped.

Raises:
ConnectionError

If the connection is lost before, or while, reading.

asyncio.IncompleteReadError

If EOF is reached before the complete separator is found and the internal buffer is reset.

LimitOverrunError

If the amount of data read exceeds the configured stream lmit. The data is left in the internal buffer and can be read again.

UnicodeError

If decoding fails.

async readexactly(n: int) bytes

Read exactly n bytes.

Parameters:
nint

The number of bytes to read.

Raises:
ConnectionError

If the connection is lost before, or while, reading.

asyncio.IncompleteReadError

If EOF is reached before n bytes can be read. Use the IncompleteReadError.partial attribute to get the partially read data.

async readline() bytes

Read a sequence of bytes ending with \n.

If EOF is received and \n was not found, the method returns partially read data.

Raises:
ConnectionError

If the connection is lost before, or while, reading.

async readuntil(separator: bytes = b'\n') bytes

Read one line, where “line” is a sequence of bytes ending with separator.

Read data from the stream until separator is found.

On success, the data and separator will be removed from the internal buffer (consumed). Returned data will include the separator at the end.

See also read_str, which is more convenient for most use cases.

Parameters:
separatorbytes

The desired separator. The default matches the standard library, rather than using terminator.

Raises:
ConnectionError

If the connection is lost before, or while, reading.

asyncio.IncompleteReadError

If EOF is reached before the complete separator is found and the internal buffer is reset.

LimitOverrunError

If the amount of data read exceeds the configured stream lmit. The data is left in the internal buffer and can be read again.

abstract async start(**kwargs: Any) None

Start asynchronous processes.

This is called automatically by the constructor.

A server should construct and start the server. A client should connect and set reader and writer. Both should call _start_monitoring_connection on success. Both should raise RuntimeError if start has already been called.

Raises:
RuntimeError

If already called.

async write(data: bytes) None

Write data and call drain.

Parameters:
databytes

The data to write.

Raises:
ConnectionError

If self.connected false before writing begins.

async write_from(*structs: Structure) None

Write binary data from one or more ctypes.Structures.

Parameters:
structslist [ctypes.Structure]

Structures to write.

Raises:
ConnectionError

If self.connected false before writing begins.

async write_json(data: Any) None

Write data in JSON format.

Encode the data as json and write the result with write_str.

Parameters:
dataany

The data to be written. Typically a dict, but any json-encodable data is acceptable.

Raises:
ConnectionError

If the connection is lost before, or while, reading.

UnicodeError

If encoding fails.

json.JSONEncodeError

If the data cannot be json-encoded.

async write_str(line: str) None

Encode, terminate, and write a str.

Encode the str as self.encoding with strict error handling, and append self.terminator.

Parameters:
linestr

The line of data to be written.

Raises:
ConnectionError

If the connection is lost before, or while, reading.

UnicodeError

If encoding fails.

async writelines(lines: Iterable) None

Write an iterable of bytes and call drain.

Parameters:
linescollections.abc.Iterable [bytes]

The data to write, as an iterable collection of bytes.

Raises:
ConnectionError

If self.connected false before writing begins.