📓
Study
  • README
  • Application
    • Contest
      • 竞赛trick
  • Basic Know
    • 半监督学习
    • 贝叶斯
      • 朴素贝叶斯分类器
    • 对抗训练
    • 概率图模型
      • CRF
      • HMM
      • 概率图模型
    • 关联分析
    • 归纳偏置
      • [什么是 Inductive bias(归纳偏置)?](BasicKnow/归纳偏置/什么是 Inductive bias(归纳偏置)?.md)
    • 聚类
    • 决策树
    • 绿色深度学习
    • 树模型&集成学习
      • 提升树
      • Ada Boost
      • [集成学习]
    • 特征工程
      • 数据分桶
      • 特征工程概述
      • 特征选择
      • LDA
      • PCA
    • 线性模型
      • 感知机
      • 最大熵模型
      • SVM
        • SVM支持向量机
      • 逻辑回归
      • 线性回归
    • 优化算法
      • 拉格朗日对偶性
      • 牛顿法
        • 牛顿法&拟牛顿法
      • 梯度下降法
        • 梯度下降算法
      • 优化算法
    • 预处理
      • [1-1]正则表达式
      • [1-2]文本预处理
      • [1-3]词性
      • [1-4]语法分析
      • [1-6]文本分类
      • [1-7]网络爬取
      • 【备用】正则表达式
      • 7.re模块
      • 词典匹配
      • 分词
      • 子表达式
      • Todo
    • 主题模型
      • LDA
    • Deep Learning
      • 反向传播
      • 梯度消失&梯度爆炸
      • Batch Size
      • 1.DLbasis
      • 小概念
      • MLstrategy
      • CNN
      • RNN及其应用
      • 关于深度学习实践
      • 神经网络概述
      • Batch Normalization
      • Program CNN
      • Program D Lbasis
      • Program DN Nimprove
      • Program Neural Style Transfer
      • Summer DL
    • EM算法
    • GAN
      • Gans In Action Master
    • GNN
      • 搜广推之GNN
      • Representation Learning
        • Anomalydetection
        • Conclusion
        • Others
        • Papernotes
        • Recommadation
    • k近邻法
      • K近邻
    • Language Model
      • 语言模型解码采样策略
      • [1-1][语言模型]从N-gram模型讲起
      • [1-2][语言模型]NNLM(神经网络语言模型)
      • [1-3][语言模型]基于RNN的语言模型
      • [1-4][语言模型]用N-gram来做完形填空
      • [1-5][语言模型]用KenLM来做完形填空
    • Loss Function
      • 常用损失函数
      • Focal Loss
      • softmax+交叉熵
    • Machine Learning
      • [基础]概念
      • 待整合
      • 交叉验证
      • 无监督学习
      • 优缺点
      • ML Yearning
      • SVD
    • Statistics Math
      • 程序员的数学基础课
      • 数学基础
      • 统计&高数
      • 统计题目
      • 线性代数
      • 组合数学
      • Discrete Choice Model
      • Nested Choice Model
  • Course Note
    • 基于TensorFlow的机器学习速成课程
      • [Key ML Terminology](CourseNote/基于TensorFlow的机器学习速成课程/Key ML Terminology.md)
    • 集训营
      • 任务说明
      • 算法实践1.1模型构建
      • 算法实践1.2模型构建之集成模型
      • 算法实践2.1数据预处理
    • 李宏毅机器学习
      • 10DNN训练Tips
        • Chapter 18
      • 16无监督学习
        • Chapter 25
    • 贪心NLP
      • 贪心NLP笔记
    • Cs 224 N 2019
      • [A Simple But Tough To Beat Baseline For Sentence Embeddings](CourseNote/cs224n2019/A Simple but Tough-to-beat Baseline for Sentence Embeddings.md)
      • [Lecture 01 Introduction And Word Vectors](CourseNote/cs224n2019/Lecture 01 Introduction and Word Vectors.md)
      • [Lecture 02 Word Vectors 2 And Word Senses](CourseNote/cs224n2019/Lecture 02 Word Vectors 2 and Word Senses.md)
      • [Lecture 03 Word Window Classification Neural Networks And Matrix Calculus](CourseNote/cs224n2019/Lecture 03 Word Window Classification, Neural Networks, and Matrix Calculus.md)
      • [Lecture 04 Backpropagation And Computation Graphs](CourseNote/cs224n2019/Lecture 04 Backpropagation and Computation Graphs.md)
      • [Lecture 05 Linguistic Structure Dependency Parsing](CourseNote/cs224n2019/Lecture 05 Linguistic Structure Dependency Parsing.md)
      • [Lecture 06 The Probability Of A Sentence Recurrent Neural Networks And Language Models](CourseNote/cs224n2019/Lecture 06 The probability of a sentence Recurrent Neural Networks and Language Models.md)
      • Stanford NLP
    • Deep Learning Book Goodfellow
      • Books
        • Deep Learning Book Chapter Summaries Master
      • 提纲
      • C 5
      • C 6
      • [Part I Applied Math And Machine Learning Basics](CourseNote/Deep-Learning-Book-Goodfellow/Part I - Applied Math and Machine Learning basics.md)
    • Lihang
    • NLP实战高手课
      • 极客时间_NLP实战高手课
    • 工具&资料
    • 机器学习、深度学习面试知识点汇总
    • 七月kaggle课程
    • 算法工程师
    • 贪心科技机器学习必修知识点特训营
    • 唐宇迪机器学习
    • 语言及工具
    • AI技术内参
    • Suggestions
  • Data Related
    • 数据质量
      • 置信学习
    • 自然语言处理中的数据增广_车万翔
      • 自然语言处理中的数据增广
    • Mixup
    • 数据不均衡问题
    • 数据增强的方法
  • Knowledge Graph
    • Information Extraction
      • 联合抽取
        • PRGC
      • Code
        • BERT微调
      • NER
        • 阅读理解做NER
          • MRC
        • FLAT
        • Global Pointer
        • 命名实体识别NER
    • Keyword Extraction
      • 关键词抽取
    • 小米在知识表示学习的探索与实践
    • KG
  • Multi Task
    • EXT 5
      • Ex T 5
  • NLG
    • Dailogue
      • 比赛
        • 对话评估比赛
          • [simpread-DSTC10 开放领域对话评估比赛冠军方法总结](NLG/Dailogue/比赛/对话评估比赛/simpread-DSTC10 开放领域对话评估比赛冠军方法总结.md)
      • 任务型对话
        • DST
          • DST概述
        • NLG
          • NLG概述
        • NLU
          • NLU概述
        • 任务型对话概述
        • simpread-任务型对话系统预训练最新研究进展
      • 问答型对话
        • 检索式问答
          • 基于预训练模型的检索式对话系统
          • 检索式文本问答
        • 业界分享
          • 低资源场景下的知识图谱表示学习和问答_阿里_李杨
          • QQ浏览器搜索智能问答
        • 问答型对话系统概述
      • 闲聊型对话
        • 闲聊型对话系统概述
      • 业界分享
        • 人工智能与心理咨询
        • 腾讯多轮对话机器人
        • 微软小冰
        • 小布助手闲聊生成式算法
        • 美团智能客服实践_江会星
        • 去哪儿智能客服探索和实践
        • 实时语音对话场景下的算法实践_阿里_陈克寒
        • 智能语音交互中的无效query识别_小米_崔世起
        • UNIT智能对话
      • 主动对话
      • EVA
        • EVA分享
        • EVA模型
      • PLATO
      • RASA
    • Machine Translation
      • 业界分享
        • 爱奇艺台词翻译分享
      • Paper
        • Deep Encoder Shallow Decoder
    • RAGRelated
    • Text 2 SQL
      • M SQL
        • [M SQL 2](NLG/Text2SQL/M-SQL/M-SQL (2).md)
      • [Text2SQL Baseline解析](NLG/Text2SQL/Text2SQL Baseline解析.md)
      • Text 2 SQL
    • Text Summarization
      • [文本摘要][paper]CTRLSUM
      • 文本摘要
  • Pre Training
    • 业界分享
      • 超大语言模型与语言理解_黄民烈
        • 超大语言模型与语言理解
      • 大模型的加速算法_腾讯微信
        • 大模型的加速算法
      • 孟子轻量化预训练模型
      • 悟道文汇文图生成模型
      • 悟道文澜图文多模态大模型
      • 语义驱动可视化内容创造_微软
        • 语义驱动可视化内容创造
    • Base
      • Attention
      • Mask
        • NLP中的Mask
      • Position Encoding
        • 位置编码
    • BERT
      • ALBERT
      • Bert
        • Venv
          • Lib
            • Site Packages
              • idna-3.2.dist-info
                • LICENSE
              • Markdown-3.3.4.dist-info
                • LICENSE
              • Tensorflow
                • Include
                  • External
                    • Libjpeg Turbo
                      • LICENSE
                  • Unsupported
                    • Eigen
                      • CXX 11
                        • Src
                          • Tensor
              • Werkzeug
                • Debug
                  • Shared
                    • ICON LICENSE
        • CONTRIBUTING
        • Multilingual
      • Ro BER Ta
      • BERT
      • BERT面试问答
      • BERT源码解析
      • NSP BERT
    • BERT Flow
    • BERT Zip
      • Distilling The Knowledge In A Neural Network
      • TINYBERT
      • 模型压缩
    • CPM
    • CPT
      • 兼顾理解和生成的中文预训练模型CPT
    • ELECTRA
    • EL Mo
    • ERNIE系列语言模型
    • GPT
    • MBART
    • NEZHA
    • NLG Sum
      • [simpread-预训练时代下的文本生成|模型 & 技巧](Pre-training/NLGSum/simpread-预训练时代下的文本生成|模型 & 技巧.md)
    • Prompt
      • 预训练模型的提示学习方法_刘知远
        • 预训练模型的提示学习方法
    • T 5
      • Unified SKG
      • T 5
    • Transformer
    • Uni LM
    • XL Net
    • 预训练语言模型
    • BERT变种
  • Recsys
    • 多任务Multi-task&推荐
    • 推荐介绍
    • 推荐系统之召回与精排
      • 代码
        • Python
          • Recall
            • Deep Match Master
              • Docs
                • Source
                  • Examples
                  • FAQ
                  • Features
                  • History
                  • Model Methods
                  • Quick Start
    • 业界分享
      • 腾讯基于知识图谱长视频推荐
    • 召回
    • Sparrow Rec Sys
    • 深度学习推荐系统实战
    • 推荐模型
    • Deep FM
  • Search
    • 搜索
    • 业界分享
      • 爱奇艺搜索排序算法实践
      • 语义搜索技术和应用
    • 查询关键字理解
    • 搜索排序
    • BM 25
    • KDD21-淘宝搜索中语义向量检索技术
    • query理解
    • TFIDF
  • Self Supervised Learning
    • Contrastive Learning
      • 业界分享
        • 对比学习在微博内容表示的应用_张俊林
      • Paper
      • R Drop
      • Sim CSE
    • 自监督学习
  • Text Classification
    • [多标签分类(Multi-label Classification)](TextClassification/多标签分类(Multi-label Classification)/多标签分类(Multi-label Classification).md)
    • Fast Text
    • Text CNN
    • 文本分类
  • Text Matching
    • 文本匹配和多轮检索
    • CNN SIM
    • Word Embedding
      • Skip Gram
      • Glove
      • Word 2 Vec
    • 文本匹配概述
  • Tool
    • 埋点
    • 向量检索(Faiss等)
    • Bigdata
      • 大数据基础task1_创建虚拟机+熟悉linux
      • 任务链接
      • Mr
      • Task1参考答案
      • Task2参考答案
      • Task3参考答案
      • Task4参考答案
      • Task5参考答案
    • Docker
    • Elasticsearch
    • Keras
    • Numpy
    • Python
      • 可视化
        • Interactivegraphics
        • Matplotlib
        • Tkinter
        • Turtle
      • 数据类型
        • Datatype
      • python爬虫
        • Python Scraping Master
          • phantomjs-2.1.1-windows
        • Regularexp
        • Scrapying
        • Selenium
      • 代码优化
      • 一行代码
      • 用python进行语言检测
      • Debug
      • Exception
      • [Features Tricks](Tool/python/Features & Tricks.md)
      • Fileprocess
      • Format
      • Functional Programming
      • I Python
      • Magic
      • Math
      • Os
      • Others
      • Pandas
      • Python Datastructure
      • Python操作数据库
      • Streamlit
      • Time
    • Pytorch
      • Dive Into DL Py Torch
        • 02 Softmax And Classification
        • 03 Mlp
        • 04 Underfit Overfit
        • 05 Gradient Vanishing Exploding
        • 06 Text Preprocess
        • 07 Language Model
        • 08 Rnn Basics
        • 09 Machine Translation
        • 10 Attention Seq 2 Seq
        • 11 Transformer
        • 12 Cnn
        • 14 Batchnorm Resnet
        • 15 Convexoptim
        • 16 Gradientdescent
        • 17 Optim Advance
    • Spark
      • Pyspark
        • pyspark之填充缺失的时间数据
      • Spark
    • SQL
      • 数据库
      • Hive Sql
      • MySQL实战45讲
    • Tensor Flow
      • TensorFlow入门
  • Common
  • NLP知识体系
