Coverage for arrakis/__main__.py: 59.5%
153 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-16 15:43 -0700
« prev ^ index » next coverage.py v7.6.12, created at 2025-04-16 15:43 -0700
1import argparse
2import logging
3import os
5import numpy
6from gpstime import GPSTimeParseAction
8from . import Channel, Client, __version__, constants
9from .flight import flight
# Package logger; its level and handler are configured in main().
logger = logging.getLogger("arrakis")


##########


# Top-level command line parser.  Subcommands attach themselves to
# ``subparsers`` through add_subparser() further down in this module.
parser = argparse.ArgumentParser()
parser.add_argument("--version", "-v", action="version", version=__version__)
parser.add_argument("--url", "-u", type=str, help="initial server url")
subparsers = parser.add_subparsers()
def add_subparser(cmd, **kwargs):
    """Register a command function as a subcommand of the top-level parser.

    The subcommand is named after the function.  Its one-line help is the
    first non-blank line of the function's docstring and its description is
    the whole docstring.  The function itself is stored as the ``func``
    default of the sub-parser so it can be dispatched after parsing.

    A missing or empty docstring is tolerated (docstrings are stripped when
    Python runs with ``-OO``): the subcommand is then registered without
    help/description text instead of raising while the module is imported.

    Args:
        cmd: callable implementing the subcommand.
        **kwargs: forwarded to ``subparsers.add_parser`` (e.g. ``aliases``).

    Returns:
        The sub-parser created for ``cmd``.
    """
    doc = cmd.__doc__
    summary = None
    if doc:
        # First line that actually carries text, without surrounding blanks.
        summary = next(
            (line.strip() for line in doc.splitlines() if line.strip()),
            None,
        )
    sp = subparsers.add_parser(
        cmd.__name__,
        help=summary,
        description=doc,
        **kwargs,
    )
    sp.set_defaults(func=cmd)
    return sp
39##########
def parse_pattern(pattern):
    """Normalize a channel pattern.

    An empty/missing pattern or the literal ``"*"`` is replaced by
    ``constants.DEFAULT_MATCH``; any other pattern is returned unchanged.
    """
    if pattern and pattern != "*":
        return pattern
    return constants.DEFAULT_MATCH
def print_channel(chan: Channel, as_json: bool = False) -> None:
    """Print a single channel to standard output.

    Args:
        chan: channel to print.
        as_json: when true, print ``chan.to_json()``; otherwise print
            ``repr(chan)``.
    """
    print(chan.to_json() if as_json else repr(chan))
def _add_find_count_args(parser):
    """Add the channel-selection arguments shared by ``find`` and ``count``.

    Adds an optional positional ``pattern`` (defaulting to
    ``constants.DEFAULT_MATCH``) followed by the optional filters
    ``--data-type``/``--dtype``, ``--min_rate``, ``--max_rate`` and
    ``--publisher``.

    Args:
        parser: argparse parser (or sub-parser) to extend in place.
    """
    parser.add_argument(
        "pattern",
        nargs="?",
        type=str,
        default=constants.DEFAULT_MATCH,
        help="channel pattern",
    )

    # (option strings, add_argument keywords), registered in this order.
    # Every filter takes a single value; collecting repeated values
    # (action="append") is not enabled.
    filters = (
        (
            ("--data-type", "--dtype"),
            {"metavar": "DTYPE", "type": numpy.dtype, "help": "data type"},
        ),
        (
            ("--min_rate",),
            {"metavar": "INT", "type": int, "help": "minimum sample rate"},
        ),
        (
            ("--max_rate",),
            {"metavar": "INT", "type": int, "help": "maximum sample rate"},
        ),
        (
            ("--publisher",),
            {"metavar": "ID", "type": str, "help": "publisher ID"},
        ),
    )
    for flags, options in filters:
        parser.add_argument(*flags, **options)
93##################################################
def find(args):
    """find channels matching regexp pattern"""
    # NOTE: the docstring above is reused verbatim as the subcommand's
    # help/description text, so it is kept short and user-facing.
    #
    # ``json`` and ``url`` are CLI-only options; they are removed from the
    # namespace so that everything left can be forwarded as keyword
    # arguments to Client.find().
    as_json = args.json
    delattr(args, "json")
    connection = Client(url=args.url)
    delattr(args, "url")
    matches = connection.find(**vars(args))
    for chan in matches:
        print_channel(chan, as_json=as_json)
