Coverage for src / dataknobs_bots / knowledge / rag.py: 69%
284 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-26 16:43 -0700
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-26 16:43 -0700
1"""RAG (Retrieval-Augmented Generation) knowledge base implementation."""
3import types
4from pathlib import Path
5from typing import Any
7from dataknobs_xization import (
8 ChunkQualityConfig,
9 ContentTransformer,
10 HeadingInclusion,
11 chunk_markdown_tree,
12 parse_markdown,
13)
14from dataknobs_xization.ingestion import (
15 DirectoryProcessor,
16 KnowledgeBaseConfig,
17 ProcessedDocument,
18 process_directory,
19)
20from dataknobs_bots.knowledge.retrieval import (
21 ChunkMerger,
22 ContextFormatter,
23 FormatterConfig,
24 MergerConfig,
25)
class RAGKnowledgeBase:
    """RAG knowledge base using dataknobs-xization for chunking and vector search.

    This implementation:
    - Parses markdown documents using dataknobs-xization
    - Chunks documents intelligently based on structure
    - Stores chunks with embeddings in vector store
    - Provides semantic search for relevant context

    Attributes:
        vector_store: Vector store backend from dataknobs_data
        embedding_provider: LLM provider for generating embeddings
        chunking_config: Configuration for document chunking
        merger: ChunkMerger used to merge adjacent search results on request
        formatter: ContextFormatter used to render results for an LLM prompt
    """
def __init__(
    self,
    vector_store: Any,
    embedding_provider: Any,
    chunking_config: dict[str, Any] | None = None,
    merger_config: MergerConfig | None = None,
    formatter_config: FormatterConfig | None = None,
):
    """Wire the knowledge base to its vector store and embedding provider.

    Args:
        vector_store: Vector store backend instance
        embedding_provider: LLM provider with embed() method
        chunking_config: Chunking options read by the loaders:
            - max_chunk_size: Maximum chunk size in characters
            - chunk_overlap: Overlap between chunks
            - combine_under_heading: Combine text under same heading
            - quality_filter: ChunkQualityConfig (or dict of its kwargs)
            - generate_embeddings: Whether to generate enriched embedding text
        merger_config: Configuration for chunk merging (optional)
        formatter_config: Configuration for context formatting (optional)
    """
    self.vector_store = vector_store
    self.embedding_provider = embedding_provider

    # A missing *or empty* chunking config falls back to the built-in defaults.
    if chunking_config:
        self.chunking_config = chunking_config
    else:
        self.chunking_config = {
            "max_chunk_size": 500,
            "chunk_overlap": 50,
            "combine_under_heading": True,
        }

    # Only pass a config object through when one was actually supplied;
    # otherwise let the helper classes use their own defaults.
    if merger_config:
        self.merger = ChunkMerger(merger_config)
    else:
        self.merger = ChunkMerger()

    if formatter_config:
        self.formatter = ContextFormatter(formatter_config)
    else:
        self.formatter = ContextFormatter()
@classmethod
async def from_config(cls, config: dict[str, Any]) -> "RAGKnowledgeBase":
    """Build a ready-to-use knowledge base from a configuration dictionary.

    Args:
        config: Configuration dictionary with:
            - vector_store: Vector store configuration (required)
            - embedding_provider: LLM provider name (default "openai")
            - embedding_model: Model for embeddings
              (default "text-embedding-ada-002")
            - chunking: Optional chunking configuration
            - merger: Optional keyword arguments for MergerConfig
            - formatter: Optional keyword arguments for FormatterConfig
            - documents_path: Optional path to load documents from
            - document_pattern: Optional glob pattern for documents
              (default "**/*.md")

    Returns:
        Configured RAGKnowledgeBase instance

    Example:
        ```python
        config = {
            "vector_store": {
                "backend": "faiss",
                "dimensions": 1536,
                "collection": "docs"
            },
            "embedding_provider": "openai",
            "embedding_model": "text-embedding-3-small",
            "chunking": {
                "max_chunk_size": 500,
                "chunk_overlap": 50
            },
            "documents_path": "./docs"
        }
        kb = await RAGKnowledgeBase.from_config(config)
        ```
    """
    from dataknobs_data.vector.stores import VectorStoreFactory
    from dataknobs_llm.llm import LLMProviderFactory

    # Vector store: built from the mandatory "vector_store" section.
    store_settings = config["vector_store"]
    vector_store = VectorStoreFactory().create(**store_settings)
    await vector_store.initialize()

    # Embedding provider: created in async mode and initialized up front.
    provider_spec = {
        "provider": config.get("embedding_provider", "openai"),
        "model": config.get("embedding_model", "text-embedding-ada-002"),
    }
    embedding_provider = LLMProviderFactory(is_async=True).create(provider_spec)
    await embedding_provider.initialize()

    # Optional merger / formatter sections become config objects.
    merger_config = MergerConfig(**config["merger"]) if "merger" in config else None
    formatter_config = (
        FormatterConfig(**config["formatter"]) if "formatter" in config else None
    )

    kb = cls(
        vector_store=vector_store,
        embedding_provider=embedding_provider,
        chunking_config=config.get("chunking", {}),
        merger_config=merger_config,
        formatter_config=formatter_config,
    )

    # Eagerly ingest documents when a path is configured. The loading
    # statistics returned by the loader are not surfaced to the caller.
    if "documents_path" in config:
        glob_pattern = config.get("document_pattern", "**/*.md")
        await kb.load_documents_from_directory(config["documents_path"], glob_pattern)

    return kb
