Gone are the days when you could reach for pandas on every single project. Today is the era of big data, and plain pandas can't process data at that scale. This is where PySpark comes into the picture. Below are some of the most commonly used pandas operations and their PySpark equivalents. Throughout, df refers to a pandas DataFrame and sdf to a Spark DataFrame.
# Display the content of df (n rows)
df.head(n)             # In pandas
sdf.limit(n).show()    # In spark

# Return df column names and data types
df.dtypes              # In spark and pandas

# Return the columns of df
df.columns             # In spark and pandas

# Count the number of rows in df
len(df)                # In pandas
sdf.count()            # In spark

# Compute summary statistics
df.describe()          # In pandas
sdf.describe().show()  # In spark
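If you want to run the snippets in this post end to end, here is a minimal setup sketch. The SparkSession, the toy rows, and the name/age columns are my own assumptions for illustration, not part of the cheat sheet itself; later examples reuse them.

import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("pandas-vs-pyspark").getOrCreate()

data = [("alice", 31), ("bob", 27), ("carol", 35)]
df = pd.DataFrame(data, columns=["name", "age"])    # pandas DataFrame
sdf = spark.createDataFrame(data, ["name", "age"])  # spark DataFrame

print(df.head(2))            # first two rows in pandas
sdf.limit(2).show()          # first two rows in spark
print(len(df), sdf.count())  # both report 3 rows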
# Filter on one column
df = df[df.col > value]                    # In pandas
sdf = sdf.filter(sdf.col > value)          # In spark
sdf = sdf[sdf.col.isin(["val1", "val2"])]  # In spark and pandas

# Filter on multiple columns
df = df[(df.col1 == val1) & (df.col2 == val2)]             # In pandas
sdf = sdf.filter((sdf.col1 == val1) & (sdf.col2 == val2))  # In spark

NOTE: use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.

# Select query
df = df[['col1', 'col2']]         # In pandas
sdf = sdf.select("col1", "col2")  # In spark

# Conditional column with when/otherwise
from pyspark.sql import functions as F
sdf = sdf.select("col", F.when(sdf.col > val, val_if_true)
                         .otherwise(val_if_false))  # In spark
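Here's a concrete usage sketch of filter and when/otherwise, continuing with the hypothetical name/age frame from the setup above:

from pyspark.sql import functions as F

adults = sdf.filter(sdf.age > 30)                        # one-column filter
named = sdf[sdf.name.isin(["alice", "bob"])]             # membership filter
both = sdf.filter((sdf.age > 25) & (sdf.name != "bob"))  # '&' means 'and'

# Label each row "senior" or "junior" based on age
labeled = sdf.select("name", F.when(sdf.age > 30, "senior")
                              .otherwise("junior").alias("level"))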
# Drop duplicate values
df = df.drop_duplicates(subset=["col_1"])   # In pandas
sdf = sdf.dropDuplicates(subset=["col_1"])  # In spark

# Remove nulls
df = df.dropna()  # In spark and pandas

# Fill nan values
df = df.fillna(value)                                 # In spark and pandas
df = df.fillna({"col_1": value_1, "col_2": value_2})  # In pandas
sdf = sdf.fillna(value, subset=["col_1"])             # In spark
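And a quick sketch of deduplication and null handling on a deliberately messy toy frame (the data is hypothetical):

messy = spark.createDataFrame(
    [("alice", 31), ("alice", 31), ("bob", None)],
    ["name", "age"],
)
deduped = messy.dropDuplicates(subset=["name"])  # one row per name
filled = messy.fillna(0, subset=["age"])         # null age becomes 0
cleaned = messy.dropna()                         # drop rows with any null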
# UDF with single column
df['squared_col'] = df['col'].apply(lambda x: x * x)  # In pandas

from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

squared = F.udf(lambda x: x * x, IntegerType())
sdf = sdf.withColumn("squared_col", squared('col'))   # In spark

# UDF with multiple columns
df['col1*col2'] = df.apply(lambda x: x['col1'] * x['col2'],
                           axis=1)                    # In pandas

def multiply(x):
    x['col1*col2'] = x['col1'] * x['col2']
    return x

df = df.apply(multiply, axis=1)                       # In pandas

multiplied = F.udf(lambda x, y: x * y, IntegerType())
sdf = sdf.withColumn("col1*col2",
                     multiplied('col1', 'col2'))      # In spark
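Here's a runnable sketch of both UDF patterns on a toy two-column frame; the nums frame and the a/b column names are hypothetical. Worth knowing: plain Python UDFs are opaque to Spark's optimizer, so for simple arithmetic like this the native column expression shown last is usually faster.

from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

nums = spark.createDataFrame([(2, 3), (4, 5)], ["a", "b"])

square = F.udf(lambda x: x * x, IntegerType())
product = F.udf(lambda x, y: x * y, IntegerType())

nums = (nums.withColumn("a_squared", square("a"))
            .withColumn("a_times_b", product("a", "b")))

# Equivalent without a UDF, using a native column expression
nums = nums.withColumn("a_times_b_native", F.col("a") * F.col("b"))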
# Window with built-in spark functions
from pyspark.sql import functions as F
from pyspark.sql.window import Window

win = Window().partitionBy('col1') \
              .orderBy(F.col('col2').desc())
sdf = sdf.withColumn('row_num', F.row_number().over(win))
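As a usage sketch, here's row_number over a toy sales frame; the names and amounts are made up, and the next example reuses this frame.

from pyspark.sql import functions as F
from pyspark.sql.window import Window

sales = spark.createDataFrame(
    [("alice", 10), ("alice", 30), ("bob", 20)],
    ["name", "amount"],
)
w = Window.partitionBy("name").orderBy(F.col("amount").desc())
ranked = sales.withColumn("row_num", F.row_number().over(w))
# alice's 30 ranks 1 and her 10 ranks 2; bob's single 20 ranks 1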
# Window with user defined functions in spark
import numpy as np
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType
from pyspark.sql.window import Window

@F.pandas_udf(DoubleType(), F.PandasUDFType.GROUPED_AGG)
def get_running_mean(s: pd.Series) -> float:
    return np.mean(s)

w = Window.partitionBy('col1').orderBy('col2')
sdf = sdf.withColumn('running_mean',
                     get_running_mean('col').over(w))
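Applying get_running_mean to the hypothetical sales frame from the previous sketch gives a per-name running mean. Treat this as a sketch under two assumptions: pyarrow is installed, and you are on Spark 3.x, where pandas UDF aggregates support the growing window frame that orderBy implies.

w2 = Window.partitionBy("name").orderBy("amount")
running = sales.withColumn("running_mean",
                           get_running_mean("amount").over(w2))
# each row's running_mean averages that name's amounts up to the current row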
# Window with built-in pandas functions
df['col_rolling_sum'] = df['col'].rolling(window=3,
                                          min_periods=1).sum()

# Window with user defined functions in pandas
import numpy as np
df['col2'] = df['col1'].rolling(window=3, min_periods=1) \
                       .apply(lambda x: np.mean(x))
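A quick pandas check of the rolling-window behavior on made-up numbers:

import numpy as np
import pandas as pd

toy = pd.DataFrame({"col": [1, 2, 3, 4, 5]})
toy["rolling_sum"] = toy["col"].rolling(window=3, min_periods=1).sum()
# rolling_sum is 1, 3, 6, 9, 12: each row sums at most the last 3 values
toy["rolling_mean"] = toy["col"].rolling(window=3, min_periods=1).apply(np.mean)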
# Add columns
df["column_name"] = "Any literal"  # or a dataframe column  # In pandas

from pyspark.sql import functions as F  # In spark
sdf = sdf.withColumn('column_name', F.lit("any literal")) \
         .withColumn('column_name_2', F.col('column_name'))

# Remove columns
df = df.drop(columns=["column_1", "column_2"])  # In pandas
sdf = sdf.drop("column_1", "column_2")          # In spark

# Rename one column
df.rename(columns={'old_col': 'new_col'}, inplace=True)  # In pandas
sdf = sdf.withColumnRenamed('old_col', 'new_col')        # In spark

# Rename all columns
df.columns = ["col_1", "col_2", ..., "col_n"]   # In pandas
sdf = sdf.toDF("col_1", "col_2", ..., "col_n")  # In spark

# Typecast one column
df = df.astype({'col1': column_type})  # In pandas
from pyspark.sql.types import *        # In spark
sdf = sdf.withColumn("col1", sdf.col1.cast(column_type))

# Typecast multiple columns
df = df.astype(column_type)  # In pandas
sdf = sdf.select([sdf[column].cast(column_type).alias(column)
                  for column in sdf.columns])  # In spark
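Chaining these column operations on a toy frame looks like this; the people, country, and age_years names are hypothetical:

from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType

people = spark.createDataFrame([("alice", 31)], ["name", "age"])
people = (people.withColumn("country", F.lit("IN"))         # add a literal column
                .withColumnRenamed("age", "age_years")      # rename one column
                .withColumn("age_years",
                            F.col("age_years").cast(DoubleType())))  # typecast
people = people.drop("country")                             # remove a column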
# Concat two dataframes columnwise in spark
from pyspark.sql import functions as F
sdf1 = sdf1.withColumn("id", F.monotonically_increasing_id())
sdf2 = sdf2.withColumn("id", F.monotonically_increasing_id())
sdf3 = sdf2.join(sdf1, "id", "outer").drop("id")
# NOTE: monotonically_increasing_id() depends on partitioning, so the two
# dataframes must be partitioned identically for the rows to line up.

# Concat two dataframes rowwise in spark
sdf3 = sdf1.union(sdf2)  # matches columns by position; use unionByName to match by name

# Concat two dataframes columnwise in pandas
import pandas as pd
df3 = pd.concat([df1, df2], axis=1)

# Concat two dataframes rowwise in pandas
df3 = pd.concat([df1, df2], axis=0)
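A small end-to-end sketch of both concat directions with made-up frames:

import pandas as pd

left = pd.DataFrame({"a": [1, 2]})
right = pd.DataFrame({"b": [3, 4]})
wide = pd.concat([left, right], axis=1)  # 2 rows, columns a and b
tall = pd.concat([left, left], axis=0)   # 4 rows, column a

s1 = spark.createDataFrame([(1,), (2,)], ["a"])
s2 = spark.createDataFrame([(3,), (4,)], ["a"])
stacked = s1.union(s2)                   # 4 rows; schemas must line up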
# Column to list in spark (two ways)
col = sdf.select("col").rdd.map(lambda x: x[0]).collect()
col = sdf.select("col").rdd.flatMap(lambda x: x).collect()

# Column to list in pandas
col = df.col.tolist()
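For example, with the hypothetical name/age frames from the setup sketch:

ages = sdf.select("age").rdd.flatMap(lambda x: x).collect()  # [31, 27, 35]
ages_pd = df["age"].tolist()                                 # same list from pandas
# collect() pulls everything to the driver, so only do this on small columns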