Powered by GitBook
On this page

Was this helpful?

  1. Tool
  2. Spark
  3. Pyspark

pyspark之填充缺失的时间数据

这里的场景是,原始数据有两个表示时间的字段:日期和小时,以及对应时间的数据值(比如某个网站的访问量,在凌晨一般没有,白天有)。只有数据值不为0的时候才会记录,因此数据值为0的时间段是没有的。但我们可能需要这些数据,因此就要用到填充功能。

下面会举一个例子来说明。

首先导入需要用到的包,这里的pyspark版本是2.2.0,python版本是2.7。

import sys
reload(sys)
sys.setdefaultencoding('utf-8')
from pyspark.sql import SparkSession, SQLContext
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql.window import Window

创建一个spark会话(如果使用的是shell,不需要此步骤):

spark = SparkSession.builder.appName("test").enableHiveSupport().getOrCreate()

创建一个dataframe,有5列6行数据:

df = spark.createDataFrame(
    [(1, "a", 10, "2019-09-20", "1"), (2, "a", 20, "2019-09-20", "3"), (3, "a", 5, "2019-09-21", "5"), (4, "b", 8, "2019-09-20", "7"), (5, "b", 9, "2019-09-21", "9"), (6, "b", 16, "2019-09-21", "11")], 
    ["id", "category", "num", "d", "h"])

