Source code for ld_eventsource.config.connect_strategy

from __future__ import annotations
from logging import Logger
from typing import Callable, Iterator, Optional, Union
from urllib3 import PoolManager

from ld_eventsource.http import _HttpClientImpl, _HttpConnectParams


[docs]class ConnectStrategy: """ An abstraction for how :class:`.SSEClient` should obtain an input stream. The default implementation is :meth:`http()`, which makes HTTP requests with ``urllib3``. Or, if you want to consume an input stream from some other source, you can create your own subclass of :class:`ConnectStrategy`. Instances of this class should be immutable and should not contain any state that is specific to one active stream. The :class:`ConnectionClient` that they produce is stateful and belongs to a single :class:`.SSEClient`. """
[docs] def create_client(self, logger: Logger) -> ConnectionClient: """ Creates a client instance. This is called once when an :class:`.SSEClient` is created. The SSEClient returns the returned :class:`ConnectionClient` and uses it to perform all subsequent connection attempts. :param logger: the logger being used by the SSEClient """ raise NotImplementedError("ConnectStrategy base class cannot be used by itself")
[docs] @staticmethod def http( url: str, headers: Optional[dict]=None, pool: Optional[PoolManager]=None, urllib3_request_options: Optional[dict]=None ) -> ConnectStrategy: """ Creates the default HTTP implementation, specifying request parameters. :param url: the stream URL :param headers: optional HTTP headers to add to the request :param pool: optional urllib3 ``PoolManager`` to provide an HTTP client :param urllib3_request_options: optional ``kwargs`` to add to the ``request`` call; these can include any parameters supported by ``urllib3``, such as ``timeout`` """ return _HttpConnectStrategy(_HttpConnectParams(url, headers, pool, urllib3_request_options))
[docs]class ConnectionClient: """ An object provided by :class:`.ConnectStrategy` that is retained by a single :class:`.SSEClient` to perform all connection attempts by that instance. For the default HTTP implementation, this represents an HTTP connection pool. """
[docs] def connect(self, last_event_id: Optional[str]) -> ConnectionResult: """ Attempts to connect to a stream. Raises an exception if unsuccessful. :param last_event_id: the current value of :attr:`SSEClient.last_event_id` (should be sent to the server to support resuming an interrupted stream) :return: a :class:`ConnectionResult` representing the stream """ raise NotImplementedError("ConnectionClient base class cannot be used by itself")
[docs] def close(self): """ Does whatever is necessary to release resources when the SSEClient is closed. """ pass
def __enter__(self): return self def __exit__(self, type, value, traceback): self.close()
[docs]class ConnectionResult: """ The return type of :meth:`ConnectionClient.connect()`. """ def __init__( self, stream: Iterator[bytes], closer: Optional[Callable] ): self.__stream = stream self.__closer = closer @property def stream(self) -> Iterator[bytes]: """ An iterator that returns chunks of data. """ return self.__stream
[docs] def close(self): """ Does whatever is necessary to release the connection. """ if self.__closer: self.__closer() self.__closer = None
def __enter__(self): return self def __exit__(self, type, value, traceback): self.close()
# _HttpConnectStrategy and _HttpConnectionClient are defined here rather than in http.py to avoid # a circular module reference. class _HttpConnectStrategy(ConnectStrategy): def __init__(self, params: _HttpConnectParams): self.__params = params def create_client(self, logger: Logger) -> ConnectionClient: return _HttpConnectionClient(self.__params, logger) class _HttpConnectionClient(ConnectionClient): def __init__(self, params: _HttpConnectParams, logger: Logger): self.__impl = _HttpClientImpl(params, logger) def connect(self, last_event_id: Optional[str]) -> ConnectionResult: stream, closer = self.__impl.connect(last_event_id) return ConnectionResult(stream, closer) def close(self): self.__impl.close() __all__ = ['ConnectStrategy', 'ConnectionClient', 'ConnectionResult']