158 async def load_markdown_document(
159 self, filepath: str | Path, metadata: dict[str, Any] | None = None
160 ) -> int:
161 """Load and chunk a markdown document.
163 Args:
164 filepath: Path to markdown file
165 metadata: Optional metadata to attach to all chunks
167 Returns:
168 Number of chunks created
170 Example:
171 ```python
172 num_chunks = await kb.load_markdown_document(
173 "docs/api.md",
174 metadata={"category": "api", "version": "1.0"}
175 )
176 ```
177 """
178 import numpy as np
180 # Read document
181 filepath = Path(filepath)
182 with open(filepath, encoding="utf-8") as f:
183 markdown_text = f.read()
185 # Parse markdown
186 tree = parse_markdown(markdown_text)
188 # Build quality filter config if specified
189 quality_filter = None
190 if "quality_filter" in self.chunking_config:
191 qf_config = self.chunking_config["quality_filter"]
192 if isinstance(qf_config, ChunkQualityConfig):
193 quality_filter = qf_config
194 elif isinstance(qf_config, dict):
195 quality_filter = ChunkQualityConfig(**qf_config)
197 # Chunk the document with enhanced options
198 chunks = chunk_markdown_tree(
199 tree,
200 max_chunk_size=self.chunking_config.get("max_chunk_size", 500),
201 chunk_overlap=self.chunking_config.get("chunk_overlap", 50),
202 heading_inclusion=HeadingInclusion.IN_METADATA, # Keep headings in metadata only
203 combine_under_heading=self.chunking_config.get("combine_under_heading", True),
204 quality_filter=quality_filter,
205 generate_embeddings=self.chunking_config.get("generate_embeddings", True),
206 )
208 # Process and store chunks
209 vectors = []
210 ids = []
211 metadatas = []
213 for i, chunk in enumerate(chunks):
214 # Use embedding_text if available, otherwise use chunk text
215 text_for_embedding = chunk.metadata.embedding_text or chunk.text
217 # Generate embedding
218 embedding = await self.embedding_provider.embed(text_for_embedding)
220 # Convert to numpy if needed
221 if not isinstance(embedding, np.ndarray):
222 embedding = np.array(embedding, dtype=np.float32)
224 # Prepare metadata with new fields
225 chunk_id = f"{filepath.stem}_{i}"
226 chunk_metadata = {
227 "text": chunk.text,
228 "source": str(filepath),
229 "chunk_index": i,
230 "heading_path": chunk.metadata.heading_display or chunk.metadata.get_heading_path(),
231 "headings": chunk.metadata.headings,
232 "heading_levels": chunk.metadata.heading_levels,
233 "line_number": chunk.metadata.line_number,
234 "chunk_size": chunk.metadata.chunk_size,
235 "content_length": chunk.metadata.content_length,
236 }
238 # Merge with user metadata
239 if metadata:
240 chunk_metadata.update(metadata)
242 vectors.append(embedding)
243 ids.append(chunk_id)
244 metadatas.append(chunk_metadata)
246 # Batch insert into vector store
247 if vectors:
248 await self.vector_store.add_vectors(
249 vectors=vectors, ids=ids, metadata=metadatas
250 )
252 return len(chunks)
254 async def load_documents_from_directory(
255 self, directory: str | Path, pattern: str = "**/*.md"
256 ) -> dict[str, Any]:
257 """Load all markdown documents from a directory.
259 Args:
260 directory: Directory path containing documents
261 pattern: Glob pattern for files to load (default: **/*.md)
263 Returns:
264 Dictionary with loading statistics:
265 - total_files: Number of files processed
266 - total_chunks: Total chunks created
267 - errors: List of errors encountered
269 Example:
270 ```python
271 results = await kb.load_documents_from_directory(
272 "docs/",
273 pattern="**/*.md"
274 )
275 print(f"Loaded {results['total_chunks']} chunks from {results['total_files']} files")
276 ```
277 """
278 directory = Path(directory)
279 results = {"total_files": 0, "total_chunks": 0, "errors": []}
281 for filepath in directory.glob(pattern):
282 if not filepath.is_file():
283 continue
285 try:
286 num_chunks = await self.load_markdown_document(
287 filepath, metadata={"filename": filepath.name}
288 )
289 results["total_files"] += 1
290 results["total_chunks"] += num_chunks
291 except Exception as e:
292 results["errors"].append({"file": str(filepath), "error": str(e)})
294 return results
async def load_json_document(
    self,
    filepath: str | Path,
    metadata: dict[str, Any] | None = None,
    schema: str | None = None,
    transformer: ContentTransformer | None = None,
    title: str | None = None,
) -> int:
    """Convert a JSON file to markdown and ingest it.

    The parsed JSON is rendered to markdown by a ContentTransformer and then
    pushed through the same chunk/embed/store pipeline as markdown text.

    Args:
        filepath: Path to JSON file
        metadata: Optional metadata to attach to all chunks
        schema: Optional schema name (requires transformer with registered schema)
        transformer: Optional ContentTransformer instance with custom
            configuration; a default one is created when omitted
        title: Optional document title for the markdown; defaults to the
            file stem with underscores replaced by spaces, title-cased

    Returns:
        Number of chunks created

    Example:
        ```python
        # Generic conversion
        num_chunks = await kb.load_json_document(
            "data/patterns.json",
            metadata={"content_type": "patterns"}
        )

        # With custom schema
        transformer = ContentTransformer()
        transformer.register_schema("pattern", {
            "title_field": "name",
            "sections": [
                {"field": "description", "heading": "Description"},
                {"field": "example", "heading": "Example", "format": "code"}
            ]
        })
        num_chunks = await kb.load_json_document(
            "data/patterns.json",
            transformer=transformer,
            schema="pattern"
        )
        ```
    """
    import json

    json_path = Path(filepath)

    with open(json_path, encoding="utf-8") as handle:
        payload = json.load(handle)

    converter = ContentTransformer() if transformer is None else transformer

    # Derive a human-friendly title from the file name when none was given.
    doc_title = title or json_path.stem.replace("_", " ").title()
    rendered = converter.transform_json(payload, schema=schema, title=doc_title)

    return await self._load_markdown_text(
        rendered,
        source=str(json_path),
        metadata=metadata,
    )
