Class: LaunchDarkly::Impl::Integrations::Consul::ConsulFeatureStoreCore
- Inherits:
-
Object
- Object
- LaunchDarkly::Impl::Integrations::Consul::ConsulFeatureStoreCore
- Defined in:
- lib/ldclient-rb/impl/integrations/consul_impl.rb
Overview
Internal implementation of the Consul feature store, intended to be used with CachingStoreWrapper.
Instance Method Summary collapse
- #available? ⇒ Boolean
- #get_all_internal(kind) ⇒ Object
- #get_internal(kind, key) ⇒ Object
- #init_internal(all_data) ⇒ Object
-
#initialize(opts) ⇒ ConsulFeatureStoreCore
constructor
A new instance of ConsulFeatureStoreCore.
- #initialized_internal? ⇒ Boolean
- #stop ⇒ Object
- #upsert_internal(kind, new_item) ⇒ Object
Constructor Details
#initialize(opts) ⇒ ConsulFeatureStoreCore
Returns a new instance of ConsulFeatureStoreCore.
19 20 21 22 23 24 25 26 27 28 29 |
# File 'lib/ldclient-rb/impl/integrations/consul_impl.rb', line 19 def initialize(opts) unless CONSUL_ENABLED raise RuntimeError.new("can't use Consul feature store without the 'diplomat' gem") end @prefix = (opts[:prefix] || LaunchDarkly::Integrations::Consul.default_prefix) + '/' @logger = opts[:logger] || Config.default_logger Diplomat.configuration = opts[:consul_config] unless opts[:consul_config].nil? Diplomat.configuration.url = opts[:url] unless opts[:url].nil? @logger.info("ConsulFeatureStore: using Consul host at #{Diplomat.configuration.url}") end |
Instance Method Details
#available? ⇒ Boolean
123 124 125 126 127 128 129 130 131 132 133 |
# File 'lib/ldclient-rb/impl/integrations/consul_impl.rb', line 123 def available? # Most implementations use the initialized_internal? method as a # proxy for this check. However, since `initialized_internal?` # catches a KeyNotFound exception, and that exception can be raised # when the server goes away, we have to modify our behavior # slightly. Diplomat::Kv.get(inited_key, {}, :return, :return) true rescue false end |
#get_all_internal(kind) ⇒ Object
69 70 71 72 73 74 75 76 77 78 79 80 |
# File 'lib/ldclient-rb/impl/integrations/consul_impl.rb', line 69 def get_all_internal(kind) items_out = {} results = Diplomat::Kv.get(kind_key(kind), { recurse: true }, :return) (results == "" ? [] : results).each do |result| value = result[:value] unless value.nil? item = Model.deserialize(kind, value) items_out[item[:key].to_sym] = item end end items_out end |
#get_internal(kind, key) ⇒ Object
64 65 66 67 |
# File 'lib/ldclient-rb/impl/integrations/consul_impl.rb', line 64 def get_internal(kind, key) value = Diplomat::Kv.get(item_key(kind, key), {}, :return) # :return means "don't throw an error if not found" (value.nil? || value == "") ? nil : Model.deserialize(kind, value) end |
#init_internal(all_data) ⇒ Object
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/ldclient-rb/impl/integrations/consul_impl.rb', line 31 def init_internal(all_data) # Start by reading the existing keys; we will later delete any of these that weren't in all_data. unused_old_keys = Set.new keys = Diplomat::Kv.get(@prefix, { keys: true, recurse: true }, :return) unused_old_keys.merge(keys) if keys != "" ops = [] num_items = 0 # Insert or update every provided item all_data.each do |kind, items| items.values.each do |item| value = Model.serialize(kind, item) key = item_key(kind, item[:key]) ops.push({ 'KV' => { 'Verb' => 'set', 'Key' => key, 'Value' => value } }) unused_old_keys.delete(key) num_items = num_items + 1 end end # Now delete any previously existing items whose keys were not in the current data unused_old_keys.each do |key| ops.push({ 'KV' => { 'Verb' => 'delete', 'Key' => key } }) end # Now set the special key that we check in initialized_internal? ops.push({ 'KV' => { 'Verb' => 'set', 'Key' => inited_key, 'Value' => '' } }) ConsulUtil.batch_operations(ops) @logger.info { "Initialized database with #{num_items} items" } end |
#initialized_internal? ⇒ Boolean
112 113 114 115 116 117 118 119 120 121 |
# File 'lib/ldclient-rb/impl/integrations/consul_impl.rb', line 112 def initialized_internal? # Unfortunately we need to use exceptions here, instead of the :return parameter, because with # :return there's no way to distinguish between a missing value and an empty string. begin Diplomat::Kv.get(inited_key, {}) true rescue Diplomat::KeyNotFound false end end |
#stop ⇒ Object
135 136 137 |
# File 'lib/ldclient-rb/impl/integrations/consul_impl.rb', line 135 def stop # There's no Consul client instance to dispose of end |
#upsert_internal(kind, new_item) ⇒ Object
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
# File 'lib/ldclient-rb/impl/integrations/consul_impl.rb', line 82 def upsert_internal(kind, new_item) key = item_key(kind, new_item[:key]) json = Model.serialize(kind, new_item) # We will potentially keep retrying indefinitely until someone's write succeeds while true old_value = Diplomat::Kv.get(key, { decode_values: true }, :return) if old_value.nil? || old_value == "" mod_index = 0 else old_item = Model.deserialize(kind, old_value[0]["Value"]) # Check whether the item is stale. If so, don't do the update (and return the existing item to # FeatureStoreWrapper so it can be cached) if old_item[:version] >= new_item[:version] return old_item end mod_index = old_value[0]["ModifyIndex"] end # Otherwise, try to write. We will do a compare-and-set operation, so the write will only succeed if # the key's ModifyIndex is still equal to the previous value. If the previous ModifyIndex was zero, # it means the key did not previously exist and the write will only succeed if it still doesn't exist. success = Diplomat::Kv.put(key, json, cas: mod_index) return new_item if success # If we failed, retry the whole shebang @logger.debug { "Concurrent modification detected, retrying" } end end |