Coverage for src/ramses_tx/logger.py: 45%
141 statements
« prev ^ index » next coverage.py v7.11.3, created at 2026-01-05 21:46 +0100
« prev ^ index » next coverage.py v7.11.3, created at 2026-01-05 21:46 +0100
1#!/usr/bin/env python3
2"""RAMSES RF - a RAMSES-II protocol decoder & analyser.
4This module wraps logger to provide bespoke functionality, especially for timestamps.
5"""
7from __future__ import annotations
9import logging
10import re
11import shutil
12import sys
13from collections.abc import Callable, Mapping
14from datetime import datetime as dt
15from logging.handlers import TimedRotatingFileHandler as _TimedRotatingFileHandler
16from typing import Any
18from .version import VERSION
DEV_MODE = False

_LOGGER = logging.getLogger(__name__)
if DEV_MODE:
    _LOGGER.setLevel(logging.DEBUG)

# colorlog is an optional dependency: record whether it could be imported.
try:
    import colorlog
except ModuleNotFoundError:
    _use_color_ = False
    _LOGGER.warning("Consider installing the colorlog library for colored logging")
else:
    _use_color_ = True
    # logging.basicConfig(format=DEFAULT_FMT, datefmt=DEFAULT_DATEFMT)  # Causes issues

DEFAULT_FMT = "%(asctime)s.%(msecs)03d %(message)s"
DEFAULT_DATEFMT = "%H:%M:%S"

# TODO: account for the non-printing characters
CONSOLE_COLS = int(shutil.get_terminal_size(fallback=(int(2e3), 24)).columns - 1)

# The frame is truncated to whatever width is left after the asctime. That width
# is clamped to zero: on a very narrow terminal it would otherwise be negative,
# and a negative precision (e.g. "%(frame).-9s") is an invalid %-format that
# raises ValueError when a record is formatted.
if DEV_MODE:  # Do this to have longer-format console messages
    # HH:MM:SS.sss vs YYYY-MM-DDTHH:MM:SS.ssssss, shorter format for the console
    CONSOLE_FMT = f"%(asctime)s%(frame).{max(CONSOLE_COLS - 27, 0)}s"
else:
    CONSOLE_FMT = f"%(asctime)s%(frame).{max(CONSOLE_COLS - 13, 0)}s"

PKT_LOG_FMT = "%(asctime)s%(frame)s"

BANDW_SUFFIX = "%(message)s%(error_text)s%(comment)s"
COLOR_SUFFIX = "%(yellow)s%(message)s%(red)s%(error_text)s%(cyan)s%(comment)s"

# How to strip ASCII colour from a text file:
#   sed -r "s/\x1B\[(([0-9]{1,2})?(;)?([0-9]{1,2})?)?[m,K,H,f,J]//g" file_name

LOG_COLOURS = {
    "DEBUG": "white",
    "INFO": "green",
    "WARNING": "yellow",
    "ERROR": "bold_red",
    "CRITICAL": "bold_red",
}  # default_log_colors
class _Logger(logging.Logger):  # use pkt.dtm for the log record timestamp
    """A logging channel whose records can carry a caller-supplied timestamp."""

    def makeRecord(
        self,
        name: str,
        level: int,
        fn: str,
        lno: int,
        msg: object,
        args: Any,
        exc_info: Any,
        func: str | None = None,
        extra: Mapping[str, object] | None = None,
        sinfo: str | None = None,
    ) -> logging.LogRecord:
        """Build a LogRecord, decorating the packet-specific fields for display.

        The ``_frame`` entry of *extra* is exposed on the record as ``frame``
        (prefixed with the ``_rssi`` entry when the frame is non-empty). If the
        record ends up with a ``dtm`` attribute, ``created`` and ``msecs`` (and
        thus asctime) are overwritten from it; ``relativeCreated`` is not.
        Non-empty ``msg``, ``error_text`` and ``comment`` values are given the
        prefixes `` < ``, `` * `` and `` # `` respectively.
        """

        fields = dict(extra) if extra else {}  # never mutate the caller's mapping
        raw_frame = fields.pop("_frame", "")
        fields["frame"] = f" {fields['_rssi']} {raw_frame}" if raw_frame else raw_frame

        record = super().makeRecord(
            name, level, fn, lno, msg, args, exc_info, func, fields, sinfo
        )

        if hasattr(record, "dtm"):
            stamp = record.dtm.timestamp()
            record.created = stamp
            record.msecs = (stamp - int(stamp)) * 1000

        if record.msg:
            record.msg = f" < {record.msg}"

        # HACK: these attributes only exist when supplied via extra
        for attr, prefix in (("error_text", " * "), ("comment", " # ")):
            text = getattr(record, attr, None)
            if text:
                setattr(record, attr, f"{prefix}{text}")

        return record