async def load_yaml_document(
    self,
    filepath: str | Path,
    metadata: dict[str, Any] | None = None,
    schema: str | None = None,
    transformer: ContentTransformer | None = None,
    title: str | None = None,
) -> int:
    """Convert a YAML file to markdown and ingest it.

    The file path itself (not its contents) is handed to the transformer's
    ``transform_yaml``; the resulting markdown goes through the shared
    chunk/embed/store pipeline.

    Args:
        filepath: Path to YAML file
        metadata: Optional metadata to attach to all chunks
        schema: Optional schema name (requires transformer with registered schema)
        transformer: Optional ContentTransformer instance with custom
            configuration; a default one is created when omitted
        title: Optional document title for the markdown; defaults to the
            file stem with underscores replaced by spaces, title-cased

    Returns:
        Number of chunks created

    Example:
        ```python
        num_chunks = await kb.load_yaml_document(
            "data/config.yaml",
            metadata={"content_type": "configuration"}
        )
        ```
    """
    yaml_path = Path(filepath)

    converter = ContentTransformer() if transformer is None else transformer

    doc_title = title or yaml_path.stem.replace("_", " ").title()
    rendered = converter.transform_yaml(yaml_path, schema=schema, title=doc_title)

    return await self._load_markdown_text(
        rendered,
        source=str(yaml_path),
        metadata=metadata,
    )
