📓
Study
  • README
  • Application
    • Contest
      • 竞赛trick
  • Basic Know
    • 半监督学习
    • 贝叶斯
      • 朴素贝叶斯分类器
    • 对抗训练
    • 概率图模型
      • CRF
      • HMM
      • 概率图模型
    • 关联分析
    • 归纳偏置
      • [什么是 Inductive bias(归纳偏置)?](BasicKnow/归纳偏置/什么是 Inductive bias(归纳偏置)?.md)
    • 聚类
    • 决策树
    • 绿色深度学习
    • 树模型&集成学习
      • 提升树
      • Ada Boost
      • [集成学习]
    • 特征工程
      • 数据分桶
      • 特征工程概述
      • 特征选择
      • LDA
      • PCA
    • 线性模型
      • 感知机
      • 最大熵模型
      • SVM
        • SVM支持向量机
      • 逻辑回归
      • 线性回归
    • 优化算法
      • 拉格朗日对偶性
      • 牛顿法
        • 牛顿法&拟牛顿法
      • 梯度下降法
        • 梯度下降算法
      • 优化算法
    • 预处理
      • [1-1]正则表达式
      • [1-2]文本预处理
      • [1-3]词性
      • [1-4]语法分析
      • [1-6]文本分类
      • [1-7]网络爬取
      • 【备用】正则表达式
      • 7.re模块
      • 词典匹配
      • 分词
      • 子表达式
      • Todo
    • 主题模型
      • LDA
    • Deep Learning
      • 反向传播
      • 梯度消失&梯度爆炸
      • Batch Size
      • 1.DLbasis
      • 小概念
      • MLstrategy
      • CNN
      • RNN及其应用
      • 关于深度学习实践
      • 神经网络概述
      • Batch Normalization
      • Program CNN
      • Program D Lbasis
      • Program DN Nimprove
      • Program Neural Style Transfer
      • Summer DL
    • EM算法
    • GAN
      • Gans In Action Master
    • GNN
      • 搜广推之GNN
      • Representation Learning
        • Anomalydetection
        • Conclusion
        • Others
        • Papernotes
        • Recommadation
    • k近邻法
      • K近邻
    • Language Model
      • 语言模型解码采样策略
      • [1-1][语言模型]从N-gram模型讲起
      • [1-2][语言模型]NNLM(神经网络语言模型)
      • [1-3][语言模型]基于RNN的语言模型
      • [1-4][语言模型]用N-gram来做完形填空
      • [1-5][语言模型]用KenLM来做完形填空
    • Loss Function
      • 常用损失函数
      • Focal Loss
      • softmax+交叉熵
    • Machine Learning
      • [基础]概念
      • 待整合
      • 交叉验证
      • 无监督学习
      • 优缺点
      • ML Yearning
      • SVD
    • Statistics Math
      • 程序员的数学基础课
      • 数学基础
      • 统计&高数
      • 统计题目
      • 线性代数
      • 组合数学
      • Discrete Choice Model
      • Nested Choice Model
  • Course Note
    • 基于TensorFlow的机器学习速成课程
      • [Key ML Terminology](CourseNote/基于TensorFlow的机器学习速成课程/Key ML Terminology.md)
    • 集训营
      • 任务说明
      • 算法实践1.1模型构建
      • 算法实践1.2模型构建之集成模型
      • 算法实践2.1数据预处理
    • 李宏毅机器学习
      • 10DNN训练Tips
        • Chapter 18
      • 16无监督学习
        • Chapter 25
    • 贪心NLP
      • 贪心NLP笔记
    • Cs 224 N 2019
      • [A Simple But Tough To Beat Baseline For Sentence Embeddings](CourseNote/cs224n2019/A Simple but Tough-to-beat Baseline for Sentence Embeddings.md)
      • [Lecture 01 Introduction And Word Vectors](CourseNote/cs224n2019/Lecture 01 Introduction and Word Vectors.md)
      • [Lecture 02 Word Vectors 2 And Word Senses](CourseNote/cs224n2019/Lecture 02 Word Vectors 2 and Word Senses.md)
      • [Lecture 03 Word Window Classification Neural Networks And Matrix Calculus](CourseNote/cs224n2019/Lecture 03 Word Window Classification, Neural Networks, and Matrix Calculus.md)
      • [Lecture 04 Backpropagation And Computation Graphs](CourseNote/cs224n2019/Lecture 04 Backpropagation and Computation Graphs.md)
      • [Lecture 05 Linguistic Structure Dependency Parsing](CourseNote/cs224n2019/Lecture 05 Linguistic Structure Dependency Parsing.md)
      • [Lecture 06 The Probability Of A Sentence Recurrent Neural Networks And Language Models](CourseNote/cs224n2019/Lecture 06 The probability of a sentence Recurrent Neural Networks and Language Models.md)
      • Stanford NLP
    • Deep Learning Book Goodfellow
      • Books
        • Deep Learning Book Chapter Summaries Master
      • 提纲
      • C 5
      • C 6
      • [Part I Applied Math And Machine Learning Basics](CourseNote/Deep-Learning-Book-Goodfellow/Part I - Applied Math and Machine Learning basics.md)
    • Lihang
    • NLP实战高手课
      • 极客时间_NLP实战高手课
    • 工具&资料
    • 机器学习、深度学习面试知识点汇总
    • 七月kaggle课程
    • 算法工程师
    • 贪心科技机器学习必修知识点特训营
    • 唐宇迪机器学习
    • 语言及工具
    • AI技术内参
    • Suggestions
  • Data Related
    • 数据质量
      • 置信学习
    • 自然语言处理中的数据增广_车万翔
      • 自然语言处理中的数据增广
    • Mixup
    • 数据不均衡问题
    • 数据增强的方法
  • Knowledge Graph
    • Information Extraction
      • 联合抽取
        • PRGC
      • Code
        • BERT微调
      • NER
        • 阅读理解做NER
          • MRC
        • FLAT
        • Global Pointer
        • 命名实体识别NER
    • Keyword Extraction
      • 关键词抽取
    • 小米在知识表示学习的探索与实践
    • KG
  • Multi Task
    • EXT 5
      • Ex T 5
  • NLG
    • Dailogue
      • 比赛
        • 对话评估比赛
          • [simpread-DSTC10 开放领域对话评估比赛冠军方法总结](NLG/Dailogue/比赛/对话评估比赛/simpread-DSTC10 开放领域对话评估比赛冠军方法总结.md)
      • 任务型对话
        • DST
          • DST概述
        • NLG
          • NLG概述
        • NLU
          • NLU概述
        • 任务型对话概述
        • simpread-任务型对话系统预训练最新研究进展
      • 问答型对话
        • 检索式问答
          • 基于预训练模型的检索式对话系统
          • 检索式文本问答
        • 业界分享
          • 低资源场景下的知识图谱表示学习和问答_阿里_李杨
          • QQ浏览器搜索智能问答
        • 问答型对话系统概述
      • 闲聊型对话
        • 闲聊型对话系统概述
      • 业界分享
        • 人工智能与心理咨询
        • 腾讯多轮对话机器人
        • 微软小冰
        • 小布助手闲聊生成式算法
        • 美团智能客服实践_江会星
        • 去哪儿智能客服探索和实践
        • 实时语音对话场景下的算法实践_阿里_陈克寒
        • 智能语音交互中的无效query识别_小米_崔世起
        • UNIT智能对话
      • 主动对话
      • EVA
        • EVA分享
        • EVA模型
      • PLATO
      • RASA
    • Machine Translation
      • 业界分享
        • 爱奇艺台词翻译分享
      • Paper
        • Deep Encoder Shallow Decoder
    • RAGRelated
    • Text 2 SQL
      • M SQL
        • [M SQL 2](NLG/Text2SQL/M-SQL/M-SQL (2).md)
      • [Text2SQL Baseline解析](NLG/Text2SQL/Text2SQL Baseline解析.md)
      • Text 2 SQL
    • Text Summarization
      • [文本摘要][paper]CTRLSUM
      • 文本摘要
  • Pre Training
    • 业界分享
      • 超大语言模型与语言理解_黄民烈
        • 超大语言模型与语言理解
      • 大模型的加速算法_腾讯微信
        • 大模型的加速算法
      • 孟子轻量化预训练模型
      • 悟道文汇文图生成模型
      • 悟道文澜图文多模态大模型
      • 语义驱动可视化内容创造_微软
        • 语义驱动可视化内容创造
    • Base
      • Attention
      • Mask
        • NLP中的Mask
      • Position Encoding
        • 位置编码
    • BERT
      • ALBERT
      • Bert
        • Venv
          • Lib
            • Site Packages
              • idna-3.2.dist-info
                • LICENSE
              • Markdown-3.3.4.dist-info
                • LICENSE
              • Tensorflow
                • Include
                  • External
                    • Libjpeg Turbo
                      • LICENSE
                  • Unsupported
                    • Eigen
                      • CXX 11
                        • Src
                          • Tensor
              • Werkzeug
                • Debug
                  • Shared
                    • ICON LICENSE
        • CONTRIBUTING
        • Multilingual
      • Ro BER Ta
      • BERT
      • BERT面试问答
      • BERT源码解析
      • NSP BERT
    • BERT Flow
    • BERT Zip
      • Distilling The Knowledge In A Neural Network
      • TINYBERT
      • 模型压缩
    • CPM
    • CPT
      • 兼顾理解和生成的中文预训练模型CPT
    • ELECTRA
    • EL Mo
    • ERNIE系列语言模型
    • GPT
    • MBART
    • NEZHA
    • NLG Sum
      • [simpread-预训练时代下的文本生成|模型 & 技巧](Pre-training/NLGSum/simpread-预训练时代下的文本生成|模型 & 技巧.md)
    • Prompt
      • 预训练模型的提示学习方法_刘知远
        • 预训练模型的提示学习方法
    • T 5
      • Unified SKG
      • T 5
    • Transformer
    • Uni LM
    • XL Net
    • 预训练语言模型
    • BERT变种
  • Recsys
    • 多任务Multi-task&推荐
    • 推荐介绍
    • 推荐系统之召回与精排
      • 代码
        • Python
          • Recall
            • Deep Match Master
              • Docs
                • Source
                  • Examples
                  • FAQ
                  • Features
                  • History
                  • Model Methods
                  • Quick Start
    • 业界分享
      • 腾讯基于知识图谱长视频推荐
    • 召回
    • Sparrow Rec Sys
    • 深度学习推荐系统实战
    • 推荐模型
    • Deep FM
  • Search
    • 搜索
    • 业界分享
      • 爱奇艺搜索排序算法实践
      • 语义搜索技术和应用
    • 查询关键字理解
    • 搜索排序
    • BM 25
    • KDD21-淘宝搜索中语义向量检索技术
    • query理解
    • TFIDF
  • Self Supervised Learning
    • Contrastive Learning
      • 业界分享
        • 对比学习在微博内容表示的应用_张俊林
      • Paper
      • R Drop
      • Sim CSE
    • 自监督学习
  • Text Classification
    • [多标签分类(Multi-label Classification)](TextClassification/多标签分类(Multi-label Classification)/多标签分类(Multi-label Classification).md)
    • Fast Text
    • Text CNN
    • 文本分类
  • Text Matching
    • 文本匹配和多轮检索
    • CNN SIM
    • Word Embedding
      • Skip Gram
      • Glove
      • Word 2 Vec
    • 文本匹配概述
  • Tool
    • 埋点
    • 向量检索(Faiss等)
    • Bigdata
      • 大数据基础task1_创建虚拟机+熟悉linux
      • 任务链接
      • Mr
      • Task1参考答案
      • Task2参考答案
      • Task3参考答案
      • Task4参考答案
      • Task5参考答案
    • Docker
    • Elasticsearch
    • Keras
    • Numpy
    • Python
      • 可视化
        • Interactivegraphics
        • Matplotlib
        • Tkinter
        • Turtle
      • 数据类型
        • Datatype
      • python爬虫
        • Python Scraping Master
          • phantomjs-2.1.1-windows
        • Regularexp
        • Scrapying
        • Selenium
      • 代码优化
      • 一行代码
      • 用python进行语言检测
      • Debug
      • Exception
      • [Features Tricks](Tool/python/Features & Tricks.md)
      • Fileprocess
      • Format
      • Functional Programming
      • I Python
      • Magic
      • Math
      • Os
      • Others
      • Pandas
      • Python Datastructure
      • Python操作数据库
      • Streamlit
      • Time
    • Pytorch
      • Dive Into DL Py Torch
        • 02 Softmax And Classification
        • 03 Mlp
        • 04 Underfit Overfit
        • 05 Gradient Vanishing Exploding
        • 06 Text Preprocess
        • 07 Language Model
        • 08 Rnn Basics
        • 09 Machine Translation
        • 10 Attention Seq 2 Seq
        • 11 Transformer
        • 12 Cnn
        • 14 Batchnorm Resnet
        • 15 Convexoptim
        • 16 Gradientdescent
        • 17 Optim Advance
    • Spark
      • Pyspark
        • pyspark之填充缺失的时间数据
      • Spark
    • SQL
      • 数据库
      • Hive Sql
      • MySQL实战45讲
    • Tensor Flow
      • TensorFlow入门
  • Common
  • NLP知识体系
