Coverage for src / dataknobs_bots / bot / manager.py: 31%

103 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-26 16:43 -0700

1"""Bot manager for multi-tenant bot instances. 

2 

3.. deprecated:: 

4 This module is deprecated. Use :class:`dataknobs_bots.bot.BotRegistry` instead, 

5 which provides the same functionality plus persistent storage backends, 

6 environment-aware configuration resolution, and TTL-based caching. 

7 

8 For simple in-memory usage, use :class:`dataknobs_bots.bot.InMemoryBotRegistry`. 

9 

10 Migration example:: 

11 

12 # Old (deprecated) 

13 from dataknobs_bots import BotManager 

14 manager = BotManager() 

15 bot = await manager.get_or_create("my-bot", config) 

16 

17 # New (recommended) 

18 from dataknobs_bots.bot import InMemoryBotRegistry 

19 registry = InMemoryBotRegistry(validate_on_register=False) 

20 await registry.initialize() 

21 await registry.register("my-bot", config) 

22 bot = await registry.get_bot("my-bot") 

23""" 

24 

25from __future__ import annotations 

26 

27import asyncio 

28import inspect 

29import logging 

30import warnings 

31from pathlib import Path 

32from typing import TYPE_CHECKING, Any, Callable, Protocol, runtime_checkable 

33 

34from .base import DynaBot 

35 

36if TYPE_CHECKING: 

37 from dataknobs_config import EnvironmentAwareConfig, EnvironmentConfig 

38 

39logger = logging.getLogger(__name__) 

40 

41_DEPRECATION_MESSAGE = ( 

42 "BotManager is deprecated and will be removed in a future version. " 

43 "Use BotRegistry or InMemoryBotRegistry instead, which provide persistent " 

44 "storage backends, environment-aware resolution, and TTL caching. " 

45 "See dataknobs_bots.bot.BotRegistry for details." 

46) 

47 

48 

49@runtime_checkable 

50class ConfigLoader(Protocol): 

51 """Protocol for configuration loaders with a load method.""" 

52 

53 def load(self, bot_id: str) -> dict[str, Any]: 

54 """Load configuration for a bot.""" 

55 ... 

56 

57 

58@runtime_checkable 

59class AsyncConfigLoader(Protocol): 

60 """Protocol for async configuration loaders.""" 

61 

62 async def load(self, bot_id: str) -> dict[str, Any]: 

63 """Load configuration for a bot asynchronously.""" 

64 ... 

65 

66 

67ConfigLoaderType = ( 

68 ConfigLoader 

69 | AsyncConfigLoader 

70 | Callable[[str], dict[str, Any]] 

71 | Callable[[str], Any] # For async callables 

72) 

73 

74 

75class BotManager: 

76 """Manages multiple DynaBot instances for multi-tenancy. 

77 

78 .. deprecated:: 

79 Use :class:`BotRegistry` or :class:`InMemoryBotRegistry` instead. 

80 

81 BotManager handles: 

82 - Bot instance creation and caching 

83 - Client-level isolation 

84 - Configuration loading and validation 

85 - Bot lifecycle management 

86 - Environment-aware resource resolution (optional) 

87 

88 Each client/tenant gets its own bot instance, which can serve multiple users. 

89 The underlying DynaBot architecture ensures conversation isolation through 

90 BotContext with different conversation_ids. 

91 

92 Attributes: 

93 bots: Cache of bot_id -> DynaBot instances 

94 config_loader: Optional configuration loader (sync or async) 

95 environment_name: Current environment name (if environment-aware) 

96 

97 Example: 

98 ```python 

99 # Basic usage with inline configuration 

100 manager = BotManager() 

101 bot = await manager.get_or_create("my-bot", config={ 

102 "llm": {"provider": "openai", "model": "gpt-4o"}, 

103 "conversation_storage": {"backend": "memory"}, 

104 }) 

105 

106 # With environment-aware configuration 

107 manager = BotManager(environment="production") 

108 bot = await manager.get_or_create("my-bot", config={ 

109 "bot": { 

110 "llm": {"$resource": "default", "type": "llm_providers"}, 

111 "conversation_storage": {"$resource": "db", "type": "databases"}, 

112 } 

113 }) 

114 

115 # With config loader function 

116 def load_config(bot_id: str) -> dict: 

117 return load_yaml(f"configs/{bot_id}.yaml") 

118 

119 manager = BotManager(config_loader=load_config) 

120 bot = await manager.get_or_create("my-bot") 

121 

122 # List active bots 

123 active_bots = manager.list_bots() 

124 ``` 

125 """ 

126 

127 def __init__( 

128 self, 

129 config_loader: ConfigLoaderType | None = None, 

130 environment: EnvironmentConfig | str | None = None, 

131 env_dir: str | Path = "config/environments", 

132 ): 

