将日志logging写入mysql
import logging
import pymysql
from pymysql import MySQLError
class MySQLHandler(logging.Handler):
def __init__(self):
logging.Handler.__init__(self)
self.connection = pymysql.connect(
host='localhost', port=3306, database='log',
user='root', password='xxxx' )
self.table = 't_logs'
self.cursor = self.connection.cursor()
self.create_table_if_not_exists()
def create_table_if_not_exists(self):
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {self.table} (
id INT AUTO_INCREMENT PRIMARY KEY,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
logger_name VARCHAR(255),
log_level VARCHAR(50),
message TEXT
)
"""
self.cursor.execute(create_table_query)
self.connection.commit()
def emit(self, record):
if self.connection:
try:
insert_query = f"""
INSERT INTO {self.table} (logger_name, log_level, message)
VALUES (%s, %s, %s)
"""
self.cursor.execute(insert_query, (record.name, record.levelname, record.msg))
self.connection.commit()
except MySQLError as e:
print(f"Error inserting log into MySQL: {e}")
else:
print("MySQL connection is closed")
def close(self):
if self.connection:
self.cursor.close()
self.connection.close()
logging.Handler.close(self)
if __name__=='__main__':
# 配置日志记录器
logger = logging.getLogger('my_logger')
logger.setLevel(logging.DEBUG)
# 添加自定义的 MySQL 处理器
mysql_handler = MySQLHandler()
#mysql_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
logger.addHandler(mysql_handler)
# 输出不同级别的日志
logger.debug('这是一个调试信息')
logger.info('这是一个信息')
logger.warning('这是一个警告')
logger.error('这是一个错误')
logger.critical('这是一个严重错误')
评论区