上篇传送门:

对于大型项目来说,虽然在运维那边通过Prometheus通过频率控制定时拉取项目,但是后端这边仍然需要进行基于本地缓存的聚合上传。


在官方文档的性能考虑中,提到这么一句话:

labels()的结果应该是可缓存的。带有标签的指标的并行映射往往相对较慢。而不带标签的默认指标则查找性能更高。labels在执行inc()/dec()/set()等时应避免阻塞,因为在进行拉取时,整个应用程序都会被阻塞。

也就是说,在之前的设计中。对于每次拦截的请求都做一次labels提交操作是非常影响性能的。
本地聚合上传的思路就是:在拦截非拉取端口的请求时,对所有的累加操作都只在本地进行。只有当走拉取端口的请求时,才通过labels.set()方法将缓存结果打上去。

代码如下:
PrometheusSubmitFilter

/**
 * Prometheus提交过滤器
 *
 * @author CaiTianXin
 * @date 2021/03/01
 */
public class PrometheusSubmitFilter implements Filter {

    @Override
    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
        HttpServletRequest req = (HttpServletRequest) request;
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMddHHmm");
        String requestUri = req.getRequestURI();
        String time = simpleDateFormat.format(new Date());
        // 如果访问的是该暴露的页面才提交
        if (requestUri.contains("/actuator/prometheus")) {
            MyMetrics.submit(time);
            // 过滤静态文件
        }else if (!requestUri.contains("/static/")) {
            String ip = GetIpAddressUtil.getIpAddr(req);
            MyMetrics.increase(ip, time);
        }
        chain.doFilter(request, response);
    }
}

InitPrometheus

/**
 * init普罗米修斯
 *
 * @author CaiTianXin
 * @date 2021/03/01
 */
@Component
public class InitPrometheus implements ApplicationListener<ContextRefreshedEvent> {

    @Resource
    PrometheusMeterRegistry meterRegistry;

    /**
     * 将Metrics标签注册到暴露给Prometheus监听的端口上
     */
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        CollectorRegistry prometheusRegistry = meterRegistry.getPrometheusRegistry();
        prometheusRegistry.register(MyMetrics.baseGauge);
    }
}

MyMetrics

/**
 * 描述:Gauge实现的自定义标签、指标  (Counter没有set()方法)
 * 当Prometheus获取实例的HTTP端点时,客户库发送所有跟踪的度量指标数据到服务器上。而要做的则是在跟踪时做一些缓存。
 * 在时间维度下,对ip、localIp进行缓存处理; 原先的path、method、code维度改从自带的标签http_server_requests_seconds_count中查看。
 *
 * @author CaiTianXin
 * @date 2021/03/01
 */
@Slf4j
public class MyMetrics {
    private static final Logger logger = LoggerFactory.getLogger(MyMetrics.class);

    /**
     * ConcurrentSkipListMap:有序的哈希表,线程安全,通过跳表实现。(key有序、支持高并发)
     * ######
     * ConcurrentSkipListMap<String, ConcurrentSkipListMap<String, Integer>>;
     * ↓
     * timeMap<{time}, ipMap<String, Integer>>>;
     */
    private static Map<Object, Object> map = new ConcurrentSkipListMap<>();

    /**
     * 项目名,采用动态获取避免写死在一个系统
     * 格式:xxx_tps
     */
    public static String projectName;

    /**
     * 通过自定义工具类获取到application.yml上的service.code
     */
    static {
        projectName = GetYmlPropertiesUtil.getCommonYml("service.code") + "_tps";
    }

    /**
     * 指标:{projectName}_tps
     * 标签:ip、time、localIp
     * 格式:antispam_v2_tps{ip="127.0.0.1",time="202103020943",localIp="10.10.10.234",} 2.0
     */
    public static Gauge baseGauge = Gauge.build()
            .name(projectName)
            .labelNames("ip", "time", "localIp")
            .help("Three metric for this label.")
            .register();