class _Formatter:  # format asctime with configurable precision
    """Mixin that renders a LogRecord's asctime with sub-second precision.

    Class attributes:
      - default_time_format: strftime format used when no datefmt is given
      - precision: number of fractional-second digits to keep (6 keeps them all;
        0 or None also drops the decimal separator)
    """

    converter = None  # was: time.localtime
    default_time_format = "%Y-%m-%dT%H:%M:%S.%f"
    precision = 6

    def formatTime(self, record: logging.LogRecord, datefmt: str | None = None) -> str:
        """Return the creation time (asctime) of the LogRecord as formatted text.

        Allows for sub-millisecond precision, using datetime instead of time objects.
        The result is only shortened to ``precision`` digits when the format in
        use actually ends with ``%f``; any other format is returned untouched.
        """
        fmt = datefmt or self.default_time_format
        result = dt.fromtimestamp(record.created).strftime(fmt)

        # Decide on the format actually applied (not only the class default), so
        # that a custom datefmt without trailing microseconds is never clipped.
        if not fmt.endswith("%f"):
            return result

        # -1 also strips the character preceding the six microsecond digits
        precision = self.precision or -1
        return result[: precision - 6] if -1 <= precision < 6 else result
# The name colorlog is only bound when its import succeeded, so the class must
# not reference it unconditionally (that would raise NameError at import time).
if _use_color_:

    class ColoredFormatter(_Formatter, colorlog.ColoredFormatter):  # type: ignore[misc]
        """colorlog's ColoredFormatter, with asctime rendered by _Formatter."""

else:

    class ColoredFormatter(_Formatter, logging.Formatter):  # type: ignore[misc, no-redef]
        """Plain stand-in so the name still exists when colorlog is not installed."""
class Formatter(_Formatter, logging.Formatter):  # type: ignore[misc]
    """The standard logging Formatter, with asctime rendered by _Formatter."""
class PktLogFilter(logging.Filter):  # record.levelno in (logging.INFO, logging.WARNING)
    """Filter for packet log files: pass INFO and WARNING records only."""

    def filter(self, record: logging.LogRecord) -> bool:
        """Return True if the record's level is exactly INFO or WARNING."""
        # if record._frame[4:] or record.comment or record.error_text:
        level = record.levelno
        return level == logging.INFO or level == logging.WARNING
class StdErrFilter(logging.Filter):  # record.levelno >= logging.WARNING
    """Filter for sys.stderr: pass records at WARNING level or above."""

    def filter(self, record: logging.LogRecord) -> bool:
        """Return True if the record is WARNING (30), ERROR (40) or higher."""
        return logging.WARNING <= record.levelno
class StdOutFilter(logging.Filter):  # record.levelno < logging.WARNING
    """Filter for sys.stdout: pass records below WARNING level."""

    def filter(self, record: logging.LogRecord) -> bool:
        """Return True if the record is INFO (20), DEBUG (10) or lower."""
        return logging.WARNING > record.levelno
class BlockMqttFilter(logging.Filter):
    """Drop records whose rendered message starts with the "mq Rx: " prefix."""

    def filter(self, record: logging.LogRecord) -> bool:
        """Return False for messages beginning with "mq Rx: ", True otherwise."""
        text = record.getMessage()
        if text.startswith("mq Rx: "):
            return False
        return True
