""" 日志配置和功能模块 提供应用程序的日志记录功能 """ import os import logging import gzip import shutil import glob from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler import time from typing import Optional, IO, Any # 添加处理ID过滤器 class ProcessIdFilter(logging.Filter): def __init__(self): super().__init__() self.process_id = None def filter(self, record): if not hasattr(record, 'process_id'): if self.process_id is None: # 生成唯一的处理ID self.process_id = int(time.time() * 1000) % 10000 record.process_id = f"PROC-{self.process_id:04d}" return True # 自定义日志处理器,支持压缩 class CompressedRotatingFileHandler(RotatingFileHandler): def __init__(self, filename, mode='a', maxBytes=0, backupCount=0, encoding=None, delay=False): super().__init__(filename, mode, maxBytes, backupCount, encoding, delay) def doRollover(self): """ 执行日志滚动时,压缩旧的日志文件 """ # 关闭流 if self.stream: self.stream.close() self.stream = None # type: ignore if self.backupCount > 0: # 移动旧的日志文件 for i in range(self.backupCount - 1, 0, -1): sfn = f"{self.baseFilename}.{i}" dfn = f"{self.baseFilename}.{i + 1}" if os.path.exists(sfn): if os.path.exists(dfn): os.remove(dfn) os.rename(sfn, dfn) dfn = f"{self.baseFilename}.1" if os.path.exists(dfn): os.remove(dfn) # 压缩当前日志文件 try: with open(self.baseFilename, 'rb') as f_in: with gzip.open(f"{dfn}.gz", 'wb') as f_out: shutil.copyfileobj(f_in, f_out) except Exception: # 如果压缩失败,回退到普通复制 shutil.copy2(self.baseFilename, dfn) # 重新打开流 if not self.delay: self.stream = self._open() # 自定义TimedRotatingFileHandler,支持压缩 class CompressedTimedRotatingFileHandler(TimedRotatingFileHandler): def __init__(self, filename, when='h', interval=1, backupCount=0, encoding=None, delay=False, utc=False, atTime=None): super().__init__(filename, when, interval, backupCount, encoding, delay, utc, atTime) def doRollover(self): """ 执行日志滚动时,压缩旧的日志文件 """ # 执行标准的滚动 super().doRollover() # 查找最新创建的备份文件并压缩 backup_files = glob.glob(f"{self.baseFilename}.*") for backup_file in backup_files: if not backup_file.endswith('.gz') and os.path.isfile(backup_file): try: with open(backup_file, 'rb') as f_in: with gzip.open(f"{backup_file}.gz", 'wb') as f_out: shutil.copyfileobj(f_in, f_out) os.remove(backup_file) # 删除原始未压缩文件 except Exception: # 压缩失败时不做处理,保留原始文件 pass def clean_old_logs(log_folder, max_days=30): """ 清理超过指定天数的日志文件 参数: log_folder: 日志目录 max_days: 保留的最大天数 """ try: current_time = time.time() max_age = max_days * 86400 # 转换为秒 for file in os.listdir(log_folder): file_path = os.path.join(log_folder, file) if os.path.isfile(file_path) and file.startswith('word_processor.log.'): file_age = current_time - os.path.getmtime(file_path) if file_age > max_age: os.remove(file_path) except Exception as e: # 清理过程中的错误不应影响主程序 pass def setup_logger(log_folder='logs', log_level=logging.INFO, max_size_mb=10, backup_count=30): """ 配置日志记录器 参数: log_folder: 日志保存目录 log_level: 日志级别 max_size_mb: 单个日志文件的最大大小(MB) backup_count: 保留的备份文件数量 返回: logger: 配置好的日志记录器 """ # 确保日志目录存在 os.makedirs(log_folder, exist_ok=True) # 创建日志记录器 logger = logging.getLogger('word_processor') logger.setLevel(log_level) # 如果已经有处理器,则不再添加 if logger.handlers: return logger # 日志格式 - 优化格式,添加处理ID以便跟踪单次操作 log_format = logging.Formatter( '%(asctime)s [%(levelname)s] [%(process_id)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) # 创建处理ID过滤器实例 process_id_filter = ProcessIdFilter() # 文件处理器 - 同时基于大小和时间滚动 log_file = os.path.join(log_folder, 'word_processor.log') # 基于大小的处理器 size_handler = CompressedRotatingFileHandler( log_file, maxBytes=max_size_mb * 1024 * 1024, # 转换为字节 backupCount=5, # 保留5个基于大小的备份 encoding='utf-8' ) size_handler.setFormatter(log_format) size_handler.setLevel(log_level) size_handler.addFilter(process_id_filter) # 基于时间的处理器 time_handler = CompressedTimedRotatingFileHandler( log_file, when='midnight', interval=1, backupCount=backup_count, # 保留指定天数的日志 encoding='utf-8' ) time_handler.setFormatter(log_format) time_handler.setLevel(log_level) time_handler.addFilter(process_id_filter) # 控制台处理器 console_handler = logging.StreamHandler() console_handler.setFormatter(log_format) console_handler.setLevel(log_level) console_handler.addFilter(process_id_filter) # 添加处理器到记录器 logger.addHandler(size_handler) logger.addHandler(time_handler) logger.addHandler(console_handler) # 清理旧日志 clean_old_logs(log_folder, backup_count) return logger # 初始化日志记录器 - 可以根据环境设置不同的级别 # 生产环境推荐 logging.INFO 或 logging.WARNING # 开发环境可以使用 logging.DEBUG import os env = os.environ.get('FLASK_ENV', 'production') log_level = logging.INFO if env == 'production' else logging.DEBUG max_size_mb = 10 # 10MB backup_count = 30 # 30天 logger = setup_logger(log_level=log_level, max_size_mb=max_size_mb, backup_count=backup_count) # 重置处理ID,用于跟踪新的请求 def reset_process_id(): for handler in logger.handlers: for filter_obj in handler.filters: if isinstance(filter_obj, ProcessIdFilter): filter_obj.process_id = None