原始数据表示时间的两列是日期(d)和小时(h),将其转换为时间戳,在此之前,先将h列转换成整数类型:

df = df.withColumn("h", df["h"].cast(IntegerType()))
df = df.withColumn("time", F.from_unixtime((F.unix_timestamp(F.col("d"), "yyyy-MM-dd")+F.col("h")*3600)).cast("timestamp"))

看一看数据:

df.take(6)

输出:

[Row(id=1, category=u'a', num=10, d=u'2019-09-20', h=1, time=datetime.datetime(2019, 9, 20, 9, 0)),
 Row(id=2, category=u'a', num=20, d=u'2019-09-20', h=3, time=datetime.datetime(2019, 9, 20, 11, 0)),
 Row(id=3, category=u'a', num=5, d=u'2019-09-21', h=5, time=datetime.datetime(2019, 9, 21, 13, 0)),
 Row(id=4, category=u'b', num=8, d=u'2019-09-20', h=7, time=datetime.datetime(2019, 9, 20, 15, 0)),
 Row(id=5, category=u'b', num=9, d=u'2019-09-21', h=9, time=datetime.datetime(2019, 9, 21, 17, 0)),
 Row(id=6, category=u'b', num=16, d=u'2019-09-21', h=11, time=datetime.datetime(2019, 9, 21, 19, 0))]

