Coverage for src/ramses_tx/logger.py: 45%

141 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2026-01-05 21:46 +0100

1#!/usr/bin/env python3 

2"""RAMSES RF - a RAMSES-II protocol decoder & analyser. 

3 

4This module wraps logger to provide bespoke functionality, especially for timestamps. 

5""" 

6 

7from __future__ import annotations 

8 

9import logging 

10import re 

11import shutil 

12import sys 

13from collections.abc import Callable, Mapping 

14from datetime import datetime as dt 

15from logging.handlers import TimedRotatingFileHandler as _TimedRotatingFileHandler 

16from typing import Any 

17 

18from .version import VERSION 

19 

20DEV_MODE = False 

21 

22_LOGGER = logging.getLogger(__name__) 

23if DEV_MODE: 

24 _LOGGER.setLevel(logging.DEBUG) 

25 

26try: 

27 import colorlog 

28except ModuleNotFoundError: 

29 _use_color_ = False 

30 _LOGGER.warning("Consider installing the colorlog library for colored logging") 

31else: 

32 _use_color_ = True 

33 # logging.basicConfig(format=DEFAULT_FMT, datefmt=DEFAULT_DATEFMT) # Causes issues 

34 

35DEFAULT_FMT = "%(asctime)s.%(msecs)03d %(message)s" 

36DEFAULT_DATEFMT = "%H:%M:%S" 

37 

38# TODO: make account for the non-printing characters 

39CONSOLE_COLS = int(shutil.get_terminal_size(fallback=(int(2e3), 24)).columns - 1) 

40 

41if DEV_MODE: # Do this to have longer-format console messages 

42 # HH:MM:SS.sss vs YYYY-MM-DDTHH:MM:SS.ssssss, shorter format for the console 

43 CONSOLE_FMT = f"%(asctime)s%(frame).{CONSOLE_COLS - 27}s" 

44else: 

45 CONSOLE_FMT = f"%(asctime)s%(frame).{CONSOLE_COLS - 13}s" 

46 

47PKT_LOG_FMT = "%(asctime)s%(frame)s" 

48 

49BANDW_SUFFIX = "%(message)s%(error_text)s%(comment)s" 

50COLOR_SUFFIX = "%(yellow)s%(message)s%(red)s%(error_text)s%(cyan)s%(comment)s" 

51 

52# How to strip ASCII colour from a text file: 

53# sed -r "s/\x1B\[(([0-9]{1,2})?(;)?([0-9]{1,2})?)?[m,K,H,f,J]//g" file_name 

54 

55LOG_COLOURS = { 

56 "DEBUG": "white", 

57 "INFO": "green", 

58 "WARNING": "yellow", 

59 "ERROR": "bold_red", 

60 "CRITICAL": "bold_red", 

61} # default_log_colors 

62 

63 

64class _Logger(logging.Logger): # use pkt.dtm for the log record timestamp 

65 """Logger instances represent a single logging channel.""" 

66 

67 def makeRecord( 

68 self, 

69 name: str, 

70 level: int, 

71 fn: str, 

72 lno: int, 

73 msg: object, 

74 args: Any, 

75 exc_info: Any, 

76 func: str | None = None, 

77 extra: Mapping[str, object] | None = None, 

78 sinfo: str | None = None, 

79 ) -> logging.LogRecord: 

80 """Create a specialized LogRecord with a bespoke timestamp. 

81 

82 Will overwrite created and msecs (and thus asctime), but not relativeCreated. 

83 """ 

84 

85 extra = dict(extra or {}) # work with a copy 

86 extra["frame"] = extra.pop("_frame", "") 

87 if extra["frame"]: 

88 extra["frame"] = f" {extra['_rssi']} {extra['frame']}" 

89 

90 rv = super().makeRecord( 

91 name, level, fn, lno, msg, args, exc_info, func, extra, sinfo 

92 ) 

93 

94 if hasattr(rv, "dtm"): # if dtm := extra.get("dtm"): 

95 ct = rv.dtm.timestamp() 

96 rv.created = ct 

97 rv.msecs = (ct - int(ct)) * 1000 

98 

99 if rv.msg: 

100 rv.msg = f" < {rv.msg}" 

101 

102 if value := getattr(rv, "error_text", None): # HACK 

