客至汲泉烹茶, 抚琴听者知音

用pandas分析全球新冠COVID-19疫情

前言

这周一就想写这篇博客的,不过最近事务繁忙(拖延症犯了),一直拖到今天才写好几个函数,本来打算是分析一下疫情态势,不过想想我又不擅长数据分析,而且全球疫情瞬息万变,不如写一篇教程,授人以渔嘛。于是代码就先写到这里吧,感觉基本功能已经够了,如果你有新需求,可以在评论区留言,我有空就继续写。

这篇博客的代码可以让你:获取全球以及中国分省市疫情数据,进行清洗与分析,最后获得一些关键指标,如累计数据(病例、死亡、治愈),新增数据,现存病例,治愈率、感染率、死亡率,还可以获得选中国家的直观对比情况。

注:本人只是python爱好者,并非大神,代码可能有不完善之处,欢迎在评论区批评指正。

github地址:https://github.com/caly5144/shu-s-project/tree/master/covid

依赖

python不必说,博主用的是python3.6.5。sqlite3是python自带库,无需手动安装。如果你用的是anaconda的话,也会自带pandas,如果不是,那就手动安装一下。

pip install pandas

SQLiteStudio是可视化的sqlite管理工具,装了之后方便查看数据。

代码

获取数据

首先我们要获取新冠疫情实时数据,当然,不能是爬虫,要不然学习成本和获取数据难度都有大幅提高。我们直接用现成整理好的数据就行,github上有不少仓库存储了新冠疫情数据,比如这个:https://github.com/canghailan/Wuhan-2019-nCoV,它的特点是提供了中国省市数据,每小时更新,并提供csv、json、xlsx格式供下载。

当然我们不用下载,因为pandas可以读取远程数据。但是众所周知的是,github在中国大陆访问速度比较慢,读取一次可能要花好长时间,这时候就要请出我们的大杀器—jsdelivr,上一篇文章我们介绍了相关原理,在此不再赘述。直接读取加速网址:https://cdn.jsdelivr.net/gh/canghailan/Wuhan-2019-nCoV/Wuhan-2019-nCoV.csv即可。

该数据集中有累计确诊、治愈、死亡、疑似数据,我们首先加上个现存病例now=累计确诊-累计治愈-累计死亡。

接着看下数据结构:

可以看出它是一个三级式的结构,国家、省、市混在了一起,我们觉得这样会妨碍数据分析,如果能把数据按照行政级别分开就好了,比如国家的数据放到一张表中,省的数据放到另外一张表中。我的解决思路是添加一个region_class,之后再根据不同的region_class进行数据取出与存储。

那么怎么判断行政级别呢?由上图可以看出,国家级别的行政区是没有provincecity字段的,而省级别的行政区是没有city字段的。那么只要判断相应的字段为存在,不就能判断行政级别了吗?整理一下思路,代码如下:

import pandas as pd
import sqlite3
import datetime
# 后面就不再写导入的库了
def get_data():
    conn = sqlite3.connect(r'E:\sqlite3\nCoV\nCoV.db')
    url = 'https://cdn.jsdelivr.net/gh/canghailan/Wuhan-2019-nCoV/Wuhan-2019-nCoV.csv'
    df = pd.read_csv(url)
    
    df['now'] = df.confirmed-df.cured-df.dead # 添加一列,现存确诊
    df['region_class'] = 0 # 国家
    df.loc[pd.isnull(df.province) == False , 'region_class' ] = 1 # 省
    df.loc[pd.isnull(df.city) == False , 'region_class' ] = 2 # 市
    # 添加一列region_class,判断行政级别
    
    df = df.set_index('date')
    df.to_sql('data',conn,if_exists='replace')

初步清洗

这一步没什么好说的,就是把原始数据中的数据取出,按条件分别放到不同行政区数据表中。不过有一个点值得注意:我们需要计算出每日新增数据,应该怎么办?我的方法是复制一列数据(如累计新增),然后整体往下挪一行,新列第一行设为0,然后对这两行进行相减,即可得到每日新增确诊。

def data_clear():
    conn = sqlite3.connect(r'E:\sqlite3\nCoV\nCoV.db')
    region_dict = {0:'country',1:'province',2:'city'}
    for region_class in range(3):            
        df = pd.read_sql('select * from data where {0} like {1}'\
                         .format('region_class',region_class),conn)
        item_list = [('confirmed','inc_confirmed'),('suspected','inc_suspected')
        ,('cured','inc_cured'),('dead','inc_dead')]
        
        for index in range(4):
            df['temp'] = df.groupby(region_dict[region_class])\
            [item_list[index][0]].shift(1) # 添加一列        
            df['temp'].fillna(0, inplace=True) # 缺失值(即疫情第一天),填充0
            df[item_list[index][1]] = df[item_list[index][0]] - df['temp']
            df = df.drop(['temp'],axis=1) 
    
        df = df.set_index('date')
        df.to_sql(region_dict[region_class],conn,if_exists='replace')
        print(region_dict[region_class],'已添加到数据库中')

