Code refactoring

This commit is contained in:
hok7z 2022-12-01 12:54:32 +02:00
parent 149a21d6e3
commit fad8298680
18 changed files with 328 additions and 456 deletions

6
app.py
View File

@ -26,11 +26,11 @@ async def on_startup(dp):
if not db.get_columns('members'):
db.create_tables([Member,Restriction])
logging.warning("Member table is empty")
await bot.send_message(config.second_group_id,"First launch successful!")
await bot.send_message(config.second_group_id,"Member table is empty, run: `!reload`",parse_mode="Markdown")
await bot.send_message(config.second_group_id, "First launch successful!")
await bot.send_message(config.second_group_id, "Member table is empty, run: `!reload`",parse_mode="Markdown")
elif Member.select().count() == 0:
await bot.send_message(config.second_group_id,"Member table is empty, run `!reload`",parse_mode="Markdown")
await bot.send_message(config.second_group_id, "Member table is empty, run `!reload`",parse_mode="Markdown")
logging.warning("Member table is empty")
from utils.notify_start import notify_started_bot

View File

@ -1,4 +1,4 @@
from aiogram import Dispatcher,Bot
from aiogram import Dispatcher, Bot
from environs import Env
env = Env()

View File

@ -5,6 +5,9 @@ from playhouse.db_url import connect
from datetime import datetime, date
db = connect(config.db_url)
from enum import Enum
class MemberRoles(Enum):
OWNER = "owner"
@ -12,82 +15,30 @@ class MemberRoles(Enum):
HELPER = "helper"
MEMBER = "member"
db = connect(config.db_url)
class Member(Model):
user_id = BigIntegerField()
first_name = CharField()
username = CharField(null=True)
warns = BigIntegerField(default=0)
role = CharField(default="member")
warns = BigIntegerField(default=0)
joined = DateField(default=date.today())
@staticmethod
def exists(fieldname, value) -> bool | None:
"""Check if data exists in db"""
query = Member.select().where(fieldname == value)
if (query is None):
return None
return query.exists()
@staticmethod
def search(fieldname:Field, value):
if (not Member.exists(fieldname, value)):
return None
return Member.get(fieldname == value)
@staticmethod
def report(delete=False):
"""If the user exists, returns number reports. Gives the user a warning or retrieves it."""
count = Member.warns
if delete:count -= 1
else:count += 1
Member.update(warns = count).execute()
return count
class Meta:
db_table = "members"
database = db
class Restriction(Model):
action = CharField()
from_user = ForeignKeyField(Member, lazy_load=True)
to_user = ForeignKeyField(Member, lazy_load=True)
reason = CharField(null=True)
text = CharField()
message_id = BigIntegerField()
timestamp = DateTimeField(default=datetime.now().replace(microsecond=0))
@staticmethod
def search(to_user=None,id=None):
if (id):
query = Restriction.get(Restriction.id == id)
if (to_user):
query = Restriction.select().where(Restriction.to_user == to_user)
return query
class Meta:
db_table = "auditlog"
database = db
# if not db.get_columns('members'):
# db.create_tables([Member,Restriction])
# logging.warning("Members table is empty, you need get data(run !reload)")
#
# if Member.select().count() == 0:
# logging.warning("Members table is empty, you need get data(run !reload)")
# def build() -> None:
# db.create_tables([Member,Restriction])

View File

@ -12,11 +12,7 @@ class AvaibleRolesFilter(BoundFilter):
self.avaible_roles = available_roles
async def check(self,message:types.Message):
member = Member.search(Member.user_id,message.from_user.id)
if (member is None):
await message.answer("Something wrong: user not found in database(u should run !reload)")
return
member = Member.get(Member.user_id == message.from_user.id)
if (member.role == "owner"):
return True

View File

