#!/usr/bin/python
# -*- coding: utf8 -*-
"""Scrape per-consumer "Total Lag" for Kafka topics and push it as gauges.

For every configured (cluster, topic, lag ceiling) triple the script reads the
topic page served at ``omg_action_lag.kafka_url``, follows each consumer link
found there, extracts the value next to the "Total Lag" table cell and posts
one gauge data point per consumer to ``omg_action_lag.falcon_url``.
"""
import json
import time
from os import sep  # noqa: F401 - no longer used for URLs; kept so the name stays importable

import requests
from bs4 import BeautifulSoup


class omg_action_lag():
    """Collect consumer lag from the Kafka manager UI and report it."""

    # Base URL of the web UI that lists topics and their consumers.
    kafka_url = 'http://cmak.bitea.one'
    # Endpoint that receives the JSON list of metric points.
    falcon_url = "http://falcon.bitea.one:1988/v1/push"
    # Seconds to wait on any single HTTP call before giving up, so a stalled
    # server cannot hang the whole run.
    request_timeout = 10
    # Parser is named explicitly so results do not depend on which optional
    # parser libraries happen to be installed.
    html_parser = "html.parser"

    def _fetch_soup(self, url):
        """Download *url* and return its body parsed as HTML."""
        response = requests.get(url, timeout=self.request_timeout)
        return BeautifulSoup(response.text, self.html_parser)

    def get_consumer(self, cluster, topic):
        """Return ``{link text: href}`` for the consumer links of *topic*.

        A link counts as a consumer link when its ``href`` contains both the
        word "consumer" and the topic name.
        """
        # URL segments are always joined with "/", never the OS path separator.
        topic_url = "/".join(
            (self.kafka_url, "clusters", cluster, "topics", topic)
        )
        soup = self._fetch_soup(topic_url)
        return {
            anchor.text: anchor["href"]
            for anchor in soup.find_all("a", href=True)
            if "consumer" in anchor["href"] and topic in anchor["href"]
        }

    @staticmethod
    def _parse_total_lag(soup):
        """Return the integer in the cell following "Total Lag", or None.

        None is returned when the page has no such cell, when that cell is the
        last one on the page, or when its neighbour is not a number.
        """
        cells = soup.find_all("td")
        # Pairing each cell with its successor cannot index past the end.
        for label_cell, value_cell in zip(cells, cells[1:]):
            # Membership on a tag checks the tag's direct children.
            if "Total Lag" in label_cell:
                try:
                    # Values are rendered with thousands separators: "1,234".
                    return int(value_cell.text.replace(",", ""))
                except ValueError:
                    return None
        return None

    def _push_lag(self, topic, consumer, lag):
        """Post a single gauge point carrying *lag* for *consumer*."""
        name = topic + "_lag"
        payload = [
            {
                "endpoint": name,
                "metric": name,
                "timestamp": int(time.time()),
                "step": 30,
                "value": lag,
                "counterType": "GAUGE",
                "tags": "%s_status=%s,type=%s" % (topic, consumer, "TC"),
            }
        ]
        requests.post(
            self.falcon_url,
            data=json.dumps(payload),
            timeout=self.request_timeout,
        )

    def get_consumer_lag(self, cluster, topic, lagout):
        """Push the current lag of every relevant consumer of *topic*.

        Args:
            cluster: cluster name as it appears in the UI's URLs.
            topic: topic name; also used to name the metric and endpoint.
            lagout: lag ceiling (int or numeric string). A lag above it is
                reported as 0.

        Consumers whose name contains "console" or "test" are skipped, as are
        consumers whose page yields no parsable "Total Lag" value.
        """
        lagout = int(lagout)
        for consumer, link in self.get_consumer(cluster, topic).items():
            # Skip before fetching so ignored consumers cost no request.
            if "console" in consumer or "test" in consumer:
                continue

            lag = self._parse_total_lag(self._fetch_soup(self.kafka_url + link))
            if lag is None:
                # Nothing trustworthy to report for this consumer.
                continue

            # Consumers whose lag exceeds the configured ceiling are treated
            # as expired and have their lag reported as 0.
            if lag > lagout:
                lag = 0

            self._push_lag(topic, consumer, lag)


if __name__ == '__main__':
    # Check configuration: several clusters and topics may be listed. The
    # third field is the lag ceiling used to ignore data points from
    # consumers that are no longer consuming.
    ck_team = (
        ("nft-kafka", "collection_events", "100000"),
        ("nft-kafka", "raw_action_nft", "100000"),
    )
    g = omg_action_lag()
    for cluster, topic, lagout in ck_team:
        g.get_consumer_lag(cluster, topic, lagout)