Class: LaunchDarkly::Impl::Integrations::Consul::ConsulFeatureStoreCore

Inherits:
Object
  • Object
show all
Defined in:
lib/ldclient-rb/impl/integrations/consul_impl.rb

Overview

Internal implementation of the Consul feature store, intended to be used with CachingStoreWrapper.

Since:

  • 5.5.0

Instance Method Summary collapse

Constructor Details

#initialize(opts) ⇒ ConsulFeatureStoreCore

Returns a new instance of ConsulFeatureStoreCore.

Since:

  • 5.5.0



18
19
20
21
22
23
24
25
26
27
28
# File 'lib/ldclient-rb/impl/integrations/consul_impl.rb', line 18

def initialize(opts)
  unless CONSUL_ENABLED
    raise RuntimeError.new("can't use Consul feature store without the 'diplomat' gem")
  end

  @prefix = (opts[:prefix] || LaunchDarkly::Integrations::Consul.default_prefix) + '/'
  @logger = opts[:logger] || Config.default_logger
  Diplomat.configuration = opts[:consul_config] unless opts[:consul_config].nil?
  Diplomat.configuration.url = opts[:url] unless opts[:url].nil?
  @logger.info("ConsulFeatureStore: using Consul host at #{Diplomat.configuration.url}")
end

Instance Method Details

#available?Boolean

Returns:

  • (Boolean)

Since:

  • 5.5.0



122
123
124
125
126
127
128
129
130
131
132
# File 'lib/ldclient-rb/impl/integrations/consul_impl.rb', line 122

def available?
  # Most implementations use the initialized_internal? method as a
  # proxy for this check. However, since `initialized_internal?`
  # catches a KeyNotFound exception, and that exception can be raised
  # when the server goes away, we have to modify our behavior
  # slightly.
  Diplomat::Kv.get(inited_key, {}, :return, :return)
  true
rescue
  false
end

#get_all_internal(kind) ⇒ Object

Since:

  • 5.5.0



68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/ldclient-rb/impl/integrations/consul_impl.rb', line 68

def get_all_internal(kind)
  items_out = {}
  results = Diplomat::Kv.get(kind_key(kind), { recurse: true }, :return)
  (results == "" ? [] : results).each do |result|
    value = result[:value]
    unless value.nil?
      item = Model.deserialize(kind, value)
      items_out[item[:key].to_sym] = item
    end
  end
  items_out
end

#get_internal(kind, key) ⇒ Object

Since:

  • 5.5.0



63
64
65
66
# File 'lib/ldclient-rb/impl/integrations/consul_impl.rb', line 63

def get_internal(kind, key)
  value = Diplomat::Kv.get(item_key(kind, key), {}, :return)  # :return means "don't throw an error if not found"
  (value.nil? || value == "") ? nil : Model.deserialize(kind, value)
end

#init_internal(all_data) ⇒ Object

Since:

  • 5.5.0



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/ldclient-rb/impl/integrations/consul_impl.rb', line 30

def init_internal(all_data)
  # Start by reading the existing keys; we will later delete any of these that weren't in all_data.
  unused_old_keys = Set.new
  keys = Diplomat::Kv.get(@prefix, { keys: true, recurse: true }, :return)
  unused_old_keys.merge(keys) if keys != ""

  ops = []
  num_items = 0

  # Insert or update every provided item
  all_data.each do |kind, items|
    items.values.each do |item|
      value = Model.serialize(kind, item)
      key = item_key(kind, item[:key])
      ops.push({ 'KV' => { 'Verb' => 'set', 'Key' => key, 'Value' => value } })
      unused_old_keys.delete(key)
      num_items = num_items + 1
    end
  end

  # Now delete any previously existing items whose keys were not in the current data
  unused_old_keys.each do |key|
    ops.push({ 'KV' => { 'Verb' => 'delete', 'Key' => key } })
  end

  # Now set the special key that we check in initialized_internal?
  ops.push({ 'KV' => { 'Verb' => 'set', 'Key' => inited_key, 'Value' => '' } })

  ConsulUtil.batch_operations(ops)

  @logger.info { "Initialized database with #{num_items} items" }
end

#initialized_internal?Boolean

Returns:

  • (Boolean)

Since:

  • 5.5.0



111
112
113
114
115
116
117
118
119
120
# File 'lib/ldclient-rb/impl/integrations/consul_impl.rb', line 111

def initialized_internal?
  # Unfortunately we need to use exceptions here, instead of the :return parameter, because with
  # :return there's no way to distinguish between a missing value and an empty string.
  begin
    Diplomat::Kv.get(inited_key, {})
    true
  rescue Diplomat::KeyNotFound
    false
  end
end

#stopObject

Since:

  • 5.5.0



134
135
136
# File 'lib/ldclient-rb/impl/integrations/consul_impl.rb', line 134

def stop
  # There's no Consul client instance to dispose of
end

#upsert_internal(kind, new_item) ⇒ Object

Since:

  • 5.5.0



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/ldclient-rb/impl/integrations/consul_impl.rb', line 81

def upsert_internal(kind, new_item)
  key = item_key(kind, new_item[:key])
  json = Model.serialize(kind, new_item)

  # We will potentially keep retrying indefinitely until someone's write succeeds
  while true
    old_value = Diplomat::Kv.get(key, { decode_values: true }, :return)
    if old_value.nil? || old_value == ""
      mod_index = 0
    else
      old_item = Model.deserialize(kind, old_value[0]["Value"])
      # Check whether the item is stale. If so, don't do the update (and return the existing item to
      # FeatureStoreWrapper so it can be cached)
      if old_item[:version] >= new_item[:version]
        return old_item
      end
      mod_index = old_value[0]["ModifyIndex"]
    end

    # Otherwise, try to write. We will do a compare-and-set operation, so the write will only succeed if
    # the key's ModifyIndex is still equal to the previous value. If the previous ModifyIndex was zero,
    # it means the key did not previously exist and the write will only succeed if it still doesn't exist.
    success = Diplomat::Kv.put(key, json, cas: mod_index)
    return new_item if success

    # If we failed, retry the whole shebang
    @logger.debug { "Concurrent modification detected, retrying" }
  end
end