Pyspark
Pyspark在zeus上的使用
1.基本用法:
today='${today}'
spark-submit --conf spark.yarn.tags=router,spark2.2.0,streaming --conf spark.driver.maxResultSize=4g --executor-memory 18G \
test.py $today;
其中today是传入的参数,在test.py里用sys.argv[1]取
在开头加一行set -e,判断 spark-submit执行是否成功
2.如果使用Python3,加上参数
--conf spark.pyspark.python=/usr/bin/python3.6 \
--conf spark.pyspark.driver.python=/usr/bin/python3.6 \
3.报内存不够的错误时,可以尝试:
1)添加配置--conf spark.sql.adaptive.enabled=false --conf spark.yarn.executor.memoryOverhead=12g
2)代码拆分成多个部分,分开跑
3)添加配置driver-memory 2560m
4.zeus上没有的Python包,可以本地将其打包成.zip文件,上传到zeus,在代码里添加:
(以pypinyin这个包为例)
1 from pyspark.conf import SparkConf
2 conf = SparkConf()
3 conf.setAppName("Spark Stream")
4 sc = SparkContext(conf=conf)
5 sc.addPyFile("pypinyin.zip")
6 from pypinyin import Style, pinyin, lazy_pinyin, load_phrases_dict
即可使用import的方法
dataframe转rdd方式,再调用python函数
inputdata = spark.sql(sql).repartition(2001) //sql是从hive表中读取数据的sql语句,注意此处一定要进行repartition,否则默认读入的分区可能很小从而发挥不了分布式计算的优势
ret = inputdata.rdd.map(lambda x: (x["coid"], main_api(x.asDict(), 1, "content", 0, 0)["score"]))
pyspark写入hive表问题
Pyspark写入问题:源于我在overwrite table某个分区的时候,居然把之前的分区都删掉了,只剩下了最新的分区。问了相关的维护人员,这个数据无法恢复,因为是datasource方式写入的,只有hive table方式的数据删除后是进入trash,可以恢复。2021/03/30-2021/03/31
df.write.format(format).partitionBy('d').mode(option).saveAsTable(outputTable)
format='hive'
table是hive上建的正式表,不管option是overwrite还是append都无法写入,Can't create table under official database
table是hive上建的临时表,不管option是overwrite还是append都可以写入,用overwrite的话也会直接重建一张表
table是未建的正式表,不管option是overwrite还是append都无法写入,Can't create table under official database
table是未建的临时表,不管option是overwrite还是append都可以写入
format='orc'
table是hive上建的正式表,append和orc会报类型不匹配的错误,overwrite和orc会直接重新写入一张表,原来的表就没了。
table是未建的正式表,overwrite多次写入的话只会保留最新写入的分区。append多次写入的话会保留每个分区。第一次append后续overwrite也只会保留最新写入的分区。
总结:注意几个事情
最好是pyspark写入临时表,再用hive将临时表的数据写入正式表。
在hive上建的正式表,通过pyspark是无法写入的(应该是公司有一些限制),如果是临时表就可以写入。
pyspark写入的方式分为orc和hive,两者都可以用,但是新建正式表不能用hive的方式(权限限制),临时表可以。
pyspark里直接写入正式表的话,如果不小心删除了就无法恢复的(目前为止是这样)。这样的表如果要操作分区,一定要指定分区的具体值。
对于一张pyspark新建写入的分区表,在hive里执行insert overwrite table partition(d) select * from table,会直接只保留新写入的分区。(如果table之前是以hive方式写入的则不会覆盖)
insert overwrite table tablename partition(d='2021-03-29')
select 字段名称(不包括分区字段) from tablename where d='2021-03-29';
使用上述语句写入则不会覆盖。
orc方式写入似乎比hive方式速度更快
Pyspark的缺失值填充
均值填充问题 Apr 1, 2021
源于发现用户特征中有很多NULL值,是无意义的,需要处理。但是之前的代码中其实已经有填充缺失值了,说明这个填充没有生效。因此找原因。
这里说一下pyspark的DataFrame.fillna这个方法,下面是它的官方文档介绍:

df.fillna() 也可以写作 df.na.fill()。
大致来说有三种用法:
df.fillna(value),value可以是int, float, string, bool类型,这样的话df的所有列的缺失值都会被填充为value值。df.fillna({colname1:value1, colname2:value2}),以字典的形式传入列名和相应的需要填充的值df.fillna(value, subset=[colname1,colname2]), 传入value以及需要填充的列的list。注意这种方式要求subset中的列的数据类型和value的数据类型相同,数据类型不相同的列不予填充(不会报错,而是直接忽略)。
之前的填充代码之所以未生效就是因为使用了第三种方法,而正好数据类型确实不匹配(value是float类型,col是string类型),我改成第二种方式之后就生效了。(还记得吗?查看df的列的数据类型的方法是df.schema)
pyspark归一化
自己实现
参考:
https://stackoverflow.com/questions/60281354/apply-minmaxscaler-on-multiple-columns-in-pyspark
https://stackoverflow.com/questions/40337744/scalenormalise-a-column-in-spark-dataframe-pyspark
新建列(sum)
How to add column sum as new column in PySpark dataframe ?
UDF(User Defined Function)
入门教程
为什么我们需要用UDF呢?原因主要在于UDF可以重用。就像Python里我们定义函数一样,可以反复使用它。为了演示UDF的用法,下面我们举例说明。
创建一个DataFrame
输出
创建一个Python函数
转成UDF
在DataFrame中使用
可以在select()方法中使用:
输出
也可以在withColumn()方法中使用(这里用了个新的UDF):
来看看效果
还可以注册这个函数,将它用到SQL里
其实可以直接在定义函数的时候将其转成UDF
默认返回类型Apr 1, 2021
为什么呢?很简单,udf需要设置一个返回类型,但是这里没设置,默认返回的是StringType()。所以最好的方式是指定返回类型,也就是:
udf函数传入多个参数
返回多列
分区partition
spark partition 理解 / coalesce 与 repartition的区别
参考资料
https://sparkbyexamples.com/pyspark/pyspark-udf-user-defined-function/
Last updated
Was this helpful?