133 """Initialize BotManager. 

134 

135 Args: 

136 config_loader: Optional configuration loader. 

137 Can be: 

138 - An object with a `.load(bot_id)` method (sync or async) 

139 - A callable function: bot_id -> config_dict (sync or async) 

140 - None (configurations must be provided explicitly) 

141 environment: Environment name or EnvironmentConfig for resource resolution. 

142 If None, environment-aware features are disabled unless 

143 an EnvironmentAwareConfig is passed to get_or_create(). 

144 If a string, loads environment config from env_dir. 

145 env_dir: Directory containing environment config files. 

146 Only used if environment is a string name. 

147 """ 

148 warnings.warn(_DEPRECATION_MESSAGE, DeprecationWarning, stacklevel=2) 

149 

150 self._bots: dict[str, DynaBot] = {} 

151 self._config_loader = config_loader 

152 self._env_dir = Path(env_dir) 

153 

154 # Load environment config if specified 

155 self._environment: EnvironmentConfig | None = None 

156 if environment is not None: 

157 try: 

158 from dataknobs_config import EnvironmentConfig 

159 

160 if isinstance(environment, str): 

161 self._environment = EnvironmentConfig.load(environment, env_dir) 

162 else: 

163 self._environment = environment 

164 logger.info(f"Initialized BotManager with environment: {self._environment.name}") 

165 except ImportError: 

166 logger.warning( 

167 "dataknobs_config not installed, environment-aware features disabled" 

168 ) 

169 else: 

170 logger.info("Initialized BotManager") 

171 

172 @property 

173 def environment_name(self) -> str | None: 

174 """Get current environment name, or None if not environment-aware.""" 

175 return self._environment.name if self._environment else None 

176 

177 @property 

178 def environment(self) -> EnvironmentConfig | None: 

179 """Get current environment config, or None if not environment-aware.""" 

180 return self._environment 

181 

182 async def get_or_create( 

183 self, 

184 bot_id: str, 

185 config: dict[str, Any] | EnvironmentAwareConfig | None = None, 

186 use_environment: bool | None = None, 

187 config_key: str = "bot", 

188 ) -> DynaBot: 

189 """Get existing bot or create new one. 

190 

191 Args: 

192 bot_id: Bot identifier (e.g., "customer-support", "sales-assistant") 

193 config: Optional bot configuration. Can be: 

194 - dict with resolved values (traditional) 

195 - dict with $resource references (requires environment) 

196 - EnvironmentAwareConfig instance 

197 If not provided and config_loader is set, will load configuration. 

198 use_environment: Whether to use environment-aware resolution. 

199 - True: Use environment for $resource resolution 

200 - False: Use config as-is (no resolution) 

201 - None (default): Auto-detect based on whether manager has 

202 an environment configured or config is EnvironmentAwareConfig 

203 config_key: Key within config containing bot configuration. 

204 Defaults to "bot". Set to None to use root config. 

205 Only used when use_environment is True. 

206 

207 Returns: 

208 DynaBot instance 

209 

210 Raises: 

211 ValueError: If config is None and no config_loader is set 

212 

213 Example: 

214 ```python 

215 # Traditional usage (no environment resolution) 

216 manager = BotManager() 

217 bot = await manager.get_or_create("support-bot", config={ 

218 "llm": {"provider": "openai", "model": "gpt-4"}, 

219 "conversation_storage": {"backend": "memory"}, 

220 }) 

221 

222 # Environment-aware usage with $resource references 

223 manager = BotManager(environment="production") 

224 bot = await manager.get_or_create("support-bot", config={ 

225 "bot": { 

226 "llm": {"$resource": "default", "type": "llm_providers"}, 

227 "conversation_storage": {"$resource": "db", "type": "databases"}, 

228 } 

229 }) 

230 

231 # Explicit environment resolution control 

232 bot = await manager.get_or_create( 

233 "support-bot", 

234 config=my_config, 

235 use_environment=True, 

236 config_key="bot" 

237 ) 

238 ``` 

239 """ 

240 # Return cached bot if exists 

241 if bot_id in self._bots: 

242 logger.debug(f"Returning cached bot: {bot_id}") 

243 return self._bots[bot_id] 

244 

245 # Load configuration if not provided 

246 if config is None: 

247 if self._config_loader is None: 

248 raise ValueError( 

249 f"No configuration provided for bot '{bot_id}' " 

250 "and no config_loader is set" 

251 ) 

252 config = await self._load_config(bot_id) 

253 

254 # Determine whether to use environment resolution 

255 is_env_aware_config = False 

256 try: 

257 from dataknobs_config import EnvironmentAwareConfig 

258 

259 is_env_aware_config = isinstance(config, EnvironmentAwareConfig) 

260 except ImportError: 

261 pass 

262 

263 should_use_environment = use_environment 

264 if should_use_environment is None: 

265 # Auto-detect: use environment if manager has one or config is EnvironmentAwareConfig 

266 should_use_environment = self._environment is not None or is_env_aware_config 

267 

268 # Create new bot 

269 logger.info(f"Creating new bot: {bot_id} (environment_aware={should_use_environment})") 

270 

271 if should_use_environment: 