可以发现,time列显示的时间并不是我们希望得到的时间,不过没关系,这个不影响我们数据的填充。

下面介绍两种方法来填充数据。

第一种方法,根据数据中的最小时间和最大时间,生成这个时间段的所有时间数据,再和原始表做left outer join。

先获取最小时间和最大时间

# 得到数据中的最小时间和最大时间,这里得到的minp和maxp是(1568941200, 1569063600),可以用python代码转换一下
minp, maxp = df.select(F.min("time").cast("long"), F.max("time").cast("long")).first()
# print(datetime.datetime.utcfromtimestamp(1568941200))
# 2019-09-20 01:00:00
# 结果和原始时间一样!神奇不!

根据最小时间和最大时间,以小时为单位,生成这个时间段的所有时间数据:

# 时间间隔,这里是以小时为单位,所以是60*60,即3600秒
step = 60 * 60  
reference = spark.range((minp / step) * step, ((maxp / step) + 1) * step, step).select(F.col("id").cast("timestamp").alias("time"))
reference.take(3)

输出:

[Row(time=datetime.datetime(2019, 9, 20, 9, 0)),
 Row(time=datetime.datetime(2019, 9, 20, 10, 0)),
 Row(time=datetime.datetime(2019, 9, 20, 11, 0))]

这里有两个category,a和b,假如我们希望对于每个category,都有完整的时间数据,要怎么做呢?那就要用到笛卡尔积了:

