vk_chat_bot/tg/utils.py
Kirill Kirilenko 4b265b5405 Устранено дублирование кода в AiAgent.
Добавлена возможность пересылки сообщений в личных чатах.
Обновлены зависимости.
Добавлен requirements.txt.
Исправлены предупреждения PyCharm 2026.1.
2026-04-07 01:01:34 +03:00

82 lines
2.4 KiB
Python

from datetime import datetime
from io import BytesIO
from typing import Optional
from aiogram import Bot
from aiogram.enums import ContentType
from aiogram.types import User, PhotoSize, Message, BufferedInputFile
import ai
import utils
async def get_user_name_for_ai(user: Optional[User]) -> str:
if user is None:
return "Неизвестный пользователь"
elif user.first_name and user.last_name:
return "{} {}".format(user.first_name, user.last_name)
elif user.first_name:
return user.first_name
elif user.username:
return user.username
else:
return str(user.id)
async def download_photo(photo: PhotoSize, bot: Bot) -> bytes:
# noinspection PyTypeChecker
photo_bytes: BytesIO = await bot.download(photo.file_id)
return photo_bytes.getvalue()
def get_message_text(message: Message) -> Optional[str]:
if message.content_type == ContentType.TEXT:
return message.text
elif message.content_type == ContentType.PHOTO:
return message.caption
else:
return None
async def create_ai_message(message: Message, bot: Bot) -> ai.Message:
ai_message = ai.Message()
ai_message.message_id = message.message_id
ai_message.user_name = await get_user_name_for_ai(message.from_user) if message.from_user else "Неизвестный"
if message.text is not None:
ai_message.text = message.text
elif message.photo is not None:
if message.media_group_id is None:
ai_message.text = message.caption
ai_message.image = await download_photo(message.photo[-1], bot)
else:
raise utils.UnsupportedContentType()
else:
raise utils.UnsupportedContentType()
return ai_message
def wrap_photo(image: bytes) -> BufferedInputFile:
return BufferedInputFile(image, 'image.jpg')
def wrap_document(document: bytes, name_prefix: str, extension: str) -> BufferedInputFile:
name = "{}_{}.{}".format(name_prefix, datetime.now().strftime("%Y%m%d_%H%M%S"), extension)
return BufferedInputFile(document, name)
def trim_caption(caption: Optional[str]) -> Optional[str]:
if caption is not None:
return caption[:1024]
else:
return None
__all__ = [
"create_ai_message",
"get_message_text",
"get_user_name_for_ai",
"trim_caption",
"wrap_photo",
"wrap_document",
"wrap_document"
]