第一时间收到必看的up 主的视频更新消息,推送到微信或钉钉,冲
#1. python3 代码
# -*- coding:utf-8 -*-
# b站up主更新推送
# by stomach
import time
import requests
import json
import os
import logging
logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s',
level=logging.INFO, filename='/tmp/bslog.log')
# 推送钉钉
def postdingding(up_name, title, bv_id):
HEADERS = {"Content-Type": "application/json ;charset=utf-8"}
data = {
"msgtype": "markdown",
"markdown": {
"title": f'{up_name} 更新啦!',
"text": f"[{title}](https://www.bilibili.com/video/{bv_id}) --{up_name}"
},
"at": {
"atMobiles": [
""
],
"isAtAll": "true"
}
}
String_textMsg = json.dumps(data)
# 钉钉地址
dingdingurl = 'https://oapi.dingtalk.com/robot/send?access_token=8db6bd77b5470e911XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX'
res = requests.post(dingdingurl, data=String_textMsg, headers=HEADERS)
logging.info(res.text)
# 推送微信
def postwechat1(up_name, title, bv_id):
HEADERS = {"Content-Type": "application/json ;charset=utf-8"}
# 封面图片
p = requests.get('https://ley.best/picapi/').text
# 获得token
token = get_token()
# agentid 按需更改
data = {
"touser": "@all",
"msgtype": "news",
"agentid": 1000002,
"news": {
"articles": [
{
"title": f'{up_name} 更新啦!',
"description": f'{title} --{up_name}',
"url": f'https://www.bilibili.com/video/{bv_id}',
"picurl": p
}
]
},
"at": {
"atMobiles": [
""
],
"isAtAll": "true"
}
}
String_textMsg = json.dumps(data)
# 企业微信应用地址
wechaturl = f'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={token}'
res = requests.post(wechaturl, data=String_textMsg, headers=HEADERS)
print(res.text)
def get_token():
secret = 'fU9hhFGiya3NZkzkMnXXXXXXXXXXXXXXXXX'
corpid = 'ww98c01fXXXXXXXXXXXXXX'
r = requests.get(
f'https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={corpid}&corpsecret={secret}').text
js = json.loads(r)
token = js['access_token']
print(token)
return token
# 获取企业微信二维码
def get_qrcode(token=''):
token = get_token()
url = f'https://qyapi.weixin.qq.com/cgi-bin/corp/get_join_qrcode?access_token={token}&size_type=1'
r = requests.get(url)
print(r.text)
# 获取含多p的视频更新
def get_bv_info(bv_id):
r = requests.get(url=f'http://api.bilibili.com/x/web-interface/view?bvid={bv_id}')
res = r.text
js = json.loads(res)
pages = js["data"]["pages"]
num = len(pages)
title = js["data"]["title"]
last_p_name = pages[-1]["part"]
print(last_p_name)
return title, last_p_name, num
def post_bv(title, last_p_name, bv_id, num):
HEADERS = {"Content-Type": "application/json ;charset=utf-8"}
p = requests.get('https://ley.best/picapi/').text
token = get_token()
data = {
"touser": "@all",
"msgtype": "news",
"agentid": 1000003,
"news": {
"articles": [
{
"title": f'{title}更新啦!',
"description": f'{last_p_name}',
"url": f'https://www.bilibili.com/video/{bv_id}?p={num}',
"picurl": p
}
]
},
"at": {
"atMobiles": [
""
],
"isAtAll": "true"
}
}
String_textMsg = json.dumps(data)
# 企业微信应用地址
wechaturl = f'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={token}'
res = requests.post(wechaturl, data=String_textMsg, headers=HEADERS)
print(res.text)
def get_up_video(up_id):
# 这个api 用requests 不得行啊,用curl 吧
cmd = f'curl -s "http://api.bilibili.com/x/space/arc/search?mid={up_id}&ps=5&pn=1"'
res = os.popen(cmd).read()
js = json.loads(res)
vlist = js['data']['list']['vlist']
v_list = []
for v in vlist:
l = []
title = v['title']
bv_id = v['bvid']
print(bv_id, title)
l.extend([bv_id, title])
v_list.append(l)
return v_list
def get_up_name(up_id):
r = requests.get(url=f'http://api.bilibili.com/x/space/acc/info?mid={up_id}')
res = r.text
js = json.loads(res)
name = js['data']['name']
return name
def push_and_update_by_up(up_id):
path = f'/home/tmp/{up_id}'
up_name = get_up_name(up_id)
v_list = get_up_video(up_id)
if not os.path.exists(path):
os.system(f'echo 1 > {path}')
with open(path, 'r', encoding='utf-8') as f:
data = f.read()
with open(path, 'a', encoding='utf-8') as f:
for v in v_list:
bv_id, title = v
logging.info(title)
if bv_id not in data:
postwechat1(up_name, title, bv_id)
postdingding(up_name, title, bv_id)
string = f'{bv_id} {title}\n'
f.write(string)
def push_and_update_by_bv(bv_id):
path = f'/home/tmp/{bv_id}'
title, last_p_name, num = get_bv_info(bv_id)
logging.info(title)
if not os.path.exists(path):
os.system(f'echo 1 > {path}')
with open(path, 'r', encoding='utf-8') as f:
data = f.read()
with open(path, 'a', encoding='utf-8') as f:
if last_p_name not in data:
post_bv(title, last_p_name, bv_id, num)
string = f'{last_p_name}\n'
f.write(string)
def main():
logging.info('##################################################')
# up 主id 列表
upid_list = [1202762767, 489248785, 60232120, 287795639, 345630501, 434734521, 16525598, 32682920, 430967976, 290526283, 430967976, 42594430, 1468951948, 31071444, 357066913]
for up_id in upid_list:
try:
push_and_update_by_up(up_id)
except Exception as e:
logging.info(e)
finally:
time.sleep(3)
# 多p视频id
bv_list = ['BV1Bx411a7J2', 'BV1VU4y157jW']
for bv_id in bv_list:
try:
push_and_update_by_bv(bv_id)
except Exception as e:
logging.info(e)
finally:
time.sleep(3)
if __name__ == '__main__':
main()
#2. crontab 定时任务
# 每20分钟拉取一次
# crontab -e
*/20 * * * * /usr/bin/python3 /root/daily_monitor/bilibili-reminder.py
#4. 很喜欢某个up主的视频的话可以使用you-get 下载下来,下次写吧
Comments | 3 条评论
monono 博主
这代码可真是好看呢
mocuishle 博主
@monono
那么看懂了吗~
monono 博主
@mocuishle
看不懂看不懂,代码之美可妙不可言