pyspark之填充缺失的时间数据
这里的场景是,原始数据有两个表示时间的字段:日期和小时,以及对应时间的数据值(比如某个网站的访问量,在凌晨一般没有,白天有)。只有数据值不为0的时候才会记录,因此数据值为0的时间段是没有的。但我们可能需要这些数据,因此就要用到填充功能。
下面会举一个例子来说明。
首先导入需要用到的包,这里的pyspark版本是2.2.0,python版本是2.7。
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
from pyspark.sql import SparkSession, SQLContext
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql.window import Window创建一个spark会话(如果使用的是shell,不需要此步骤):
spark = SparkSession.builder.appName("test").enableHiveSupport().getOrCreate()创建一个dataframe,有5列6行数据:
df = spark.createDataFrame(
[(1, "a", 10, "2019-09-20", "1"), (2, "a", 20, "2019-09-20", "3"), (3, "a", 5, "2019-09-21", "5"), (4, "b", 8, "2019-09-20", "7"), (5, "b", 9, "2019-09-21", "9"), (6, "b", 16, "2019-09-21", "11")],
["id", "category", "num", "d", "h"])原始数据表示时间的两列是日期(d)和小时(h),将其转换为时间戳,在此之前,先将h列转换成整数类型:
df = df.withColumn("h", df["h"].cast(IntegerType()))
df = df.withColumn("time", F.from_unixtime((F.unix_timestamp(F.col("d"), "yyyy-MM-dd")+F.col("h")*3600)).cast("timestamp"))看一看数据:
输出:
可以发现,time列显示的时间并不是我们希望得到的时间,不过没关系,这个不影响我们数据的填充。
下面介绍两种方法来填充数据。
第一种方法,根据数据中的最小时间和最大时间,生成这个时间段的所有时间数据,再和原始表做left outer join。
先获取最小时间和最大时间
根据最小时间和最大时间,以小时为单位,生成这个时间段的所有时间数据:
输出:
这里有两个category,a和b,假如我们希望对于每个category,都有完整的时间数据,要怎么做呢?那就要用到笛卡尔积了:
笛卡尔积的结果就是所有我们需要的时间段数据,再将其与原始表做left outer join,就能得到我们想要的结果
此时df1的前几行是这样的:
发现填充的数据的id,num,d,h都是空的,那么就需要补充这些数据的值了:
再来看看df1的前5行:
可以发现,转换为d和h后,时间就变成我们想要的了,time和d、h看起来不是一个时间。。。事实上,写入hive表后,time也会变成我们想要的时间,这里显示的时间不准确,可能是有别的未可知原因。
用df1.count()查看,有70列数据。可以想一下,为什么是70行。
方法一需要生成所有category的所有时间段的数据,再和原始表join,在数据量很大的时候,效率比较低。方法二则是生成原始表所没有的数据,再和原始表做union。
方法二的主要思想是,通过每个category下time的排序,找出相邻两个time之间的缺失time,然后生成缺失time的数据。
首先针对每个category下的数据,得到该行数据对应的time的上一个已有的time:
得到此时间与上一个时间的时间差:
这里的时间差是以秒为单位的,当时间差为3600秒时,说明两个时间之间没有缺失的时间,大于3600秒时才有。因此,要针对这部分数据找出缺失的时间:
这里的fill_dates是一个udf函数,输入当前时间x,以及时间差z,以3600秒为步长,得到当前时间与上一个时间之间缺失的那些时间,即这里的next_dates,它是一个list。可以用explode函数将这个list拆分,得到多行数据:
再做一些格式转换,以及d和h的生成,num和id的补充:
看两行数据:
next_dates是时间戳格式。
再将这个表和原始表union一下就好了,注意要drop不需要的列:
输出:
df2.count()后发现只有58行。
原来这里会针对每个category有一个最大时间和最小时间,所以得到的结果数是比方法一少的。疏忽了!
如果想得到和方法一一样的结果,可以这么写:
先针对每个category,生成最小时间和最大时间的数据。这里有两个category,所以会有2*2=4行数据
输出:
然后将生成的数据和原始表left join,得到其他字段(id, num, d, h)的值,这是为了保证对于df中已有的数据,newRow的相应行是一样的,后续union的时候可以去掉重复数据:
输出:
这样,每个category下都有了最小时间和最大时间的数据了。
再用和之前一样的方法:
此时df3.count()就是70行啦!
如果我们想要计算每个category的每一个时间点的前后1小时这个时间段(一共3个小时)的平均num,就可以这么做:
注意这里要partitionBy,也就是分区计算,不然会出现两个category的时间混在一起被计算。
输出:
Window是一个很有用的函数,可以用于取想要的窗口数据。
上述代码中的rowsBetween是指从当前行算起(当前行是第0行),某两行之间的窗口,比如这里是-1和1,也就是当前行的前一行和后一行之间的这三行。
还有一个方法是rangeBetween(x,y),是指当前行的某个字段,比如这里的num,取这个字段某个区间的那些数据,即num值处于[num+x, num+y]这个区间的那些行。
参考资料:
https://stackoverflow.com/questions/42411184/filling-gaps-in-timeseries-spark
Last updated
Was this helpful?