Skip to content

Commit 867e639

Browse files
committed
A little more safety for threads
1 parent 1666e1f commit 867e639

File tree

2 files changed

+26
-17
lines changed

2 files changed

+26
-17
lines changed

lib/flipper/adapters/http.rb

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ module Adapters
1010
class Http
1111
include Flipper::Adapter
1212

13-
attr_reader :client, :last_get_all_response
13+
attr_reader :client
1414

1515
def initialize(options = {})
1616
@client = Client.new(url: options.fetch(:url),
@@ -24,6 +24,8 @@ def initialize(options = {})
2424
debug_output: options[:debug_output])
2525
@last_get_all_etag = nil
2626
@last_get_all_result = nil
27+
@last_get_all_response = nil
28+
@get_all_mutex = Mutex.new
2729
end
2830

2931
def get(feature)
@@ -58,31 +60,30 @@ def get_multi(features)
5860
end
5961

6062
def get_all(cache_bust: false)
63+
options = {}
6164
path = "/features?exclude_gate_names=true"
6265
path += "&_cb=#{Time.now.to_i}" if cache_bust
66+
etag = @get_all_mutex.synchronize { @last_get_all_etag }
6367

64-
# Pass If-None-Match header if we have an ETag
65-
options = {}
66-
if @last_get_all_etag
67-
options[:headers] = { if_none_match: @last_get_all_etag }
68+
if etag
69+
options[:headers] = { if_none_match: etag }
6870
end
6971

7072
response = @client.get(path, options)
71-
72-
@last_get_all_response = response
73+
@get_all_mutex.synchronize { @last_get_all_response = response }
7374

7475
if response.is_a?(Net::HTTPNotModified)
75-
if @last_get_all_result
76-
return @last_get_all_result
76+
cached_result = @get_all_mutex.synchronize { @last_get_all_result }
77+
78+
if cached_result
79+
return cached_result
7780
else
7881
raise Error, response
7982
end
8083
end
8184

8285
raise Error, response unless response.is_a?(Net::HTTPOK)
8386

84-
@last_get_all_etag = response['etag'] if response['etag']
85-
8687
parsed_response = response.body.empty? ? {} : Typecast.from_json(response.body)
8788
parsed_features = parsed_response['features'] || []
8889
gates_by_key = parsed_features.each_with_object({}) do |parsed_feature, hash|
@@ -96,11 +97,18 @@ def get_all(cache_bust: false)
9697
result[feature.key] = result_for_feature(feature, gates_by_key[feature.key])
9798
end
9899

99-
# Cache the result for 304 responses
100-
@last_get_all_result = result
100+
@get_all_mutex.synchronize do
101+
@last_get_all_etag = response['etag'] if response['etag']
102+
@last_get_all_result = result
103+
end
104+
101105
result
102106
end
103107

108+
def last_get_all_response
109+
@get_all_mutex.synchronize { @last_get_all_response }
110+
end
111+
104112
def features
105113
response = @client.get('/features?exclude_gate_names=true')
106114
raise Error, response unless response.is_a?(Net::HTTPOK)

lib/flipper/poller.rb

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
require 'concurrent/utility/monotonic_time'
33
require 'concurrent/map'
44
require 'concurrent/atomic/atomic_fixnum'
5+
require 'concurrent/atomic/atomic_boolean'
56

67
module Flipper
78
class Poller
@@ -31,7 +32,7 @@ def initialize(options = {})
3132
@interval = options.fetch(:interval, 10).to_f
3233
@last_synced_at = Concurrent::AtomicFixnum.new(0)
3334
@adapter = Adapters::Memory.new(nil, threadsafe: true)
34-
@shutdown_requested = false
35+
@shutdown_requested = Concurrent::AtomicBoolean.new(false)
3536

3637
if @interval < MINIMUM_POLL_INTERVAL
3738
warn "Flipper::Cloud poll interval must be greater than or equal to #{MINIMUM_POLL_INTERVAL} but was #{@interval}. Setting @interval to #{MINIMUM_POLL_INTERVAL}."
@@ -47,7 +48,7 @@ def initialize(options = {})
4748

4849
def start
4950
reset if forked?
50-
return if @shutdown_requested
51+
return if @shutdown_requested.true?
5152
ensure_worker_running
5253
end
5354

@@ -82,7 +83,7 @@ def sync
8283
ensure
8384
if @remote_adapter.respond_to?(:last_get_all_response) && @remote_adapter.last_get_all_response
8485
if Flipper::Typecast.to_boolean(@remote_adapter.last_get_all_response["poll-shutdown"])
85-
@shutdown_requested = true
86+
@shutdown_requested.make_true
8687
@instrumenter.instrument("poller.#{InstrumentationNamespace}", {
8788
operation: :shutdown_requested,
8889
})
@@ -130,7 +131,7 @@ def thread_alive?
130131

131132
def reset
132133
@pid = Process.pid
133-
@shutdown_requested = false
134+
@shutdown_requested.make_false
134135
mutex.unlock if mutex.locked?
135136
end
136137
end

0 commit comments

Comments
 (0)