Coverage for src/dataknobs_bots/knowledge/rag.py: 69%

284 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-26 16:43 -0700

1"""RAG (Retrieval-Augmented Generation) knowledge base implementation.""" 

2 

3import types 

4from pathlib import Path 

5from typing import Any 

6 

7from dataknobs_xization import ( 

8 ChunkQualityConfig, 

9 ContentTransformer, 

10 HeadingInclusion, 

11 chunk_markdown_tree, 

12 parse_markdown, 

13) 

14from dataknobs_xization.ingestion import ( 

15 DirectoryProcessor, 

16 KnowledgeBaseConfig, 

17 ProcessedDocument, 

18 process_directory, 

19) 

20from dataknobs_bots.knowledge.retrieval import ( 

21 ChunkMerger, 

22 ContextFormatter, 

23 FormatterConfig, 

24 MergerConfig, 

25) 

26 

27 

28class RAGKnowledgeBase: 

29 """RAG knowledge base using dataknobs-xization for chunking and vector search. 

30 

31 This implementation: 

32 - Parses markdown documents using dataknobs-xization 

33 - Chunks documents intelligently based on structure 

34 - Stores chunks with embeddings in vector store 

35 - Provides semantic search for relevant context 

36 

37 Attributes: 

38 vector_store: Vector store backend from dataknobs_data 

39 embedding_provider: LLM provider for generating embeddings 

40 chunking_config: Configuration for document chunking 

41 """ 

42 

def __init__(
    self,
    vector_store: Any,
    embedding_provider: Any,
    chunking_config: dict[str, Any] | None = None,
    merger_config: MergerConfig | None = None,
    formatter_config: FormatterConfig | None = None,
):
    """Wire up storage, embedding, chunking, merging and formatting.

    Args:
        vector_store: Vector store backend instance.
        embedding_provider: LLM provider exposing an ``embed()`` coroutine.
        chunking_config: Chunking options. Recognised keys:
            - max_chunk_size: Maximum chunk size in characters
            - chunk_overlap: Overlap between chunks
            - combine_under_heading: Combine text under same heading
            - quality_filter: ChunkQualityConfig (or dict) for filtering
            - generate_embeddings: Whether to generate enriched embedding text
            A missing or empty mapping selects the built-in defaults.
        merger_config: Optional configuration for chunk merging.
        formatter_config: Optional configuration for context formatting.
    """
    self.vector_store = vector_store
    self.embedding_provider = embedding_provider

    # Any falsy value (None or an empty dict) means "use the defaults".
    if chunking_config:
        self.chunking_config = chunking_config
    else:
        self.chunking_config = {
            "max_chunk_size": 500,
            "chunk_overlap": 50,
            "combine_under_heading": True,
        }

    # Merger and formatter fall back to their own default construction
    # when no explicit configuration object is supplied.
    if merger_config:
        self.merger = ChunkMerger(merger_config)
    else:
        self.merger = ChunkMerger()

    if formatter_config:
        self.formatter = ContextFormatter(formatter_config)
    else:
        self.formatter = ContextFormatter()

76 

@classmethod
async def from_config(cls, config: dict[str, Any]) -> "RAGKnowledgeBase":
    """Build a fully initialised knowledge base from a plain config dict.

    Args:
        config: Configuration dictionary with:
            - vector_store: Vector store configuration (required)
            - embedding_provider: LLM provider name (default "openai")
            - embedding_model: Model for embeddings
              (default "text-embedding-ada-002")
            - chunking: Optional chunking configuration
            - merger: Optional keyword arguments for MergerConfig
            - formatter: Optional keyword arguments for FormatterConfig
            - documents_path: Optional path to load documents from
            - document_pattern: Optional glob pattern (default "**/*.md")

    Returns:
        Configured RAGKnowledgeBase instance, with documents already
        loaded when ``documents_path`` is present.

    Example:
        ```python
        kb = await RAGKnowledgeBase.from_config({
            "vector_store": {"backend": "faiss", "dimensions": 1536},
            "embedding_provider": "openai",
            "embedding_model": "text-embedding-3-small",
            "documents_path": "./docs",
        })
        ```
    """
    from dataknobs_data.vector.stores import VectorStoreFactory
    from dataknobs_llm.llm import LLMProviderFactory

    # Vector store: the "vector_store" key is mandatory (KeyError otherwise).
    store_settings = config["vector_store"]
    vector_store = VectorStoreFactory().create(**store_settings)
    await vector_store.initialize()

    # Embedding provider.
    provider_settings = {
        "provider": config.get("embedding_provider", "openai"),
        "model": config.get("embedding_model", "text-embedding-ada-002"),
    }
    embedding_provider = LLMProviderFactory(is_async=True).create(provider_settings)
    await embedding_provider.initialize()

    # Optional retrieval helpers are only configured when their key exists.
    merger_config = MergerConfig(**config["merger"]) if "merger" in config else None
    formatter_config = (
        FormatterConfig(**config["formatter"]) if "formatter" in config else None
    )

    instance = cls(
        vector_store=vector_store,
        embedding_provider=embedding_provider,
        chunking_config=config.get("chunking", {}),
        merger_config=merger_config,
        formatter_config=formatter_config,
    )

    # Eagerly ingest documents when a path was configured.
    if "documents_path" in config:
        glob_pattern = config.get("document_pattern", "**/*.md")
        await instance.load_documents_from_directory(
            config["documents_path"], glob_pattern
        )

    return instance