Powered by GitBook
On this page
  • Pyspark在zeus上的使用
  • dataframe转rdd方式,再调用python函数
  • pyspark写入hive表问题
  • Pyspark的缺失值填充
  • 均值填充问题 Apr 1, 2021
  • pyspark归一化
  • 新建列(sum)
  • UDF(User Defined Function)
  • 入门教程
  • 默认返回类型Apr 1, 2021
  • udf函数传入多个参数
  • 分区partition
  • 参考资料

Was this helpful?

  1. Tool
  2. Spark

Pyspark

Pyspark在zeus上的使用

1.基本用法:

today='${today}'
spark-submit --conf spark.yarn.tags=router,spark2.2.0,streaming --conf spark.driver.maxResultSize=4g --executor-memory 18G \
test.py $today;

其中today是传入的参数,在test.py里用sys.argv[1]取

在开头加一行set -e,判断 spark-submit执行是否成功

2.如果使用Python3,加上参数

--conf spark.pyspark.python=/usr/bin/python3.6 \

--conf spark.pyspark.driver.python=/usr/bin/python3.6 \

3.报内存不够的错误时,可以尝试:

1)添加配置--conf spark.sql.adaptive.enabled=false --conf spark.yarn.executor.memoryOverhead=12g

2)代码拆分成多个部分,分开跑

