[docs]classBOSS:hero_config_sources:listdef__init__(self,name:str,configs:list,loop:asyncio.AbstractEventLoop,pool:ProcessPoolExecutor,realm:str,expose:bool=True):""" A BOSS object. Args: name: name of the Boss object loop: asyncio event loop that can be used to schedule tasks configs: List of WorkerConfigurationDocuments to specify the workers realm: Name of the realm the BOSS starts their HEROs in. expose: Specify whether the BOSS object itself should be exposed as HERO """self.name=nameself.heros:dict={}self.hero_config_sources:list=[]self._loop=loopself._pool=poolself.realm=realmself.start_all()
[docs]def_config_from_name(self,name):ifnamenotinself.heros:raiseNameError(f"HERO with name {name} not run by this BOSS. Available HEROs are {self.heros.keys()}")returnself.heros[name]["config"]
[docs]defadd_hero_source(self,parser:Callable[[str],WorkerConfigurationDocument],target:str):""" Adds a data source to the BOSS from which HERO configurations are loaded. Args: parser: A function which takes :code:`target` as an argument and returns a :py:class:`boss.configuration.WorkerConfigurationDocument` dict with the HERO config information. target: Target for the :code:`parser`, for example an URL for :py:meth:`boss.configuration.WorkerConfigurationDocument.parse_url` """self.hero_config_sources.append([parser,target])
[docs]defrefresh_hero_sources(self,auto_start:bool=True):""" Refresh the HERO configuration data from the registered sources, updating existing HEROs, adding new ones, and removing those that are no longer present in the sources. HEROs that were running before are only restarted if their configuration changed. HEROs without configuration changes stay untouched. Args: auto_start: If True, automatically starts a *new* hero source if its configuration is loaded. Defaults to True. Does not influence the behavior of HEROs that are already registered. """current_heros=list(self.heros.keys())defhandle_hero_dict(config:dict):ifconfigisnotNone:if"rows"inconfig:forconf_rowinconfig["rows"]:if"doc"inconf_row:handle_hero_dict(conf_row["doc"])else:handle_hero_dict(conf_row)elif"_id"inconfig:name=config["_id"]do_start=Falseifnameinself.heros:ifself.heros[name]["config"]!=config:# config changed, we need to restartifself.heros[name]["status"]=="running":do_start=True# was running before, auto start with the reloaded configself.stop_hero(name)self.heros[name]["config"]=WorkerConfigurationDocument(config)current_heros.remove(name)else:do_start=auto_startself.add_hero(config,auto_start=do_start)forparser,targetinself.hero_config_sources:log.info("refreshing HERO source %s",target)config=parser(target)handle_hero_dict(config)fordeleted_heroincurrent_heros:# hero not found in reloaded sources, deletelog.info("Removing HERO %s from BOSS",deleted_hero)self.remove_hero(deleted_hero)
[docs]defadd_hero(self,config:WorkerConfigurationDocument|dict,auto_start:bool=True):""" Start a new HERO and keep it running. Note that the id of the HERO specified in the config must be unique. Args: config: configuration for the new HERO. If a dict is given, it is converted into a WorkerConfigurationDocument. auto_start: If true the new HERO is immediately started after adding """ifnotisinstance(config,dict):returnNoneifnotisinstance(config,WorkerConfigurationDocument):config=WorkerConfigurationDocument(config)name=config["_id"]ifnamenotinself.heros:self.heros[name]={"config":config,"object":None,"status":"stopped"}ifauto_start:self.start_hero(name)
[docs]defstart_hero(self,name):""" Start HERO with given name. """config=self._config_from_name(name)status=self.heros[name]["status"]ifstatusnotin"running":try:self.heros[name]["object"]=config.build_hero_for_boss(self,self.realm)log.info(f"creating HERO with name {config['_id']} from class {config['classname']}")# calling setup hook if it existsifhasattr(self.heros[config["_id"]],"_setup")andcallable(getattr(self.heros[name]["object"],"_setup")):self.heros[name]["object"]._setup()self.heros[name]["status"]="running"exceptKeyErrorase:log.error(f"creating HERO with invalid dict: {config} failed: {e}")exceptExceptionase:log.error(f"creating HERO with name {config['_id']} from class {config['classname']} failed: {e}")
[docs]defstop_hero(self,name:str):config=self._config_from_name(name)status=self.heros[name]["status"]ifstatus=="running":log.info(f"destroying HERO with name {config['_id']} from class {config['classname']}")obj=self.heros[name]["object"]# calling teardown hook if it existsifhasattr(obj,"_teardown")andcallable(getattr(obj,"_teardown")):obj._teardown()obj._destroy_hero()delself.heros[name]["object"]self.heros[name]["object"]=Nonedelobjself.heros[name]["status"]="stopped"