157 

158 async def load_markdown_document( 

159 self, filepath: str | Path, metadata: dict[str, Any] | None = None 

160 ) -> int: 

161 """Load and chunk a markdown document. 

162 

163 Args: 

164 filepath: Path to markdown file 

165 metadata: Optional metadata to attach to all chunks 

166 

167 Returns: 

168 Number of chunks created 

169 

170 Example: 

171 ```python 

172 num_chunks = await kb.load_markdown_document( 

173 "docs/api.md", 

174 metadata={"category": "api", "version": "1.0"} 

175 ) 

176 ``` 

177 """ 

178 import numpy as np 

179 

180 # Read document 

181 filepath = Path(filepath) 

182 with open(filepath, encoding="utf-8") as f: 

183 markdown_text = f.read() 

184 

185 # Parse markdown 

186 tree = parse_markdown(markdown_text) 

187 

188 # Build quality filter config if specified 

189 quality_filter = None 

190 if "quality_filter" in self.chunking_config: 

191 qf_config = self.chunking_config["quality_filter"] 

192 if isinstance(qf_config, ChunkQualityConfig): 

193 quality_filter = qf_config 

194 elif isinstance(qf_config, dict): 

195 quality_filter = ChunkQualityConfig(**qf_config) 

196 

197 # Chunk the document with enhanced options 

198 chunks = chunk_markdown_tree( 

199 tree, 

200 max_chunk_size=self.chunking_config.get("max_chunk_size", 500), 

201 chunk_overlap=self.chunking_config.get("chunk_overlap", 50), 

202 heading_inclusion=HeadingInclusion.IN_METADATA, # Keep headings in metadata only 

203 combine_under_heading=self.chunking_config.get("combine_under_heading", True), 

204 quality_filter=quality_filter, 

205 generate_embeddings=self.chunking_config.get("generate_embeddings", True), 

206 ) 

207 

208 # Process and store chunks 

209 vectors = [] 

210 ids = [] 

211 metadatas = [] 

212 

213 for i, chunk in enumerate(chunks): 

214 # Use embedding_text if available, otherwise use chunk text 

215 text_for_embedding = chunk.metadata.embedding_text or chunk.text 

216 

217 # Generate embedding 

218 embedding = await self.embedding_provider.embed(text_for_embedding) 

219 

220 # Convert to numpy if needed 

221 if not isinstance(embedding, np.ndarray): 

222 embedding = np.array(embedding, dtype=np.float32) 

223 

224 # Prepare metadata with new fields 

225 chunk_id = f"{filepath.stem}_{i}" 

226 chunk_metadata = { 

227 "text": chunk.text, 

228 "source": str(filepath), 

229 "chunk_index": i, 

230 "heading_path": chunk.metadata.heading_display or chunk.metadata.get_heading_path(), 

231 "headings": chunk.metadata.headings, 

232 "heading_levels": chunk.metadata.heading_levels, 

233 "line_number": chunk.metadata.line_number, 

234 "chunk_size": chunk.metadata.chunk_size, 

235 "content_length": chunk.metadata.content_length, 

236 } 

237 

238 # Merge with user metadata 

239 if metadata: 

240 chunk_metadata.update(metadata) 

241 

242 vectors.append(embedding) 

243 ids.append(chunk_id) 

244 metadatas.append(chunk_metadata) 

245 

246 # Batch insert into vector store 

247 if vectors: 

248 await self.vector_store.add_vectors( 

249 vectors=vectors, ids=ids, metadata=metadatas 

250 ) 

251 

252 return len(chunks) 

253 

