客至汲泉烹茶, 抚琴听者知音

Dash实现数据推送与实时更新

前言

最近遇到了个需求,需要接收数据并且可视化展示出来,数据格式为DataFame。这个用Dash很容易实现,但实时更新就难搞了。Dash官方有定时刷新的组件,读取倒是不难,难点在于如何推送数据。经过这两天摸索,我找出了三个解决方案。

存储到本地磁盘

Dash官方有dash_core_components.Interval组件,这是一个计时器,可以设定时间间隔循环刷新页面。那么很自然的想法,我把生成的数据存到本地,然后网页不断刷新读取就行了。根据前一篇文章的经验,我们知道feather格式最适合小规模的dataframe的写入与读取,有了这一点,很容易就可以写出代码。

import dash
import dash_core_components as dcc
import dash_html_components as html
import dash_table # 渲染表格的插件
from dash.dependencies import Input, Output

external_stylesheets = ['https://codepen.io/chriddyp/pen/bWLwgP.css']

app = dash.Dash(__name__, external_stylesheets=external_stylesheets)
app.layout = html.Div(
    html.Div([
        html.H4('实时更新数据'),
        html.Div(id='live-update-text'),
        dash_table.DataTable(id='live-update-table'),
        dcc.Interval(
            id='interval',
            interval=5*1000, # 5秒钟间隔
            n_intervals=0
        )
    ])
)

@app.callback([Output("live-update-table", "data")],
            [Input("interval", "n_intervals")])
def make_table(n):
    df = pd.read_feather('log.feather')
    return df.to_dict('records')

当然,还需要另外开一个进程用于存储df到feather。

测试了一段时间,发现feather读写有bug:当写入的同时读取,就会报错,从而中断计时器的运行,必须手动刷新网页才能重新开始更新。这就不行了,于是我的同事想出了另外一种解决方案:通过ZMQ推送到内存中

通过ZMQ推送到内存中

首先需要开一个端口用于发送数据

import zmq
import time
import random
import numpy as np
import pandas as pd


context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5411")

while True:
    df = pd.DataFrame(data=np.random.normal(0, 1, [2, 3]))
    socket.send_pyobj(df)
    time.sleep(1)

然后上述代码中的make_table就可以直接读取内存中的数据,这样速度更快,而且不会产生读写冲突bug。

# 其他代码省略

def create_zmq_socket(zmq_port="5411"):
    """连接ZMQ服务"""
    context = zmq.Context()
    zmq_socket = context.socket(zmq.SUB)
    zmq_socket.connect("tcp://localhost:%s" % zmq_port)
    zmq_socket.setsockopt_string(zmq.SUBSCRIBE,'')  # 消息过滤
    return zmq_socket

def recv_zmq(listen_port="5411"):
    """从ZMQ中接受数据"""
    with create_zmq_socket(listen_port) as socket:
        msg = socket.recv_pyobj()
    return msg

@app.callback([Output("live-update-table", "data")],
            [Input("interval", "n_intervals")])
def make_table(n):
    df = recv_zmq()
    return df.to_dict('records')

这个方案其实比较完美了,而且也符合我对前后端分离的粗浅认知。但还是有些不足,最重要的一点就是,诸如PYQT这种库,都有相应的事件回调函数,只有有数据推送时才会更新,不需要实时刷新监听。我们这种方案,时间间隔太短可能会消耗更多资源,时间间隔长的话对低延时的业务又非常不利。如果能实现类似于PYQT这种回调触发更新的逻辑就好了。

通过调用API更新数据

由于计算机基础和网络通信等知识的欠缺,我一开始就走向了死胡同,找了很多个方法,都是基于“数据推送过来,触发Dash的回调”进行的。我甚至在考虑要不要把数据推送也写到网页代码里面,然后用while True循环。很明显这样是不行的,While循环会阻塞Dash的线程,结果就是网页根本启动不了。

查了很多资料,最后发现了dash_devices库,它对Dash进行了扩展,其中一个新功能就是可以直接在代码内部改变组件值,而无需通过函数回调。看到这个功能,我突然灵光一现,是不是可以在网页代码里写一个API,数据推送模块一旦要更新数据,就调用一下这个API,然后改变页面上某个隐藏控件的属性,并链式回调触发make_table函数,提醒它从ZQM中获取数据。(当然理论上Post直接更新也可以,但是我对Post传数据不太了解,所以目前是通过这种方式间接实现的)

因为Dash是基于Flask实现的,所以我们可以很容易绑定一个路由。完整代码如下:

import dash_devices
from dash_devices.dependencies import Input, Output, State
import dash_core_components as dcc
import dash_html_components as html
import dash_table
import zmq
import time
import random
import numpy as np
import pandas as pd

external_stylesheets = ['https://codepen.io/chriddyp/pen/bWLwgP.css']

app = dash_devices.Dash(__name__, external_stylesheets=external_stylesheets)
app.layout = html.Div(
    html.Div([
        html.H4('实时更新数据'),
        html.Div(id='live-update-text'),
        dash_table.DataTable(id='live-update-table'),
        html.Div(id='flag',hidden=True),
    ])
)

@app.callback([Output("live-update-table", "data")],
            [[Input("flag", "children")]])
def make_table(n):
    df = recv_zmq()
    return df.to_dict('records')

def create_zmq_socket(zmq_port="5411"):
    """连接ZMQ服务"""
    context = zmq.Context()
    zmq_socket = context.socket(zmq.SUB)
    zmq_socket.connect("tcp://localhost:%s" % zmq_port)
    zmq_socket.setsockopt_string(zmq.SUBSCRIBE,'')  # 消息过滤
    return zmq_socket

def recv_zmq(listen_port="5411"):
    """
    接受数据
    """
    with create_zmq_socket(listen_port) as socket:
        msg = socket.recv_pyobj()
    return msg


def get_count():
    global count
    count += 1
    if count >= 10:
        count = 0
    return count

@app.server.route('/update_data',methods = ['get','post'])
def update_data():
    app.push_mods({'flag': {'children': str(get_count())}})
    return '请求成功'

前半部分代码和上两章差不多,只不过用dash_devices替换掉了dash,核心在于最后的绑定路由。这样的话,在数据推送模块,每次推送新数据后,都可以get一下API,提醒网页更新数据。

import requests
requests.get('127.0.0.1:5000/update_data')

添加新评论