MENU

qq群机器人绑定某网站站内签到

• April 15, 2019 • Read: 349 • python

以pt之家为例,在QQ群里艾特机器人并发送“签到”,机器人就会自动替你在站内签到。

依旧是基于nonebot。原理很简单,就是把QQ号码和网站的cookies绑定起来。群员第一次使用时,机器人会通过小窗(私聊)让他输入用户名、密码和图片验证码,然后模拟登录网站获取cookies并保存到数据库。之后再签到时,直接带上保存的cookies访问签到页面即可。这个功能插件有点鸡肋,实不实用要分网站也分人吧。

数据库设计

-- One row per QQ user who has bound a site account: the QQ number plus the
-- value of each cookie captured from the site's login response.
create table pthome_group(
    -- Surrogate key, generated by MySQL.
    id int unsigned primary key auto_increment not null,
    -- QQ number of the bound user, stored as text.
    qq varchar(40) default '',
    -- The remaining columns are named after the cookie whose value they hold.
    __cfduid varchar(200) default '',
    c_secure_login varchar(200) default '',
    c_secure_pass varchar(200) default '',
    c_secure_ssl varchar(200) default '',
    c_secure_tracker_ssl varchar(200) default '',
    c_secure_uid varchar(200) default ''
);

checkin.py

import requests_async as requests
import asyncio
import aiohttp
import aiomysql
from bs4 import BeautifulSoup
import re
from aiocqhttp.message import MessageSegment
from nonebot import on_command,CommandSession,on_natural_language,NLPSession,IntentCommand

#{'anonymous': None, 'font': 146284496, 'group_id': 984611554, 'message': [{'type': 'text', 'data': {'text': '签到'}}], 'message_id': 3668, 'message_type': 'group', 'post_type': 'message', 'raw_message': '签到', 'self_id': 3184350475, 'sender': {'age': 0, 'area': '日本', 'card': '', 'level': '冒泡', 'nickname': 'SSNI319', 'role': 'owner', 'sex': 'male', 'title': '', 'user_id': 420688441}, 'sub_type': 'normal', 'time': 1555305800, 'user_id': 420688441, 'to_me': False}

# Module-level defaults for the sign-in flow's scratch values.
# NOTE(review): anything stored at module level is shared by every user and
# chat the bot serves; per-user data is safer in the command session's state.
tmp=True  # group-vs-private flag: True means the message came from a group
global_qq_num=None  # QQ number of the user who asked to sign in
in_mysql=False  # whether that QQ number already has a row in pthome_group
hash_value=''  # imagehash of the captcha taken from the site's login page
arg_tmp=None  # value returned when prompting the user for login details
# Browser-like User-Agent sent with every request to the site.
headers={'user-agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.81 Safari/537.36'}

# Connection settings used by the database helpers of the checkin command.
_CHECKIN_DB = dict(host='*********', port=3306, db='*********', user='root', password='*********')

# Cookies taken from the takelogin.php response, in pthome_group column order.
_LOGIN_COOKIE_NAMES = (
    '__cfduid',
    'c_secure_login',
    'c_secure_pass',
    'c_secure_ssl',
    'c_secure_tracker_ssl',
    'c_secure_uid',
)


async def _is_registered(qq_num):
    """Return True when pthome_group already holds a row for ``qq_num``."""
    # Leaving the ``async with`` closes the pool, so no explicit close is needed.
    async with aiomysql.create_pool(loop=asyncio.get_event_loop(), **_CHECKIN_DB) as pool:
        async with pool.acquire() as conn:
            async with conn.cursor() as cs:
                # Parameterized query: the driver does the quoting.
                count = await cs.execute('select id from pthome_group where qq=%s;', (str(qq_num),))
    return count > 0


async def _fetch_image_hash():
    """Load the site's login page and return the captcha imagehash found on it."""
    async with requests.Session() as http:
        response = await http.get(url='https://www.pthome.net/login.php', headers=headers)
        html = BeautifulSoup(response.text, 'html.parser')
        # NOTE(review): relies on the hash being the 4th <input> of the page.
        return html.select('input')[3]['value']


async def _login(username, password, image_string, image_hash):
    """Post the login form; return the login cookies as a dict, or None on failure.

    Failure means that at least one of ``_LOGIN_COOKIE_NAMES`` is missing from
    the response.
    """
    data = {'username': username, 'password': password, 'imagestring': image_string, 'imagehash': str(image_hash)}
    async with aiohttp.ClientSession() as http:
        async with http.post(url='https://www.pthome.net/takelogin.php', headers=headers, data=data,
                             allow_redirects=False) as response:
            received = response.cookies
            if any(name not in received for name in _LOGIN_COOKIE_NAMES):
                return None
            # coded_value is the cookie value exactly as the server sent it.
            return {name: received[name].coded_value for name in _LOGIN_COOKIE_NAMES}


async def _save_cookies(qq_num, cookie):
    """Insert the cookies for ``qq_num`` unless a row for it already exists."""
    async with aiomysql.create_pool(loop=asyncio.get_event_loop(), **_CHECKIN_DB) as pool:
        async with pool.acquire() as conn:
            async with conn.cursor() as cs:
                count = await cs.execute('select id from pthome_group where qq=%s;', (str(qq_num),))
                if count == 0:
                    # id 0 lets auto_increment pick the next key; the rest
                    # follows the table's column order.
                    values = [str(qq_num)] + [cookie[name] for name in _LOGIN_COOKIE_NAMES]
                    await cs.execute('insert into pthome_group values(0,%s,%s,%s,%s,%s,%s,%s);', values)
                    await conn.commit()
                    print('插入数据库成功')