103 rv.error_text = f" * {value}" 

104 

105 if value := getattr(rv, "comment", None): # HACK 

106 rv.comment = f" # {value}" 

107 

108 return rv 

109 

110 

111class _Formatter: # format asctime with configurable precision 

112 """Formatter instances convert a LogRecord to text.""" 

113 

114 converter = None # was: time.localtime 

115 default_time_format = "%Y-%m-%dT%H:%M:%S.%f" 

116 precision = 6 

117 

118 def formatTime(self, record: logging.LogRecord, datefmt: str | None = None) -> str: 

119 """Return the creation time (asctime) of the LogRecord as formatted text. 

120 

121 Allows for sub-millisecond precision, using datetime instead of time objects. 

122 """ 

123 result = dt.fromtimestamp(record.created).strftime( 

124 datefmt or self.default_time_format 

125 ) 

126 if "f" not in self.default_time_format: 

127 return result 

128 precision = self.precision or -1 

129 return result[: precision - 6] if -1 <= precision < 6 else result 

130 

131 

132class ColoredFormatter(_Formatter, colorlog.ColoredFormatter): # type: ignore[misc] 

133 pass 

134 

135 

136class Formatter(_Formatter, logging.Formatter): # type: ignore[misc] 

137 pass 

138 

139 

140class PktLogFilter(logging.Filter): # record.levelno in (logging.INFO, logging.WARNING) 

141 """For packet log files, process only wanted packets.""" 

142 

143 def filter(self, record: logging.LogRecord) -> bool: 

144 """Return True if the record is to be processed.""" 

145 # if record._frame[4:] or record.comment or record.error_text: 

146 return record.levelno in (logging.INFO, logging.WARNING) 

147 

148 

149class StdErrFilter(logging.Filter): # record.levelno >= logging.WARNING 

150 """For sys.stderr, process only wanted packets.""" 

151 

152 def filter(self, record: logging.LogRecord) -> bool: 

153 """Return True if the record is to be processed.""" # WARNING-30, ERROR-40 

154 return record.levelno >= logging.WARNING 

155 

156 

157class StdOutFilter(logging.Filter): # record.levelno < logging.WARNING 

158 """For sys.stdout, process only wanted packets.""" 

159 

160 def filter(self, record: logging.LogRecord) -> bool: 

161 """Return True if the record is to be processed.""" # INFO-20, DEBUG-10 

162 return record.levelno < logging.WARNING 

163 

164 

165class BlockMqttFilter(logging.Filter): 

166 """Block mqtt traffic""" 

167 

168 def filter(self, record: logging.LogRecord) -> bool: 

169 """Return True if the record is to be processed.""" 

170 return not record.getMessage().startswith("mq Rx: ") 

171 

172 

173class TimedRotatingFileHandler(_TimedRotatingFileHandler): 

174 def __init__(self, *args: Any, **kwargs: Any) -> None: 

175 super().__init__(*args, **kwargs) 

176 assert self.when == "MIDNIGHT" 

177 self.extMatch = re.compile(r"^\d{4}-\d{2}-\d{2}$", re.ASCII) 

178 

179 # def emit(self, record): # used only for debugging 

180 # if True or self.shouldRollover(record): 

181 # self.doRollover() 

182 # return super().emit(record) 

183 

184 

185def getLogger( # permits a bespoke Logger class 

186 name: str | None = None, pkt_log: bool = False 

187) -> logging.Logger: 

188 """Return a logger with the specified name, creating it if necessary. 

189 

190 Used to set record timestamps to its packet timestamp instead of the current time. 

191 """ 

192 if name is None or not pkt_log: 

193 return logging.getLogger(name) 

194 

195 # Acquire lock, so no-one else uses our Logger class 

196 try: # TODO: remove this ASAP 

197 logging._acquireLock() # type: ignore[attr-defined] 

198 except AttributeError: # Python 3.13+ 

199 logging._lock.acquire() # type: ignore[attr-defined] 

200 

201 klass = logging.getLoggerClass() 

202 logging.setLoggerClass(_Logger) 

203 

204 logger = logging.getLogger(name) 

205 

206 logging.setLoggerClass(klass) 

207 

208 try: # TODO: remove this ASAP 

209 logging._releaseLock() # type: ignore[attr-defined] 

