在 Python
的logging
模块中,它不仅提供了基础的日志功能,还拥有一系列高级配置选项来满足复杂应用的日志管理需求。
说到logging
模块的高级配置,必须提及日志分层
、logging.config配置
、日志异步操作
等关键功能。它们每一项都为开发者提供了强大的调试和监控环境,对于构建可维护和高效的日志系统至关重要。
在接下来的三篇logging高级配置
文章中,我将为读者朋友们介绍 Python
的 logging
模块中的三个高级配置的具体应用:日志分层
、logging.config
以及 日志异步操作
,探讨它们如何优化日志处理流程,并提升应用的整体性能。
本文将聚焦于 logging
模块中的日志异步操作
概念,探讨如何通过多线程
、多进程
和 异步IO
等操作去提高在处理大量或复杂的日志时的日志记录的整体效率。
这篇文章反复修改了很久,从15000字修改成现在。
主要难点在于asyncio
实现异步日志这一块,同时包含同步操作和异步操作,而异步操作又包含多个事件循环,是有些难搞的!!
这也属于是 ??日志记录百科全书 的最后一篇新知识啦!
后面可能还会更新两篇不是新知识的关于日志记录
的文章,它们大概是:
模块 | 释义 |
---|---|
logging | Python 的日志记录工具,标准库 |
threading | 多线程 |
ThreadPoolExecutor | 线程池 |
ProcessPoolExecutor | 进程池 |
asyncio | 异步编程模块 |
aiohttp | 异步网络请求模块 |
flask | 轻便web框架 |
一些概念:
概念 | 解释 |
---|---|
并行处理(多线程/多进程) | 同一时间,多个线程或进程分别执行不同的任务 |
异步处理(如 asyncio ) | 同一时间,单个线程或进程轮流执行多个任务,根据任务状态交替进行。 |
用通俗易懂的语言解释就是:
异步日志操作指的是在应用程序的主执行流程之外处理日志记录的过程。这通常通过在后台线程或进程中执行日志记录任务来实现,从而避免阻塞主程序的运行。异步日志操作有助于提高应用程序的性能,特别是在高负载环境中(如 Web 服务器、大数据处理系统和高度并发的应用程序),因为它可以减少由于日志记录而导致的主线程阻塞。
在Python中,有几种方式可以实现异步日志:
threading
模块)来处理日志任务。multiprocessing
模块)来避免线程间的竞争条件。asyncio
模块来实现日志记录。下面我将分别展示使用多线程
、多进程
和 异步IO
实现异步日志记录的完整代码示例。
多线程
和多进程
是并行的,它们并不是严格意义上的异步任务!只有asyncio
才可以实现真正的异步任务。
多线程和多进程都是非严格意义上的异步处理!!!
其中日志记录任务从主线程
&主进程
被分离出来,由另一个线程
&进程
并行处理,但它的本质还是同步的。
但也正因为它们是并行的,所以在处理复杂或资源密集型的日志记录场景时候,它们也都是非常合适的。
在 Python 中,多线程、多进程和异步IO(基于 asyncio
)是实现异步编程(非严格意义)的三种主要方法。它们各自适用于不同的场景,并且有各自的特点和优势。理解这些差异有助于在特定应用中选择最合适的并发策略。
对于大多数情况下的日志记录:
logging
模块兼容性良好;通常,对于日志记录,选择简单、易于维护的并发模式更为重要,因为日志记录本身通常不应该成为应用程序逻辑的主要瓶颈。
简单总结如下表:
方法 | 实现难度 | 应用场景 |
---|---|---|
多线程 | ? | I/O密集型任务 |
多进程 | ?? | CPU密集型任务 |
Asyncio异步 | ????? | I/O密集型任务和异步应用 |
在选择日志记录方法时,应考虑项目的具体需求、技术栈以及团队的技术能力。每种方法都有其适用的场景,选择合适的可以提高日志处理的效率和性能。
????在异步日志的编程中,如果没有显式的对队列(
Queue
)进行一些操作,那么队列(Queue
)在这里,起到的作为几乎为零。
在实现日志异步操作时,无论是使用 多线程
、多进程
还是 异步IO
,都可以选择使用 logging
+ 队列(Queue)
来实现。只是它们之间所使用的 队列(Queue)
都是不同类型的,这是因为它们的运行环境和机制不同:
queue.Queue
,Python标准库, 它是线程安全的,适用于在多线程环境中共享数据,确保数据的一致性和安全性;multiprocessing.Queue
,专门为多进程设计,能够在不同进程中安全地传递消息;asyncio.Queue
,为asyncio
异步编程设计,支持在异步函数中使用。使用队列的好处:
多线程
、多进程
还是 异步IO
环境下,都能提高日志记录的效率和程序的整体性能。优点 | 阐述 |
---|---|
减少主线程的阻塞 | 对于一些耗时的日志操作,如将日志发送到远程服务器,将其放入子线程中运行可以减少对主线程的阻塞,提高应用程序的响应速度和性能。 |
充分利用资源 | 在多线程环境中,可以更有效地利用多核处理器的能力。通过在后台线程中处理日志,可以平滑化资源使用,避免主线程在执行耗时操作时的性能瓶颈。 |
高度定制化的日志处理 | 异步日志记录可以实现高度定制化的日志处理,如关键词捕获和触发报警功能,有助于提高整体性能。 |
生产者-消费者解耦 | 使用队列实现了日志生产者(应用程序的各个部分生成日志)和消费者(负责处理日志的线程或进程)之间的解耦,提高了系统的灵活性和可扩展性。 |
虽然 logging
模块的标准处理器已经非常强大,但在特定场景下,使用自定义的异步日志处理可以提供更多的灵活性和性能优势。这在处理大量日志、需要高度定制化处理或希望最小化对主应用性能影响的情况下尤其有用。所以这个时候使用上队列(Queue
),效果是非常好的。
通过实现单例模式,可以确保整个应用中只有一个日志队列和一个后台线程(只有一个异步日志记录实例),在实现异步日志程序时,使用单例模式是非常合适的。
单例模式确保一个类只有一个实例,并提供一个全局访问点。通常我们只需要一个日志系统实例来收集和分发整个应用程序的日志信息。
使用单例模式的优势包括:
下面三份异步日志代码中,都使用了一个自定义日志处理器-->AsyncLogHandler
,每一份都是处理100条日志消息,用于测试。
同步执行了日志记录,
异步通过多线程、多进程或asyncio异步将日志信息上传到服务器;
通篇下来,它们实现的功能是一致,但速度上,有所不同,如下表所示:
类型 | 耗时/s |
---|---|
多线程 | 2.8 |
多进程 | 3.5 |
asyncio异步 | 0.71 |
延伸一下,如果如果日志记录数量剧增,那么:
Flask
搭一个服务端,用于测试http://127.0.0.1:5000/submit
# -*- coding: utf-8 -*-
import time
from flask import Flask, jsonify
app = Flask(__name__)
@app.route("/submit", methods=["POST"])
def api_data():
data = {"status": "success"}
time.sleep(.5)
return jsonify(data)
if __name__ == "__main__":
app.run()
通过合理的设计代码,有效地结合了单独的监视线程和线程池的并发处理能力,适合于尤其是涉及耗时操作(如网络请求)的日志系统。这样的设计允许主程序继续运行而不被日志处理操作阻塞,同时确保日志消息能够及时且高效地被处理。
下面二者结合,将 _log_worker
放在单独的线程中运行可以避免阻塞主线程,并允许持续和实时地处理队列中的日志消息。
self.thread
负责运行_log_worker方法,
executor
负责处理耗时任务(upload_log
)
# -*- coding: utf-8 -*-
# @Author : Frica01
# @Time : 2023-12-12 20:43
# @Name : async_logging.py
import logging
import queue
import threading
import time
from concurrent.futures import ThreadPoolExecutor
import requests
log_format = '%(levelname)-7s - %(asctime)s - [%(filename)s:%(funcName)s:%(lineno)d] - %(message)s'
class AsyncLogHandler(logging.Handler):
"""
异步日志处理器,将日志消息发送到队列中。
Attributes:
log_queue (queue.Queue): 存储日志消息的队列。
"""
def __init__(self, log_queue: queue.Queue):
super().__init__(level=logging.DEBUG)
self.log_queue = log_queue
def emit(self, record):
"""
将格式化的日志记录放入队列中。
Args:
record (logging.LogRecord): 日志记录对象。
"""
log_message = self.format(record)
self.log_queue.put(log_message)
class AsyncLogger:
"""
异步日志记录器,用于处理日志消息的异步记录。
该类实现了单例模式,确保整个应用中只有一个日志队列和处理线程。
Attributes:
log_queue (queue.Queue): 用于存储日志消息的队列。
thread (threading.Thread): 用于处理队列中日志消息的后台线程。
executor (ThreadPoolExecutor): 用于处理队列中日志消息的后台线程池。
"""
_instance = None
def __new__(cls):
"""创建或获取 AsyncLogger 的单例实例。"""
if cls._instance is None:
cls._instance = super(AsyncLogger, cls).__new__(cls)
cls._instance._init()
return cls._instance
def _init(self):
"""初始化日志队列和启动日志处理线程。"""
self.log_queue = queue.Queue()
self.executor = ThreadPoolExecutor(max_workers=10) # 创建线程池
self._start_log_worker()
def _start_log_worker(self):
"""启动用于处理日志消息的后台线程。"""
self.thread = threading.Thread(target=self._log_worker, daemon=True)
self.thread.start()
def _log_worker(self):
"""后台线程的工作函数,从队列中获取并打印日志消息。"""
while True:
try:
log_message = self.log_queue.get_nowait()
if log_message == "STOP":
break
self.executor.submit(self.upload_log, log_message)
# res = requests.post('http://127.0.0.1:5000/submit', data={'data': log_message})
# print(res.json())
except queue.Empty:
# 处理队列为空的情况, 选择暂停一段时间
time.sleep(.1)
@staticmethod
def upload_log(log_message):
# 这里是上传日志的操作
res = requests.post('http://127.0.0.1:5000/submit', data={'data': log_message})
print(res.json())
def setup_logger(self, name=None, level=logging.INFO, filename=None):
"""设置并返回具有指定名称和级别的日志记录器。
Args:
name (str): 日志记录器的名称。
level (int): 日志级别。
filename (str): 日志文件的名称。
Returns:
logging.Logger: 配置后的日志记录器。
"""
logger = logging.getLogger(name)
logger.setLevel(level)
# # 避免日志消息被传播到root logger
logger.propagate = False
# 文件处理器
if filename:
file_handler = logging.FileHandler(filename)
file_handler.setFormatter(logging.Formatter(log_format))
logger.addHandler(file_handler)
# 控制台处理器
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(logging.Formatter(log_format))
logger.addHandler(stream_handler)
# 用于处于耗时的处理器
async_handler = AsyncLogHandler(self.log_queue)
async_handler.setFormatter(logging.Formatter(log_format))
logger.addHandler(async_handler)
return logger
@property
def is_queue_empty(self):
"""
检查日志队列是否为空。
Returns:
bool: 如果队列为空返回 True,否则返回 False。
"""
return self.log_queue.empty()
def stop_logging(self):
"""发送停止信号并等待日志处理线程结束。"""
self.log_queue.put("STOP")
self.thread.join()
self.executor.shutdown(wait=True) # 关闭线程池
# -*- coding: utf-8 -*-
import time
from async_logging import AsyncLogger
def main():
# 创建AsyncLogger实例
async_logger = AsyncLogger()
# 创建日志记录器,确保指定的级别允许记录日志消息
logger = async_logger.setup_logger(
'main',
level=10, # 即 logging.DEBUG
# filename='app.log'
)
# 使用日志记录器
for i in range(100):
logger.info('This is an info message')
while not async_logger.is_queue_empty:
time.sleep(0.1)
if __name__ == '__main__':
main()
值得一提的是,这份代码仅仅是修改上面的 多线程日志记录
的一个方法。
但是它们之间的应用场景有所不同,特别是在涉及到CPU密集型的日志处理任务时。
CPU密集型任务:对于CPU密集型的日志处理任务,如需要进行复杂计算或处理的日志消息,使用进程池可以提高性能。
避免GIL限制:由于GIL的存在,Python的线程并不能真正并行执行计算密集型任务。使用进程池可以绕过GIL,实现真正的并行处理。
from concurrent.futures import ProcessPoolExecutor
# 其他代码保持不变
class AsyncLogger:
# 省略其他部分
def _init(self):
"""初始化日志队列和启动日志处理进程池。"""
self.log_queue = queue.Queue()
self.executor = ProcessPoolExecutor(max_workers=10) # 使用进程池
# 省略其他部分
# -*- coding: utf-8 -*-
# @Author : Frica01
# @Time : 2023-12-12 20:43
# @Name : async_logging.py
import asyncio
import logging
import threading
import aiohttp
log_format = '%(levelname)-7s - %(asctime)s - [%(filename)s:%(funcName)s:%(lineno)d] - %(message)s'
class AsyncLogHandler(logging.Handler):
"""
异步日志处理器,将日志消息发送到队列中。
Attributes:
log_queue (asyncio.Queue): 存储日志消息的队列。
"""
def __init__(self, log_queue: asyncio.Queue):
super().__init__(level=logging.DEBUG)
self.log_queue = log_queue
def emit(self, record):
"""
将格式化的日志记录放入队列中。
Args:
record (logging.LogRecord): 日志记录对象。
"""
log_message = self.format(record)
self.log_queue.put_nowait(log_message)
class AsyncLogger:
"""
异步日志记录器,用于处理日志消息的异步记录。
该类实现了单例模式,确保整个应用中只有一个日志队列和处理线程。
Attributes:
log_queue (queue.Queue): 用于存储日志消息的队列。
loop (asyncio.new_event_loop): 事件循环
thread (threading.Thread): 用于处理队列中日志消息的后台线程。
tasks (list): 队列列表
"""
_instance = None
def __new__(cls):
"""创建或获取 AsyncLogger 的单例实例。"""
if cls._instance is None:
cls._instance = super(AsyncLogger, cls).__new__(cls)
cls._instance._init()
return cls._instance
def _init(self):
"""初始化日志队列和启动日志处理线程。"""
self.log_queue = asyncio.Queue()
self.loop = asyncio.new_event_loop() # 创建新的异步事件循环
self.thread = threading.Thread(target=self._start_log_worker, args=(self.loop,), daemon=True)
self.thread.start()
self.tasks = list()
def _start_log_worker(self, loop):
"""启动用于处理日志消息的后台线程的事件循环"""
asyncio.set_event_loop(loop) # 将当前线程的事件循环设置为 loop
loop.run_until_complete(self._log_worker()) # 启动事件循环,并运行 _log_worker
async def _log_worker(self):
"""后台线程的工作函数,从队列中获取并打印日志消息。"""
async with aiohttp.ClientSession() as session:
while True:
log_message = await self.log_queue.get()
if log_message == "STOP":
break
task = asyncio.create_task(self.upload_log(session, log_message))
self.tasks.append(task)
await asyncio.gather(*self.tasks)
@staticmethod
async def upload_log(session, log_message):
try:
async with session.post(
'http://127.0.0.1:5000/submit', data={'data': log_message}
) as response:
print(f"Log uploaded: {await response.json()}")
except Exception as e:
print(f"Failed to send log: {e}")
def setup_logger(self, name=None, level=logging.INFO, filename=None):
"""设置并返回具有指定名称和级别的日志记录器。
Args:
name (str): 日志记录器的名称。
level (int): 日志级别。
filename (str): 日志文件的名称。
Returns:
logging.Logger: 配置后的日志记录器。
"""
logger = logging.getLogger(name)
logger.setLevel(level)
logger.propagate = False
# 文件处理器
if filename:
file_handler = logging.FileHandler(filename)
file_handler.setFormatter(logging.Formatter(log_format))
logger.addHandler(file_handler)
# 控制台处理器
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(logging.Formatter(log_format))
logger.addHandler(stream_handler)
# 用于处于耗时的处理器
async_handler = AsyncLogHandler(self.log_queue)
async_handler.setFormatter(logging.Formatter(log_format))
logger.addHandler(async_handler)
return logger
@property
def is_queue_empty(self):
"""
检查日志队列是否为空。
Returns:
bool: 如果队列为空返回 True,否则返回 False。
"""
return self.log_queue.empty()
def stop_logging(self):
"""发送停止信号并等待日志处理线程结束和结束事件循环"""
asyncio.run_coroutine_threadsafe(self.log_queue.put("STOP"), self.loop)
self.thread.join()
self.loop.close()
# -*- coding: utf-8 -*-
import asyncio
from async_logging import AsyncLogger
async def main():
async_logger = AsyncLogger()
logger = async_logger.setup_logger(
'main',
level=10, # 即 logging.DEBUG
filename='app.log'
)
for i in range(100):
logger.info('This is an info message')
async_logger.stop_logging()
if __name__ == '__main__':
asyncio.run(main())
上面三份代码分别展示了使用多线程、多进程和异步asyncio
进行异步日志处理的不同方法。每种方法都有其独特的应用场景和优势,根据项目的具体需求和环境可以选择合适的一种。以下是对它们的简单梳理:
本文详尽地介绍了 Python 中 logging
模块的高级配置,特别是关于日志异步操作的实现。
文章深入探讨了多线程、多进程和异步IO(基于 asyncio
)在日志记录中的应用,提供了实际代码示例,并清晰地解释了每种方法的应用场景和优势。
每种方法都有其特定的应用场景和优势。选择合适的日志记录方法取决于应用的具体需求,如性能要求、并发级别以及日志任务的性质(CPU 密集型或 I/O 密集型)。
总的来说,这篇文章为开发人员提供了一个全面的指南,以选择和实现最适合应用的异步日志记录方法。
本次分享到此结束,
see you~🚀🚀