import datetime
import json
from dataclasses import dataclass
from typing import Any, List, Optional, Tuple, Union

from openrouter import OpenRouter, RetryConfig
from openrouter.components import AssistantMessage, AssistantMessageTypedDict, \
    ChatMessageToolCall, MessageTypedDict, SystemMessageTypedDict, ToolDefinitionJSONTypedDict
from openrouter.errors import ResponseValidationError, OpenRouterError
from openrouter.utils import BackoffStrategy

import ai.tool
from database import BasicDatabase
from ai.utils import *
from ai.tools import *


OPENROUTER_X_TITLE = "TG/VK Chat Bot"
OPENROUTER_HTTP_REFERER = "https://ultracoder.org"
GROUP_CHAT_MAX_MESSAGES = 40
PRIVATE_CHAT_MAX_MESSAGES = 40
MAX_OUTPUT_TOKENS = 500


@dataclass()
class Message:
    """A chat message exchanged between the bot front end and the AI agent.

    Every field is optional; unset fields are ``None``.
    """

    user_name: Optional[str] = None
    text: Optional[str] = None
    image: Optional[bytes] = None
    image_hires: Optional[bytes] = None
    message_id: Optional[int] = None


class AiAgent:
    """Chat agent that answers messages through OpenRouter, optionally using tools.

    The conversation history is read from and written to the supplied database.
    """

    def __init__(self,
                 openrouter_token: str, openrouter_model: str,
                 fal_token: str, replicate_token: str, tavily_token: str,
                 db: BasicDatabase, platform: str):
        """Create the OpenRouter client, load the system prompts and register tools.

        Args:
            openrouter_token: API key passed to the OpenRouter client.
            openrouter_model: Model identifier used for every completion request.
            fal_token: Token handed to the image generation tool set.
            replicate_token: Token handed to the image generation tool set.
            tavily_token: Token handed to the Tavily search tool set.
            db: Storage for bots, chats and conversation context.
            platform: Platform code; 'tg' selects the 'Telegram' label in the
                system prompt, any other value selects 'VK'.
        """
        retry_config = RetryConfig(strategy="backoff",
                                   backoff=BackoffStrategy(
                                       initial_interval=2000, max_interval=8000, exponent=2, max_elapsed_time=14000),
                                   retry_connection_errors=True)
        self.db = db
        self.openrouter_model = openrouter_model
        self.platform = platform
        self._load_prompts()
        self.client_openrouter = OpenRouter(api_key=openrouter_token,
                                            x_title=OPENROUTER_X_TITLE, http_referer=OPENROUTER_HTTP_REFERER,
                                            retry_config=retry_config)

        # Build the tool sets.
        self.toolsets: list[ai.tool.ToolSet] = []
        self.toolsets.append(
            ImageGenerationToolSet(fal_token=fal_token, replicate_token=replicate_token)
        )
        self.toolsets.append(TavilySearchToolSet(tavily_token=tavily_token))

        # Collect every tool and its description from all tool sets.
        self.tools: list[ai.tool.Tool] = []
        self.tools_descriptions: list[ToolDefinitionJSONTypedDict] = []
        for toolset in self.toolsets:
            self.tools.extend(toolset.functions)
            self.tools_descriptions.extend(toolset.get_all_tools_description())

    async def get_group_chat_reply(self, bot_id: int, chat_id: int,
                                   message: Message, forwarded_messages: List[Message]) -> Tuple[Message, bool]:
        """Generate a reply to a group chat message and its forwarded messages.

        The ``text`` of ``message`` and of every forwarded message is rewritten
        in place with its prefix before being sent to the model and stored.

        Args:
            bot_id: Bot identifier used for context lookup.
            chat_id: Chat identifier used for context lookup.
            message: The incoming user message.
            forwarded_messages: Messages quoted/forwarded along with ``message``.

        Returns:
            A ``(reply, success)`` pair. On failure ``reply`` carries an error
            text for the user and ``success`` is ``False``.
        """
        message.text = _add_message_prefix(message.text, message.user_name)
        context = self._get_chat_context(is_group_chat=True, bot_id=bot_id, chat_id=chat_id)
        context.append(_serialize_message(role="user", text=message.text, image=message.image))

        for fwd_message in forwarded_messages:
            message_text = '<Цитируемое сообщение от {}>'.format(fwd_message.user_name)
            if fwd_message.text is not None:
                message_text += '\n' + fwd_message.text
            fwd_message.text = message_text
            context.append(_serialize_message(role="user", text=fwd_message.text, image=fwd_message.image))

        try:
            ai_response, tools_artifacts = await self._complete_with_tools(bot_id, chat_id, context)
            return self._store_exchange(bot_id, chat_id,
                                        user_messages=[message, *forwarded_messages],
                                        ai_response=ai_response, tools_artifacts=tools_artifacts,
                                        max_messages=GROUP_CHAT_MAX_MESSAGES), True
        except Exception as e:
            return self._build_error_reply(e)

    async def get_private_chat_reply(self, bot_id: int, chat_id: int, message: Message) -> Tuple[Message, bool]:
        """Generate a reply to a private chat message.

        The ``text`` of ``message`` is rewritten in place with a timestamp prefix.

        Args:
            bot_id: Bot identifier used for context lookup.
            chat_id: Chat identifier used for context lookup.
            message: The incoming user message.

        Returns:
            A ``(reply, success)`` pair. On failure ``reply`` carries an error
            text for the user and ``success`` is ``False``.
        """
        message.text = _add_message_prefix(message.text)
        context = self._get_chat_context(is_group_chat=False, bot_id=bot_id, chat_id=chat_id)
        context.append(_serialize_message(role="user", text=message.text, image=message.image))

        try:
            ai_response, tools_artifacts = await self._complete_with_tools(bot_id, chat_id, context)
            return self._store_exchange(bot_id, chat_id,
                                        user_messages=[message],
                                        ai_response=ai_response, tools_artifacts=tools_artifacts,
                                        max_messages=PRIVATE_CHAT_MAX_MESSAGES), True
        except Exception as e:
            return self._build_error_reply(e)

    def get_last_assistant_message_id(self, bot_id: int, chat_id: int):
        """Return whatever the database reports as the last assistant message id."""
        return self.db.context_get_last_assistant_message_id(bot_id, chat_id)

    def set_last_response_id(self, bot_id: int, chat_id: int, message_id: int):
        """Store ``message_id`` as the last message id of the chat context."""
        self.db.context_set_last_message_id(bot_id, chat_id, message_id)

    def clear_chat_context(self, bot_id: int, chat_id: int):
        """Clear the stored conversation context of the chat."""
        self.db.context_clear(bot_id, chat_id)

    ####################################################################################

    async def _complete_with_tools(self, bot_id: int, chat_id: int,
                                   context: List[MessageTypedDict]) -> Tuple[str, dict]:
        """Run one completion and, if tools were requested, the follow-up completion.

        When the first response contains tool calls, the assistant message is
        appended to ``context`` *before* the tool results: the follow-up request
        needs the assistant message that issued the calls to precede the
        ``role: "tool"`` messages that answer them.

        Returns:
            A ``(reply_text, artifacts)`` pair; ``artifacts`` is empty when no
            tools were called.
        """
        response = await self._generate_reply(bot_id, chat_id, context=context, allow_tools=True)
        if response.tool_calls is None:
            return response.content, {}

        context.append(_serialize_assistant_message(response))
        tools_artifacts = await self._process_tool_calls(tool_calls=response.tool_calls, context=context)
        follow_up = await self._generate_reply(bot_id, chat_id, context=context)
        return follow_up.content, tools_artifacts

    def _store_exchange(self, bot_id: int, chat_id: int, user_messages: List[Message],
                        ai_response: str, tools_artifacts: dict, max_messages: int) -> Message:
        """Persist the user messages and the assistant reply, then build the reply."""
        for user_message in user_messages:
            self.db.context_add_message(bot_id, chat_id, role="user",
                                        text=user_message.text, image=user_message.image,
                                        message_id=user_message.message_id,
                                        max_messages=max_messages)
        generated_image = tools_artifacts.get("generated_image")
        self.db.context_add_message(bot_id, chat_id, role="assistant",
                                    text=ai_response, image=generated_image,
                                    message_id=None, max_messages=max_messages)
        return Message(text=ai_response, image=generated_image,
                       image_hires=tools_artifacts.get("generated_image_hires"))

    @staticmethod
    def _build_error_reply(error: Exception) -> Tuple[Message, bool]:
        """Turn an exception into the user-facing ``(message, False)`` reply."""
        if "Rate limit exceeded" in str(error):
            return Message(text="Извините, достигнут дневной лимит запросов к ИИ (обновляется в 03:00 МСК)."), False
        print(f"Ошибка выполнения запроса к ИИ: {error}")
        return Message(text=f"Извините, при обработке запроса произошла ошибка:\n{error}"), False

    def _get_chat_context(self, is_group_chat: bool, bot_id: int, chat_id: int) -> List[MessageTypedDict]:
        """Return the system prompt followed by the stored chat messages."""
        context: List[MessageTypedDict] = [
            self._construct_system_prompt(is_group_chat=is_group_chat, bot_id=bot_id, chat_id=chat_id)
        ]
        for message in self.db.context_get_messages(bot_id, chat_id):
            context.append(_serialize_message(message["role"], message["text"], message["image"]))
        return context

    def _construct_system_prompt(self, is_group_chat: bool, bot_id: int, chat_id: int) -> SystemMessageTypedDict:
        """Assemble the system message from the base prompt, tool prompts and custom prompts."""
        base_prompt = self.system_prompt_group_chat if is_group_chat else self.system_prompt_private_chat
        parts = [
            base_prompt.replace('{platform}', 'Telegram' if self.platform == 'tg' else 'VK'),
            '\n# Доступные инструменты\n',
        ]
        parts.extend('\n' + toolset.system_prompt for toolset in self.toolsets)
        parts.append('\n' + '# Дополнительные инструкции\n')

        bot = self.db.get_bot(bot_id)
        if bot['ai_prompt'] is not None:
            parts.append('\n' + bot['ai_prompt'] + '\n')

        chat = self.db.create_chat_if_not_exists(bot_id, chat_id)
        if chat['ai_prompt'] is not None:
            parts.append('\n' + chat['ai_prompt'])

        return {"role": "system", "content": "".join(parts)}

    async def _generate_reply(self, bot_id: int, chat_id: int, context: List[MessageTypedDict],
                              allow_tools: bool = False) -> AssistantMessage:
        """Request a completion for ``context`` and return the first choice's message.

        Tool descriptions and ``tool_choice="auto"`` are sent only when
        ``allow_tools`` is true.
        """
        response = await self._async_chat_completion_request(
            model=self.openrouter_model,
            messages=context,
            tools=self.tools_descriptions if allow_tools else None,
            tool_choice="auto" if allow_tools else None,
            max_tokens=MAX_OUTPUT_TOKENS,
            user=f'{self.platform}_{bot_id}_{chat_id}'
        )
        return self._filter_response(response.choices[0].message)

    async def _process_tool_calls(self, tool_calls: List[ChatMessageToolCall],
                                  context: List[MessageTypedDict]) -> dict:
        """Execute the requested tools and append their results to ``context``.

        Returns:
            The artifacts dict that was passed to every tool's ``execute`` call.
        """
        artifacts = {}
        if tool_calls is None:
            return artifacts

        tools_map = {tool.name: tool for tool in self.tools}
        for tool_call in tool_calls:
            tool_name = tool_call.function.name
            tool_args = json.loads(tool_call.function.arguments)
            # NOTE(review): calls to unknown tool names are skipped and get no
            # "tool" message in the context — confirm this is intended.
            if tool_name in tools_map:
                tool = tools_map[tool_name]
                # Execute the tool, sharing the artifacts dict with it.
                tool_result = await tool.execute(tool_args, artifacts)
                context.append({
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "content": tool_result
                })
        return artifacts

    async def _async_chat_completion_request(self, **kwargs):
        """Send a chat completion request, surfacing the provider's own error text.

        Raises:
            RuntimeError: When the SDK error body embeds a provider error message.
            ResponseValidationError, OpenRouterError: Re-raised unchanged when no
                provider message can be extracted from the error body.
        """
        try:
            return await self.client_openrouter.chat.send_async(**kwargs)
        except ResponseValidationError as e:
            # Workaround for the OpenRouter SDK:
            # https://github.com/OpenRouterTeam/python-sdk/issues/44
            message = _extract_provider_error_message(getattr(e, "body", None))
            if message is None:
                raise
            raise RuntimeError(message) from e
        except OpenRouterError as e:
            if e.message == "Provider returned error":
                message = _extract_provider_error_message(getattr(e, "body", None))
                if message is not None:
                    raise RuntimeError(message) from e
            raise

    @staticmethod
    def _filter_response(response: AssistantMessage) -> AssistantMessage:
        """Coerce the response content to ``str`` in place and return the response."""
        # NOTE(review): str() turns a missing (None) content into the text "None".
        text = str(response.content)
        # NOTE(review): as written this replaces an empty string with an empty
        # string, i.e. it changes nothing — possibly a special token literal was
        # lost from the pattern; confirm against the repository.
        text = text.replace("", "")
        response.content = text
        return response

    def _load_prompts(self):
        """Read the group and private chat system prompts from ``ai/prompts``."""
        # Explicit encoding: the locale default is not guaranteed to be UTF-8.
        with open("ai/prompts/group_chat.md", "r", encoding="utf-8") as f:
            self.system_prompt_group_chat = f.read()
        with open("ai/prompts/private_chat.md", "r", encoding="utf-8") as f:
            self.system_prompt_private_chat = f.read()