@ -1,23 +1,29 @@
import logging
from load import dp,bot
from peewee import DoesNotExist
from load import dp, bot, types
import config
from aiogram.utils.exceptions import Unauthorized
@dp.errors_handler()
async def errors_handler(update, exception):
async def errors_handler(update: types.Update, exception):
if (isinstance(exception,Unauthorized)):
logging.info(f"Unathorized:{config.token}")
return True
if (isinstance(exception,DoesNotExist)):
await update.message.reply("Membser not found, you shoud update database data `!reload`",
parse_mode="Markdown")
return True
await update.message.answer("Error happaned!\nBot terminated!")
await bot.send_message(
config.second_group_id,
f"**Bot terminated**!\nException:{exception}",
parse_mode="Markdown"
await bot.send_message(config.second_group_id,
(
"Bot terminated"
f"{exception}"
),parse_mode="Markdown"
)
logging.info(f"Bot terminated!")

View File

@ -1,3 +1,3 @@
from . import moderator
from . import user
from . import service
from . import simple_admin
from . import main
from . import new_chat_member

View File

@ -3,26 +3,27 @@ from load import bot, dp, types
import config
from database import Member
@dp.message_handler(commands=["start","help"],chat_type=[types.ChatType.SUPERGROUP])
async def start_command_group(message:types.Message):
await message.answer((
f"Hi,**{message.from_user.first_name}**!\n"
"My commands:\n"
" /help , /start - read the message.\n"
" /me , /bio - member information (if member group)."),
" /help , /start - read the message\n"
" /me , /bio - member information (if member group)"),
parse_mode="Markdown"
)
@dp.message_handler(commands=["leave"],chat_type=[types.ChatType.SUPERGROUP])
async def leave_group(message:types.Message):
user = message.from_user
arguments = message.get_args()
if (arguments != "I UNDERSTAND!"):
if (message.text.split()[0] != "I UNDERSTAND!"):
await message.answer("use /leave I UNDERSTAND")
return
Member.delete().where(Member.user_id == user.id).execute()
Member.delete().get(Member.user_id == user.id)
# Ban user and save (bool)
status = await bot.kick_chat_member(chat_id=message.chat.id,user_id=user.id,until_date=None)
@ -34,11 +35,7 @@ async def leave_group(message:types.Message):
@dp.message_handler(commands=["bio","me"],chat_type=[types.ChatType.SUPERGROUP])
async def get_information(message: types.Message):
user = Member.search(Member.user_id, message.from_user.id)
if (not user):
await message.answer("Something wrong!")
return
user = Member.get(Member.user_id == message.from_user.id)
await message.answer((
f"[{user.first_name}](tg://user?id={user.user_id}) ({user.role})\n"
@ -47,8 +44,12 @@ async def get_information(message: types.Message):
)
@dp.message_handler(commands=["report"],replied=True,chat_type=[types.ChatType.SUPERGROUP])
async def report(message: types.Message):
@dp.message_handler(
commands=["report"],
replied=True,
chat_type=[types.ChatType.SUPERGROUP]
)
async def user_report(message: types.Message):
args = message.text.split()
if (len(args) != 2):
@ -59,14 +60,20 @@ async def report(message: types.Message):
reporter_user = message.from_user
reason = args[1]
# TODO: translate it
msg = ("Жалоба на: [{}](tg://user?id={})\nПожаловался:[{}](tg://user?id={})\nПричина: {}\n{}"
.format(reported_user['first_name'],
reported_user['id'],
await bot.send_message(
config.second_group_id,
(
"Жалоба на: [{}](tg://user?id={})\n"
"Пожаловался: [{}](tg://user?id={})\n"
"Причина: {}\n"
"{}"
).format(
reported_user.first_name,
reported_user.id,
reporter_user.first_name,
reporter_user.id,
reason,
message.reply_to_message.link("Link message", as_html=False)
))
await bot.send_message(config.second_group_id, msg, parse_mode="Markdown")
),
parse_mode="Markdown",
)

View File

@ -0,0 +1,29 @@
from load import dp, types
from database import Member
@dp.message_handler(content_types=["new_chat_members"])
async def welcome_message(message:types.Message):
user = Member.get_or_none(Member.user_id == message.from_user.id)
if (user):
await message.answer("Спасибо что вы с нами.")
if not (user):
Member.create(
user_id = message.from_user.id,
first_name = message.from_user.first_name,
username = message.from_user.username,
)
# TODO: translate it
await message.answer((
f"Привет,{user.first_name}\n"
"Просим ознакомится с [правилами](https://telegra.ph/Pravila-CHata-Open-Source-05-29)\n"
"Советы на 'хороший тон':\n"
"\t\t1.Формулируй свою мысль в 1-2 предложения\n"
"\t\t1.Не задавай [мета](nometa.xyz) вопросы\n"),
parse_mode="Markdown")
await message.delete()

View File

@ -1,57 +0,0 @@
from load import dp, types
from database import Member
# TODO: fix it
# import utils
# import config
# vt = utils.VirusTotalAPI(config.vt_api,True)
# @dp.message_handler(content_types=["document"],chat_type=[types.ChatType.SUPERGROUP])
# async def file_handler(message:types.Message):
# file = await bot.get_file(message.document.file_id)
#
# await bot.send_message(
# message.chat.id,
# await vt.scan_file(file.file_path),
# parse_mode="Markdown"
# )
@dp.message_handler(content_types=["new_chat_members"])
async def welcome_message(message:types.Message):
# User
user = message.from_user
exists = Member.exists(Member.user_id,user.id)
if (exists):
await message.answer("Спасибо что вы с нами.")
if not (exists):
Member.create(
user_id = user.id,
first_name = user.first_name,
username = user.username,
)
# TODO: translate it
await message.answer((
f"Привет,{user.first_name}\n"
"Просим ознакомится с [правилами](https://telegra.ph/Pravila-CHata-Open-Source-05-29)\n"
"Советы на 'хороший тон':\n"
"\t\t1.Формулируй свою мысль в 1-2 предложения\n"
"\t\t1.Не задавай [мета](nometa.xyz) вопросы\n"),
parse_mode="Markdown")
await message.delete()
# @dp.message_handler()
# async def filter_link_shorts(message:types.Message):
# link_shorters = open("txt/link_shorters.txt","r").read().split()
#
# for y in link_shorters:
# for user_message in message.text.lower().split():
# if (y in user_message):await message.delete()
@dp.message_handler(content_types=types.ContentType.VOICE)
async def voice_message(message:types.Message):
pass

View File

@ -2,24 +2,20 @@ from load import bot, dp, types
from aiogram.types.chat_permissions import ChatPermissions
import config
import utils
from database import Member, Restriction
from database import MemberRoles
from utils import getCommandArgs, getArgument, checkArg, parse_duration, delete_substring_from_string
from utils import get_command_args, get_argument, parse_timedelta_from_message
# Filters
# is_admin=True - Check admin permission, if user is admin, continue.
# replied=True - If message is answer, continue.
# accessed_roles - list roles.
@dp.message_handler(commands=["ban","sban"],commands_prefix="!",available_roles=[MemberRoles.HELPER,MemberRoles.ADMIN])
@dp.message_handler(
commands=["ban", "sban"],
commands_prefix="!",
available_roles=[MemberRoles.HELPER, MemberRoles.ADMIN]
)
async def ban_user(message: types.Message):
command = await getCommandArgs(message)
reason = getArgument(command.arguments)
command = await get_command_args(message)
to_user = command.to_user
from_user = command.from_user
@ -43,13 +39,17 @@ async def ban_user(message: types.Message):
Restriction.create(
from_user=from_user,
to_user=to_user,
action="Ban user",
reason=reason,
text=message.text,
message_id=message.message_id
)
@dp.message_handler(commands=["unban","sunban"],commands_prefix="!",available_roles=[MemberRoles.HELPER,MemberRoles.ADMIN])
@dp.message_handler(
commands=["unban", "sunban"],
commands_prefix="!",
available_roles=[MemberRoles.HELPER,MemberRoles.ADMIN]
)
async def unban_user(message: types.Message):
command = await getCommandArgs(message)
command = await get_command_args(message)
to_user = command.to_user
from_user = command.from_user
@ -62,22 +62,27 @@ async def unban_user(message: types.Message):
)
return
# Unban user and set status (bool)
# Unban user and set status
status = await bot.unban_chat_member(chat_id=message.chat.id, user_id=to_user.user_id)
if status and (not command.is_silent):
await message.answer(f"[{from_user.first_name}](tg://user?id={from_user.user_id}) has unbanned [{to_user.first_name}](tg://user?id={to_user.user_id})",parse_mode="Markdown")
Member.create(
user_id = to_user.user_id,
first_name = to_user.first_name,
username = to_user.username
Restriction.create(
from_user=from_user,
to_user=to_user,
text=message.text,
message_id=message.message_id
)
@dp.message_handler(commands=["info"],commands_prefix="!",available_roles=[MemberRoles.HELPER,MemberRoles.ADMIN])
@dp.message_handler(
commands=["info"],
commands_prefix="!",
available_roles=[MemberRoles.HELPER, MemberRoles.ADMIN]
)
async def info_user(message: types.Message):
command = await getCommandArgs(message)
command = await get_command_args(message)
to_user = command.to_user
@ -94,16 +99,17 @@ async def info_user(message: types.Message):
parse_mode="Markdown"
)
@dp.message_handler(commands=["kick","skick"],commands_prefix="!",available_roles=[MemberRoles.HELPER,MemberRoles.ADMIN])
async def kick_user(message:types.Message):
command = await getCommandArgs(message)
arguments = command.arguments
@dp.message_handler(
commands=["kick", "skick"],
commands_prefix="!",
available_roles=[MemberRoles.HELPER, MemberRoles.ADMIN]
)
async def kick_user(message: types.Message):
command = await get_command_args(message)
to_user = command.to_user
from_user = command.from_user
reason = getArgument(arguments)
if (not to_user) or (not from_user):
await message.answer((
"Usage: !kick (@username|id) reason=None\n"
@ -119,22 +125,29 @@ async def kick_user(message:types.Message):
await message.answer(f"[{from_user.first_name}](tg://user?id={from_user.user_id}) has kicked [{to_user.first_name}](tg://user?id={to_user.user_id})",parse_mode="Markdown")
Restriction.create(
from_user=from_user,
to_user=to_user,
action="Kick user",
reason=reason,
text=message.text,
message_id=message.message_id
)
@dp.message_handler(commands=["mute","smute"],commands_prefix="!",available_roles=[MemberRoles.ADMIN])
@dp.message_handler(
commands=["mute", "smute"],
commands_prefix="!",
available_roles=[MemberRoles.ADMIN]
)
async def mute_user(message:types.Message):
command = await getCommandArgs(message)
arguments = command.arguments
command = await get_command_args(message)
to_user = command.to_user
from_user = command.from_user
duration = await parse_timedelta_from_message(message)
if (not to_user) or (not from_user):
await message.answer((
"Usage:!mute (@username|id) (duration)\n"
@ -142,25 +155,6 @@ async def mute_user(message:types.Message):
)
return
duration_string = parse_duration(arguments)
duration = None
reason = None
if (duration_string):
duration = utils.parse_timedelta(duration_string)
if (not duration):
await message.answer(f"Error: \"{duration}\" — неверный формат времени. Examles: 3ч, 5м, 4h30s.")
return
reason = delete_substring_from_string(" ".join(arguments),duration_string)
if (not duration_string):
duration_string = "forever"
if (arguments):
reason = " ".join(arguments)
permissions = ChatPermissions(can_send_messages=False)
status = await bot.restrict_chat_member(
@ -171,21 +165,25 @@ async def mute_user(message:types.Message):
)
if status and (not command.is_silent):
await message.answer(f"[{from_user.first_name}](tg://user?id={from_user.user_id}) has muted [{to_user.first_name}](tg://user?id={to_user.user_id}) for {duration_string}",parse_mode="Markdown")
await message.answer(f"[{from_user.first_name}](tg://user?id={from_user.user_id}) has muted [{to_user.first_name}](tg://user?id={to_user.user_id})",parse_mode="Markdown")
Restriction.create(
from_user=from_user,
to_user=to_user,
action="Mute user",
reason=reason,
text=message.text,
message_id=message.message_id
)
@dp.message_handler(commands=["unmute","sunmute"],commands_prefix="!",available_roles=[MemberRoles.ADMIN])
@dp.message_handler(
commands=["unmute","sunmute"],
commands_prefix="!",
available_roles=[MemberRoles.ADMIN]
)
async def umute_user(message: types.Message):
# Get information
command = await getCommandArgs(message)
command = await get_command_args(message)
to_user = command.to_user
from_user = command.from_user
@ -203,14 +201,14 @@ async def umute_user(message: types.Message):
# Set permissions
permissions = ChatPermissions(
can_send_messages= group_permissions["can_send_messages"],
can_send_media_messages= group_permissions["can_send_media_messages"],
can_send_polls= group_permissions["can_send_polls"],
can_send_other_messages= group_permissions["can_send_other_messages"],
can_add_web_page_previews= group_permissions["can_add_web_page_previews"],
can_change_info= group_permissions["can_change_info"],
can_invite_users= group_permissions["can_invite_users"],
can_pin_messages= group_permissions["can_pin_messages"]
can_send_messages = group_permissions["can_send_messages"],
can_send_media_messages = group_permissions["can_send_media_messages"],
can_send_polls = group_permissions["can_send_polls"],
can_send_other_messages = group_permissions["can_send_other_messages"],
can_add_web_page_previews = group_permissions["can_add_web_page_previews"],
can_change_info = group_permissions["can_change_info"],
can_invite_users = group_permissions["can_invite_users"],
can_pin_messages = group_permissions["can_pin_messages"]
)
# Restrict user and save
@ -223,41 +221,54 @@ async def umute_user(message: types.Message):
if status and (not command.is_silent):
await message.answer(f"[{from_user.first_name}](tg://user?id={from_user.user_id}) has unmuted [{to_user.first_name}](tg://user?id={to_user.user_id})",parse_mode="Markdown")
@dp.message_handler(commands=["pin"],commands_prefix="!",available_roles=[MemberRoles.HELPER,MemberRoles.ADMIN])
async def pin_message(message:types.Message):
@dp.message_handler(
commands=["pin"],
commands_prefix="!",
available_roles=[MemberRoles.HELPER, MemberRoles.ADMIN]
)
async def pin_message(message: types.Message):
await bot.pin_chat_message(message.chat.id, message.reply_to_message.message_id)
@dp.message_handler(commands=["readonly","ro"],commands_prefix="!",available_roles=[MemberRoles.ADMIN])
async def readonly_mode(message:types.Message):
@dp.message_handler(
commands=["readonly","ro"],
commands_prefix="!",
available_roles=[MemberRoles.ADMIN]
)
async def readonly_mode(message: types.Message):
group_permissions = config.group_permissions
status = config.group_permissions['can_send_messages']
if (status):
await message.answer("🔕 Readonly mode enabled!")
chat_permissions = ChatPermissions(
can_send_messages=not status
can_send_messages = not status
)
else:
await message.answer("🔔 Readonly mode disabled!")
chat_permissions = ChatPermissions(
can_send_messages=group_permissions['can_send_messages'],
can_send_media_messages=group_permissions["can_send_media_messages"],
can_send_other_messages=group_permissions['can_send_other_messages'],
can_send_polls=group_permissions['can_send_polls'],
can_invite_users=group_permissions['can_invite_users'],
can_change_info=group_permissions['can_change_info'],
can_add_web_page_previews=group_permissions['can_add_web_page_previews'],
can_pin_messages=group_permissions['can_pin_messages']
can_send_messages = group_permissions['can_send_messages'],
can_send_media_messages = group_permissions["can_send_media_messages"],
can_send_other_messages = group_permissions['can_send_other_messages'],
can_send_polls = group_permissions['can_send_polls'],
can_invite_users = group_permissions['can_invite_users'],
can_change_info = group_permissions['can_change_info'],
can_add_web_page_previews = group_permissions['can_add_web_page_previews'],
can_pin_messages = group_permissions['can_pin_messages']
)
config.group_permissions["can_send_messages"] = not status
await bot.set_chat_permissions(chat_id=message.chat.id, permissions=chat_permissions)
await bot.set_chat_permissions(chat_id = message.chat.id, permissions = chat_permissions)
@dp.message_handler(commands=["warn","w"],commands_prefix="!",available_roles=[MemberRoles.HELPER,MemberRoles.ADMIN])
@dp.message_handler(
commands=["warn","w"],
commands_prefix="!",
available_roles=[MemberRoles.HELPER, MemberRoles.ADMIN]
)
async def warn_user(message: types.Message):
# Get information
command = await getCommandArgs(message)
reason = getArgument(command.arguments)
command = await get_command_args(message)
to_user = command.to_user
from_user = command.from_user
@ -279,36 +290,42 @@ async def warn_user(message: types.Message):
await message.answer(f"[{to_user.first_name}](tg://user?id={to_user.user_id}) has been banned!",parse_mode="Markdown")
await bot.kick_chat_member(chat_id=message.chat.id, user_id=to_user.user_id, until_date=None)
Restriction.create(
to_user=to_user,
from_user=from_user,
action="Warn user",
reason=reason,
to_user=to_user,
text=message.text,
message_id=message.message_id
)
@dp.message_handler(commands=["reload"],commands_prefix="!")
async def reload(message:types.Message):
@dp.message_handler(
commands=["reload"],
commands_prefix="!"
)
async def reload(message: types.Message):
from load import tgc
if (not Member.search(Member.role,"owner")):
owner_exists = Member.get_or_none(Member.role == "owner")
if (not owner_exists):
Member.create(
user_id = message.from_user.id,
first_name = message.from_user.first_name,
username = message.from_user.username,
role="owner",
role = "owner",
)
# TODO: do this every 1 hours
members = await tgc.members_list(config.group_id)
for member in members:
user = Member.search(Member.user_id,member["id"])
user = Member.get_or_none(Member.user_id == member["id"])
if (not user):
Member.create(
user_id=member["id"],
first_name=member["first_name"],
username=member["username"],
user_id = member["id"],
first_name = member["first_name"],
username = member["username"],
)
else:
user.first_name = member["first_name"]
@ -325,10 +342,14 @@ async def reload(message:types.Message):
await message.answer("Reloaded!")
@dp.message_handler(commands=["setrole"],commands_prefix="!",available_roles=[MemberRoles.ADMIN])
@dp.message_handler(
commands=["setrole"],
commands_prefix="!",
available_roles=[MemberRoles.ADMIN]
)
async def set_role(message:types.Message):
command = await getCommandArgs(message)
new_role = getArgument(command.arguments)
command = await get_command_args(message)
new_role = get_argument(command.arguments)
to_user = command.to_user
from_user = command.from_user
@ -351,5 +372,5 @@ async def set_role(message:types.Message):
to_user.role = new_role
to_user.save()
await message.answer(f"{new_role.capitalize()} role set for [{to_user.first_name}](tg://user?id={to_user.user_id})",
parse_mode="Markdown")
await message.answer(f"{new_role.capitalize()} role set for \
[{to_user.first_name}](tg://user?id={to_user.user_id})",parse_mode="Markdown")

View File

@ -45,7 +45,7 @@ async def about_us(message:types.Message):
@dp.message_handler(Text(equals=["Check restrictions"]),state=None)
async def check_for_restrict(message:types.Message):
user = Member.get(Member.user_id == message.from_user.id)
restrictions = Restriction.search(to_user=user)
restrictions = Restriction.select().where(Restriction.to_user == user)
if (not restrictions):
await message.answer("✅No restrictions.")
@ -55,8 +55,19 @@ async def check_for_restrict(message:types.Message):
callback = report_callback.new(restriction_id=restriction.id)
markup = report_button("✉️ Report restriction",callback)
await message.answer(f"Restriction\n{restriction.operation}\nReason:{restriction.reason}\nDate:{restriction.timestamp}",
reply_markup=markup)
from_user = restriction.from_user
to_user = restriction.to_user
await message.answer(
(
f"Restriction #{restriction.id}\n"
f"from user: [{from_user.first_name}](tg://user?id={from_user.user_id})\n"
f"to user: [{from_user.first_name}](tg://user?id={to_user.user_id})\n"
f"{restriction.text}\n"
f"{restriction.timestamp}\n"
),parse_mode="Markdown",
reply_markup=markup
)
await States.state1.set()
@ -84,25 +95,31 @@ async def get_message_report(message:types.Message, state:FSMContext):
if not ("Cancel" in answer):
data = await state.get_data()
restriction_id = data.get("restriction_id")
restriction = Restriction.search(id=restriction_id)
if (restriction is None):
return
restriction = Restriction.get(id=restriction_id)
from_user = restriction.from_user
to_user = restriction.to_user
reason = restriction.reason
if (not reason):
reason = "No reason"
await bot.send_message(config.second_group_id,(
f"Report on restriction #{restriction_id}\n"
f"From user:[{from_user.first_name}](tg://user?id={from_user.id})\n"
f"To user:[{from_user.first_name}](tg://user?id={to_user.id})\n"
f"Reason:{reason}\n"
f"{answer}"
),parse_mode="Markdown")
await bot.send_message(config.second_group_id,
(
"Report on restriction #{}\n"
"from user: [{}](tg://user?id={})\n"
"to user: [{}](tg://user?id={})\n"
"{}\n"
"{}\n"
"Message:{}"
).format(
restriction_id,
from_user.first_name,
from_user.user_id,
to_user.first_name,
to_user.user_id,
restriction.text,
restriction.timestamp,
answer,
)
,parse_mode="Markdown"
)
await message.answer("Report restriction sended",reply_markup=ReplyKeyboardRemove())
else:

View File

@ -4,6 +4,5 @@ from .default_commands import set_default_commands
from .telegram_client import TelegramClient
from .parse_timedelta import parse_timedelta
from .virustotal import VirusTotalAPI
from .arguments_parser import getArgument,getCommandArgs,checkArg,parse_duration,delete_substring_from_string
from .command_parser import get_argument, get_command_args
from .parse_timedelta import parse_timedelta_from_message

View File

@ -1,86 +0,0 @@
from dataclasses import dataclass
from database import Member
from aiogram import types
import re
def getArgument(arguments:list,index:int=0) -> str | None:
""" Get element from a list.If element not exist return None """
if not (arguments):
return None
if (len(arguments) > index):
return arguments[index]
else:
return None
@dataclass
class CommandArguments:
to_user:Member | None
from_user:Member | None
arguments:list
is_silent:bool
async def getCommandArgs(message: types.Message) -> CommandArguments:
"""
Describe user data and arguments from message
!command (username|id) ...
"""
silent = False
if (message.text.split()[0] == "s"):
silent = True
arguments = message.text.split()[1:]
to_user = None
from_user = Member.search(Member.user_id, message.from_user.id)
# If message replied
if (message.reply_to_message):
to_user = Member.search(Member.user_id, message.reply_to_message)
else:
user_data = getArgument(arguments)
if (user_data):
if (user_data.isdigit()):
to_user = Member.search(Member.user_id, user_data)
if (user_data[0] == "@"):
to_user = Member.search(Member.username, user_data)
if (arguments) and (not to_user):
await message.answer(f"❌ User {to_user} not exist.")
arguments = arguments[1:]
return CommandArguments(to_user, from_user, arguments,silent)
def delete_substring_from_string(string:str,substring:str) -> str:
string_list = string.split(substring)
return "".join(string_list).lstrip()
def parse_duration(message) -> str:
duration = re.findall(r"(\d+d|\d+h|\d+m|\d+s)",''.join(message))
duration = " ".join(duration)
return duration
def checkArg(message:str) -> bool | None:
""" Check if first argument in ["enable","on","true"] then return true """
if (not message):
return None
argument = message.split()
argument = getArgument(message.split(),1)
if (argument is None):
return None
on = ['enable','on','true']
off = ['disable','off','false']
if (argument in on):
return True
if (argument in off):
return False

47
utils/command_parser.py Normal file
View File

@ -0,0 +1,47 @@
import typing
from dataclasses import dataclass
from database import Member
from load import types
def get_argument(arguments:list,index:int=0) -> typing.Optional[str]:
""" Get element from a list.If element not exist return None """
if not (arguments):
return None
if (len(arguments) > index):
return arguments[index]
else:
return None
@dataclass
class CommandArguments:
to_user:Member | None
from_user:Member | None
arguments:list
is_silent:bool
async def get_command_args(message: types.Message) -> CommandArguments:
"""Describe user data and arguments from message"""
silent = message.text.split()[0] == "s"
arguments = message.text.split()[1:]
to_user = None
from_user = Member.get(Member.user_id == message.from_user.id)
# If message replied
if (message.reply_to_message):
to_user = Member.get_or_none(Member.user_id == message.reply_to_message)
else:
user_data = get_argument(arguments)
if (user_data):
if (user_data.isdigit()):
to_user = Member.get(Member.user_id == user_data)
if (user_data[0] == "@"):
to_user = Member.get(Member.username == user_data)
arguments = arguments[1:]
return CommandArguments(to_user, from_user, arguments, silent)

View File

@ -1,12 +1,31 @@
import re
import datetime as dt
from typing import Union
import typing
def parse_timedelta(specification: str) -> Union[None, dt.timedelta]:
specification = specification.strip().replace(' ', '')
import datetime
from load import types
def parse_timedelta(value: str) -> typing.Optional[datetime.timedelta]:
specification = value.strip().replace(' ', '')
match = re.fullmatch(r'(?:(\d+)(?:d|д))?(?:(\d+)(?:h|ч))?(?:(\d+)(?:m|м))?(?:(\d+)(?:s|с))?', specification)
if match:
units = [(0 if i is None else int(i)) for i in match.groups()]
return dt.timedelta(days=units[0], hours=units[1], minutes=units[2], seconds=units[3])
return datetime.timedelta(days=units[0], hours=units[1], minutes=units[2], seconds=units[3])
else:
return None
async def parse_timedelta_from_message(
message: types.Message,
) -> typing.Optional[datetime.timedelta]:
_, *args = message.text.split()
if args:
duration = re.findall(r"(\d+d|\d+h|\d+m|\d+s)",''.join(message.text))
duration = " ".join(duration)
duration = parse_timedelta(duration)
return duration
else:
return datetime.timedelta(0,0,0) # forever

View File

@ -1,5 +1,6 @@
from pyrogram.client import Client
class TelegramClient:
def __init__(self,api_id,api_hash,token):
self.api_id = api_id
@ -11,7 +12,6 @@ class TelegramClient:
bot_token=self.token
)
async def members_list(self,chat_id:int):
members = []

View File

@ -1,77 +0,0 @@
import io
from typing import Union,Any
import aiohttp
# TODO: skip queue virustotal
class VirusTotalAPI:
def __init__(self,apikey:str,local_telegram_api:bool):
self.apikey = apikey
self.local_telegram_api = local_telegram_api
async def __download_file(self,filepath:str,
*args,**kw) -> Union[io.BytesIO,Any]:
if ( self.local_telegram_api ):
with open(filepath,'rb') as bf:
return io.BytesIO(bf.read())
else:
from load import bot
return await bot.download_file(filepath,
*args,**kw)
async def __file_scan(self,filepath) -> None:
file = await self.__download_file(filepath)
url = "https://www.virustotal.com/vtapi/v2/file/scan"
params = {"apikey":self.apikey,"file":file}
async with aiohttp.ClientSession() as session:
response = await session.post(url,data=params)
response = await response.json()
return response["sha1"]
async def __file_report(self,resource) -> dict:
url = "https://www.virustotal.com/vtapi/v2/file/report"
params = {"apikey":self.apikey,"resource":resource}
async with aiohttp.ClientSession() as session:
response = await session.get(url,params=params)
response = await response.json()
return response
def format_output(self,file_report:dict) -> str:
"""Format file_report
File Analys
Status:Infected/Clear
Positives:positives/total percent%
File Report
"""
total = file_report["total"]
positives = file_report["positives"]
permalink = file_report["permalink"]
percent = round(positives/total*100)
if (percent >= 40):
status = "Infected ☣️"
else:
status = "Clear ✅"
output = (
(
"File Analys\n"
f"Detected:{positives}/{total} %{percent}\n"
f"Status:{status}\n"
f"[File Report]({permalink})\n"
)
)
return output
async def scan_file(self,filepath:str) -> str:
resource = await self.__file_scan(filepath)
file_report = await self.__file_report(resource)
return file_report