任务配置

在tl-ops-manage中。动态配置是一大特性,支持动态更新定时任务中的配置,无需重启nginx或定时任务即可修改规则策略等, 如 “健康检查配置”,“熔断限流配置”,“服务节点配置” 这些需要依赖定时任务的都属于定时任务配置。

依靠shared共享内存实现,多个worker中依赖一个公共的配置版本号,若有某个worker检测到配置的新增或变动,自增版本号,其他worker执行定时任务前,与共享版本号对比自身配置版本号,若存在新增或变动,同步最新配置到worker内即可

例子

以健康检查配置同步为例,我们可以看到代码如下,该文件提供两个接口, tl_ops_health_check_version_incr_service_versiontl_ops_health_check_version_incr_service_option_version,一个用于控制服务节点的配置数据版本号,一个用于控制服务配置数据版本号。

服务节点配置数据版本号 :例如,某服务下新增一个节点,那么此节点应该被自检定时器所识别,并加入自检列表中,这样才能达到动态节点注册的效果。

服务配置数据版本号 : 例如,健康检查配置的某一项变动,如,心跳包状态码新增 203 也为成功,那么这个改动应该同步到自检中的定时器中,以达到动态修改的目的。

健康检查周期时间 : 例如,在某些情况下,需要调整健康检查的定时器周期时间,在调整后动态变更定时器的运行时间间隔。

# 代码位置 : health/tl_ops_health_check_version.lua

-- 更新当前service的状态版本,用于通知其他worker进程同步最新conf
local tl_ops_health_check_version_incr_service_version = function( service_name )
    if not service_name then
        tlog:err(" service_name nil ")
        return
    end
    local key = tl_ops_utils_func:gen_node_key(tl_ops_constant_health.cache_key.service_version, service_name)
    local service_version, _ = cache_dict:get(key)

    if not service_version then
        service_version, _ = cache_dict:add(key, 1);
        if not service_version then 
            tlog:err(" failed to publish new service_version:" , _)
        end
    else 
        service_version, _ = cache_dict:incr(key, 1);
        if not service_version then 
            tlog:err(" failed to publish new service_version:" , _)
        end
    end

    return service_version
end


-- 对service_options_version更新,通知timer检查是否有新增service
local tl_ops_health_check_version_incr_service_option_version = function(  )
    local res, _ = cache_dict:set(tl_ops_constant_health.cache_key.service_options_version, true)

    if not res then
        tlog:err(" set service_options_version err " , _)
    end
end

目前支持的配置同步为两种,增量配置同步修改配置同步。即将支持 删除配置同步。对于增量配置同步和修改配置同步所用的定时器逻辑是不同的。因为增量配置同步需要启动新的定时器,而修改配置是在定时器的基础上去同步配置即可,无需新增定时器。

增量配置同步

代码位置 : health/tl_ops_health_check_dynamic_conf.lua

- 获取当前健康检查的所有service,并对新增的service启动定时器
local tl_ops_health_check_dynamic_conf_add_core = function(options, services)

    -- 暂时还有service的option未同步对应的,先不执行,等到option准备完毕再执行后续逻辑
    local all_service_option_asynced = tl_ops_health_check_dynamic_conf_all_service_option_asynced(options, services)
    if not all_service_option_asynced then
        return
    end

    -- 查看现在已有的service timer,如果没有,说明首次启动,为所有service启动timer
    local timers_str = shared:get(tl_ops_constant_health.cache_key.timers)
    if not timers_str then

        require("health.tl_ops_health_check"):new(options, services):tl_ops_health_check_start();
        shared:set(tl_ops_constant_health.cache_key.service_options_version, nil)
        return
    end

    -- 如果有,查看cache service中的所有服务是否都已启动timer,如果没有, 补充启动相应service timer
    local timers_list = cjson.decode(timers_str)
    for service_name, nodes in pairs(services) do
        local service_name_exist = false
        for i = 1, #timers_list do
            if service_name == timers_list[i] then
                service_name_exist = true;
            end
        end
        if service_name_exist == true then
            tlog:dbg("[add-check] timer exist , service_name=",service_name)
        else
            local matcher_options = tl_ops_health_check_dynamic_conf_get_option( options, service_name)

            require("health.tl_ops_health_check"):new(matcher_options, services):tl_ops_health_check_start();
            shared:set(tl_ops_constant_health.cache_key.service_options_version, nil)
        end
    end
end

-- 同步新增的service option
local tl_ops_health_check_dynamic_conf_add_check = function()

    local version, _ = shared:get(tl_ops_constant_health.cache_key.service_options_version)
    if not version then
        return
    end

    local options_str, _ = cache_health:get(tl_ops_constant_health.cache_key.options_list)
    if not options_str then
        tlog:dbg("[add-check] load dynamic options failed , options_str=",options_str)
        return
    end
    local dynamic_options = cjson.decode(options_str)

    local cache_service = require("cache.tl_ops_cache_core"):new("tl-ops-service");
    local service_str, _ = cache_service:get(tl_ops_constant_service.cache_key.service_list)
    if not service_str then
        tlog:dbg("[add-check] load dynamic service failed , service_str=",service_str)
        return
    end
    local dynamic_service = cjson.decode(service_str)

    if dynamic_options and dynamic_service then
        tl_ops_health_check_dynamic_conf_add_core(dynamic_options, dynamic_service)
        tlog:dbg("[add-check] async dynamic conf done")
    end
end

修改配置同步

代码位置 : health/tl_ops_health_check_dynamic_conf.lua

-- 同步变更的service信息
local tl_ops_health_check_dynamic_conf_change_core = function( conf, service_version )

    -- 保证更新顺序,service/options > service.nodes > node.state 
    tl_ops_health_check_dynamic_conf_change_service_options_async(conf)
    tl_ops_health_check_dynamic_conf_change_service_node_async(conf)
    tl_ops_health_check_dynamic_conf_change_state_async(conf)
    conf.service_version = service_version

end

-- 校验是否需要同步conf变更
local tl_ops_health_check_dynamic_conf_change_check = function( conf )
    local key = tl_ops_utils_func:gen_node_key(tl_ops_constant_health.cache_key.service_version, conf.check_service_name)
    local service_version, _ = shared:get(key)

    if not service_version then
        local ok ,_ = shared:add(key, 1)
        if not ok then
            tlog:err("[change-check] failed to init service_version key, " , _)
        end
        return
    end

    if service_version > conf.service_version then

        tl_ops_health_check_dynamic_conf_change_core( conf, service_version )
    end
end