Python基础——logging模块详解

logging简介

logging是python的内置库,主要用于进行格式化内容输出,可将格式化内容输出到文件,也可输出到屏幕。我们在开发过程中常用print函数来进行调试,但是实际应用部署时我们要将日志的信息要输出到文件中,方便后续查找以及备份。在我们使用日志管理时,我们还可以将日志格式化成json对象转存到ELK中方便图形化查看及管理。前面说的这些,我们都可以通过logging所包含的功能以及提供扩展的方式来完成。

logging工作流程

这是从网上查到一个关于python logging模块的工作流程图,非常遗憾没有找到出处。

可以看到图中,几个关键的类、

Logger

用于记录日志的对象。

通过流程图可以看到

判断是否enabled,实质就是看记录的level(logger.info,logger.debug等)和当前logger对象设置的level是否满足(比如logger设置的lever是Info,记录时使用的logger.debug,那么就会不满足,所以不会记录日志)

查看logger的过滤器是否满足。filter通过之后,交给logger的handler来记录日志,一个logger是可以设置多个handler。

交给Handlers实际记录日志

注:在整个应用中可以有多个logger,使用logging.getLogger时通过指定name来获取对象,实际logging中还存在一个Manager类,由Manager来进行多logger的单例模式管理。

Handler

用于记录日志到具体的文件或者输出流或其他的管道。

查看记录日志是否满足过滤器满足过滤器,按照设置的Formatter生成字符串将内容写入到具体的文件或者输出流

不同的Handler可能有不同的处理,但是底层原理还是做这三件事情。

Filter

用于过滤用户记录日志时,是否满足记录标准

Formatter

用于格式化,输出的字符串格式

这是原生自带的格式

%(name)s Name of the logger (logging channel) %(levelno)s Numeric logging level for the message (DEBUG, INFO, WARNING, ERROR, CRITICAL) %(levelname)s Text logging level for the message ("DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL") %(pathname)s Full pathname of the source file where the logging call was issued (if available) %(filename)s Filename portion of pathname %(module)s Module (name portion of filename) %(lineno)d Source line number where the logging call was issued (if available) %(funcName)s Function name %(created)f Time when the LogRecord was created (time.time() return value) %(asctime)s Textual time when the LogRecord was created %(msecs)d Millisecond portion of the creation time %(relativeCreated)d Time in milliseconds when the LogRecord was created, relative to the time the logging module was loaded (typically at application startup time) %(thread)d Thread ID (if available) %(threadName)s Thread name (if available) %(process)d Process ID (if available) %(message)s The result of record.getMessage(), computed just as the record is emitted

LogRecord

我们每一次的 logger.info, logger.debug, logger.error等实际上都是进行一次LogRecord的处理

包含一些基本的输出日志内容,以及内容中参数,还有附带的函数名称,行数等信息

logging源码阅读

这是我这边基于字典的一个logging配置

