Coverage for arrakis/__main__.py: 59.5%

153 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-04-16 15:43 -0700

1import argparse 

2import logging 

3import os 

4 

5import numpy 

6from gpstime import GPSTimeParseAction 

7 

8from . import Channel, Client, __version__, constants 

9from .flight import flight 

10 

11logger = logging.getLogger("arrakis") 

12 

13 

14########## 

15 

16 

17parser = argparse.ArgumentParser() 

18parser.add_argument("--version", "-v", action="version", version=__version__) 

19parser.add_argument( 

20 "--url", 

21 "-u", 

22 type=str, 

23 help="initial server url", 

24) 

25subparsers = parser.add_subparsers() 

26 

27 

28def add_subparser(cmd, **kwargs): 

29 sp = subparsers.add_parser( 

30 cmd.__name__, 

31 help=cmd.__doc__.splitlines()[0], 

32 description=cmd.__doc__, 

33 **kwargs, 

34 ) 

35 sp.set_defaults(func=cmd) 

36 return sp 

37 

38 

39########## 

40 

41 

42def parse_pattern(pattern): 

43 if not pattern or pattern == "*": 

44 pattern = constants.DEFAULT_MATCH 

45 return pattern 

46 

47 

48def print_channel(chan: Channel, as_json: bool = False) -> None: 

49 if as_json: 

50 output = chan.to_json() 

51 else: 

52 output = repr(chan) 

53 print(output) 

54 

55 

56def _add_find_count_args(parser): 

57 parser.add_argument( 

58 "pattern", 

59 type=str, 

60 nargs="?", 

61 default=constants.DEFAULT_MATCH, 

62 help="channel pattern", 

63 ) 

64 parser.add_argument( 

65 "--data-type", 

66 "--dtype", 

67 metavar="DTYPE", 

68 type=numpy.dtype, 

69 # action="append", 

70 help="data type", 

71 ) 

72 parser.add_argument( 

73 "--min_rate", 

74 metavar="INT", 

75 type=int, 

76 help="minimum sample rate", 

77 ) 

78 parser.add_argument( 

79 "--max_rate", 

80 metavar="INT", 

81 type=int, 

82 help="maximum sample rate", 

83 ) 

84 parser.add_argument( 

85 "--publisher", 

86 metavar="ID", 

87 type=str, 

88 # action="append", 

89 help="publisher ID", 

90 ) 

91 

92 

93################################################## 

94 

95 

96def find(args): 

97 """find channels matching regexp pattern""" 

98 as_json = args.json 

99 del args.json 

100 client = Client(url=args.url) 

101 del args.url 

102 for chan in client.find(**vars(args)): 

103 print_channel(chan, as_json=as_json) 

104 

105 

106sparser = add_subparser(find, aliases=["search", "list"]) 

107_add_find_count_args(sparser) 

108sparser.add_argument( 

109 "-j", 

110 "--json", 

111 action="store_true", 

112 help="print channel output as JSON", 

113) 

114 

115 

116########## 

117 

118 

119def count(args): 

120 """count channels matching pattern""" 

121 client = Client(url=args.url) 

122 del args.url 

123 print(client.count(**vars(args))) 

124 

125 

126sparser = add_subparser(count) 

127_add_find_count_args(sparser) 

128 

129 

130########## 

131 

132 

133def describe(args): 

134 """describe channels""" 

135 as_json = args.json 

136 del args.json 

137 client = Client(url=args.url) 

138 del args.url 

139 for channel in client.describe(**vars(args)).values(): 

140 print_channel(channel, as_json=as_json) 

141 

142 

143sparser = add_subparser(describe, aliases=["show"]) 

144sparser.add_argument("channels", nargs="+", help="list of channels to describe") 

145sparser.add_argument( 

146 "-j", 

147 "--json", 

148 action="store_true", 

149 help="print channel output as JSON", 

150) 

151 

152 

153########## 

154 

155 

156def stream(args): 

157 """stream data for channels""" 

158 if args.start: 

159 args.start = args.start.gps() 

160 if args.end: 

161 args.end = args.end.gps() 

162 client = Client(url=args.url) 

163 del args.url 

164 for buf in client.stream(**vars(args)): 

165 print(buf) 

166 

167 

168sparser = add_subparser(stream) 

169sparser.add_argument("channels", nargs="+", help="list of channels to stream") 

170sparser.add_argument( 

171 "--start", 

172 action=GPSTimeParseAction, 

173 help="start time (GPS or arbitrary date/time string)", 

174) 

175sparser.add_argument( 

176 "--end", 

177 action=GPSTimeParseAction, 

178 help="end time (GPS or arbitrary date/time string)", 

179) 

180 

181 

182########## 

183 

184 

185def fetch(args): 

186 """fetch data for channels""" 

