Coverage for src / dataknobs_bots / bot / manager.py: 31%
103 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-26 16:43 -0700
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-26 16:43 -0700
"""Bot manager for multi-tenant bot instances.

.. deprecated::
    This module is deprecated. Use :class:`dataknobs_bots.bot.BotRegistry` instead,
    which provides the same functionality plus persistent storage backends,
    environment-aware configuration resolution, and TTL-based caching.

    For simple in-memory usage, use :class:`dataknobs_bots.bot.InMemoryBotRegistry`.

Migration example::

    # Old (deprecated)
    from dataknobs_bots import BotManager
    manager = BotManager()
    bot = await manager.get_or_create("my-bot", config)

    # New (recommended)
    from dataknobs_bots.bot import InMemoryBotRegistry
    registry = InMemoryBotRegistry(validate_on_register=False)
    await registry.initialize()
    await registry.register("my-bot", config)
    bot = await registry.get_bot("my-bot")
"""
25from __future__ import annotations
27import asyncio
28import inspect
29import logging
30import warnings
31from pathlib import Path
32from typing import TYPE_CHECKING, Any, Callable, Protocol, runtime_checkable
34from .base import DynaBot
36if TYPE_CHECKING:
37 from dataknobs_config import EnvironmentAwareConfig, EnvironmentConfig
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Text of the DeprecationWarning raised whenever a BotManager is constructed.
# Adjacent string literals are concatenated into a single message.
_DEPRECATION_MESSAGE = (
    "BotManager is deprecated and will be removed in a future version. "
    "Use BotRegistry or InMemoryBotRegistry instead, which provide persistent "
    "storage backends, environment-aware resolution, and TTL caching. "
    "See dataknobs_bots.bot.BotRegistry for details."
)
@runtime_checkable
class ConfigLoader(Protocol):
    """Protocol for configuration loaders with a synchronous ``load`` method.

    Because the protocol is ``runtime_checkable``, ``isinstance(obj, ConfigLoader)``
    succeeds for any object that has a ``load`` attribute; the runtime check
    does not inspect the method's signature or whether it is a coroutine.
    """

    def load(self, bot_id: str) -> dict[str, Any]:
        """Load configuration for a bot.

        Args:
            bot_id: Bot identifier

        Returns:
            Bot configuration dictionary
        """
        ...
@runtime_checkable
class AsyncConfigLoader(Protocol):
    """Protocol for configuration loaders with an asynchronous ``load`` method.

    The runtime ``isinstance`` check only verifies that a ``load`` attribute
    exists, so at runtime this protocol cannot be told apart from a loader
    whose ``load`` is synchronous; the distinction matters to static type
    checkers only.
    """

    async def load(self, bot_id: str) -> dict[str, Any]:
        """Load configuration for a bot asynchronously.

        Args:
            bot_id: Bot identifier

        Returns:
            Bot configuration dictionary
        """
        ...