# "search" and "list" are accepted as alternative names for "find".
sparser = add_subparser(find, aliases=["search", "list"])
_add_find_count_args(sparser)
sparser.add_argument(
    "-j", "--json", action="store_true", help="print channel output as JSON"
)
116##########
def count(args):
    """count channels matching pattern"""
    # NOTE: the docstring above is reused verbatim as the subcommand's
    # help/description text.
    #
    # ``url`` only selects the server; once it is removed, the remaining
    # parsed arguments are forwarded as keyword arguments to Client.count().
    connection = Client(url=args.url)
    delattr(args, "url")
    n_channels = connection.count(**vars(args))
    print(n_channels)
# "count" accepts exactly the same selection arguments as "find".
sparser = add_subparser(count)
_add_find_count_args(sparser)
130##########
def describe(args):
    """describe channels"""
    # NOTE: the docstring above is reused verbatim as the subcommand's
    # help/description text.
    #
    # ``json`` and ``url`` are CLI-only options; they are removed so the
    # rest of the namespace can be forwarded to Client.describe().
    as_json = args.json
    delattr(args, "json")
    connection = Client(url=args.url)
    delattr(args, "url")
    described = connection.describe(**vars(args))
    # Only the values of the returned mapping are printed.
    for channel in described.values():
        print_channel(channel, as_json=as_json)
# "show" is accepted as an alternative name for "describe".
sparser = add_subparser(describe, aliases=["show"])
sparser.add_argument("channels", nargs="+", help="list of channels to describe")
sparser.add_argument(
    "-j", "--json", action="store_true", help="print channel output as JSON"
)
153##########
def stream(args):
    """stream data for channels"""
    # NOTE: the docstring above is reused verbatim as the subcommand's
    # help/description text.
    #
    # Both bounds are optional; a bound that was given is replaced by the
    # result of its .gps() method before being forwarded.
    for bound in ("start", "end"):
        when = getattr(args, bound)
        if when:
            setattr(args, bound, when.gps())
    connection = Client(url=args.url)
    del args.url
    # Print each buffer as it is yielded by the client.
    for buf in connection.stream(**vars(args)):
        print(buf)
# For "stream" both time bounds are optional.
sparser = add_subparser(stream)
sparser.add_argument("channels", nargs="+", help="list of channels to stream")
sparser.add_argument(
    "--start",
    help="start time (GPS or arbitrary date/time string)",
    action=GPSTimeParseAction,
)
sparser.add_argument(
    "--end",
    help="end time (GPS or arbitrary date/time string)",
    action=GPSTimeParseAction,
)
182##########
def fetch(args):
    """fetch data for channels"""
    # NOTE: the docstring above is reused verbatim as the subcommand's
    # help/description text.
    #
    # Both bounds are required by the parser, so they are converted
    # unconditionally with their .gps() method.
    for bound in ("start", "end"):
        setattr(args, bound, getattr(args, bound).gps())
    connection = Client(url=args.url)
    del args.url
    print(connection.fetch(**vars(args)))