254 async def load_documents_from_directory( 

255 self, directory: str | Path, pattern: str = "**/*.md" 

256 ) -> dict[str, Any]: 

257 """Load all markdown documents from a directory. 

258 

259 Args: 

260 directory: Directory path containing documents 

261 pattern: Glob pattern for files to load (default: **/*.md) 

262 

263 Returns: 

264 Dictionary with loading statistics: 

265 - total_files: Number of files processed 

266 - total_chunks: Total chunks created 

267 - errors: List of errors encountered 

268 

269 Example: 

270 ```python 

271 results = await kb.load_documents_from_directory( 

272 "docs/", 

273 pattern="**/*.md" 

274 ) 

275 print(f"Loaded {results['total_chunks']} chunks from {results['total_files']} files") 

276 ``` 

277 """ 

278 directory = Path(directory) 

279 results = {"total_files": 0, "total_chunks": 0, "errors": []} 

280 

281 for filepath in directory.glob(pattern): 

282 if not filepath.is_file(): 

283 continue 

284 

285 try: 

286 num_chunks = await self.load_markdown_document( 

287 filepath, metadata={"filename": filepath.name} 

288 ) 

289 results["total_files"] += 1 

290 results["total_chunks"] += num_chunks 

291 except Exception as e: 

292 results["errors"].append({"file": str(filepath), "error": str(e)}) 

293 

294 return results 

295 

async def load_json_document(
    self,
    filepath: str | Path,
    metadata: dict[str, Any] | None = None,
    schema: str | None = None,
    transformer: ContentTransformer | None = None,
    title: str | None = None,
) -> int:
    """Ingest a JSON file by rendering it to markdown first.

    The parsed JSON is converted with ``ContentTransformer.transform_json``
    and the resulting markdown goes through the regular chunk/embed/store
    pipeline.

    Args:
        filepath: Path to JSON file.
        metadata: Optional metadata to attach to all chunks.
        schema: Optional schema name (requires a transformer with that
            schema registered).
        transformer: Optional ContentTransformer instance; a default one
            is created when omitted.
        title: Optional document title. Defaults to the file stem with
            underscores turned into spaces, title-cased.

    Returns:
        Number of chunks created.

    Example:
        ```python
        transformer = ContentTransformer()
        transformer.register_schema("pattern", {
            "title_field": "name",
            "sections": [
                {"field": "description", "heading": "Description"},
                {"field": "example", "heading": "Example", "format": "code"}
            ]
        })
        num_chunks = await kb.load_json_document(
            "data/patterns.json",
            transformer=transformer,
            schema="pattern"
        )
        ```
    """
    import json

    json_path = Path(filepath)
    payload = json.loads(json_path.read_text(encoding="utf-8"))

    converter = ContentTransformer() if transformer is None else transformer
    doc_title = title or json_path.stem.replace("_", " ").title()
    rendered = converter.transform_json(payload, schema=schema, title=doc_title)

    return await self._load_markdown_text(
        rendered,
        source=str(json_path),
        metadata=metadata,
    )

366 

async def load_yaml_document(
    self,
    filepath: str | Path,
    metadata: dict[str, Any] | None = None,
    schema: str | None = None,
    transformer: ContentTransformer | None = None,
    title: str | None = None,
) -> int:
    """Ingest a YAML file by rendering it to markdown first.

    The file path itself is handed to ``ContentTransformer.transform_yaml``;
    this method does not read the file.

    Args:
        filepath: Path to YAML file.
        metadata: Optional metadata to attach to all chunks.
        schema: Optional schema name (requires a transformer with that
            schema registered).
        transformer: Optional ContentTransformer instance; a default one
            is created when omitted.
        title: Optional document title. Defaults to the file stem with
            underscores turned into spaces, title-cased.

    Returns:
        Number of chunks created.

    Example:
        ```python
        num_chunks = await kb.load_yaml_document(
            "data/config.yaml",
            metadata={"content_type": "configuration"}
        )
        ```
    """
    yaml_path = Path(filepath)

    converter = ContentTransformer() if transformer is None else transformer
    doc_title = title or yaml_path.stem.replace("_", " ").title()
    rendered = converter.transform_yaml(yaml_path, schema=schema, title=doc_title)

    return await self._load_markdown_text(
        rendered,
        source=str(yaml_path),
        metadata=metadata,
    )

412 

