BaseClientOrServer
- class lsst.ts.tcpip.BaseClientOrServer(*, log: Logger, connect_callback: Callable[[BaseClientOrServer], None | Awaitable[None]] | None = None, monitor_connection_interval: float = 0.1, name: str = '', do_start: bool = True, encoding: str = 'utf-8', terminator: bytes = b'\r\n', **kwargs: Any)
Bases:
ABC
Abstract base class for a TCP/IP client or server.
Manage a stream reader and writer:
Provide high-level methods for reading and writing.
Optionally call a callback function when the connection state changes.
Optionally monitor the connection in the background.
- Parameters:
- log
logging.Logger
Logger.
- connect_callback
callable or None, optional
Asynchronous function to call when the connection state changes. The function receives one argument: this BaseClientOrServer.
Note: if the connection is unexpectedly lost and you are not reading from the socket, it may take monitor_connection_interval seconds or longer to notice.
- monitor_connection_interval
float, optional
Interval between checks that the connection is still alive (seconds). Defaults to DEFAULT_MONITOR_CONNECTION_INTERVAL. If ≤ 0 then do not monitor the connection at all.
Monitoring is only useful if you do not regularly read from the reader using the read methods of this class.
- name
str
Optional name used for log messages.
- do_start
bool, optional
Call start in the constructor? Normally True (the default), but set False by Client when creating an already-closed Client.
- encoding
str
The encoding used by read_str, write_str, read_json, and write_json.
- terminator
bytes
The terminator used by read_str, write_str, read_json, and write_json.
- **kwargs
dict[str, typing.Any]
Keyword arguments for start_task.
- Raises:
TypeError
If connect_callback is synchronous.
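The TypeError rule above can be illustrated with the standard library. This is a minimal sketch, not the library's actual check: `require_async_callback` is a hypothetical helper name, and using `inspect.iscoroutinefunction` is an assumption about how such a check might be written.

```python
import inspect
from typing import Any, Callable, Optional


def require_async_callback(callback: Optional[Callable[..., Any]]) -> None:
    """Raise TypeError unless callback is None or an ``async def`` function.

    Hypothetical helper for illustration; the library's real check may differ.
    """
    if callback is not None and not inspect.iscoroutinefunction(callback):
        raise TypeError(f"connect_callback={callback!r} must be asynchronous")


async def on_connect(server: Any) -> None:
    """An acceptable callback: it is a coroutine function."""


def sync_on_connect(server: Any) -> None:
    """A rejected callback: it is a plain function."""
```

With this sketch, `require_async_callback(on_connect)` and `require_async_callback(None)` return quietly, while `require_async_callback(sync_on_connect)` raises TypeError.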
Notes
Always wait for start_task after constructing an instance, before using the instance.
This class provides high-level read and write methods that monitor the connection (to call connect_callback as needed) and reject any attempt to read or write if not connected. Please use them.
This class can be used as an async context manager, which may be useful for unit tests.
Subclasses should call _start_monitoring_connection from the start method, if monitoring is needed.
- Attributes:
- log
logging.Logger
A child of the log constructor argument.
- name
str
The name constructor argument.
- encoding
str
The encoding constructor argument.
- terminator
bytes
The terminator constructor argument.
- reader
asyncio.StreamReader or None
Stream reader to read data from the server. This will be a stream reader (not None) if connected is True.
- writer
asyncio.StreamWriter or None
Stream writer to write data to the server. This will be a stream writer (not None) if connected is True.
- start_task
asyncio.Future
Future that is set done when:
The connection is made, for the Client subclass.
The server is ready to receive connections, for Server subclasses.
- done_task
asyncio.Future
Future that is set done when this instance is closed, at which point the instance is no longer usable.
- should_be_connected
bool
This flag helps you determine if you unexpectedly lost the connection (e.g. if the other end hung up). It is set true when the connection is made and false when you call close or OneClientServer.close_client. The connection was unexpectedly lost if connected is false and should_be_connected is true.
If your CSC unexpectedly loses its connection to a low-level controller, you should send the CSC to fault state.
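The test described for should_be_connected can be written as a one-line predicate. The sketch below uses a hypothetical stand-in class, `ConnectionFlags`, that holds only the two attributes involved. It is not the library's code.

```python
from dataclasses import dataclass


@dataclass
class ConnectionFlags:
    """Stand-in for the two attributes described above (hypothetical class)."""

    connected: bool
    should_be_connected: bool


def lost_unexpectedly(conn: ConnectionFlags) -> bool:
    """Return True if the connection dropped without a deliberate close."""
    return conn.should_be_connected and not conn.connected
```

The predicate only reads the two attributes, so the same expression should work on any object that exposes `connected` and `should_be_connected`.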
Attributes Summary
- connected: Return True if self._reader and self._writer are connected.
Methods Summary
- call_connect_callback(): Call self.__connect_callback.
- close(): Close the client or server, making it unusable.
- read(n): Read up to n bytes.
- read_into(struct): Read binary data from a stream reader into a ctypes.Structure.
- read_json(): Read JSON data.
- read_str(): Read and decode a terminated str; strip the terminator.
- readexactly(n): Read exactly n bytes.
- readline(): Read a sequence of bytes ending with \n.
- readuntil([separator]): Read one line, where “line” is a sequence of bytes ending with separator.
- start(**kwargs): Start asynchronous processes.
- write(data): Write data and call drain.
- write_from(*structs): Write binary data from one or more ctypes.Structures.
- write_json(data): Write data in JSON format.
- write_str(line): Encode, terminate, and write a str.
- writelines(lines): Write an iterable of bytes and call drain.
Attributes Documentation
- connected
Return True if self._reader and self._writer are connected.
Note: if the other end drops the connection and you are not trying to read data (e.g. in a background loop), the operating system may take a while to realize the connection is lost. So this can return true for some unknown time after the connection has been dropped.
Methods Documentation
- async call_connect_callback() -> None
Call self.__connect_callback.
This is always safe to call. It only calls the callback function if that function is not None and if the connection state has changed since the last time this method was called.
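The "only if the state has changed" behavior can be sketched as follows. `StateChangeNotifier` is a hypothetical class written for illustration. As a simplification, its callback receives the new state as a bool, whereas the real callback receives the BaseClientOrServer instance.

```python
import asyncio
from typing import Awaitable, Callable, List, Optional


class StateChangeNotifier:
    """Call an async callback only when the connected state changes.

    Hypothetical class for illustration; not part of lsst.ts.tcpip.
    """

    def __init__(self, callback: Optional[Callable[[bool], Awaitable[None]]]) -> None:
        self.callback = callback
        self.connected = False
        self._was_connected = False

    async def notify(self) -> None:
        """Safe to call at any time; does nothing if the state is unchanged."""
        if self.connected == self._was_connected:
            return
        self._was_connected = self.connected
        if self.callback is not None:
            await self.callback(self.connected)


async def notify_demo() -> List[bool]:
    seen: List[bool] = []

    async def record(connected: bool) -> None:
        seen.append(connected)

    notifier = StateChangeNotifier(record)
    await notifier.notify()  # state unchanged: callback not called
    notifier.connected = True
    await notifier.notify()  # state changed: callback called with True
    await notifier.notify()  # state unchanged: callback not called
    notifier.connected = False
    await notifier.notify()  # state changed: callback called with False
    return seen
```

Running `asyncio.run(notify_demo())` records two calls, one per state change, even though `notify` is awaited four times.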
- async read(n: int) -> bytes
Read up to n bytes.
- Parameters:
- n
int
The number of bytes to read. If -1 then block until the other end closes its writer, then return all data seen.
- Raises:
ConnectionError
If the connection is lost before, or while, reading.
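The n = -1 case can be tried out with a bare `asyncio.StreamReader`, which is assumed here to behave like the reader this class wraps. In this sketch `feed_eof` simulates the other end closing its writer, and no network connection is involved.

```python
import asyncio


async def read_all_demo() -> bytes:
    # A bare StreamReader stands in for the reader of a live connection.
    reader = asyncio.StreamReader()
    reader.feed_data(b"first ")
    reader.feed_data(b"second")
    reader.feed_eof()  # simulates the other end closing its writer
    # n=-1 waits for EOF, then returns everything received.
    return await reader.read(-1)
```

Without the `feed_eof` call, `read(-1)` would wait indefinitely, which is why a positive n is usually the safer choice on a long-lived connection.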
- async read_into(struct: Structure) -> None
Read binary data from a stream reader into a ctypes.Structure.
- Parameters:
- struct
ctypes.Structure
Structure to set.
- Raises:
ConnectionError
If the connection is lost before, or while, reading.
asyncio.IncompleteReadError
If EOF is reached before enough bytes to fill the structure can be read. Use the IncompleteReadError.partial attribute to get the partially read data.
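One way to fill a ctypes.Structure from a stream is sketched below, using only the standard library. `Header` and `read_into_sketch` are hypothetical names, and copying with `ctypes.memmove` is an assumption for illustration. The library may do this differently.

```python
import asyncio
import ctypes


class Header(ctypes.Structure):
    """Hypothetical fixed-size message header."""

    _fields_ = [("msg_id", ctypes.c_uint16), ("value", ctypes.c_double)]


async def read_into_sketch(reader: asyncio.StreamReader, struct: ctypes.Structure) -> None:
    """Fill struct in place from exactly ctypes.sizeof(struct) bytes."""
    nbytes = ctypes.sizeof(struct)
    data = await reader.readexactly(nbytes)
    # Copy the raw bytes over the structure's memory.
    ctypes.memmove(ctypes.addressof(struct), data, nbytes)


async def read_into_demo() -> Header:
    reader = asyncio.StreamReader()
    # bytes(structure) gives the raw in-memory representation.
    reader.feed_data(bytes(Header(msg_id=7, value=1.5)))
    received = Header()
    await read_into_sketch(reader, received)
    return received
```

Because the sketch relies on `readexactly`, a stream that ends early raises asyncio.IncompleteReadError and leaves the structure unmodified.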
- async read_json() -> Any
Read JSON data.
Read the data with read_str and return the json-decoded result.
- Returns:
- data
typing.Any
Data decoded from JSON.
- Raises:
ConnectionError
If the connection is lost before, or while, reading.
asyncio.IncompleteReadError
If EOF is reached before the complete separator is found. The internal buffer is reset.
asyncio.LimitOverrunError
If the amount of data read exceeds the configured stream limit. The data is left in the internal buffer and can be read again.
TypeError
If the data are of a type that cannot be decoded from JSON.
json.JSONDecodeError
If the data cannot be decoded from JSON.
- async read_str() -> str
Read and decode a terminated str; strip the terminator.
Read until self.terminator, strip the terminator, and decode the data as self.encoding with strict error handling.
- Returns:
- line
str
Line of data, as a str with the terminator stripped.
- Raises:
ConnectionError
If the connection is lost before, or while, reading.
asyncio.IncompleteReadError
If EOF is reached before the complete separator is found. The internal buffer is reset.
asyncio.LimitOverrunError
If the amount of data read exceeds the configured stream limit. The data is left in the internal buffer and can be read again.
UnicodeError
If decoding fails.
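The read-strip-decode sequence described for read_str can be sketched with a plain `asyncio.StreamReader`. `read_str_sketch` is a hypothetical function that takes the terminator and encoding as arguments instead of reading them from an instance. It omits the connection checks the real method performs.

```python
import asyncio
from typing import List


async def read_str_sketch(
    reader: asyncio.StreamReader,
    terminator: bytes = b"\r\n",
    encoding: str = "utf-8",
) -> str:
    """Read up to the terminator, strip it, and decode strictly."""
    data = await reader.readuntil(terminator)
    return data[: -len(terminator)].decode(encoding, errors="strict")


async def read_str_demo() -> List[str]:
    reader = asyncio.StreamReader()
    reader.feed_data("status: ok\r\nangle: 45°\r\n".encode("utf-8"))
    first = await read_str_sketch(reader)
    second = await read_str_sketch(reader)
    return [first, second]
```

Each call consumes exactly one terminated line, so successive calls return successive lines.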
- async readexactly(n: int) -> bytes
Read exactly n bytes.
- Parameters:
- n
int
The number of bytes to read.
- Raises:
ConnectionError
If the connection is lost before, or while, reading.
asyncio.IncompleteReadError
If EOF is reached before n bytes can be read. Use the IncompleteReadError.partial attribute to get the partially read data.
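The following sketch shows how the partial attribute might be used after a short read. It drives a bare `asyncio.StreamReader` directly, which is assumed to raise the same asyncio.IncompleteReadError that this method documents.

```python
import asyncio
from typing import Tuple


async def short_read_demo() -> Tuple[bytes, int]:
    reader = asyncio.StreamReader()
    reader.feed_data(b"abc")
    reader.feed_eof()  # the stream ends after only 3 bytes
    try:
        await reader.readexactly(5)
    except asyncio.IncompleteReadError as e:
        # partial holds the bytes read before EOF; expected is the requested count.
        return e.partial, e.expected
    raise AssertionError("readexactly should have raised")
```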
- async readline() -> bytes
Read a sequence of bytes ending with \n.
If EOF is received and \n was not found, the method returns partially read data.
- Raises:
ConnectionError
If the connection is lost before, or while, reading.
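The EOF behavior of readline is easy to miss, so here is a small sketch using a bare `asyncio.StreamReader`. This assumes the wrapped reader behaves like the standard library one and leaves out the ConnectionError handling of this class.

```python
import asyncio
from typing import List


async def readline_demo() -> List[bytes]:
    reader = asyncio.StreamReader()
    reader.feed_data(b"one\ntwo")  # the second line has no trailing newline
    reader.feed_eof()
    complete = await reader.readline()  # a full line, newline included
    partial = await reader.readline()  # EOF reached first: partial data
    at_eof = await reader.readline()  # nothing left: empty bytes
    return [complete, partial, at_eof]
```

A caller that needs to tell a complete line from a truncated one can check whether the returned bytes end with `b"\n"`.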
- async readuntil(separator: bytes = b'\n') -> bytes
Read one line, where “line” is a sequence of bytes ending with separator.
Read data from the stream until separator is found.
On success, the data and separator will be removed from the internal buffer (consumed). Returned data will include the separator at the end.
See also read_str, which is more convenient for most use cases.
- Parameters:
- separator
bytes
The desired separator. The default matches the standard library, rather than using terminator.
- Raises:
ConnectionError
If the connection is lost before, or while, reading.
asyncio.IncompleteReadError
If EOF is reached before the complete separator is found. The internal buffer is reset.
asyncio.LimitOverrunError
If the amount of data read exceeds the configured stream limit. The data is left in the internal buffer and can be read again.
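The statement that data "is left in the internal buffer and can be read again" can be checked with a bare `asyncio.StreamReader` built with a deliberately tiny limit. The limit of 8 bytes is an arbitrary value chosen for this sketch.

```python
import asyncio
from typing import Tuple


async def overrun_demo() -> Tuple[int, bytes]:
    reader = asyncio.StreamReader(limit=8)  # tiny limit, for illustration only
    reader.feed_data(b"x" * 20 + b"\n")
    try:
        await reader.readuntil(b"\n")
    except asyncio.LimitOverrunError as e:
        # Nothing was consumed by the failed call, so a plain read
        # still returns the oversized line.
        leftover = await reader.read(100)
        return e.consumed, leftover
    raise AssertionError("readuntil should have raised")
```

Here `e.consumed` reports how many bytes precede the separator, which a caller could pass to `readexactly` to discard or recover the oversized line.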
- abstract async start(**kwargs: Any) -> None
Start asynchronous processes.
This is called automatically by the constructor, unless do_start is False.
A server should construct and start the server. A client should connect and set reader and writer. Both should call _start_monitoring_connection on success. Both should raise RuntimeError if start has already been called.
- Raises:
RuntimeError
If already called.
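One possible way to enforce the "raise RuntimeError if start has already been called" rule is sketched below. `StartOnce` and `DemoClient` are hypothetical classes. Putting the guard in the abstract method and calling it through `super()` is a design choice of this sketch and is not taken from the library.

```python
import abc
import asyncio
from typing import Any


class StartOnce(abc.ABC):
    """Hypothetical base class showing the 'start may run only once' rule."""

    def __init__(self) -> None:
        self._started = False

    @abc.abstractmethod
    async def start(self, **kwargs: Any) -> None:
        """Subclasses call super().start() first to enforce the rule."""
        if self._started:
            raise RuntimeError("start has already been called")
        self._started = True


class DemoClient(StartOnce):
    async def start(self, **kwargs: Any) -> None:
        await super().start(**kwargs)
        # A real client would connect here and set reader and writer.


async def start_twice() -> bool:
    """Return True if the second call to start raises RuntimeError."""
    client = DemoClient()
    await client.start()
    try:
        await client.start()
    except RuntimeError:
        return True
    return False
```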
- async write(data: bytes) -> None
Write data and call drain.
- Parameters:
- data
bytes
The data to write.
- Raises:
ConnectionError
If self.connected is false before writing begins.
- async write_from(*structs: Structure) -> None
Write binary data from one or more ctypes.Structures.
- Parameters:
- structs
list[ctypes.Structure]
Structures to write.
- Raises:
ConnectionError
If self.connected is false before writing begins.
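The bytes that a call such as write_from would put on the wire can be previewed without a connection. In this sketch `Point` and `pack_structs` are hypothetical names, and joining `bytes(struct)` for each structure is an assumption about the wire layout, not a statement of the library's implementation.

```python
import ctypes


class Point(ctypes.Structure):
    """Hypothetical structure with two 32-bit integer fields."""

    _fields_ = [("x", ctypes.c_int32), ("y", ctypes.c_int32)]


def pack_structs(*structs: ctypes.Structure) -> bytes:
    """Concatenate the raw in-memory bytes of each structure, in order."""
    return b"".join(bytes(s) for s in structs)
```

The receiving side can rebuild each structure from its slice of the data, for example with `Point.from_buffer_copy(data[8:])` for the second point.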
- async write_json(data: Any) -> None
Write data in JSON format.
Encode the data as json and write the result with write_str.
- Parameters:
- data
any
The data to be written. Typically a dict, but any json-encodable data is acceptable.
- Raises:
ConnectionError
If the connection is lost before, or while, writing.
UnicodeError
If encoding fails.
TypeError
If the data cannot be json-encoded.
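The JSON framing can be sketched as a pair of pure functions: JSON text, encoded, followed by the terminator. `encode_json_frame` and `decode_json_frame` are hypothetical helpers, and the default encoding and terminator are copied from the constructor signature. Note that the standard library's `json.dumps` raises TypeError for types it cannot serialize.

```python
import json
from typing import Any


def encode_json_frame(data: Any, encoding: str = "utf-8", terminator: bytes = b"\r\n") -> bytes:
    """JSON-encode data, then encode the text and append the terminator."""
    return json.dumps(data).encode(encoding) + terminator


def decode_json_frame(frame: bytes, encoding: str = "utf-8", terminator: bytes = b"\r\n") -> Any:
    """Reverse of encode_json_frame: strip the terminator and decode."""
    return json.loads(frame[: -len(terminator)].decode(encoding))
```

With its default settings `json.dumps` escapes control characters inside strings and emits no line breaks, so the `\r\n` terminator should not appear inside the payload.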
- async write_str(line: str) -> None
Encode, terminate, and write a str.
Encode the str as self.encoding with strict error handling, and append self.terminator.
- Parameters:
- line
str
The line of data to be written.
- Raises:
ConnectionError
If the connection is lost before, or while, writing.
UnicodeError
If encoding fails.
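The encode-and-terminate step, and the UnicodeError case, can be sketched as a pure function. `frame_str` is a hypothetical helper that returns the bytes that would be written. It does no I/O.

```python
def frame_str(line: str, encoding: str = "utf-8", terminator: bytes = b"\r\n") -> bytes:
    """Encode line with strict error handling and append the terminator."""
    return line.encode(encoding, errors="strict") + terminator
```

For example, `frame_str("go")` gives `b"go\r\n"`, while `frame_str("45°", encoding="ascii")` raises UnicodeEncodeError, a subclass of UnicodeError, because the degree sign has no ASCII representation.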
- async writelines(lines: Iterable) -> None
Write an iterable of bytes and call drain.
- Parameters:
- lines
collections.abc.Iterable[bytes]
The data to write, as an iterable collection of bytes.
- Raises:
ConnectionError
If self.connected is false before writing begins.