人口数据

我们想获得人口数据,以便进行更深一步分析(比如计算感染率),原数据集中没有,因此我们需要去其他地方找。世界银行提供了各个国家人口数据,虽然只更新到2018年,但也基本够用。我们去世界银行官网:https://data.worldbank.org.cn/上,查询并下载人口xls表,重命名为population.xls,并放到代码文件夹下(我的github项目中已经提供了,大家也可以去github中下载)。

但是世界银行数据的国家名与新冠疫情数据集的国家名并不完全一致,所以我们首先要对比有哪些不一致的国家,接着进行一一修改。修改主要分为两部分,一是add_serie,即世行没有提供数据的国家或地区,二是mapping_dict,即与数据集国家名不一致的,大家仿照着格式进行添加即可。我已经修改了大部分国家名,如果大家在运行代码时并没有提示存在国家名不一致的情况,也就无需修改。

def table_exist(conn,table): # 判断某表是否存在
    cursor = conn.cursor()
    cursor.execute("select name from sqlite_master where type='table'")
    alist = cursor.fetchall()
    alist = [num for elem in alist for num in elem] 
    return (table in alist)

def pop_clear(): # 人口数据存储与清洗
    conn = sqlite3.connect(r'E:\sqlite3\nCoV\nCoV.db')
    cursor = conn.cursor() # 获取一个光标
    if table_exist(conn,'pop'):
        df = pd.read_sql('select * from pop',conn)
        
    else:
        df = pd.read_excel('population.xls',skiprows= 3)
        col_n = ['Country Name','2018']
        df = pd.DataFrame(df,columns = col_n) # 选取其中两列数据
        df.rename(columns={'Country Name':'country', '2018':'pop'}, inplace = True)
        df = df.set_index('country')        
        df.to_sql('pop',conn,if_exists='replace')
        print('人口数据储存成功')    
    df = df.reset_index()
    country_list = df['country'].to_list() # 获取population中所有国家列表   
    # 获取数据库中已经存在的国家列表
    country_ncov = cursor.execute('SELECT {0} FROM {0}'.format("country")).fetchall()
    country_ncov = set([elem for item in country_ncov for elem in item])
    
    # 数据清洗主程序
    def process(df):
        add_serie = [['法属圭亚那',289763],['圣巴泰勒米',9868],['马提尼克',375554],['马约特',270400],
                     ['梵蒂冈',1000],['格恩西岛',67052],['荷属安的列斯',227000],['巴勒斯坦',5052000],
                     ['留尼汪',860000],['泽西岛',106800],['瓜德罗普',400104],['钻石公主号邮轮',3700],
                     ['美属维尔京群岛',107300],['蒙特塞拉特',5215],['英属维尔京群岛',30191],['圣皮埃尔和密克隆',5800],
                     ['福克兰群岛(马尔维纳斯)',3459],['安圭拉',16290],]
        
        col_n = ['country','pop']
        df = df.append(pd.DataFrame(add_serie, columns=col_n))
        
          
        mapping_dict = {"country":{"俄罗斯联邦": "俄罗斯","多米尼加共和国": "多米尼加",
                                        "阿拉伯埃及共和国":"埃及","斯洛伐克共和国":"斯洛伐克",
                                        "圣马丁(法属)":"圣马丁","波斯尼亚和黑塞哥维那":"波黑",
                                        "毛里塔尼亚":"毛利塔尼亚","大韩民国":"韩国",
                                        "捷克共和国":"捷克","阿拉伯联合酋长国":"阿联酋",
                                        "文莱达鲁萨兰国":"文莱","伊朗伊斯兰共和国":"伊朗",
                                        "中非共和国":"中非","委内瑞拉玻利瓦尔共和国":"委内瑞拉",
                                        "安道尔共和国":"安道尔","马恩岛":"英国属地曼岛",
                                        "几内亚比绍共和国":"几内亚比绍","也门共和国":"也门",
                                        "特克斯科斯群岛":"特克斯和凯科斯群岛","阿拉伯叙利亚共和国":"叙利亚",
                                        "北马里亚纳群岛":"北马里亚纳"}} 
        df = df.replace(mapping_dict)  # 国家名替换
        df.drop_duplicates(['country'],keep = "first") # 多次运行会有重复数据,需要进行清洗
        df = df.set_index('country')
        return df
    
    
    alist = [i for i in country_ncov if i not in country_list]
    if alist:
        print('尝试清洗人口数据……')
        df = process(df)
        alist = [i for i in country_ncov if i not in set(df.index.to_list())]
        if alist:
            print('已进行过初步清洗,但仍然存在国家名不一致的情况,修改代码后请再次运行:{0}'.format(alist))
        else:
            print('清洗完毕')
    else:
        print('未发现有国家名不一致的情况,无需清洗')
    
    df.to_sql('pop',conn,if_exists='replace')

最后清洗