async def load_csv_document(
    self,
    filepath: str | Path,
    metadata: dict[str, Any] | None = None,
    title: str | None = None,
    title_field: str | None = None,
    transformer: ContentTransformer | None = None,
) -> int:
    """Ingest a CSV file by rendering it to markdown first.

    The file path is handed to ``ContentTransformer.transform_csv``, and
    the resulting markdown goes through the regular chunk/embed/store
    pipeline.

    Args:
        filepath: Path to CSV file.
        metadata: Optional metadata to attach to all chunks.
        title: Optional document title. Defaults to the file stem with
            underscores turned into spaces, title-cased.
        title_field: Column to use as section title (default: first column).
        transformer: Optional ContentTransformer instance; a default one
            is created when omitted.

    Returns:
        Number of chunks created.

    Example:
        ```python
        num_chunks = await kb.load_csv_document(
            "data/faq.csv",
            title="Frequently Asked Questions",
            title_field="question"
        )
        ```
    """
    csv_path = Path(filepath)

    converter = ContentTransformer() if transformer is None else transformer
    doc_title = title or csv_path.stem.replace("_", " ").title()
    rendered = converter.transform_csv(
        csv_path,
        title=doc_title,
        title_field=title_field,
    )

    return await self._load_markdown_text(
        rendered,
        source=str(csv_path),
        metadata=metadata,
    )

461 

async def load_from_directory(
    self,
    directory: str | Path,
    config: KnowledgeBaseConfig | None = None,
    progress_callback: Any | None = None,
) -> dict[str, Any]:
    """Load documents from a directory using KnowledgeBaseConfig.

    Documents are produced by the xization ``DirectoryProcessor``; each
    chunk is embedded and the chunks of one document are stored in a
    single batch.

    Args:
        directory: Directory path containing documents.
        config: Optional KnowledgeBaseConfig. When omitted,
            ``KnowledgeBaseConfig.load(directory)`` is used.
        progress_callback: Optional callable ``(source_file, num_chunks)``
            invoked after each successfully stored document.

    Returns:
        Dictionary with loading statistics:
            - total_files: Number of documents stored without errors
            - total_chunks: Total chunks stored
            - files_by_type: Count of documents per document type. The
              keys ``markdown``, ``json`` and ``jsonl`` are always
              present; any other type reported by the processor is added
              on first use.
            - errors: List of ``{"file": ..., "error": ...}`` entries
            - documents: Per-document info (source, type, chunks, errors)

    Example:
        ```python
        # With auto-loaded config from directory
        results = await kb.load_from_directory("./docs")

        # With explicit config
        config = KnowledgeBaseConfig(
            name="product-docs",
            default_chunking={"max_chunk_size": 800},
            patterns=[
                FilePatternConfig(pattern="api/**/*.json", text_fields=["title", "description"]),
                FilePatternConfig(pattern="**/*.md"),
            ],
            exclude_patterns=["**/drafts/**"],
        )
        results = await kb.load_from_directory("./docs", config=config)
        print(f"Loaded {results['total_chunks']} chunks from {results['total_files']} files")
        ```
    """
    import numpy as np

    directory = Path(directory)

    if config is None:
        config = KnowledgeBaseConfig.load(directory)

    processor = DirectoryProcessor(config, directory)

    results: dict[str, Any] = {
        "total_files": 0,
        "total_chunks": 0,
        "files_by_type": {"markdown": 0, "json": 0, "jsonl": 0},
        "errors": [],
        "documents": [],
    }

    for doc in processor.process():
        doc_info: dict[str, Any] = {
            "source": doc.source_file,
            "type": doc.document_type,
            "chunks": 0,
            "errors": doc.errors,
        }

        # Documents that failed processing are reported but never stored.
        if doc.has_errors:
            results["errors"].extend(
                {"file": doc.source_file, "error": err} for err in doc.errors
            )
            results["documents"].append(doc_info)
            continue

        vectors = []
        ids = []
        metadatas = []

        # NOTE(review): IDs are built from the file stem only, so two files
        # with the same name in different folders yield the same IDs —
        # confirm how the vector store treats duplicate IDs.
        source_stem = Path(doc.source_file).stem

        for chunk in doc.chunks:
            text_for_embedding = chunk.get("embedding_text") or chunk.get("text", "")

            # Nothing to embed: skip the chunk entirely.
            if not text_for_embedding:
                continue

            embedding = await self.embedding_provider.embed(text_for_embedding)
            if not isinstance(embedding, np.ndarray):
                embedding = np.array(embedding, dtype=np.float32)

            # Fall back to the position among stored chunks when the
            # processor did not number the chunk.
            chunk_index = chunk.get("chunk_index", len(vectors))
            chunk_id = f"{source_stem}_{chunk_index}"

            chunk_metadata = {
                "text": chunk.get("text", ""),
                "source": doc.source_file,
                "chunk_index": chunk_index,
                "document_type": doc.document_type,
            }

            # Chunk-level metadata may override the generated fields.
            if "metadata" in chunk:
                chunk_metadata.update(chunk["metadata"])

            # Document-level metadata only fills in keys that are missing.
            if doc.metadata:
                for key, value in doc.metadata.items():
                    if key not in chunk_metadata:
                        chunk_metadata[key] = value

            vectors.append(embedding)
            ids.append(chunk_id)
            metadatas.append(chunk_metadata)

        if vectors:
            await self.vector_store.add_vectors(
                vectors=vectors, ids=ids, metadata=metadatas
            )

        stored = len(vectors)
        doc_info["chunks"] = stored
        results["total_files"] += 1
        results["total_chunks"] += stored

        # Count unknown document types instead of raising KeyError after
        # the vectors have already been written to the store.
        by_type = results["files_by_type"]
        by_type[doc.document_type] = by_type.get(doc.document_type, 0) + 1

        results["documents"].append(doc_info)

        if progress_callback:
            progress_callback(doc.source_file, stored)

    return results