class TimedRotatingFileHandler(_TimedRotatingFileHandler):
    """A midnight-rotating file handler with a strict YYYY-MM-DD suffix pattern."""

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initialise the base handler, then replace its ``extMatch`` pattern.

        All arguments are passed through unchanged to the stdlib handler, which
        must have been configured with ``when="MIDNIGHT"``.
        """
        super().__init__(*args, **kwargs)
        assert self.when == "MIDNIGHT"

        # Must be assigned after the base __init__, which sets its own extMatch
        date_suffix = r"^\d{4}-\d{2}-\d{2}$"
        self.extMatch = re.compile(date_suffix, re.ASCII)

    # def emit(self, record):  # used only for debugging
    #     if True or self.shouldRollover(record):
    #         self.doRollover()
    #     return super().emit(record)
def getLogger(  # permits a bespoke Logger class
    name: str | None = None, pkt_log: bool = False
) -> logging.Logger:
    """Return a logger with the specified name, creating it if necessary.

    Used to set record timestamps to its packet timestamp instead of the current time.

    Parameters:
     - name: the logger's name; None always yields the stdlib root logger
     - pkt_log: if True (and name is given), the logger is created while _Logger
       is temporarily installed as the logger class

    A logger that already exists under that name is returned as is, whatever
    its class.
    """
    if name is None or not pkt_log:
        return logging.getLogger(name)

    # Hold the logging module's lock, so no-one else uses our Logger class. It
    # is the same lock that logging._acquireLock() wrapped before Python 3.13.
    # The context manager and the finally clause guarantee that the lock is
    # released and the previous class reinstated even if getLogger() raises.
    with logging._lock:  # type: ignore[attr-defined]  # TODO: remove this ASAP
        klass = logging.getLoggerClass()
        logging.setLoggerClass(_Logger)
        try:
            return logging.getLogger(name)
        finally:
            logging.setLoggerClass(klass)
def set_logger_timesource(dtm_now: Callable[..., dt]) -> None:
    """Install a log record factory that timestamps records via *dtm_now*.

    Used to have records with the same datetime as the most recent packet log record.
    The factory in place at call time is wrapped, not discarded: it still builds
    each record, after which ``created`` and ``msecs`` are overwritten.
    """
    previous_factory = logging.getLogRecordFactory()

    def stamped_factory(*args: Any, **kwargs: Any) -> logging.LogRecord:
        rec = previous_factory(*args, **kwargs)
        now = dtm_now().timestamp()
        rec.created, rec.msecs = now, (now - int(now)) * 1000
        return rec

    logging.setLogRecordFactory(stamped_factory)
def set_pkt_logging(
    logger: logging.Logger,
    cc_console: bool = False,
    file_name: str | None = None,
    rotate_backups: int = 0,
    rotate_bytes: int | None = None,
) -> None:
    """Create/configure handlers, formatters, etc.

    Parameters:
     - logger: the packet logger to (re)configure; its existing handlers are removed
     - cc_console: also copy packets to the console (stdout/stderr)
     - file_name: base of file to store packet logs in, from root
     - rotate_backups: keep this many copies, and rotate at midnight unless:
     - rotate_bytes: rotate log files when log > rotate_bytes

    With neither a file_name nor cc_console, the logger is simply raised to
    CRITICAL and left without handlers. Otherwise an initial log line with the
    library version is emitted once the handlers are in place.
    """

    logger.propagate = False  # log file is distinct from any app/debug logging
    logger.setLevel(logging.DEBUG)  # must be at least .INFO

    # as set_pkt_logging() may be called several times: to avoid duplicates in logs...
    # (iterate over a copy: removing from the live list would skip every other handler)
    for old_handler in list(logger.handlers):  # dont use hasHandlers(): not propagating
        logger.removeHandler(old_handler)

    handler: logging.Handler

    if file_name:  # note: this opens the packet_log file IO and may block
        if rotate_bytes:
            rotate_backups = rotate_backups or 2
            handler = logging.handlers.RotatingFileHandler(
                file_name, maxBytes=rotate_bytes, backupCount=rotate_backups
            )
        elif rotate_backups:
            handler = TimedRotatingFileHandler(
                file_name, when="MIDNIGHT", backupCount=rotate_backups
            )
        else:
            handler = logging.FileHandler(file_name)

        logfile_fmt = Formatter(fmt=PKT_LOG_FMT + BANDW_SUFFIX)

        handler.setFormatter(logfile_fmt)
        handler.setLevel(logging.INFO)  # .INFO (usually), or .DEBUG
        handler.addFilter(PktLogFilter())  # record.levelno in (.INFO, .WARNING)
        logger.addHandler(handler)

    elif cc_console:
        logger.addHandler(logging.NullHandler())

    else:
        logger.setLevel(logging.CRITICAL)
        return

    if cc_console:  # CC: output to stdout/stderr
        console_fmt: ColoredFormatter | Formatter
        if _use_color_:
            console_fmt = ColoredFormatter(
                fmt=f"%(log_color)s{CONSOLE_FMT + COLOR_SUFFIX}",
                reset=True,
                log_colors=LOG_COLOURS,
            )
        else:
            console_fmt = Formatter(fmt=CONSOLE_FMT + BANDW_SUFFIX)

        handler = logging.StreamHandler(stream=sys.stderr)
        handler.setFormatter(console_fmt)
        handler.setLevel(logging.WARNING)  # must be .WARNING or less
        handler.addFilter(StdErrFilter())  # record.levelno >= .WARNING
        logger.addHandler(handler)

        handler = logging.StreamHandler(stream=sys.stdout)
        handler.setFormatter(console_fmt)
        handler.setLevel(logging.DEBUG)  # must be .INFO or less
        handler.addFilter(StdOutFilter())  # record.levelno < .WARNING
        logger.addHandler(handler)

    extras = {
        "_frame": "",
        "error_text": "",
        "comment": f"ramses_tx {VERSION}",
    }
    logger.warning("", extra=extras)  # initial log line