async def load_csv_document(
    self,
    filepath: str | Path,
    metadata: dict[str, Any] | None = None,
    title: str | None = None,
    title_field: str | None = None,
    transformer: ContentTransformer | None = None,
) -> int:
    """Convert a CSV file to markdown and ingest it.

    The file path is handed to the transformer's ``transform_csv`` together
    with the document title and ``title_field``; the resulting markdown goes
    through the shared chunk/embed/store pipeline.

    Args:
        filepath: Path to CSV file
        metadata: Optional metadata to attach to all chunks
        title: Optional document title for the markdown; defaults to the
            file stem with underscores replaced by spaces, title-cased
        title_field: Column to use as section title (default: first column)
        transformer: Optional ContentTransformer instance with custom
            configuration; a default one is created when omitted

    Returns:
        Number of chunks created

    Example:
        ```python
        num_chunks = await kb.load_csv_document(
            "data/faq.csv",
            title="Frequently Asked Questions",
            title_field="question"
        )
        ```
    """
    csv_path = Path(filepath)

    converter = ContentTransformer() if transformer is None else transformer

    doc_title = title or csv_path.stem.replace("_", " ").title()
    rendered = converter.transform_csv(
        csv_path,
        title=doc_title,
        title_field=title_field,
    )

    return await self._load_markdown_text(
        rendered,
        source=str(csv_path),
        metadata=metadata,
    )
async def load_from_directory(
    self,
    directory: str | Path,
    config: KnowledgeBaseConfig | None = None,
    progress_callback: Any | None = None,
) -> dict[str, Any]:
    """Load documents from a directory using KnowledgeBaseConfig.

    File discovery, parsing and chunking are delegated to the xization
    DirectoryProcessor; this method embeds each chunk it yields and stores
    the vectors one document at a time. Documents the processor reports as
    having errors are recorded and skipped without being stored.

    Args:
        directory: Directory path containing documents
        config: Optional KnowledgeBaseConfig. If not provided, it is obtained
            from ``KnowledgeBaseConfig.load(directory)``.
        progress_callback: Optional synchronous callable
            ``(source_file, num_chunks)`` invoked after each stored document

    Returns:
        Dictionary with loading statistics:
            - total_files: Number of files processed without errors
            - total_chunks: Total chunks stored
            - files_by_type: Count of files by document type; always has
              "markdown", "json" and "jsonl" keys, plus any other type the
              processor reports
            - errors: List of ``{"file", "error"}`` dicts
            - documents: Per-document info (source, type, chunks, errors)

    Example:
        ```python
        # With auto-loaded config from directory
        results = await kb.load_from_directory("./docs")

        # With explicit config
        config = KnowledgeBaseConfig(
            name="product-docs",
            default_chunking={"max_chunk_size": 800},
            patterns=[
                FilePatternConfig(pattern="api/**/*.json", text_fields=["title", "description"]),
                FilePatternConfig(pattern="**/*.md"),
            ],
            exclude_patterns=["**/drafts/**"],
        )
        results = await kb.load_from_directory("./docs", config=config)
        print(f"Loaded {results['total_chunks']} chunks from {results['total_files']} files")
        ```
    """
    import numpy as np

    directory = Path(directory)

    if config is None:
        config = KnowledgeBaseConfig.load(directory)

    processor = DirectoryProcessor(config, directory)

    results: dict[str, Any] = {
        "total_files": 0,
        "total_chunks": 0,
        "files_by_type": {"markdown": 0, "json": 0, "jsonl": 0},
        "errors": [],
        "documents": [],
    }
    files_by_type = results["files_by_type"]

    for doc in processor.process():
        doc_info: dict[str, Any] = {
            "source": doc.source_file,
            "type": doc.document_type,
            "chunks": 0,
            "errors": doc.errors,
        }

        # Documents that failed processing are reported but never stored.
        if doc.has_errors:
            results["errors"].extend(
                {"file": doc.source_file, "error": err} for err in doc.errors
            )
            results["documents"].append(doc_info)
            continue

        vectors = []
        ids = []
        metadatas = []

        source_stem = Path(doc.source_file).stem

        for chunk in doc.chunks:
            # Prefer the enriched embedding text; chunks with no text at all
            # are skipped.
            text_for_embedding = chunk.get("embedding_text") or chunk.get("text", "")
            if not text_for_embedding:
                continue

            embedding = await self.embedding_provider.embed(text_for_embedding)
            if not isinstance(embedding, np.ndarray):
                embedding = np.array(embedding, dtype=np.float32)

            # Fall back to the running position when the chunk has no index.
            chunk_index = chunk.get("chunk_index", len(vectors))
            chunk_id = f"{source_stem}_{chunk_index}"

            chunk_metadata = {
                "text": chunk.get("text", ""),
                "source": doc.source_file,
                "chunk_index": chunk_index,
                "document_type": doc.document_type,
            }

            # Chunk-level metadata may override the base fields above ...
            if "metadata" in chunk:
                chunk_metadata.update(chunk["metadata"])

            # ... while document-level metadata only fills in missing keys.
            if doc.metadata:
                for key, value in doc.metadata.items():
                    if key not in chunk_metadata:
                        chunk_metadata[key] = value

            vectors.append(embedding)
            ids.append(chunk_id)
            metadatas.append(chunk_metadata)

        if vectors:
            await self.vector_store.add_vectors(
                vectors=vectors, ids=ids, metadata=metadatas
            )

        doc_info["chunks"] = len(vectors)
        results["total_files"] += 1
        results["total_chunks"] += len(vectors)
        # Tolerate document types beyond the three pre-seeded ones instead of
        # raising KeyError after the vectors have already been stored.
        files_by_type[doc.document_type] = files_by_type.get(doc.document_type, 0) + 1
        results["documents"].append(doc_info)

        if progress_callback:
            progress_callback(doc.source_file, len(vectors))

    return results
