LaunchDarkly Python SSE Client

This is the API reference for the launchdarkly-eventsource package, a Server-Sent Events client for Python. This package is used internally by the LaunchDarkly Python SDK, but may also be useful for other purposes.

ld_eventsource module

class ld_eventsource.SSEClient(connect, initial_retry_delay=1, retry_delay_strategy=None, retry_delay_reset_threshold=60, error_strategy=None, last_event_id=None, logger=None)[source]

A client for reading a Server-Sent Events stream.

This is a synchronous implementation which blocks the caller’s thread when reading events or reconnecting. It can be run on a worker thread. The expected usage is to create an SSEClient instance, then read from it using the iterator properties events or all.

By default, SSEClient uses urllib3 to make HTTP requests to an SSE endpoint. You can customize this behavior using ConnectStrategy.

Connection failures and error responses can be handled in various ways depending on the constructor parameters. The default behavior, if no non-default parameters are passed, is that the client will attempt to reconnect as many times as necessary if a connection is dropped or cannot be made; but if a connection is made and returns an invalid response (non-2xx status, 204 status, or invalid content type), it will not retry. This behavior can be customized with error_strategy. The client will automatically follow 3xx redirects.

For any non-retryable error, if this is the first connection attempt then the constructor will throw an exception (such as HTTPStatusError). Or, if a successful connection was made so the constructor has already returned, but a non-retryable error occurs subsequently, the iterator properties will simply run out of values to indicate that the SSEClient is finished (if you are reading all, it will first yield a Fault to indicate what the error was).

To avoid flooding the server with requests, it is desirable to have a delay before each reconnection. There is a base delay set by initial_retry_delay (which can be overridden by the stream if the server sends a retry: line). By default, as defined by RetryDelayStrategy.default(), this delay will double with each subsequent retry, and will also have a pseudo-random jitter subtracted. You can customize this behavior with retry_delay_strategy.

__init__(connect, initial_retry_delay=1, retry_delay_strategy=None, retry_delay_reset_threshold=60, error_strategy=None, last_event_id=None, logger=None)[source]

Creates a client instance.

The client is created in an inactive state. It will not try to make a stream connection until either you call start(), or you attempt to read events from events or all.

For the default HTTP behavior, you may pass a URL string for connect; this is equivalent to connect=ConnectStrategy.http(url). To set custom HTTP options, call ConnectStrategy.http() directly:

sse_client = SSEClient(
    connect=ConnectStrategy.http(
        url="https://my-sse-server.com",
        headers={"Authorization": "abcdef"}
    )
)

Or, you may provide your own ConnectStrategy implementation to make SSEClient read from another source.

Parameters:
  • connect (Union[str, ConnectStrategy]) – either a ConnectStrategy instance or a URL string
  • initial_retry_delay (float) – the initial delay before reconnecting after a failure, in seconds; this can increase as described in SSEClient
  • retry_delay_strategy (Optional[RetryDelayStrategy]) – allows customization of the delay behavior for retries; if not specified, uses RetryDelayStrategy.default()
  • retry_delay_reset_threshold (float) – the minimum amount of time that a connection must stay open before the SSEClient resets its retry delay strategy
  • error_strategy (Optional[ErrorStrategy]) – allows customization of the behavior after a stream failure; if not specified: uses ErrorStrategy.always_fail()
  • last_event_id (Optional[str]) – if provided, the Last-Event-Id value will be preset to this
  • logger (Optional[Logger]) – if provided, log messages will be written here
all

An iterable series of notifications from the stream.

Each of these can be any subclass of Action: Event, Comment, Start, or Fault.

You can use events instead if you are only interested in Events.

Iterating over this property automatically starts or restarts the stream if it is not already active, so you do not need to call start() unless you want to verify that the stream is connected before trying to read events.

Return type:Iterable[Action]
close()[source]

Permanently shuts down this client instance and closes any active connection.

events

An iterable series of Event objects received from the stream.

Use all instead if you also want to know about other kinds of occurrences.

