字段同步器
对于字段同步器的实现,我将字段同步器做成配置形式,通过配置不同模块名称,来达到自定义同步部分模块的字段的目的
执行时机
字段同步器在整个nginx生命周期中,只会执行一次,也就是在 init_worker
阶段执行,在此阶段会读取配置的模块的中的静态数据文件,也就是 tl_ops_constant_xxx.lua
文件中的demo字段,将demo字段与各模块的store文件的最新配置进行对比,对增量字段进行补充到store文件中
注意 : 由于是需要依赖各模块的demo字段来做判断依据,所以demo字段对于同步插件来说必不可少。
实现代码
# 代码位置 : plugins/tl_ops_sync/sync_fields.lua
-- 服务节点数据同步
local sync_fields_service = function ()
local cache_key = constant_service.cache_key.service_list
local cache_rule_key = constant_service.cache_key.service_rule
local demo = constant_service.demo
local data_str, _ = cache_service:get(cache_key);
if not data_str then
local res, _ = cache_service:set(cache_key, cjson.encode(constant_service.list))
if not res then
tlog:err("sync_fields_service new store list err, res=",res)
return tl_ops_rt.error
end
data_str, _ = cache_service:get(cache_key);
end
local data_rule_str, _ = cache_service:get(cache_rule_key);
if not data_rule_str then
local res, _ = cache_service:set(cache_rule_key, constant_service.rule.auto_load)
if not res then
tlog:err("sync_fields_service new store rule err, res=",res)
return tl_ops_rt.error
end
end
local data = cjson.decode(data_str);
if not data and type(data) ~= 'table' then
tlog:err("sync_fields_service err, old=",data)
return tl_ops_rt.error
end
local add_keys = {}
for service , _ in pairs(data) do
local nodes = data[service]
if nodes then
-- demo fileds check
for key , _ in pairs(demo) do
-- data fileds check
for i = 1, #nodes do
-- add keys
if nodes[i][key] == nil then
nodes[i][key] = demo[key]
table.insert(add_keys , key)
end
end
end
end
end
local res = cache_service:set(cache_key, cjson.encode(data))
if not res then
tlog:err("sync_fields_service err, res=",res,",new=",data)
return tl_ops_rt.error
end
return tl_ops_rt.ok
end
-- 同步字段主逻辑
function _M:sync_fields_module( module )
if module == 'service' then
return sync_fields_service()
elseif module == 'health' then
return sync_fields_health()
elseif module == 'limit' then
sync_fields_limit_token()
sync_fields_limit_leak()
return sync_fields_limit()
elseif module == 'balance' then
return sync_fields_balance()
elseif module == 'balance_api' then
return sync_fields_balance_api()
elseif module == 'balance_cookie' then
return sync_fields_balance_cookie()
elseif module == 'balance_header' then
return sync_fields_balance_header()
elseif module == 'balance_param' then
return sync_fields_balance_param()
elseif module == 'waf' then
return sync_fields_waf()
elseif module == 'waf_api' then
return sync_fields_waf_api()
elseif module == 'waf_ip' then
return sync_fields_waf_ip()
elseif module == 'waf_header' then
return sync_fields_waf_header()
elseif module == 'waf_cookie' then
return sync_fields_waf_cookie()
elseif module == 'waf_param' then
return sync_fields_waf_param()
elseif module == 'waf_cc' then
return sync_fields_waf_cc()
else
-- plugin
return sync_fields_plugin(module)
end
end
支持插件
考虑到插件也会存在静态数据的变化,同步器插件提供了外部接口来支持其他插件的字段同步, sync_fields
,同步器插件不负责插件的字段同步,只负责外部接口的调用,所以各个插件的字段同步需要自行实现 sync_fields
逻辑代码。
# 代码位置 : plugins/tl_ops_sync/sync_fields.lua
-- 获取某个插件
local sync_fields_get_plugin = function(name)
for i = 1, #tlops.plugins do
local plugin = tlops.plugins[i]
if plugin.name == name then
return plugin
end
end
return nil
end
-- 插件静态配置数据
local sync_fields_plugin = function (module)
local plugin = sync_fields_get_plugin(module)
if not plugin then
tlog:err("sync_fields_plugin not plugin, module=",module)
return tl_ops_rt.error
end
if type(plugin.func.sync_fields) == 'function' then
local ok, _ = plugin.func:sync_fields()
if not ok then
tlog:err("sync_fields_plugin sync_fields err, module=",module,",err=",_)
return tl_ops_rt.error
end
end
tlog:dbg("sync_fields_plugin done, module=",module)
return tl_ops_rt.ok
end