# Union of everything accepted as a ``config_loader``: an object exposing
# ``load(bot_id)`` (sync or async) or a bare callable that takes the bot id.
# This is a runtime expression (not an annotation), so the ``|`` operators are
# evaluated when the module is imported.
ConfigLoaderType = (
    ConfigLoader
    | AsyncConfigLoader
    | Callable[[str], dict[str, Any]]
    | Callable[[str], Any]  # For async callables
)
class BotManager:
    """Manages multiple DynaBot instances for multi-tenancy.

    .. deprecated::
        Use :class:`BotRegistry` or :class:`InMemoryBotRegistry` instead.

    BotManager handles:
    - Bot instance creation and caching
    - Client-level isolation
    - Configuration loading and validation
    - Bot lifecycle management
    - Environment-aware resource resolution (optional)

    Each client/tenant gets its own bot instance, which can serve multiple users.
    The underlying DynaBot architecture ensures conversation isolation through
    BotContext with different conversation_ids.

    Attributes:
        _bots: Cache of bot_id -> DynaBot instances
        _config_loader: Optional configuration loader (sync or async)
        environment_name: Current environment name (if environment-aware)
        environment: Current environment config (if environment-aware)

    Example:
        ```python
        # Basic usage with inline configuration
        manager = BotManager()
        bot = await manager.get_or_create("my-bot", config={
            "llm": {"provider": "openai", "model": "gpt-4o"},
            "conversation_storage": {"backend": "memory"},
        })

        # With environment-aware configuration
        manager = BotManager(environment="production")
        bot = await manager.get_or_create("my-bot", config={
            "bot": {
                "llm": {"$resource": "default", "type": "llm_providers"},
                "conversation_storage": {"$resource": "db", "type": "databases"},
            }
        })

        # With config loader function
        def load_config(bot_id: str) -> dict:
            return load_yaml(f"configs/{bot_id}.yaml")

        manager = BotManager(config_loader=load_config)
        bot = await manager.get_or_create("my-bot")

        # List active bots
        active_bots = manager.list_bots()
        ```
    """

    def __init__(
        self,
        config_loader: ConfigLoaderType | None = None,
        environment: EnvironmentConfig | str | None = None,
        env_dir: str | Path = "config/environments",
    ) -> None:
        """Initialize BotManager.

        Emits a ``DeprecationWarning`` attributed to the caller.

        Args:
            config_loader: Optional configuration loader.
                Can be:
                - An object with a `.load(bot_id)` method (sync or async)
                - A callable function: bot_id -> config_dict (sync or async)
                - None (configurations must be provided explicitly)
            environment: Environment name or EnvironmentConfig for resource resolution.
                If None, environment-aware features are disabled unless
                an EnvironmentAwareConfig is passed to get_or_create().
                If a string, loads environment config from env_dir.
            env_dir: Directory containing environment config files.
                Only used if environment is a string name.
        """
        # stacklevel=2 points the warning at the code constructing the manager.
        warnings.warn(_DEPRECATION_MESSAGE, DeprecationWarning, stacklevel=2)

        self._bots: dict[str, DynaBot] = {}
        self._config_loader = config_loader
        self._env_dir = Path(env_dir)

        # Load environment config if specified
        self._environment: EnvironmentConfig | None = None
        if environment is not None:
            try:
                # Imported lazily so dataknobs_config stays an optional dependency.
                from dataknobs_config import EnvironmentConfig

                if isinstance(environment, str):
                    self._environment = EnvironmentConfig.load(environment, env_dir)
                else:
                    self._environment = environment
                logger.info(f"Initialized BotManager with environment: {self._environment.name}")
            except ImportError:
                # Best effort: the manager still works, just without an environment.
                logger.warning(
                    "dataknobs_config not installed, environment-aware features disabled"
                )
        else:
            logger.info("Initialized BotManager")

    @property
    def environment_name(self) -> str | None:
        """Get current environment name, or None if not environment-aware."""
        return self._environment.name if self._environment else None

    @property
    def environment(self) -> EnvironmentConfig | None:
        """Get current environment config, or None if not environment-aware."""
        return self._environment

    async def get_or_create(
        self,
        bot_id: str,
        config: dict[str, Any] | EnvironmentAwareConfig | None = None,
        use_environment: bool | None = None,
        config_key: str | None = "bot",
    ) -> DynaBot:
        """Get existing bot or create new one.

        A cached bot is returned as-is; ``config`` and the other arguments are
        ignored in that case.

        Args:
            bot_id: Bot identifier (e.g., "customer-support", "sales-assistant")
            config: Optional bot configuration. Can be:
                - dict with resolved values (traditional)
                - dict with $resource references (requires environment)
                - EnvironmentAwareConfig instance
                If not provided and config_loader is set, will load configuration.
            use_environment: Whether to use environment-aware resolution.
                - True: Use environment for $resource resolution
                - False: Use config as-is (no resolution)
                - None (default): Auto-detect based on whether manager has
                  an environment configured or config is EnvironmentAwareConfig
            config_key: Key within config containing bot configuration.
                Defaults to "bot". Set to None to use root config.
                Only used when use_environment is True.

        Returns:
            DynaBot instance

        Raises:
            ValueError: If config is None and no config_loader is set

        Example:
            ```python
            # Traditional usage (no environment resolution)
            manager = BotManager()
            bot = await manager.get_or_create("support-bot", config={
                "llm": {"provider": "openai", "model": "gpt-4"},
                "conversation_storage": {"backend": "memory"},
            })

            # Environment-aware usage with $resource references
            manager = BotManager(environment="production")
            bot = await manager.get_or_create("support-bot", config={
                "bot": {
                    "llm": {"$resource": "default", "type": "llm_providers"},
                    "conversation_storage": {"$resource": "db", "type": "databases"},
                }
            })

            # Explicit environment resolution control
            bot = await manager.get_or_create(
                "support-bot",
                config=my_config,
                use_environment=True,
                config_key="bot"
            )
            ```
        """
        # Return cached bot if exists
        if bot_id in self._bots:
            logger.debug(f"Returning cached bot: {bot_id}")
            return self._bots[bot_id]

        # Load configuration if not provided
        if config is None:
            if self._config_loader is None:
                raise ValueError(
                    f"No configuration provided for bot '{bot_id}' "
                    "and no config_loader is set"
                )
            config = await self._load_config(bot_id)

        # Determine whether to use environment resolution
        is_env_aware_config = False
        try:
            from dataknobs_config import EnvironmentAwareConfig

            is_env_aware_config = isinstance(config, EnvironmentAwareConfig)
        except ImportError:
            # Without dataknobs_config the config cannot be an EnvironmentAwareConfig.
            pass

        should_use_environment = use_environment
        if should_use_environment is None:
            # Auto-detect: use environment if manager has one or config is EnvironmentAwareConfig
            should_use_environment = self._environment is not None or is_env_aware_config

        # Create new bot
        logger.info(f"Creating new bot: {bot_id} (environment_aware={should_use_environment})")

        if should_use_environment:
            bot = await DynaBot.from_environment_aware_config(
                config,
                environment=self._environment,
                env_dir=self._env_dir,
                config_key=config_key,
            )
        else:
            # Traditional path - use config as-is
            bot = await DynaBot.from_config(config)

        # Cache and return
        self._bots[bot_id] = bot
        return bot

    async def get(self, bot_id: str) -> DynaBot | None:
        """Get bot without creating if doesn't exist.

        Args:
            bot_id: Bot identifier

        Returns:
            DynaBot instance if exists, None otherwise
        """
        return self._bots.get(bot_id)

    async def remove(self, bot_id: str) -> bool:
        """Remove bot instance.

        Args:
            bot_id: Bot identifier

        Returns:
            True if bot was removed, False if didn't exist
        """
        if bot_id not in self._bots:
            return False

        logger.info(f"Removing bot: {bot_id}")
        del self._bots[bot_id]
        return True

    async def reload(self, bot_id: str) -> DynaBot:
        """Reload bot instance with fresh configuration.

        The cached instance is dropped before the replacement is built, so if
        loading or construction raises, no bot remains cached under ``bot_id``.

        Args:
            bot_id: Bot identifier

        Returns:
            New DynaBot instance

        Raises:
            ValueError: If no config_loader is set
        """
        if self._config_loader is None:
            raise ValueError("Cannot reload without config_loader")

        # Remove existing bot
        await self.remove(bot_id)

        # Create new one (config comes from the loader because none is passed)
        return await self.get_or_create(bot_id)

    def list_bots(self) -> list[str]:
        """List all active bot IDs.

        Returns:
            List of bot identifiers
        """
        return list(self._bots)

    def get_bot_count(self) -> int:
        """Get count of active bots.

        Returns:
            Number of active bot instances
        """
        return len(self._bots)

    async def _load_config(self, bot_id: str) -> dict[str, Any]:
        """Load configuration for bot using config_loader.

        Supports both synchronous and asynchronous config loaders.
        Handles both callable loaders and objects with a load() method.
        A loader that is itself callable is called directly, even if it also
        has a ``load`` attribute.

        Args:
            bot_id: Bot identifier

        Returns:
            A shallow copy of the loaded configuration dictionary, or an empty
            dict (with a warning logged) if the loader returned a non-dict.
        """
        logger.debug(f"Loading configuration for bot: {bot_id}")

        loader = self._config_loader
        # Callables are invoked directly; anything else is assumed to expose load().
        load_fn = loader if callable(loader) else loader.load  # type: ignore[union-attr]

        if inspect.iscoroutinefunction(load_fn):
            result = await load_fn(bot_id)
        else:
            # Sync function - run in executor to avoid blocking
            loop = asyncio.get_running_loop()
            result = await loop.run_in_executor(None, load_fn, bot_id)
            # Objects with an async __call__ and sync wrappers around coroutines
            # are not detected by iscoroutinefunction; await what they produced
            # instead of discarding it.
            if inspect.isawaitable(result):
                result = await result

        if isinstance(result, dict):
            return dict(result)

        logger.warning(
            f"Config loader for bot '{bot_id}' returned "
            f"{type(result).__name__}, expected dict; using empty configuration"
        )
        return {}

    async def clear_all(self) -> None:
        """Clear all bot instances.

        Useful for testing or when restarting the service.
        """
        logger.info("Clearing all bot instances")
        self._bots.clear()

    def get_portable_config(
        self,
        config: dict[str, Any] | EnvironmentAwareConfig,
    ) -> dict[str, Any]:
        """Get portable configuration for storage.

        Extracts portable config (with $resource references intact,
        environment variables unresolved) suitable for storing in
        registries or databases. Delegates to ``DynaBot.get_portable_config``.

        Args:
            config: Configuration to make portable.
                Can be dict or EnvironmentAwareConfig.

        Returns:
            Portable configuration dictionary

        Example:
            ```python
            manager = BotManager(environment="production")

            # Get portable config from EnvironmentAwareConfig
            portable = manager.get_portable_config(env_aware_config)

            # Store in registry (portable across environments)
            await registry.store(bot_id, portable)
            ```
        """
        return DynaBot.get_portable_config(config)

    def __repr__(self) -> str:
        """String representation."""
        bots = ", ".join(self._bots)
        env = f", environment={self._environment.name!r}" if self._environment else ""
        return f"BotManager(bots=[{bots}], count={len(self._bots)}{env})"