Iterating over this property automatically starts or restarts the stream if it is not already active, so you do not need to call start() unless you want to verify that the stream is connected before trying to read events.

Return type:Iterable[Event]
interrupt()[source]

Stops the stream connection if it is currently active.

The difference between this method and close() is that this method does not permanently shut down the SSEClient. If you try to read more events or call start(), the client will try to reconnect to the stream. The behavior is exactly the same as if the previous stream had been ended by the server.

last_event_id

The ID value, if any, of the last known event.

This can be set initially with the last_event_id parameter to SSEClient, and is updated whenever an event is received that has an ID. Whether event IDs are supported depends on the server; it may ignore this value.

Return type:Optional[str]
next_retry_delay

The retry delay that will be used for the next reconnection, in seconds, if the stream has failed or ended.

This is initially zero, because SSEClient does not compute a retry delay until there is a failure. If you have just received an exception or a Fault, or if you were iterating through events and the events ran out because the stream closed, the value tells you how long SSEClient will sleep before the next reconnection attempt. The value is computed by applying the configured RetryDelayStrategy to the base retry delay.

Return type:float
start()[source]

Attempts to start the stream if it is not already active.

If there is not an active stream connection, this method attempts to start one using the previously configured parameters. If successful, it returns and you can proceed to read events. You should only read events on the same thread where you called start().

If the connection fails, the behavior depends on the configured ErrorStrategy. The default strategy is to raise an exception, but you can configure it to continue instead, in which case start() will keep retrying until the ErrorStrategy says to give up.

If the stream was previously active and then failed, start() will sleep for some amount of time– the retry delay– before trying to make the connection. The retry delay is determined by the initial_retry_delay, retry_delay_strategy, and retry_delay_reset_threshold parameters to SSEClient.

ld_eventsource.actions module

class ld_eventsource.actions.Action[source]

Bases: object

Base class for objects that can be returned by SSEClient.all.

class ld_eventsource.actions.Comment(comment)[source]

Bases: ld_eventsource.actions.Action

A comment received by SSEClient.

Comment lines (any line beginning with a colon) have no significance in the SSE specification and can be ignored, but if you want to see them, use SSEClient.all. They will never be returned by SSEClient.events.

__init__(comment)[source]

Initialize self. See help(type(self)) for accurate signature.

comment

The comment text, not including the leading colon.

Return type:str
class ld_eventsource.actions.Event(event='message', data='', id=None, last_event_id=None)[source]

Bases: ld_eventsource.actions.Action

An event received by SSEClient.

Instances of this class are returned by both SSEClient.events and SSEClient.all.

__init__(event='message', data='', id=None, last_event_id=None)[source]

Initialize self. See help(type(self)) for accurate signature.

data

The event data.

Return type:str
event

The event type, or “message” if not specified.

Return type:str
id

The value of the id: field for this event, or None if omitted.

Return type:Optional[str]
last_event_id

The value of the most recent id: field of an event seen in this stream so far.

Return type:Optional[str]
class ld_eventsource.actions.Fault(error)[source]

Bases: ld_eventsource.actions.Action

Indicates that SSEClient encountered an error or end of stream.

Instances of this class are only available from SSEClient.all.

If you receive a Fault, the SSEClient is now in an inactive state since either a connection attempt has failed or an existing connection has been closed. The SSEClient will attempt to reconnect if you either call SSEClient.start() or simply continue reading events after this point.

__init__(error)[source]

Initialize self. See help(type(self)) for accurate signature.

error

The exception that caused the stream to fail, if any. If this is None, it means that the stream simply ran out of data, i.e. the server shut down the connection in an orderly way after sending an EOF chunk as defined by chunked transfer encoding.

Return type:Optional[Exception]
class ld_eventsource.actions.Start[source]

Bases: ld_eventsource.actions.Action

Indicates that SSEClient has successfully connected to a stream.

Instances of this class are only available from SSEClient.all. A Start is returned for the first successful connection. If the client reconnects after a failure, there will be a Fault followed by a Start.

