Source code for ld_eventsource.sse_client

from ld_eventsource.actions import *
from ld_eventsource.errors import *
from ld_eventsource.config import *
from ld_eventsource.reader import _BufferedLineReader, _SSEReader

import logging
import time
from typing import Iterable, Optional, Union


[docs]class SSEClient: """ A client for reading a Server-Sent Events stream. This is a synchronous implementation which blocks the caller's thread when reading events or reconnecting. It can be run on a worker thread. The expected usage is to create an ``SSEClient`` instance, then read from it using the iterator properties :attr:`events` or :attr:`all`. By default, ``SSEClient`` uses ``urllib3`` to make HTTP requests to an SSE endpoint. You can customize this behavior using :class:`.ConnectStrategy`. Connection failures and error responses can be handled in various ways depending on the constructor parameters. The default behavior, if no non-default parameters are passed, is that the client will attempt to reconnect as many times as necessary if a connection is dropped or cannot be made; but if a connection is made and returns an invalid response (non-2xx status, 204 status, or invalid content type), it will not retry. This behavior can be customized with ``error_strategy``. The client will automatically follow 3xx redirects. For any non-retryable error, if this is the first connection attempt then the constructor will throw an exception (such as :class:`.HTTPStatusError`). Or, if a successful connection was made so the constructor has already returned, but a non-retryable error occurs subsequently, the iterator properties will simply run out of values to indicate that the ``SSEClient`` is finished (if you are reading :attr:`all`, it will first yield a :class:`.Fault` to indicate what the error was). To avoid flooding the server with requests, it is desirable to have a delay before each reconnection. There is a base delay set by ``initial_retry_delay`` (which can be overridden by the stream if the server sends a ``retry:`` line). By default, as defined by :meth:`.RetryDelayStrategy.default()`, this delay will double with each subsequent retry, and will also have a pseudo-random jitter subtracted. You can customize this behavior with ``retry_delay_strategy``. """
[docs] def __init__( self, connect: Union[str, ConnectStrategy], initial_retry_delay: float=1, retry_delay_strategy: Optional[RetryDelayStrategy]=None, retry_delay_reset_threshold: float=60, error_strategy: Optional[ErrorStrategy]=None, last_event_id: Optional[str]=None, logger: Optional[logging.Logger]=None ): """ Creates a client instance. The client is created in an inactive state. It will not try to make a stream connection until either you call :meth:`start()`, or you attempt to read events from :attr:`events` or :attr:`all`. For the default HTTP behavior, you may pass a URL string for ``connect``; this is equivalent to ``connect=ConnectStrategy.http(url)``. To set custom HTTP options, call :meth:`.ConnectStrategy.http()` directly: :: sse_client = SSEClient( connect=ConnectStrategy.http( url="https://my-sse-server.com", headers={"Authorization": "abcdef"} ) ) Or, you may provide your own :class:`.ConnectStrategy` implementation to make SSEClient read from another source. :param connect: either a :class:`.ConnectStrategy` instance or a URL string :param initial_retry_delay: the initial delay before reconnecting after a failure, in seconds; this can increase as described in :class:`SSEClient` :param retry_delay_strategy: allows customization of the delay behavior for retries; if not specified, uses :meth:`.RetryDelayStrategy.default()` :param retry_delay_reset_threshold: the minimum amount of time that a connection must stay open before the SSEClient resets its retry delay strategy :param error_strategy: allows customization of the behavior after a stream failure; if not specified: uses :meth:`.ErrorStrategy.always_fail()` :param last_event_id: if provided, the ``Last-Event-Id`` value will be preset to this :param logger: if provided, log messages will be written here """ if isinstance(connect, str): connect = ConnectStrategy.http(connect) elif not isinstance(connect, ConnectStrategy): raise TypeError("request must be either a string or ConnectStrategy") self.__base_retry_delay = initial_retry_delay self.__base_retry_delay_strategy = retry_delay_strategy or RetryDelayStrategy.default() self.__retry_delay_reset_threshold = retry_delay_reset_threshold self.__current_retry_delay_strategy = self.__base_retry_delay_strategy self.__next_retry_delay = 0 self.__base_error_strategy = error_strategy or ErrorStrategy.always_fail() self.__current_error_strategy = self.__base_error_strategy self.__last_event_id = last_event_id if logger is None: logger = logging.getLogger('launchdarkly-eventsource.null') logger.addHandler(logging.NullHandler()) logger.propagate = False self.__logger = logger self.__connection_client = connect.create_client(logger) self.__connection_result: Optional[ConnectionResult] = None self.__connected_time: float = 0 self.__disconnected_time: float = 0 self.__closed = False self.__interrupted = False
[docs] def start(self): """ Attempts to start the stream if it is not already active. If there is not an active stream connection, this method attempts to start one using the previously configured parameters. If successful, it returns and you can proceed to read events. You should only read events on the same thread where you called :meth:`start()`. If the connection fails, the behavior depends on the configured :class:`.ErrorStrategy`. The default strategy is to raise an exception, but you can configure it to continue instead, in which case :meth:`start()` will keep retrying until the :class:`.ErrorStrategy` says to give up. If the stream was previously active and then failed, :meth:`start()` will sleep for some amount of time-- the retry delay-- before trying to make the connection. The retry delay is determined by the ``initial_retry_delay``, ``retry_delay_strategy``, and ``retry_delay_reset_threshold`` parameters to :class:`SSEClient`. """ self._try_start(False)
[docs] def close(self): """ Permanently shuts down this client instance and closes any active connection. """ self.__closed = True self.interrupt()
[docs] def interrupt(self): """ Stops the stream connection if it is currently active. The difference between this method and :meth:`close()` is that this method does not permanently shut down the :class:`SSEClient`. If you try to read more events or call :meth:`start()`, the client will try to reconnect to the stream. The behavior is exactly the same as if the previous stream had been ended by the server. """ if self.__connection_result: self.__interrupted = True self.__connection_result.close() self.__connection_result = None self._compute_next_retry_delay()
@property def all(self) -> Iterable[Action]: """ An iterable series of notifications from the stream. Each of these can be any subclass of :class:`.Action`: :class:`.Event`, :class:`.Comment`, :class:`.Start`, or :class:`.Fault`. You can use :attr:`events` instead if you are only interested in Events. Iterating over this property automatically starts or restarts the stream if it is not already active, so you do not need to call :meth:`start()` unless you want to verify that the stream is connected before trying to read events. """ while True: # Reading implies starting the stream if it isn't already started. We might also # be restarting since we could have been interrupted at any time. while self.__connection_result is None: fault = self._try_start(True) # return either a Start action or a Fault action yield Start() if fault is None else fault lines = _BufferedLineReader.lines_from(self.__connection_result.stream) reader = _SSEReader(lines, self.__last_event_id, None) error: Optional[Exception] = None try: for ec in reader.events_and_comments: yield ec if self.__interrupted: break # If we finished iterating all of reader.events_and_comments, it means the stream # was closed without an error. self.__connection_result = None except Exception as e: if self.__closed: # It's normal to get an I/O error if we force-closed the stream to shut down return error = e self.__connection_result = None finally: self.__last_event_id = reader.last_event_id # We've hit an error, so ask the ErrorStrategy what to do: raise an exception or yield a Fault. self._compute_next_retry_delay() fail_or_continue, self.__current_error_strategy = self.__current_error_strategy.apply(error) if fail_or_continue == ErrorStrategy.FAIL: if error is None: # If error is None, the stream was ended normally by the server. Just stop iterating. yield Fault(None) # this is only visible if you're reading from "all" return raise error yield Fault(error) continue # try to connect again @property def events(self) -> Iterable[Event]: """ An iterable series of :class:`.Event` objects received from the stream. Use :attr:`all` instead if you also want to know about other kinds of occurrences. Iterating over this property automatically starts or restarts the stream if it is not already active, so you do not need to call :meth:`start()` unless you want to verify that the stream is connected before trying to read events. """ for item in self.all: if isinstance(item, Event): yield item @property def next_retry_delay(self) -> float: """ The retry delay that will be used for the next reconnection, in seconds, if the stream has failed or ended. This is initially zero, because SSEClient does not compute a retry delay until there is a failure. If you have just received an exception or a :class:`.Fault`, or if you were iterating through events and the events ran out because the stream closed, the value tells you how long SSEClient will sleep before the next reconnection attempt. The value is computed by applying the configured :class:`.RetryDelayStrategy` to the base retry delay. """ return self.__next_retry_delay def _compute_next_retry_delay(self): if self.__retry_delay_reset_threshold > 0 and self.__connected_time != 0: connection_duration = time.time() - self.__connected_time if connection_duration >= self.__retry_delay_reset_threshold: self.__current_retry_delay_strategy = self.__base_retry_delay_strategy self.__next_retry_delay, self.__current_retry_delay_strategy = \ self.__current_retry_delay_strategy.apply(self.__base_retry_delay) def _try_start(self, can_return_fault: bool) -> Optional[Fault]: if self.__connection_result is not None: return None while True: if self.__next_retry_delay > 0: delay = self.__next_retry_delay if self.__disconnected_time == 0 else \ self.__next_retry_delay - (time.time() - self.__disconnected_time) if delay > 0: self.__logger.info("Will reconnect after delay of %fs" % delay) time.sleep(delay) try: self.__connection_result = self.__connection_client.connect(self.__last_event_id) except Exception as e: self.__disconnected_time = time.time() self._compute_next_retry_delay() fail_or_continue, self.__current_error_strategy = self.__current_error_strategy.apply(e) if fail_or_continue == ErrorStrategy.FAIL: raise e if can_return_fault: return Fault(e) # If can_return_fault is false, it means the caller explicitly called start(), in # which case there's no way to return a Fault so we just keep retrying transparently. continue self.__connected_time = time.time() self.__current_error_strategy = self.__base_error_strategy self.__interrupted = False return None @property def last_event_id(self) -> Optional[str]: """ The ID value, if any, of the last known event. This can be set initially with the ``last_event_id`` parameter to :class:`SSEClient`, and is updated whenever an event is received that has an ID. Whether event IDs are supported depends on the server; it may ignore this value. """ return self.__last_event_id def __enter__(self): return self def __exit__(self, type, value, traceback): self.close()
__all__ = ['SSEClient']