主要是添加了感染率,死亡率,治愈率数据,这里我只对国家级别的数据进行了添加,省市级别的自行修改代码。另外,死亡率又分为粗死亡率(死亡人数/确诊人数)和细死亡率(死亡人数/(死亡+治愈人数)),原因就是粗死亡率并不能反映最终情况,比如累计病例数不变的情况下,每日粗死亡率肯定是上升的,因此用细死亡率来评估可能更科学一点。

def country_ratio(): # 各种比率
    conn = sqlite3.connect(r'E:\sqlite3\nCoV\nCoV.db')
    pop = pd.read_sql('select * from pop',conn)
    infect = pd.read_sql('select * from country',conn)
    df = pd.merge(infect, pop, how='left', on='country')
    df = df.set_index('date')
    df['pop'] = df['pop'].astype(float)
    df['inf_ratio'] = df['confirmed']/df['pop']*10000 # 万人感染率
    df['death_ratio'] = df['dead']/df['confirmed'] # 粗死亡率
    df['cure_ratio'] = df['cured']/df['confirmed'] # 治愈率
    df['death_ratio_x'] = df['dead']/(df['dead']+df['cured']) # 细死亡率
    
    df.to_sql('country_ratio',conn,if_exists='replace')
    print('最终清洗数据已存入至数据库中')

数据分析

这里我设计了两个函数,一个是select,另外一个是select_control,前者主要用于整体数据分析,后者则用于国家间的对比。先上代码:

def select(data=datetime.date.today().strftime('%Y-%m-%d'),item = ['confirmed','dead'],
           sort_item = ['confirmed'],condition = '',head_row = 20,is_exp = False):
    
    if set(sort_item)-set(item):
        print('分类参数中有多余的参数')        
    else:
        item_list = ['date','country']
        sort_item_list = []
        item_list = type_trans(item_list,item)
        sort_item_list = type_trans(sort_item_list,sort_item)
        if item_list and sort_item_list:            
            conn = sqlite3.connect(r'E:\sqlite3\nCoV\nCoV.db')
            if condition:
                df = pd.read_sql('select {0} from country_ratio where {1}'\
                                 .format(','.join(item_list),condition),conn)
            else:
                df = pd.read_sql('select {0} from country_ratio'\
                                 .format(','.join(item_list)),conn)
            
            if data[0:2]=='20':
                selected = df[df['date'] ==data].sort_values(by = sort_item,ascending = False)
            else:
                selected = df[df['country'] ==data].sort_values(by = 'date',ascending = False)
            
            if is_exp:
                file_name = '{0}_{1}.xlsx'.format(data,''.join(item))
                selected.to_excel(file_name)
                print('{0}数据导出成功'.format(file_name))
            else:
                print(selected.head(head_row))
        else:
            print('参数类型输入错误,请重新输入')

select有六个参数,第一个参数data,可以输入国家名或者日期(2020-03-22格式的),就会列出某国不同日期疫情变化情况或者某一天疫情最为严重的前几个国家情况。默认值是今天。

第二个参数item,可以输入列表、集合或元组,代表展示列表中的字段。默认值是['confirmed','dead'],即展示累计病例数与累计死亡。

第三个参数sort_item,可以输入列表、集合或元组,代表dataframe按列表中的字段进行排序(从大到小),先按列表第一个元素排,再按第二个排……。默认值是['confirmed'],也即按照累计确诊从大到小排。注意sort_item不能出现item没有的元素。

第四个参数condition,这个主要是筛选用的,默认值为空,也即无条件。你需要输入类似sqlwhere语句,比如condition = 'pop>1000000',代表着选取人口超过一百万的国家数据。

第五个参数head_row,代表展示多少行的数据,默认20行。

第六个参数is_exp,代表是否导出数据为excel格式,默认不导出,可改为is_exp = True,即导出。注意此时不再受head_row参数的影响。

[scode type="yellow"]注意:使用函数前,可以先注释掉get_data()、data_clear()、pop_save()、pop_clear()、country_ratio()函数,以免重复获取清洗数据。[/scode]

所有参数均有默认值,因此你直接运行select()就可以看到效果:

我想看一下意大利疫情变化情况,指标选取累计确诊、死亡、治愈?运行select('意大利',['confirmed','dead','cured'])即可

我想看一下3月21日各国现存确诊,死亡率和感染率,并且按照死亡率排序,而且我更关心大国数据(人口大于1000万)?运行select('2020-03-21',item = ['now','inf_ratio','death_ratio'],sort_item=['inf_ratio'],condition = 'pop>10000000')即可

相信你已经掌握了select的用法,接下来讲一下select_control怎么用。

它有四个参数,country_list代表要对比的国家,输入是一个列表,默认是中国;item_list是指标列表,默认是新增确诊;head_row是列举前多少行数据,默认20;is_exp,代表是否导出数据为excel格式,默认不导出,可改为is_exp = True,即导出。注意此时不再受head_row参数的影响。。

举个简单的例子select_control(['意大利','西班牙'],['inc_confirmed']) 代表对比意大利、西班牙、世界其他国家、全球累计确诊数据。

添加新评论