async def _load_markdown_text(
    self,
    markdown_text: str,
    source: str,
    metadata: dict[str, Any] | None = None,
) -> int:
    """Chunk, embed and store a markdown string.

    Shared ingestion pipeline used by load_json_document,
    load_yaml_document, and load_csv_document.

    Args:
        markdown_text: Markdown content to load
        source: Source identifier; stored in each chunk's metadata and its
            path stem is used as the chunk-id prefix ("doc" when empty)
        metadata: Optional metadata merged into every chunk's metadata
            (caller-supplied keys win on conflict)

    Returns:
        Number of chunks created
    """
    import numpy as np

    settings = self.chunking_config

    tree = parse_markdown(markdown_text)

    # The quality filter may be given either as a ready config object or as
    # a dict of keyword arguments; anything else is ignored.
    quality_filter = None
    raw_filter = settings.get("quality_filter")
    if isinstance(raw_filter, ChunkQualityConfig):
        quality_filter = raw_filter
    elif isinstance(raw_filter, dict):
        quality_filter = ChunkQualityConfig(**raw_filter)

    chunks = chunk_markdown_tree(
        tree,
        max_chunk_size=settings.get("max_chunk_size", 500),
        chunk_overlap=settings.get("chunk_overlap", 50),
        heading_inclusion=HeadingInclusion.IN_METADATA,
        combine_under_heading=settings.get("combine_under_heading", True),
        quality_filter=quality_filter,
        generate_embeddings=settings.get("generate_embeddings", True),
    )

    id_prefix = Path(source).stem if source else "doc"

    batch_vectors = []
    batch_ids = []
    batch_metadata = []

    for position, chunk in enumerate(chunks):
        info = chunk.metadata

        # Embed the enriched text when the chunker produced one.
        vector = await self.embedding_provider.embed(info.embedding_text or chunk.text)
        if not isinstance(vector, np.ndarray):
            vector = np.array(vector, dtype=np.float32)

        record = {
            "text": chunk.text,
            "source": source,
            "chunk_index": position,
            "heading_path": info.heading_display or info.get_heading_path(),
            "headings": info.headings,
            "heading_levels": info.heading_levels,
            "line_number": info.line_number,
            "chunk_size": info.chunk_size,
            "content_length": info.content_length,
        }
        if metadata:
            record.update(metadata)

        batch_vectors.append(vector)
        batch_ids.append(f"{id_prefix}_{position}")
        batch_metadata.append(record)

    # One batched insert per document; nothing is sent for an empty document.
    if batch_vectors:
        await self.vector_store.add_vectors(
            vectors=batch_vectors, ids=batch_ids, metadata=batch_metadata
        )

    return len(chunks)