# 我们希望对于每个category,都有每个时间段的数据,因此需要将时间与category做笛卡尔积
cate = dftest.select('category').distinct()
reference2 = cate.crossJoin(reference)   # 笛卡尔积

笛卡尔积的结果就是所有我们需要的时间段数据,再将其与原始表做left outer join,就能得到我们想要的结果

df1 = reference2.join(df, ["category", "time"], "leftouter")

此时df1的前几行是这样的:

[Row(category=u'a', time=datetime.datetime(2019, 9, 20, 9, 0), id=1, num=10, d=u'2019-09-20', h=1),
 Row(category=u'a', time=datetime.datetime(2019, 9, 20, 10, 0), id=None, num=None, d=None, h=None),
 Row(category=u'a', time=datetime.datetime(2019, 9, 20, 11, 0), id=2, num=20, d=u'2019-09-20', h=3),
 Row(category=u'a', time=datetime.datetime(2019, 9, 20, 12, 0), id=None, num=None, d=None, h=None),
 Row(category=u'a', time=datetime.datetime(2019, 9, 20, 13, 0), id=None, num=None, d=None, h=None)]

发现填充的数据的id,num,d,h都是空的,那么就需要补充这些数据的值了:

# 补id、num、d、h
df1 = df1.withColumn("d", F.to_date(F.col("time")).cast(StringType()))
df1 = df1.withColumn("h", F.hour(F.col("time")).cast(IntegerType()))
df1 = df1.fillna(0, subset=['num'])
df1 = df1.fillna(0, subset=['id'])

再来看看df1的前5行:

[Row(category=u'a', time=datetime.datetime(2019, 9, 20, 9, 0), id=1, num=10, d=u'2019-09-20', h=1),
 Row(category=u'a', time=datetime.datetime(2019, 9, 20, 10, 0), id=0, num=0, d=u'2019-09-20', h=2),
 Row(category=u'a', time=datetime.datetime(2019, 9, 20, 11, 0), id=2, num=20, d=u'2019-09-20', h=3),
 Row(category=u'a', time=datetime.datetime(2019, 9, 20, 12, 0), id=0, num=0, d=u'2019-09-20', h=4),
 Row(category=u'a', time=datetime.datetime(2019, 9, 20, 13, 0), id=0, num=0, d=u'2019-09-20', h=5)]