3)添加配置driver-memory 2560m

4.zeus上没有的Python包,可以本地将其打包成.zip文件,上传到zeus,在代码里添加:

(以pypinyin这个包为例)

1  from pyspark.conf import SparkConf
2  conf = SparkConf()
3  conf.setAppName("Spark Stream")
4  sc = SparkContext(conf=conf)
5  sc.addPyFile("pypinyin.zip")
6  from pypinyin import Style, pinyin, lazy_pinyin, load_phrases_dict

即可使用import的方法

dataframe转rdd方式,再调用python函数

from pyspark.sql import SparkSession, SQLContext
from pyspark import SparkContext
import pyspark.sql.functions as F
from pyspark.sql.types import *
 
def run(sent):
    gspoiid, taid, lang, taname, poiname, poiename, taename = \
        sent['gspoiid'], sent['taid'], sent['lang'], sent['taname'], sent['poiname'], sent['poiename'], sent['taename']
    res_list = []
    judgelang, problem = isnoen_needclean(taname, lang)
    res_list.append([gspoiid, taid, lang, taname, poiname, poiename, taename, judgelang, problem])
    return res_list
 
 
def df2hive(df,option,table,patition = 'd'):
    if option in ['append','overwrite']:
        df.write.format("hive").mode(option).saveAsTable(table)
    else:
        df.write.format("hive").partitionBy(patition).mode("append").saveAsTable(table)
    print('write df into ',table)
 
