LaunchDarkly Python SSE Client¶
This is the API reference for the launchdarkly-eventsource package, a Server-Sent Events client for Python. This package is used internally by the LaunchDarkly Python SDK, but may also be useful for other purposes.
ld_eventsource module¶
-
class
ld_eventsource.SSEClient(connect, initial_retry_delay=1, retry_delay_strategy=None, retry_delay_reset_threshold=60, error_strategy=None, last_event_id=None, logger=None)[source]¶ A client for reading a Server-Sent Events stream.
This is a synchronous implementation which blocks the caller’s thread when reading events or reconnecting. It can be run on a worker thread. The expected usage is to create an
SSEClientinstance, then read from it using the iterator propertieseventsorall.By default,
SSEClientusesurllib3to make HTTP requests to an SSE endpoint. You can customize this behavior usingConnectStrategy.Connection failures and error responses can be handled in various ways depending on the constructor parameters. The default behavior, if no non-default parameters are passed, is that the client will attempt to reconnect as many times as necessary if a connection is dropped or cannot be made; but if a connection is made and returns an invalid response (non-2xx status, 204 status, or invalid content type), it will not retry. This behavior can be customized with
error_strategy. The client will automatically follow 3xx redirects.For any non-retryable error, if this is the first connection attempt then the constructor will throw an exception (such as
HTTPStatusError). Or, if a successful connection was made so the constructor has already returned, but a non-retryable error occurs subsequently, the iterator properties will simply run out of values to indicate that theSSEClientis finished (if you are readingall, it will first yield aFaultto indicate what the error was).To avoid flooding the server with requests, it is desirable to have a delay before each reconnection. There is a base delay set by
initial_retry_delay(which can be overridden by the stream if the server sends aretry:line). By default, as defined byRetryDelayStrategy.default(), this delay will double with each subsequent retry, and will also have a pseudo-random jitter subtracted. You can customize this behavior withretry_delay_strategy.-
__init__(connect, initial_retry_delay=1, retry_delay_strategy=None, retry_delay_reset_threshold=60, error_strategy=None, last_event_id=None, logger=None)[source]¶ Creates a client instance.
The client is created in an inactive state. It will not try to make a stream connection until either you call
start(), or you attempt to read events fromeventsorall.For the default HTTP behavior, you may pass a URL string for
connect; this is equivalent toconnect=ConnectStrategy.http(url). To set custom HTTP options, callConnectStrategy.http()directly:sse_client = SSEClient( connect=ConnectStrategy.http( url="https://my-sse-server.com", headers={"Authorization": "abcdef"} ) )
Or, you may provide your own
ConnectStrategyimplementation to make SSEClient read from another source.Parameters: - connect (
Union[str,ConnectStrategy]) – either aConnectStrategyinstance or a URL string - initial_retry_delay (
float) – the initial delay before reconnecting after a failure, in seconds; this can increase as described inSSEClient - retry_delay_strategy (
Optional[RetryDelayStrategy]) – allows customization of the delay behavior for retries; if not specified, usesRetryDelayStrategy.default() - retry_delay_reset_threshold (
float) – the minimum amount of time that a connection must stay open before the SSEClient resets its retry delay strategy - error_strategy (
Optional[ErrorStrategy]) – allows customization of the behavior after a stream failure; if not specified: usesErrorStrategy.always_fail() - last_event_id (
Optional[str]) – if provided, theLast-Event-Idvalue will be preset to this - logger (
Optional[Logger]) – if provided, log messages will be written here
- connect (
-
all¶ An iterable series of notifications from the stream.
Each of these can be any subclass of
Action:Event,Comment,Start, orFault.You can use
eventsinstead if you are only interested in Events.Iterating over this property automatically starts or restarts the stream if it is not already active, so you do not need to call
start()unless you want to verify that the stream is connected before trying to read events.Return type: Iterable[Action]
-
events¶ An iterable series of
Eventobjects received from the stream.Use
allinstead if you also want to know about other kinds of occurrences.Iterating over this property automatically starts or restarts the stream if it is not already active, so you do not need to call
start()unless you want to verify that the stream is connected before trying to read events.Return type: Iterable[Event]
-
interrupt()[source]¶ Stops the stream connection if it is currently active.
The difference between this method and
close()is that this method does not permanently shut down theSSEClient. If you try to read more events or callstart(), the client will try to reconnect to the stream. The behavior is exactly the same as if the previous stream had been ended by the server.
-
last_event_id¶ The ID value, if any, of the last known event.
This can be set initially with the
last_event_idparameter toSSEClient, and is updated whenever an event is received that has an ID. Whether event IDs are supported depends on the server; it may ignore this value.Return type: Optional[str]
-
next_retry_delay¶ The retry delay that will be used for the next reconnection, in seconds, if the stream has failed or ended.
This is initially zero, because SSEClient does not compute a retry delay until there is a failure. If you have just received an exception or a
Fault, or if you were iterating through events and the events ran out because the stream closed, the value tells you how long SSEClient will sleep before the next reconnection attempt. The value is computed by applying the configuredRetryDelayStrategyto the base retry delay.Return type: float
-
start()[source]¶ Attempts to start the stream if it is not already active.
If there is not an active stream connection, this method attempts to start one using the previously configured parameters. If successful, it returns and you can proceed to read events. You should only read events on the same thread where you called
start().If the connection fails, the behavior depends on the configured
ErrorStrategy. The default strategy is to raise an exception, but you can configure it to continue instead, in which casestart()will keep retrying until theErrorStrategysays to give up.If the stream was previously active and then failed,
start()will sleep for some amount of time– the retry delay– before trying to make the connection. The retry delay is determined by theinitial_retry_delay,retry_delay_strategy, andretry_delay_reset_thresholdparameters toSSEClient.
-
ld_eventsource.actions module¶
-
class
ld_eventsource.actions.Action[source]¶ Bases:
objectBase class for objects that can be returned by
SSEClient.all.
-
class
ld_eventsource.actions.Comment(comment)[source]¶ Bases:
ld_eventsource.actions.ActionA comment received by
SSEClient.Comment lines (any line beginning with a colon) have no significance in the SSE specification and can be ignored, but if you want to see them, use
SSEClient.all. They will never be returned bySSEClient.events.-
comment¶ The comment text, not including the leading colon.
Return type: str
-
-
class
ld_eventsource.actions.Event(event='message', data='', id=None, last_event_id=None)[source]¶ Bases:
ld_eventsource.actions.ActionAn event received by
SSEClient.Instances of this class are returned by both
SSEClient.eventsandSSEClient.all.-
__init__(event='message', data='', id=None, last_event_id=None)[source]¶ Initialize self. See help(type(self)) for accurate signature.
-
data¶ The event data.
Return type: str
-
event¶ The event type, or “message” if not specified.
Return type: str
-
id¶ The value of the
id:field for this event, or None if omitted.Return type: Optional[str]
-
last_event_id¶ The value of the most recent
id:field of an event seen in this stream so far.Return type: Optional[str]
-
-
class
ld_eventsource.actions.Fault(error)[source]¶ Bases:
ld_eventsource.actions.ActionIndicates that
SSEClientencountered an error or end of stream.Instances of this class are only available from
SSEClient.all.If you receive a Fault, the SSEClient is now in an inactive state since either a connection attempt has failed or an existing connection has been closed. The SSEClient will attempt to reconnect if you either call
SSEClient.start()or simply continue reading events after this point.-
error¶ The exception that caused the stream to fail, if any. If this is
None, it means that the stream simply ran out of data, i.e. the server shut down the connection in an orderly way after sending an EOF chunk as defined by chunked transfer encoding.Return type: Optional[Exception]
-
-
class
ld_eventsource.actions.Start[source]¶ Bases:
ld_eventsource.actions.ActionIndicates that
SSEClienthas successfully connected to a stream.Instances of this class are only available from
SSEClient.all. AStartis returned for the first successful connection. If the client reconnects after a failure, there will be aFaultfollowed by aStart.
ld_eventsource.config module¶
-
class
ld_eventsource.config.ConnectStrategy[source]¶ An abstraction for how
SSEClientshould obtain an input stream.The default implementation is
http(), which makes HTTP requests withurllib3. Or, if you want to consume an input stream from some other source, you can create your own subclass ofConnectStrategy.Instances of this class should be immutable and should not contain any state that is specific to one active stream. The
ConnectionClientthat they produce is stateful and belongs to a singleSSEClient.-
create_client(logger)[source]¶ Creates a client instance.
This is called once when an
SSEClientis created. The SSEClient returns the returnedConnectionClientand uses it to perform all subsequent connection attempts.Parameters: logger ( Logger) – the logger being used by the SSEClientReturn type: ConnectionClient
-
static
http(url, headers=None, pool=None, urllib3_request_options=None)[source]¶ Creates the default HTTP implementation, specifying request parameters.
Parameters: - url (
str) – the stream URL - headers (
Optional[dict]) – optional HTTP headers to add to the request - pool (
Optional[PoolManager]) – optional urllib3PoolManagerto provide an HTTP client - urllib3_request_options (
Optional[dict]) – optionalkwargsto add to therequestcall; these can include any parameters supported byurllib3, such astimeout
Return type: ConnectStrategy- url (
-
-
class
ld_eventsource.config.ConnectionClient[source]¶ An object provided by
ConnectStrategythat is retained by a singleSSEClientto perform all connection attempts by that instance.For the default HTTP implementation, this represents an HTTP connection pool.
-
connect(last_event_id)[source]¶ Attempts to connect to a stream. Raises an exception if unsuccessful.
Parameters: last_event_id ( Optional[str]) – the current value ofSSEClient.last_event_id(should be sent to the server to support resuming an interrupted stream)Return type: ConnectionResultReturns: a ConnectionResultrepresenting the stream
-
-
class
ld_eventsource.config.ConnectionResult(stream, closer)[source]¶ The return type of
ConnectionClient.connect().-
stream¶ An iterator that returns chunks of data.
Return type: Iterator[bytes]
-
-
class
ld_eventsource.config.ErrorStrategy[source]¶ Base class of strategies for determining how SSEClient should handle a stream error or the end of a stream.
The parameter that SSEClient passes to
apply()is eitherNoneif the server ended the stream normally, or an exception. If it is an exception, it could be an I/O exception (failure to connect, broken connection, etc.), or one of the error types defined in this package such asHTTPStatusError.The two options for the result are:
FAIL: This means that SSEClient should throw an exception to the caller– or, in the case of a stream ending without an error, it should simply stop iterating through events.CONTINUE: This means that you intend to keep reading events, so SSEClient should transparently retry the connection. If you are reading fromSSEClient.all, you will also receive aFaultdescribing the error.
With either option, it is still always possible to explicitly reconnect the stream by calling
SSEClient.start()again, or simply by trying to read fromSSEClient.eventsorSSEClient.allagain.Subclasses should be immutable. To implement strategies that behave differently on consecutive retries, the strategy should return a new instance of its own class as the second return value from
apply, rather than modifying the state of the existing instance. This makes it easy for SSEClient to reset to the original error-handling state when appropriate by simply reusing the original instance.-
CONTINUE= False¶
-
FAIL= True¶
-
static
always_continue()[source]¶ Specifies that SSEClient should never raise an exception, but should transparently retry or, if
SSEClient.allis being used, return the error as aFault.Be aware that using this mode could cause connection attempts to block indefinitely if the server is unavailable.
Return type: ErrorStrategy
-
static
always_fail()[source]¶ Specifies that SSEClient should always treat an error as a stream failure. This is the default behavior if you do not configure another.
Return type: ErrorStrategy
-
apply(exception)[source]¶ Applies the strategy to determine what to do after a failure.
Parameters: exception ( Optional[Exception]) – an exception, orNoneif the stream simply endedReturn type: Tuple[bool,ErrorStrategy]Returns: a tuple where the first element is either FAILto raise an exception orCONTINUEto continue, and the second element is the strategy object to use next time (which could beself)
-
static
continue_with_max_attempts(max_attempts)[source]¶ Specifies that SSEClient should automatically retry after an error for up to this number of consecutive attempts, but should fail after that point.
Parameters: max_attempts ( int) – the maximum number of consecutive retriesReturn type: ErrorStrategy
-
static
continue_with_time_limit(max_time)[source]¶ Specifies that SSEClient should automatically retry after a failure and can retry repeatedly until this amount of time has elapsed, but should fail after that point.
Parameters: max_time ( float) – the time limit, in secondsReturn type: ErrorStrategy
-
static
from_lambda(fn)[source]¶ Convenience method for creating an ErrorStrategy whose
applymethod is equivalent to the given lambda.The one difference is that the second return value is an
Optional[ErrorStrategy]which can be None to mean “no change”, since the lambda cannot reference the strategy’sself.Return type: ErrorStrategy
-
class
ld_eventsource.config.RetryDelayStrategy[source]¶ Base class of strategies for computing how long to wait before retrying a connection.
The default behavior, provided by
default(), provides customizable exponential backoff and jitter. Applications may also create their own subclasses of RetryDelayStrategy if they desire different behavior. It is generally a best practice to use backoff and jitter, to avoid a reconnect storm during a service interruption.Subclasses should be immutable. To implement strategies where the delay uses different parameters on each subsequent retry (such as exponential backoff), the strategy should return a new instance of its own class as the second return value from
apply, rather than modifying the state of the existing instance. This makes it easy for SSEClient to reset to the original delay state when appropriate by simply reusing the original instance.-
apply(base_delay)[source]¶ Applies the strategy to compute the appropriate retry delay.
Parameters: base_delay ( float) – the initial configured base delay, in seconds, as set in the SSEClient parametersReturn type: Tuple[float,RetryDelayStrategy]Returns: a tuple where the first element is the computed delay, in seconds, and the second element the strategy object to use next time (which could be self)
-
static
default(max_delay=None, backoff_multiplier=2, jitter_multiplier=None)[source]¶ Provides the default retry delay behavior for
SSEClient, which includes customizable backoff and jitter options.The behavior is as follows:
- Start with the configured base delay as set by the
initial_retry_delayparameter toSSEClient. - On each subsequent attempt, multiply the base delay by
backoff_multiplier, giving the current base delay. - If
max_delayis set and is greater than zero, the base delay is pinned to be no greater than that value. - If
jitter_multiplieris set and is greater than zero, the actual delay for each attempt is equal to the current base delay minus a pseudo-random number equal to that ratio times itself. For instance, a jitter multiplier of 0.25 would mean that a base delay of 1000 is changed to a value in the range [750, 1000].
Parameters: - max_delay (
Optional[float]) – the maximum possible delay value, in seconds; default is 30 seconds - backoff_multiplier (
float) – the exponential backoff factor - jitter_multiplier (
Optional[float]) – a fraction from 0.0 to 1.0 for how much of the delay may be pseudo-randomly subtracted
Return type: RetryDelayStrategy- Start with the configured base delay as set by the
-
static
from_lambda(fn)[source]¶ Convenience method for creating a RetryDelayStrategy whose
applymethod is equivalent to the given lambda.The one difference is that the second return value is an
Optional[RetryDelayStrategy]which can be None to mean “no change”, since the lambda cannot reference the strategy’sself.Return type: RetryDelayStrategy
-
ld_eventsource.errors module¶
-
exception
ld_eventsource.errors.HTTPContentTypeError(content_type)[source]¶ Bases:
ExceptionThis exception indicates that the HTTP response did not have the expected content type of “text/event-stream”.
-
content_type¶ Return type: str
-