Class: LaunchDarkly::Impl::DataSource::StreamProcessor Private

Inherits:
Object
Defined in:
lib/ldclient-rb/impl/data_source/stream.rb

Overview

This class is part of a private API. Avoid using it if possible, as it may be removed or changed in the future.

Since:

  • 5.5.0

Constructor Details

#initialize(sdk_key, config, diagnostic_accumulator = nil) ⇒ StreamProcessor

This method is part of a private API. Avoid using it if possible, as it may be removed or changed in the future.

Returns a new instance of StreamProcessor.

Since:

  • 5.5.0



# File 'lib/ldclient-rb/impl/data_source/stream.rb', line 23

def initialize(sdk_key, config, diagnostic_accumulator = nil)
  @sdk_key = sdk_key
  @config = config
  @diagnostic_accumulator = diagnostic_accumulator
  @data_source_update_sink = config.data_source_update_sink
  @feature_store = config.feature_store
  @initialized = Concurrent::AtomicBoolean.new(false)
  @started = Concurrent::AtomicBoolean.new(false)
  @stopped = Concurrent::AtomicBoolean.new(false)
  @ready = Concurrent::Event.new
  @connection_attempt_start_time = 0
end
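
The constructor creates three one-shot flags (`@initialized`, `@started`, `@stopped`) and a `@ready` event. The sketch below illustrates how such a flag can make a method safe to call more than once, which is how `#start` uses `@started.make_true`. It relies on the standard library only: `OneShotFlag` and `Worker` are hypothetical names, not part of the SDK, and `OneShotFlag#make_true` is an assumed approximation of `Concurrent::AtomicBoolean#make_true`, which returns true only when the value actually changes.

```ruby
# Hypothetical stdlib-only stand-in for Concurrent::AtomicBoolean.
class OneShotFlag
  def initialize(initial = false)
    @value = initial
    @lock = Mutex.new
  end

  # Current value, read under the lock.
  def value
    @lock.synchronize { @value }
  end

  # Returns true only for the caller that flips the flag from false to true.
  def make_true
    @lock.synchronize do
      return false if @value
      @value = true
    end
  end
end

# Hypothetical class showing the same guard shape as StreamProcessor#start.
class Worker
  attr_reader :connect_count

  def initialize
    @started = OneShotFlag.new
    @connect_count = 0
  end

  def start
    # A second call returns early instead of opening another connection.
    return :already_started unless @started.make_true

    @connect_count += 1
    :started
  end
end

worker = Worker.new
worker.start
worker.start
puts worker.connect_count
```

Because the check and the assignment happen under one lock, only one of several concurrent callers can ever see `make_true` return true.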

Instance Method Details

#initialized? ⇒ Boolean

This method is part of a private API. Avoid using it if possible, as it may be removed or changed in the future.

Returns:

  • (Boolean)

Since:

  • 5.5.0



# File 'lib/ldclient-rb/impl/data_source/stream.rb', line 36

def initialized?
  @initialized.value
end

#start ⇒ Object

This method is part of a private API. Avoid using it if possible, as it may be removed or changed in the future.

Since:

  • 5.5.0



# File 'lib/ldclient-rb/impl/data_source/stream.rb', line 40

def start
  return @ready unless @started.make_true

  @config.logger.info { "[LDClient] Initializing stream connection" }

  headers = Impl::Util.default_http_headers(@sdk_key, @config)
  opts = {
    headers: headers,
    read_timeout: READ_TIMEOUT_SECONDS,
    logger: @config.logger,
    socket_factory: @config.socket_factory,
    reconnect_time: @config.initial_reconnect_delay,
  }
  log_connection_started

  uri = Impl::Util.add_payload_filter_key(@config.stream_uri + "/all", @config)
  @es = SSE::Client.new(uri, **opts) do |conn|
    conn.on_event { |event| process_message(event) }
    conn.on_error { |err|
      log_connection_result(false)
      case err
      when SSE::Errors::HTTPStatusError
        status = err.status
        error_info = LaunchDarkly::Interfaces::DataSource::ErrorInfo.new(
          LaunchDarkly::Interfaces::DataSource::ErrorInfo::ERROR_RESPONSE, status, nil, Time.now)
        message = Util.http_error_message(status, "streaming connection", "will retry")
        @config.logger.error { "[LDClient] #{message}" }

        if Util.http_error_recoverable?(status)
          @data_source_update_sink&.update_status(
            LaunchDarkly::Interfaces::DataSource::Status::INTERRUPTED,
            error_info
          )
        else
          @ready.set  # if client was waiting on us, make it stop waiting - has no effect if already set
          stop_with_error_info error_info
        end
      when SSE::Errors::HTTPContentTypeError, SSE::Errors::HTTPProxyError, SSE::Errors::ReadTimeoutError
        @data_source_update_sink&.update_status(
          LaunchDarkly::Interfaces::DataSource::Status::INTERRUPTED,
          LaunchDarkly::Interfaces::DataSource::ErrorInfo.new(LaunchDarkly::Interfaces::DataSource::ErrorInfo::NETWORK_ERROR, 0, err.to_s, Time.now)
        )

      else
        @data_source_update_sink&.update_status(
          LaunchDarkly::Interfaces::DataSource::Status::INTERRUPTED,
          LaunchDarkly::Interfaces::DataSource::ErrorInfo.new(LaunchDarkly::Interfaces::DataSource::ErrorInfo::UNKNOWN, 0, err.to_s, Time.now)
        )
      end
    }
  end

  @ready
end
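
The `on_error` handler in `#start` has three branches: HTTP status errors (which either interrupt or stop the stream), known network-level errors, and everything else. The following is a simplified sketch of that decision only, not the SDK's implementation. The error classes, `classify_stream_error`, and the symbols it returns are hypothetical stand-ins, and the `RECOVERABLE` rule is an assumption for illustration; the SDK's actual rule is in `Util.http_error_recoverable?`, which is not shown on this page.

```ruby
# Hypothetical stand-ins for the SSE error classes referenced in #start.
class HTTPStatusError < StandardError
  attr_reader :status

  def initialize(status)
    @status = status
    super("HTTP status #{status}")
  end
end

class ReadTimeoutError < StandardError; end

# Assumed rule for illustration only: treat 401 and 403 as permanent failures.
RECOVERABLE = ->(status) { ![401, 403].include?(status) }

# Returns [next_state, error_kind], mirroring the three branches of on_error.
def classify_stream_error(err, recoverable: RECOVERABLE)
  case err
  when HTTPStatusError
    # Recoverable statuses interrupt the stream; others stop it for good.
    [recoverable.call(err.status) ? :interrupted : :stopped, :error_response]
  when ReadTimeoutError
    [:interrupted, :network_error]
  else
    [:interrupted, :unknown]
  end
end

p classify_stream_error(HTTPStatusError.new(503))
p classify_stream_error(HTTPStatusError.new(401))
p classify_stream_error(ReadTimeoutError.new)
```

Only the non-recoverable HTTP branch ends the stream. In the real handler that branch also calls `@ready.set`, so a caller waiting on the event returned by `#start` is released.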

#stop ⇒ Object

This method is part of a private API. Avoid using it if possible, as it may be removed or changed in the future.

Since:

  • 5.5.0



# File 'lib/ldclient-rb/impl/data_source/stream.rb', line 95

def stop
  stop_with_error_info
end