701 async def query(
702 self,
703 query: str,
704 k: int = 5,
705 filter_metadata: dict[str, Any] | None = None,
706 min_similarity: float = 0.0,
707 merge_adjacent: bool = False,
708 max_chunk_size: int | None = None,
709 ) -> list[dict[str, Any]]:
710 """Query knowledge base for relevant chunks.
712 Args:
713 query: Query text to search for
714 k: Number of results to return
715 filter_metadata: Optional metadata filters
716 min_similarity: Minimum similarity score (0-1)
717 merge_adjacent: Whether to merge adjacent chunks with same heading
718 max_chunk_size: Maximum size for merged chunks (uses merger config default if not specified)
720 Returns:
721 List of result dictionaries with:
722 - text: Chunk text
723 - source: Source file
724 - heading_path: Heading hierarchy
725 - similarity: Similarity score
726 - metadata: Full chunk metadata
728 Example:
729 ```python
730 results = await kb.query(
731 "How do I configure the database?",
732 k=3,
733 merge_adjacent=True
734 )
735 for result in results:
736 print(f"[{result['similarity']:.2f}] {result['heading_path']}")
737 print(result['text'])
738 ```
739 """
740 import numpy as np
742 # Generate query embedding
743 query_embedding = await self.embedding_provider.embed(query)
745 # Convert to numpy if needed
746 if not isinstance(query_embedding, np.ndarray):
747 query_embedding = np.array(query_embedding, dtype=np.float32)
749 # Search vector store
750 search_results = await self.vector_store.search(
751 query_vector=query_embedding,
752 k=k,
753 filter=filter_metadata,
754 include_metadata=True,
755 )
757 # Format results
758 results = []
759 for _chunk_id, similarity, chunk_metadata in search_results:
760 if chunk_metadata and similarity >= min_similarity:
761 results.append(
762 {
763 "text": chunk_metadata.get("text", ""),
764 "source": chunk_metadata.get("source", ""),
765 "heading_path": chunk_metadata.get("heading_path", ""),
766 "similarity": similarity,
767 "metadata": chunk_metadata,
768 }
769 )
771 # Apply chunk merging if requested
772 if merge_adjacent and results:
773 # Update merger config if max_chunk_size specified
774 if max_chunk_size is not None:
775 merger = ChunkMerger(MergerConfig(max_merged_size=max_chunk_size))
776 else:
777 merger = self.merger
779 merged_chunks = merger.merge(results)
780 results = merger.to_result_list(merged_chunks)
782 return results
async def hybrid_query(
    self,
    query: str,
    k: int = 5,
    text_weight: float = 0.5,
    vector_weight: float = 0.5,
    fusion_strategy: str = "rrf",
    text_fields: list[str] | None = None,
    filter_metadata: dict[str, Any] | None = None,
    min_similarity: float = 0.0,
    merge_adjacent: bool = False,
    max_chunk_size: int | None = None,
) -> list[dict[str, Any]]:
    """Query knowledge base using hybrid search (text + vector).

    Two execution paths exist:

    - Native: when ``fusion_strategy`` is "native" and the vector store
      exposes ``hybrid_search``, the store performs the whole search.
    - Client-side (everything else): a vector search fetches ``2 * k``
      candidates, those candidates are scored by keyword matching, and the
      two rankings are fused with RRF or weighted-sum fusion. Only chunks
      returned by the vector search can appear in the result.

    Args:
        query: Query text to search for
        k: Number of results to return
        text_weight: Weight for text search (0.0 to 1.0)
        vector_weight: Weight for vector search (0.0 to 1.0)
        fusion_strategy: "rrf" (default), "weighted_sum", or "native"
            (case-insensitive). Unknown values fall back to RRF, as does
            "native" when the store has no ``hybrid_search``.
        text_fields: Metadata fields searched for text matching
            (default: ["text"])
        filter_metadata: Optional metadata filters
        min_similarity: Minimum combined score; lower-scoring results are dropped
        merge_adjacent: Whether to pass the results through the chunk merger
        max_chunk_size: Maximum size for merged chunks; when given, a
            temporary merger with that limit is used

    Returns:
        List of result dictionaries with:
            - text: Chunk text
            - source: Source file
            - heading_path: Heading hierarchy
            - similarity: Combined similarity score
            - text_score: Score from text search (None if the chunk had no
              text match in the client-side path)
            - vector_score: Score from vector search (if available)
            - metadata: Full chunk metadata

    Example:
        ```python
        # Default RRF fusion
        results = await kb.hybrid_query(
            "How do I configure the database?",
            k=5,
        )

        # Weighted toward vector search
        results = await kb.hybrid_query(
            "database configuration",
            k=5,
            text_weight=0.3,
            vector_weight=0.7,
        )

        # Weighted sum fusion
        results = await kb.hybrid_query(
            "configure database",
            k=5,
            fusion_strategy="weighted_sum",
        )

        for result in results:
            print(f"[{result['similarity']:.2f}] {result['heading_path']}")
            print(f" text_score={result.get('text_score', 'N/A')}")
            print(f" vector_score={result.get('vector_score', 'N/A')}")
            print(result['text'])
        ```
    """
    from dataknobs_data.vector.hybrid import (
        FusionStrategy,
        HybridSearchConfig,
        reciprocal_rank_fusion,
        weighted_score_fusion,
    )
    import numpy as np

    query_embedding = await self.embedding_provider.embed(query)
    if not isinstance(query_embedding, np.ndarray):
        query_embedding = np.array(query_embedding, dtype=np.float32)

    has_hybrid = hasattr(self.vector_store, "hybrid_search")
    search_text_fields = text_fields or ["text"]

    strategy_map = {
        "rrf": FusionStrategy.RRF,
        "weighted_sum": FusionStrategy.WEIGHTED_SUM,
        "native": FusionStrategy.NATIVE,
    }
    strategy = strategy_map.get(fusion_strategy.lower(), FusionStrategy.RRF)

    if has_hybrid and strategy == FusionStrategy.NATIVE:
        # Native path: the vector store does text + vector search itself.
        config = HybridSearchConfig(
            text_weight=text_weight,
            vector_weight=vector_weight,
            fusion_strategy=strategy,
            text_fields=search_text_fields,
        )
        hybrid_results = await self.vector_store.hybrid_search(
            query_text=query,
            query_vector=query_embedding,
            text_fields=search_text_fields,
            k=k,
            config=config,
            filter=filter_metadata,
        )

        results = []
        for hr in hybrid_results:
            if hr.combined_score < min_similarity:
                continue

            # The record may expose its payload as `.data` or `.metadata`.
            record_metadata = {}
            if hasattr(hr.record, "data"):
                record_metadata = hr.record.data or {}
            elif hasattr(hr.record, "metadata"):
                record_metadata = hr.record.metadata or {}

            results.append({
                "text": record_metadata.get("text", ""),
                "source": record_metadata.get("source", ""),
                "heading_path": record_metadata.get("heading_path", ""),
                "similarity": hr.combined_score,
                "text_score": hr.text_score,
                "vector_score": hr.vector_score,
                "metadata": record_metadata,
            })
    else:
        # Client-side path, step 1: over-fetch vector candidates for fusion.
        vector_results = await self.vector_store.search(
            query_vector=query_embedding,
            k=k * 2,
            filter=filter_metadata,
            include_metadata=True,
        )

        vector_scores: list[tuple[str, float]] = []
        chunks_by_id: dict[str, dict[str, Any]] = {}
        for chunk_id, similarity, chunk_metadata in vector_results:
            if chunk_metadata:
                vector_scores.append((chunk_id, similarity))
                chunks_by_id[chunk_id] = chunk_metadata

        # Step 2: keyword scoring over the retrieved candidates only.
        query_lower = query.lower()
        query_terms = query_lower.split()
        text_scores: list[tuple[str, float]] = []

        # An empty or whitespace-only query has no terms. Without this guard
        # the substring test below would "phrase match" every chunk (the
        # joined text always starts with a space), giving all of them a
        # bogus text score of 1.0.
        if query_terms:
            for chunk_id, chunk_metadata in chunks_by_id.items():
                field_values = (
                    chunk_metadata.get(field, "") for field in search_text_fields
                )
                text_content_lower = "".join(
                    " " + str(value) for value in field_values if value
                ).lower()

                if query_lower in text_content_lower:
                    # Exact phrase match
                    score = 1.0
                else:
                    # Fraction of query terms found anywhere in the text
                    matched_terms = sum(
                        1 for term in query_terms if term in text_content_lower
                    )
                    score = matched_terms / len(query_terms)

                if score > 0:
                    text_scores.append((chunk_id, score))

        text_scores.sort(key=lambda x: x[1], reverse=True)

        # Step 3: fuse the two rankings.
        if strategy == FusionStrategy.WEIGHTED_SUM:
            # Normalize the weights to sum to 1; equal weights if both are 0.
            total = text_weight + vector_weight
            if total > 0:
                norm_text = text_weight / total
                norm_vector = vector_weight / total
            else:
                norm_text = norm_vector = 0.5

            fused = weighted_score_fusion(
                text_results=text_scores,
                vector_results=vector_scores,
                text_weight=norm_text,
                vector_weight=norm_vector,
                normalize_scores=True,
            )
        else:
            # RRF is the default and the fallback for unsupported strategies.
            fused = reciprocal_rank_fusion(
                text_results=text_scores,
                vector_results=vector_scores,
                k=60,
                text_weight=text_weight,
                vector_weight=vector_weight,
            )

        text_score_map = dict(text_scores)
        vector_score_map = dict(vector_scores)

        results = []
        # Truncate to the top k first, then apply the score threshold.
        for chunk_id, combined_score in fused[:k]:
            if combined_score < min_similarity:
                continue

            chunk_metadata = chunks_by_id.get(chunk_id)
            if not chunk_metadata:
                continue

            results.append({
                "text": chunk_metadata.get("text", ""),
                "source": chunk_metadata.get("source", ""),
                "heading_path": chunk_metadata.get("heading_path", ""),
                "similarity": combined_score,
                "text_score": text_score_map.get(chunk_id),
                "vector_score": vector_score_map.get(chunk_id),
                "metadata": chunk_metadata,
            })

    if merge_adjacent and results:
        if max_chunk_size is not None:
            merger = ChunkMerger(MergerConfig(max_merged_size=max_chunk_size))
        else:
            merger = self.merger

        merged_chunks = merger.merge(results)
        results = merger.to_result_list(merged_chunks)

    return results
