# -*- coding: utf-8 -*-
"""Report this month's Alibaba Cloud machine-translation usage to DingTalk.

The script queries the translate usage report once per day of the current
month, sums the totals, computes the remaining monthly quota and posts a
text message to a DingTalk robot webhook.

Environment variables:
    ACCESS_KEY_ID / ACCESS_KEY_SECRET: Alibaba Cloud credentials (required).
    DINGTALK_WEBHOOK_URL: optional override for the DingTalk webhook URL.
"""
import os
import socket
import logging
import requests
import json
from datetime import datetime
from alibabacloud_alimt20181012.client import Client as alimt20181012Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_alimt20181012 import models as alimt_20181012_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient

logging.basicConfig(
    level=logging.INFO,  # log at INFO level and above
    filename='get_translate_usage.log',
    format='%(asctime)s - %(levelname)s - %(message)s'  # timestamp - level - message
)

# Monthly quota the usage is compared against.
MONTHLY_QUOTA = 1000000
# When the remaining quota is at or below this value the message asks the
# reader to buy a resource pack.
LOW_QUOTA_THRESHOLD = 10000
# Upper bound (seconds) for the webhook HTTP call so the script cannot hang.
REQUEST_TIMEOUT_SECONDS = 10
# NOTE(security): this access token is committed in source. It is kept only as
# a backward-compatible default; rotate it and supply the URL through the
# DINGTALK_WEBHOOK_URL environment variable instead.
DEFAULT_WEBHOOK_URL = 'https://oapi.dingtalk.com/robot/send?access_token=51af3eaab5c834d91ff1b73741fbca9cbcce669e50bd9673dde8c3490eb6c8ee'


def create_client(access_key_id, access_key_secret):
    """Build a machine-translation API client for the given credentials.

    Args:
        access_key_id: Alibaba Cloud access key id.
        access_key_secret: Alibaba Cloud access key secret.

    Returns:
        A client bound to the ``mt.aliyuncs.com`` endpoint.
    """
    config = open_api_models.Config(
        access_key_id=access_key_id,
        access_key_secret=access_key_secret
    )
    config.endpoint = 'mt.aliyuncs.com'
    return alimt20181012Client(config)


def getdayvalue(access_key_id, access_key_secret, date, client=None):
    """Return the ``translate_standard`` usage total for a single day.

    Args:
        access_key_id: Alibaba Cloud access key id.
        access_key_secret: Alibaba Cloud access key secret.
        date: day to query, formatted ``YYYY-MM-DD``.
        client: optional pre-built client to reuse; when omitted a new one is
            created from the credentials.

    Returns:
        The sum of the ``total`` fields of the report entries for that day.

    Raises:
        Exception: any error raised by the API call is logged and re-raised,
            so a failed query can never be mistaken for zero usage.
    """
    if client is None:
        client = create_client(access_key_id, access_key_secret)
    get_translate_report_request = alimt_20181012_models.GetTranslateReportRequest(
        begin_time=date + " 00:00:00",
        end_time=date + " 23:59:59",
        api_name='translate_standard',
        group='day'
    )
    runtime = util_models.RuntimeOptions()
    try:
        response = client.get_translate_report_with_options(get_translate_report_request, runtime)
        result = response.body.data.get("result")
    except Exception:
        # The original handler printed ``error.message``, which is not a
        # general exception attribute in Python 3, and then returned None,
        # which broke the caller's addition. Log the traceback and propagate.
        logging.exception('Failed to fetch translate report for %s', date)
        raise
    if result is None:
        # NOTE(review): assumes a missing "result" means no usage was
        # recorded for the day -- confirm against the API documentation.
        logging.warning('Translate report for %s contains no "result"', date)
        return 0
    # A missing or null "total" is counted as zero usage.
    return sum(entry.get("total") or 0 for entry in result)


def tongji(access_key_id, access_key_secret):
    """Collect usage for every day of the current month up to today.

    Args:
        access_key_id: Alibaba Cloud access key id.
        access_key_secret: Alibaba Cloud access key secret.

    Returns:
        A tuple ``(all_count, today_count)``: the month-to-date total and the
        total for today (the last day queried).
    """
    current_date = datetime.now()
    month_prefix = current_date.strftime('%Y-%m')
    # One client is enough for all daily queries.
    client = create_client(access_key_id, access_key_secret)
    all_count = 0
    today_count = 0
    for day in range(1, current_date.day + 1):
        count = getdayvalue(
            access_key_id,
            access_key_secret,
            f"{month_prefix}-{day:02d}",
            client=client,
        )
        all_count += count
        # After the final iteration this holds today's figure.
        today_count = count
    return all_count, today_count


def send_dingtalk_alert(webhook_url, message):
    """Post a plain-text message to a DingTalk robot webhook.

    Args:
        webhook_url: full webhook URL including the access token.
        message: text content to send.

    Returns:
        True when the message was accepted, False otherwise. Failures are
        logged rather than raised.
    """
    headers = {'Content-Type': 'application/json'}
    data = {
        'msgtype': 'text',
        'text': {
            'content': message
        }
    }
    try:
        response = requests.post(
            webhook_url,
            headers=headers,
            data=json.dumps(data),
            timeout=REQUEST_TIMEOUT_SECONDS,
        )
    except requests.RequestException:
        logging.exception('钉钉报警发送失败')
        return False
    if response.status_code != 200:
        logging.error('钉钉报警发送失败')
        return False
    # DingTalk answers HTTP 200 even when it rejects the message; the real
    # outcome is carried by "errcode" in the JSON body (0 means success).
    try:
        payload = response.json()
    except ValueError:
        # Body is not JSON: fall back to the status-code-only check.
        payload = None
    if isinstance(payload, dict) and payload.get('errcode', 0) != 0:
        logging.error('钉钉报警发送失败: %s', payload.get('errmsg'))
        return False
    logging.info('钉钉报警发送成功')
    return True


def main():
    """Compute month-to-date usage and send the summary to DingTalk."""
    access_key_id = os.getenv('ACCESS_KEY_ID')
    access_key_secret = os.getenv('ACCESS_KEY_SECRET')
    if not access_key_id or not access_key_secret:
        logging.error('ACCESS_KEY_ID and ACCESS_KEY_SECRET must be set')
        raise SystemExit(1)

    all_count, today_count = tongji(access_key_id, access_key_secret)
    shengyu = MONTHLY_QUOTA - all_count  # remaining quota
    hostname = socket.gethostname()
    webhook_url = os.getenv('DINGTALK_WEBHOOK_URL', DEFAULT_WEBHOOK_URL)

    # Only the advice phrase differs between the two cases.
    if shengyu > LOW_QUOTA_THRESHOLD:
        advice = "无需担心,请放心使用"
    else:
        advice = "请及时购买资源包"
    message = f"本月截止到今天,总共翻译了{all_count}个词汇,今天使用了{today_count},剩余{shengyu}个额度,{advice},本条消息来自{hostname}"
    logging.info(message)
    send_dingtalk_alert(webhook_url, message)


if __name__ == '__main__':
    main()