Skip to content
Open
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 50 additions & 3 deletions apisix/plugins/ai-rate-limiting.lua
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ local ipairs = ipairs
local type = type
local core = require("apisix.core")
local limit_count = require("apisix.plugins.limit-count.init")
local policy_to_additional_properties = require("apisix.utils.redis-schema").schema

local plugin_name = "ai-rate-limiting"

Expand Down Expand Up @@ -56,6 +57,12 @@ local schema = {
rejected_msg = {
type = "string", minLength = 1
},
policy = {
type = "string",
enum = {"local", "redis", "redis-cluster"},
default = "local",
},
allow_degradation = {type = "boolean", default = false},
},
dependencies = {
limit = {"time_window"},
Expand All @@ -68,6 +75,24 @@ local schema = {
{
required = {"instances"}
}
},
["if"] = {
properties = {
policy = {
enum = {"redis"},
},
},
},
["then"] = policy_to_additional_properties.redis,
["else"] = {
["if"] = {
properties = {
policy = {
enum = {"redis-cluster"},
},
},
},
["then"] = policy_to_additional_properties["redis-cluster"],
}
}

Expand Down Expand Up @@ -99,7 +124,8 @@ local function transform_limit_conf(plugin_conf, instance_conf, instance_name)
limit = instance_conf.limit
time_window = instance_conf.time_window
end
return {

local limit_conf = {
_vid = key,

key = key,
Expand All @@ -109,15 +135,36 @@ local function transform_limit_conf(plugin_conf, instance_conf, instance_name)
rejected_msg = plugin_conf.rejected_msg,
show_limit_quota_header = plugin_conf.show_limit_quota_header,
-- limit-count need these fields
policy = "local",
policy = plugin_conf.policy or "local",
key_type = "constant",
allow_degradation = false,
allow_degradation = plugin_conf.allow_degradation or false,
sync_interval = -1,

limit_header = "X-AI-RateLimit-Limit-" .. name,
remaining_header = "X-AI-RateLimit-Remaining-" .. name,
reset_header = "X-AI-RateLimit-Reset-" .. name,
}

-- Pass through Redis configuration if policy is redis or redis-cluster
if plugin_conf.policy == "redis" then
limit_conf.redis_host = plugin_conf.redis_host
limit_conf.redis_port = plugin_conf.redis_port
limit_conf.redis_username = plugin_conf.redis_username
limit_conf.redis_password = plugin_conf.redis_password
limit_conf.redis_database = plugin_conf.redis_database
limit_conf.redis_timeout = plugin_conf.redis_timeout
limit_conf.redis_ssl = plugin_conf.redis_ssl
limit_conf.redis_ssl_verify = plugin_conf.redis_ssl_verify
elseif plugin_conf.policy == "redis-cluster" then
limit_conf.redis_cluster_nodes = plugin_conf.redis_cluster_nodes
limit_conf.redis_cluster_name = plugin_conf.redis_cluster_name
limit_conf.redis_password = plugin_conf.redis_password
limit_conf.redis_timeout = plugin_conf.redis_timeout
limit_conf.redis_cluster_ssl = plugin_conf.redis_cluster_ssl
limit_conf.redis_cluster_ssl_verify = plugin_conf.redis_cluster_ssl_verify
end

return limit_conf
end


Expand Down
11 changes: 9 additions & 2 deletions apisix/plugins/limit-count/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
--
local core = require("apisix.core")
local apisix_plugin = require("apisix.plugin")
local ngx = ngx
local tab_insert = table.insert
local ipairs = ipairs
local pairs = pairs
Expand Down Expand Up @@ -234,6 +235,7 @@ local function gen_limit_obj(conf, ctx, plugin_name)
return core.lrucache.plugin_ctx(lrucache, ctx, extra_key, create_limit_obj, conf, plugin_name)
end


function _M.rate_limit(conf, ctx, name, cost, dry_run)
core.log.info("ver: ", ctx.conf_version)
core.log.info("conf: ", core.json.delay_encode(conf, true))
Expand Down Expand Up @@ -275,11 +277,17 @@ function _M.rate_limit(conf, ctx, name, cost, dry_run)
key = gen_limit_key(conf, ctx, key)
core.log.info("limit key: ", key)

local phase = get_phase()
local delay, remaining, reset
if not conf.policy or conf.policy == "local" then
delay, remaining, reset = lim:incoming(key, not dry_run, conf, cost)
elseif phase == "log" then
local ok, err = lim:log_phase_incoming(key, cost, dry_run)
if not ok then
core.log.error("failed to record rate limit: ", err)
end
else
delay, remaining, reset = lim:incoming(key, cost)
delay, remaining, reset = lim:incoming(key, cost, dry_run)
end

local metadata = apisix_plugin.plugin_metadata("limit-count")
Expand All @@ -295,7 +303,6 @@ function _M.rate_limit(conf, ctx, name, cost, dry_run)
remaining_header = conf.remaining_header or metadata.remaining_header,
reset_header = conf.reset_header or metadata.reset_header,
}
local phase = get_phase()
local set_header = phase ~= "log"

if not delay then
Expand Down
53 changes: 25 additions & 28 deletions apisix/plugins/limit-count/limit-count-redis-cluster.lua
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@
-- See the License for the specific language governing permissions and
-- limitations under the License.
--

local redis_cluster = require("apisix.utils.rediscluster")
local core = require("apisix.core")
local ngx = ngx
local setmetatable = setmetatable
local tostring = tostring
local util = require("apisix.plugins.limit-count.util")
local ngx_timer_at = ngx.timer.at

local _M = {}

Expand All @@ -28,17 +29,6 @@ local mt = {
}


