Some fix database and etc
This commit is contained in:
parent
2f7cae39f4
commit
d390d2ebe9
21 changed files with 437 additions and 478 deletions
|
@ -1,5 +1,7 @@
|
|||
bot_token = ""
|
||||
|
||||
limit_of_warns = 5
|
||||
|
||||
api_id = ""
|
||||
api_hash = ""
|
||||
|
||||
|
|
1
.gitignore
vendored
1
.gitignore
vendored
|
@ -9,3 +9,4 @@ modules/__pycache__
|
|||
pyrightconfig.json
|
||||
session.session
|
||||
session.session-journal
|
||||
venv
|
||||
|
|
|
@ -40,7 +40,8 @@ Logging admin command actions in database.
|
|||
- [ ] Analys file for malware 🔎
|
||||
- [ ] Paste text to PasteBin or PrivNote 📋
|
||||
- [ ] Site for group moderator 🌍
|
||||
- [ ] Fix database errors ForeignKeys
|
||||
- [ ] Change PeeWee to SQLAlchemy
|
||||
- [x] Some fix in database
|
||||
|
||||
## Support
|
||||
Every investition helps in maintaining this project and making it better.
|
||||
|
|
9
app.py
9
app.py
|
@ -1,7 +1,8 @@
|
|||
#!/usr/bin/env python3
|
||||
import logging
|
||||
from aiogram import executor
|
||||
from database import models
|
||||
from database import build
|
||||
|
||||
|
||||
from load import dp, bot
|
||||
import filters
|
||||
|
@ -12,7 +13,6 @@ dp.filters_factory.bind(filters.ReplayMessageFilter)
|
|||
import handlers
|
||||
import config
|
||||
|
||||
|
||||
logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO)
|
||||
|
||||
WEBAPP_HOST = '127.0.0.1'
|
||||
|
@ -43,9 +43,9 @@ async def on_shutdown(dp):
|
|||
await dp.storage.wait_closed()
|
||||
|
||||
def main() -> None:
|
||||
models.build()
|
||||
build()
|
||||
|
||||
if config.use_webhook:
|
||||
if config.USE_WEBHOOK:
|
||||
executor.start_webhook(
|
||||
dispatcher=dp,
|
||||
webhook_path=WEBHOOK_PATH,
|
||||
|
@ -58,7 +58,6 @@ def main() -> None:
|
|||
|
||||
else:
|
||||
executor.start_polling(dp,skip_updates=True)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
|
|
|
@ -4,13 +4,15 @@ from environs import Env
|
|||
env = Env()
|
||||
env.read_env()
|
||||
|
||||
use_webhook = True
|
||||
USE_WEBHOOK = True
|
||||
|
||||
# bot token
|
||||
token = env.str("bot_token")
|
||||
|
||||
group_id = env.str("group_id")
|
||||
second_group_id = env.str("second_group_id")
|
||||
group_id = env.int("group_id")
|
||||
second_group_id = env.int("second_group_id")
|
||||
|
||||
limit_of_warns = 5
|
||||
|
||||
# Telegram Application
|
||||
api_id = env.int("api_id")
|
||||
|
@ -19,6 +21,7 @@ api_hash = env.str("api_hash")
|
|||
# Virus Total API
|
||||
vt_api = env.str("vt_api")
|
||||
|
||||
|
||||
group_permissions = {
|
||||
"can_send_messages":True,
|
||||
"can_send_media_messages":False,
|
||||
|
@ -32,11 +35,5 @@ group_permissions = {
|
|||
|
||||
db_url = env.str("db_url")
|
||||
|
||||
# telegram-bot-api-service
|
||||
telegram_api_server = env.str("telegram_api_server").split(":")
|
||||
telegram_api_server = {
|
||||
"ip":telegram_api_server[0],
|
||||
"port":telegram_api_server[1]
|
||||
}
|
||||
|
||||
telegram_api_server = f"http://{telegram_api_server['ip']}:{telegram_api_server['port']}"
|
||||
telegram_api_server = f"http://{telegram_api_server[0]}:{telegram_api_server[1]}"
|
||||
|
|
87
database.py
Normal file
87
database.py
Normal file
|
@ -0,0 +1,87 @@
|
|||
from peewee import Field, Model, BigIntegerField, CharField, DateField, DateTimeField, ForeignKeyField
|
||||
|
||||
import config
|
||||
from playhouse.db_url import connect
|
||||
|
||||
from datetime import datetime, date
|
||||
|
||||
from enum import Enum
|
||||
class MemberRoles(Enum):
|
||||
OWNER = "owner"
|
||||
ADMIN = "admin"
|
||||
HELPER = "helper"
|
||||
MEMBER = "member"
|
||||
|
||||
|
||||
db = connect(config.db_url)
|
||||
|
||||
class Member(Model):
|
||||
user_id = BigIntegerField()
|
||||
first_name = CharField()
|
||||
username = CharField(null=True)
|
||||
|
||||
warns = BigIntegerField(default=0)
|
||||
|
||||
role = CharField(default="member")
|
||||
|
||||
joined = DateField(default=date.today())
|
||||
|
||||
|
||||
@staticmethod
|
||||
def exists(fieldname, value) -> bool | None:
|
||||
"""Check if data exists in db"""
|
||||
query = Member.select().where(fieldname == value)
|
||||
|
||||
if (query is None):
|
||||
return None
|
||||
|
||||
return query.exists()
|
||||
|
||||
@staticmethod
|
||||
def search(fieldname:Field, value):
|
||||
if (not Member.exists(fieldname, value)):
|
||||
return None
|
||||
|
||||
return Member.get(fieldname == value)
|
||||
|
||||
@staticmethod
|
||||
def report(delete=False):
|
||||
"""If the user exists, returns number reports. Gives the user a warning or retrieves it."""
|
||||
count = Member.warns
|
||||
|
||||
if delete:count -= 1
|
||||
else:count += 1
|
||||
|
||||
Member.update(warns = count).execute()
|
||||
|
||||
return count
|
||||
|
||||
class Meta:
|
||||
db_table = "members"
|
||||
database = db
|
||||
|
||||
class Restriction(Model):
|
||||
# TODO: not forget rename all operation to action
|
||||
action = CharField()
|
||||
|
||||
from_user = ForeignKeyField(Member, lazy_load=True)
|
||||
to_user = ForeignKeyField(Member, lazy_load=True)
|
||||
|
||||
reason = CharField(null=True)
|
||||
timestamp = DateTimeField(default=datetime.now().replace(microsecond=0))
|
||||
|
||||
@staticmethod
|
||||
def search(to_user=None,id=None):
|
||||
if (id):
|
||||
query = Restriction.get(Restriction.id == id)
|
||||
if (to_user):
|
||||
query = Restriction.select().where(Restriction.to_user == to_user)
|
||||
|
||||
return query
|
||||
|
||||
class Meta:
|
||||
db_table = "auditlog"
|
||||
database = db
|
||||
|
||||
def build() -> None:
|
||||
db.create_tables([Member,Restriction])
|
|
@ -1 +0,0 @@
|
|||
from .database import Database
|
|
@ -1,109 +0,0 @@
|
|||
from .models import Member,Restriction
|
||||
from peewee import Field
|
||||
|
||||
class Database:
|
||||
def check_data_exists(self, fieldname:Field, value) -> bool | None:
|
||||
"""Check if data exists in db"""
|
||||
query = Member.select().where(fieldname == value)
|
||||
|
||||
if (query is None):
|
||||
return None
|
||||
|
||||
return query.exists()
|
||||
|
||||
def register_user(self, user_id, first_name, user_name=None, role:str='member') -> bool:
|
||||
"""If the user doesn't exist, returns true. Registers a user in the db."""
|
||||
|
||||
if self.check_data_exists(Member.user_id,user_id):
|
||||
return False
|
||||
|
||||
Member.create(
|
||||
user_id = user_id,
|
||||
first_name = first_name,
|
||||
user_name = user_name,
|
||||
|
||||
role = role,
|
||||
|
||||
reports = 0,
|
||||
)
|
||||
|
||||
return True
|
||||
|
||||
def search_single_member(self,fieldname:Field,value) -> Member | None:
|
||||
"""If the user is found, returns dataclass. Returns user info."""
|
||||
exists = self.check_data_exists(fieldname,value)
|
||||
|
||||
if not (exists):
|
||||
return None
|
||||
|
||||
user = Member.get(fieldname == value)
|
||||
|
||||
return user
|
||||
|
||||
def create_restriction(self, from_user_id, to_user_id, operation, reason):
|
||||
from_admin = self.search_single_member(Member.user_id,to_user_id)
|
||||
to_user = self.search_single_member(Member.user_id,from_user_id)
|
||||
|
||||
if not (from_admin) or not (to_user):
|
||||
return None
|
||||
|
||||
Restriction.create(
|
||||
operation = operation,
|
||||
|
||||
from_admin = from_admin,
|
||||
to_user = to_user,
|
||||
|
||||
reason = reason,
|
||||
)
|
||||
|
||||
def search_user_restriction(self, user_id) -> list[Restriction] | None:
|
||||
user = Member.get(Member.user_id == user_id)
|
||||
|
||||
query = Restriction.select().join(Member,on=Restriction.to_user)
|
||||
|
||||
if (query is None):
|
||||
return None
|
||||
|
||||
return query.where(Restriction.to_user == user)
|
||||
|
||||
def delete_user(self,user_id) -> bool:
|
||||
"""If the user exists, returns true. Deletes the user from the db."""
|
||||
|
||||
exists = self.check_data_exists(Member.user_id,user_id)
|
||||
|
||||
if not (exists):
|
||||
return False
|
||||
|
||||
Member.delete().where(Member.user_id == user_id)
|
||||
|
||||
return True
|
||||
|
||||
def update_member_data(self, user_id, fieldnames:list[Field], newvalues:list) -> bool:
|
||||
"""Update member data."""
|
||||
exists = self.check_data_exists(Member.user_id,user_id)
|
||||
|
||||
if (not exists):
|
||||
return False
|
||||
|
||||
for i in range(len(newvalues)):
|
||||
query = Member.update({fieldnames[i]:newvalues[i]}).where(Member.user_id == user_id).execute()
|
||||
if (query is None):
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def change_reports(self,user_id,delete=False) -> int | None:
|
||||
"""If the user exists, returns number reports. Gives the user a warning or retrieves it."""
|
||||
exists = self.check_data_exists(Member.user_id,user_id)
|
||||
|
||||
if not (exists):
|
||||
return False
|
||||
|
||||
count = Member.get(Member.user_id == user_id).reports
|
||||
|
||||
if delete:count += 1
|
||||
else:count -= 1
|
||||
|
||||
query = Member.update(reports = count).where(Member.user_id == user_id).execute()
|
||||
|
||||
return count
|
|
@ -1,46 +0,0 @@
|
|||
from peewee import Model, BigIntegerField, CharField, DateField, DateTimeField, ForeignKeyField
|
||||
|
||||
import config
|
||||
from playhouse.db_url import connect
|
||||
|
||||
from datetime import datetime, date
|
||||
|
||||
from enum import Enum
|
||||
class MemberRoles(Enum):
|
||||
OWNER = "owner"
|
||||
ADMIN = "admin"
|
||||
HELPER = "helper"
|
||||
MEMBER = "member"
|
||||
|
||||
|
||||
db = connect(config.db_url)
|
||||
|
||||
class Member(Model):
|
||||
user_id = BigIntegerField()
|
||||
first_name = CharField()
|
||||
user_name = CharField(null=True)
|
||||
role = CharField()
|
||||
|
||||
join_date = DateField(default=date.today())
|
||||
|
||||
reports = BigIntegerField()
|
||||
|
||||
class Meta:
|
||||
db_table = "members"
|
||||
database = db
|
||||
|
||||
class Restriction(Model):
|
||||
operation = CharField()
|
||||
|
||||
from_admin = ForeignKeyField(Member,lazy_load=False)
|
||||
to_user = ForeignKeyField(Member,lazy_load=False)
|
||||
|
||||
reason = CharField(null=True)
|
||||
date = DateTimeField(default=datetime.now)
|
||||
|
||||
class Meta:
|
||||
db_table = "restrictions"
|
||||
database = db
|
||||
|
||||
def build() -> None:
|
||||
db.create_tables([Member,Restriction])
|
|
@ -1,10 +1,7 @@
|
|||
from aiogram import types
|
||||
from aiogram.dispatcher.filters import BoundFilter
|
||||
|
||||
from database.database import Member
|
||||
from database.models import MemberRoles
|
||||
|
||||
from load import database
|
||||
from database import Member, MemberRoles
|
||||
|
||||
class AvaibleRolesFilter(BoundFilter):
|
||||
"""Filter accessed roles"""
|
||||
|
@ -15,7 +12,12 @@ class AvaibleRolesFilter(BoundFilter):
|
|||
self.avaible_roles = available_roles
|
||||
|
||||
async def check(self,message:types.Message):
|
||||
member = database.search_single_member(Member.user_id,message.from_user.id)
|
||||
member = Member.search(Member.user_id,message.from_user.id)
|
||||
|
||||
if (member is None):
|
||||
return False
|
||||
|
||||
# member = database.search_single_member(Member.user_id,message.from_user.id)
|
||||
|
||||
if (member.role == "owner"):
|
||||
return True
|
||||
|
|
|
@ -3,4 +3,4 @@ from load import dp,types
|
|||
# TODO: channel post forward in chat
|
||||
@dp.channel_post_handler()
|
||||
async def channel_handler(message:types.Message):
|
||||
print(message.text)
|
||||
pass
|
||||
|
|
|
@ -1,248 +1,197 @@
|
|||
from load import bot, database, dp, types
|
||||
from load import bot, dp, types
|
||||
from aiogram.types.chat_permissions import ChatPermissions
|
||||
|
||||
import config
|
||||
import utils
|
||||
|
||||
from database.models import Member
|
||||
|
||||
import re
|
||||
from database import Member, Restriction
|
||||
from database import MemberRoles
|
||||
|
||||
from dataclasses import dataclass
|
||||
from utils import getCommandArgs, getArgument, checkArg, parse_duration, delete_substring_from_string
|
||||
|
||||
from database.models import MemberRoles
|
||||
|
||||
def getArgument(arguments:list,index:int=0) -> str | None:
|
||||
""" Get element from a list.If element not exist return None """
|
||||
if not (arguments):
|
||||
return None
|
||||
|
||||
if (len(arguments) > index):
|
||||
return arguments[index]
|
||||
else:
|
||||
return None
|
||||
# Filters
|
||||
# is_admin=True - Check admin permission, if user is admin, continue.
|
||||
# replied=True - If message is answer, continue.
|
||||
# accessed_roles - list roles.
|
||||
|
||||
@dataclass
|
||||
class CommandArguments:
|
||||
user:Member | None
|
||||
arguments:list
|
||||
|
||||
async def getCommandArgs(message:types.Message) -> CommandArguments:
|
||||
""" Describe user data and arguments from message """
|
||||
|
||||
"""
|
||||
!command (@username/id) reason=None
|
||||
"""
|
||||
|
||||
arguments_list = message.text.split()[1:]
|
||||
|
||||
is_reply = message.reply_to_message
|
||||
|
||||
member = None
|
||||
arguments = []
|
||||
|
||||
if (is_reply):
|
||||
member = database.search_single_member(Member.user_id,message.reply_to_message)
|
||||
arguments = arguments_list
|
||||
else:
|
||||
first_word = getArgument(arguments_list)
|
||||
if (first_word):
|
||||
if (first_word.isdigit()):
|
||||
member = database.search_single_member(Member.user_id,first_word)
|
||||
|
||||
if (first_word[0] == "@") :
|
||||
member = database.search_single_member(Member.user_name,first_word)
|
||||
|
||||
arguments = arguments_list[1:]
|
||||
else:
|
||||
arguments = arguments_list
|
||||
|
||||
if (member is None) and (first_word):
|
||||
await message.answer(f"❌ User {first_word} not exist.")
|
||||
|
||||
return CommandArguments(member,arguments)
|
||||
|
||||
def checkArg(message:str) -> bool | None:
|
||||
""" Check if first argument in ["enable","on","true"] then return true """
|
||||
if (not message):
|
||||
return None
|
||||
|
||||
argument = message.split()
|
||||
argument = getArgument(message.split(),1)
|
||||
|
||||
if (argument is None):
|
||||
return None
|
||||
|
||||
on = ['enable','on','true']
|
||||
off = ['disable','off','false']
|
||||
|
||||
if (argument in on):
|
||||
return True
|
||||
if (argument in off):
|
||||
return False
|
||||
|
||||
|
||||
def delete_substring_from_string(string:str,substring:str) -> str:
|
||||
string_list = string.split(substring)
|
||||
return "".join(string_list).lstrip()
|
||||
|
||||
# Filters:
|
||||
# is_admin=True - Check admin permission, if user is admin, continue.
|
||||
# replied=True - If message is answer, continue.
|
||||
# accessed_roles - list roles.
|
||||
|
||||
@dp.message_handler(commands=["ban"],commands_prefix="!",
|
||||
available_roles=[MemberRoles.ADMIN,MemberRoles.HELPER])
|
||||
@dp.message_handler(commands=["ban"],commands_prefix="!",available_roles=[MemberRoles.HELPER,MemberRoles.ADMIN])
|
||||
async def ban_user(message: types.Message):
|
||||
"""
|
||||
!ban (@username/id) reason=None
|
||||
"""
|
||||
command = await getCommandArgs(message)
|
||||
reason = getArgument(command.arguments)
|
||||
|
||||
user = command.user
|
||||
admin = message.from_user
|
||||
to_user = command.to_user
|
||||
from_user = command.from_user
|
||||
|
||||
# If can't descibe user data
|
||||
if (user is None):
|
||||
if (not to_user) or (not from_user):
|
||||
await message.answer((
|
||||
"Usage:!ban (@username|id) reason=None.\n"
|
||||
"Reply to a message or use with a username.")
|
||||
"Usage: !ban (@username|id) reason=None\n"
|
||||
"Reply to a message or use with a username")
|
||||
)
|
||||
return
|
||||
|
||||
# Ban user and save (bool)
|
||||
status = await bot.kick_chat_member(chat_id=message.chat.id, user_id=user.user_id, until_date=None)
|
||||
status = await bot.kick_chat_member(chat_id=message.chat.id, user_id=to_user.user_id, until_date=None)
|
||||
|
||||
if status:
|
||||
await message.answer(f"User [{user.first_name}](tg://user?id={user.user_id}) has been banned.",
|
||||
parse_mode="Markdown")
|
||||
|
||||
# Delete user from database
|
||||
database.delete_user(user.user_id)
|
||||
|
||||
await message.answer(f"[{from_user.first_name}](tg://user?id={from_user.user_id}) has banned [{to_user.first_name}](tg://user?id={to_user.user_id})",parse_mode="Markdown")
|
||||
|
||||
|
||||
# Open restrict
|
||||
database.create_restriction(admin.id, user.user_id, "ban", reason)
|
||||
Restriction.create(
|
||||
from_user=from_user,
|
||||
to_user=to_user,
|
||||
action="Ban user",
|
||||
reason=reason,
|
||||
)
|
||||
|
||||
|
||||
@dp.message_handler(commands=["unban"],commands_prefix="!",
|
||||
available_roles=[MemberRoles.ADMIN,MemberRoles.HELPER])
|
||||
@dp.message_handler(commands=["unban"],commands_prefix="!",available_roles=[MemberRoles.HELPER,MemberRoles.ADMIN])
|
||||
async def unban_user(message: types.Message):
|
||||
"""
|
||||
!unban (@username/id) reason=None
|
||||
"""
|
||||
command = await getCommandArgs(message)
|
||||
user = command.user
|
||||
|
||||
to_user = command.to_user
|
||||
from_user = command.from_user
|
||||
|
||||
# If can't descibe user data
|
||||
if (user is None):
|
||||
if (not to_user) or (not from_user):
|
||||
await message.answer((
|
||||
"Usage:!unban (@username|id) reason=None.\n"
|
||||
"Reply to a message or use with username/id.")
|
||||
"Usage: !unban (@username|id) reason=None\n"
|
||||
"Reply to a message or use with username/id")
|
||||
)
|
||||
return
|
||||
|
||||
# Unban user and set status (bool)
|
||||
status = await bot.unban_chat_member(chat_id=message.chat.id, user_id=user.user_id)
|
||||
|
||||
# add user to database
|
||||
database.register_user(user.user_id, user.first_name)
|
||||
status = await bot.unban_chat_member(chat_id=message.chat.id, user_id=to_user.user_id)
|
||||
|
||||
|
||||
if status:
|
||||
await message.answer(f"User [{user.first_name}](tg://user?id={user.user_id}) has been unbaned.",
|
||||
parse_mode="Markdown")
|
||||
|
||||
|
||||
@dp.message_handler(commands=["kick"],commands_prefix="!",
|
||||
available_roles=[MemberRoles.HELPER,MemberRoles.ADMIN])
|
||||
async def kick_user(message:types.Message):
|
||||
"""
|
||||
!kick (@username/id) reason=None
|
||||
"""
|
||||
await message.answer(f"[{from_user.first_name}](tg://user?id={from_user.user_id}) has unbanned [{to_user.first_name}](tg://user?id={to_user.user_id})",parse_mode="Markdown")
|
||||
|
||||
Member.create(
|
||||
user_id = to_user.user_id,
|
||||
first_name = to_user.first_name,
|
||||
username = to_user.username
|
||||
)
|
||||
|
||||
@dp.message_handler(commands=["info"],commands_prefix="!",available_roles=[MemberRoles.HELPER,MemberRoles.ADMIN])
|
||||
async def info_user(message: types.Message):
|
||||
command = await getCommandArgs(message)
|
||||
|
||||
to_user = command.to_user
|
||||
|
||||
if (not to_user):
|
||||
await message.answer((
|
||||
"Usage: !info (@username|id)\n"
|
||||
"Reply to a message or use with username/id")
|
||||
)
|
||||
return
|
||||
|
||||
await message.answer((
|
||||
f"[{to_user.first_name}](tg://user?id={to_user.user_id}) ({to_user.role})\n"
|
||||
f"Warns: {to_user.warns}/{config.limit_of_warns}"),
|
||||
parse_mode="Markdown"
|
||||
)
|
||||
|
||||
@dp.message_handler(commands=["kick"],commands_prefix="!",available_roles=[MemberRoles.HELPER,MemberRoles.ADMIN])
|
||||
async def kick_user(message:types.Message):
|
||||
command = await getCommandArgs(message)
|
||||
arguments = command.arguments
|
||||
|
||||
user = command.user
|
||||
admin = message.from_user
|
||||
to_user = command.to_user
|
||||
from_user = command.from_user
|
||||
|
||||
reason = getArgument(arguments)
|
||||
|
||||
if (user is None):
|
||||
if (not to_user) or (not from_user):
|
||||
await message.answer((
|
||||
"Usage:!kick (@username|id) reason=None.\n"
|
||||
"Reply to a message or use with a username/id.")
|
||||
"Usage: !kick (@username|id) reason=None\n"
|
||||
"Reply to a message or use with a username/id")
|
||||
)
|
||||
return
|
||||
|
||||
|
||||
status1 = await bot.kick_chat_member(chat_id=message.chat.id, user_id=user.user_id, until_date=None)
|
||||
status2 = await bot.unban_chat_member(chat_id=message.chat.id, user_id=user.user_id)
|
||||
status1 = await bot.kick_chat_member(chat_id=message.chat.id, user_id=to_user.user_id, until_date=None)
|
||||
status2 = await bot.unban_chat_member(chat_id=message.chat.id, user_id=to_user.user_id)
|
||||
|
||||
if (status1 and status2):
|
||||
await message.answer(f"User [{user.first_name}](tg://user?id={user.user_id}) has been kicked.",
|
||||
parse_mode="Markdown")
|
||||
if (not status1 and status2):
|
||||
await message.answer(f"[{from_user.first_name}](tg://user?id={from_user.user_id}) has kicked [{to_user.first_name}](tg://user?id={to_user.user_id})",parse_mode="Markdown")
|
||||
|
||||
database.create_restriction(admin.id,user.user_id,"kick",reason)
|
||||
|
||||
Restriction.create(
|
||||
from_user=from_user,
|
||||
to_user=to_user,
|
||||
action="Kick user",
|
||||
reason=reason,
|
||||
)
|
||||
|
||||
|
||||
@dp.message_handler(commands=["mute"],commands_prefix="!",
|
||||
available_roles=[MemberRoles.ADMIN])
|
||||
@dp.message_handler(commands=["mute"],commands_prefix="!",available_roles=[MemberRoles.ADMIN])
|
||||
async def mute_user(message:types.Message):
|
||||
"""
|
||||
!mute (@username/id) reason=None
|
||||
"""
|
||||
|
||||
command = await getCommandArgs(message)
|
||||
arguments = command.arguments
|
||||
|
||||
user = command.user
|
||||
admin = message.from_user
|
||||
to_user = command.to_user
|
||||
from_user = command.from_user
|
||||
|
||||
if (user is None):
|
||||
if (not to_user) or (not from_user):
|
||||
await message.answer((
|
||||
"Usage:!mute (@username|id) [duration].\n"
|
||||
"Reply to a message or use with a username/id.")
|
||||
"Usage:!mute (@username|id) (duration)\n"
|
||||
"Reply to a message or use with a username/id")
|
||||
)
|
||||
return
|
||||
|
||||
duration = re.findall(r"(\d+d|\d+h|\d+m|\d+s)",''.join(arguments))
|
||||
duration = " ".join(duration)
|
||||
reason = delete_substring_from_string(" ".join(arguments),duration)
|
||||
duration_timedelta = utils.parse_timedelta(duration)
|
||||
|
||||
if not duration:
|
||||
await message.answer(f"Error: \"{duration}\" — неверный формат времени. Examles: 3ч, 5м, 4h30s.")
|
||||
return
|
||||
|
||||
duration_string = parse_duration(arguments)
|
||||
duration = None
|
||||
reason = None
|
||||
|
||||
if (duration_string):
|
||||
duration = utils.parse_timedelta(duration_string)
|
||||
|
||||
if (not duration):
|
||||
await message.answer(f"Error: \"{duration}\" — неверный формат времени. Examles: 3ч, 5м, 4h30s.")
|
||||
return
|
||||
|
||||
reason = delete_substring_from_string(" ".join(arguments),duration_string)
|
||||
|
||||
if (not duration_string):
|
||||
duration_string = "forever"
|
||||
|
||||
if (arguments):
|
||||
reason = " ".join(arguments)
|
||||
|
||||
permissions = ChatPermissions(can_send_messages=False)
|
||||
|
||||
status = await bot.restrict_chat_member(
|
||||
chat_id=message.chat.id,
|
||||
user_id=user.user_id,
|
||||
until_date=duration_timedelta,
|
||||
user_id=to_user.user_id,
|
||||
until_date=duration,
|
||||
permissions=permissions
|
||||
)
|
||||
|
||||
|
||||
if status:
|
||||
await message.answer(f"User **{user.first_name}** has been muted.",
|
||||
parse_mode="Markdown")
|
||||
await message.answer(f"[{from_user.first_name}](tg://user?id={from_user.user_id}) has muted [{to_user.first_name}](tg://user?id={to_user.user_id}) for {duration_string}",parse_mode="Markdown")
|
||||
|
||||
|
||||
database.create_restriction(user.user_id, admin.id, "mute", reason)
|
||||
Restriction.create(
|
||||
from_user=from_user,
|
||||
to_user=to_user,
|
||||
action="Mute user",
|
||||
reason=reason,
|
||||
)
|
||||
|
||||
|
||||
@dp.message_handler(commands=["umute"],commands_prefix="!",
|
||||
available_roles=[MemberRoles.ADMIN])
|
||||
@dp.message_handler(commands=["unmute"],commands_prefix="!",available_roles=[MemberRoles.ADMIN])
|
||||
async def umute_user(message: types.Message):
|
||||
"""
|
||||
!umute (@username/id) reason=None
|
||||
"""
|
||||
# Get information
|
||||
command = await getCommandArgs(message)
|
||||
user = command.user
|
||||
|
||||
to_user = command.to_user
|
||||
from_user = command.from_user
|
||||
|
||||
# If can't
|
||||
if (user is None):
|
||||
if (not to_user) or (not from_user):
|
||||
await message.answer((
|
||||
"Usage:!unmute (@username|id) reason=None.\n"
|
||||
"Reply to a message or use with a username/id.")
|
||||
|
@ -267,29 +216,23 @@ async def umute_user(message: types.Message):
|
|||
# Restrict user and save
|
||||
status = await bot.restrict_chat_member(
|
||||
chat_id=message.chat.id,
|
||||
user_id=user.user_id,
|
||||
user_id=to_user.user_id,
|
||||
permissions=permissions
|
||||
)
|
||||
|
||||
if status:
|
||||
await message.answer(f"User [{user.first_name}](tg://user?id={user.user_id}) has been unmuted.",
|
||||
parse_mode="Markdown")
|
||||
await message.answer(f"[{from_user.first_name}](tg://user?id={from_user.user_id}) has unmuted [{to_user.first_name}](tg://user?id={to_user.user_id})",parse_mode="Markdown")
|
||||
|
||||
@dp.message_handler(commands=["pin"],commands_prefix="!",
|
||||
available_roles=[MemberRoles.ADMIN,MemberRoles.HELPER])
|
||||
@dp.message_handler(commands=["pin"],commands_prefix="!",available_roles=[MemberRoles.HELPER,MemberRoles.ADMIN])
|
||||
async def pin_message(message:types.Message):
|
||||
await bot.pin_chat_message(message.chat.id, message.reply_to_message.message_id)
|
||||
|
||||
@dp.message_handler(commands=["readonly","ro"],commands_prefix="!",
|
||||
available_roles=[MemberRoles.ADMIN])
|
||||
@dp.message_handler(commands=["readonly","ro"],commands_prefix="!",available_roles=[MemberRoles.ADMIN])
|
||||
async def readonly_mode(message:types.Message):
|
||||
"""
|
||||
!ro/!readonly (@username/id)
|
||||
"""
|
||||
check = checkArg(message.text)
|
||||
|
||||
if (check is None):
|
||||
await message.answer("!ro on/off alias:disable,enable,start,stop.")
|
||||
if (not check):
|
||||
await message.answer("Usage:!ro on,enable,start/off,disable,off\n")
|
||||
return
|
||||
|
||||
# Get chat permissions
|
||||
|
@ -318,13 +261,12 @@ async def readonly_mode(message:types.Message):
|
|||
await message.answer(f"readonly - {check}")
|
||||
|
||||
|
||||
@dp.message_handler(commands=["media"],commands_prefix="!",
|
||||
available_roles=[MemberRoles.ADMIN,MemberRoles.HELPER])
|
||||
@dp.message_handler(commands=["media"],commands_prefix="!",available_roles=[MemberRoles.ADMIN,MemberRoles.HELPER])
|
||||
async def media_content(message: types.Message):
|
||||
check = checkArg(message.text)
|
||||
|
||||
if (check is None):
|
||||
await message.answer("!media on/off alias:disable,enable,start,stop.")
|
||||
if (not check):
|
||||
await message.answer("Usage: !media on,enable,start/off,disable,off")
|
||||
return
|
||||
|
||||
# Get chat permissions
|
||||
|
@ -346,17 +288,16 @@ async def media_content(message: types.Message):
|
|||
status = await bot.set_chat_permissions(chat_id=message.chat.id, permissions=chat_permissions)
|
||||
|
||||
if status:
|
||||
await message.answer(f"media - {check}.")
|
||||
await message.answer(f"media - {check}")
|
||||
|
||||
|
||||
@dp.message_handler(commands=["stickers"],commands_prefix="!",
|
||||
available_roles=[MemberRoles.ADMIN,MemberRoles.HELPER])
|
||||
@dp.message_handler(commands=["stickers"],commands_prefix="!",available_roles=[MemberRoles.ADMIN,MemberRoles.HELPER])
|
||||
async def send_stickers(message: types.Message):
|
||||
# Get arguments
|
||||
check = checkArg(message.text)
|
||||
|
||||
if (check is None):
|
||||
await message.answer("!stickers on/off alias:disable,enable,start,stop")
|
||||
if (not check):
|
||||
await message.answer("Usage: !stickers on,enable,start/off,disable,off")
|
||||
return
|
||||
|
||||
# Get chat permissions
|
||||
|
@ -378,38 +319,71 @@ async def send_stickers(message: types.Message):
|
|||
status = await bot.set_chat_permissions(chat_id=message.chat.id, permissions=chat_permissions)
|
||||
|
||||
if status:
|
||||
await message.answer(f"stickes - {check}.")
|
||||
await message.answer(f"stickes - {check}")
|
||||
|
||||
|
||||
@dp.message_handler(commands=["w","warn"],commands_prefix="!",
|
||||
available_roles=[MemberRoles.ADMIN,MemberRoles.HELPER])
|
||||
@dp.message_handler(commands=["warn","w"],commands_prefix="!",available_roles=[MemberRoles.HELPER,MemberRoles.ADMIN])
|
||||
async def warn_user(message: types.Message):
|
||||
# Get information
|
||||
command = await getCommandArgs(message)
|
||||
reason = getArgument(command.arguments)
|
||||
|
||||
user = command.user
|
||||
admin = message.from_user
|
||||
|
||||
if (user is None):
|
||||
to_user = command.to_user
|
||||
from_user = command.from_user
|
||||
|
||||
if (not to_user) or (not from_user):
|
||||
await message.answer((
|
||||
"Usage:!warn (@username/id) reason=None.\n"
|
||||
"Reply to a message or use with username/id.")
|
||||
"Usage: !warn (@username|id) reason=None\n"
|
||||
"Reply to a message or use with username/id")
|
||||
)
|
||||
return
|
||||
|
||||
# Add warning
|
||||
database.change_reports(user.user_id, delete=True)
|
||||
to_user.warns += 1
|
||||
to_user.save()
|
||||
|
||||
await message.answer(f"User [{user.first_name}](tg://user?id={user.user_id}) has gotten a warning.",
|
||||
parse_mode="Markdown")
|
||||
await message.answer(f"[{from_user.first_name}](tg://user?id={from_user.user_id}) has warned [{to_user.first_name}](tg://user?id={to_user.user_id}) ({to_user.warns}/{config.limit_of_warns})",parse_mode="Markdown")
|
||||
|
||||
database.create_restriction(user.user_id, admin.id, "warn", reason)
|
||||
|
||||
if (to_user.warns == config.limit_of_warns):
|
||||
await message.answer(f"[{to_user.first_name}](tg://user?id={to_user.user_id}) has been banned!",parse_mode="Markdown")
|
||||
await bot.kick_chat_member(chat_id=message.chat.id, user_id=to_user.user_id, until_date=None)
|
||||
|
||||
Restriction.create(
|
||||
to_user=to_user,
|
||||
from_user=from_user,
|
||||
action="Warn user",
|
||||
reason=reason,
|
||||
)
|
||||
|
||||
|
||||
@dp.message_handler(commands=["reload"],commands_prefix="!",available_roles=[MemberRoles.ADMIN,MemberRoles.HELPER])
|
||||
@dp.message_handler(commands=["reload"],commands_prefix="!")
|
||||
async def reload(message:types.Message):
|
||||
await utils.check_user_data()
|
||||
from load import tgc
|
||||
|
||||
if (not Member.search(Member.role,"owner")):
|
||||
Member.create(
|
||||
user_id = message.from_user.id,
|
||||
first_name = message.from_user.first_name,
|
||||
username = message.from_user.username,
|
||||
role="owner",
|
||||
)
|
||||
# TODO: do this every 1 hours
|
||||
members = await tgc.members_list(config.group_id)
|
||||
|
||||
for member in members:
|
||||
user = Member.search(Member.user_id,member["id"])
|
||||
|
||||
if (not user):
|
||||
Member.create(
|
||||
user_id=member["id"],
|
||||
first_name=member["first_name"],
|
||||
username=member["username"],
|
||||
)
|
||||
else:
|
||||
user.first_name = member["first_name"]
|
||||
user.username = member["username"]
|
||||
user.save()
|
||||
|
||||
|
||||
group = await bot.get_chat(message.chat.id)
|
||||
group_permissions = dict(group["permissions"])
|
||||
|
@ -417,33 +391,34 @@ async def reload(message:types.Message):
|
|||
for permission in group_permissions.keys():
|
||||
config.group_permissions[permission] = group_permissions[permission]
|
||||
|
||||
await message.answer(f"✅ The synchronization was successful.")
|
||||
await message.answer("Reloaded!")
|
||||
|
||||
|
||||
@dp.message_handler(commands=["set_role"],commands_prefix="!",
|
||||
available_roles=[MemberRoles.ADMIN])
|
||||
@dp.message_handler(commands=["setrole"],commands_prefix="!",available_roles=[MemberRoles.ADMIN])
|
||||
async def set_role(message:types.Message):
|
||||
command = await getCommandArgs(message)
|
||||
new_role = getArgument(command.arguments)
|
||||
|
||||
user = command.user
|
||||
admin = database.search_single_member(Member.user_id,message.from_user)
|
||||
to_user = command.to_user
|
||||
from_user = command.from_user
|
||||
|
||||
if (user is None) or (new_role is None):
|
||||
if (not to_user) or (not from_user) or (not new_role):
|
||||
await message.answer((
|
||||
"!srole (@username|id) role(owner,admin,helper,member).\n"
|
||||
"!setrole (@username|id) role(owner,admin,helper,member).\n"
|
||||
"Reply to a message or use with username."
|
||||
))
|
||||
return
|
||||
|
||||
if not (new_role in [member.value for member in MemberRoles]):
|
||||
await message.answer(f"Role {new_role} not exists.")
|
||||
await message.answer(f"Role {new_role} not exists")
|
||||
return
|
||||
|
||||
if (admin.user_id == user.user_id):
|
||||
await message.answer("❌ You can't set role yourself.")
|
||||
if (from_user.user_id == to_user.user_id):
|
||||
await message.answer("❌ You can't set role yourself")
|
||||
return
|
||||
|
||||
to_user.role = new_role
|
||||
to_user.save()
|
||||
|
||||
database.update_member_data(user.user_id,[Member.role],[new_role])
|
||||
|
||||
await message.answer(f"{new_role.capitalize()} role set for [{user.first_name}](tg://user?id={user.user_id}).",parse_mode="Markdown")
|
||||
await message.answer(f"{new_role.capitalize()} role set for [{to_user.first_name}](tg://user?id={to_user.user_id})",
|
||||
parse_mode="Markdown")
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
from load import dp, database, types
|
||||
from database.models import Member
|
||||
|
||||
from load import dp, types
|
||||
from database import Member
|
||||
|
||||
# TODO: fix it
|
||||
# import utils
|
||||
|
@ -21,13 +20,18 @@ async def welcome_message(message:types.Message):
|
|||
# User
|
||||
user = message.from_user
|
||||
|
||||
exists = database.check_data_exists(Member.user_id,user.id)
|
||||
|
||||
exists = Member.exists(Member.user_id,user.id)
|
||||
|
||||
if (exists):
|
||||
await message.answer("Спасибо что вы с нами.")
|
||||
|
||||
if not (exists):
|
||||
database.register_user(user.id,user.first_name,user.username)
|
||||
Member.create(
|
||||
user_id = user.id,
|
||||
first_name = user.first_name,
|
||||
username = user.username,
|
||||
)
|
||||
|
||||
# TODO: translate it
|
||||
await message.answer((
|
||||
f"Привет,{user.first_name}\n"
|
||||
|
@ -48,8 +52,6 @@ async def welcome_message(message:types.Message):
|
|||
# for user_message in message.text.lower().split():
|
||||
# if (y in user_message):await message.delete()
|
||||
|
||||
# Joke
|
||||
@dp.message_handler(content_types=types.ContentType.VOICE)
|
||||
async def voice_message(message:types.Message):
|
||||
photo = types.InputFile(path_or_bytesio="media/photo.jpg")
|
||||
await message.answer_photo(photo)
|
||||
pass
|
||||
|
|
|
@ -1,9 +1,7 @@
|
|||
from load import bot, dp, types
|
||||
|
||||
import config
|
||||
|
||||
from load import database
|
||||
from database.models import Member
|
||||
from database import Member
|
||||
|
||||
@dp.message_handler(commands=["leave"],chat_type=[types.ChatType.SUPERGROUP])
|
||||
async def leave_group(message:types.Message):
|
||||
|
@ -14,8 +12,9 @@ async def leave_group(message:types.Message):
|
|||
if (len(args) < 1) or not ( ' '.join(args[1:]) == "I UNDERSTAND" ):
|
||||
await message.answer("Для того чтобы покинуть чат вам нужно ввести /leave I UNDERSTANT!")
|
||||
return
|
||||
|
||||
database.delete_user(user.id)
|
||||
|
||||
# TODO: rewrite it
|
||||
# database.delete_user(user.id)
|
||||
|
||||
# Ban user and save (bool)
|
||||
status = await bot.kick_chat_member(chat_id=message.chat.id,user_id=user.id,until_date=None)
|
||||
|
@ -37,17 +36,15 @@ async def start_command_group(message:types.Message):
|
|||
|
||||
@dp.message_handler(commands=["bio","me"],chat_type=[types.ChatType.SUPERGROUP])
|
||||
async def get_information(message: types.Message):
|
||||
user = database.search_single_member(Member.user_id,message.from_user.id)
|
||||
user = Member.search(Member.user_id, message.from_user.id)
|
||||
|
||||
role_level = config.roles["level"]
|
||||
if (not user):
|
||||
await message.answer("Something wrong!")
|
||||
return
|
||||
|
||||
if (user is None):
|
||||
await message.answer("❌Sorry,you not member group.")
|
||||
return
|
||||
|
||||
await message.answer((
|
||||
f"User:[{user.first_name}](tg://user?id={user.user_id})\n"
|
||||
f"level:{role_level[user.role]}\n"),
|
||||
f"[{user.first_name}](tg://user?id={user.user_id}) ({user.role})\n"
|
||||
f"Warns: {user.warns}/{config.limit_of_warns}"),
|
||||
parse_mode="Markdown"
|
||||
)
|
||||
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
from load import dp,types,database,bot
|
||||
from database.models import Member
|
||||
from load import dp, types, bot
|
||||
from database import Member, Restriction
|
||||
|
||||
from aiogram.types import KeyboardButton,ReplyKeyboardMarkup
|
||||
from aiogram.types.reply_keyboard import ReplyKeyboardRemove
|
||||
|
@ -19,7 +19,7 @@ from keyboards.inline.callback_data import report_callback
|
|||
@dp.message_handler(commands=["start","help"],chat_type=[types.ChatType.PRIVATE])
|
||||
async def start_command_private(message:types.Message):
|
||||
await message.answer((
|
||||
f"Hello,**{message.from_user.first_name}**!\n"
|
||||
f"Hi, **{message.from_user.first_name}**!\n"
|
||||
"My commands:\n"
|
||||
"\t\t/help /start - read this message.")
|
||||
,parse_mode="Markdown",reply_markup=menu
|
||||
|
@ -44,28 +44,29 @@ async def about_us(message:types.Message):
|
|||
|
||||
@dp.message_handler(Text(equals=["Check restrictions"]),state=None)
|
||||
async def check_for_restrict(message:types.Message):
|
||||
user = message.from_user
|
||||
restrictions = database.search_user_restriction(user_id=user.id)
|
||||
user = Member.get(Member.user_id == message.from_user.id)
|
||||
restrictions = Restriction.search(to_user=user)
|
||||
|
||||
if (restrictions is None):
|
||||
if (not restrictions):
|
||||
await message.answer("✅No restrictions.")
|
||||
return
|
||||
|
||||
for restriction in restrictions:
|
||||
callback = report_callback.new(user_id=message.from_user.id)
|
||||
callback = report_callback.new(restriction_id=restriction.id)
|
||||
markup = report_button("✉️ Report restriction",callback)
|
||||
|
||||
await message.answer(f"Restriction\n{restriction.operation}\nReason:{restriction.reason}\nDate:{restriction.date}",reply_markup=markup)
|
||||
await message.answer(f"Restriction\n{restriction.operation}\nReason:{restriction.reason}\nDate:{restriction.timestamp}",
|
||||
reply_markup=markup)
|
||||
|
||||
await States.state1.set()
|
||||
|
||||
@dp.callback_query_handler(text_contains="report_restriction",state=States.state1)
|
||||
async def report_restriction(call:CallbackQuery,state:FSMContext):
|
||||
async def report_restriction(call:CallbackQuery, state:FSMContext):
|
||||
await call.answer(cache_time=60)
|
||||
|
||||
# callback_data = call.data
|
||||
# restriction_id = callback_data.split(":")[1]
|
||||
|
||||
callback_data = call.data
|
||||
restriction_id = callback_data.split(":")[1]
|
||||
|
||||
markup = ReplyKeyboardMarkup(resize_keyboard=True)
|
||||
cancel = KeyboardButton("❌ Cancel")
|
||||
markup.add(cancel)
|
||||
|
@ -73,31 +74,34 @@ async def report_restriction(call:CallbackQuery,state:FSMContext):
|
|||
await state.update_data(restriction_id=restriction_id)
|
||||
|
||||
await call.message.answer("Please,enter your report.",reply_markup=markup)
|
||||
|
||||
await States.next()
|
||||
|
||||
@dp.message_handler(state=States.state2)
|
||||
async def get_message_report(message: types.Message,state:FSMContext):
|
||||
async def get_message_report(message:types.Message, state:FSMContext):
|
||||
answer = message.text
|
||||
|
||||
if not ("Cancel" in answer):
|
||||
|
||||
restriction = database.search_user_restriction(message.from_user.id)
|
||||
|
||||
if not ("Cancel" in answer):
|
||||
data = await state.get_data()
|
||||
restriction_id = data.get("restriction_id")
|
||||
restriction = Restriction.search(id=restriction_id)
|
||||
|
||||
if (restriction is None):
|
||||
return
|
||||
|
||||
#from_admin = restriction.from_admin
|
||||
#to_user = restriction.to_user
|
||||
from_user = restriction.from_user
|
||||
to_user = restriction.to_user
|
||||
|
||||
reason = restriction.reason
|
||||
if (not reason):
|
||||
reason = "No reason"
|
||||
|
||||
await bot.send_message(config.telegram_log_chat_id,(
|
||||
await bot.send_message(config.second_group_id,(
|
||||
f"Report on restriction #{restriction_id}\n"
|
||||
f"From admin:[{from_admin.first_name}](tg://user?id={from_admin.id})\n"
|
||||
f"To user:[{from_admin.first_name}](tg://user?id={to_user.id})\n"
|
||||
f"From user:[{from_user.first_name}](tg://user?id={from_user.id})\n"
|
||||
f"To user:[{from_user.first_name}](tg://user?id={to_user.id})\n"
|
||||
f"Reason:{reason}\n"
|
||||
f"Message:{answer}"
|
||||
f"{answer}"
|
||||
),parse_mode="Markdown")
|
||||
|
||||
await message.answer("Report restriction sended",reply_markup=ReplyKeyboardRemove())
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
from aiogram.utils.callback_data import CallbackData
|
||||
|
||||
|
||||
report_callback = CallbackData("report_restriction","user_id")
|
||||
report_callback = CallbackData("report_restriction","restriction_id")
|
||||
|
|
6
load.py
6
load.py
|
@ -6,12 +6,6 @@ from aiogram.contrib.fsm_storage.memory import MemoryStorage
|
|||
import config
|
||||
import utils
|
||||
|
||||
|
||||
from database.database import Database
|
||||
|
||||
|
||||
database = Database()
|
||||
|
||||
storage = MemoryStorage()
|
||||
|
||||
# Create client connection
|
||||
|
|
BIN
media/photo.jpg
BIN
media/photo.jpg
Binary file not shown.
Before Width: | Height: | Size: 19 KiB |
|
@ -1,9 +1,9 @@
|
|||
from .notify_start import notify_started_bot
|
||||
from .default_commands import set_default_commands
|
||||
|
||||
from .update_user_data import check_user_data
|
||||
|
||||
from .telegram_client import TelegramClient
|
||||
from .parse_timedelta import parse_timedelta
|
||||
|
||||
from .virustotal import VirusTotalAPI
|
||||
|
||||
from .arguments_parser import getArgument,getCommandArgs,checkArg,parse_duration,delete_substring_from_string
|
||||
|
|
81
utils/arguments_parser.py
Normal file
81
utils/arguments_parser.py
Normal file
|
@ -0,0 +1,81 @@
|
|||
from dataclasses import dataclass
|
||||
from database import Member
|
||||
from aiogram import types
|
||||
|
||||
import re
|
||||
|
||||
def getArgument(arguments:list,index:int=0) -> str | None:
|
||||
""" Get element from a list.If element not exist return None """
|
||||
if not (arguments):
|
||||
return None
|
||||
|
||||
if (len(arguments) > index):
|
||||
return arguments[index]
|
||||
else:
|
||||
return None
|
||||
|
||||
@dataclass
|
||||
class CommandArguments:
|
||||
to_user:Member | None
|
||||
from_user:Member | None
|
||||
arguments:list
|
||||
|
||||
async def getCommandArgs(message: types.Message) -> CommandArguments:
|
||||
"""
|
||||
Describe user data and arguments from message
|
||||
!command (username|id) ...
|
||||
"""
|
||||
|
||||
|
||||
arguments = message.text.split()[1:]
|
||||
to_user = None
|
||||
from_user = Member.search(Member.user_id, message.from_user.id)
|
||||
|
||||
# If message replied
|
||||
if (message.reply_to_message):
|
||||
to_user = Member.search(Member.user_id, message.reply_to_message)
|
||||
else:
|
||||
user_data = getArgument(arguments)
|
||||
|
||||
if (user_data):
|
||||
if (user_data.isdigit()):
|
||||
to_user = Member.search(Member.user_id, user_data)
|
||||
|
||||
if (user_data[0] == "@"):
|
||||
to_user = Member.search(Member.username, user_data)
|
||||
|
||||
if (arguments) and (not to_user):
|
||||
await message.answer(f"❌ User {to_user} not exist.")
|
||||
|
||||
arguments = arguments[1:]
|
||||
|
||||
return CommandArguments(to_user, from_user, arguments)
|
||||
|
||||
|
||||
def delete_substring_from_string(string:str,substring:str) -> str:
|
||||
string_list = string.split(substring)
|
||||
return "".join(string_list).lstrip()
|
||||
|
||||
def parse_duration(message) -> str:
|
||||
duration = re.findall(r"(\d+d|\d+h|\d+m|\d+s)",''.join(message))
|
||||
duration = " ".join(duration)
|
||||
return duration
|
||||
|
||||
def checkArg(message:str) -> bool | None:
|
||||
""" Check if first argument in ["enable","on","true"] then return true """
|
||||
if (not message):
|
||||
return None
|
||||
|
||||
argument = message.split()
|
||||
argument = getArgument(message.split(),1)
|
||||
|
||||
if (argument is None):
|
||||
return None
|
||||
|
||||
on = ['enable','on','true']
|
||||
off = ['disable','off','false']
|
||||
|
||||
if (argument in on):
|
||||
return True
|
||||
if (argument in off):
|
||||
return False
|
|
@ -1,27 +0,0 @@
|
|||
from database.models import Member
|
||||
from config import group_id
|
||||
|
||||
async def check_user_data():
|
||||
"""Check user data in database and update it"""
|
||||
from load import tgc,database
|
||||
|
||||
members = await tgc.members_list(group_id)
|
||||
|
||||
for member in members:
|
||||
exists = database.check_data_exists(Member.user_id,member["id"])
|
||||
|
||||
role = "member"
|
||||
if (member["status"] == "ChatMemberStatus.OWNER"):
|
||||
role = "owner"
|
||||
|
||||
if (not exists):
|
||||
database.register_user(
|
||||
member["id"],member["first_name"],
|
||||
member["username"],role
|
||||
)
|
||||
else:
|
||||
database.update_member_data(
|
||||
member["id"],
|
||||
[Member.first_name,Member.user_name],
|
||||
[member["first_name"],member["username"]]
|
||||
)
|
Reference in a new issue