    /**
     * 缓存一次累加请求
     *
     * @param time 时间戳
     * @return boolean
     */
    public static synchronized void increase(String ip, String time) {
        ConcurrentSkipListMap timeMap = (ConcurrentSkipListMap)map.get(projectName);
        if (timeMap != null) {
            ConcurrentSkipListMap<String, Integer> ipMap = (ConcurrentSkipListMap)timeMap.get(time);
            if (ipMap != null) {
                tpsIncrease(ip, ipMap);
            } else {
                initIpMap(time, ip, timeMap);
            }
        } else {
            timeMap = new ConcurrentSkipListMap();
            initIpMap(time, ip, timeMap);
            map.put(projectName, timeMap);
        }
    }

    /**
     * tps增加1
     *
     * @param ip    ip
     * @param ipMap ipMap
     */
    private static void tpsIncrease(String ip, ConcurrentSkipListMap<String, Integer> ipMap) {
        Integer tps = ipMap.get(ip);
        if (tps == null) {
            tps = 0;
        }

        tps = tps + 1;
        ipMap.put(ip, tps);
    }

    /**
     * 初始化ipMap
     *
     * @param time    时间戳
     * @param ip      ip
     * @param timeMap tumeMap
     */
    private static void initIpMap(String time, String ip, ConcurrentSkipListMap timeMap) {
        ConcurrentSkipListMap<String, Integer> ipMap = new ConcurrentSkipListMap<>();
        ipMap.put(ip, 1);
        timeMap.put(time, ipMap);
    }

    /**
     * 将时间戳前的缓存提交并清空
     *
     * @param time 时间
     * @return boolean
     */
    public static boolean submit(String time) {
        // 获取到当前ip,如果ip获取失败,该方法直接当成成功返回。
        String localIp = getLocalIp();
        if (localIp == null) {
            return true;
        }

        ConcurrentSkipListMap projMap = (ConcurrentSkipListMap)map.get(projectName);
        if (projMap != null && projMap.size() > 0) {
            while(true) {
                Map.Entry<String, ConcurrentSkipListMap<String, Integer>> timeMap = projMap.floorEntry(time);
                // 这里包含了第一次就拿不到数据以及消费完的情况
                if (timeMap == null) {
                    return true;
                }

                Iterator iterator = ((ConcurrentSkipListMap)timeMap.getValue()).entrySet().iterator();

                while(iterator.hasNext()) {
                    Map.Entry tpsEle = (Map.Entry)iterator.next();
                    String ip = (String)tpsEle.getKey();
                    Integer tps = (Integer)tpsEle.getValue();
                    baseGauge.labels(ip, timeMap.getKey(), localIp).set(tps);
                }
                projMap.remove(timeMap.getKey());
            }
        } else {
            logger.warn("{} no request", time);
            return false;
        }
    }

    /**
     * 通过java.net包获取当前ip
     *
     * @return String
     */
    private static String getLocalIp() {
        String localIp = null;

        try {
            localIp = InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException var2) {
            log.error("localip error");
        }

        return localIp;
    }
}



一些思考:

  1. labels的标签名不能带有-,需要转为下划线。
  2. Counter只能进行累加操作,set()方法适用于Guage。
  3. 按照之前方案一修改后的Ip、Local维度都是在时间维度之下,也就是说如果要统计path、method、code的话,要么在时间维度下再封装一层metricMap,里面封装各种指标;要么再分别另取时间缓存的timeMap存放各种指标。
    但问题是path、method、code等指标重复度不大,说白了就是持久化和网络提交的性能比较。
  4. 当前只针对ip、localip、time的话,只对访问次数的累加。
    好处是:可以直接获取到时间范围内某ip的访问次数,而之前存有path、method、code的方案还需要对此进行累加。并且该三个指标可以通过Prometheus的默认指标http_server_requests_seconds_count去获取。
    坏处是:无法分析出时间域下哪个端口请求量大。
Last modification:March 2nd, 2021 at 06:21 pm
喵ฅฅ