spark = SparkSession.builder.appName("run lang clean combine").enableHiveSupport().getOrCreate()
inputtable = str(sys.argv[1])
outputtable = str(sys.argv[2])
 
df = spark.table(inputtable)
 
df = df.repartition(5000)
df = df.rdd.map(lambda x: x.asDict())
y = df.flatMap(run)
# gspoiid, taid, lang, taname, poiname, poiename, taename
schema = StructType(
    [StructField("gspoiid", LongType(), True), StructField("taid", LongType(), True),
     StructField("lang", StringType(), True), StructField("taname", StringType(), True),
     StructField("poiname", StringType(), True),
     StructField("poiename", StringType(), True), StructField("taename", StringType(), True),
     StructField("judgelang", StringType(), True), StructField("problem", StringType(), True)])
df = spark.createDataFrame(y, schema)
df = df.dropDuplicates()
 
idcounts = df.groupBy('taid', 'lang').count()
df = df.join(idcounts, ['taid', 'lang'], 'left')
 
df2hive(df, 'overwrite', outputtable)

inputdata = spark.sql(sql).repartition(2001) //sql是从hive表中读取数据的sql语句,注意此处一定要进行repartition,否则默认读入的分区可能很小从而发挥不了分布式计算的优势

ret = inputdata.rdd.map(lambda x: (x["coid"], main_api(x.asDict(), 1, "content", 0, 0)["score"]))