@on_command('checkin', aliases=('签到',))
async def checkin(session: CommandSession):
    """Sign the sender in on the site, binding their account first if needed.

    * Sender already bound: sign in and send the report (mentioning the sender
      when the message came from a group).
    * Unbound sender in a group: ask them to continue in private chat.
    * Unbound sender in private chat: show the captcha, ask for
      "username password captcha", log in, store the cookies, then sign in.

    Per-user data is kept in local variables and ``session.state`` rather than
    module globals, so concurrent users cannot overwrite each other's values.
    """
    qq_num = session.ctx['user_id']
    from_group = session.ctx['message_type'] == 'group'

    if await _is_registered(qq_num):
        report = await pthome_checkin(qq_num)
        if from_group:
            await session.send(report, at_sender=True)
        else:
            await session.send(report)
        return

    if from_group:
        # Credentials must not be typed into a group, so move to private chat.
        await session.send('您是第一次签到呢,请关注我给您发的小窗消息哦', at_sender=True)
        await session.send('您是第一次签到呢,请在这里重新回复我签到,我马上给您签到哦~', ensure_private=True)
        return

    if session.is_first_run:
        # The handler is re-run when the user answers, so the hash that belongs
        # to the captcha shown now has to be remembered in the session state.
        session.state['imagehash'] = await _fetch_image_hash()
        img_url = 'https://www.pthome.net/image.php?action=regimage&imagehash=%s' % str(session.state['imagehash'])
        session.get('checkin', prompt=MessageSegment.image(img_url)+'图片验证码(可能需要点击查看)\n'+'请回复\n用户名\n密码\n图片验证码\n并用空格隔开;例如:\n lemeaco    123455    1gn1g2')

    fields = session.state['checkin'].split()
    if len(fields) < 3:
        await session.send('输入格式不正确,请重新发送签到,并按 用户名 密码 图片验证码 的格式回复')
        return

    cookie = await _login(fields[0], fields[1], fields[2], session.state['imagehash'])
    if cookie is None:
        await session.send('登录失败,请检查用户名、密码和图片验证码后重新发送签到')
        return

    await _save_cookies(qq_num, cookie)
    report = await pthome_checkin(qq_num)
    await session.send('正在签到,以后直接在群内艾特我签到就可以了')
    await session.send(report)

@on_natural_language(keywords={'签到'},only_to_me=True)
async def _(session: NLPSession):
    """Route a message containing the sign-in keyword to the checkin command.

    Every whitespace-separated word after the first is passed on as the
    command argument, each followed by two spaces (the same text the previous
    loop produced; its ``i == len(...)`` branch could never run).
    """
    words = session.msg_text.split()
    checkin = ''.join(word + '  ' for word in words[1:])
    return IntentCommand(90.0, 'checkin', current_arg=checkin)

async def pthome_checkin(global_qq_num):
    """Sign in on the site with the cookies stored for a QQ number.

    Args:
        global_qq_num: QQ number whose row in pthome_group supplies the cookies.

    Returns:
        The text of the last ``.text`` element of the attendance page, or a
        short failure message when no cookies are stored for the QQ number or
        the page contains no such element.
    """
    cookie_names = (
        '__cfduid',
        'c_secure_login',
        'c_secure_pass',
        'c_secure_ssl',
        'c_secure_tracker_ssl',
        'c_secure_uid',
    )

    # Fetch the stored cookies. Columns are selected by name so the result does
    # not depend on the table's column positions; leaving the ``async with``
    # closes the pool.
    loop = asyncio.get_event_loop()
    async with aiomysql.create_pool(host='*********', port=3306, db='*********', user='root', password='*********',
                                    loop=loop) as pool:
        async with pool.acquire() as conn:
            async with conn.cursor() as cs:
                # Parameterized query: the driver does the quoting.
                await cs.execute(
                    'select __cfduid, c_secure_login, c_secure_pass, c_secure_ssl,'
                    ' c_secure_tracker_ssl, c_secure_uid from pthome_group where qq=%s;',
                    (str(global_qq_num),))
                row = await cs.fetchone()

    if row is None:
        return '尚未绑定账号,请先私聊我发送签到完成绑定'
    cookie = dict(zip(cookie_names, row))

    url = 'https://www.pthome.net/attendance.php'
    async with requests.Session() as session:
        response = await session.get(url=url, headers=headers, cookies=cookie)
        html = BeautifulSoup(response.text, 'html.parser')

    class_text = html.select('.text')
    if not class_text:
        return '签到失败,未能解析签到页面,请稍后再试'
    response_text = class_text[-1].get_text()
    print(str(global_qq_num)+'签到成功')
    return str(response_text)

自认为最近写的有关nonebot这类异步的代码有点多,爬虫的代码也写了不少,不过不敢说爬虫玩得有多厉害,毕竟scrapy这个框架我至今都不怎么会用2333。决定要进修一下,立个flag,学完前端再来。到时候开发个网站看看。

Tags: python
最后编辑于: April 17, 2019
Archives QR Code Tip
QR Code for this page
Tipping QR Code
Leave a Comment

已有 3 条评论
  1. sxfbest sxfbest

    优秀!跟随大佬的脚步。晚上回家试试@(哈哈)

    1. @sxfbest哈哈哈c,小破站竟然还有人看

    2. sxfbest sxfbest

      @柊叶当然有人看啊,博主不要妄自菲薄,很多人连小站都没有,比如我,哈哈@(哈哈)