可以发现,转换为d和h后,时间就变成我们想要的了,time和d、h看起来不是一个时间。。。事实上,写入hive表后,time也会变成我们想要的时间,这里显示的时间不准确,可能是有别的未可知原因。

用df1.count()查看,有70列数据。可以想一下,为什么是70行。

方法一需要生成所有category的所有时间段的数据,再和原始表join,在数据量很大的时候,效率比较低。方法二则是生成原始表所没有的数据,再和原始表做union。

方法二的主要思想是,通过每个category下time的排序,找出相邻两个time之间的缺失time,然后生成缺失time的数据。

首先针对每个category下的数据,得到该行数据对应的time的上一个已有的time:

# 同一个category的上一个时间
tempDf = df.withColumn('pre_time', F.lag(df['time']).over(Window.partitionBy("category").orderBy("time")))

得到此时间与上一个时间的时间差:

# 时间差
tempDf = tempDf.withColumn('diff', F.unix_timestamp(F.col("time"), "yyyy-MM-dd HH:mm:ss")-F.unix_timestamp(F.col("pre_time"), "yyyy-MM-dd HH:mm:ss"))

这里的时间差是以秒为单位的,当时间差为3600秒时,说明两个时间之间没有缺失的时间,大于3600秒时才有。因此,要针对这部分数据找出缺失的时间:

fill_dates = F.udf(lambda x,z:[x-y for y in range(3600, z, 3600)], ArrayType(IntegerType()))
tempDf = tempDf.filter(F.col("diff") > 3600)\
    .withColumn("next_dates", fill_dates(F.unix_timestamp(F.col("time")), F.col("diff")))

这里的fill_dates是一个udf函数,输入当前时间x,以及时间差z,以3600秒为步长,得到当前时间与上一个时间之间缺失的那些时间,即这里的next_dates,它是一个list。可以用explode函数将这个list拆分,得到多行数据:

tempDf = tempDf.withColumn("time", F.explode(F.col("next_dates")))

再做一些格式转换,以及d和h的生成,num和id的补充:

tempDf = tempDf.withColumn("time", F.col("time").cast(TimestampType()))\
    .withColumn("d", F.to_date(F.col("time")).cast(StringType()))\
    .withColumn("h", F.hour(F.col("time")).cast(IntegerType()))\
    .withColumn("num", F.lit("0")).withColumn("id", F.lit("0"))

看两行数据:

[Row(id=u'0', category=u'a', num=u'0', d=u'2019-09-20', h=2, time=datetime.datetime(2019, 9, 20, 10, 0), pre_time=datetime.datetime(2019, 9, 20, 9, 0), diff=7200, next_dates=[1568944800]),
 Row(id=u'0', category=u'a', num=u'0', d=u'2019-09-21', h=4, time=datetime.datetime(2019, 9, 21, 12, 0), pre_time=datetime.datetime(2019, 9, 20, 11, 0), diff=93600, next_dates=[1569038400, 1569034800, 1569031200, 1569027600, 1569024000, 1569020400, 1569016800, 1569013200, 1569009600, 1569006000, 1569002400, 1568998800, 1568995200, 1568991600, 1568988000, 1568984400, 1568980800, 1568977200, 1568973600, 1568970000, 1568966400, 1568962800, 1568959200, 1568955600, 1568952000])]

next_dates是时间戳格式。

再将这个表和原始表union一下就好了,注意要drop不需要的列:

tempDf = tempDf.drop(*['next_dates', 'diff', 'pre_time'])
df2 = df.union(tempDf)
df2.orderBy('category', 'time').select('category', 'time','id','num','d','h').take(5)