272 bot = await DynaBot.from_environment_aware_config( 

273 config, 

274 environment=self._environment, 

275 env_dir=self._env_dir, 

276 config_key=config_key, 

277 ) 

278 else: 

279 # Traditional path - use config as-is 

280 bot = await DynaBot.from_config(config) 

281 

282 # Cache and return 

283 self._bots[bot_id] = bot 

284 return bot 

285 

286 async def get(self, bot_id: str) -> DynaBot | None: 

287 """Get bot without creating if doesn't exist. 

288 

289 Args: 

290 bot_id: Bot identifier 

291 

292 Returns: 

293 DynaBot instance if exists, None otherwise 

294 """ 

295 return self._bots.get(bot_id) 

296 

297 async def remove(self, bot_id: str) -> bool: 

298 """Remove bot instance. 

299 

300 Args: 

301 bot_id: Bot identifier 

302 

303 Returns: 

304 True if bot was removed, False if didn't exist 

305 """ 

306 if bot_id in self._bots: 

307 logger.info(f"Removing bot: {bot_id}") 

308 del self._bots[bot_id] 

309 return True 

310 return False 

311 

312 async def reload(self, bot_id: str) -> DynaBot: 

313 """Reload bot instance with fresh configuration. 

314 

315 Args: 

316 bot_id: Bot identifier 

317 

318 Returns: 

319 New DynaBot instance 

320 

321 Raises: 

322 ValueError: If no config_loader is set 

323 """ 

324 if self._config_loader is None: 

325 raise ValueError("Cannot reload without config_loader") 

326 

327 # Remove existing bot 

328 await self.remove(bot_id) 

329 

330 # Create new one 

331 return await self.get_or_create(bot_id) 

332 

333 def list_bots(self) -> list[str]: 

334 """List all active bot IDs. 

335 

336 Returns: 

337 List of bot identifiers 

338 """ 

339 return list(self._bots.keys()) 

340 

341 def get_bot_count(self) -> int: 

342 """Get count of active bots. 

343 

344 Returns: 

345 Number of active bot instances 

346 """ 

347 return len(self._bots) 

348 

349 async def _load_config(self, bot_id: str) -> dict[str, Any]: 

350 """Load configuration for bot using config_loader. 

351 

352 Supports both synchronous and asynchronous config loaders. 

353 Handles both callable loaders and objects with a load() method. 

354 

355 Args: 

356 bot_id: Bot identifier 

357 

358 Returns: 

359 Bot configuration dictionary 

360 """ 

361 logger.debug(f"Loading configuration for bot: {bot_id}") 

362 

363 if callable(self._config_loader): 

364 # Handle callable config loader (function) 

365 if inspect.iscoroutinefunction(self._config_loader): 

366 # Async function 

367 result = await self._config_loader(bot_id) 

368 return dict(result) if isinstance(result, dict) else {} 

369 else: 

370 # Sync function - run in executor to avoid blocking 

371 loop = asyncio.get_event_loop() 

372 result = await loop.run_in_executor(None, self._config_loader, bot_id) 

373 return dict(result) if isinstance(result, dict) else {} 

374 else: 

375 # Assume it's an object with a load method 

376 load_method = self._config_loader.load # type: ignore 

377 

378 if inspect.iscoroutinefunction(load_method): 

379 # Async method 

380 result = await load_method(bot_id) 

381 return dict(result) if isinstance(result, dict) else {} 

382 else: 

383 # Sync method - run in executor to avoid blocking 

384 loop = asyncio.get_event_loop() 

385 result = await loop.run_in_executor(None, load_method, bot_id) 

386 return dict(result) if isinstance(result, dict) else {} 

387 

388 async def clear_all(self) -> None: 

389 """Clear all bot instances. 

390 

391 Useful for testing or when restarting the service. 

392 """ 

393 logger.info("Clearing all bot instances") 

394 self._bots.clear() 

395 

396 def get_portable_config( 

397 self, 

398 config: dict[str, Any] | EnvironmentAwareConfig, 

399 ) -> dict[str, Any]: 

400 """Get portable configuration for storage. 

401 

402 Extracts portable config (with $resource references intact, 

403 environment variables unresolved) suitable for storing in 

404 registries or databases. 

405 

406 Args: 

407 config: Configuration to make portable. 

408 Can be dict or EnvironmentAwareConfig. 

409 

410 Returns: 

411 Portable configuration dictionary 

412 

413 Example: 

414 ```python 

415 manager = BotManager(environment="production") 

416 

417 # Get portable config from EnvironmentAwareConfig 

418 portable = manager.get_portable_config(env_aware_config) 

419 

420 # Store in registry (portable across environments) 

421 await registry.store(bot_id, portable) 

422 ``` 

423 """ 

424 return DynaBot.get_portable_config(config) 

425 

426 def __repr__(self) -> str: 

427 """String representation.""" 

428 bots = ", ".join(self._bots.keys()) 

429 env = f", environment={self._environment.name!r}" if self._environment else "" 

430 return f"BotManager(bots=[{bots}], count={len(self._bots)}{env})"