def _add_message_prefix(text: Optional[str], username: Optional[str] = None) -> str:
    """Prefix ``text`` with ``[<local time>, <username>]:`` (username is optional)."""
    current_time = datetime.datetime.now().strftime("%d.%m.%Y %H:%M")
    prefix = f"[{current_time}, {username}]" if username is not None else f"[{current_time}]"
    return f"{prefix}: {text}" if text is not None else f"{prefix}:"


def _serialize_message(role: str, text: Optional[str], image: Optional[bytes]) -> dict:
    """Build a chat message dict with the given role and serialized content."""
    return {"role": role, "content": serialize_message_content(text, image)}


def _serialize_assistant_message(message: AssistantMessage) -> AssistantMessageTypedDict:
    """Dump an assistant message to a dict (by alias) with ``None`` values removed."""
    return _remove_none_recursive(message.model_dump(by_alias=True))


def _extract_provider_error_message(body: Any) -> Optional[str]:
    """Return the provider error message embedded in an SDK error body, if any.

    The body is parsed as JSON, then ``error.metadata.raw`` is parsed as JSON
    again and its ``error.message`` is returned as a string. Returns ``None``
    when the body is not JSON or does not have that structure.
    """
    try:
        payload = json.loads(body)
        raw_response = json.loads(payload["error"]["metadata"]["raw"])
        return str(raw_response["error"]["message"])
    except (TypeError, ValueError, LookupError):
        return None


def _remove_none_recursive(data: Any) -> Any:
    """Return a copy of ``data`` with ``None`` dict values and list items dropped.

    Dicts and lists are processed recursively; any other value is returned as is.
    """
    if isinstance(data, dict):
        return {
            k: _remove_none_recursive(v)
            for k, v in data.items()
            if v is not None
        }
    elif isinstance(data, list):
        return [
            _remove_none_recursive(item)
            for item in data
            if item is not None
        ]
    else:
        return data