输出:

[Row(category=u'a', time=datetime.datetime(2019, 9, 20, 9, 0), id=u'1', num=u'10', d=u'2019-09-20', h=1),
 Row(category=u'a', time=datetime.datetime(2019, 9, 20, 10, 0), id=u'0', num=u'0', d=u'2019-09-20', h=2),
 Row(category=u'a', time=datetime.datetime(2019, 9, 20, 11, 0), id=u'2', num=u'20', d=u'2019-09-20', h=3),
 Row(category=u'a', time=datetime.datetime(2019, 9, 20, 12, 0), id=u'0', num=u'0', d=u'2019-09-20', h=4),
 Row(category=u'a', time=datetime.datetime(2019, 9, 20, 13, 0), id=u'0', num=u'0', d=u'2019-09-20', h=5)]

df2.count()后发现只有58行。

原来这里会针对每个category有一个最大时间和最小时间,所以得到的结果数是比方法一少的。疏忽了!

如果想得到和方法一一样的结果,可以这么写:

minp, maxp = df.select(F.min("time").cast("long"), F.max("time").cast("long")).first()
newRow = spark.createDataFrame([(minp,),(maxp,)], ["time"])
newRow = newRow.withColumn('time', F.col("time").cast("timestamp"))
cate = df.select('category').distinct()
newRow = cate.crossJoin(newRow)   # 笛卡尔积
newRow.take(10)

先针对每个category,生成最小时间和最大时间的数据。这里有两个category,所以会有2*2=4行数据

输出:

[Row(category=u'a', time=datetime.datetime(2019, 9, 20, 9, 0)),
 Row(category=u'b', time=datetime.datetime(2019, 9, 20, 9, 0)),
 Row(category=u'a', time=datetime.datetime(2019, 9, 21, 19, 0)),
 Row(category=u'b', time=datetime.datetime(2019, 9, 21, 19, 0))]

然后将生成的数据和原始表left join,得到其他字段(id, num, d, h)的值,这是为了保证对于df中已有的数据,newRow的相应行是一样的,后续union的时候可以去掉重复数据:

newRow = newRow.join(df, ['category', 'time'], "left")
newdf = df.select('category', 'time', 'id', 'num', 'd', 'h').union(newRow.select('category', 'time', 'id', 'num', 'd', 'h'))
newdf = newdf.distinct()
newdf = newdf.fillna(0, subset=['num'])
newdf = newdf.fillna(0, subset=['id'])
newdf = newdf.withColumn("d", F.to_date(F.col("time")).cast(StringType()))\
    .withColumn("h", F.hour(F.col("time")).cast(IntegerType()))
newdf.take(10)

输出:

[Row(category=u'a', time=datetime.datetime(2019, 9, 20, 11, 0), id=2, num=20, d=u'2019-09-20', h=3),
 Row(category=u'a', time=datetime.datetime(2019, 9, 20, 9, 0), id=1, num=10, d=u'2019-09-20', h=1),
 Row(category=u'b', time=datetime.datetime(2019, 9, 20, 15, 0), id=4, num=8, d=u'2019-09-20', h=7),
 Row(category=u'a', time=datetime.datetime(2019, 9, 21, 13, 0), id=3, num=5, d=u'2019-09-21', h=5),
 Row(category=u'b', time=datetime.datetime(2019, 9, 21, 17, 0), id=5, num=9, d=u'2019-09-21', h=9),
 Row(category=u'b', time=datetime.datetime(2019, 9, 21, 19, 0), id=6, num=16, d=u'2019-09-21', h=11),
 Row(category=u'a', time=datetime.datetime(2019, 9, 21, 19, 0), id=0, num=0, d=u'2019-09-21', h=11),
 Row(category=u'b', time=datetime.datetime(2019, 9, 20, 9, 0), id=0, num=0, d=u'2019-09-20', h=1)]

这样,每个category下都有了最小时间和最大时间的数据了。

