@classmethod
def parse_string(cls, s: str):
    """
    Parse a document in JSON format from a string.

    Args:
        s: string containing the JSON representation of the config

    Returns:
        A new instance of ``cls`` updated with the decoded JSON content, or
        ``None`` if ``s`` is not valid JSON (the error is logged).
    """
    # Keep the try body limited to the only call that can raise JSONDecodeError.
    try:
        data = json.loads(s)
    except json.JSONDecodeError as e:
        # The failure happens while *decoding* the input, so say so in the log.
        log.error(f"Error while decoding json: {e}")
        return None

    obj = cls()
    obj.update(data)
    return obj
@classmethod
def parse_url(cls, url: str):
    """
    Parse a document in JSON format from a URL.

    Args:
        url: any URL supported by urllib (e.g. file://local.json or
            https://user:pass@couch.db/database/my_doc)

    Returns:
        The result of ``cls.parse_string`` applied to the content read from
        ``url``.
    """
    f_handle = file_from_url(url)
    try:
        return cls.parse_string(f_handle.read())
    finally:
        # Release the underlying file/socket even if reading or parsing raises.
        # NOTE(review): assumes the handle returned by file_from_url provides
        # close(), as file-like objects do - confirm against file_from_url.
        f_handle.close()
def build_hero_for_boss(self, boss_object, realm="heros"):
    """
    Build the HERO described by this document on behalf of a boss object.

    The document is modified in place: a ``"BOSS: <name>"`` tag is added to
    ``self["tags"]`` and the placeholder argument values ``"@_boss_loop"`` and
    ``"@_boss_pool"`` are replaced by ``boss_object._loop`` and
    ``boss_object._pool`` respectively.

    Args:
        boss_object: object providing ``name``, ``_loop`` and ``_pool``.
        realm: realm that is handed to the factory's ``build`` call.

    Returns:
        The result of the ``build`` call of the selected factory.
    """
    boss_tag = f"BOSS: {boss_object.name}"
    if "tags" not in self:
        self["tags"] = [boss_tag]
    else:
        self["tags"].append(boss_tag)

    # replace special string for asyncio loop and multiprocess pool
    for arg_name, arg_value in self["arguments"].items():
        if arg_value == "@_boss_loop":
            self["arguments"][arg_name] = boss_object._loop
        if arg_value == "@_boss_pool":
            self["arguments"][arg_name] = boss_object._pool

    ds_config = self.datasource_config()

    # No datasource configuration: build through the plain factory.
    if ds_config is None:
        return HEROFactory.build(
            self["classname"],
            self["arguments"],
            self["_id"],
            realm=realm,
            tags=self["tags"],
        )

    # Datasource flagged as async.
    if ds_config["async"]:
        return DatasourceHEROFactory.build(
            self["classname"],
            self["arguments"],
            self["_id"],
            tags=self["tags"],
            observables=ds_config["observables"],
            realm=realm,
        )

    # Remaining case: polled datasource, driven by the boss loop at the
    # configured interval.
    return PolledDatasourceHEROFactory.build(
        self["classname"],
        self["arguments"],
        self["_id"],
        tags=self["tags"],
        loop=boss_object._loop,
        interval=ds_config["interval"],
        realm=realm,
        observables=ds_config["observables"],
    )