ld_eventsource.config module

class ld_eventsource.config.ConnectStrategy[source]

An abstraction for how SSEClient should obtain an input stream.

The default implementation is http(), which makes HTTP requests with urllib3. Or, if you want to consume an input stream from some other source, you can create your own subclass of ConnectStrategy.

Instances of this class should be immutable and should not contain any state that is specific to one active stream. The ConnectionClient that they produce is stateful and belongs to a single SSEClient.

create_client(logger)[source]

Creates a client instance.

This is called once when an SSEClient is created. The SSEClient returns the returned ConnectionClient and uses it to perform all subsequent connection attempts.

Parameters:logger (Logger) – the logger being used by the SSEClient
Return type:ConnectionClient
static http(url, headers=None, pool=None, urllib3_request_options=None)[source]

Creates the default HTTP implementation, specifying request parameters.

Parameters:
  • url (str) – the stream URL
  • headers (Optional[dict]) – optional HTTP headers to add to the request
  • pool (Optional[PoolManager]) – optional urllib3 PoolManager to provide an HTTP client
  • urllib3_request_options (Optional[dict]) – optional kwargs to add to the request call; these can include any parameters supported by urllib3, such as timeout
Return type:

ConnectStrategy

class ld_eventsource.config.ConnectionClient[source]

An object provided by ConnectStrategy that is retained by a single SSEClient to perform all connection attempts by that instance.

For the default HTTP implementation, this represents an HTTP connection pool.

close()[source]

Does whatever is necessary to release resources when the SSEClient is closed.

connect(last_event_id)[source]

Attempts to connect to a stream. Raises an exception if unsuccessful.

Parameters:last_event_id (Optional[str]) – the current value of SSEClient.last_event_id (should be sent to the server to support resuming an interrupted stream)
Return type:ConnectionResult
Returns:a ConnectionResult representing the stream
class ld_eventsource.config.ConnectionResult(stream, closer)[source]

The return type of ConnectionClient.connect().

close()[source]

Does whatever is necessary to release the connection.

stream

An iterator that returns chunks of data.

Return type:Iterator[bytes]
class ld_eventsource.config.ErrorStrategy[source]

Base class of strategies for determining how SSEClient should handle a stream error or the end of a stream.

The parameter that SSEClient passes to apply() is either None if the server ended the stream normally, or an exception. If it is an exception, it could be an I/O exception (failure to connect, broken connection, etc.), or one of the error types defined in this package such as HTTPStatusError.

The two options for the result are:

  • FAIL: This means that SSEClient should throw an exception to the caller– or, in the case of a stream ending without an error, it should simply stop iterating through events.
  • CONTINUE: This means that you intend to keep reading events, so SSEClient should transparently retry the connection. If you are reading from SSEClient.all, you will also receive a Fault describing the error.

With either option, it is still always possible to explicitly reconnect the stream by calling SSEClient.start() again, or simply by trying to read from SSEClient.events or SSEClient.all again.

Subclasses should be immutable. To implement strategies that behave differently on consecutive retries, the strategy should return a new instance of its own class as the second return value from apply, rather than modifying the state of the existing instance. This makes it easy for SSEClient to reset to the original error-handling state when appropriate by simply reusing the original instance.

CONTINUE = False
FAIL = True
static always_continue()[source]

Specifies that SSEClient should never raise an exception, but should transparently retry or, if SSEClient.all is being used, return the error as a Fault.

Be aware that using this mode could cause connection attempts to block indefinitely if the server is unavailable.

Return type:ErrorStrategy
static always_fail()[source]

Specifies that SSEClient should always treat an error as a stream failure. This is the default behavior if you do not configure another.

Return type:ErrorStrategy
apply(exception)[source]

Applies the strategy to determine what to do after a failure.

