B站up主更新推送微信或钉钉

发布于 2021-07-04  3.48k 次阅读


第一时间收到必看 up 主的视频更新消息,并推送到微信或钉钉,冲!

#1. python3 代码


# -*- coding:utf-8 -*-
# b站up主更新推送
# by stomach
import time
import requests
import json
import os
import logging


# Write INFO-and-above records to /tmp/bslog.log; each record carries the
# timestamp, source file path, line number and level.
logging.basicConfig(
    filename='/tmp/bslog.log',
    level=logging.INFO,
    format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s',
)


def postdingding(up_name, title, bv_id):
    """Push a DingTalk markdown message announcing a new video.

    The message title names the uploader; the body is a markdown link to
    the video page built from ``bv_id``, followed by the uploader's name.
    The webhook's response body is written to the log at INFO level.
    """
    # DingTalk robot webhook address
    webhook = 'https://oapi.dingtalk.com/robot/send?access_token=8db6bd77b5470e911XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX'
    markdown_part = {
        "title": f'{up_name} 更新啦!',
        "text": f"[{title}](https://www.bilibili.com/video/{bv_id})  --{up_name}",
    }
    mention_part = {"atMobiles": [""], "isAtAll": "true"}
    payload = {
        "msgtype": "markdown",
        "markdown": markdown_part,
        "at": mention_part,
    }
    reply = requests.post(
        webhook,
        data=json.dumps(payload),
        headers={"Content-Type": "application/json ;charset=utf-8"},
    )
    logging.info(reply.text)


def postwechat1(up_name, title, bv_id):
    """Push a WeChat Work "news" card announcing a new video.

    Args:
        up_name: Uploader's display name, used in the card title and
            description.
        title: Video title, used in the card description.
        bv_id: Video BV id, used to build the URL the card links to.

    Raises:
        requests.RequestException: A request failed or exceeded the timeout.
        KeyError: ``get_token()`` could not obtain an access token.
    """
    HEADERS = {"Content-Type": "application/json ;charset=utf-8"}
    # Cover image: the response body of this endpoint is used as the picture URL.
    # A timeout keeps a stalled server from hanging the whole run.
    p = requests.get('https://ley.best/picapi/', timeout=10).text
    # Obtain the access token
    token = get_token()
    # Change agentid to match the target application as needed
    data = {
        "touser": "@all",
        "msgtype": "news",
        "agentid": 1000002,
        "news": {
            "articles": [
                {
                    "title": f'{up_name} 更新啦!',
                    "description": f'{title}  --{up_name}',
                    "url": f'https://www.bilibili.com/video/{bv_id}',
                    "picurl": p
                }
            ]
        },
        "at": {
            "atMobiles": [
                ""
            ],
            "isAtAll": "true"
        }
    }
    String_textMsg = json.dumps(data)
    # WeChat Work application message endpoint
    wechaturl = f'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={token}'
    res = requests.post(wechaturl, data=String_textMsg, headers=HEADERS, timeout=10)
    # Log instead of print, matching postdingding, so the API response ends up
    # in the log file with the rest of the run's output.
    logging.info(res.text)


def get_token():
    """Fetch a WeChat Work access token for the hard-coded corp id and secret.

    Returns:
        The ``access_token`` string from the gettoken response.

    Raises:
        KeyError: The response contains no ``access_token``; the message
            carries the response's ``errcode`` / ``errmsg`` when present.
        requests.RequestException: The request failed or exceeded the timeout.
    """
    secret = 'fU9hhFGiya3NZkzkMnXXXXXXXXXXXXXXXXX'
    corpid = 'ww98c01fXXXXXXXXXXXXXX'
    # Pass credentials via params so they are URL-encoded; the timeout keeps a
    # stalled server from hanging the run.
    r = requests.get(
        'https://qyapi.weixin.qq.com/cgi-bin/gettoken',
        params={'corpid': corpid, 'corpsecret': secret},
        timeout=10,
    ).text
    js = json.loads(r)
    if 'access_token' not in js:
        # Same exception type as the plain lookup would raise, but with the
        # API's own error details attached.
        raise KeyError(
            f"access_token missing from gettoken response: "
            f"errcode={js.get('errcode')}, errmsg={js.get('errmsg')}"
        )
    # The token is a credential, so it is deliberately not printed or logged.
    return js['access_token']