210 except AttributeError: # Python 3.13+ 

211 logging._lock.release() # type: ignore[attr-defined] 

212 

213 return logger 

214 

215 

216def set_logger_timesource(dtm_now: Callable[..., dt]) -> None: 

217 """Set a custom record factory, with a bespoke source of timestamps. 

218 

219 Used to have records with the same datetime as the most recent packet log record. 

220 """ 

221 

222 def record_factory(*args: Any, **kwargs: Any) -> logging.LogRecord: 

223 record = old_factory(*args, **kwargs) 

224 

225 ct = dtm_now().timestamp() 

226 record.created = ct 

227 record.msecs = (ct - int(ct)) * 1000 

228 

229 return record 

230 

231 old_factory = logging.getLogRecordFactory() 

232 

233 logging.setLogRecordFactory(record_factory) 

234 

235 

236def set_pkt_logging( 

237 logger: logging.Logger, 

238 cc_console: bool = False, 

239 file_name: str | None = None, 

240 rotate_backups: int = 0, 

241 rotate_bytes: int | None = None, 

242) -> None: 

243 """Create/configure handlers, formatters, etc. 

244 

245 Parameters: 

246 - file_name: base of file to store packet logs in, from root 

247 - rotate_backups: keep this many copies, and rotate at midnight unless: 

248 - rotate_bytes: rotate log files when log > rotate_size 

249 """ 

250 

251 logger.propagate = False # log file is distinct from any app/debug logging 

252 logger.setLevel(logging.DEBUG) # must be at least .INFO 

253 

254 # as set_pkt_logging() may be called several times: to avoid duplicates in logs... 

255 for handler in logger.handlers: # dont use logger.hasHandlers() as not propagating 

256 logger.removeHandler(handler) 

257 

258 if file_name: # note: this opens the packet_log file IO and may block 

259 if rotate_bytes: 

260 rotate_backups = rotate_backups or 2 

261 handler = logging.handlers.RotatingFileHandler( 

262 file_name, maxBytes=rotate_bytes, backupCount=rotate_backups 

263 ) 

264 elif rotate_backups: 

265 handler = TimedRotatingFileHandler( 

266 file_name, when="MIDNIGHT", backupCount=rotate_backups 

267 ) 

268 else: 

269 handler = logging.FileHandler(file_name) 

270 

271 logfile_fmt = Formatter(fmt=PKT_LOG_FMT + BANDW_SUFFIX) 

272 

273 handler.setFormatter(logfile_fmt) 

274 handler.setLevel(logging.INFO) # .INFO (usually), or .DEBUG 

275 handler.addFilter(PktLogFilter()) # record.levelno in (.INFO, .WARNING) 

276 logger.addHandler(handler) 

277 

278 elif cc_console: 

279 logger.addHandler(logging.NullHandler()) 

280 

281 else: 

282 logger.setLevel(logging.CRITICAL) 

283 return 

284 

285 if cc_console: # CC: output to stdout/stderr 

286 console_fmt: ColoredFormatter | Formatter 

287 if _use_color_: 

288 console_fmt = ColoredFormatter( 

289 fmt=f"%(log_color)s{CONSOLE_FMT + COLOR_SUFFIX}", 

290 reset=True, 

291 log_colors=LOG_COLOURS, 

292 ) 

293 else: 

294 console_fmt = Formatter(fmt=CONSOLE_FMT + BANDW_SUFFIX) 

295 

296 handler = logging.StreamHandler(stream=sys.stderr) 

297 handler.setFormatter(console_fmt) 

298 handler.setLevel(logging.WARNING) # musr be .WARNING or less 

299 handler.addFilter(StdErrFilter()) # record.levelno >= .WARNING 

300 logger.addHandler(handler) 

301 

302 handler = logging.StreamHandler(stream=sys.stdout) 

303 handler.setFormatter(console_fmt) 

304 handler.setLevel(logging.DEBUG) # must be .INFO or less 

305 handler.addFilter(StdOutFilter()) # record.levelno < .WARNING 

306 logger.addHandler(handler) 

307 

308 extras = { 

309 "_frame": "", 

310 "error_text": "", 

311 "comment": f"ramses_tx {VERSION}", 

312 } 

313 logger.warning("", extra=extras) # initial log line