LaunchDarkly Python SSE Client
This is the API reference for the launchdarkly-eventsource package, a Server-Sent Events client for Python. This package is used internally by the LaunchDarkly Python SDK, but may also be useful for other purposes.
ld_eventsource module
class ld_eventsource.SSEClient(connect, initial_retry_delay=1, retry_delay_strategy=None, retry_delay_reset_threshold=60, error_strategy=None, last_event_id=None, logger=None)

A client for reading a Server-Sent Events stream.

This is a synchronous implementation which blocks the caller’s thread when reading events or reconnecting. It can be run on a worker thread. The expected usage is to create an SSEClient instance, then read from it using the iterator properties events or all.

By default, SSEClient uses urllib3 to make HTTP requests to an SSE endpoint. You can customize this behavior using ConnectStrategy.

Connection failures and error responses can be handled in various ways depending on the constructor parameters. With the default constructor parameters, the client will attempt to reconnect as many times as necessary if a connection is dropped or cannot be made; but if a connection is made and returns an invalid response (non-2xx status, 204 status, or invalid content type), it will not retry. This behavior can be customized with error_strategy. The client will automatically follow 3xx redirects.

For any non-retryable error, if this is the first connection attempt, then start() (or the first attempt to read events) will raise an exception (such as HTTPStatusError). Or, if a successful connection was already made and a non-retryable error occurs subsequently, the iterator properties will simply run out of values to indicate that the SSEClient is finished (if you are reading all, it will first yield a Fault to indicate what the error was).

To avoid flooding the server with requests, it is desirable to have a delay before each reconnection. There is a base delay set by initial_retry_delay (which can be overridden by the stream if the server sends a retry: line). By default, as defined by RetryDelayStrategy.default(), this delay will double with each subsequent retry, and will also have a pseudo-random jitter subtracted. You can customize this behavior with retry_delay_strategy.

__init__(connect, initial_retry_delay=1, retry_delay_strategy=None, retry_delay_reset_threshold=60, error_strategy=None, last_event_id=None, logger=None)

Creates a client instance.

The client is created in an inactive state. It will not try to make a stream connection until either you call start(), or you attempt to read events from events or all.

For the default HTTP behavior, you may pass a URL string for connect; this is equivalent to connect=ConnectStrategy.http(url). To set custom HTTP options, call ConnectStrategy.http() directly:

    from ld_eventsource import SSEClient
    from ld_eventsource.config import ConnectStrategy

    sse_client = SSEClient(
        connect=ConnectStrategy.http(
            url="https://my-sse-server.com",
            headers={"Authorization": "abcdef"}
        )
    )

Or, you may provide your own ConnectStrategy implementation to make SSEClient read from another source.

Parameters:
- connect (Union[str, ConnectStrategy]) – either a ConnectStrategy instance or a URL string
- initial_retry_delay (float) – the initial delay before reconnecting after a failure, in seconds; this can increase as described in SSEClient
- retry_delay_strategy (Optional[RetryDelayStrategy]) – allows customization of the delay behavior for retries; if not specified, uses RetryDelayStrategy.default()
- retry_delay_reset_threshold (float) – the minimum amount of time, in seconds, that a connection must stay open before the SSEClient resets its retry delay strategy
- error_strategy (Optional[ErrorStrategy]) – allows customization of the behavior after a stream failure; if not specified, uses ErrorStrategy.always_fail()
- last_event_id (Optional[str]) – if provided, the Last-Event-Id value will be preset to this
- logger (Optional[Logger]) – if provided, log messages will be written here

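The constructor parameters above can be combined roughly as follows. This is a minimal sketch rather than a recommended configuration: the helper name print_events, the token argument, and the limit of three retries are illustrative assumptions, and the imports sit inside the function only so that the sketch can be loaded without the package installed.

```python
from typing import Optional


def print_events(url: str, token: Optional[str] = None, max_events: int = 10) -> int:
    """Read up to max_events events from url and print them (hypothetical helper)."""
    # Lazy imports: the package is only needed when the function is called.
    from ld_eventsource import SSEClient
    from ld_eventsource.config import ConnectStrategy, ErrorStrategy

    headers = {"Authorization": token} if token else None
    client = SSEClient(
        connect=ConnectStrategy.http(url=url, headers=headers),
        initial_retry_delay=1,
        # Assumption for this sketch: retry a few times, then raise.
        error_strategy=ErrorStrategy.continue_with_max_attempts(3),
    )
    count = 0
    try:
        # Iterating over events starts the stream if it is not already active.
        for event in client.events:
            print(event.event, event.id, event.data)
            count += 1
            if count >= max_events:
                break
    finally:
        # close() is the permanent shutdown referred to under interrupt().
        client.close()
    return count
```

Calling print_events("https://my-sse-server.com") would block the calling thread while reading, so in an application it would normally run on a worker thread.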
all

An iterable series of notifications from the stream.

Each of these can be any subclass of Action: Event, Comment, Start, or Fault.

You can use events instead if you are only interested in Events.

Iterating over this property automatically starts or restarts the stream if it is not already active, so you do not need to call start() unless you want to verify that the stream is connected before trying to read events.

Return type: Iterable[Action]

events

An iterable series of Event objects received from the stream.

Use all instead if you also want to know about other kinds of occurrences.

Iterating over this property automatically starts or restarts the stream if it is not already active, so you do not need to call start() unless you want to verify that the stream is connected before trying to read events.

Return type: Iterable[Event]

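To illustrate the difference between all and events, the following sketch reads from all and dispatches on the Action subclass. The helper name watch_stream and the max_actions cut-off are assumptions made for the example, and always_continue() is chosen here only so that failures show up as Fault objects instead of exceptions.

```python
def watch_stream(url: str, max_actions: int = 20) -> None:
    """Print a line for each Action read from the stream (hypothetical helper)."""
    # Lazy imports: the package is only needed when the function is called.
    from ld_eventsource import SSEClient
    from ld_eventsource.actions import Comment, Event, Fault, Start
    from ld_eventsource.config import ErrorStrategy

    client = SSEClient(connect=url, error_strategy=ErrorStrategy.always_continue())
    try:
        for seen, action in enumerate(client.all, start=1):
            if isinstance(action, Start):
                print("connected")
            elif isinstance(action, Event):
                print("event:", action.event, action.data)
            elif isinstance(action, Comment):
                print("comment:", action.comment)
            elif isinstance(action, Fault):
                # error is None when the server ended the stream normally.
                print("fault:", action.error, "retrying in", client.next_retry_delay)
            if seen >= max_actions:
                break
    finally:
        client.close()
```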
interrupt()

Stops the stream connection if it is currently active.

The difference between this method and close() is that this method does not permanently shut down the SSEClient. If you try to read more events or call start(), the client will try to reconnect to the stream. The behavior is exactly the same as if the previous stream had been ended by the server.

last_event_id

The ID value, if any, of the last known event.

This can be set initially with the last_event_id parameter to SSEClient, and is updated whenever an event is received that has an ID. Whether event IDs are supported depends on the server; it may ignore this value.

Return type: Optional[str]

next_retry_delay

The retry delay that will be used for the next reconnection, in seconds, if the stream has failed or ended.

This is initially zero, because SSEClient does not compute a retry delay until there is a failure. If you have just received an exception or a Fault, or if you were iterating through events and the events ran out because the stream closed, the value tells you how long SSEClient will sleep before the next reconnection attempt. The value is computed by applying the configured RetryDelayStrategy to the base retry delay.

Return type: float

start()

Attempts to start the stream if it is not already active.

If there is not an active stream connection, this method attempts to start one using the previously configured parameters. If successful, it returns and you can proceed to read events. You should only read events on the same thread where you called start().

If the connection fails, the behavior depends on the configured ErrorStrategy. The default strategy is to raise an exception, but you can configure it to continue instead, in which case start() will keep retrying until the ErrorStrategy says to give up.

If the stream was previously active and then failed, start() will sleep for some amount of time (the retry delay) before trying to make the connection. The retry delay is determined by the initial_retry_delay, retry_delay_strategy, and retry_delay_reset_threshold parameters to SSEClient.

ld_eventsource.actions module

class ld_eventsource.actions.Action

Bases: object

Base class for objects that can be returned by SSEClient.all.

class ld_eventsource.actions.Comment(comment)

Bases: ld_eventsource.actions.Action

A comment received by SSEClient.

Comment lines (any line beginning with a colon) have no significance in the SSE specification and can be ignored, but if you want to see them, use SSEClient.all. They will never be returned by SSEClient.events.

comment

The comment text, not including the leading colon.

Return type: str

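As a rough illustration of how a comment line differs from a field line in the SSE wire format, the sketch below classifies a single line of stream text. It is not the parser used by this package; classify_line and its return shape are hypothetical names chosen for the example.

```python
from typing import Optional, Tuple


def classify_line(line: str) -> Tuple[str, Optional[str], str]:
    """Return (kind, field_name, value) for one SSE line without its line ending."""
    if line == "":
        # A blank line terminates the event that is being built up.
        return ("dispatch", None, "")
    if line.startswith(":"):
        # Comment: everything after the leading colon is the comment text.
        return ("comment", None, line[1:])
    # Field line: the name is before the first colon, the value after it.
    name, _, value = line.partition(":")
    if value.startswith(" "):
        value = value[1:]  # a single leading space is not part of the value
    return ("field", name, value)
```

For example, classify_line("data: hello") yields a "data" field with the value "hello", while classify_line(": keep-alive") yields a comment.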
class ld_eventsource.actions.Event(event='message', data='', id=None, last_event_id=None)

Bases: ld_eventsource.actions.Action

An event received by SSEClient.

Instances of this class are returned by both SSEClient.events and SSEClient.all.

__init__(event='message', data='', id=None, last_event_id=None)

Initialize self. See help(type(self)) for accurate signature.

data

The event data.

Return type: str

event

The event type, or “message” if not specified.

Return type: str

id

The value of the id: field for this event, or None if omitted.

Return type: Optional[str]

last_event_id

The value of the most recent id: field of an event seen in this stream so far.

Return type: Optional[str]

class ld_eventsource.actions.Fault(error)

Bases: ld_eventsource.actions.Action

Indicates that SSEClient encountered an error or end of stream.

Instances of this class are only available from SSEClient.all.

If you receive a Fault, the SSEClient is now in an inactive state since either a connection attempt has failed or an existing connection has been closed. The SSEClient will attempt to reconnect if you either call SSEClient.start() or simply continue reading events after this point.

error

The exception that caused the stream to fail, if any. If this is None, it means that the stream simply ran out of data, i.e. the server shut down the connection in an orderly way after sending an EOF chunk as defined by chunked transfer encoding.

Return type: Optional[Exception]

class ld_eventsource.actions.Start

Bases: ld_eventsource.actions.Action

Indicates that SSEClient has successfully connected to a stream.

Instances of this class are only available from SSEClient.all. A Start is returned for the first successful connection. If the client reconnects after a failure, there will be a Fault followed by a Start.
ld_eventsource.config module

class ld_eventsource.config.ConnectStrategy

An abstraction for how SSEClient should obtain an input stream.

The default implementation is http(), which makes HTTP requests with urllib3. Or, if you want to consume an input stream from some other source, you can create your own subclass of ConnectStrategy.

Instances of this class should be immutable and should not contain any state that is specific to one active stream. The ConnectionClient that they produce is stateful and belongs to a single SSEClient.

create_client(logger)

Creates a client instance.

This is called once when an SSEClient is created. The SSEClient retains the returned ConnectionClient and uses it to perform all subsequent connection attempts.

Parameters: logger (Logger) – the logger being used by the SSEClient
Return type: ConnectionClient

static http(url, headers=None, pool=None, urllib3_request_options=None)

Creates the default HTTP implementation, specifying request parameters.

Parameters:
- url (str) – the stream URL
- headers (Optional[dict]) – optional HTTP headers to add to the request
- pool (Optional[PoolManager]) – optional urllib3 PoolManager to provide an HTTP client
- urllib3_request_options (Optional[dict]) – optional kwargs to add to the request call; these can include any parameters supported by urllib3, such as timeout
Return type: ConnectStrategy

class ld_eventsource.config.ConnectionClient

An object provided by ConnectStrategy that is retained by a single SSEClient to perform all connection attempts by that instance.

For the default HTTP implementation, this represents an HTTP connection pool.

connect(last_event_id)

Attempts to connect to a stream. Raises an exception if unsuccessful.

Parameters: last_event_id (Optional[str]) – the current value of SSEClient.last_event_id (should be sent to the server to support resuming an interrupted stream)
Return type: ConnectionResult
Returns: a ConnectionResult representing the stream

class ld_eventsource.config.ConnectionResult(stream, closer)

The return type of ConnectionClient.connect().

stream

An iterator that returns chunks of data.

Return type: Iterator[bytes]

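The three classes above fit together roughly as in the following sketch, which builds a ConnectStrategy that replays a fixed list of byte chunks instead of making an HTTP request. Several details are assumptions: the helper and class names are hypothetical, passing closer=None assumes that no cleanup callback is required, and the sketch assumes that only create_client() and connect() need to be overridden. The classes are defined inside a function only so that the sketch can be loaded without the package installed.

```python
from typing import Iterable


def make_fixed_strategy(chunks: Iterable[bytes]):
    """Build a ConnectStrategy that replays the given chunks (hypothetical helper)."""
    # Lazy imports: the package is only needed when the function is called.
    from ld_eventsource.config import ConnectStrategy, ConnectionClient, ConnectionResult

    data = tuple(chunks)  # immutable copy, so the strategy holds no mutable state

    class FixedClient(ConnectionClient):
        def connect(self, last_event_id):
            # A real source would use last_event_id to resume an interrupted stream.
            # Assumption: closer=None means there is nothing to clean up.
            return ConnectionResult(stream=iter(data), closer=None)

    class FixedStrategy(ConnectStrategy):
        def create_client(self, logger):
            # Called once per SSEClient; the returned client is reused for retries.
            return FixedClient()

    return FixedStrategy()
```

Under these assumptions, SSEClient(connect=make_fixed_strategy([b"data: hello\n\n"])) would read one event and then see the end of the stream.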
class ld_eventsource.config.ErrorStrategy

Base class of strategies for determining how SSEClient should handle a stream error or the end of a stream.

The parameter that SSEClient passes to apply() is either None if the server ended the stream normally, or an exception. If it is an exception, it could be an I/O exception (failure to connect, broken connection, etc.), or one of the error types defined in this package such as HTTPStatusError.

The two options for the result are:
- FAIL: This means that SSEClient should throw an exception to the caller; or, in the case of a stream ending without an error, it should simply stop iterating through events.
- CONTINUE: This means that you intend to keep reading events, so SSEClient should transparently retry the connection. If you are reading from SSEClient.all, you will also receive a Fault describing the error.

With either option, it is still always possible to explicitly reconnect the stream by calling SSEClient.start() again, or simply by trying to read from SSEClient.events or SSEClient.all again.

Subclasses should be immutable. To implement strategies that behave differently on consecutive retries, the strategy should return a new instance of its own class as the second return value from apply, rather than modifying the state of the existing instance. This makes it easy for SSEClient to reset to the original error-handling state when appropriate by simply reusing the original instance.

CONTINUE = False

FAIL = True

static always_continue()

Specifies that SSEClient should never raise an exception, but should transparently retry or, if SSEClient.all is being used, return the error as a Fault.

Be aware that using this mode could cause connection attempts to block indefinitely if the server is unavailable.

Return type: ErrorStrategy

static always_fail()

Specifies that SSEClient should always treat an error as a stream failure. This is the default behavior if you do not configure another.

Return type: ErrorStrategy

apply(exception)

Applies the strategy to determine what to do after a failure.

Parameters: exception (Optional[Exception]) – an exception, or None if the stream simply ended
Return type: Tuple[bool, ErrorStrategy]
Returns: a tuple where the first element is either FAIL to raise an exception or CONTINUE to continue, and the second element is the strategy object to use next time (which could be self)

static continue_with_max_attempts(max_attempts)

Specifies that SSEClient should automatically retry after an error for up to this number of consecutive attempts, but should fail after that point.

Parameters: max_attempts (int) – the maximum number of consecutive retries
Return type: ErrorStrategy

static continue_with_time_limit(max_time)

Specifies that SSEClient should automatically retry after a failure and can retry repeatedly until this amount of time has elapsed, but should fail after that point.

Parameters: max_time (float) – the time limit, in seconds
Return type: ErrorStrategy

static from_lambda(fn)

Convenience method for creating an ErrorStrategy whose apply method is equivalent to the given lambda.

The one difference is that the second return value is an Optional[ErrorStrategy] which can be None to mean “no change”, since the lambda cannot reference the strategy’s self.

Return type: ErrorStrategy

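The immutability rule described for ErrorStrategy subclasses can be sketched without the package, using a stand-in class that follows the same apply() contract. MaxAttemptsSketch is a hypothetical name and is not the implementation behind continue_with_max_attempts(); the two module-level constants simply mirror the FAIL and CONTINUE values listed above.

```python
from typing import Optional, Tuple

FAIL = True       # mirrors ErrorStrategy.FAIL
CONTINUE = False  # mirrors ErrorStrategy.CONTINUE


class MaxAttemptsSketch:
    """Stand-in with the same apply() contract as ErrorStrategy (not the library class)."""

    def __init__(self, max_attempts: int, attempts_so_far: int = 0):
        self._max_attempts = max_attempts
        self._attempts_so_far = attempts_so_far

    def apply(self, exception: Optional[Exception]) -> Tuple[bool, "MaxAttemptsSketch"]:
        if self._attempts_so_far >= self._max_attempts:
            return (FAIL, self)
        # Return a new instance with an updated count instead of mutating this one,
        # so the original instance can be reused to reset the error-handling state.
        return (CONTINUE, MaxAttemptsSketch(self._max_attempts, self._attempts_so_far + 1))
```

Because each call returns the next state rather than changing the current one, the caller can go back to the initial behavior at any time by applying the original instance again.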
class ld_eventsource.config.RetryDelayStrategy

Base class of strategies for computing how long to wait before retrying a connection.

The default behavior, provided by default(), provides customizable exponential backoff and jitter. Applications may also create their own subclasses of RetryDelayStrategy if they desire different behavior. It is generally a best practice to use backoff and jitter, to avoid a reconnect storm during a service interruption.

Subclasses should be immutable. To implement strategies where the delay uses different parameters on each subsequent retry (such as exponential backoff), the strategy should return a new instance of its own class as the second return value from apply, rather than modifying the state of the existing instance. This makes it easy for SSEClient to reset to the original delay state when appropriate by simply reusing the original instance.

apply(base_delay)

Applies the strategy to compute the appropriate retry delay.

Parameters: base_delay (float) – the initial configured base delay, in seconds, as set in the SSEClient parameters
Return type: Tuple[float, RetryDelayStrategy]
Returns: a tuple where the first element is the computed delay, in seconds, and the second element is the strategy object to use next time (which could be self)

static default(max_delay=None, backoff_multiplier=2, jitter_multiplier=None)

Provides the default retry delay behavior for SSEClient, which includes customizable backoff and jitter options.

The behavior is as follows:
- Start with the configured base delay as set by the initial_retry_delay parameter to SSEClient.
- On each subsequent attempt, multiply the base delay by backoff_multiplier, giving the current base delay.
- If max_delay is set and is greater than zero, the base delay is pinned to be no greater than that value.
- If jitter_multiplier is set and is greater than zero, the actual delay for each attempt is equal to the current base delay minus a pseudo-random amount between zero and jitter_multiplier times the current base delay. For instance, a jitter multiplier of 0.25 would mean that a base delay of 1000 is changed to a value in the range [750, 1000].

Parameters:
- max_delay (Optional[float]) – the maximum possible delay value, in seconds; default is 30 seconds
- backoff_multiplier (float) – the exponential backoff factor
- jitter_multiplier (Optional[float]) – a fraction from 0.0 to 1.0 for how much of the delay may be pseudo-randomly subtracted
Return type: RetryDelayStrategy

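The backoff, maximum, and jitter rules listed for default() can be written out as plain arithmetic. The function below is a sketch of those documented rules only, not the package's implementation; sketch_default_delay, retry_number, and the injectable rand parameter are names introduced for illustration.

```python
import random
from typing import Callable, Optional


def sketch_default_delay(
    base_delay: float,
    retry_number: int,
    max_delay: Optional[float] = None,
    backoff_multiplier: float = 2.0,
    jitter_multiplier: Optional[float] = None,
    rand: Callable[[], float] = random.random,
) -> float:
    """Delay for the given retry, where retry_number 0 uses the base delay as-is."""
    # Backoff: multiply the base delay once per subsequent attempt.
    current = base_delay * (backoff_multiplier ** retry_number)
    # Pin the current base delay to max_delay if one is set.
    if max_delay is not None and max_delay > 0:
        current = min(current, max_delay)
    # Jitter: subtract a pseudo-random fraction of the current base delay.
    if jitter_multiplier is not None and jitter_multiplier > 0:
        current -= rand() * jitter_multiplier * current
    return current
```

With a base delay of 1000 and a jitter multiplier of 0.25, every result falls between 750 and 1000, matching the range given in the description above.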
static from_lambda(fn)

Convenience method for creating a RetryDelayStrategy whose apply method is equivalent to the given lambda.

The one difference is that the second return value is an Optional[RetryDelayStrategy] which can be None to mean “no change”, since the lambda cannot reference the strategy’s self.

Return type: RetryDelayStrategy

ld_eventsource.errors module

exception ld_eventsource.errors.HTTPContentTypeError(content_type)

Bases: Exception

This exception indicates that the HTTP response did not have the expected content type of “text/event-stream”.

content_type

Return type: str