python大数据开发:实现微信自动报表
大数据时代下,从基层到管理层都希望从日常数据着手,关注业务每日变化的同时寻找可以驱动业务的点。所以日常报表的及时性和便捷可读性非常重要。特别是有时候周末放假,老板突然想知道这两天数据咋样了,而这时你身边刚好没有电脑就非常麻烦。为了解决这一问题,让老板和业务方可以随时在微信上查看核心数据,我们利用python做了个微信报表机器人。玩过微信自动回复的人都知道,万能的python有一个库叫itchat,是开源的微信个人号接口,可以实现消息监控、获取信息以及自动回复等功能。
首先梳理一下我们所需要做的东西:
1、连接数据库获取数据
2、将数据生成表格式的图片或文件(发现老板更中意图片形式~)
3、设立分组与触发条件
4、根据触发条件发送图片或文字
我把前两点归到一个脚本作为子程序,后两点归到一个脚本作为主程序。这样子可以通过增加子程序(一个报表作为一个子程序)来满足多个不同需求的数据表格,并用主程序来调用它们。
子程序:
这里主要用到pymysql、numpy和matplotlib三个库。pymysql用来连接数据库,获取源数据,numpy用来辅助数据运算,而matplotlib是python的绘图库,用来作图。NumPy 和 Matplotlib 的组合,可以有效替代 MatLab,在数据科学或者机器学习中经常会用到。
# -*- coding: utf-8 -*-
import time,pymysql,datetime
import numpy as np
import matplotlib.pyplot as plt
from pylab import mpl
from matplotlib.font_manager import FontProperties
mpl.rcParams['font.sans-serif'] = ['SimHei'] #动态设置 中文字体
font1 = FontProperties(fname=r"c:\windows\fonts\msyh.ttf", size=19) #微软雅黑
font2 = FontProperties(fname=r"c:\windows\fonts\msyh.ttf", size=18) #微软雅黑
和上次的邮件自动化一样,第一步就是建立连接数据库的函数,查询语句可单独作为一个函数,方便后面调用。
def connectDatabase(sql):
db = pymysql.connect(
host = 'xxx.xx.xx.xxx',
port = xxxxx,
user = 'xxxxxx',
passwd = 'xxxxxx',
db = 'xxxxxxxx',
charset = "utf8")
conn = db.cursor()
conn.execute(sql)
results = conn.fetchall()
conn.close()
db.close()
return results
def get_dau_sql(start_day):
sql='''
select xxxxx
from
table
where dt >= ''' + "'" + start_day + "'" + '''
order by dt desc
'''
return sql
第二步就是建立获取数据的函数。这里用了try/except语句来处理异常,当try之中的语句出现错误时,except语句能够捕获异常信息msg,并pass掉来保证主程序和其他子程序的顺利进行。之前没有用的时候容易发生一个数据没更新使得整个微信机器人瘫掉的情况。
def getData(day):
try:
data = [[],[],[],[],[],[],[],[],[],[],[]] #根据数据的长度而定
systime = time.strftime('%Y-%m-%d', time.localtime())
start_day = datetime.datetime.today().strptime(systime,'%Y-%m-%d') - datetime.timedelta(days=day)
start_day = start_day.strftime('%Y-%m-%d')
sql = get_dau_sql(start_day)#获取查询语句
results = connectDatabase(sql)
data[0].append(u'日环比')
data[0].append(u'周同比')
for i in range(1,10) :#计算日环比、周同比
data[i].append( results[0][i] - results[1][i])
data[i].append( results[0][i] - results[7][i])
data[10].append( results[1][10] - results[2][10])
data[10].append( results[1][10] - results[8][10])
for info in results :#写入数据
for i in range(len(info)) :
data[i].append(info[i])
return data
except Exception as msg:
#print("getData:", msg)
pass
第三步是画图。这里有种方法,一种是直接把数据填入excel,以文件形式发送或再将表格复制成图片(在上一篇邮件自动化里有讲到),另一种就是直接利用python的绘图库来画图。我这里要讲的是第二种办法,它的优点是日常运作不易出问题而且使用灵活,结合pyecharts库可以作出很好看的数据可视化。缺点就是python没有可以直接生成表格图片的库(如果有请一定要告诉我),我们只能根据坐标系,用划线的方式组合成表格,并利用坐标位置填入数据。
def build_DAU_img():
try:
#获取日期,用于命名每日文件
systime = time.strftime('%Y-%m-%d', time.localtime())
today = datetime.datetime.today().strptime(systime,'%Y-%m-%d')
day = today.strftime('%Y%m%d')
data =getData(14) #调用上面的取数函数
higth_num = 0.7 #行高
boteen_higth = 0.3 #字体在单元格里的高度
plt.figure(figsize=(28, 12)) #画布大小
row = len(data[1])
col = len(data)
for i in range(2, 2 * col, 2): # 表格的列,列宽为2
plt.vlines(i + 0.5 ,0, (row) * higth_num, linewidth=1,colors='darkgray')
plt.vlines(2.5, 0, row*higth_num, linewidth=1) #根据你表格的样式进行调整
for i in range(0, row): # 表格的行,行高为0.7
plt.hlines(i * higth_num, 0, 2 * col + 0.5, linewidth=1,colors='darkgray')
plt.ylim(0, row*higth_num) #x,y轴的数值显示范围,即表格大小
plt.xlim(0, 2 * col + 0.5)
plt.xticks(range(0, 0), list('' * 8)) #设置刻度值
yticks = [i / 10 for i in range(0, 10, 2)]
plt.yticks(range(0, 0), list(' ' * 5))
#为表头加上颜色~thistle是我喜欢的粉紫色
x = np.arange(0, 64, 0.5)
y1 = row*higth_num
y2 = (row-1)*higth_num
plt.fill_between(x, y1, y2,where=(-1<x) & (x<32), facecolor='thistle')
#添加字段名
plt.text(0.8, (row-1 + boteen_higth)*higth_num, u'日期', fontproperties=font1,color='black')
plt.text(2.85, (row-1 + boteen_higth)*higth_num, u'曝光视频数', size=17, fontproperties=font1,color='black')
plt.text(4.7, (row-1 + boteen_higth)*higth_num, u'平均曝光次数', size=17, fontproperties=font1,color='black')
plt.text(6.9, (row-1 + boteen_higth)*higth_num, u'曝光设备', size=17, fontproperties=font1,color='black')
plt.text(8.9, (row-1 + boteen_higth)*higth_num, u'播放设备', size=17, fontproperties=font1,color='black')
plt.text(10.8, (row-1 + boteen_higth)*higth_num, u'设备播放率', size=17, fontproperties=font1,color='black')
plt.text(12.8, (row-1 + boteen_higth)*higth_num, u'次数播放率', size=17, fontproperties=font1,color='black')
plt.text(14.6, (row-1 + boteen_higth)*higth_num, u'人均播放次数', size=17, fontproperties=font1,color='black')
plt.text(16.6, (row-1 + boteen_higth)*higth_num, u'人均播放时长', size=17, fontproperties=font1,color='black')
plt.text(18.9, (row-1 + boteen_higth)*higth_num, u'播放进度', size=17, fontproperties=font1,color='black')
plt.text(20.8, (row-1 + boteen_higth)*higth_num, u'播放留存率', size=17, fontproperties=font1,color='black')
plt.title(u'近14天消费数据', fontproperties=font1,size=23)
#填入日环比、周同比,并用绿色和红色来标记上涨和下跌
for i in range(2):
for j in range(len(data)) : #列
if j==0:
plt.text(j*2 + 0.7, (row + boteen_higth - i - 2)*higth_num, str(data[j][i]), size=18, fontproperties=font2,color='k')
elif j in (5,6,9,10):
if data[j][i]>0:
plt.text(j*2 + 1, (row + boteen_higth - i - 2)*higth_num, '+'+str('%.2f' % (data[j][i]*100))+'%', fontproperties=font2,color='green')
else:
plt.text(j*2 + 1, (row + boteen_higth - i - 2)*higth_num, str('%.2f' % (data[j][i]*100))+'%', fontproperties=font2,color='red')
elif j in (2,7,8):
if data[j][i]>0:
plt.text(j*2 + 1, (row + boteen_higth - i - 2)*higth_num, '+'+str('%.2f' % (data[j][i])), fontproperties=font2,color='green')
else:
plt.text(j*2 + 1, (row + boteen_higth - i - 2)*higth_num, str('%.2f' % (data[j][i])), fontproperties=font2,color='red')
else:
if data[j][i]>0:
plt.text(j*2 + 1, (row + boteen_higth - i - 2)*higth_num, '+'+str(data[j][i]), fontproperties=font2,color='green')
else:
plt.text(j*2 + 1, (row + boteen_higth - i - 2)*higth_num, str(data[j][i]), fontproperties=font2,color='red')
#填入数据
for i in range(2,len(data[0])): # 行
for j in range(0, len(data)) : #列
if j==0:
plt.text(j*2 + 0.3, (row + boteen_higth - i - 2)*higth_num, data[j][i], fontproperties=font2,color='k')
elif j in (5,6,9,10):
plt.text(j*2 + 1, (row + boteen_higth - i - 2)*higth_num, str('%.2f' % (data[j][i]*100))+'%', fontproperties=font2,color='k')
else:
plt.text(j*2 + 1, (row + boteen_higth - i - 2)*higth_num, data[j][i], fontproperties=font2,color='k')
plt.savefig('E:/imgSave/kpi' + str(day) + '.png') #保存
print(0)
except Exception as msg:
print("build_DAU_img:", msg)
print(1)
到这一步如果程序没有报错,就可以得到下列的图啦

