Pyspark

Pyspark在zeus上的使用

1.基本用法:

today='${today}'
spark-submit --conf spark.yarn.tags=router,spark2.2.0,streaming --conf spark.driver.maxResultSize=4g --executor-memory 18G \
test.py $today;

其中today是传入的参数,在test.py里用sys.argv[1]取

在开头加一行set -e,判断 spark-submit执行是否成功

2.如果使用Python3,加上参数

--conf spark.pyspark.python=/usr/bin/python3.6 \

--conf spark.pyspark.driver.python=/usr/bin/python3.6 \

3.报内存不够的错误时,可以尝试:

1)添加配置--conf spark.sql.adaptive.enabled=false --conf spark.yarn.executor.memoryOverhead=12g

2)代码拆分成多个部分,分开跑

3)添加配置driver-memory 2560m

4.zeus上没有的Python包,可以本地将其打包成.zip文件,上传到zeus,在代码里添加:

(以pypinyin这个包为例)

1  from pyspark.conf import SparkConf
2  conf = SparkConf()
3  conf.setAppName("Spark Stream")
4  sc = SparkContext(conf=conf)
5  sc.addPyFile("pypinyin.zip")
6  from pypinyin import Style, pinyin, lazy_pinyin, load_phrases_dict

即可使用import的方法

dataframe转rdd方式,再调用python函数

inputdata = spark.sql(sql).repartition(2001) //sql是从hive表中读取数据的sql语句,注意此处一定要进行repartition,否则默认读入的分区可能很小从而发挥不了分布式计算的优势

ret = inputdata.rdd.map(lambda x: (x["coid"], main_api(x.asDict(), 1, "content", 0, 0)["score"]))

pyspark写入hive表问题

Pyspark写入问题:源于我在overwrite table某个分区的时候,居然把之前的分区都删掉了,只剩下了最新的分区。问了相关的维护人员,这个数据无法恢复,因为是datasource方式写入的,只有hive table方式的数据删除后是进入trash,可以恢复。2021/03/30-2021/03/31

df.write.format(format).partitionBy('d').mode(option).saveAsTable(outputTable)

  1. format='hive'

    1. table是hive上建的正式表,不管option是overwrite还是append都无法写入,Can't create table under official database

    2. table是hive上建的临时表,不管option是overwrite还是append都可以写入,用overwrite的话也会直接重建一张表

    3. table是未建的正式表,不管option是overwrite还是append都无法写入,Can't create table under official database

    4. table是未建的临时表,不管option是overwrite还是append都可以写入

  2. format='orc'

    1. table是hive上建的正式表,append和orc会报类型不匹配的错误,overwrite和orc会直接重新写入一张表,原来的表就没了。

    2. table是未建的正式表,overwrite多次写入的话只会保留最新写入的分区。append多次写入的话会保留每个分区。第一次append后续overwrite也只会保留最新写入的分区。

总结:注意几个事情

  1. 最好是pyspark写入临时表,再用hive将临时表的数据写入正式表。

  2. 在hive上建的正式表,通过pyspark是无法写入的(应该是公司有一些限制),如果是临时表就可以写入。

  3. pyspark写入的方式分为orc和hive,两者都可以用,但是新建正式表不能用hive的方式(权限限制),临时表可以。

  4. pyspark里直接写入正式表的话,如果不小心删除了就无法恢复的(目前为止是这样)。这样的表如果要操作分区,一定要指定分区的具体值。

对于一张pyspark新建写入的分区表,在hive里执行insert overwrite table partition(d) select * from table,会直接只保留新写入的分区。(如果table之前是以hive方式写入的则不会覆盖)

insert overwrite table tablename partition(d='2021-03-29')

select 字段名称(不包括分区字段) from tablename where d='2021-03-29';

使用上述语句写入则不会覆盖。

orc方式写入似乎比hive方式速度更快

Pyspark的缺失值填充

均值填充问题 Apr 1, 2021

源于发现用户特征中有很多NULL值,是无意义的,需要处理。但是之前的代码中其实已经有填充缺失值了,说明这个填充没有生效。因此找原因。

这里说一下pyspark的DataFrame.fillna这个方法,下面是它的官方文档介绍

df.fillna() 也可以写作 df.na.fill()

大致来说有三种用法:

  • df.fillna(value),value可以是int, float, string, bool类型,这样的话df的所有列的缺失值都会被填充为value值。

  • df.fillna({colname1:value1, colname2:value2}),以字典的形式传入列名和相应的需要填充的值

  • df.fillna(value, subset=[colname1,colname2]), 传入value以及需要填充的列的list。注意这种方式要求subset中的列的数据类型和value的数据类型相同,数据类型不相同的列不予填充(不会报错,而是直接忽略)。

之前的填充代码之所以未生效就是因为使用了第三种方法,而正好数据类型确实不匹配(value是float类型,col是string类型),我改成第二种方式之后就生效了。(还记得吗?查看df的列的数据类型的方法是df.schema)

pyspark归一化

自己实现

参考:

https://stackoverflow.com/questions/60281354/apply-minmaxscaler-on-multiple-columns-in-pyspark

https://stackoverflow.com/questions/40337744/scalenormalise-a-column-in-spark-dataframe-pyspark

新建列(sum)

How to add column sum as new column in PySpark dataframe ?

UDF(User Defined Function)

入门教程

为什么我们需要用UDF呢?原因主要在于UDF可以重用。就像Python里我们定义函数一样,可以反复使用它。为了演示UDF的用法,下面我们举例说明。

  1. 创建一个DataFrame

输出

  1. 创建一个Python函数

  1. 转成UDF

  1. 在DataFrame中使用

可以在select()方法中使用:

输出

也可以在withColumn()方法中使用(这里用了个新的UDF):

来看看效果

还可以注册这个函数,将它用到SQL里

其实可以直接在定义函数的时候将其转成UDF

默认返回类型Apr 1, 2021

为什么呢?很简单,udf需要设置一个返回类型,但是这里没设置,默认返回的是StringType()。所以最好的方式是指定返回类型,也就是:

udf函数传入多个参数

返回多列

分区partition

编写Spark程序的几个优化点

spark partition 理解 / coalesce 与 repartition的区别

参考资料

https://sparkbyexamples.com/pyspark/pyspark-udf-user-defined-function/

Last updated

Was this helpful?