Coverage for src/dataknobs_llm/llm/providers/anthropic.py: 16%

110 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-08 13:51 -0700

1"""Anthropic Claude LLM provider implementation. 

2 

3This module provides Anthropic Claude API integration for dataknobs-llm, supporting: 

4- Claude 3 (Opus, Sonnet, Haiku) and Claude 2 models 

5- Native tools API for function calling 

6- Vision capabilities (Claude 3+) 

7- Streaming responses 

8- Long context windows (up to 200k tokens) 

9- Advanced reasoning and coding capabilities 

10 

11The AnthropicProvider uses the official Anthropic Python SDK and supports 

12all standard Anthropic API parameters including system prompts, temperature, 

13and token limits. 

14 

15Example: 

16 ```python 

17 from dataknobs_llm.llm.providers import AnthropicProvider 

18 from dataknobs_llm.llm.base import LLMConfig 

19 

20 # Create provider 

21 config = LLMConfig( 

22 provider="anthropic", 

23 model="claude-3-sonnet-20240229", 

24 api_key="sk-ant-...", # or set ANTHROPIC_API_KEY env var 

25 temperature=0.7, 

26 max_tokens=1024 

27 ) 

28 

29 async with AnthropicProvider(config) as llm: 

30 # Simple completion 

31 response = await llm.complete("Explain quantum computing") 

32 print(response.content) 

33 

34 # Streaming for real-time output 

35 async for chunk in llm.stream_complete("Write a story"): 

36 print(chunk.delta, end="", flush=True) 

37 

38 # Tool use (Claude 3+) 

39 tools = [{ 

40 "name": "calculator", 

41 "description": "Perform arithmetic", 

42 "input_schema": { 

43 "type": "object", 

44 "properties": { 

45 "operation": {"type": "string"}, 

46 "x": {"type": "number"}, 

47 "y": {"type": "number"} 

48 } 

49 } 

50 }] 

51 

52 response = await llm.function_call(messages, tools) 

53 ``` 

54 

55See Also: 

56 - Anthropic API Documentation: https://docs.anthropic.com/ 

57 - anthropic Python package: https://github.com/anthropics/anthropic-sdk-python 

58""" 

59 

60import os 

61import json 

62from typing import TYPE_CHECKING, Any, Dict, List, Union, AsyncIterator 

63 

64from ..base import ( 

65 LLMConfig, LLMMessage, LLMResponse, LLMStreamResponse, 

66 AsyncLLMProvider, ModelCapability, 

67 normalize_llm_config 

68) 

69from dataknobs_llm.prompts import AsyncPromptBuilder 

70 

71if TYPE_CHECKING: 

72 from dataknobs_config.config import Config 

73 

74 