local script = core.string.compress_script([=[
assert(tonumber(ARGV[3]) >= 1, "cost must be at least 1")
local ttl = redis.call('ttl', KEYS[1])
if ttl < 0 then
redis.call('set', KEYS[1], ARGV[1] - ARGV[3], 'EX', ARGV[2])
return {ARGV[1] - ARGV[3], ARGV[2]}
end
return {redis.call('incrby', KEYS[1], 0 - ARGV[3]), ttl}
]=])


function _M.new(plugin_name, limit, window, conf)
local red_cli, err = redis_cluster.new(conf, "plugin-limit-count-redis-cluster-slot-lock")
if not red_cli then
Expand All @@ -57,26 +47,33 @@ function _M.new(plugin_name, limit, window, conf)
end


function _M.incoming(self, key, cost)
local red = self.red_cli
local limit = self.limit
local window = self.window
key = self.plugin_name .. tostring(key)
function _M.incoming(self, key, cost, dry_run)
local commit = true
if dry_run ~= nil then
commit = not dry_run
end

local ttl = 0
local res, err = red:eval(script, 1, key, limit, window, cost or 1)
return util.redis_incoming(self, self.red_cli, key, commit, cost)
end

if err then
return nil, err, ttl
end

local remaining = res[1]
ttl = res[2]
local function log_phase_incoming_thread(premature, self, key, cost)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code should be placed in the ai-rate-limiting plugin, not in the limit-count plugin itself, because this code is only useful for ai-rate-limiting.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I disagree. Similar logic also appears in the limit-conn.

local function leaving_thread(premature, self, key, req_latency)
local conf = self.conf
local red, err = redis.new(conf)
if not red then
return red, err
end
return util.leaving(self, red, key, req_latency)
end
function _M.leaving(self, key, req_latency)
-- log_by_lua can't use cosocket
local ok, err = ngx_timer_at(0, leaving_thread, self, key, req_latency)
if not ok then
core.log.error("failed to create timer: ", err)
return nil, err
end
return ok
end

If I put this logic into ai-rate-limiting plugin, it will make the plugin too complicated. I have to copy a lot of logic from limit-count. Currently, the ai-rate-limiting is just a simple wrapper of limit-count.

Or, do you think I need to rewrite the ai-rate-limiting to something like limit-ai-redis.lua and limit-ai-redis-cluster.lua?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR is quite large and hard to review. If you agree, I can split it into 3 separate PRs:

  1. A PR to improve the tests for the rate-limiting plugins.
  2. A PR to add support for the log phase.
  3. A PR to add Redis support to the ai-rate-limiting.

return util.redis_log_phase_incoming(self, self.red_cli, key, cost)
end


function _M.log_phase_incoming(self, key, cost, dry_run)
if dry_run then
return true
end

if remaining < 0 then
return nil, "rejected", ttl
local ok, err = ngx_timer_at(0, log_phase_incoming_thread, self, key, cost)
if not ok then
core.log.error("failed to create timer: ", err)
return nil, err
end
return 0, remaining, ttl

return ok
end


Expand Down
66 changes: 38 additions & 28 deletions apisix/plugins/limit-count/limit-count-redis.lua
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
--
local redis = require("apisix.utils.redis")
local core = require("apisix.core")
local ngx = ngx
local assert = assert
local setmetatable = setmetatable
local tostring = tostring
local util = require("apisix.plugins.limit-count.util")
local ngx_timer_at = ngx.timer.at


local _M = {version = 0.3}
Expand All @@ -29,17 +31,6 @@ local mt = {
}


local script = core.string.compress_script([=[
assert(tonumber(ARGV[3]) >= 1, "cost must be at least 1")
local ttl = redis.call('ttl', KEYS[1])
if ttl < 0 then
redis.call('set', KEYS[1], ARGV[1] - ARGV[3], 'EX', ARGV[2])
return {ARGV[1] - ARGV[3], ARGV[2]}
end
return {redis.call('incrby', KEYS[1], 0 - ARGV[3]), ttl}
]=])


function _M.new(plugin_name, limit, window, conf)
assert(limit > 0 and window > 0)

Expand All @@ -52,37 +43,56 @@ function _M.new(plugin_name, limit, window, conf)
return setmetatable(self, mt)
end

function _M.incoming(self, key, cost)

function _M.incoming(self, key, cost, dry_run)
local conf = self.conf
local red, err = redis.new(conf)
if not red then
return red, err, 0
end

local limit = self.limit
local window = self.window
local res
key = self.plugin_name .. tostring(key)

local ttl = 0
res, err = red:eval(script, 1, key, limit, window, cost or 1)

if err then
return nil, err, ttl
local commit = true
if dry_run ~= nil then
commit = not dry_run
end

local remaining = res[1]
ttl = res[2]
local delay, remaining, ttl = util.redis_incoming(self, red, key, commit, cost)
if not delay then
local err = remaining
return nil, err, ttl or 0
end

local ok, err = red:set_keepalive(10000, 100)
if not ok then
return nil, err, ttl
end

if remaining < 0 then
return nil, "rejected", ttl
return delay, remaining, ttl
end


local function log_phase_incoming_thread(premature, self, key, cost)
local conf = self.conf
local red, err = redis.new(conf)
if not red then
return red, err
end
return util.redis_log_phase_incoming(self, red, key, cost)
end


function _M.log_phase_incoming(self, key, cost, dry_run)
if dry_run then
return true
end
return 0, remaining, ttl

local ok, err = ngx_timer_at(0, log_phase_incoming_thread, self, key, cost)
if not ok then
core.log.error("failed to create timer: ", err)
return nil, err
end

return ok
end


Expand Down
Loading
Loading