Heartbeat Packets

Before walking through the implementation, it is worth laying some groundwork on OpenResty's execution phases.

At startup, nginx forks multiple worker processes to handle requests, and OpenResty divides nginx's request processing into seven execution phases.

(figure: OpenResty's execution phases)

Starting the health check should be independent of any particular request, so the natural home is the init_by_lua or init_worker_by_lua phase. However, timers can only be created from the init_worker_by_lua phase onward (see the figure below), so the health check starter has to run in init_worker_by_lua.

(figure: ngx.timer availability by phase)
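To make the constraint concrete, here is a minimal sketch of such a starter, assuming the file is loaded via init_worker_by_lua_file; the check_interval value and the omitted heartbeat round are placeholders, not the project's actual bootstrap code:

-- minimal sketch: ngx.timer.* is first available in this phase
local check_interval = 5   -- assumed heartbeat period, in seconds

local function check(premature)
    if premature then
        return  -- the worker is exiting, do not reschedule
    end

    -- ... one heartbeat round would run here ...

    -- re-arm the timer for the next round
    local ok, err = ngx.timer.at(check_interval, check)
    if not ok then
        ngx.log(ngx.ERR, "failed to reschedule health check: ", err)
    end
end

local ok, err = ngx.timer.at(0, check)
if not ok then
    ngx.log(ngx.ERR, "failed to start health check timer: ", err)
end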

Because init_worker_by_lua is the worker-initialization phase, it runs in every worker process, so several workers may race to execute the timer. Each timer depends on its own configuration, and since data is not shared between workers, the health check statistics and configuration would drift out of sync.

(figure: multiple workers racing to run the health check timer)

Hence the piece of logic below: tl_ops_health_check_get_lock is the locking step, implemented on top of ngx.shared shared memory. The full implementation is in the source if you are interested; a minimal sketch of one possible shape follows the snippet.

---- heartbeat

if tl_ops_health_check_get_lock(conf) then
    tl_ops_health_check_nodes(conf)
end
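A possible shape for that lock, as a sketch only (the dict name tl_lock and the conf.check_interval field are assumptions for illustration): ngx.shared DICT:add is atomic across workers, so at most one worker can create the key per check period.

-- sketch of a shared-dict lock: `add` only succeeds if the key does not
-- exist yet, and the expiry releases the lock after one check period
local shared = ngx.shared.tl_lock  -- dict name assumed

local function tl_ops_health_check_get_lock(conf)
    local key = "tl_ops_health_check_lock:" .. conf.check_service_name
    local ok, err = shared:add(key, true, conf.check_interval)
    if not ok then
        if err ~= "exists" then
            ngx.log(ngx.ERR, "health check lock error: ", err)
        end
        return false
    end
    return true
end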

Back to the code: after the lock contention, exactly one worker enters the critical section and executes tl_ops_health_check_nodes to send the heartbeat packets.

Sending heartbeats means walking the nodes of each service one by one and sending each a socket packet (the heartbeat payload is up to you). If the heartbeat round-trip completes normally, the node enters the success path, tl_ops_health_check_node_ok; otherwise it enters the failure path, tl_ops_health_check_node_failed.

# Code location : health/tl_ops_health_check.lua


-- send a heartbeat to each configured upstream node in turn
tl_ops_health_check_nodes = function (conf)

    ...

    for i = 1, #nodes do
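        -- Lua has no `continue`: the repeat ... until true wrapper lets `break` skip to the next node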
        repeat
            local node = nodes[i]
            local node_id = i - 1
            local name = node.ip .. ":" .. node.port

            tlog:dbg("tl_ops_health_check_nodes start connect socket : name=", name)

            local sock, _ = nx_socket()
            if not sock then
                tlog:err("tl_ops_health_check_nodes failed to create stream socket: ", _)
                break
            end
            sock:settimeout(check_timeout)

            -- heartbeat socket: connect to the node
            local ok, _ = sock:connect(node.ip, node.port)
            if not ok then
                tlog:err("tl_ops_health_check_nodes failed to connect socket: ", _)
                tl_ops_health_check_node_failed(conf, node_id, node)
                break
            end

            tlog:dbg("tl_ops_health_check_nodes connect socket ok : ok=", ok)

            local bytes, _ = sock:send(check_content .. "\r\n\r\n\r\n")
            if not bytes then
                tlog:err("tl_ops_health_check_nodes failed to send socket: ", _)
                sock:close()
                tl_ops_health_check_node_failed(conf, node_id, node)
                break
            end

            tlog:dbg("tl_ops_health_check_nodes send socket ok : byte=", bytes)

            -- read the heartbeat response
            local receive_10k, _ = sock:receiveany(10240)
            if not receive_10k then
                -- cosockets report a read timeout with the error string "timeout"
                if _ == "timeout" then
                    tlog:err("tl_ops_health_check_nodes socket receive timeout: ", _)
                end

                tlog:err("tl_ops_health_check_nodes socket receive failed: ", _)
                sock:close()
                tl_ops_health_check_node_failed(conf, node_id, node)
                break
            end

            tlog:dbg("tl_ops_health_check_nodes receive socket ok : ", receive_10k)

            local from, to, _ = find(receive_10k, [[^HTTP/\d+\.\d+\s+(\d+)]], "joi", nil, 1)
            if not from then
                tlog:err("tl_ops_health_check_nodes ngx.re.find receive err: ", _)
                sock:close()
                tl_ops_health_check_node_failed(conf, node_id, node)
                break
            end

            -- heartbeat status: extract the HTTP status code from the status line
            local status = tonumber(string.sub(receive_10k, from, to))

            tlog:dbg("tl_ops_health_check_nodes get status ok ,name=" ,name, ", status=" , status)
            local statusPass = false
            for j = 1, #check_success_status do
                if check_success_status[j] == status then
                    statusPass = true
                    break
                end
            end

            if not statusPass then
                tlog:err("tl_ops_health_check_nodes status not pass ,name=" ,name, ", status=" , status)
                tl_ops_health_check_node_failed(conf, node_id, node)
                sock:close()
                break
            end

            -- heartbeat succeeded
            tl_ops_health_check_node_ok(conf, node_id, node)

            tlog:dbg("tl_ops_health_check_nodes node ok")

            sock:close()
            break
        until true
    end

    tlog:dbg("tl_ops_health_check_nodes end ,conf=" , conf, ",nodes=",nodes)
end
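A note on the parsing: the ngx.re.find call captures the status code from the HTTP status line, so a node replying with, say, HTTP/1.1 200 OK yields status = 200, which is then checked against the configured check_success_status whitelist.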

Heartbeat Success

After a successful heartbeat, the node's success count is incremented and any previously accumulated failure count is cleared. Once the success count reaches the configured threshold, the node is considered fit to serve traffic again, its state is flipped to online, and it rejoins the pool of routable load-balancing nodes. For example, with check_success_max_count = 3, an offline node comes back online after three consecutive successful heartbeats.

# Code location : health/tl_ops_health_check.lua

-- heartbeat check succeeded
tl_ops_health_check_node_ok = function (conf, node_id, node)
    tlog:dbg("tl_ops_health_check_node_ok start ,conf=" , conf, ",node=" , node)

    local shared = shared
    local check_success_max_count = conf.check_success_max_count
    local check_service_name = conf.check_service_name

    local key = tl_ops_utils_func:gen_node_key(tl_ops_constant_health.cache_key.success, check_service_name, node_id)
    local cur_success_count, _ = shared:get(key)

    tlog:dbg("tl_ops_health_check_node_ok success key= " , key ,", cur_success_count=" , cur_success_count)

    if not cur_success_count then
        cur_success_count = 1
        local ok, _ = shared:set(key, cur_success_count)
        if not ok then 
            tlog:err("tl_ops_health_check_node_ok failed to set node ok key: " , key)
        end
    else
        cur_success_count = cur_success_count + 1
        local ok, _ = shared:incr(key, 1)
        if not ok then
            tlog:err("tl_ops_health_check_node_ok failed to incr node ok key: "  , key)
        end
    end

    -- after a successful heartbeat, reset any previously accumulated failure count
    if cur_success_count == 1 then
        key = tl_ops_utils_func:gen_node_key(tl_ops_constant_health.cache_key.failed, check_service_name, node_id)
        local fails, _ = shared:get(key)

        tlog:dbg("tl_ops_health_check_node_ok failed key= " , key , ",cur_success_count=", cur_success_count, ", cur_failed_count=" , fails)

        if not fails or fails == 0 then
            if _ then
                tlog:err("tl_ops_health_check_node_ok failed to get node failed key: " , key)
            end
        else
            local ok, _ = shared:set(key, 0)
            if not ok then
                tlog:err("tl_ops_health_check_node_ok failed to set node failed key: " , key)
            end
        end
    end

    -- if this node is currently offline && the success count has reached the configured threshold,
    -- flip its state in the shared dict to online
    -- {tl_ops_health_check_donw_state:resin-site0:nil}
    if not node.state and cur_success_count >= check_success_max_count then
        local name = node.ip .. ":" .. node.port

        tlog:dbg("tl_ops_health_check_node_ok success count > max success count , state=", node.state, " cur_success_count=", cur_success_count, ", name=", name)

        key = tl_ops_utils_func:gen_node_key(tl_ops_constant_health.cache_key.state, check_service_name, node_id)
        local ok, _ = shared:set(key, true)
        if not ok then
            tlog:err("tl_ops_health_check_node_ok failed to set node state: ", _)
        end
        node.state = true

        conf.service_version = tl_ops_health_check_version.incr_service_version(check_service_name)

        tlog:dbg("tl_ops_health_check_node_ok conf.service_version=" , conf.service_version)
    end

    tlog:dbg("tl_ops_health_check_node_ok end ,node=" , node)
end
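One detail worth noticing: the failure counter is only reset when cur_success_count == 1, i.e. on the first success after a failure streak, so the failure key is only touched once per recovery rather than on every healthy heartbeat.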

Heartbeat Failure

After a failed heartbeat, the node's failure count is incremented and any previously accumulated success count is cleared. Once the failure count exceeds the configured threshold, the node is considered unable to serve traffic, its state is flipped to offline, and it is removed from normal service so it can no longer be used for request routing.

# Code location : health/tl_ops_health_check.lua

-- heartbeat check failed
tl_ops_health_check_node_failed = function (conf, node_id, node)
    tlog:dbg("tl_ops_health_check_node_failed start ,conf=" , conf, ",node=" , node)

    local check_failed_max_count = conf.check_failed_max_count
    local check_service_name = conf.check_service_name

    -- key=tl_ops_health_check_failed_count:resin-site0 (health check not ok)
    local key = tl_ops_utils_func:gen_node_key(tl_ops_constant_health.cache_key.failed, check_service_name, node_id)
    local cur_failed_count, _ = shared:get(key)

    tlog:dbg("tl_ops_health_check_node_failed failed key= " , key ,", cur_failed_count=" , cur_failed_count)

    if not cur_failed_count then
        cur_failed_count = 1
        local ok, _ = shared:set(key, cur_failed_count)
        if not ok then 
            tlog:err("tl_ops_health_check_node_failed failed to set node check_failed_max_count key: " , key)
        end
    else
        cur_failed_count = cur_failed_count + 1
        local ok, _ = shared:incr(key, 1)
        if not ok then
            tlog:err("tl_ops_health_check_node_failed failed to incr node check_failed_max_count key: "  , key)
        end
    end

    -- after a failed heartbeat, reset any previously accumulated success count
    if cur_failed_count == 1 then
        key = tl_ops_utils_func:gen_node_key(tl_ops_constant_health.cache_key.success, check_service_name, node_id)
        local succ, _ = shared:get(key)

        tlog:dbg("tl_ops_health_check_node_failed success key= " , key ,", succ_count=" , succ)

        if not succ or succ == 0 then
            if _ then
                tlog:err("tl_ops_health_check_node_failed failed to get node success key: " , key)
            end
        else
            local ok, _ = shared:set(key, 0)
            if not ok then
                tlog:err("tl_ops_health_check_node_failed failed to set node success key: " , key)
            end
        end
    end

    -- if this node is currently online && the failure count has exceeded the configured threshold,
    -- flip its state in the shared dict to offline
    -- {tl_ops_health_check_donw_state:resin-site0:true}
    if node.state and cur_failed_count > check_failed_max_count then
        local name = node.ip .. ":" .. node.port

        tlog:dbg("tl_ops_health_check_node_failed failed count > max failed count , cur_failed_count=", cur_failed_count, ", name=", name)

        key = tl_ops_utils_func:gen_node_key(tl_ops_constant_health.cache_key.state, check_service_name, node_id)
        local ok, _ = shared:set(key, nil)
        if not ok then
            tlog:err("tl_ops_health_check_node_failed failed to set node down state:", _)
        end
        node.state = false

        conf.service_version = tl_ops_health_check_version.incr_service_version(check_service_name)

        tlog:dbg("tl_ops_health_check_node_failed conf.service_version=" , conf.service_version)
    end

    tlog:dbg("tl_ops_health_check_node_failed end ,node=" , node)

end
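A final detail: shared:set(key, nil) deletes the key from the shared dict, so an offline node simply has no state entry. That is why the example keys in the comments read :nil for a node that is currently down and :true for one that is up.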