class AnthropicProvider(AsyncLLMProvider):
    r"""Anthropic Claude LLM provider with full API support.

    Provides async access to Anthropic's Claude models including Claude 3
    (Opus, Sonnet, Haiku) and Claude 2. Supports advanced features like
    native tool use, vision, and extended context windows.

    Features:
        - Claude 3 Opus/Sonnet/Haiku and Claude 2 models
        - Native tools API for function calling (Claude 3+)
        - Vision capabilities for image understanding (Claude 3+)
        - Streaming responses for real-time output
        - Long context windows (up to 200k tokens)
        - Advanced reasoning and coding capabilities
        - System prompts for behavior control
        - JSON output mode

    Example:
        ```python
        from dataknobs_llm.llm.providers import AnthropicProvider
        from dataknobs_llm.llm.base import LLMConfig, LLMMessage

        # Basic usage
        config = LLMConfig(
            provider="anthropic",
            model="claude-3-sonnet-20240229",
            api_key="sk-ant-...",
            temperature=0.7,
            max_tokens=1024
        )

        async with AnthropicProvider(config) as llm:
            # Simple completion
            response = await llm.complete("Explain machine learning")
            print(response.content)

            # With system prompt
            messages = [
                LLMMessage(
                    role="system",
                    content="You are an expert Python tutor"
                ),
                LLMMessage(
                    role="user",
                    content="How do I use decorators?"
                )
            ]
            response = await llm.complete(messages)

        # Long context processing (Claude 3+)
        long_config = LLMConfig(
            provider="anthropic",
            model="claude-3-opus-20240229",
            max_tokens=4096
        )

        llm = AnthropicProvider(long_config)
        await llm.initialize()

        # Process large document
        with open("large_doc.txt") as f:
            long_text = f.read()  # Up to 200k tokens!

        response = await llm.complete(
            f"Summarize this document:\n\n{long_text}"
        )

        # Tool use / function calling (Claude 3+)
        # The schema may be given as "input_schema" (Anthropic style)
        # or "parameters" (OpenAI style).
        tools = [
            {
                "name": "web_search",
                "description": "Search the web for information",
                "input_schema": {
                    "type": "object",
                    "properties": {
                        "query": {
                            "type": "string",
                            "description": "Search query"
                        },
                        "num_results": {
                            "type": "integer",
                            "description": "Number of results"
                        }
                    },
                    "required": ["query"]
                }
            }
        ]

        messages = [
            LLMMessage(
                role="user",
                content="Search for latest AI news"
            )
        ]

        response = await llm.function_call(messages, tools)
        if response.function_call:
            # "arguments" is passed through exactly as the API returned it
            # (the tool_use block's input); it is not a JSON string.
            tool_input = response.function_call["arguments"]
            print(f"Tool: {response.function_call['name']}")
            print(f"Input: {tool_input}")
        ```

    Args:
        config: LLMConfig, dataknobs Config, or dict with provider settings
        prompt_builder: Optional AsyncPromptBuilder for prompt rendering

    Attributes:
        _client: Anthropic AsyncAnthropic client instance

    See Also:
        LLMConfig: Configuration options
        AsyncLLMProvider: Base provider interface
        Anthropic API Docs: https://docs.anthropic.com/
    """

    # Fallback for ``max_tokens`` when the config leaves it unset.
    _DEFAULT_MAX_TOKENS = 1024

    # Substrings identifying Claude 3+ model families (vision + tools).
    _MODERN_MODEL_MARKERS = (
        'claude-3', 'claude-sonnet', 'claude-opus', 'claude-haiku',
    )

    # Substrings identifying older model families that are still accepted.
    _LEGACY_MODEL_MARKERS = ('claude-2.1', 'claude-2.0', 'claude-instant-1.2')

    # Config attributes forwarded to the API only when they are set.
    _SAMPLING_FIELDS = ('temperature', 'top_p', 'stop_sequences')

    # Marker the prompt-based fallback asks the model to emit before its JSON.
    _FUNCTION_CALL_MARKER = 'FUNCTION_CALL:'

    def __init__(
        self,
        config: Union[LLMConfig, "Config", Dict[str, Any]],
        prompt_builder: AsyncPromptBuilder | None = None
    ):
        """Create the provider from any supported configuration form.

        Args:
            config: LLMConfig, dataknobs Config, or plain dict; it is
                normalized with ``normalize_llm_config`` before use.
            prompt_builder: Optional prompt builder passed to the base class.
        """
        llm_config = normalize_llm_config(config)
        super().__init__(llm_config, prompt_builder=prompt_builder)

    async def initialize(self) -> None:
        """Create the AsyncAnthropic client.

        The API key is taken from the config, falling back to the
        ``ANTHROPIC_API_KEY`` environment variable.

        Raises:
            ImportError: If the ``anthropic`` package is not installed.
            ValueError: If no API key is available.
        """
        # Only the import is guarded, so the "not installed" message cannot
        # mask an unrelated failure during client construction.
        try:
            import anthropic
        except ImportError as e:
            raise ImportError("anthropic package not installed. Install with: pip install anthropic") from e

        api_key = self.config.api_key or os.environ.get('ANTHROPIC_API_KEY')
        if not api_key:
            raise ValueError("Anthropic API key not provided")

        self._client = anthropic.AsyncAnthropic(
            api_key=api_key,
            base_url=self.config.api_base,
            timeout=self.config.timeout
        )
        self._is_initialized = True

    async def close(self) -> None:
        """Close the Anthropic client (if any) and mark the provider uninitialized."""
        if self._client:
            await self._client.close()  # type: ignore[unreachable]
        self._is_initialized = False

    async def validate_model(self) -> bool:
        """Check the configured model name against known Claude families.

        This is a local substring check; no API request is made.

        Returns:
            True if the model name contains a known Claude family marker.
        """
        model = self.config.model or ''
        known = self._MODERN_MODEL_MARKERS + self._LEGACY_MODEL_MARKERS
        return any(marker in model for marker in known)

    def get_capabilities(self) -> List[ModelCapability]:
        """Get the capabilities of the configured model.

        Returns:
            Text generation, chat, streaming and code for every model, plus
            vision and function calling for Claude 3+ model families.
        """
        capabilities = [
            ModelCapability.TEXT_GENERATION,
            ModelCapability.CHAT,
            ModelCapability.STREAMING,
            ModelCapability.CODE
        ]

        model = self.config.model or ''
        if any(marker in model for marker in self._MODERN_MODEL_MARKERS):
            capabilities.extend([
                ModelCapability.VISION,
                ModelCapability.FUNCTION_CALLING
            ])

        return capabilities

    async def complete(
        self,
        messages: Union[str, List[LLMMessage]],
        **kwargs
    ) -> LLMResponse:
        """Generate a completion.

        Args:
            messages: A prompt string, or a list of messages that is flattened
                into a single prompt by ``_build_prompt``.
            **kwargs: Accepted for interface compatibility; not forwarded.

        Returns:
            LLMResponse with the concatenated text blocks, the model name,
            the stop reason and token usage (when reported).
        """
        if not self._is_initialized:
            await self.initialize()

        response = await self._client.messages.create(
            **self._prompt_request(messages)
        )

        return LLMResponse(
            content=self._collect_text(response.content),
            model=response.model,
            finish_reason=response.stop_reason,
            usage=self._usage_dict(response)
        )

    async def stream_complete(
        self,
        messages: Union[str, List[LLMMessage]],
        **kwargs
    ) -> AsyncIterator[LLMStreamResponse]:
        """Generate a streaming completion.

        Args:
            messages: A prompt string, or a list of messages that is flattened
                into a single prompt by ``_build_prompt``.
            **kwargs: Accepted for interface compatibility; not forwarded.

        Yields:
            One non-final LLMStreamResponse per text delta, then a final
            response with an empty delta carrying the stop reason.
        """
        if not self._is_initialized:
            await self.initialize()

        async with self._client.messages.stream(
            **self._prompt_request(messages)
        ) as stream:
            async for chunk in stream:
                if chunk.type != 'content_block_delta':
                    continue
                # Deltas without a ``text`` attribute are skipped instead of
                # raising AttributeError.
                text = getattr(chunk.delta, 'text', None)
                if text is None:
                    continue
                yield LLMStreamResponse(
                    delta=text,
                    is_final=False
                )

            # Final message
            message = await stream.get_final_message()
            yield LLMStreamResponse(
                delta='',
                is_final=True,
                finish_reason=message.stop_reason
            )

    async def embed(
        self,
        texts: Union[str, List[str]],
        **kwargs
    ) -> Union[List[float], List[List[float]]]:
        """Anthropic doesn't provide embeddings.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError("Anthropic doesn't provide embedding models")

    async def function_call(
        self,
        messages: List[LLMMessage],
        functions: List[Dict[str, Any]],
        **kwargs
    ) -> LLMResponse:
        """Execute function calling with the native Anthropic tools API (Claude 3+).

        If the native request fails for any reason, the call is retried once
        through ``complete`` with a prompt that asks the model to answer with
        a ``FUNCTION_CALL:`` JSON object.

        Args:
            messages: Conversation messages. System messages are merged with
                ``config.system_prompt`` and sent as the ``system`` parameter.
            functions: Tool definitions with ``name``, ``description`` and a
                JSON schema under ``input_schema`` or ``parameters``.
            **kwargs: Forwarded to ``complete`` on the fallback path only.

        Returns:
            LLMResponse whose ``function_call`` is ``{'name', 'arguments'}``
            when the model requested a tool, otherwise None.
        """
        if not self._is_initialized:
            await self.initialize()

        # Anthropic takes the system prompt as a parameter, not as a message.
        anthropic_messages = []
        system_content = self.config.system_prompt or ''
        for msg in messages:
            if msg.role == 'system':
                system_content = msg.content if not system_content else f"{system_content}\n\n{msg.content}"
            else:
                anthropic_messages.append({
                    'role': msg.role,
                    'content': msg.content
                })

        request: Dict[str, Any] = {
            'model': self.config.model,
            'messages': anthropic_messages,
            'tools': [self._to_anthropic_tool(func) for func in functions],
            'max_tokens': self.config.max_tokens or self._DEFAULT_MAX_TOKENS,
        }
        # Omit ``system`` entirely when empty rather than sending None.
        if system_content:
            request['system'] = system_content
        request.update(self._sampling_params())

        try:
            response = await self._client.messages.create(**request)

            # If several tool_use blocks are returned, the last one wins.
            tool_use = None
            for block in response.content:
                if block.type == 'tool_use':
                    tool_use = {
                        'name': block.name,
                        'arguments': block.input
                    }

            return LLMResponse(
                content=self._collect_text(response.content),
                model=response.model,
                finish_reason=response.stop_reason,
                usage=self._usage_dict(response),
                function_call=tool_use
            )

        except Exception as e:
            # Deliberate best-effort: any failure falls back to the
            # prompt-based approach for older models.
            # NOTE(review): this also catches auth/network errors, in which
            # case the fallback request below will most likely fail as well.
            import logging
            logging.getLogger(__name__).warning(
                "Anthropic native tools failed, falling back to prompt-based: %s", e
            )

            return await self._prompt_based_function_call(
                messages, functions, **kwargs
            )

    async def _prompt_based_function_call(
        self,
        messages: List[LLMMessage],
        functions: List[Dict[str, Any]],
        **kwargs
    ) -> LLMResponse:
        """Emulate function calling by describing the functions in the prompt."""
        function_descriptions = "\n".join(
            f"- {func.get('name', '')}: {func.get('description', '')}"
            for func in functions
        )

        system_prompt = (
            "You have access to the following functions:\n"
            f"{function_descriptions}\n"
            "\n"
            "When you need to call a function, respond with:\n"
            "FUNCTION_CALL: {\n"
            '    "name": "function_name",\n'
            '    "arguments": {...}\n'
            "}"
        )

        messages_with_system = [
            LLMMessage(role='system', content=system_prompt)
        ] + list(messages)

        response = await self.complete(messages_with_system, **kwargs)

        _, marker, tail = (response.content or '').partition(
            self._FUNCTION_CALL_MARKER
        )
        if marker:
            try:
                # raw_decode parses the first JSON value and tolerates any
                # prose the model appends after it.
                parsed, _ = json.JSONDecoder().raw_decode(tail.lstrip())
            except json.JSONDecodeError as e:
                import logging
                logging.getLogger(__name__).warning(
                    "Could not parse FUNCTION_CALL payload: %s", e
                )
            else:
                response.function_call = parsed

        return response

    def _sampling_params(self) -> Dict[str, Any]:
        """Return the sampling options that are set (not None) in the config."""
        params = {}
        for name in self._SAMPLING_FIELDS:
            value = getattr(self.config, name, None)
            if value is not None:
                params[name] = value
        return params

    def _prompt_request(
        self,
        messages: Union[str, List[LLMMessage]]
    ) -> Dict[str, Any]:
        """Build the request arguments shared by complete and stream_complete."""
        if isinstance(messages, str):
            prompt = messages
        else:
            prompt = self._build_prompt(messages)

        request: Dict[str, Any] = {
            'model': self.config.model,
            'messages': [{"role": "user", "content": prompt}],
            'max_tokens': self.config.max_tokens or self._DEFAULT_MAX_TOKENS,
        }
        request.update(self._sampling_params())
        return request

    @staticmethod
    def _to_anthropic_tool(func: Dict[str, Any]) -> Dict[str, Any]:
        """Convert one function definition into an Anthropic tool definition."""
        schema = func.get('input_schema') or func.get('parameters') or {
            'type': 'object',
            'properties': {},
            'required': []
        }
        return {
            'name': func.get('name', ''),
            'description': func.get('description', ''),
            'input_schema': schema
        }

    @staticmethod
    def _collect_text(blocks: Any) -> str:
        """Concatenate the text of all ``text`` content blocks ('' if none)."""
        return ''.join(
            block.text
            for block in (blocks or [])
            if getattr(block, 'type', None) == 'text'
        )

    @staticmethod
    def _usage_dict(response: Any) -> Union[Dict[str, int], None]:
        """Convert the response's usage into the provider-neutral dict, if present."""
        usage = getattr(response, 'usage', None)
        if usage is None:
            return None
        return {
            'prompt_tokens': usage.input_tokens,
            'completion_tokens': usage.output_tokens,
            'total_tokens': usage.input_tokens + usage.output_tokens
        }

    def _build_prompt(self, messages: List[LLMMessage]) -> str:
        """Flatten messages into a single Human/Assistant style prompt.

        System messages are prepended to the prompt built so far; user and
        assistant messages are appended as ``Human:`` / ``Assistant:`` turns;
        other roles are ignored. The prompt ends with an open ``Assistant:``
        turn.
        """
        prompt = ""
        for msg in messages:
            if msg.role == 'system':
                prompt = msg.content + "\n\n" + prompt
            elif msg.role == 'user':
                prompt += f"\n\nHuman: {msg.content}"
            elif msg.role == 'assistant':
                prompt += f"\n\nAssistant: {msg.content}"
        prompt += "\n\nAssistant:"
        return prompt