608 

async def _load_markdown_text(
    self,
    markdown_text: str,
    source: str,
    metadata: dict[str, Any] | None = None,
) -> int:
    """Chunk, embed and store markdown that is already in memory.

    Shared back end of load_json_document, load_yaml_document and
    load_csv_document.

    Args:
        markdown_text: Markdown content to load.
        source: Source identifier stored in each chunk's metadata; its
            path stem is also the prefix of every chunk ID ("doc" when
            the source is empty).
        metadata: Optional metadata merged into every chunk's metadata,
            overriding generated fields on key collisions.

    Returns:
        Number of chunks produced by the chunker.
    """
    import numpy as np

    settings = self.chunking_config

    # Accept the quality filter either as a ready-made config object or as
    # a dict of constructor arguments; anything else is ignored.
    raw_filter = settings.get("quality_filter")
    if isinstance(raw_filter, ChunkQualityConfig):
        quality_filter = raw_filter
    elif isinstance(raw_filter, dict):
        quality_filter = ChunkQualityConfig(**raw_filter)
    else:
        quality_filter = None

    chunks = chunk_markdown_tree(
        parse_markdown(markdown_text),
        max_chunk_size=settings.get("max_chunk_size", 500),
        chunk_overlap=settings.get("chunk_overlap", 50),
        heading_inclusion=HeadingInclusion.IN_METADATA,
        combine_under_heading=settings.get("combine_under_heading", True),
        quality_filter=quality_filter,
        generate_embeddings=settings.get("generate_embeddings", True),
    )

    id_prefix = Path(source).stem if source else "doc"

    vectors = []
    ids = []
    metadatas = []

    for position, piece in enumerate(chunks):
        info = piece.metadata

        # Prefer the enriched embedding text when the chunker produced one.
        vector = await self.embedding_provider.embed(info.embedding_text or piece.text)
        if not isinstance(vector, np.ndarray):
            vector = np.array(vector, dtype=np.float32)

        record = {
            "text": piece.text,
            "source": source,
            "chunk_index": position,
            "heading_path": info.heading_display or info.get_heading_path(),
            "headings": info.headings,
            "heading_levels": info.heading_levels,
            "line_number": info.line_number,
            "chunk_size": info.chunk_size,
            "content_length": info.content_length,
        }
        if metadata:
            record.update(metadata)

        vectors.append(vector)
        ids.append(f"{id_prefix}_{position}")
        metadatas.append(record)

    # One batched write per document; skipped when nothing was chunked.
    if vectors:
        await self.vector_store.add_vectors(
            vectors=vectors, ids=ids, metadata=metadatas
        )

    return len(chunks)

700 

701 async def query( 

702 self, 

703 query: str, 

704 k: int = 5, 

705 filter_metadata: dict[str, Any] | None = None, 

706 min_similarity: float = 0.0, 

707 merge_adjacent: bool = False, 

708 max_chunk_size: int | None = None, 

709 ) -> list[dict[str, Any]]: 