pyspark写入hive表问题

Pyspark写入问题:源于我在overwrite table某个分区的时候,居然把之前的分区都删掉了,只剩下了最新的分区。问了相关的维护人员,这个数据无法恢复,因为是datasource方式写入的,只有hive table方式的数据删除后是进入trash,可以恢复。2021/03/30-2021/03/31

df.write.format(format).partitionBy('d').mode(option).saveAsTable(outputTable)

  1. format='hive'

    1. table是hive上建的正式表,不管option是overwrite还是append都无法写入,Can't create table under official database

    2. table是hive上建的临时表,不管option是overwrite还是append都可以写入,用overwrite的话也会直接重建一张表

    3. table是未建的正式表,不管option是overwrite还是append都无法写入,Can't create table under official database

    4. table是未建的临时表,不管option是overwrite还是append都可以写入

  2. format='orc'

    1. table是hive上建的正式表,append和orc会报类型不匹配的错误,overwrite和orc会直接重新写入一张表,原来的表就没了。

    2. table是未建的正式表,overwrite多次写入的话只会保留最新写入的分区。append多次写入的话会保留每个分区。第一次append后续overwrite也只会保留最新写入的分区。

总结:注意几个事情

  1. 最好是pyspark写入临时表,再用hive将临时表的数据写入正式表。

  2. 在hive上建的正式表,通过pyspark是无法写入的(应该是公司有一些限制),如果是临时表就可以写入。

  3. pyspark写入的方式分为orc和hive,两者都可以用,但是新建正式表不能用hive的方式(权限限制),临时表可以。

  4. pyspark里直接写入正式表的话,如果不小心删除了就无法恢复的(目前为止是这样)。这样的表如果要操作分区,一定要指定分区的具体值。

对于一张pyspark新建写入的分区表,在hive里执行insert overwrite table partition(d) select * from table,会直接只保留新写入的分区。(如果table之前是以hive方式写入的则不会覆盖)

insert overwrite table tablename partition(d='2021-03-29')

select 字段名称(不包括分区字段) from tablename where d='2021-03-29';

使用上述语句写入则不会覆盖。

orc方式写入似乎比hive方式速度更快

Pyspark的缺失值填充

均值填充问题 Apr 1, 2021

源于发现用户特征中有很多NULL值,是无意义的,需要处理。但是之前的代码中其实已经有填充缺失值了,说明这个填充没有生效。因此找原因。

from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import functions as func
import os
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3.6.5"
spark = SparkSession.builder.appName("poiranking_userfeatures_v3").enableHiveSupport().getOrCreate()
# 测试spark能不能跑通,如果这里不报错有输出就可以
spark.sparkContext.parallelize(range(0, 10)).count()

df = spark.table("table_name")
# 看看有多少个缺失值
df.where("price_pref is null").count()
# 取出5行数据观察下
df.where("price_pref is null").take(5)
# 可以查看df的列的数据类型
df.schema

# 缺失值填充的函数,其实这里把where条件去掉也可以,func.mean计算均值的时候不会把空值计算进去。
def fill_na_by_mean(df, fill_col):
    mean = df.where(col(fill_col).isNotNull()).select(func.mean(fill_col)).collect()[0][0]
    # 第一种方式,传入字典填充
    df1 = df.fillna({fill_col: mean})
    # 第二种方式,指定subset
    df2 = df.fillna(mean, subset=[fill_col])
    return df1

