diff --git a/lib/dalli.rb b/lib/dalli.rb index fae6ae51..c1c02466 100644 --- a/lib/dalli.rb +++ b/lib/dalli.rb @@ -2,6 +2,7 @@ require 'dalli/compressor' require 'dalli/client' require 'dalli/ring' +require 'dalli/protocol/base' require 'dalli/server' require 'dalli/socket' require 'dalli/version' diff --git a/lib/dalli/client.rb b/lib/dalli/client.rb index 63df68c7..5ef7d589 100644 --- a/lib/dalli/client.rb +++ b/lib/dalli/client.rb @@ -30,6 +30,7 @@ class Client # - :compressor - defaults to zlib # - :cache_nils - defaults to false, if true Dalli will not treat cached nil values as 'not found' for #fetch operations. # - :digest_class - defaults to Digest::MD5, allows you to pass in an object that responds to the hexdigest method, useful for injecting a FIPS compliant hash object. + # - :protocol - defaults to :binary. Set to :meta to use the meta text protocol (requires memcached 1.6+). # def initialize(servers=nil, options={}) @servers = normalize_servers(servers || ENV["MEMCACHE_SERVERS"] || '127.0.0.1:11211') @@ -92,7 +93,7 @@ def fetch(key, ttl=nil, options=nil) options = options.nil? ? CACHE_NILS : options.merge(CACHE_NILS) if @options[:cache_nils] val = get(key, options) not_found = @options[:cache_nils] ? - val == Dalli::Server::NOT_FOUND : + val == Dalli::Protocol::Base::NOT_FOUND : val.nil? if not_found && block_given? val = yield @@ -347,18 +348,27 @@ def normalize_servers(servers) def ring @ring ||= Dalli::Ring.new( @servers.map do |s| - server_options = {} + server_options = {} if s =~ %r{\Amemcached://} uri = URI.parse(s) server_options[:username] = uri.user server_options[:password] = uri.password s = "#{uri.host}:#{uri.port}" end - Dalli::Server.new(s, @options.merge(server_options)) + protocol_class.new(s, @options.merge(server_options)) end, @options ) end + def protocol_class + if @options[:protocol] == :meta + require 'dalli/protocol/meta' + Dalli::Protocol::Meta + else + Dalli::Server + end + end + # Chokepoint method for instrumentation def perform(*all_args) return yield if block_given? diff --git a/lib/dalli/protocol/base.rb b/lib/dalli/protocol/base.rb new file mode 100644 index 00000000..b6e7ce10 --- /dev/null +++ b/lib/dalli/protocol/base.rb @@ -0,0 +1,333 @@ +# frozen_string_literal: true +require 'socket' +require 'timeout' + +require 'dalli/pid_cache' + +module Dalli + module Protocol + class Base + attr_accessor :hostname + attr_accessor :port + attr_accessor :weight + attr_accessor :options + attr_reader :sock + attr_reader :socket_type + + DEFAULT_PORT = 11211 + DEFAULT_WEIGHT = 1 + DEFAULTS = { + :down_retry_delay => 60, + :socket_timeout => 0.5, + :socket_max_failures => 2, + :socket_failure_delay => 0.01, + :value_max_bytes => 1024 * 1024, + :error_when_over_max_size => false, + :compressor => Compressor, + :compression_min_size => 1024, + :compression_max_size => false, + :serializer => Marshal, + :username => nil, + :password => nil, + :keepalive => true, + :sndbuf => nil, + :rcvbuf => nil + } + + ALLOWED_MULTI_OPS = %i[set setq delete deleteq add addq replace replaceq].freeze + + # http://www.hjp.at/zettel/m/memcached_flags.rxml + FLAG_SERIALIZED = 0x1 + FLAG_COMPRESSED = 0x2 + + MAX_ACCEPTABLE_EXPIRATION_INTERVAL = 30*24*60*60 # 30 days + + class NilObject; end + NOT_FOUND = NilObject.new + + def initialize(attribs, options = {}) + @hostname, @port, @weight, @socket_type = parse_hostname(attribs) + @fail_count = 0 + @down_at = nil + @last_down_at = nil + @options = DEFAULTS.merge(options) + @sock = nil + @msg = nil + @error = nil + @pid = nil + @inprogress = nil + @pending_multi_response = nil + end + + def name + if socket_type == :unix + hostname + else + "#{hostname}:#{port}" + end + end + + # Chokepoint method for instrumentation + def request(op, *args) + verify_state + raise Dalli::NetworkError, "#{name} is down: #{@error} #{@msg}. If you are sure it is running, ensure memcached version is > 1.4." unless alive? + begin + if @pending_multi_response && (!multi? || !self.class::ALLOWED_MULTI_OPS.include?(op)) + noop + @pending_multi_response = false + end + send(op, *args) + rescue Dalli::MarshalError => ex + Dalli.logger.error "Marshalling error for key '#{args.first}': #{ex.message}" + Dalli.logger.error "You are trying to cache a Ruby object which cannot be serialized to memcached." + Dalli.logger.error ex.backtrace.join("\n\t") + false + rescue Timeout::Error + close + raise + rescue Dalli::DalliError, Dalli::NetworkError, Dalli::ValueOverMaxSize + raise + rescue => ex + Dalli.logger.error "Unexpected exception during Dalli request: #{ex.class.name}: #{ex.message}" + Dalli.logger.error ex.backtrace.join("\n\t") + down! + end + end + + def alive? + return true if @sock + + if @last_down_at && @last_down_at + options[:down_retry_delay] >= Time.now + time = @last_down_at + options[:down_retry_delay] - Time.now + Dalli.logger.debug { "down_retry_delay not reached for #{name} (%.3f seconds left)" % time } + return false + end + + connect + !!@sock + rescue Dalli::NetworkError + false + end + + def close + return unless @sock + @sock.close rescue nil + @sock = nil + @pid = nil + @inprogress = false + end + + def lock! + end + + def unlock! + end + + def serializer + @options[:serializer] + end + + def compressor + @options[:compressor] + end + + # NOTE: Additional public methods should be overridden in Dalli::Threadsafe + + private + + def verify_state + failure!(RuntimeError.new('Already writing to socket')) if @inprogress + if @pid && @pid != PIDCache.pid + message = 'Fork detected, re-connecting child process...' + Dalli.logger.info { message } + reconnect! message + end + end + + def reconnect!(message) + close + sleep(options[:socket_failure_delay]) if options[:socket_failure_delay] + raise Dalli::NetworkError, message + end + + def failure!(exception) + message = "#{name} failed (count: #{@fail_count}) #{exception.class}: #{exception.message}" + Dalli.logger.warn { message } + + @fail_count += 1 + if @fail_count >= options[:socket_max_failures] + down! + else + reconnect! 'Socket operation failed, retrying...' + end + end + + def down! + close + + @last_down_at = Time.now + + if @down_at + time = Time.now - @down_at + Dalli.logger.debug { "#{name} is still down (for %.3f seconds now)" % time } + else + @down_at = @last_down_at + Dalli.logger.warn { "#{name} is down" } + end + + @error = $! && $!.class.name + @msg = @msg || ($! && $!.message && !$!.message.empty? && $!.message) + raise Dalli::NetworkError, "#{name} is down: #{@error} #{@msg}" + end + + def up! + if @down_at + time = Time.now - @down_at + Dalli.logger.warn { "#{name} is back (downtime was %.3f seconds)" % time } + end + + @fail_count = 0 + @down_at = nil + @last_down_at = nil + @msg = nil + @error = nil + end + + def multi? + Thread.current[:dalli_multi] + end + + def write(bytes) + begin + @inprogress = true + result = @sock.write(bytes) + @inprogress = false + result + rescue SystemCallError, Timeout::Error => e + failure!(e) + end + end + + def read(count) + begin + @inprogress = true + data = @sock.readfull(count) + @inprogress = false + data + rescue SystemCallError, Timeout::Error, EOFError => e + failure!(e) + end + end + + def serialize(key, value, options=nil) + marshalled = false + value = unless options && options[:raw] + marshalled = true + begin + self.serializer.dump(value) + rescue Timeout::Error => e + raise e + rescue => ex + exc = Dalli::MarshalError.new(ex.message) + exc.set_backtrace ex.backtrace + raise exc + end + else + value.to_s + end + compressed = false + set_compress_option = true if options && options[:compress] + if (@options[:compress] || set_compress_option) && value.bytesize >= @options[:compression_min_size] && + (!@options[:compression_max_size] || value.bytesize <= @options[:compression_max_size]) + value = self.compressor.compress(value) + compressed = true + end + + flags = 0 + flags |= FLAG_COMPRESSED if compressed + flags |= FLAG_SERIALIZED if marshalled + [value, flags] + end + + def deserialize(value, flags) + value = self.compressor.decompress(value) if (flags & FLAG_COMPRESSED) != 0 + value = self.serializer.load(value) if (flags & FLAG_SERIALIZED) != 0 + value + rescue TypeError + raise if $!.message !~ /needs to have method `_load'|exception class\/object expected|instance of IO needed|incompatible marshal file format/ + raise UnmarshalError, "Unable to unmarshal value: #{$!.message}" + rescue ArgumentError + raise if $!.message !~ /undefined class|marshal data too short/ + raise UnmarshalError, "Unable to unmarshal value: #{$!.message}" + rescue NameError + raise if $!.message !~ /uninitialized constant/ + raise UnmarshalError, "Unable to unmarshal value: #{$!.message}" + rescue Zlib::Error + raise UnmarshalError, "Unable to uncompress value: #{$!.message}" + end + + def guard_max_value(key, value) + if value.bytesize <= @options[:value_max_bytes] + yield + else + message = "Value for #{key} over max size: #{@options[:value_max_bytes]} <= #{value.bytesize}" + raise Dalli::ValueOverMaxSize, message if @options[:error_when_over_max_size] + + Dalli.logger.error "#{message} - this value may be truncated by memcached" + false + end + end + + def sanitize_ttl(ttl) + ttl_as_i = ttl.to_i + return ttl_as_i if ttl_as_i <= MAX_ACCEPTABLE_EXPIRATION_INTERVAL + now = Time.now.to_i + return ttl_as_i if ttl_as_i > now + Dalli.logger.debug "Expiration interval (#{ttl_as_i}) too long for Memcached, converting to an expiration timestamp" + now + ttl_as_i + end + + def connect + Dalli.logger.debug { "#{self.class.name}#connect #{name}" } + + begin + @pid = PIDCache.pid + @sock = if socket_type == :unix + Dalli::Socket::UNIX.open(hostname, self, options) + else + Dalli::Socket::TCP.open(hostname, port, self, options) + end + post_connect + @version = version + up! + rescue Dalli::DalliError + raise + rescue SystemCallError, Timeout::Error, EOFError, SocketError => e + failure!(e) + end + end + + def post_connect + end + + def parse_hostname(str) + res = str.match(/\A(\[([\h:]+)\]|[^:]+)(?::(\d+))?(?::(\d+))?\z/) + raise Dalli::DalliError, "Could not parse hostname #{str}" if res.nil? || res[1] == '[]' + hostnam = res[2] || res[1] + if hostnam.start_with?("/") + socket_type = :unix + raise Dalli::DalliError, "Could not parse hostname #{str}" if res[4] + weigh = res[3] + else + socket_type = :tcp + por = res[3] || DEFAULT_PORT + por = Integer(por) + weigh = res[4] + end + weigh ||= DEFAULT_WEIGHT + weigh = Integer(weigh) + return hostnam, por, weigh, socket_type + end + end + end +end diff --git a/lib/dalli/protocol/meta.rb b/lib/dalli/protocol/meta.rb new file mode 100644 index 00000000..ed65a9c4 --- /dev/null +++ b/lib/dalli/protocol/meta.rb @@ -0,0 +1,272 @@ +# frozen_string_literal: true +require 'dalli/protocol/base' + +module Dalli + module Protocol + class Meta < Base + TERMINATOR = "\r\n" + ALLOWED_MULTI_OPS = %i[set delete add replace append prepend].freeze + + def close + super + @response_processor = nil + end + + # Start reading key/value pairs from this connection. This is usually called + # after a series of quiet mg commands. A mn (meta noop) is sent, and the + # server begins flushing responses for kv pairs that were found. + # + # Returns nothing. + def multi_response_start + verify_state + write_noop + @multi_buffer = +'' + @multi_position = 0 + @inprogress = true + end + + def multi_response_completed? + @multi_buffer.nil? + end + + # Attempt to receive and parse as many key/value pairs as possible from + # this server. After #multi_response_start, invoke repeatedly whenever + # this server's socket is readable until #multi_response_completed?. + # + # Returns a Hash of kv pairs received. + def multi_response_nonblock + raise 'multi_response has completed' if @multi_buffer.nil? + + @multi_buffer << @sock.read_available + buf = @multi_buffer + pos = @multi_position + values = {} + + loop do + advance, is_terminal, cas, key, value = response_processor.getk_response_from_buffer(buf, pos) + + if advance.zero? + break + elsif is_terminal && key.nil? + @multi_buffer = nil + @multi_position = nil + @inprogress = false + break + elsif key + begin + values[key] = [value, cas] + rescue DalliError + end + end + + pos += advance + end + + @multi_position = pos if @multi_buffer + values + rescue SystemCallError, Timeout::Error, EOFError => e + failure!(e) + end + + def multi_response_abort + @multi_buffer = nil + @multi_position = nil + @inprogress = false + failure!(RuntimeError.new('External timeout')) + rescue NetworkError + true + end + + # NOTE: Additional public methods should be overridden in Dalli::Threadsafe + + private + + def response_processor + @response_processor ||= ResponseProcessor.new(self) + end + + # Make deserialize accessible to ResponseProcessor + public :deserialize + + # Retrieval Commands + def get(key, options = nil) + encoded_key, base64 = KeyRegularizer.encode(key) + req = RequestFormatter.meta_get(key: encoded_key, base64: base64) + write(req) + response_processor.meta_get_with_value(cache_nils: !!(options && options.is_a?(Hash) && options[:cache_nils])) + end + + def send_multiget(keys) + req = +"" + keys.each do |key| + encoded_key, base64 = KeyRegularizer.encode(key) + req << RequestFormatter.meta_get(key: encoded_key, return_cas: true, base64: base64, quiet: true) + end + write(req) + end + + # Storage Commands + def set(key, value, ttl, cas, options) + write_storage_req(:set, key, value, ttl, cas, options) + @pending_multi_response ||= multi? + response_processor.meta_set_with_cas unless multi? + end + + def add(key, value, ttl, options) + write_storage_req(:add, key, value, ttl, nil, options) + @pending_multi_response ||= multi? + response_processor.meta_set_with_cas unless multi? + end + + def replace(key, value, ttl, cas, options) + write_storage_req(:replace, key, value, ttl, cas, options) + @pending_multi_response ||= multi? + response_processor.meta_set_with_cas unless multi? + end + + def append(key, value) + write_append_prepend_req(:append, key, value) + @pending_multi_response ||= multi? + response_processor.meta_set_append_prepend unless multi? + end + + def prepend(key, value) + write_append_prepend_req(:prepend, key, value) + @pending_multi_response ||= multi? + response_processor.meta_set_append_prepend unless multi? + end + + # Delete Commands + def delete(key, cas) + encoded_key, base64 = KeyRegularizer.encode(key) + req = RequestFormatter.meta_delete(key: encoded_key, cas: cas, base64: base64, quiet: multi?) + write(req) + @pending_multi_response ||= multi? + response_processor.meta_delete unless multi? + end + + # Arithmetic Commands + def decr(key, count, ttl, default) + decr_incr(false, key, count, ttl, default) + end + + def incr(key, count, ttl, default) + decr_incr(true, key, count, ttl, default) + end + + # Other Commands + def flush(ttl) + delay = ttl.to_i + write(RequestFormatter.flush(delay: delay > 0 ? delay : nil)) + response_processor.flush + end + + def noop + write_noop + response_processor.consume_all_responses_until_mn + end + + def stats(info = '') + write(RequestFormatter.stats(info)) + response_processor.stats + end + + def reset_stats + write(RequestFormatter.stats('reset')) + response_processor.reset + end + + def cas(key) + encoded_key, base64 = KeyRegularizer.encode(key) + req = RequestFormatter.meta_get(key: encoded_key, value: true, return_cas: true, base64: base64) + write(req) + response_processor.meta_get_with_value_and_cas + end + + def version + write(RequestFormatter.version) + response_processor.version + end + + def touch(key, ttl) + ttl = sanitize_ttl(ttl) + encoded_key, base64 = KeyRegularizer.encode(key) + req = RequestFormatter.meta_get(key: encoded_key, ttl: ttl, value: false, base64: base64) + write(req) + response_processor.meta_get_without_value + end + + def write_noop + write(RequestFormatter.meta_noop) + end + + def write_storage_req(mode, key, raw_value, ttl, cas, options) + (value, bitflags) = serialize(key, raw_value, options) + ttl = sanitize_ttl(ttl) + encoded_key, base64 = KeyRegularizer.encode(key) + + guard_max_value(key, value) do + req = RequestFormatter.meta_set( + key: encoded_key, value: value, bitflags: bitflags, + cas: cas, ttl: ttl, mode: mode, base64: base64, quiet: multi? + ) + req << value << TERMINATOR + write(req) + end + end + + def write_append_prepend_req(mode, key, value) + encoded_key, base64 = KeyRegularizer.encode(key) + req = RequestFormatter.meta_set( + key: encoded_key, value: value, base64: base64, + mode: mode, quiet: multi? + ) + req << value << TERMINATOR + write(req) + end + + def decr_incr(incr, key, delta, ttl, initial) + ttl = initial ? sanitize_ttl(ttl) : nil + encoded_key, base64 = KeyRegularizer.encode(key) + write(RequestFormatter.meta_arithmetic( + key: encoded_key, delta: delta, initial: initial, + incr: incr, ttl: ttl, base64: base64 + )) + response_processor.decr_incr + end + + def post_connect + if @options[:username] || ENV['MEMCACHE_USERNAME'] + raise Dalli::DalliError, "Meta protocol does not support SASL authentication" + end + @sock.clear_read_buffer if @sock.respond_to?(:clear_read_buffer) + end + + def write(bytes) + begin + @inprogress = true + result = @sock.write(bytes) + @inprogress = false + result + rescue SystemCallError, Timeout::Error => e + failure!(e) + end + end + + def read(count) + begin + @inprogress = true + data = @sock.readfull(count) + @inprogress = false + data + rescue SystemCallError, Timeout::Error, EOFError => e + failure!(e) + end + end + + require_relative 'meta/key_regularizer' + require_relative 'meta/request_formatter' + require_relative 'meta/response_processor' + end + end +end diff --git a/lib/dalli/protocol/meta/key_regularizer.rb b/lib/dalli/protocol/meta/key_regularizer.rb new file mode 100644 index 00000000..c820166b --- /dev/null +++ b/lib/dalli/protocol/meta/key_regularizer.rb @@ -0,0 +1,26 @@ +# frozen_string_literal: true + +module Dalli + module Protocol + class Meta + # The meta protocol requires ASCII keys without whitespace. Keys containing + # non-ASCII characters or whitespace are base64-encoded and sent with the + # 'b' flag so the server knows to decode them. + class KeyRegularizer + INVALID_META_KEY_CHARS = /[\s[:cntrl:]]/ + + def self.encode(key) + return [key, false] if key.ascii_only? && !INVALID_META_KEY_CHARS.match?(key) + + [([key].pack('m0')), true] + end + + def self.decode(encoded_key, base64_encoded) + return encoded_key unless base64_encoded + + encoded_key.unpack1('m0') + end + end + end + end +end diff --git a/lib/dalli/protocol/meta/request_formatter.rb b/lib/dalli/protocol/meta/request_formatter.rb new file mode 100644 index 00000000..28471521 --- /dev/null +++ b/lib/dalli/protocol/meta/request_formatter.rb @@ -0,0 +1,87 @@ +# frozen_string_literal: true + +module Dalli + module Protocol + class Meta + class RequestFormatter + TERMINATOR = "\r\n" + + def self.meta_get(key:, value: true, return_cas: false, ttl: nil, base64: false, quiet: false) + if quiet && value && return_cas && !ttl + base64 ? "mg #{key} v f c b k q s\r\n" : "mg #{key} v f c k q s\r\n" + elsif !quiet && value && !return_cas && !ttl + base64 ? "mg #{key} v f b\r\n" : "mg #{key} v f\r\n" + else + cmd = "mg #{key}" + cmd << ' v f' if value + cmd << ' c' if return_cas + cmd << ' b' if base64 + cmd << " T#{ttl}" if ttl + cmd << ' k q s' if quiet + cmd << TERMINATOR + end + end + + APPEND_PREPEND_MODES = %i[append prepend].freeze + + def self.meta_set(key:, value:, bitflags: nil, cas: nil, ttl: nil, mode: :set, base64: false, quiet: false) + cmd = "ms #{key} #{value.bytesize}" + cmd << ' c' unless APPEND_PREPEND_MODES.include?(mode) + cmd << ' b' if base64 + cmd << " F#{bitflags}" if bitflags && bitflags != 0 + cmd << " C#{cas}" if cas && cas != 0 + cmd << " T#{ttl}" if ttl + cmd << " M#{MODE_TOKENS[mode] || 'S'}" + cmd << ' q' if quiet + cmd << TERMINATOR + end + + def self.meta_delete(key:, cas: nil, base64: false, quiet: false) + cmd = "md #{key}" + cmd << ' b' if base64 + cmd << cas_string(cas) + cmd << ' q' if quiet + cmd << TERMINATOR + end + + def self.meta_arithmetic(key:, delta:, initial:, incr: true, cas: nil, ttl: nil, base64: false, quiet: false) + cmd = "ma #{key} v" + cmd << ' b' if base64 + cmd << " D#{delta}" if delta + cmd << " J#{initial}" if initial + cmd << " N#{ttl || 0}" if ttl || initial + cmd << cas_string(cas) + cmd << ' q' if quiet + cmd << " M#{incr ? 'I' : 'D'}" + cmd << TERMINATOR + end + + def self.meta_noop + "mn\r\n" + end + + def self.version + "version\r\n" + end + + def self.flush(delay: nil) + cmd = +'flush_all' + cmd << " #{delay.to_i}" if delay + cmd << TERMINATOR + end + + def self.stats(arg = nil) + cmd = +'stats' + cmd << " #{arg}" if arg && !arg.empty? + cmd << TERMINATOR + end + + MODE_TOKENS = { add: 'E', replace: 'R', append: 'A', prepend: 'P', set: 'S' }.freeze + + def self.cas_string(cas) + cas && cas != 0 ? " C#{cas}" : '' + end + end + end + end +end diff --git a/lib/dalli/protocol/meta/response_processor.rb b/lib/dalli/protocol/meta/response_processor.rb new file mode 100644 index 00000000..985f7917 --- /dev/null +++ b/lib/dalli/protocol/meta/response_processor.rb @@ -0,0 +1,208 @@ +# frozen_string_literal: true + +module Dalli + module Protocol + class Meta + class ResponseProcessor + TERMINATOR = "\r\n" + + EN = 'EN' # Key not found (meta get miss) + END_TOKEN = 'END' # Terminator for stats output + EX = 'EX' # Exists/CAS conflict + HD = 'HD' # Success header-only response + MN = 'MN' # Meta noop/pipeline terminator + NF = 'NF' # Not found (non-get operations) + NS = 'NS' # Not stored + OK = 'OK' # Generic success + RESET = 'RESET' # stats reset acknowledgement + STAT = 'STAT' # stats line prefix + VA = 'VA' # Value response header + VERSION = 'VERSION' # version response prefix + SERVER_ERROR = 'SERVER_ERROR' # Server-side error response + + def initialize(server) + @server = server + end + + def meta_get_with_value(cache_nils: false) + line = read_line + return cache_nils ? Dalli::Protocol::Base::NOT_FOUND : nil if line.start_with?(EN) + return true if line.start_with?(HD) + raise Dalli::DalliError, "Response error: #{line}" unless line.start_with?(VA) + + sp1 = 3 + sp2 = line.index(' ', sp1) || line.length + data_size = line.byteslice(sp1, sp2 - sp1).to_i + bitflags = extract_flag_value(line, 'f', sp2) || 0 + @server.deserialize(read_data(data_size), bitflags) + end + + def meta_get_with_value_and_cas + line = read_line + return [nil, 0] if line.start_with?(EN) + raise Dalli::DalliError, "Response error: #{line}" unless line.start_with?(VA) || line.start_with?(HD) + + cas = extract_flag_value(line, 'c', 2) || 0 + return [nil, cas] unless line.start_with?(VA) + + sp1 = 3 + sp2 = line.index(' ', sp1) || line.length + data_size = line.byteslice(sp1, sp2 - sp1).to_i + bitflags = extract_flag_value(line, 'f', sp2) || 0 + [@server.deserialize(read_data(data_size), bitflags), cas] + end + + def meta_get_without_value + line = read_line + return nil if line.start_with?(EN) + return true if line.start_with?(HD) + raise Dalli::DalliError, "Response error: #{line}" + end + + def meta_set_with_cas + line = read_line + return false if line.start_with?(NS) || line.start_with?(NF) || line.start_with?(EX) + raise Dalli::DalliError, "Response error: #{line}" unless line.start_with?(HD) + + extract_flag_value(line, 'c', 2) || 0 + end + + def meta_set_append_prepend + line = read_line + return false if line.start_with?(NS) || line.start_with?(NF) || line.start_with?(EX) + raise Dalli::DalliError, "Response error: #{line}" unless line.start_with?(HD) + + true + end + + def meta_delete + line = read_line + return false if line.start_with?(NF) || line.start_with?(EX) + raise Dalli::DalliError, "Response error: #{line}" unless line.start_with?(HD) + + true + end + + def decr_incr + line = read_line + return false if line.start_with?(NS) || line.start_with?(EX) + return nil if line.start_with?(NF) + raise Dalli::DalliError, "Response error: #{line}" unless line.start_with?(VA) + + read_line.to_i + end + + def stats + line = read_line + values = {} + while !line.start_with?(END_TOKEN) + raise Dalli::DalliError, "Response error: #{line}" unless line.start_with?(STAT) + parts = line.split(nil, 3) + values[parts[1]] = parts[2] + line = read_line + end + values + end + + def flush + line = read_line + raise Dalli::DalliError, "Response error: #{line}" unless line.start_with?(OK) + true + end + + def reset + line = read_line + raise Dalli::DalliError, "Response error: #{line}" unless line.start_with?(RESET) + true + end + + def version + line = read_line + raise Dalli::DalliError, "Response error: #{line}" unless line.start_with?(VERSION) + line.split(nil, 2).last + end + + def consume_all_responses_until_mn + line = read_line + line = read_line while !line.start_with?(MN) + true + end + + # Parse a single getk-style response from the buffer starting at pos. + # Returns [advance, is_terminal, cas, key, value] where advance is + # bytes consumed (0 if incomplete). + def getk_response_from_buffer(buf, pos = 0) + term_idx = buf.index(TERMINATOR, pos) + return [0, nil, nil, nil, nil] unless term_idx + + first_byte = buf.getbyte(pos) + + if first_byte == 77 # 'M' (MN - pipeline complete) + return [term_idx + 2 - pos, true, nil, nil, nil] + end + + unless first_byte == 86 # 'V' (VA - value response) + return [term_idx + 2 - pos, false, nil, nil, nil] + end + + sp1 = pos + 3 + sp2 = buf.index(' ', sp1) || term_idx + body_len = buf.byteslice(sp1, sp2 - sp1).to_i + + header_len = term_idx + 2 - pos + resp_size = header_len + body_len + 2 + return [0, nil, nil, nil, nil] unless buf.bytesize >= pos + resp_size + + body = buf.byteslice(term_idx + 2, body_len) + + key = nil + cas = 0 + bitflags = 0 + base64 = false + scan = sp2 + while scan < term_idx + scan += 1 + flag_byte = buf.getbyte(scan) + next_sp = buf.index(' ', scan + 1) || term_idx + case flag_byte + when 107 then key = buf.byteslice(scan + 1, next_sp - scan - 1) # 'k' + when 99 then cas = buf.byteslice(scan + 1, next_sp - scan - 1).to_i # 'c' + when 102 then bitflags = buf.byteslice(scan + 1, next_sp - scan - 1).to_i # 'f' + when 98 then base64 = true # 'b' + end + scan = next_sp + end + + key = KeyRegularizer.decode(key, true) if key && base64 + value = @server.deserialize(body, bitflags) + + [resp_size, false, cas, key, value] + end + + private + + # Extract a flag value from a response line by scanning for " " + # without splitting the entire line into tokens. + def extract_flag_value(line, flag_char, start_pos) + search = " #{flag_char}" + idx = line.index(search, start_pos) + return nil unless idx + + val_start = idx + 2 + val_end = line.index(' ', val_start) || line.length + return nil if val_start == val_end + + line.byteslice(val_start, val_end - val_start).to_i + end + + def read_line + @server.sock.read_line&.chomp!(TERMINATOR) + end + + def read_data(data_size) + @server.sock.read_from_buffer(data_size + TERMINATOR.bytesize)&.chomp!(TERMINATOR) + end + end + end + end +end diff --git a/lib/dalli/server.rb b/lib/dalli/server.rb index 8e089a73..6285cd59 100644 --- a/lib/dalli/server.rb +++ b/lib/dalli/server.rb @@ -1,148 +1,10 @@ # frozen_string_literal: true -require 'socket' -require 'timeout' - -require 'dalli/pid_cache' +require 'dalli/protocol/base' module Dalli - class Server - attr_accessor :hostname - attr_accessor :port - attr_accessor :weight - attr_accessor :options - attr_reader :sock - attr_reader :socket_type # possible values: :unix, :tcp - - DEFAULT_PORT = 11211 - DEFAULT_WEIGHT = 1 - DEFAULTS = { - # seconds between trying to contact a remote server - :down_retry_delay => 60, - # connect/read/write timeout for socket operations - :socket_timeout => 0.5, - # times a socket operation may fail before considering the server dead - :socket_max_failures => 2, - # amount of time to sleep between retries when a failure occurs - :socket_failure_delay => 0.01, - # max size of value in bytes (default is 1 MB, can be overriden with "memcached -I ") - :value_max_bytes => 1024 * 1024, - # surpassing value_max_bytes either warns (false) or throws (true) - :error_when_over_max_size => false, - :compressor => Compressor, - # min byte size to attempt compression - :compression_min_size => 1024, - # max byte size for compression - :compression_max_size => false, - :serializer => Marshal, - :username => nil, - :password => nil, - :keepalive => true, - # max byte size for SO_SNDBUF - :sndbuf => nil, - # max byte size for SO_RCVBUF - :rcvbuf => nil - } - + class Server < Protocol::Base ALLOWED_MULTI_OPS = %i[set setq delete deleteq add addq replace replaceq].freeze - def initialize(attribs, options = {}) - @hostname, @port, @weight, @socket_type = parse_hostname(attribs) - @fail_count = 0 - @down_at = nil - @last_down_at = nil - @options = DEFAULTS.merge(options) - @sock = nil - @msg = nil - @error = nil - @pid = nil - @inprogress = nil - @pending_multi_response = nil - end - - def name - if socket_type == :unix - hostname - else - "#{hostname}:#{port}" - end - end - - # Chokepoint method for instrumentation - def request(op, *args) - verify_state - raise Dalli::NetworkError, "#{name} is down: #{@error} #{@msg}. If you are sure it is running, ensure memcached version is > 1.4." unless alive? - begin - # if we have exited a multi block, flush any responses that might still be pending - if @pending_multi_response && (!multi? || !ALLOWED_MULTI_OPS.include?(op)) - noop - @pending_multi_response = false - end - send(op, *args) - rescue Dalli::MarshalError => ex - Dalli.logger.error "Marshalling error for key '#{args.first}': #{ex.message}" - Dalli.logger.error "You are trying to cache a Ruby object which cannot be serialized to memcached." - Dalli.logger.error ex.backtrace.join("\n\t") - false - rescue Timeout::Error - # A Timeout::Error can be injected asynchronously by Thread#raise - # (from Timeout.timeout's watchdog thread) at ANY point in execution. - # If it fires between write(req) and reading the response for ANY - # operation (GET, SET, DELETE, etc.), the server's response remains - # in the socket's receive buffer. Without closing, the next operation - # reads those stale bytes as if they were its own response. - # - # Closing the socket discards any potentially stale data and forces - # a fresh connection on the next operation. This may occasionally - # close a clean socket (if the timeout fired at a "safe" point), but - # the performance cost of an extra reconnect is negligible. - close - raise - rescue Dalli::DalliError, Dalli::NetworkError, Dalli::ValueOverMaxSize - raise - rescue => ex - Dalli.logger.error "Unexpected exception during Dalli request: #{ex.class.name}: #{ex.message}" - Dalli.logger.error ex.backtrace.join("\n\t") - down! - end - end - - def alive? - return true if @sock - - if @last_down_at && @last_down_at + options[:down_retry_delay] >= Time.now - time = @last_down_at + options[:down_retry_delay] - Time.now - Dalli.logger.debug { "down_retry_delay not reached for #{name} (%.3f seconds left)" % time } - return false - end - - connect - !!@sock - rescue Dalli::NetworkError - false - end - - def close - return unless @sock - @sock.close rescue nil - @sock = nil - @pid = nil - @inprogress = false - end - - def lock! - end - - def unlock! - end - - def serializer - @options[:serializer] - end - - def compressor - @options[:compressor] - end - # Start reading key/value pairs from this connection. This is usually called # after a series of GETKQ commands. A NOOP is sent, and the server begins # flushing responses for kv pairs that were found. @@ -228,66 +90,26 @@ def multi_response_abort private - def verify_state - failure!(RuntimeError.new('Already writing to socket')) if @inprogress - if @pid && @pid != PIDCache.pid - message = 'Fork detected, re-connecting child process...' - Dalli.logger.info { message } - reconnect! message - end - end - - def reconnect!(message) - close - sleep(options[:socket_failure_delay]) if options[:socket_failure_delay] - raise Dalli::NetworkError, message - end - - def failure!(exception) - message = "#{name} failed (count: #{@fail_count}) #{exception.class}: #{exception.message}" - Dalli.logger.warn { message } - - @fail_count += 1 - if @fail_count >= options[:socket_max_failures] - down! - else - reconnect! 'Socket operation failed, retrying...' - end - end - - def down! - close - - @last_down_at = Time.now - - if @down_at - time = Time.now - @down_at - Dalli.logger.debug { "#{name} is still down (for %.3f seconds now)" % time } - else - @down_at = @last_down_at - Dalli.logger.warn { "#{name} is down" } + def write(bytes) + begin + @inprogress = true + result = @sock.write(bytes) + @inprogress = false + result + rescue SystemCallError, Timeout::Error => e + failure!(e) end - - @error = $! && $!.class.name - @msg = @msg || ($! && $!.message && !$!.message.empty? && $!.message) - raise Dalli::NetworkError, "#{name} is down: #{@error} #{@msg}" end - def up! - if @down_at - time = Time.now - @down_at - Dalli.logger.warn { "#{name} is back (downtime was %.3f seconds)" % time } + def read(count) + begin + @inprogress = true + data = @sock.readfull(count) + @inprogress = false + data + rescue SystemCallError, Timeout::Error, EOFError => e + failure!(e) end - - @fail_count = 0 - @down_at = nil - @last_down_at = nil - @msg = nil - @error = nil - end - - def multi? - Thread.current[:dalli_multi] end def get(key, options=nil) @@ -301,7 +123,6 @@ def send_multiget(keys) keys.each do |key| req << [REQUEST, OPCODES[:getkq], key.bytesize, 0, 0, 0, key.bytesize, 0, 0, key].pack(FORMAT[:getkq]) end - # Could send noop here instead of in multi_response_start write(req) end @@ -427,61 +248,6 @@ def touch(key, ttl) write_generic [REQUEST, OPCODES[:touch], key.bytesize, 4, 0, 0, key.bytesize + 4, 0, 0, ttl, key].pack(FORMAT[:touch]) end - # http://www.hjp.at/zettel/m/memcached_flags.rxml - # Looks like most clients use bit 0 to indicate native language serialization - # and bit 1 to indicate gzip compression. - FLAG_SERIALIZED = 0x1 - FLAG_COMPRESSED = 0x2 - - def serialize(key, value, options=nil) - marshalled = false - value = unless options && options[:raw] - marshalled = true - begin - self.serializer.dump(value) - rescue Timeout::Error => e - raise e - rescue => ex - # Marshalling can throw several different types of generic Ruby exceptions. - # Convert to a specific exception so we can special case it higher up the stack. - exc = Dalli::MarshalError.new(ex.message) - exc.set_backtrace ex.backtrace - raise exc - end - else - value.to_s - end - compressed = false - set_compress_option = true if options && options[:compress] - if (@options[:compress] || set_compress_option) && value.bytesize >= @options[:compression_min_size] && - (!@options[:compression_max_size] || value.bytesize <= @options[:compression_max_size]) - value = self.compressor.compress(value) - compressed = true - end - - flags = 0 - flags |= FLAG_COMPRESSED if compressed - flags |= FLAG_SERIALIZED if marshalled - [value, flags] - end - - def deserialize(value, flags) - value = self.compressor.decompress(value) if (flags & FLAG_COMPRESSED) != 0 - value = self.serializer.load(value) if (flags & FLAG_SERIALIZED) != 0 - value - rescue TypeError - raise if $!.message !~ /needs to have method `_load'|exception class\/object expected|instance of IO needed|incompatible marshal file format/ - raise UnmarshalError, "Unable to unmarshal value: #{$!.message}" - rescue ArgumentError - raise if $!.message !~ /undefined class|marshal data too short/ - raise UnmarshalError, "Unable to unmarshal value: #{$!.message}" - rescue NameError - raise if $!.message !~ /uninitialized constant/ - raise UnmarshalError, "Unable to unmarshal value: #{$!.message}" - rescue Zlib::Error - raise UnmarshalError, "Unable to uncompress value: #{$!.message}" - end - def data_cas_response (extras, _, status, count, _, cas) = read_header.unpack(CAS_HEADER) data = read(count) if count > 0 @@ -501,34 +267,6 @@ def data_cas_response NORMAL_HEADER = '@4CCnN' KV_HEADER = '@2n@6nN@16Q' - def guard_max_value(key, value) - if value.bytesize <= @options[:value_max_bytes] - yield - else - message = "Value for #{key} over max size: #{@options[:value_max_bytes]} <= #{value.bytesize}" - raise Dalli::ValueOverMaxSize, message if @options[:error_when_over_max_size] - - Dalli.logger.error "#{message} - this value may be truncated by memcached" - false - end - end - - # https://github.com/memcached/memcached/blob/master/doc/protocol.txt#L79 - # > An expiration time, in seconds. Can be up to 30 days. After 30 days, is treated as a unix timestamp of an exact date. - MAX_ACCEPTABLE_EXPIRATION_INTERVAL = 30*24*60*60 # 30 days - def sanitize_ttl(ttl) - ttl_as_i = ttl.to_i - return ttl_as_i if ttl_as_i <= MAX_ACCEPTABLE_EXPIRATION_INTERVAL - now = Time.now.to_i - return ttl_as_i if ttl_as_i > now # already a timestamp - Dalli.logger.debug "Expiration interval (#{ttl_as_i}) too long for Memcached, converting to an expiration timestamp" - now + ttl_as_i - end - - # Implements the NullObject pattern to store an application-defined value for 'Key not found' responses. - class NilObject; end - NOT_FOUND = NilObject.new - def generic_response(unpack=false, cache_nils=false) (extras, _, status, count) = read_header.unpack(NORMAL_HEADER) data = read(count) if count > 0 @@ -549,7 +287,7 @@ def generic_response(unpack=false, cache_nils=false) def cas_response (_, _, status, count, _, cas) = read_header.unpack(CAS_HEADER) - read(count) if count > 0 # this is potential data that we don't care about + read(count) if count > 0 if status == 1 nil elsif status == 2 || status == 5 @@ -580,62 +318,21 @@ def flush_response end end - def write(bytes) - begin - @inprogress = true - result = @sock.write(bytes) - @inprogress = false - result - rescue SystemCallError, Timeout::Error => e - failure!(e) - end - end - - def read(count) - begin - @inprogress = true - data = @sock.readfull(count) - @inprogress = false - data - rescue SystemCallError, Timeout::Error, EOFError => e - failure!(e) - end - end - def read_header read(24) || raise(Dalli::NetworkError, 'No response') end - def connect - Dalli.logger.debug { "Dalli::Server#connect #{name}" } - - begin - @pid = PIDCache.pid - @sock = if socket_type == :unix - Dalli::Socket::UNIX.open(hostname, self, options) - else - Dalli::Socket::TCP.open(hostname, port, self, options) - end - sasl_authentication if need_auth? - @version = version # trigger actual connect - up! - rescue Dalli::DalliError # SASL auth failure - raise - rescue SystemCallError, Timeout::Error, EOFError, SocketError => e - # SocketError = DNS resolution failure - failure!(e) - end - end - def split(n) [n >> 32, 0xFFFFFFFF & n] end + def post_connect + sasl_authentication if need_auth? + end + REQUEST = 0x80 RESPONSE = 0x81 - # Response codes taken from: - # https://github.com/memcached/memcached/wiki/BinaryProtocolRevamped#response-status RESPONSE_CODES = { 0 => 'No error', 1 => 'Key not found', @@ -749,28 +446,6 @@ def sasl_authentication raise Dalli::DalliError, "Error authenticating: #{status}" unless status == 0x21 raise NotImplementedError, "No two-step authentication mechanisms supported" - # (step, msg) = sasl.receive('challenge', content) - # raise Dalli::NetworkError, "Authentication failed" if sasl.failed? || step != 'response' - end - - def parse_hostname(str) - res = str.match(/\A(\[([\h:]+)\]|[^:]+)(?::(\d+))?(?::(\d+))?\z/) - raise Dalli::DalliError, "Could not parse hostname #{str}" if res.nil? || res[1] == '[]' - hostnam = res[2] || res[1] - if hostnam.start_with?("/") - socket_type = :unix - # in case of unix socket, allow only setting of weight, not port - raise Dalli::DalliError, "Could not parse hostname #{str}" if res[4] - weigh = res[3] - else - socket_type = :tcp - por = res[3] || DEFAULT_PORT - por = Integer(por) - weigh = res[4] - end - weigh ||= DEFAULT_WEIGHT - weigh = Integer(weigh) - return hostnam, por, weigh, socket_type end end end diff --git a/lib/dalli/socket.rb b/lib/dalli/socket.rb index 13d2124b..6eca06ec 100644 --- a/lib/dalli/socket.rb +++ b/lib/dalli/socket.rb @@ -38,6 +38,53 @@ def read_available value end + # Read a single \r\n-terminated line from the socket. Uses an internal + # buffer so that bytes read past the line boundary are preserved for + # subsequent read_line or read_from_buffer calls. + def read_line + @read_buffer ||= +"" + loop do + if (idx = @read_buffer.index("\r\n")) + return @read_buffer.slice!(0, idx + 2) + end + result = read_nonblock(8196, exception: false) + case result + when :wait_readable + raise Timeout::Error, "IO timeout: #{safe_options.inspect}" unless IO.select([self], nil, nil, options[:socket_timeout]) + when :wait_writable + raise Timeout::Error, "IO timeout: #{safe_options.inspect}" unless IO.select(nil, [self], nil, options[:socket_timeout]) + when nil + raise Errno::ECONNRESET, "Connection reset: #{safe_options.inspect}" + else + @read_buffer << result + end + end + end + + # Read exactly +count+ bytes from the socket, consuming from the + # internal read buffer first (populated by read_line overshoots). + def read_from_buffer(count) + @read_buffer ||= +"" + while @read_buffer.bytesize < count + result = read_nonblock([count - @read_buffer.bytesize, 8196].max, exception: false) + case result + when :wait_readable + raise Timeout::Error, "IO timeout: #{safe_options.inspect}" unless IO.select([self], nil, nil, options[:socket_timeout]) + when :wait_writable + raise Timeout::Error, "IO timeout: #{safe_options.inspect}" unless IO.select(nil, [self], nil, options[:socket_timeout]) + when nil + raise Errno::ECONNRESET, "Connection reset: #{safe_options.inspect}" + else + @read_buffer << result + end + end + @read_buffer.slice!(0, count) + end + + def clear_read_buffer + @read_buffer = nil + end + def safe_options options.reject { |k, v| [:username, :password].include? k } end diff --git a/test/test_meta_key_regularizer.rb b/test/test_meta_key_regularizer.rb new file mode 100644 index 00000000..de208a3a --- /dev/null +++ b/test/test_meta_key_regularizer.rb @@ -0,0 +1,73 @@ +# frozen_string_literal: true +require_relative 'helper' +require 'dalli/protocol/meta' + +describe 'Dalli::Protocol::Meta::KeyRegularizer' do + let(:kr) { Dalli::Protocol::Meta::KeyRegularizer } + + describe '.encode' do + it 'returns ASCII keys unchanged' do + key, base64 = kr.encode('simple_key') + assert_equal 'simple_key', key + assert_equal false, base64 + end + + it 'base64-encodes keys with whitespace' do + key, base64 = kr.encode('key with spaces') + assert_equal true, base64 + refute_match(/\s/, key) + end + + it 'base64-encodes keys with non-ASCII characters' do + key, base64 = kr.encode("key\xC3\xA9") + assert_equal true, base64 + assert key.ascii_only? + end + + it 'base64-encodes keys with tabs' do + key, base64 = kr.encode("key\twith\ttabs") + assert_equal true, base64 + end + + it 'base64-encodes keys with newlines' do + key, base64 = kr.encode("key\nwith\nnewlines") + assert_equal true, base64 + end + + it 'base64-encodes keys with control bytes' do + key, base64 = kr.encode("key\x00with\x01controls") + assert_equal true, base64 + assert key.ascii_only? + end + end + + describe '.decode' do + it 'returns non-base64 keys unchanged' do + assert_equal 'simple_key', kr.decode('simple_key', false) + end + + it 'decodes base64-encoded keys' do + original = 'key with spaces' + encoded, _ = kr.encode(original) + assert_equal original, kr.decode(encoded, true) + end + + it 'round-trips non-ASCII keys' do + original = +"key\xC3\xA9value" + original.force_encoding(Encoding::UTF_8) + encoded, base64 = kr.encode(original) + decoded = kr.decode(encoded, base64) + assert_equal original.bytes, decoded.bytes + end + + it 'preserves binary bytes for non-utf8 keys' do + original = +"key\xFFvalue" + original.force_encoding(Encoding::BINARY) + encoded, base64 = kr.encode(original) + decoded = kr.decode(encoded, base64) + + assert_equal original.bytes, decoded.bytes + assert_equal Encoding::BINARY, decoded.encoding + end + end +end diff --git a/test/test_meta_protocol.rb b/test/test_meta_protocol.rb new file mode 100644 index 00000000..4ec9ea7f --- /dev/null +++ b/test/test_meta_protocol.rb @@ -0,0 +1,374 @@ +# frozen_string_literal: true +require_relative 'helper' +require 'dalli/protocol/meta' + +describe 'Dalli Meta Protocol' do + # Use a dedicated port range for meta protocol tests + META_PORT = 21500 + + def memcached_meta_persistent(port = META_PORT) + dc = start_and_flush_with_retry(port, '', {protocol: :meta}) + yield dc, port if block_given? + end + + describe 'basic operations' do + it 'supports set and get' do + memcached_meta_persistent do |dc| + dc.set('meta_key', 'meta_value') + assert_equal 'meta_value', dc.get('meta_key') + end + end + + it 'supports get returning nil for missing keys' do + memcached_meta_persistent do |dc| + assert_nil dc.get('nonexistent_key_abc123') + end + end + + it 'supports set with TTL' do + memcached_meta_persistent do |dc| + dc.set('ttl_key', 'ttl_value', 1) + assert_equal 'ttl_value', dc.get('ttl_key') + sleep 1.2 + assert_nil dc.get('ttl_key') + end + end + + it 'supports set returning CAS value' do + memcached_meta_persistent do |dc| + result = dc.set('cas_test', 'value1') + assert result.is_a?(Integer) + assert result > 0 + end + end + + it 'supports delete' do + memcached_meta_persistent do |dc| + dc.set('del_key', 'del_value') + assert_equal 'del_value', dc.get('del_key') + assert dc.delete('del_key') + assert_nil dc.get('del_key') + end + end + + it 'supports add' do + memcached_meta_persistent do |dc| + dc.delete('add_key') rescue nil + result = dc.add('add_key', 'add_value') + assert result + assert_equal 'add_value', dc.get('add_key') + + result2 = dc.add('add_key', 'other_value') + assert_equal false, result2 + assert_equal 'add_value', dc.get('add_key') + end + end + + it 'supports replace' do + memcached_meta_persistent do |dc| + dc.set('rep_key', 'original') + result = dc.replace('rep_key', 'replaced') + assert result + assert_equal 'replaced', dc.get('rep_key') + end + end + + it 'supports replace failing on missing key' do + memcached_meta_persistent do |dc| + dc.delete('rep_missing') rescue nil + result = dc.replace('rep_missing', 'value') + assert_equal false, result + end + end + end + + describe 'counter operations' do + it 'supports incr' do + memcached_meta_persistent do |dc| + dc.set('counter', 10, 0, raw: true) + result = dc.incr('counter', 5) + assert_equal 15, result + end + end + + it 'supports decr' do + memcached_meta_persistent do |dc| + dc.set('counter2', 10, 0, raw: true) + result = dc.decr('counter2', 3) + assert_equal 7, result + end + end + + it 'supports incr with default' do + memcached_meta_persistent do |dc| + dc.delete('new_counter') rescue nil + result = dc.incr('new_counter', 1, 0, 100) + assert_equal 100, result + end + end + + it 'supports decr with default' do + memcached_meta_persistent do |dc| + dc.delete('new_dcounter') rescue nil + result = dc.decr('new_dcounter', 1, 0, 50) + assert_equal 50, result + end + end + + it 'incr returns nil when key does not exist and no default' do + memcached_meta_persistent do |dc| + dc.delete('missing_counter') rescue nil + result = dc.incr('missing_counter', 1) + assert_nil result + end + end + end + + describe 'touch' do + it 'updates expiration on existing key' do + memcached_meta_persistent do |dc| + dc.set('touch_key', 'touch_val', 1) + result = dc.touch('touch_key', 60) + assert_equal true, result + sleep 1.2 + assert_equal 'touch_val', dc.get('touch_key') + end + end + + it 'returns nil for missing key' do + memcached_meta_persistent do |dc| + dc.delete('touch_missing') rescue nil + result = dc.touch('touch_missing', 60) + assert_nil result + end + end + end + + describe 'append and prepend' do + it 'supports append' do + memcached_meta_persistent do |dc| + dc.set('append_key', 'hello', 0, raw: true) + dc.append('append_key', ' world') + assert_equal 'hello world', dc.get('append_key', raw: true) + end + end + + it 'supports prepend' do + memcached_meta_persistent do |dc| + dc.set('prepend_key', 'world', 0, raw: true) + dc.prepend('prepend_key', 'hello ') + assert_equal 'hello world', dc.get('prepend_key', raw: true) + end + end + end + + describe 'CAS operations' do + it 'supports cas (compare and swap)' do + memcached_meta_persistent do |dc| + dc.set('cas_key', 'original') + result = dc.cas('cas_key') do |val| + assert_equal 'original', val + 'updated' + end + assert result + assert_equal 'updated', dc.get('cas_key') + end + end + + it 'cas returns nil for missing key' do + memcached_meta_persistent do |dc| + dc.delete('cas_missing') rescue nil + result = dc.cas('cas_missing') { 'value' } + assert_nil result + end + end + end + + describe 'serialization' do + it 'handles complex Ruby objects' do + memcached_meta_persistent do |dc| + value = {name: 'test', count: 42, nested: [1, 2, 3]} + dc.set('complex_key', value) + assert_equal value, dc.get('complex_key') + end + end + + it 'handles raw string values' do + memcached_meta_persistent do |dc| + dc.set('raw_key', 'raw_value', 0, raw: true) + assert_equal 'raw_value', dc.get('raw_key', raw: true) + end + end + + it 'handles nil values' do + memcached_meta_persistent do |dc| + dc.set('nil_key', nil) + assert_nil dc.get('nil_key') + end + end + + it 'handles integer values' do + memcached_meta_persistent do |dc| + dc.set('int_key', 42) + assert_equal 42, dc.get('int_key') + end + end + end + + describe 'multi-get' do + it 'supports get_multi' do + memcached_meta_persistent do |dc| + dc.set('mk1', 'val1') + dc.set('mk2', 'val2') + dc.set('mk3', 'val3') + + result = dc.get_multi('mk1', 'mk2', 'mk3') + assert_equal 'val1', result['mk1'] + assert_equal 'val2', result['mk2'] + assert_equal 'val3', result['mk3'] + end + end + + it 'omits missing keys from get_multi results' do + memcached_meta_persistent do |dc| + dc.set('mgk1', 'val1') + dc.delete('mgk2') rescue nil + + result = dc.get_multi('mgk1', 'mgk2') + assert_equal 'val1', result['mgk1'] + refute result.key?('mgk2') + end + end + + it 'handles empty key list' do + memcached_meta_persistent do |dc| + result = dc.get_multi + assert_equal({}, result) + end + end + + it 'handles get_multi with complex objects' do + memcached_meta_persistent do |dc| + dc.set('mgo1', {a: 1}) + dc.set('mgo2', [1, 2, 3]) + + result = dc.get_multi('mgo1', 'mgo2') + assert_equal({a: 1}, result['mgo1']) + assert_equal [1, 2, 3], result['mgo2'] + end + end + end + + describe 'multi (quiet) operations' do + it 'supports pipelined set and delete' do + memcached_meta_persistent do |dc| + dc.multi do + dc.set('mq1', 'val1') + dc.set('mq2', 'val2') + end + assert_equal 'val1', dc.get('mq1') + assert_equal 'val2', dc.get('mq2') + + dc.multi do + dc.delete('mq1') + dc.delete('mq2') + end + assert_nil dc.get('mq1') + assert_nil dc.get('mq2') + end + end + end + + describe 'server info' do + it 'supports version' do + memcached_meta_persistent do |dc| + versions = dc.version + refute_empty versions + versions.each_value do |v| + assert_match(/\d+\.\d+/, v) + end + end + end + + it 'supports stats' do + memcached_meta_persistent do |dc| + stats = dc.stats + refute_empty stats + stats.each_value do |s| + assert s.is_a?(Hash) + assert s.key?('pid') + assert s.key?('uptime') + end + end + end + + it 'supports flush' do + memcached_meta_persistent do |dc| + dc.set('flush_test', 'value') + dc.flush + assert_nil dc.get('flush_test') + end + end + end + + describe 'fetch' do + it 'returns cached value' do + memcached_meta_persistent do |dc| + dc.set('fetch_key', 'cached') + result = dc.fetch('fetch_key') { 'computed' } + assert_equal 'cached', result + end + end + + it 'computes and caches on miss' do + memcached_meta_persistent do |dc| + dc.delete('fetch_miss') rescue nil + result = dc.fetch('fetch_miss') { 'computed' } + assert_equal 'computed', result + assert_equal 'computed', dc.get('fetch_miss') + end + end + end + + describe 'cache_nils' do + it 'supports cache_nils option' do + memcached_meta_persistent do |dc, port| + dc_nils = Dalli::Client.new( + ["localhost:#{port}", "127.0.0.1:#{port}"], + protocol: :meta, cache_nils: true + ) + dc_nils.set('nil_cache_key', nil) + result = dc_nils.fetch('nil_cache_key') { 'fallback' } + assert_nil result + end + end + end + + describe 'namespace' do + it 'supports namespaced keys' do + memcached_meta_persistent do |dc, port| + dc_ns = Dalli::Client.new( + ["localhost:#{port}", "127.0.0.1:#{port}"], + protocol: :meta, namespace: 'test_ns' + ) + dc_ns.set('nskey', 'nsval') + assert_equal 'nsval', dc_ns.get('nskey') + + dc_raw = Dalli::Client.new( + ["localhost:#{port}", "127.0.0.1:#{port}"], + protocol: :meta + ) + assert_equal 'nsval', dc_raw.get('test_ns:nskey') + end + end + end + + describe 'error handling' do + it 'rejects SASL with meta protocol' do + assert_raises(Dalli::DalliError) do + dc = Dalli::Client.new('localhost:11211', protocol: :meta, username: 'user', password: 'pass') + dc.alive! + end + end + end +end diff --git a/test/test_meta_request_formatter.rb b/test/test_meta_request_formatter.rb new file mode 100644 index 00000000..3b9838a9 --- /dev/null +++ b/test/test_meta_request_formatter.rb @@ -0,0 +1,160 @@ +# frozen_string_literal: true +require_relative 'helper' +require 'dalli/protocol/meta' + +describe 'Dalli::Protocol::Meta::RequestFormatter' do + let(:fmt) { Dalli::Protocol::Meta::RequestFormatter } + + describe '.meta_get' do + it 'builds a basic get with value and flags' do + assert_equal "mg mykey v f\r\n", fmt.meta_get(key: 'mykey') + end + + it 'omits value flag when value: false' do + assert_equal "mg mykey\r\n", fmt.meta_get(key: 'mykey', value: false) + end + + it 'includes cas flag' do + assert_equal "mg mykey v f c\r\n", fmt.meta_get(key: 'mykey', return_cas: true) + end + + it 'includes base64 flag' do + assert_equal "mg mykey v f b\r\n", fmt.meta_get(key: 'mykey', base64: true) + end + + it 'includes TTL flag' do + assert_equal "mg mykey v f T300\r\n", fmt.meta_get(key: 'mykey', ttl: 300) + end + + it 'includes quiet mode flags for multi-get' do + assert_equal "mg mykey v f c k q s\r\n", fmt.meta_get(key: 'mykey', return_cas: true, quiet: true) + end + + it 'builds touch command (no value)' do + assert_equal "mg mykey T60\r\n", fmt.meta_get(key: 'mykey', value: false, ttl: 60) + end + end + + describe '.meta_set' do + it 'builds a set command' do + result = fmt.meta_set(key: 'mykey', value: 'hello', bitflags: 1, ttl: 300, mode: :set) + assert_equal "ms mykey 5 c F1 T300 MS\r\n", result + end + + it 'builds an add command' do + result = fmt.meta_set(key: 'mykey', value: 'hello', bitflags: 1, ttl: 300, mode: :add) + assert_equal "ms mykey 5 c F1 T300 ME\r\n", result + end + + it 'builds a replace command' do + result = fmt.meta_set(key: 'mykey', value: 'hello', bitflags: 1, ttl: 300, mode: :replace) + assert_equal "ms mykey 5 c F1 T300 MR\r\n", result + end + + it 'builds an append command without cas flag' do + result = fmt.meta_set(key: 'mykey', value: 'data', mode: :append) + assert_equal "ms mykey 4 MA\r\n", result + end + + it 'builds a prepend command without cas flag' do + result = fmt.meta_set(key: 'mykey', value: 'data', mode: :prepend) + assert_equal "ms mykey 4 MP\r\n", result + end + + it 'includes CAS value when provided' do + result = fmt.meta_set(key: 'mykey', value: 'hi', bitflags: 0, cas: 42, ttl: 60, mode: :set) + assert_equal "ms mykey 2 c C42 T60 MS\r\n", result + end + + it 'omits CAS when zero' do + result = fmt.meta_set(key: 'mykey', value: 'hi', bitflags: 0, cas: 0, ttl: 60, mode: :set) + assert_equal "ms mykey 2 c T60 MS\r\n", result + end + + it 'includes quiet flag' do + result = fmt.meta_set(key: 'mykey', value: 'hi', mode: :set, quiet: true) + assert_equal "ms mykey 2 c MS q\r\n", result + end + + it 'includes base64 flag' do + result = fmt.meta_set(key: 'encoded', value: 'x', base64: true, mode: :set) + assert_equal "ms encoded 1 c b MS\r\n", result + end + end + + describe '.meta_delete' do + it 'builds a basic delete' do + assert_equal "md mykey\r\n", fmt.meta_delete(key: 'mykey') + end + + it 'includes CAS value' do + assert_equal "md mykey C99\r\n", fmt.meta_delete(key: 'mykey', cas: 99) + end + + it 'includes quiet flag' do + assert_equal "md mykey q\r\n", fmt.meta_delete(key: 'mykey', quiet: true) + end + + it 'includes base64 flag' do + assert_equal "md mykey b\r\n", fmt.meta_delete(key: 'mykey', base64: true) + end + end + + describe '.meta_arithmetic' do + it 'builds an increment command' do + result = fmt.meta_arithmetic(key: 'cnt', delta: 5, initial: nil, incr: true) + assert_equal "ma cnt v D5 MI\r\n", result + end + + it 'builds a decrement command' do + result = fmt.meta_arithmetic(key: 'cnt', delta: 3, initial: nil, incr: false) + assert_equal "ma cnt v D3 MD\r\n", result + end + + it 'includes initial and TTL when initial is provided' do + result = fmt.meta_arithmetic(key: 'cnt', delta: 1, initial: 100, incr: true, ttl: 60) + assert_equal "ma cnt v D1 J100 N60 MI\r\n", result + end + + it 'sets N0 when initial provided but no ttl' do + result = fmt.meta_arithmetic(key: 'cnt', delta: 1, initial: 10, incr: true) + assert_equal "ma cnt v D1 J10 N0 MI\r\n", result + end + end + + describe '.meta_noop' do + it 'returns mn command' do + assert_equal "mn\r\n", fmt.meta_noop + end + end + + describe '.version' do + it 'returns version command' do + assert_equal "version\r\n", fmt.version + end + end + + describe '.flush' do + it 'returns flush_all without delay' do + assert_equal "flush_all\r\n", fmt.flush + end + + it 'returns flush_all with delay' do + assert_equal "flush_all 5\r\n", fmt.flush(delay: 5) + end + end + + describe '.stats' do + it 'returns stats command' do + assert_equal "stats\r\n", fmt.stats + end + + it 'returns stats with argument' do + assert_equal "stats items\r\n", fmt.stats('items') + end + + it 'handles empty string argument' do + assert_equal "stats\r\n", fmt.stats('') + end + end +end diff --git a/test/test_meta_response_processor.rb b/test/test_meta_response_processor.rb new file mode 100644 index 00000000..ac303842 --- /dev/null +++ b/test/test_meta_response_processor.rb @@ -0,0 +1,205 @@ +# frozen_string_literal: true +require_relative 'helper' +require 'dalli/protocol/meta' + +describe 'Dalli::Protocol::Meta::ResponseProcessor' do + # A minimal mock server that provides sock and deserialize for ResponseProcessor + class MockMetaServer + attr_reader :sock + + def initialize(response_data) + @sock = MockMetaSocket.new(response_data) + end + + def deserialize(value, flags) + if flags && (flags & 0x1) != 0 + Marshal.load(value) + else + value + end + end + end + + class MockMetaSocket + def initialize(data) + @data = +data + end + + def read_line + idx = @data.index("\r\n") + return nil unless idx + @data.slice!(0, idx + 2) + end + + def read_from_buffer(count) + @data.slice!(0, count) + end + + def read_available + result = @data.dup + @data.clear + result + end + end + + def build_processor(response_str) + server = MockMetaServer.new(response_str) + Dalli::Protocol::Meta::ResponseProcessor.new(server) + end + + describe '#meta_get_with_value' do + it 'returns value on VA response' do + proc = build_processor("VA 5 f0\r\nhello\r\n") + assert_equal 'hello', proc.meta_get_with_value + end + + it 'returns nil on EN (not found)' do + proc = build_processor("EN\r\n") + assert_nil proc.meta_get_with_value + end + + it 'returns NOT_FOUND sentinel when cache_nils is true' do + proc = build_processor("EN\r\n") + result = proc.meta_get_with_value(cache_nils: true) + assert_equal Dalli::Protocol::Base::NOT_FOUND, result + end + + it 'returns true on HD (header only, e.g. touch)' do + proc = build_processor("HD\r\n") + assert_equal true, proc.meta_get_with_value + end + end + + describe '#meta_get_with_value_and_cas' do + it 'returns [value, cas] on VA response' do + proc = build_processor("VA 3 f0 c42\r\nfoo\r\n") + value, cas = proc.meta_get_with_value_and_cas + assert_equal 'foo', value + assert_equal 42, cas + end + + it 'returns [nil, 0] on EN' do + proc = build_processor("EN\r\n") + value, cas = proc.meta_get_with_value_and_cas + assert_nil value + assert_equal 0, cas + end + end + + describe '#meta_get_without_value' do + it 'returns true on HD' do + proc = build_processor("HD\r\n") + assert_equal true, proc.meta_get_without_value + end + + it 'returns nil on EN' do + proc = build_processor("EN\r\n") + assert_nil proc.meta_get_without_value + end + end + + describe '#meta_set_with_cas' do + it 'returns CAS on HD' do + proc = build_processor("HD c123\r\n") + assert_equal 123, proc.meta_set_with_cas + end + + it 'returns false on NS (not stored)' do + proc = build_processor("NS\r\n") + assert_equal false, proc.meta_set_with_cas + end + + it 'returns false on NF (not found)' do + proc = build_processor("NF\r\n") + assert_equal false, proc.meta_set_with_cas + end + + it 'returns false on EX (exists / CAS conflict)' do + proc = build_processor("EX\r\n") + assert_equal false, proc.meta_set_with_cas + end + + it 'raises DalliError on unexpected response' do + proc = build_processor("SERVER_ERROR out of memory\r\n") + assert_raises(Dalli::DalliError) { proc.meta_set_with_cas } + end + end + + describe '#meta_delete' do + it 'returns true on HD' do + proc = build_processor("HD\r\n") + assert_equal true, proc.meta_delete + end + + it 'returns false on NF' do + proc = build_processor("NF\r\n") + assert_equal false, proc.meta_delete + end + + it 'raises DalliError on unexpected response' do + proc = build_processor("SERVER_ERROR out of memory\r\n") + assert_raises(Dalli::DalliError) { proc.meta_delete } + end + end + + describe '#decr_incr' do + it 'returns integer value on VA' do + proc = build_processor("VA 2\r\n42\r\n") + assert_equal 42, proc.decr_incr + end + + it 'returns nil on NF' do + proc = build_processor("NF\r\n") + assert_nil proc.decr_incr + end + + it 'returns false on NS' do + proc = build_processor("NS\r\n") + assert_equal false, proc.decr_incr + end + end + + describe '#version' do + it 'parses version response' do + proc = build_processor("VERSION 1.6.40\r\n") + assert_equal '1.6.40', proc.version + end + end + + describe '#flush' do + it 'returns true on OK' do + proc = build_processor("OK\r\n") + assert_equal true, proc.flush + end + end + + describe '#stats' do + it 'parses multiple stat lines' do + data = "STAT pid 12345\r\nSTAT uptime 100\r\nEND\r\n" + proc = build_processor(data) + result = proc.stats + assert_equal '12345', result['pid'] + assert_equal '100', result['uptime'] + end + end + + describe '#consume_all_responses_until_mn' do + it 'consumes responses until MN' do + data = "HD c1\r\nHD c2\r\nMN\r\n" + proc = build_processor(data) + assert_equal true, proc.consume_all_responses_until_mn + end + end + + describe 'error handling' do + it 'raises DalliError on unexpected response' do + proc = build_processor("UNKNOWN\r\n") + assert_raises(Dalli::DalliError) { proc.meta_get_with_value } + end + + it 'raises DalliError on SERVER_ERROR' do + proc = build_processor("SERVER_ERROR out of memory\r\n") + assert_raises(Dalli::DalliError) { proc.meta_get_with_value } + end + end +end