710 """Query knowledge base for relevant chunks. 

711 

712 Args: 

713 query: Query text to search for 

714 k: Number of results to return 

715 filter_metadata: Optional metadata filters 

716 min_similarity: Minimum similarity score (0-1) 

717 merge_adjacent: Whether to merge adjacent chunks with same heading 

718 max_chunk_size: Maximum size for merged chunks (uses merger config default if not specified) 

719 

720 Returns: 

721 List of result dictionaries with: 

722 - text: Chunk text 

723 - source: Source file 

724 - heading_path: Heading hierarchy 

725 - similarity: Similarity score 

726 - metadata: Full chunk metadata 

727 

728 Example: 

729 ```python 

730 results = await kb.query( 

731 "How do I configure the database?", 

732 k=3, 

733 merge_adjacent=True 

734 ) 

735 for result in results: 

736 print(f"[{result['similarity']:.2f}] {result['heading_path']}") 

737 print(result['text']) 

738 ``` 

739 """ 

740 import numpy as np 

741 

742 # Generate query embedding 

743 query_embedding = await self.embedding_provider.embed(query) 

744 

745 # Convert to numpy if needed 

746 if not isinstance(query_embedding, np.ndarray): 

747 query_embedding = np.array(query_embedding, dtype=np.float32) 

748 

749 # Search vector store 

750 search_results = await self.vector_store.search( 

751 query_vector=query_embedding, 

752 k=k, 

753 filter=filter_metadata, 

754 include_metadata=True, 

755 ) 

756 

757 # Format results 

758 results = [] 

759 for _chunk_id, similarity, chunk_metadata in search_results: 

760 if chunk_metadata and similarity >= min_similarity: 

761 results.append( 

762 { 

763 "text": chunk_metadata.get("text", ""), 

764 "source": chunk_metadata.get("source", ""), 

765 "heading_path": chunk_metadata.get("heading_path", ""), 

766 "similarity": similarity, 

767 "metadata": chunk_metadata, 

768 } 

769 ) 

770 

771 # Apply chunk merging if requested 

772 if merge_adjacent and results: 

773 # Update merger config if max_chunk_size specified 

774 if max_chunk_size is not None: 

775 merger = ChunkMerger(MergerConfig(max_merged_size=max_chunk_size)) 

776 else: 

777 merger = self.merger 

778 

779 merged_chunks = merger.merge(results) 

780 results = merger.to_result_list(merged_chunks) 

781 

782 return results 

783 