# Unlike "stream", "fetch" requires both time bounds.
sparser = add_subparser(fetch)
sparser.add_argument("channels", nargs="+", help="list of channels to fetch")
sparser.add_argument(
    "--start",
    help="start time (GPS or arbitrary date/time string)",
    action=GPSTimeParseAction,
    required=True,
)
sparser.add_argument(
    "--end",
    help="end time (GPS or arbitrary date/time string)",
    action=GPSTimeParseAction,
    required=True,
)
211##########
def publish(args):
    """publish values to channels

    Arguments should be channel name + generator function pairs. The
    generator function is used to generate data for the specified
    channel and should be a sympy expression, with the 't' value used
    to indicate time, e.g. "sin(t)". A numeric value for the
    generator function will generate a constant stream.

    """
    # NOTE: the docstring above is reused verbatim as the subcommand's
    # help/description text, so developer notes live in comments instead.
    #
    # Flow: validate the arguments, register the publisher, then either
    # list its channels (--list) or publish one block per 1/rate seconds
    # until the process is interrupted.
    #
    # Heavy/optional dependencies are imported lazily so that the other
    # subcommands do not pay for (or require) them.
    import sched
    import time

    from sympy import lambdify, parse_expr
    from sympy.abc import t

    from . import Publisher, SeriesBlock, Time

    # channel name -> numpy-callable generator f(t)
    chan_funcs = {}
    if not args.list:
        if len(args.channel_args) % 2 != 0:
            parser.error("arguments must be channel name + generator function pairs")
        for name, value in zip(args.channel_args[::2], args.channel_args[1::2]):
            # SECURITY: sympy's parse_expr evaluates the given text (it is
            # documented as using eval), so expressions must come from a
            # trusted source such as the operator's own command line.
            expr = parse_expr(value)
            chan_funcs[name] = lambdify(t, expr, "numpy")

    # Exact integer power-of-two test (no floating point involved).  A
    # power-of-two rate keeps 1/rate exactly representable in binary
    # floating point.
    if args.rate < 1 or args.rate & (args.rate - 1):
        parser.error("rate must be power of two.")

    publisher = Publisher(args.publisher_id, args.url)
    publisher.register()

    if args.list:
        for channel in publisher.channels.values():
            print_channel(channel)
        return

    def _channel_for(name):
        """Return the publisher's channel called *name* or raise ValueError."""
        try:
            return publisher.channels[name]
        except KeyError:
            msg = f"unknown channel for publisher: {name}"
            raise ValueError(msg) from None

    # Fail fast: reject unknown channel names before any data is published
    # rather than after waiting for the first scheduler tick.
    for name in chan_funcs:
        _channel_for(name)

    def _gen_data(publisher, tick):
        """Build and publish one block of samples starting at time *tick*."""
        metadata = {}
        series = {}
        for name, func in chan_funcs.items():
            channel = _channel_for(name)
            # Samples this channel contributes to one 1/rate-second block.
            nsamples = int(channel.sample_rate / args.rate)
            # Sample times in seconds: consecutive samples are
            # 1/sample_rate apart, starting at the block start time.
            time_array = tick + numpy.arange(nsamples) / channel.sample_rate
            # broadcast_to turns a constant generator (scalar result) into
            # a full-length array.
            data = numpy.array(
                numpy.broadcast_to(func(time_array), time_array.shape),
                dtype=channel.data_type,
            )
            series[name] = data
            metadata[name] = channel
        sblock = SeriesBlock(
            tick * Time.SECONDS,
            series,
            metadata,
        )
        logger.info("publish: %s", sblock)
        publisher.publish(sblock)

    s = sched.scheduler(time.time, time.sleep)
    # Start on a whole second and advance in exact 1/rate steps.
    tick = int(time.time())
    with publisher:
        while True:
            tick += 1 / args.rate
            # Schedule the block for its own wall-clock time and wait for it.
            s.enterabs(tick, 0, _gen_data, (publisher, tick))
            s.run()
sparser = add_subparser(publish)
sparser.add_argument(
    "--publisher-id", type=str, required=True, help="publisher ID (required)"
)
sparser.add_argument(
    "--rate",
    help="publication rate, in Hz. Must be a power of two.",
    type=int,
    default=16,
)
# Listing the publisher's channels and supplying channel/function pairs are
# alternatives, so both live in one mutually exclusive group.
lgroup = sparser.add_mutually_exclusive_group()
lgroup.add_argument(
    "--list", action="store_true", help="list publisher channels and exit"
)
lgroup.add_argument(
    "channel_args",
    metavar="CHANNEL FUNC",
    nargs="*",
    # NOTE(review): the explicit empty-list default looks deliberate --
    # presumably it keeps an absent positional from being counted as
    # conflicting with --list; confirm before changing it.
    default=[],
    help="channel name + generator function pairs",
)
313##################################################
def main():
    """Command line entry point.

    Configures the package logger (level taken from the ``LOG_LEVEL``
    environment variable, ``DEBUG`` when unset, timestamps prepended to each
    message), parses the command line and runs the selected subcommand.
    Without a subcommand the help text is printed and the function returns.

    Raises:
        SystemExit: with a ``request error: ...`` message when the
            subcommand raises ``flight.FlightError``.
    """
    level_name = os.getenv("LOG_LEVEL", "DEBUG").upper()
    logger.setLevel(level_name)
    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
    logger.addHandler(stream_handler)

    args = parser.parse_args()

    # ``func`` is only set by a sub-parser, i.e. when a subcommand was given.
    command = getattr(args, "func", None)
    if command is None:
        parser.print_help()
        return

    # Drop the dispatch entry so the commands only see their own options.
    del args.func
    logger.debug(args)

    try:
        command(args)
    except flight.FlightError as err:
        raise SystemExit(f"request error: {err}") from err
if __name__ == "__main__":
    import signal

    # Use the operating system's default SIGINT action so that Ctrl-C ends
    # the process directly instead of raising KeyboardInterrupt.
    signal.signal(signal.SIGINT, signal.SIG_DFL)

    main()