def format_context(
    self,
    results: list[dict[str, Any]],
    wrap_in_tags: bool = True,
) -> str:
    """Render search results as a context string for an LLM prompt.

    Thin convenience wrapper around the configured formatter.

    Args:
        results: Search results from query()
        wrap_in_tags: Whether to additionally pass the formatted text
            through the formatter's ``wrap_for_prompt``

    Returns:
        Formatted context string
    """
    formatted = self.formatter.format(results)
    if not wrap_in_tags:
        return formatted
    return self.formatter.wrap_for_prompt(formatted)
async def clear(self) -> None:
    """Remove every stored chunk by delegating to the vector store.

    Warning: This removes all stored chunks and embeddings.

    Raises:
        NotImplementedError: If the vector store has no ``clear`` method.
    """
    # Guard first: not every backend can be emptied in place.
    if not hasattr(self.vector_store, "clear"):
        raise NotImplementedError(
            "Vector store does not support clearing. "
            "Consider creating a new knowledge base with a fresh collection."
        )
    await self.vector_store.clear()
async def save(self) -> None:
    """Persist the knowledge base by calling the vector store's ``save``.

    Vector stores without a ``save`` method are silently left alone, so this
    is a no-op for backends that do not support persistence.

    Example:
        ```python
        await kb.load_markdown_document("docs/api.md")
        await kb.save()  # Persist to disk
        ```
    """
    if not hasattr(self.vector_store, "save"):
        return
    await self.vector_store.save()