async def hybrid_query(
    self,
    query: str,
    k: int = 5,
    text_weight: float = 0.5,
    vector_weight: float = 0.5,
    fusion_strategy: str = "rrf",
    text_fields: list[str] | None = None,
    filter_metadata: dict[str, Any] | None = None,
    min_similarity: float = 0.0,
    merge_adjacent: bool = False,
    max_chunk_size: int | None = None,
) -> list[dict[str, Any]]:
    """Query knowledge base using hybrid search (text + vector).

    When the vector store exposes ``hybrid_search`` and the strategy is
    ``"native"``, the store performs the fusion itself. In every other
    case a client-side fallback runs: a vector search for ``2 * k``
    candidates, keyword scoring of those candidates only, then fusion of
    the two rankings.

    Args:
        query: Query text to search for.
        k: Number of results to return.
        text_weight: Weight for text search (0.0 to 1.0).
        vector_weight: Weight for vector search (0.0 to 1.0).
        fusion_strategy: "rrf" (default), "weighted_sum", or "native".
            Unknown names fall back to RRF, as does "native" when the
            store has no ``hybrid_search``.
        text_fields: Metadata fields searched for keywords
            (default: ["text"]).
        filter_metadata: Optional metadata filters.
        min_similarity: Results with a combined score below this value
            are dropped.
        merge_adjacent: Whether to run the hits through the chunk merger.
        max_chunk_size: Maximum size for merged chunks.

    Returns:
        List of result dictionaries with:
            - text: Chunk text
            - source: Source file
            - heading_path: Heading hierarchy
            - similarity: Combined similarity score
            - text_score: Score from text search (None when no keyword hit)
            - vector_score: Score from vector search
            - metadata: Full chunk metadata

    Example:
        ```python
        results = await kb.hybrid_query(
            "database configuration",
            k=5,
            text_weight=0.3,
            vector_weight=0.7,
            fusion_strategy="weighted_sum",
        )
        for result in results:
            print(f"[{result['similarity']:.2f}] {result['heading_path']}")
            print(f"  text_score={result.get('text_score', 'N/A')}")
            print(f"  vector_score={result.get('vector_score', 'N/A')}")
        ```
    """
    from dataknobs_data.vector.hybrid import (
        FusionStrategy,
        HybridSearchConfig,
        reciprocal_rank_fusion,
        weighted_score_fusion,
    )
    import numpy as np

    # Embed the query, normalising the provider output to a float32 array.
    query_embedding = await self.embedding_provider.embed(query)
    if not isinstance(query_embedding, np.ndarray):
        query_embedding = np.array(query_embedding, dtype=np.float32)

    has_hybrid = hasattr(self.vector_store, "hybrid_search")
    search_text_fields = text_fields or ["text"]

    strategy_map = {
        "rrf": FusionStrategy.RRF,
        "weighted_sum": FusionStrategy.WEIGHTED_SUM,
        "native": FusionStrategy.NATIVE,
    }
    strategy = strategy_map.get(fusion_strategy.lower(), FusionStrategy.RRF)

    if has_hybrid and strategy == FusionStrategy.NATIVE:
        # Let the vector store run and fuse both searches itself.
        config = HybridSearchConfig(
            text_weight=text_weight,
            vector_weight=vector_weight,
            fusion_strategy=strategy,
            text_fields=search_text_fields,
        )
        hybrid_results = await self.vector_store.hybrid_search(
            query_text=query,
            query_vector=query_embedding,
            text_fields=search_text_fields,
            k=k,
            config=config,
            filter=filter_metadata,
        )

        results = []
        for hr in hybrid_results:
            if hr.combined_score < min_similarity:
                continue

            # Records expose their payload as either .data or .metadata.
            record_metadata = {}
            if hasattr(hr.record, "data"):
                record_metadata = hr.record.data or {}
            elif hasattr(hr.record, "metadata"):
                record_metadata = hr.record.metadata or {}

            results.append({
                "text": record_metadata.get("text", ""),
                "source": record_metadata.get("source", ""),
                "heading_path": record_metadata.get("heading_path", ""),
                "similarity": hr.combined_score,
                "text_score": hr.text_score,
                "vector_score": hr.vector_score,
                "metadata": record_metadata,
            })
    else:
        # Step 1: vector search, over-fetching so fusion has candidates.
        vector_results = await self.vector_store.search(
            query_vector=query_embedding,
            k=k * 2,
            filter=filter_metadata,
            include_metadata=True,
        )

        vector_scores: list[tuple[str, float]] = []
        chunks_by_id: dict[str, dict[str, Any]] = {}
        for chunk_id, similarity, chunk_metadata in vector_results:
            if chunk_metadata:
                vector_scores.append((chunk_id, similarity))
                chunks_by_id[chunk_id] = chunk_metadata

        # Step 2: keyword scoring, restricted to the vector candidates.
        query_lower = query.lower()
        query_terms = query_lower.split()
        text_scores: list[tuple[str, float]] = []

        for chunk_id, chunk_metadata in chunks_by_id.items():
            field_values = (chunk_metadata.get(field, "") for field in search_text_fields)
            text_content_lower = "".join(
                " " + str(value) for value in field_values if value
            ).lower()

            # The empty string is a substring of every string, so an empty
            # query must not count as an exact phrase match on every chunk.
            if query_terms and query_lower in text_content_lower:
                score = 1.0
            elif query_terms:
                # Fraction of query terms found anywhere in the text.
                matched_terms = sum(
                    1 for term in query_terms if term in text_content_lower
                )
                score = matched_terms / len(query_terms)
            else:
                score = 0.0

            if score > 0:
                text_scores.append((chunk_id, score))

        text_scores.sort(key=lambda item: item[1], reverse=True)

        # Step 3: fuse the two rankings.
        if strategy == FusionStrategy.WEIGHTED_SUM:
            # Normalise the weights so they sum to 1; equal split if both are 0.
            total = text_weight + vector_weight
            if total > 0:
                norm_text = text_weight / total
                norm_vector = vector_weight / total
            else:
                norm_text = norm_vector = 0.5

            fused = weighted_score_fusion(
                text_results=text_scores,
                vector_results=vector_scores,
                text_weight=norm_text,
                vector_weight=norm_vector,
                normalize_scores=True,
            )
        else:
            fused = reciprocal_rank_fusion(
                text_results=text_scores,
                vector_results=vector_scores,
                k=60,
                text_weight=text_weight,
                vector_weight=vector_weight,
            )

        text_score_map = dict(text_scores)
        vector_score_map = dict(vector_scores)

        results = []
        for chunk_id, combined_score in fused[:k]:
            if combined_score < min_similarity:
                continue

            chunk_metadata = chunks_by_id.get(chunk_id)
            if not chunk_metadata:
                continue

            results.append({
                "text": chunk_metadata.get("text", ""),
                "source": chunk_metadata.get("source", ""),
                "heading_path": chunk_metadata.get("heading_path", ""),
                "similarity": combined_score,
                "text_score": text_score_map.get(chunk_id),
                "vector_score": vector_score_map.get(chunk_id),
                "metadata": chunk_metadata,
            })

    # Optional post-processing shared by both branches.
    if merge_adjacent and results:
        if max_chunk_size is not None:
            merger = ChunkMerger(MergerConfig(max_merged_size=max_chunk_size))
        else:
            merger = self.merger

        merged_chunks = merger.merge(results)
        results = merger.to_result_list(merged_chunks)

    return results