187 args.start = args.start.gps() 

188 args.end = args.end.gps() 

189 client = Client(url=args.url) 

190 del args.url 

191 data = client.fetch(**vars(args)) 

192 print(data) 

193 

194 

195sparser = add_subparser(fetch) 

196sparser.add_argument("channels", nargs="+", help="list of channels to fetch") 

197sparser.add_argument( 

198 "--start", 

199 required=True, 

200 action=GPSTimeParseAction, 

201 help="start time (GPS or arbitrary date/time string)", 

202) 

203sparser.add_argument( 

204 "--end", 

205 required=True, 

206 action=GPSTimeParseAction, 

207 help="end time (GPS or arbitrary date/time string)", 

208) 

209 

210 

211########## 

212 

213 

214def publish(args): 

215 """publish values to channels 

216 

217 Arguments should be channel name + generator function pairs. The 

218 generator function is used to generate data for the specified 

219 channel and should be a sympy expression, with the 't' value used 

220 to indicate time, e.g. "sin(t)". A numeric value for the 

221 generator function will generate a constant stream. 

222 

223 """ 

224 import sched 

225 import time 

226 from math import log2 

227 

228 from sympy import lambdify, parse_expr 

229 from sympy.abc import t 

230 

231 from . import Publisher, SeriesBlock, Time 

232 

233 if not args.list: 

234 if len(args.channel_args) % 2 != 0: 

235 parser.error("arguments must be channel name + generator function pairs") 

236 chan_funcs = {} 

237 for name, value in zip(args.channel_args[::2], args.channel_args[1::2]): 

238 expr = parse_expr(value) 

239 chan_funcs[name] = lambdify(t, expr, "numpy") 

240 

241 if args.rate < 1 or log2(args.rate) % 1 != 0: 

242 parser.error("rate must be power of two.") 

243 

244 publisher = Publisher(args.publisher_id, args.url) 

245 publisher.register() 

246 

247 if args.list: 

248 for _name, channel in publisher.channels.items(): 

249 print_channel(channel) 

250 return 

251 

252 def _gen_data(publisher, tick): 

253 metadata = {} 

254 series = {} 

255 for name, func in chan_funcs.items(): 

256 try: 

257 channel = publisher.channels[name] 

258 except KeyError: 

259 msg = f"unknown channel for publisher: {name}" 

260 raise ValueError(msg) from None 

261 time_array = numpy.arange(int(channel.sample_rate / args.rate)) + tick 

262 data = numpy.array( 

263 numpy.broadcast_to(func(time_array), time_array.shape), 

264 dtype=channel.data_type, 

265 ) 

266 series[name] = data 

267 metadata[name] = channel 

268 sblock = SeriesBlock( 

269 tick * Time.SECONDS, 

270 series, 

271 metadata, 

272 ) 

273 logger.info("publish: %s", sblock) 

274 publisher.publish(sblock) 

275 

276 s = sched.scheduler(time.time, time.sleep) 

277 tick = int(time.time()) 

278 with publisher: 

279 while True: 

280 tick += 1 / args.rate 

281 s.enterabs(tick, 0, _gen_data, (publisher, tick)) 

282 s.run() 

283 

284 

285sparser = add_subparser(publish) 

286sparser.add_argument( 

287 "--publisher-id", 

288 type=str, 

289 required=True, 

290 help="publisher ID (required)", 

291) 

292sparser.add_argument( 

293 "--rate", 

294 type=int, 

295 default=16, 

296 help="publication rate, in Hz. Must be a power of two.", 

297) 

298lgroup = sparser.add_mutually_exclusive_group() 

299lgroup.add_argument( 

300 "--list", 

301 action="store_true", 

302 help="list publisher channels and exit", 

303) 

304lgroup.add_argument( 

305 "channel_args", 

306 nargs="*", 

307 metavar="CHANNEL FUNC", 

308 default=[], 

309 help="channel name + generator function pairs", 

310) 

311 

312 

313################################################## 

314 

315 

316def main(): 

317 logger.setLevel(os.getenv("LOG_LEVEL", "DEBUG").upper()) 

318 handler = logging.StreamHandler() 

319 handler.setFormatter(logging.Formatter("%(asctime)s %(message)s")) 

320 logger.addHandler(handler) 

321 

322 args = parser.parse_args() 

323 

324 if "func" not in args: 

325 parser.print_help() 

326 return 

327 

328 func = args.func 

329 del args.func 

330 logger.debug(args) 

331 

332 try: 

333 func(args) 

334 except flight.FlightError as e: 

335 msg = f"request error: {e}" 

336 raise SystemExit(msg) from e 

337 

338 

339if __name__ == "__main__": 

340 import signal 

341 

342 signal.signal(signal.SIGINT, signal.SIG_DFL) 

343 main()