import os import anthropic from dotenv import load_dotenv from claude_space.settings import settings load_dotenv() syncClient = anthropic.Anthropic(api_key=settings.ANTHROPIC_API_KEY, timeout=5) asyncClient = anthropic.AsyncAnthropic(api_key=settings.ANTHROPIC_API_KEY, timeout=60) class AnthropicCustom: def __init__(self, api_key, model, max_tokens=1000, prompt=""): self.api_key = api_key self.model = model self.max_tokens = max_tokens self.prompt = prompt if os.environ.get("ANTHROPIC_API_KEY") is not None: api_key = os.environ.get("ANTHROPIC_API_KEY") else: os.environ["ANTHROPIC_API_KEY"] = api_key def get_anthropic_response(self): response = syncClient.completions.create( prompt=self.prompt, model=self.model, max_tokens_to_sample=self.max_tokens, ) return response.completion async def get_anthropic_response_async(self): async for line in await asyncClient.completions.create( prompt=self.prompt, model=self.model, max_tokens_to_sample=self.max_tokens, stop_sequences=[ anthropic.HUMAN_PROMPT, ], stream=True, ): yield line.completion