async def close(self) -> None:
    """Close the knowledge base and release resources.

    This method:
    - Closes the vector store, if it has a ``close`` method
    - Closes the embedding provider, if it has a ``close`` method

    The embedding provider is closed even when closing the vector store
    raises, so a failing store cannot leak the provider's resources
    (e.g., unclosed aiohttp sessions). The store's exception still
    propagates to the caller afterwards.

    Example:
        ```python
        kb = await RAGKnowledgeBase.from_config(config)
        try:
            await kb.load_markdown_document("docs/api.md")
            results = await kb.query("How do I configure?")
        finally:
            await kb.close()
        ```
    """
    try:
        # NOTE(review): the original comment says the store saves itself on
        # close when persist_path is set - not visible here, confirm.
        if hasattr(self.vector_store, "close"):
            await self.vector_store.close()
    finally:
        # Always runs, including when the vector store's close() raised.
        if hasattr(self.embedding_provider, "close"):
            await self.embedding_provider.close()
async def __aenter__(self) -> "RAGKnowledgeBase":
    """Enter the async context manager.

    No setup is performed here; the instance is handed back unchanged so it
    can be bound by ``async with ... as kb``.

    Returns:
        This knowledge base instance

    Example:
        ```python
        async with await RAGKnowledgeBase.from_config(config) as kb:
            await kb.load_markdown_document("docs/api.md")
            results = await kb.query("How do I configure?")
        # close() has been awaited at this point
        ```
    """
    return self
1128 async def __aexit__(
1129 self,
1130 exc_type: type[BaseException] | None,
1131 exc_val: BaseException | None,
1132 exc_tb: types.TracebackType | None,
1133 ) -> None:
1134 """Async context manager exit - ensures cleanup.
1136 Args:
1137 exc_type: Exception type if an exception occurred
1138 exc_val: Exception value if an exception occurred
1139 exc_tb: Exception traceback if an exception occurred
1140 """
1141 await self.close()