vk_chat_bot/vk/tasks.py
Kirill Kirilenko 4b265b5405 Устранено дублирование кода в AiAgent.
Добавлена возможность пересылки сообщений в личных чатах.
Обновлены зависимости.
Добавлен requirements.txt.
Исправлены предупреждения PyCharm 2026.1.
2026-04-07 01:01:34 +03:00

159 lines
4.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import datetime
import traceback
from asyncio import sleep
from vkbottle import API, bold
from messages import *
import vk.vk_database as database
from vk.handlers.user import format_rating
from vk.utils import *
async def cleanup_chats(_api: API):
    """Placeholder for the chat clean-up maintenance step.

    The step is not implemented yet: the argument is ignored and nothing
    is done.
    """
    # TODO: implement chat clean-up.
    return None
async def cleanup_users(api: API):
    """Delete stored users that are no longer members of their chat.

    For every active chat of this bot the current conversation members are
    requested, then each user returned by ``database.DB.get_top_silent``
    (called with ``threshold_days=14``) is checked against that member list.
    Users that are absent from it are deleted from the database and the
    deletion is reported on stdout.
    """
    bot_id = get_bot_id(api)

    for chat_row in database.DB.get_chats(bot_id):
        # Chats flagged as inactive are left alone.
        if chat_row['active'] == 0:
            continue

        chat_id = chat_row['chat_id']
        response = await api.messages.get_conversation_members(
            peer_id=chat_id,
            extended=False,
        )

        silent_users = database.DB.get_top_silent(bot_id, chat_id, threshold_days=14)
        for silent in silent_users:
            user_id = silent['user_id']

            # A user that still appears among the members is kept.
            still_member = any(member.member_id == user_id for member in response.items)
            if still_member:
                continue

            database.DB.delete_user(bot_id, chat_id, user_id)
            print(f'Из чата (bot_id={bot_id}, chat_id={chat_id}) удален пользователь (id={user_id})')
async def reset_counters(reset_month: bool, api: API):
    """Reset the per-day message counters and, optionally, the per-month ones.

    The daily counters of this bot are always reset. When ``reset_month`` is
    true, every active chat with a non-empty monthly top receives a summary
    message, after which the monthly counters are reset.

    A failure while building or sending the summary for one chat is printed
    and does not stop the remaining chats or the monthly reset. Without this
    isolation a single failing chat would leave the monthly counters
    unreset until a later call with ``reset_month=True``.

    :param reset_month: publish monthly summaries and reset monthly counters.
    :param api: API object of the bot whose counters are reset.
    """
    bot_id = get_bot_id(api)
    database.DB.reset_messages_today(bot_id)
    if not reset_month:
        return

    for chat in database.DB.get_chats(bot_id):
        if chat['active'] == 0:
            continue

        chat_id = chat['chat_id']
        top_users = database.DB.get_top_messages_month(bot_id, chat_id)
        if not top_users:
            # Nothing to report for this chat.
            continue

        try:
            message = bold('Итоговая статистика за прошедший месяц') + '\n'
            message += await format_rating(top_users, api)
            # noinspection PyArgumentList
            await api.messages.send(random_id=0, peer_id=chat_id,
                                    message=str(message), format_data=message.as_raw_data())
        except Exception:
            # Report the failure the same way the daily task does and move on,
            # so the monthly reset below is still performed.
            print(traceback.format_exc())

    database.DB.reset_messages_month(bot_id)