1033 

def format_context(
    self,
    results: list[dict[str, Any]],
    wrap_in_tags: bool = True,
) -> str:
    """Render search results as a context string for an LLM prompt.

    Delegates to the formatter configured on this knowledge base
    (``self.formatter``).

    Args:
        results: Search results, e.g. as returned by query()
        wrap_in_tags: When True, the formatted text is additionally
            passed through the formatter's ``wrap_for_prompt()``
            (described upstream as wrapping in <knowledge_base> tags)

    Returns:
        Formatted context string
    """
    formatter = self.formatter
    rendered = formatter.format(results)
    # Wrapping is optional so callers can embed the raw text themselves.
    return formatter.wrap_for_prompt(rendered) if wrap_in_tags else rendered

1054 

async def clear(self) -> None:
    """Remove every document from the knowledge base.

    Warning: This delegates to the vector store's ``clear()`` and is
    intended to drop all stored chunks and embeddings.

    Raises:
        NotImplementedError: If the vector store exposes no ``clear``
            attribute.
    """
    store = self.vector_store
    # Guard first: not every vector store backend offers a clear operation.
    if not hasattr(store, "clear"):
        raise NotImplementedError(
            "Vector store does not support clearing. "
            "Consider creating a new knowledge base with a fresh collection."
        )
    await store.clear()

1067 

async def save(self) -> None:
    """Persist the knowledge base via the vector store, when supported.

    Calls the vector store's ``save()`` if it has one; stores without a
    ``save`` attribute are silently left untouched (e.g. backends with
    no on-disk persistence).

    Example:
        ```python
        await kb.load_markdown_document("docs/api.md")
        await kb.save()  # Persist to disk
        ```
    """
    store = self.vector_store
    if not hasattr(store, "save"):
        # Nothing to persist for this backend; deliberately a no-op.
        return
    await store.save()

1082 

async def close(self) -> None:
    """Close the knowledge base and release resources.

    This method:
    - Closes the vector store, if it has a ``close()`` method. Whether
      that also persists the store is up to the store implementation
      (upstream notes it saves when a persist path is configured).
    - Closes the embedding provider, if it has a ``close()`` method
      (releases HTTP sessions).

    The embedding provider is closed even when closing the vector store
    raises, so a failing store cannot leak the provider's resources
    (e.g., unclosed aiohttp sessions). The store's exception still
    propagates to the caller afterwards.

    Example:
        ```python
        kb = await RAGKnowledgeBase.from_config(config)
        try:
            await kb.load_markdown_document("docs/api.md")
            results = await kb.query("How do I configure?")
        finally:
            await kb.close()
        ```

    Raises:
        Exception: Whatever the vector store's or embedding provider's
            ``close()`` raises is propagated unchanged.
    """
    try:
        # Close vector store first (may flush/persist, depending on backend).
        if hasattr(self.vector_store, "close"):
            await self.vector_store.close()
    finally:
        # Always release the embedding provider, even if the store failed
        # to close; otherwise its client sessions would be leaked.
        if hasattr(self.embedding_provider, "close"):
            await self.embedding_provider.close()

1111 

async def __aenter__(self) -> "RAGKnowledgeBase":
    """Enter the async context manager.

    No setup is performed here; the instance is handed back unchanged so
    it can be bound by ``async with ... as kb``.

    Returns:
        This knowledge base instance

    Example:
        ```python
        async with await RAGKnowledgeBase.from_config(config) as kb:
            await kb.load_markdown_document("docs/api.md")
            results = await kb.query("How do I configure?")
        # close() has been awaited on exit
        ```
    """
    return self

1127 

1128 async def __aexit__( 

1129 self, 

1130 exc_type: type[BaseException] | None, 

1131 exc_val: BaseException | None, 

1132 exc_tb: types.TracebackType | None, 

1133 ) -> None: 

1134 """Async context manager exit - ensures cleanup. 

1135 

1136 Args: 

1137 exc_type: Exception type if an exception occurred 

1138 exc_val: Exception value if an exception occurred 

1139 exc_tb: Exception traceback if an exception occurred 

1140 """ 

1141 await self.close()