def get_qrcode(token=''):
    """Request the WeChat Work join-QR-code API and print its response body.

    Args:
        token: Access token to use. When empty (the default) a token is
            fetched with ``get_token()``.
    """
    # Only fetch a token when the caller did not supply one; previously the
    # argument was always overwritten.
    if not token:
        token = get_token()
    url = f'https://qyapi.weixin.qq.com/cgi-bin/corp/get_join_qrcode?access_token={token}&size_type=1'
    r = requests.get(url)
    print(r.text)


def get_bv_info(bv_id):
    """Look up a multi-part video and describe its last part.

    Returns a tuple ``(title, last_p_name, num)``: the video title, the
    ``part`` name of the last entry in the video's ``pages`` list, and the
    number of entries in that list. The last part's name is also printed.
    """
    response = requests.get(url=f'http://api.bilibili.com/x/web-interface/view?bvid={bv_id}')
    video = json.loads(response.text)["data"]
    page_list = video["pages"]
    page_count = len(page_list)
    video_title = video["title"]
    newest_part = page_list[-1]["part"]
    print(newest_part)
    return video_title, newest_part, page_count


def post_bv(title, last_p_name, bv_id, num):
    """Push a WeChat Work "news" card announcing a new part of a video.

    Args:
        title: Video title, used in the card title.
        last_p_name: Name of the newest part, used as the card description.
        bv_id: Video BV id, used to build the URL the card links to.
        num: Value for the ``p`` query parameter of the link.

    Raises:
        requests.RequestException: A request failed or exceeded the timeout.
        KeyError: ``get_token()`` could not obtain an access token.
    """
    HEADERS = {"Content-Type": "application/json ;charset=utf-8"}
    # Cover image: the response body of this endpoint is used as the picture URL.
    # A timeout keeps a stalled server from hanging the whole run.
    p = requests.get('https://ley.best/picapi/', timeout=10).text
    token = get_token()
    data = {
        "touser": "@all",
        "msgtype": "news",
        "agentid": 1000003,
        "news": {
            "articles": [
                {
                    "title": f'{title}更新啦!',
                    "description": f'{last_p_name}',
                    "url": f'https://www.bilibili.com/video/{bv_id}?p={num}',
                    "picurl": p
                }
            ]
        },
        "at": {
            "atMobiles": [
                ""
            ],
            "isAtAll": "true"
        }
    }
    String_textMsg = json.dumps(data)
    # WeChat Work application message endpoint
    wechaturl = f'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={token}'
    res = requests.post(wechaturl, data=String_textMsg, headers=HEADERS, timeout=10)
    # Log instead of print, matching postdingding, so the API response ends up
    # in the log file with the rest of the run's output.
    logging.info(res.text)


def get_up_video(up_id):
    """Fetch the first page (5 entries) of an uploader's video list.

    Args:
        up_id: The uploader's member id (``mid``).

    Returns:
        A list of ``[bv_id, title]`` pairs, in the order the API returns them.

    Raises:
        subprocess.CalledProcessError: curl exited with a non-zero status.
        subprocess.TimeoutExpired: curl did not finish within 30 seconds.
        ValueError: The response body is not valid JSON.
        KeyError: The response lacks the expected ``data.list.vlist`` path.
    """
    import subprocess

    # This API does not work through requests, so curl is used instead.
    url = f'http://api.bilibili.com/x/space/arc/search?mid={up_id}&ps=5&pn=1'
    # Argument list rather than a shell string: nothing is interpreted by a
    # shell, and a curl failure raises instead of yielding an empty body.
    proc = subprocess.run(
        ['curl', '-s', url],
        stdout=subprocess.PIPE,
        check=True,
        timeout=30,
    )
    # json.loads accepts bytes and detects the UTF encoding itself, so decoding
    # no longer depends on the process locale.
    js = json.loads(proc.stdout)
    vlist = js['data']['list']['vlist']
    v_list = []
    for v in vlist:
        title = v['title']
        bv_id = v['bvid']
        print(bv_id, title)
        v_list.append([bv_id, title])
    return v_list