async def check_birthdays(api: API):
    """Send a greeting to each active chat for members whose birthday is today.

    For every active chat the conversation members are requested together
    with their ``bdate`` field. Each member with a non-negative id is
    registered in the database if missing; when the member's profile has a
    birth date whose day and month equal today's and the stored
    ``happy_birthday`` flag is not 0, the chat's ``birthday_message`` (or
    ``MESSAGE_DEFAULT_BIRTHDAY`` when it is empty) is formatted with a
    mention of the user and sent to the chat.

    A chat whose response carries no profiles is skipped; the remaining
    chats are still processed.

    :param api: API object of the bot.
    """
    bot_id = get_bot_id(api)
    chats = database.DB.get_chats(bot_id)
    # NOTE(review): this is the host's naive local date, while the daily
    # scheduler in this module uses a fixed UTC+3 zone — confirm they agree.
    today = datetime.datetime.today()

    for chat in chats:
        if chat['active'] == 0:
            continue

        chat_id = chat['chat_id']
        # noinspection PyTypeChecker
        members = await api.messages.get_conversation_members(peer_id=chat_id, fields=['bdate'])
        if members.profiles is None:
            # Only this chat lacks profile data; do not abort the other chats.
            continue

        for item in members.items:
            user_id = item.member_id
            if user_id < 0:
                # Negative ids are skipped (presumably non-user members such
                # as communities — TODO confirm).
                continue

            # Called for every non-negative member, with or without a profile.
            user = database.DB.create_user_if_not_exists(bot_id, chat_id, user_id)

            # First profile with a matching id, as in a linear search.
            profile = next((p for p in members.profiles if p.id == user_id), None)
            if profile is None or profile.bdate is None or user['happy_birthday'] == 0:
                continue

            # The code reads the first two dot-separated fields as day and month.
            parts = profile.bdate.split('.')
            if len(parts) < 2:
                continue
            day = int(parts[0])
            month = int(parts[1])
            if day != today.day or month != today.month:
                continue

            message = chat['birthday_message'] or MESSAGE_DEFAULT_BIRTHDAY
            message = message.format(name=f'@id{user_id} ({profile.first_name} {profile.last_name})')
            await api.messages.send(random_id=0, peer_id=chat_id, message=message)
async def wait_until(target_time: datetime.datetime):
    """Suspend the calling coroutine until ``target_time`` is reached.

    The current time is taken in the timezone of ``target_time`` (naive when
    ``target_time`` is naive). If that moment is already in the past or is
    exactly now, the function returns without sleeping.
    """
    remaining = target_time - datetime.datetime.now(target_time.tzinfo)
    if remaining > datetime.timedelta(0):
        await sleep(remaining.total_seconds())
async def daily_maintenance_task(api: API):
    """Run the maintenance steps once a day at 06:00 (fixed UTC+3), forever.

    The first run is scheduled for today when the current hour and minute
    are not later than 06:00, otherwise for tomorrow. Each run executes, in
    order: chat clean-up, user clean-up, counter reset (with the monthly
    reset on the first day of a month) and the birthday check. An exception
    in one step is printed and does not prevent the following steps. After a
    run the target moves forward by one day.
    """
    msk = datetime.timezone(datetime.timedelta(hours=3), name="MSK")
    run_at = datetime.time(6, 0, 0, tzinfo=msk)
    current = datetime.datetime.now(msk)

    # Minute-granular check: seconds are deliberately not part of the comparison.
    already_passed = (current.hour, current.minute) > (run_at.hour, run_at.minute)
    first_day = current.date() + datetime.timedelta(days=1 if already_passed else 0)
    target_datetime = datetime.datetime.combine(first_day, run_at)

    while True:
        await wait_until(target_datetime)

        first_of_month = target_datetime.day == 1
        # Deferred calls, so that each step is created and awaited inside
        # its own try block.
        steps = (
            lambda: cleanup_chats(api),
            lambda: cleanup_users(api),
            lambda: reset_counters(first_of_month, api),
            lambda: check_birthdays(api),
        )
        for step in steps:
            try:
                await step()
            except Exception:
                print(traceback.format_exc())

        target_datetime += datetime.timedelta(days=1)
async def startup_task(api: API):
    """Print a startup banner with the name and id of the bot's community.

    The community list is requested with ``api.groups.get_by_id()`` and its
    first entry is reported. When the response carries no groups (``None``
    or an empty list) an error line is printed instead.

    :param api: API object of the bot.
    """
    response = await api.groups.get_by_id()
    groups = response.groups
    if not groups:
        # Covers a missing list as well as an empty one, so indexing the
        # first element below cannot raise IndexError.
        print("Не удалось получить информацию о боте")
        return

    me = groups[0]
    print(f"Бот '{me.name}' (id={me.id}) запущен.")