再用和之前一样的方法:

fill_dates = F.udf(lambda x,z:[x-y for y in range(3600, z, 3600)], ArrayType(IntegerType()))
# 同一个category的上一个时间
tempDf = newdf.withColumn('pre_time', F.lag(newdf['time']).over(Window.partitionBy("category").orderBy("time")))
#时间差
tempDf = tempDf.withColumn('diff', F.unix_timestamp(F.col("time"), "yyyy-MM-dd HH:mm:ss")-F.unix_timestamp(F.col("pre_time"), "yyyy-MM-dd HH:mm:ss"))
tempDf = tempDf.filter(F.col("diff") > 3600)\
    .withColumn("next_dates", fill_dates(F.unix_timestamp(F.col("time")), F.col("diff")))\
    .withColumn("time", F.explode(F.col("next_dates")))\
    .withColumn("time", F.col("time").cast(TimestampType()))\
    .withColumn("d", F.to_date(F.col("time")).cast(StringType()))\
    .withColumn("h", F.hour(F.col("time")).cast(IntegerType()))\
    .withColumn("num", F.lit("0")).withColumn("id", F.lit("0"))
tempDf = tempDf.drop(*['next_dates', 'diff', 'pre_time'])
df3 = newdf.select('category', 'time', 'id', 'num', 'd', 'h').union(tempDf.select('category', 'time', 'id', 'num', 'd', 'h'))

此时df3.count()就是70行啦!

如果我们想要计算每个category的每一个时间点的前后1小时这个时间段(一共3个小时)的平均num,就可以这么做:

# 计算前后各1小时的平均num值,必须严格前后1小时
windowSpec = Window.partitionBy("category").orderBy("d", "h").rowsBetween(-1, 1)
df3 = df3.withColumn("movavg_sameday", F.avg("num").over(windowSpec))\
    .withColumn("movavg_sameday_data", F.collect_list("num").over(windowSpec))
df3.take(5)

注意这里要partitionBy,也就是分区计算,不然会出现两个category的时间混在一起被计算。

输出:

[Row(category=u'a', time=datetime.datetime(2019, 9, 20, 9, 0), id=u'1', num=u'10', d=u'2019-09-20', h=1, movavg_sameday=5.0, movavg_sameday_data=[u'10', u'0']),
 Row(category=u'a', time=datetime.datetime(2019, 9, 20, 10, 0), id=u'0', num=u'0', d=u'2019-09-20', h=2, movavg_sameday=10.0, movavg_sameday_data=[u'10', u'0', u'20']),
 Row(category=u'a', time=datetime.datetime(2019, 9, 20, 11, 0), id=u'2', num=u'20', d=u'2019-09-20', h=3, movavg_sameday=6.666666666666667, movavg_sameday_data=[u'0', u'20', u'0']),
 Row(category=u'a', time=datetime.datetime(2019, 9, 20, 12, 0), id=u'0', num=u'0', d=u'2019-09-20', h=4, movavg_sameday=6.666666666666667, movavg_sameday_data=[u'20', u'0', u'0']),
 Row(category=u'a', time=datetime.datetime(2019, 9, 20, 13, 0), id=u'0', num=u'0', d=u'2019-09-20', h=5, movavg_sameday=0.0, movavg_sameday_data=[u'0', u'0', u'0'])]

Window是一个很有用的函数,可以用于取想要的窗口数据。

上述代码中的rowsBetween是指从当前行算起(当前行是第0行),某两行之间的窗口,比如这里是-1和1,也就是当前行的前一行和后一行之间的这三行。

还有一个方法是rangeBetween(x,y),是指当前行的某个字段,比如这里的num,取这个字段某个区间的那些数据,即num值处于[num+x, num+y]这个区间的那些行。

参考资料:

PreviousPysparkNextSQL

Last updated 3 years ago

Was this helpful?

https://stackoverflow.com/questions/42411184/filling-gaps-in-timeseries-spark