主程序:
主程序主要用到前文提到的itchat和subprocess。当然,你还需要一个微信小号:小助手。
#coding:utf-8
import itchat,time,shelve,datetime as datetime1,datetime as datetime2,subprocess
from itchat.content import *
from pandas import *
设立业务分组,并用shelve进行储存分组结果。
shelfFile = shelve.open('group1')
mylist = list(shelfFile.keys())
if mylist == []:
group = {'yewu1': [], 'yewu2': [] ,'yewu3': []}
shelfFile['group'] = group
else:
group = shelfFile['group']
shelfFile.close()
建立判断用户分组与权限的函数。当用户在群聊里@了小助手并输入对应的暗号,如:yewu1后,便会弹出“该群加入XX组成功“,输入错误则会弹出“该群尚未加入群组,请输入正确组名”,函数最后返回对应的组名。
def check_group(group, msg): #判断用户权限
shelfFile = shelve.open('group1')
groupID = msg['FromUserName'] #发言者昵称
for key in group: #group 获取各业务群名称
groupKey = 0
if groupID in group[key]:
groupKey = key
break
if groupKey == 0: # 等于0表示未加入任何组别
if msg['Content'][msg['Content'].index(u'手') + 2:] == 'yewu1':
group['huwai'].append(groupID)
shelfFile['group'] = group
shelfFile.close()
itchat.send_msg(u'该群加入XX组成功', toUserName=msg['FromUserName'])
elif msg['Content'][msg['Content'].index(u'手') + 2:] == 'yewu2':
group['yunying'].append(groupID)
shelfFile['group'] = group
shelfFile.close()
itchat.send_msg(u'该群加入XX组成功', toUserName=msg['FromUserName'])
elif msg['Content'][msg['Content'].index(u'手') + 2:] == 'yewu3':
group['meishi'].append(groupID)
shelfFile['group'] = group
shelfFile.close()
itchat.send_msg(u'该群加入XX组成功', toUserName=msg['FromUserName'])
else:
itchat.send_msg(u'该群尚未加入群组,请输入正确组名', toUserName=msg['FromUserName'])
return groupKey # 找到对应组别返回
建立回复单独文本消息函数。为了提高效率,我的小助手暂不支持私聊。
@itchat.msg_register(TEXT) #这里的TEXT表示如果有人单独发送文本消息,那么就会调用下面的方法
def simple_reply(msg):
getmsg = msg['Content']
# print (msg)
# if u'liucun' in getmsg :
# # starday,endday = get_date(getmsg,systime,msg)
# data = getData()
# build_img(data,getmsg )
itchat.send_image('本助手暂不接受私聊哦,请在群组中进行回复', toUserName=msg['FromUserName'])
建立回复群聊消息函数。这里可以根据暗号(@小助手 1)来设定不同的回复报表。这里用到subprocess.Popen来执行传递过来的子程序,用output.communicate()获取到子程序的输出,这就是为什么我在子程序的最后要输出1或0了。当输出结果为1时也就是子程序执行失败,就回复“数据未更新,请稍后再试”。否则就输出生成的图片。
@itchat.msg_register(TEXT, isGroupChat=True) #这里的TEXT、isGroupChat 表示如果是群有人发送文本消息,那么就会调用下面的方法
def group_reply(msg):
if msg['isAt']: # 判断是否@自己
# print (msg)
groupKey = check_group(group, msg)
sub_biz = 0
if groupKey == 'yewu1' :
systime = time.strftime('%Y-%m-%d', time.localtime())
today = datetime1.datetime.today().strptime(systime,'%Y-%m-%d')
day = today.strftime('%Y%m%d')
if msg['Content'][msg['Content'].index(u'手') + 2:] == '1':
output = subprocess.Popen(r"python E:\WeChat\createPic.py", stdin=subprocess.PIPE, stdout=subprocess.PIPE, shell=True)
code = output.communicate()[0]
if '1' in code:
itchat.send_msg(u'数据未更新,请稍后再试', toUserName=msg['FromUserName'])
else:
itchat.send_file('E:/imgSave/kpi' + str(day) + '.png',toUserName=msg['FromUserName'])
else :
itchat.send_msg(u'输入时请注意输入规则',
toUserName=msg['FromUserName'])
最后,也就是最重要的一步,就是登录微信。
itchat.auto_login(hotReload=True)
itchat.run()
最后把程序挂到服务器或者能够24小时营业的电脑上,扫码登录就大功告成。补充一点,小助手可以挂到手机模拟器上,就不用担心手机不够用啦。