# 填充缺失值
fill_col = 'price_pref'
df = fill_na_by_mean(df, fill_col)
# 看看还有没有缺失值
df.where("price_pref is null").count()

df.fillna() 也可以写作 df.na.fill()。

大致来说有三种用法:

  • df.fillna(value),value可以是int, float, string, bool类型,这样的话df的所有列的缺失值都会被填充为value值。

  • df.fillna({colname1:value1, colname2:value2}),以字典的形式传入列名和相应的需要填充的值

  • df.fillna(value, subset=[colname1,colname2]), 传入value以及需要填充的列的list。注意这种方式要求subset中的列的数据类型和value的数据类型相同,数据类型不相同的列不予填充(不会报错,而是直接忽略)。

之前的填充代码之所以未生效就是因为使用了第三种方法,而正好数据类型确实不匹配(value是float类型,col是string类型),我改成第二种方式之后就生效了。(还记得吗?查看df的列的数据类型的方法是df.schema)

pyspark归一化

from pyspark.ml.feature import MinMaxScaler
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler

pdf = pd.DataFrame({'x':range(3), 'y':[1,2,5], 'z':[100,200,1000]})
df = spark.createDataFrame(pdf)
columns_to_scale = ["x", "y", "z"]

# 归一化
columns_to_scale = [i for i in feature_df.columns if i not in ['officialpoiid', 'poiname', 'businessid', 'districtid', 'isopen', 'isfree',
                                            'isfee','isHasCoverImage','addressAndCoorScore']]
#columns_to_scale = ['lncommenttotalscore']
unlist = udf(lambda x: float(list(x)[0]), DoubleType())
df = feature_df
df = df.fillna(0, columns_to_scale)
for i in columns_to_scale:
    # VectorAssembler Transformation - Converting column to vector type
    assembler = VectorAssembler(inputCols=[i],outputCol=i+"_Vect")

    # MinMaxScaler Transformation
    scaler = MinMaxScaler(inputCol=i+"_Vect", outputCol=i+"_norm")

    # Pipeline of VectorAssembler and MinMaxScaler
    pipeline = Pipeline(stages=[assembler, scaler])

    # Fitting pipeline on dataframe
    df = pipeline.fit(df).transform(df).withColumn(i+"_norm", unlist(i+"_norm")).drop(i+"_Vect")
df.show()

自己实现

for i in columns_to_scale:
    maxval = df.select(F.max(i)).collect()[0][0]
    minval = df.select(F.min(i)).collect()[0][0]
    df = df.withColumn(i[2:]+"_norm", (df[i]-minval)/(maxval-minval))

参考:

新建列(sum)

tmpdf = tmpdf.withColumn("poiscore", sum(tmpdf[col] for col in tmpcols))

UDF(User Defined Function)

入门教程

为什么我们需要用UDF呢?原因主要在于UDF可以重用。就像Python里我们定义函数一样,可以反复使用它。为了演示UDF的用法,下面我们举例说明。

  1. 创建一个DataFrame

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

columns = ["Seqno","Name"]
data = [("1", "john jones"),
    ("2", "tracey smith"),
    ("3", "amy sanders")]

df = spark.createDataFrame(data=data,schema=columns)

df.show(truncate=False)

输出

+-----+------------+
|Seqno|Names       |
+-----+------------+
|1    |john jones  |
|2    |tracey smith|
|3    |amy sanders |
+-----+------------+
  1. 创建一个Python函数

def convertCase(str):
    resStr=""
    arr = str.split(" ")
    for x in arr:
       resStr= resStr + x[0:1].upper() + x[1:len(x)] + " "
    return resStr
  1. 转成UDF

""" Converting function to UDF """
convertUDF = udf(lambda z: convertCase(z),StringType())

""" Converting function to UDF 
StringType() is by default hence not required """
convertUDF = udf(lambda z: convertCase(z))
  1. 在DataFrame中使用

可以在select()方法中使用:

df.select(col("Seqno"), \
    convertUDF(col("Name")).alias("Name") ) \
   .show(truncate=False)

