from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import functions as func
import os
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3.6.5"
spark = SparkSession.builder.appName("poiranking_userfeatures_v3").enableHiveSupport().getOrCreate()
# Quick smoke test: if this runs without errors and returns a count, Spark is working
spark.sparkContext.parallelize(range(0, 10)).count()
df = spark.table("table_name")
# Count how many rows have a missing price_pref
df.where("price_pref is null").count()
# Take 5 of those rows to inspect
df.where("price_pref is null").take(5)
# Inspect the data types of the DataFrame's columns
df.schema
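# printSchema() prints the same information as a more readable tree
df.printSchema()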
# Helper to fill a column's missing values with its mean. The where filter is actually
# optional: func.mean already ignores nulls when computing the average.
def fill_na_by_mean(df, fill_col):
    mean = df.where(col(fill_col).isNotNull()).select(func.mean(fill_col)).collect()[0][0]
    # Option 1: pass a dict mapping column name to fill value
    df1 = df.fillna({fill_col: mean})
    # Option 2: pass the value and restrict it to a subset of columns (equivalent result)
    df2 = df.fillna(mean, subset=[fill_col])
    return df1
# Fill the missing values
fill_col = 'price_pref'
df = fill_na_by_mean(df, fill_col)
# Check whether any missing values remain
df.where("price_pref is null").count()
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
import pandas as pd
# Toy example: build a small pandas DataFrame and convert it to a Spark DataFrame
pdf = pd.DataFrame({'x': range(3), 'y': [1, 2, 5], 'z': [100, 200, 1000]})
df = spark.createDataFrame(pdf)
columns_to_scale = ["x", "y", "z"]
# Normalization on the real feature table (feature_df is the POI feature table,
# loaded elsewhere and not shown in this snippet): scale every feature column
# except the ID and flag columns below
columns_to_scale = [i for i in feature_df.columns if i not in ['officialpoiid', 'poiname', 'businessid', 'districtid', 'isopen', 'isfree',
                                                               'isfee', 'isHasCoverImage', 'addressAndCoorScore']]
# columns_to_scale = ['lncommenttotalscore']
# UDF to unpack the single value out of a one-element vector column into a double
unlist = udf(lambda x: float(list(x)[0]), DoubleType())
df = feature_df
df = df.fillna(0, subset=columns_to_scale)
for i in columns_to_scale:
    # VectorAssembler transformation - converting the column to vector type
    assembler = VectorAssembler(inputCols=[i], outputCol=i + "_Vect")
    # MinMaxScaler transformation
    scaler = MinMaxScaler(inputCol=i + "_Vect", outputCol=i + "_norm")
    # Pipeline of VectorAssembler and MinMaxScaler
    pipeline = Pipeline(stages=[assembler, scaler])
    # Fit the pipeline, unpack the scaled vector back to a double, and drop the helper column
    df = pipeline.fit(df).transform(df).withColumn(i + "_norm", unlist(i + "_norm")).drop(i + "_Vect")
df.show()
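# A possible alternative (a sketch, not from the original notes): assemble all columns
# into one vector and fit a single MinMaxScaler instead of one pipeline per column.
# The scaled values then stay packed in the "scaled" vector column; the names here
# ("features", "scaled", df_scaled) are illustrative.
assembler_all = VectorAssembler(inputCols=columns_to_scale, outputCol="features")
scaler_all = MinMaxScaler(inputCol="features", outputCol="scaled")
df_scaled = Pipeline(stages=[assembler_all, scaler_all]).fit(df).transform(df)
df_scaled.select("scaled").show(5, truncate=False)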
# Equivalent manual min-max normalization without the ML pipeline
for i in columns_to_scale:
    maxval = df.select(func.max(i)).collect()[0][0]
    minval = df.select(func.min(i)).collect()[0][0]
    # i[2:] drops the first two characters of the source column name (e.g. the "ln" prefix)
    df = df.withColumn(i[2:] + "_norm", (df[i] - minval) / (maxval - minval))
# Sum the (normalized) feature columns in tmpcols into a single score; Python's
# built-in sum() works here because Spark Columns support the + operator
tmpdf = tmpdf.withColumn("poiscore", sum(tmpdf[col] for col in tmpcols))
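# Equivalent formulation (illustrative sketch): fold the columns together with
# functools.reduce; tmpdf/tmpcols are assumed to be the scored DataFrame and the
# list of normalized columns built above.
from functools import reduce
from operator import add
tmpdf = tmpdf.withColumn("poiscore", reduce(add, [tmpdf[c] for c in tmpcols]))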
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
columns = ["Seqno","Name"]
data = [("1", "john jones"),
("2", "tracey smith"),
("3", "amy sanders")]
df = spark.createDataFrame(data=data,schema=columns)
df.show(truncate=False)
# Capitalize the first letter of every word in the string
def convertCase(str):
    resStr = ""
    arr = str.split(" ")
    for x in arr:
        resStr = resStr + x[0:1].upper() + x[1:len(x)] + " "
    return resStr
""" Converting function to UDF """
convertUDF = udf(lambda z: convertCase(z),StringType())
""" Converting function to UDF
StringType() is by default hence not required """
convertUDF = udf(lambda z: convertCase(z))
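""" The output below is presumably produced by applying the UDF with withColumn
(a sketch; the "Cureated Name" column name is taken from the table header): """
df.withColumn("Cureated Name", convertUDF(col("Name"))).show(truncate=False)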
+-----+------------+-------------+
|Seqno|Name        |Cureated Name|
+-----+------------+-------------+
|1    |john jones  |John Jones   |
|2    |tracey smith|Tracey Smith |
|3    |amy sanders |Amy Sanders  |
+-----+------------+-------------+
""" Using UDF on SQL """
spark.udf.register("convertUDF", convertCase,StringType())
df.createOrReplaceTempView("NAME_TABLE")
spark.sql("select Seqno, convertUDF(Name) as Name from NAME_TABLE") \
.show(truncate=False)
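""" A caveat worth adding (a sketch, not in the original snippet): convertCase calls
str.split, so a null Name would make the UDF throw; guard against nulls in SQL
or make the Python function null-safe. """
spark.sql("select Seqno, convertUDF(Name) as Name from NAME_TABLE where Name is not null") \
    .show(truncate=False)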