def get_up_name(up_id):
    """Return the ``name`` field of the account-info response for ``up_id``."""
    profile_url = f'http://api.bilibili.com/x/space/acc/info?mid={up_id}'
    reply = requests.get(url=profile_url)
    profile = json.loads(reply.text)
    return profile['data']['name']


def push_and_update_by_up(up_id):
    """Push notifications for an uploader's videos that were not pushed before.

    State is kept in ``/home/tmp/<up_id>``: one ``"<bv_id> <title>"`` line per
    video already pushed. Each video returned by ``get_up_video`` whose BV id
    is not yet recorded is sent to WeChat and DingTalk and then appended to
    the file.

    Args:
        up_id: The uploader's member id.
    """
    path = f'/home/tmp/{up_id}'
    up_name = get_up_name(up_id)
    v_list = get_up_video(up_id)
    if not os.path.exists(path):
        # Create the state file (and its directory) directly instead of
        # shelling out to `echo`; the seed content is the same as before.
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, 'w', encoding='utf-8') as f:
            f.write('1\n')
    with open(path, 'r', encoding='utf-8') as f:
        # The first token of each line is the recorded BV id. Matching ids
        # exactly avoids treating an id that merely appears inside a recorded
        # title as already pushed.
        seen = {line.split(' ', 1)[0] for line in f.read().split('\n')}
    with open(path, 'a', encoding='utf-8') as f:
        for bv_id, title in v_list:
            logging.info(title)
            if bv_id in seen:
                continue
            postwechat1(up_name, title, bv_id)
            postdingding(up_name, title, bv_id)
            f.write(f'{bv_id} {title}\n')
            seen.add(bv_id)


def push_and_update_by_bv(bv_id):
    """Push a notification when a multi-part video's last part is new.

    State is kept in ``/home/tmp/<bv_id>``: one line per part name already
    pushed. If the current last part's name is not recorded, it is pushed via
    ``post_bv`` and then appended to the file.

    Args:
        bv_id: The video's BV id.
    """
    path = f'/home/tmp/{bv_id}'
    title, last_p_name, num = get_bv_info(bv_id)
    logging.info(title)
    if not os.path.exists(path):
        # Create the state file (and its directory) directly instead of
        # shelling out to `echo`; the seed content is the same as before.
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, 'w', encoding='utf-8') as f:
            f.write('1\n')
    with open(path, 'r', encoding='utf-8') as f:
        # Compare whole lines: a substring test would skip a new part whose
        # name happens to be contained in an earlier part's name.
        pushed = set(f.read().split('\n'))
    if last_p_name in pushed:
        return
    post_bv(title, last_p_name, bv_id, num)
    with open(path, 'a', encoding='utf-8') as f:
        f.write(f'{last_p_name}\n')


def main():
    """Run one polling pass over all configured uploaders and multi-part videos.

    A failure on one item is logged with its traceback and does not stop the
    remaining items. The loop pauses 3 seconds after every item.
    """
    logging.info('##################################################')
    # Uploader id list
    upid_list = [1202762767, 489248785, 60232120, 287795639, 345630501, 434734521, 16525598, 32682920, 430967976, 290526283, 430967976, 42594430, 1468951948, 31071444, 357066913]
    # dict.fromkeys drops repeated ids (430967976 is listed twice) while
    # keeping order, so no uploader is polled twice in one pass.
    for up_id in dict.fromkeys(upid_list):
        try:
            push_and_update_by_up(up_id)
        except Exception:
            # Top-level boundary: record the full traceback at ERROR level.
            logging.exception('failed to process uploader %s', up_id)
        finally:
            time.sleep(3)
    # Multi-part video ids
    bv_list = ['BV1Bx411a7J2', 'BV1VU4y157jW']
    for bv_id in dict.fromkeys(bv_list):
        try:
            push_and_update_by_bv(bv_id)
        except Exception:
            logging.exception('failed to process video %s', bv_id)
        finally:
            time.sleep(3)


# Run one polling pass when the file is executed as a script.
if __name__ == '__main__':
    main()

#2. crontab 定时任务


# 每20分钟拉取一次
# crontab -e
*/20 * * * * /usr/bin/python3 /root/daily_monitor/bilibili-reminder.py

#3. 效果
B站up主更新推送微信或钉钉

#4. 如果很喜欢某个 up 主的视频,可以使用 you-get 下载下来,下次再写。