Class: LaunchDarkly::Impl::DataSource::StreamProcessor Private
- Inherits:
- Object
- Object
- LaunchDarkly::Impl::DataSource::StreamProcessor
- Defined in:
- lib/ldclient-rb/impl/data_source/stream.rb
Overview
This class is part of a private API. Avoid using it if possible, as it may be removed or changed in the future.
Instance Method Summary
- #initialize(sdk_key, config, diagnostic_accumulator = nil) ⇒ StreamProcessor constructor private
  A new instance of StreamProcessor.
- #initialized? ⇒ Boolean private
- #start ⇒ Object private
- #stop ⇒ Object private
Constructor Details
#initialize(sdk_key, config, diagnostic_accumulator = nil) ⇒ StreamProcessor
This method is part of a private API. Avoid using it if possible, as it may be removed or changed in the future.
Returns a new instance of StreamProcessor.
# File 'lib/ldclient-rb/impl/data_source/stream.rb', line 23

def initialize(sdk_key, config, diagnostic_accumulator = nil)
  @sdk_key = sdk_key
  @config = config
  @diagnostic_accumulator = diagnostic_accumulator
  @data_source_update_sink = config.data_source_update_sink
  @feature_store = config.feature_store
  @initialized = Concurrent::AtomicBoolean.new(false)
  @started = Concurrent::AtomicBoolean.new(false)
  @stopped = Concurrent::AtomicBoolean.new(false)
  @ready = Concurrent::Event.new
  @connection_attempt_start_time = 0
end
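The @started flag and @ready event created by the constructor support a start-once pattern: #start returns the same event on every call and only does its work the first time. The following is a minimal sketch of that pattern, not the SDK's implementation. OnceFlag, ReadyEvent, and ToyProcessor are hypothetical stdlib-only stand-ins for Concurrent::AtomicBoolean, Concurrent::Event, and StreamProcessor, written so the example runs without the concurrent-ruby gem.

```ruby
# Hypothetical stand-in for Concurrent::AtomicBoolean (only make_true is modeled).
class OnceFlag
  def initialize
    @lock = Mutex.new
    @value = false
  end

  # Returns true only for the caller that flips the flag from false to true.
  def make_true
    @lock.synchronize do
      return false if @value
      @value = true
    end
  end
end

# Hypothetical stand-in for Concurrent::Event (set, set?, wait).
class ReadyEvent
  def initialize
    @lock = Mutex.new
    @cond = ConditionVariable.new
    @set = false
  end

  def set
    @lock.synchronize do
      @set = true
      @cond.broadcast
    end
    true
  end

  def set?
    @lock.synchronize { @set }
  end

  # Blocks until the event is set or the timeout (in seconds) elapses.
  # Returns whether the event is set.
  def wait(timeout = nil)
    deadline = timeout && Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    @lock.synchronize do
      until @set
        remaining = deadline && deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        break if remaining && remaining <= 0
        @cond.wait(@lock, remaining)
      end
      @set
    end
  end
end

# Hypothetical toy processor showing only the start-once guard.
class ToyProcessor
  attr_reader :connect_count

  def initialize
    @started = OnceFlag.new
    @ready = ReadyEvent.new
    @connect_count = 0
  end

  def start
    # Second and later calls skip the setup and hand back the same event.
    return @ready unless @started.make_true

    @connect_count += 1 # stands in for opening the stream connection
    @ready.set          # simplification: the toy signals readiness immediately
    @ready
  end
end

processor = ToyProcessor.new
first = processor.start
second = processor.start
puts first.equal?(second)
puts processor.connect_count
```

A caller would typically hold on to the returned event and wait on it with a timeout, so that a slow or failing connection does not block indefinitely.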
Instance Method Details
#initialized? ⇒ Boolean
This method is part of a private API. Avoid using it if possible, as it may be removed or changed in the future.
# File 'lib/ldclient-rb/impl/data_source/stream.rb', line 36

def initialized?
  @initialized.value
end
#start ⇒ Object
This method is part of a private API. Avoid using it if possible, as it may be removed or changed in the future.
# File 'lib/ldclient-rb/impl/data_source/stream.rb', line 40

def start
  return @ready unless @started.make_true

  @config.logger.info { "[LDClient] Initializing stream connection" }

  headers = Impl::Util.default_http_headers(@sdk_key, @config)
  opts = {
    headers: headers,
    read_timeout: READ_TIMEOUT_SECONDS,
    logger: @config.logger,
    socket_factory: @config.socket_factory,
    reconnect_time: @config.initial_reconnect_delay,
  }
  log_connection_started

  uri = Impl::Util.add_payload_filter_key(@config.stream_uri + "/all", @config)
  @es = SSE::Client.new(uri, **opts) do |conn|
    conn.on_event { |event| (event) }
    conn.on_error { |err|
      log_connection_result(false)
      case err
      when SSE::Errors::HTTPStatusError
        status = err.status
        error_info = LaunchDarkly::Interfaces::DataSource::ErrorInfo.new(
          LaunchDarkly::Interfaces::DataSource::ErrorInfo::ERROR_RESPONSE, status, nil, Time.now)
        = Util.(status, "streaming connection", "will retry")
        @config.logger.error { "[LDClient] #{}" }

        if Util.http_error_recoverable?(status)
          @data_source_update_sink&.update_status(
            LaunchDarkly::Interfaces::DataSource::Status::INTERRUPTED,
            error_info
          )
        else
          @ready.set # if client was waiting on us, make it stop waiting - has no effect if already set
          stop_with_error_info error_info
        end
      when SSE::Errors::HTTPContentTypeError, SSE::Errors::HTTPProxyError, SSE::Errors::ReadTimeoutError
        @data_source_update_sink&.update_status(
          LaunchDarkly::Interfaces::DataSource::Status::INTERRUPTED,
          LaunchDarkly::Interfaces::DataSource::ErrorInfo.new(LaunchDarkly::Interfaces::DataSource::ErrorInfo::NETWORK_ERROR, 0, err.to_s, Time.now)
        )
      else
        @data_source_update_sink&.update_status(
          LaunchDarkly::Interfaces::DataSource::Status::INTERRUPTED,
          LaunchDarkly::Interfaces::DataSource::ErrorInfo.new(LaunchDarkly::Interfaces::DataSource::ErrorInfo::UNKNOWN, 0, err.to_s, Time.now)
        )
      end
    }
  end

  @ready
end
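The on_error handler in #start sorts errors into three groups: HTTP status errors (reported as ERROR_RESPONSE, and either retried or treated as fatal), known network-level errors (NETWORK_ERROR), and everything else (UNKNOWN). The sketch below models only that decision table. The error classes, the classify helper, and the symbols it returns are hypothetical stand-ins. The body of Util.http_error_recoverable? is not shown on this page, so the rule used here (any 4xx other than 400, 408, or 429 is unrecoverable) is an assumption for illustration.

```ruby
# Hypothetical stand-ins for the SSE error classes named in the listing.
class HTTPStatusError < StandardError
  attr_reader :status

  def initialize(status)
    @status = status
    super("HTTP status #{status}")
  end
end

class ReadTimeoutError < StandardError; end

# Assumed rule, not taken from this page: 4xx responses are unrecoverable
# except 400, 408, and 429; every other status is worth retrying.
def recoverable?(status)
  return true unless (400...500).cover?(status)

  [400, 408, 429].include?(status)
end

# Returns [action, error_kind, status_code].
# :interrupted mirrors the INTERRUPTED status update in the listing;
# :stop mirrors the branch that sets @ready and calls stop_with_error_info.
def classify(err)
  case err
  when HTTPStatusError
    action = recoverable?(err.status) ? :interrupted : :stop
    [action, :error_response, err.status]
  when ReadTimeoutError
    [:interrupted, :network_error, 0]
  else
    [:interrupted, :unknown, 0]
  end
end

p classify(HTTPStatusError.new(401))
p classify(HTTPStatusError.new(503))
p classify(ReadTimeoutError.new)
p classify(RuntimeError.new("boom"))
```

Only the unrecoverable HTTP branch stops the processor. Every other branch reports an interruption and leaves reconnection to the SSE client.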
#stop ⇒ Object
This method is part of a private API. Avoid using it if possible, as it may be removed or changed in the future.
# File 'lib/ldclient-rb/impl/data_source/stream.rb', line 95

def stop
  stop_with_error_info
end