Parameters:exception (Optional[Exception]) – an exception, or None if the stream simply ended
Return type:Tuple[bool, ErrorStrategy]
Returns:a tuple where the first element is either FAIL to raise an exception or CONTINUE to continue, and the second element is the strategy object to use next time (which could be self)
static continue_with_max_attempts(max_attempts)[source]

Specifies that SSEClient should automatically retry after an error for up to this number of consecutive attempts, but should fail after that point.

Parameters:max_attempts (int) – the maximum number of consecutive retries
Return type:ErrorStrategy
static continue_with_time_limit(max_time)[source]

Specifies that SSEClient should automatically retry after a failure and can retry repeatedly until this amount of time has elapsed, but should fail after that point.

Parameters:max_time (float) – the time limit, in seconds
Return type:ErrorStrategy
static from_lambda(fn)[source]

Convenience method for creating an ErrorStrategy whose apply method is equivalent to the given lambda.

The one difference is that the second return value is an Optional[ErrorStrategy] which can be None to mean “no change”, since the lambda cannot reference the strategy’s self.

Return type:ErrorStrategy
class ld_eventsource.config.RetryDelayStrategy[source]

Base class of strategies for computing how long to wait before retrying a connection.

The default behavior, provided by default(), provides customizable exponential backoff and jitter. Applications may also create their own subclasses of RetryDelayStrategy if they desire different behavior. It is generally a best practice to use backoff and jitter, to avoid a reconnect storm during a service interruption.

Subclasses should be immutable. To implement strategies where the delay uses different parameters on each subsequent retry (such as exponential backoff), the strategy should return a new instance of its own class as the second return value from apply, rather than modifying the state of the existing instance. This makes it easy for SSEClient to reset to the original delay state when appropriate by simply reusing the original instance.

apply(base_delay)[source]

Applies the strategy to compute the appropriate retry delay.

Parameters:base_delay (float) – the initial configured base delay, in seconds, as set in the SSEClient parameters
Return type:Tuple[float, RetryDelayStrategy]
Returns:a tuple where the first element is the computed delay, in seconds, and the second element the strategy object to use next time (which could be self)
static default(max_delay=None, backoff_multiplier=2, jitter_multiplier=None)[source]

Provides the default retry delay behavior for SSEClient, which includes customizable backoff and jitter options.

The behavior is as follows:

  • Start with the configured base delay as set by the initial_retry_delay parameter to SSEClient.
  • On each subsequent attempt, multiply the base delay by backoff_multiplier, giving the current base delay.
  • If max_delay is set and is greater than zero, the base delay is pinned to be no greater than that value.
  • If jitter_multiplier is set and is greater than zero, the actual delay for each attempt is equal to the current base delay minus a pseudo-random number equal to that ratio times itself. For instance, a jitter multiplier of 0.25 would mean that a base delay of 1000 is changed to a value in the range [750, 1000].
Parameters:
  • max_delay (Optional[float]) – the maximum possible delay value, in seconds; default is 30 seconds
  • backoff_multiplier (float) – the exponential backoff factor
  • jitter_multiplier (Optional[float]) – a fraction from 0.0 to 1.0 for how much of the delay may be pseudo-randomly subtracted
Return type:

RetryDelayStrategy

static from_lambda(fn)[source]

Convenience method for creating a RetryDelayStrategy whose apply method is equivalent to the given lambda.

The one difference is that the second return value is an Optional[RetryDelayStrategy] which can be None to mean “no change”, since the lambda cannot reference the strategy’s self.

Return type:RetryDelayStrategy

ld_eventsource.errors module

exception ld_eventsource.errors.HTTPContentTypeError(content_type)[source]

Bases: Exception

This exception indicates that the HTTP response did not have the expected content type of “text/event-stream”.

__init__(content_type)[source]

Initialize self. See help(type(self)) for accurate signature.

content_type
Return type:str
exception ld_eventsource.errors.HTTPStatusError(status)[source]

Bases: Exception

This exception indicates that the client was able to connect to the server, but that the HTTP response had an error status.

__init__(status)[source]

Initialize self. See help(type(self)) for accurate signature.

status
Return type:int