耗时告警插件

请求耗时告警主要用来对一些异常请求进行监控,如果请求时间过长,周期内大量请求超时等等。实现思路主要是依赖一个生产消费队列组。

队列组

因为告警插件生命周期为log_by_lua阶段,虽然此阶段是异步执行,但是考虑到可能在请求量较大的情况下,每次都重复执行一些逻辑,可能对worker有一定压力,于是考虑将这些重复逻辑组合成 消息 格式,周期性的去消费。

在设计上,我将队列长度,队列数量都做成配置化,以方便适应不同场景,配置需要依赖实际情况来做权衡设置。

默认最大队列数量 = 10,默认单个队列最大消息长度 = 100。也就是说,在单个消息体大小为1kb的情况下,shared共享内存中,最多占用 1kb * 10 * 100 ~= 1M

注意

队列的实现是循环覆盖,也就是说如果全部队列满了,或者消费速度太慢,消息将会从头开始覆盖,从而丢失部分告警消息。

所以具体的数值配置需要对线上机器环境情况进行权衡来设定,在保证性能的情况下,减少消息的丢失。

告警信息生产

对于告警消息的生产,是每一个请求都会产生的,在触发告警规则后,在插件的 tl_ops_process_after_init_log 阶段,每次都向合适的队列中去插入一条告警消息。

合适的队列是由写入指针不断偏移来决定的,实现方式如下

# 代码位置 : plugins/tl_ops_time_alert/time_alert.lua

-- 定时告警数据生产
local tl_ops_time_alert_produce = function(ctx, option, alert_mode)

    -- 告警缓冲组cache_key前缀
    local list_cache_key = time_alert_constant.cache_key.list
    -- 告警消息组生产指针
    local produce_cache_key = time_alert_constant.cache_key.produce
    -- 单个缓冲组告警消息最大长度
    local max_list_len = time_alert_constant.max_list_len
    -- 最多缓冲组数量
    local max_list_count = time_alert_constant.max_list_count

    -- 当前告警消息组生产指针
    local produce_count = shared:get(produce_cache_key)
    if not produce_count then
        local res = shared:set(produce_cache_key, 0)
        if not res then
            tlog:err("tl_ops_time_alert_produce get produce_count nil , init produce_count err, timer exit")
            return
        end

        tlog:dbg("tl_ops_time_alert_produce get produce_count nil , init produce_count = 0")
        produce_count = 0
    end

    produce_count = tonumber(produce_count)

    -- 当前缓冲组的长度
    local len = shared:llen(list_cache_key ..  produce_count)
    if len == nil then
        tlog:err("tl_ops_time_alert_produce get len nil err")
        return
    end

    -- 当前缓冲组还没満,写入告警消息
    if len + 1 <= max_list_len then
        local content = time_alert_content.time_alert_content_wrap(ctx, option, alert_mode)
        shared:rpush(list_cache_key .. produce_count, content)

        tlog:dbg("tl_ops_time_alert_produce rpush done ,key=",list_cache_key .. produce_count)
        return
    end

    -- 缓冲组满了 && 当前生产组指针到达最大,从头开始覆盖
    if produce_count == max_list_count then
        local res, _ = shared:delete(list_cache_key ..  0)
        if not res then
            tlog:err("tl_ops_time_alert_produce list full ! reset 0 err, err=",_)
            return
        end

        produce_count = -1
    end

    -- 更新生产指针
    local new_produce_count = math.min(max_list_count, produce_count + 1)
    local res, _ = shared:set(produce_cache_key, new_produce_count)
    if not res then
        tlog:err("tl_ops_time_alert_produce incr produce_count err, err=",_)
        return 
    end

    -- 放入新的缓冲组
    local content = time_alert_content.time_alert_content_wrap(ctx, option, alert_mode)
    shared:rpush(list_cache_key ..  new_produce_count, content)

    return
end

告警信息消费

对于告警消息的消费,是依赖定时任务去消费,在配置的周期内执行消费逻辑,一次消费一个合适的队列。

合适的队列是由消费指针不断偏移来决定的,实现方式如下

# 代码位置 : plugins/tl_ops_time_alert/time_alert.lua

-- 定时告警数据消费
local tl_ops_time_alert_consume = function( )

    -- 告警缓冲组cache_key前缀
    local list_cache_key = time_alert_constant.cache_key.list
    -- 告警消息组消费指针
    local consume_cache_key = time_alert_constant.cache_key.consume
    -- 单个缓冲组告警消息最大长度
    local max_list_len = time_alert_constant.max_list_len
    -- 最多缓冲组数量
    local max_list_count = time_alert_constant.max_list_count

    -- 当前消费指针值
    local consume_count = shared:get(consume_cache_key)
    if not consume_count then
        local res = shared:set(consume_cache_key, 0)
        if not res then
            tlog:err("tl_ops_time_alert_consume get consume_count nil , init consume_count err, timer exit")
            return
        end

        consume_count = 0
    end

    consume_count = tonumber(consume_count)
    if consume_count < 0 then
        tlog:err("tl_ops_time_alert_consume consume_count err ! consume_count=",list_count)
        return
    end

    -- 当前消费缓冲组的长度
    local len = shared:llen(list_cache_key ..  consume_count)
    if len == nil then
        tlog:err("tl_ops_time_alert_consume get len err")
        return
    end

    -- 当前消费指针队列为空
    if len == 0 then
        local new_consume_count = math.min(max_list_count, consume_count + 1)

        -- 消费指针达到最大,从头开始继续
        if consume_count == max_list_count then
            new_consume_count = 0
        end

        -- 更新指针后跳出
        local res, _ = shared:set(consume_cache_key, new_consume_count)
        if not res then
            tlog:err("tl_ops_time_alert_consume incr consume_count err, err=",_)
            return
        end

        return
    end

    -- 一次消费一个缓冲组
    while true do 
        local content_json = shared:rpop(list_cache_key .. consume_count)
        if not content_json then
            tlog:dbg("tl_ops_time_alert_consume content json nil, ",content_json)
            break
        end

        local content = cjson.decode(content_json)
        if not content then
            tlog:err("tl_ops_time_alert_consume decode content json err")
            break
        end

        -- 写入磁盘/发送邮件
        local option = content.option
        local alert_type = content.alert_type
        local mode = option.mode

        -- 日志
        if mode == ALERT_MODE.log then
            time_alert_log:handler(option, content)
        end

        -- 邮件
        if mode == ALERT_MODE.log then
            time_alert_email:handler(option, content)
        end

    end

    -- 更新消费指针
    local new_consume_count = math.min(max_list_count, consume_count + 1)

    -- 消费指针达到最大,从0组开始覆盖消费
    if consume_count == max_list_count then
        new_consume_count = 0
    end

    local res, _ = shared:set(consume_cache_key, new_consume_count)
    if not res then
        tlog:err("tl_ops_time_alert_consume decri consume_count err, err=",_)
        return 
    end
end

弊端

看完实现和思路,想必大家也看出弊端了,但是总体来说,对于告警信息,对业务影响不是特别重要,只是一种辅助性的功能,所以暂时只做了一个雏形

1 . 在生产时,如果没有设置合理的队列大小和数量,会造成消息丢失。

2 . 消息在内存中,如果重启,消息丢失。

3 . 消费时,如果队列设置长度过长,还是会对性能造成影响

4 . 暂时不支持消费时一次消费的消息数量。