输出

+-----+-------------+
|Seqno|Name         |
+-----+-------------+
|1    |John Jones   |
|2    |Tracey Smith |
|3    |Amy Sanders  |
+-----+-------------+

也可以在withColumn()方法中使用(这里用了个新的UDF):

def upperCase(str):
    return str.upper()

upperCaseUDF = udf(lambda z:upperCase(z),StringType())   

df.withColumn("Cureated Name", upperCaseUDF(col("Name"))) \
  .show(truncate=False)

来看看效果

+-----+------------+-------------+
|Seqno|Name        |Cureated Name|
+-----+------------+-------------+
|1    |john jones  |JOHN JONES   |
|2    |tracey smith|TRACEY SMITH |
|3    |amy sanders |AMY SANDERS  |
+-----+------------+-------------+

还可以注册这个函数,将它用到SQL里

""" Using UDF on SQL """
spark.udf.register("convertUDF", convertCase,StringType())
df.createOrReplaceTempView("NAME_TABLE")
spark.sql("select Seqno, convertUDF(Name) as Name from NAME_TABLE") \
     .show(truncate=False)

其实可以直接在定义函数的时候将其转成UDF

@udf(returnType=StringType()) 
def upperCase(str):
    return str.upper()

df.withColumn("Cureated Name", upperCase(col("Name"))) \
.show(truncate=False)

默认返回类型Apr 1, 2021

from pyspark.sql.types import *
from pyspark.sql.functions import *

# 定义元素相乘的udf
multiplyFunc = udf(lambda e1, e2: e1*e2)
# A,B两列都是Float类型的
# 这样得到的C列是String类型的
df.withColumn("C", multiplyFunc(col('A'), col('B'))).schema
# 这样得到的C列是Float类型的
df.withColumn("C", col('A')*col('B')).schema

为什么呢?很简单,udf需要设置一个返回类型,但是这里没设置,默认返回的是StringType()。所以最好的方式是指定返回类型,也就是:

multiplyFunc = udf(lambda e1, e2: e1*e2, FloatType())

udf函数传入多个参数

def generate_udf(constant_var):
    def test(col1, col2):
        if col1 == col2:
            return col1
        else:
            return constant_var
    return f.udf(test, StringType())

df = df.withColumn('new_column', 
                   generate_udf('default_value')(f.col('col1'), f.col('col2')))

返回多列

schema = StructType([
    StructField("probability", VectorUDT(), False),
    StructField("prediction", DoubleType(), False)
])
@udf(returnType=schema)
def keywords_prob(keywords, prob, prediction):
    keywords_split = keywords.split("@@")
    keywords_notept = [(keywords_split[i], i) for i in range(len(keywords_split)) if keywords_split[i] != ""]
    type_words_dict = {0: 2, 1: 3, 2: 5, 3: 92, 4: 93}
    if len(keywords_notept) == 1:
        type = type_words_dict[keywords_notept[0][1]]
        label = type_label_dict[type]
        label_int = int(label)
        new_prob = list([float(x) for x in prob])
        new_prob[label_int] = 0.99
        new_prob[~label_int] = 0.0
        return (Vectors.dense(new_prob), label)
    return (prob, prediction)

mistakes = mistakes.withColumn("new", keywords_prob(mistakes["keywords"], mistakes["probability"], mistakes["prediction"]))
mistakes = mistakes.withColumn("newprobability", mistakes["new.probability"])\
    .withColumn("newprediction", mistakes["new.prediction"]).drop("new")

分区partition

参考资料

PreviousSparkNextpyspark之填充缺失的时间数据

Last updated 2 years ago

Was this helpful?

这里说一下pyspark的DataFrame.fillna这个方法,下面是它的:

官方文档介绍
https://stackoverflow.com/questions/60281354/apply-minmaxscaler-on-multiple-columns-in-pyspark
https://stackoverflow.com/questions/40337744/scalenormalise-a-column-in-spark-dataframe-pyspark
How to add column sum as new column in PySpark dataframe ?
编写Spark程序的几个优化点
spark partition 理解 / coalesce 与 repartition的区别
https://sparkbyexamples.com/pyspark/pyspark-udf-user-defined-function/