import logging import logging.config import os basedir = os.path.abspath(os.path.dirname(__file__)) log_path = os.path.join(basedir, '..', 'logs') service_name = "test_log" if not os.path.isdir(log_path): os.mkdir(log_path) class InfoFilter(logging.Filter): def filter(self, record): """ 只筛选出INFO级别的日志 """ if logging.INFO <= record.levelno < logging.ERROR: return super().filter(record) else: return 0 class ErrorFilter(logging.Filter): def filter(self, record): """ 只筛选出ERROR级别的日志 """ if logging.ERROR <= record.levelno < logging.CRITICAL: return super().filter(record) else: return 0 LOG_CONFIG_DICT = { 'version': 1, 'disable_existing_loggers': False, 'formatters': { 'simple': { 'class': 'logging.Formatter', 'format': '%(asctime)s %(levelname)s %(name)s %(filename)s %(module)s %(funcName)s ' '%(lineno)d %(thread)d %(threadName)s %(process)d %(processName)s %(message)s' }, # json模式, 方便ELK收集处理 'json': { 'class': 'logging.Formatter', 'format': '{"time:":"%(asctime)s","level":"%(levelname)s","logger_name":"%(name)s",' '"file_name":"%(filename)s","module":"%(module)s","func_name":"%(funcName)s",' '"line_number":"%(lineno)d","thread_id":"%(thread)d","thread_name":"%(threadName)s",' '"process_id":"%(process)d","process_name":"%(processName)s","message":"%(message)s"}'} }, # 过滤器 'filters': { 'info_filter': { '()': InfoFilter }, 'error_filter': { '()': ErrorFilter } }, # 处理器 'handlers': { # 控制台输出 'console': { 'class': 'logging.StreamHandler', 'level': 'INFO', 'formatter': 'simple' }, # info文件输出 'info_file': { 'level': 'INFO', 'formatter': 'json', 'class': 'logging.handlers.TimedRotatingFileHandler', 'filename': '{0}/{1}_info.log'.format(log_path, service_name), 'when': "d", 'interval': 1, 'encoding': 'utf8', 'backupCount': 30, 'filters': ['info_filter'] }, # error文件输出 'error_file': { 'level': 'ERROR', 'formatter': 'json', 'class': 'logging.handlers.TimedRotatingFileHandler', 'filename': '{0}/{1}_error.log'.format(log_path, service_name), 'when': "d", 'interval': 1, 'encoding': 'utf8', 'backupCount': 30, 'filters': ['error_filter'] } }, # 记录器 'loggers': { 'full_logger': { 'handlers': ['console', 'info_file', 'error_file'], 'level': 'INFO' }, 'only_console_logger': { 'handlers': ['console'], 'level': 'INFO' }, 'only_file_logger': { 'handlers': ['info_file', 'error_file'] } } } logging.config.dictConfig(LOG_CONFIG_DICT)

下面我们就基于config第一次配置时整个logging的工作原理。结合代码进行分析logging的工作之路。

我的python版本是Python3.6

从logging.config开始

dictConfigClass = DictConfigurator def dictConfig(config): """Configure logging using a dictionary.""" dictConfigClass(config).configure()

实质上是实例化logging.config.DictConfigurator类的对象,然后执行其configure方法。

class DictConfigurator(BaseConfigurator): """ Configure logging using a dictionary-like object to describe the configuration. """ def configure(self): """Do the configuration.""" config = self.config if 'version' not in config: raise ValueError("dictionary doesn't specify a version") if config['version'] != 1: raise ValueError("Unsupported version: %s" % config['version']) incremental = config.pop('incremental', False) EMPTY_DICT = {} logging._acquireLock() try: if incremental: handlers = config.get('handlers', EMPTY_DICT) for name in handlers: if name not in logging._handlers: raise ValueError('No handler found with ' 'name %r' % name) else: try: handler = logging._handlers[name] handler_config = handlers[name] level = handler_config.get('level', None) if level: handler.setLevel(logging._checkLevel(level)) except Exception as e: raise ValueError('Unable to configure handler ' '%r: %s' % (name, e)) loggers = config.get('loggers', EMPTY_DICT) for name in loggers: try: self.configure_logger(name, loggers[name], True) except Exception as e: raise ValueError('Unable to configure logger ' '%r: %s' % (name, e)) root = config.get('root', None) if root: try: self.configure_root(root, True) except Exception as e: raise ValueError('Unable to configure root ' 'logger: %s' % e) else: disable_existing = config.pop('disable_existing_loggers', True) logging._handlers.clear() del logging._handlerList[:] # Do formatters first - they don't refer to anything else formatters = config.get('formatters', EMPTY_DICT) for name in formatters: try: formatters[name] = self.configure_formatter( formatters[name]) except Exception as e: raise ValueError('Unable to configure ' 'formatter %r: %s' % (name, e)) # Next, do filters - they don't refer to anything else, either filters = config.get('filters', EMPTY_DICT) for name in filters: try: filters[name] = self.configure_filter(filters[name]) except Exception as e: raise ValueError('Unable to configure ' 'filter %r: %s' % (name, e)) handlers = config.get('handlers', EMPTY_DICT) deferred = [] for name in sorted(handlers): try: handler = self.configure_handler(handlers[name]) handler.name = name handlers[name] = handler except Exception as e: if 'target not configured yet' in str(e): deferred.append(name) else: raise ValueError('Unable to configure handler ' '%r: %s' % (name, e)) # Now do any that were deferred for name in deferred: try: handler = self.configure_handler(handlers[name]) handler.name = name handlers[name] = handler except Exception as e: raise ValueError('Unable to configure handler ' '%r: %s' % (name, e)) root = logging.root existing = list(root.manager.loggerDict.keys()) existing.sort() child_loggers = [] #now set up the new ones... loggers = config.get('loggers', EMPTY_DICT) for name in loggers: if name in existing: i = existing.index(name) + 1 # look after name prefixed = name + "." pflen = len(prefixed) num_existing = len(existing) while i < num_existing: if existing[i][:pflen] == prefixed: child_loggers.append(existing[i]) i += 1 existing.remove(name) try: self.configure_logger(name, loggers[name]) except Exception as e: raise ValueError('Unable to configure logger ' '%r: %s' % (name, e)) _handle_existing_loggers(existing, child_loggers, disable_existing) # And finally, do the root logger root = config.get('root', None) if root: try: self.configure_root(root) except Exception as e: raise ValueError('Unable to configure root ' 'logger: %s' % e) finally: logging._releaseLock()

我们对**incremental**不进行设置,即使用全量的方式进行配置,对于所有的handler重置并处理

看下来基本上分为4步来走。

配置Formatter配置Filter配置Handler配置Logger

Formatter配置

源码部分

def configure_formatter(self, config): """Configure a formatter from a dictionary.""" if '()' in config: factory = config['()'] # for use in exception handler try: result = self.configure_custom(config) except TypeError as te: if "'format'" not in str(te): raise config['fmt'] = config.pop('format') config['()'] = factory result = self.configure_custom(config) else: fmt = config.get('format', None) dfmt = config.get('datefmt', None) style = config.get('style', '%') cname = config.get('class', None) if not cname: c = logging.Formatter else: c = _resolve(cname) result = c(fmt, dfmt, style) return result

可以使用自己的Formatter,默认使用logging.Formatter

Filter配置

def configure_filter(self, config): """Configure a filter from a dictionary.""" if '()' in config: result = self.configure_custom(config) else: name = config.get('name', '') result = logging.Filter(name) return result

设置filter,可以参考我上面配置的字典,可以自己定义Filter也可以使用系统内置Filter

Handler配置

def configure_handler(self, config): """Configure a handler from a dictionary.""" config_copy = dict(config) # for restoring in case of error formatter = config.pop('formatter', None) if formatter: try: formatter = self.config['formatters'][formatter] except Exception as e: raise ValueError('Unable to set formatter ' '%r: %s' % (formatter, e)) level = config.pop('level', None) filters = config.pop('filters', None) if '()' in config: c = config.pop('()') if not callable(c): c = self.resolve(c) factory = c else: cname = config.pop('class') klass = self.resolve(cname) #Special case for handler which refers to another handler if issubclass(klass, logging.handlers.MemoryHandler) and\ 'target' in config: try: th = self.config['handlers'][config['target']] if not isinstance(th, logging.Handler): config.update(config_copy) # restore for deferred cfg raise TypeError('target not configured yet') config['target'] = th except Exception as e: raise ValueError('Unable to set target handler ' '%r: %s' % (config['target'], e)) elif issubclass(klass, logging.handlers.SMTPHandler) and\ 'mailhost' in config: config['mailhost'] = self.as_tuple(config['mailhost']) elif issubclass(klass, logging.handlers.SysLogHandler) and\ 'address' in config: config['address'] = self.as_tuple(config['address']) factory = klass props = config.pop('.', None) kwargs = dict([(k, config[k]) for k in config if valid_ident(k)]) try: result = factory(**kwargs) except TypeError as te: if "'stream'" not in str(te): raise kwargs['strm'] = kwargs.pop('stream') result = factory(**kwargs) if formatter: result.setFormatter(formatter) if level is not None: result.setLevel(logging._checkLevel(level)) if filters: self.add_filters(result, filters) if props: for name, value in props.items(): setattr(result, name, value) return result

可以看到,前面顺序先配置formatter和filter是因为这里需要引用的到

值得注意的是,我们初始化时传递的一个字典,在整个配置过程中,字典里面的值会随着我们每次的配置变化而变化,所以我们在每个元素配置之后,在使用上一个字典元素时,就是配置完成之后的元素,为了方便理解,将配置filter之前和配置filter之后,config中的filter变化列出来

config == > before filters {'info_filter': {'()': }, 'error_filter': {'()': }} config == > after filters {'info_filter': , 'error_filter': }

配置前是我们配置文件中的内容,配置完成之后filter已经是一组对象了,所以在配置handler时我们就可以直接使用对象add_filter了。

Logger配置

def common_logger_config(self, logger, config, incremental=False): level = config.get('level', None) if level is not None: logger.setLevel(logging._checkLevel(level)) if not incremental: #Remove any existing handlers for h in logger.handlers[:]: logger.removeHandler(h) handlers = config.get('handlers', None) if handlers: self.add_handlers(logger, handlers) filters = config.get('filters', None) if filters: self.add_filters(logger, filters) def configure_logger(self, name, config, incremental=False): """Configure a non-root logger from a dictionary.""" logger = logging.getLogger(name) self.common_logger_config(logger, config, incremental) propagate = config.get('propagate', None) if propagate is not None: logger.propagate = propagate

这就比较容易理解了,将我们上面配置过的filters和handlers添加到我们的logger中。

这里需要注意的一点是logger = logging.getLogger(name),看下logger.getLogger源码

root = RootLogger(WARNING) Logger.root = root Logger.manager = Manager(Logger.root) def getLogger(name=None): if name: return Logger.manager.getLogger(name) else: return root class Manager(object): def __init__(self, rootnode): self.root = rootnode self.disable = 0 self.emittedNoHandlerWarning = False self.loggerDict = {} self.loggerClass = None self.logRecordFactory = None def getLogger(self, name): rv = None if not isinstance(name, str): raise TypeError('A logger name must be a string') _acquireLock() try: if name in self.loggerDict: rv = self.loggerDict[name] if isinstance(rv, PlaceHolder): ph = rv rv = (self.loggerClass or _loggerClass)(name) rv.manager = self self.loggerDict[name] = rv self._fixupChildren(ph, rv) self._fixupParents(rv) else: rv = (self.loggerClass or _loggerClass)(name) rv.manager = self self.loggerDict[name] = rv self._fixupParents(rv) finally: _releaseLock() return rv

可以看到,logging使用Mangaer进行logger的单例管理

截止到这里,基本上我们使用前的准备,logging都替我们准备好了,下面就是我们的使用

获取logger并记录日志

class Logger(Filterer): def info(self, msg, *args, **kwargs): if self.isEnabledFor(INFO): self._log(INFO, msg, args, **kwargs) def error(self, msg, *args, **kwargs): if self.isEnabledFor(ERROR): self._log(ERROR, msg, args, **kwargs) def isEnabledFor(self, level): if self.manager.disable >= level: return False return level >= self.getEffectiveLevel() def _log(self, level, msg, args, exc_info=None, extra=None, stack_info=False): sinfo = None if _srcfile: try: fn, lno, func, sinfo = self.findCaller(stack_info) except ValueError: # pragma: no cover fn, lno, func = "(unknown file)", 0, "(unknown function)" else: # pragma: no cover fn, lno, func = "(unknown file)", 0, "(unknown function)" if exc_info: if isinstance(exc_info, BaseException): exc_info = (type(exc_info), exc_info, exc_info.__traceback__) elif not isinstance(exc_info, tuple): exc_info = sys.exc_info() record = self.makeRecord(self.name, level, fn, lno, msg, args, exc_info, func, extra, sinfo) self.handle(record) def handle(self, record): if (not self.disabled) and self.filter(record): self.callHandlers(record) def callHandlers(self, record): c = self found = 0 while c: for hdlr in c.handlers: found = found + 1 if record.levelno >= hdlr.level: hdlr.handle(record) if not c.propagate: c = None #break out else: c = c.parent if (found == 0): if lastResort: if record.levelno >= lastResort.level: lastResort.handle(record) elif raiseExceptions and not self.manager.emittedNoHandlerWarning: sys.stderr.write("No handlers could be found for logger" " "%s"\n" % self.name) self.manager.emittedNoHandlerWarning = True class StreamHandler(Handler): terminator = '\n' def __init__(self, stream=None): """ Initialize the handler. If stream is not specified, sys.stderr is used. """ Handler.__init__(self) if stream is None: stream = sys.stderr self.stream = stream def flush(self): self.acquire() try: if self.stream and hasattr(self.stream, "flush"): self.stream.flush() finally: self.release() def emit(self, record): try: msg = self.format(record) stream = self.stream stream.write(msg) stream.write(self.terminator) self.flush() except Exception: self.handleError(record) def __repr__(self): level = getLevelName(self.level) name = getattr(self.stream, 'name', '') if name: name += ' ' return '' % (self.__class__.__name__, name, level)

根据代码可以看到符合我们流程图看到的流程,我们细化一下就是

1. 查看记录日志是否满足过滤器,然后准备logrecord中的信息并生成logrecord

2. 将logrecord交给所有handlers处理

3. 在handler中确定是否满足handler过滤器,满足的话按照配置的Formatter生成字符串

4. 将内容写入到具体的文件或者输出流

logger = logging.getLogger("full_logger") logger.info("111") # 根据我们的配置 这里会输出到流中,以及记录到test_log_info.log文件中 logger.error("error") # 根据我们的配置 这里会输出到流中,以及记录到test_log_error.info文件中