Pandas to Pyspark

April 1, 2022

Gone are the days when you could reach for pandas on every single project. In the era of big data, plain pandas simply can't process data at that scale, and that is where PySpark comes in. Below are some of the most commonly used pandas operations and their PySpark equivalents.
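
All of the PySpark snippets below assume an active SparkSession named spark alongside the usual pandas import. A minimal setup sketch (the app name is arbitrary):

import pandas as pd
from pyspark.sql import SparkSession

# Create (or reuse) a SparkSession; every spark/sdf example below relies on it
spark = SparkSession.builder.appName("pandas-to-pyspark").getOrCreate()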

Create/Read/Write files as Dataframes

# Create Dataframe from list of tuples
data = [(1,"a"), (2,"b")]
columns = ["id", "key"]
df = pd.DataFrame(data=data, columns = columns)   # In pandas
sdf = spark.createDataFrame(data, columns)        # In spark
# Create Dataframe from list of dictionaries
data = [{"id":1,"key":"a"}, {"id":2,"key":"b"}]
df = pd.DataFrame(data=data)                      # In pandas
sdf = spark.createDataFrame(data)                 # In spark
# Convert spark dataframe into pandas dataframe
df = sdf.toPandas()
# Convert pandas dataframe into spark dataframe
sdf = spark.createDataFrame(df)
# Reading a csv file
df = pd.read_csv(file_path)       # In pandas
sdf = spark.read.csv(file_path)   # In spark
# Writing a csv file
df.to_csv(file_path)              # In pandas
sdf.write.csv(file_path)          # In spark
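
Unlike pandas, spark.read.csv does not treat the first row as a header or infer column types by default, and write.csv produces a directory of part files. A sketch of the commonly used reader/writer options:

# Read a csv with a header row and inferred column types     # In spark
sdf = spark.read.csv(file_path, header=True, inferSchema=True)
# Write a csv with a header, overwriting any existing output # In spark
sdf.write.csv(file_path, header=True, mode="overwrite")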

Inspect Data

# Display the first n rows of df
df.head(n)                                    # In pandas
sdf.limit(n).show()                           # In spark
# Return df column names and data types
df.dtypes                                     # In spark and pandas
# Return the columns of df
df.columns                                    # In spark and pandas
# Count the number of rows in df 
len(df)                                       # In pandas
sdf.count()                                   # In spark
# Compute summary statistics
df.describe()                                 # In pandas
sdf.describe().show()                         # In spark
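
Spark also has printSchema() for a tree view of column names and types; the closest pandas one-liner is df.info():

# Print the schema of the dataframe
sdf.printSchema()                             # In spark
df.info()                                     # In pandas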

Filter/Queries

# Filter on one column
df = df[df.col > value]                       # In pandas
sdf = sdf.filter(sdf.col > value)             # In spark
sdf = sdf[sdf.col.isin(["val1","val2"])]      # In spark and pandas
# Filter on multiple columns
df = df[(df.col1==val1) & (df.col2==val2)]              # In pandas
sdf = sdf.filter((sdf.col1==val1) & (sdf.col2==val2))   # In spark
# NOTE: use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.
# Select query
df = df[['col1', 'col2']]                                # In pandas
sdf = sdf.select("col1", "col2")                         # In spark
from pyspark.sql import functions as F
sdf = sdf.select("col", F.when(sdf.col > val, val_if_true)
                         .otherwise(val_if_false))       # In spark
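
To make the F.when pattern concrete, here is a small self-contained sketch; the id/score/bucket names are made up for illustration:

from pyspark.sql import functions as F

sdf = spark.createDataFrame([(1, 10), (2, 25)], ["id", "score"])
sdf = sdf.select("id", F.when(sdf.score > 20, "high")
                        .otherwise("low").alias("bucket"))
# id 1 gets "low", id 2 gets "high"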

Drop or Fill Nulls and Duplicate Values

# Drop Duplicate Values
df = df.drop_duplicates(subset = ["col_1"])  # In pandas
sdf = sdf.dropDuplicates(subset = ["col_1"]) # In spark
# Remove Nulls
df = df.dropna()                             # In spark and pandas
# Fill nan values
df = df.fillna(value)                        # In spark and pandas
df = df.fillna({"col_1": value_1, "col_2": value_2})   # In pandas
sdf = sdf.fillna(value, subset = ["col_1"])  # In spark
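
Spark's fillna also accepts a per-column dict, mirroring the pandas call above:

# Fill a different value per column
sdf = sdf.fillna({"col_1": value_1, "col_2": value_2})  # In spark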

Pivot/Sort/Join Dataframe

# Sort Dataframe
df = df.sort_values(by=['col1','col2'],ascending=False) # In pandas
sdf = sdf.orderBy(["col_1","col_2"],ascending=[0,1])    # In spark
sdf = sdf.sort("col", ascending=False)                  # In spark
# Pivot Dataframe
df = df.pivot_table(index='col1', columns='col2',
                      values='col3', aggfunc='sum')    # In pandas
sdf = sdf.groupBy("col1").pivot("col2").sum("col3")    # In spark
# Join Dataframes
df = df1.merge(df2, on = 'column_name',
                    how = 'join_type')          # In pandas
sdf = sdf1.join(sdf2, on = 'column_name',
                      how = 'join_type')        # In spark
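
A minimal runnable join sketch with toy data; the id/left_val/right_val names are illustrative:

sdf1 = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "left_val"])
sdf2 = spark.createDataFrame([(1, "x")], ["id", "right_val"])
joined = sdf1.join(sdf2, on="id", how="left")
# keeps both rows of sdf1; right_val is null for id 2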

User Defined Functions (UDF)

# UDF with single column
df['squared_col'] = df['col'].apply(lambda x:x*x)     # In pandas
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType
squared = F.udf(lambda x: x*x, IntegerType())
sdf = sdf.withColumn("squared_col", squared('col'))   # In spark
# UDF with multiple column
df['col1*col2'] = df.apply(lambda x: x['col1']*x['col2'],
                                              axis=1) # In pandas
def multiply(x):
    x['col1*col2'] = x['col1']*x['col2']
    return x
df = df.apply(multiply, axis=1)                       # In pandas
multiply_udf = F.udf(lambda x, y: x*y, IntegerType())
sdf = sdf.withColumn("col1*col2",
                     multiply_udf('col1', 'col2'))    # In spark
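
For simple arithmetic like this, a UDF is not strictly needed in spark; a plain column expression gives the same result and avoids the Python serialization overhead:

# Same result without a UDF                           # In spark
sdf = sdf.withColumn("col1*col2", sdf.col1 * sdf.col2)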

Group By

## Pandas
# Aggregation with single column
squared_sum = lambda x: (x*x).sum()
df = df.groupby("col").agg({"col1": squared_sum, "col2": 'count'})
# Aggregation with multiple columns
import numpy as np
def avg_sum(x):
    x["avg_sum"] = np.mean(x.col1 + x.col2)
    return x
df = df.groupby("col").apply(avg_sum)
## Spark
# Aggregation with single column (inbuilt functions)
from pyspark.sql import functions as F
sdf = sdf.groupBy("col") \
         .agg(F.count("col1").alias("col1_count"),
              F.count("col2").alias("col2_count"))
# Aggregation with multiple columns (user defined functions)
from pyspark.sql.types import DoubleType
@F.pandas_udf(DoubleType(),
              functionType=F.PandasUDFType.GROUPED_AGG)
def f(x, y):
    return np.mean(x+y)
  
sdf = sdf.groupBy("col").agg(f("col1", "col2").alias("avg_sum"))

Window Functions

# Window with built-in spark functions
from pyspark.sql.window import Window
from pyspark.sql import functions as F
win = Window.partitionBy('col1') \
            .orderBy(F.col('col2').desc())
sdf = sdf.withColumn('row_num', F.row_number().over(win))
# Window with user defined functions in spark
import numpy as np
from pyspark.sql.types import DoubleType
@F.pandas_udf(DoubleType(),
              F.PandasUDFType.GROUPED_AGG)
def get_running_mean(s: pd.Series):
    return np.mean(s)
w = Window.partitionBy('col1').orderBy('col2')
sdf = sdf.withColumn('running_mean',
                     get_running_mean('col').over(w))
# Window with built-in pandas functions
df['col_rolling_sum'] = df['col'].rolling(window=3,
                                   min_periods=1).sum()
# Window with user defined functions in pandas
df['col2'] = (df['col1'].rolling(window=3, min_periods=1)
                        .apply(lambda x: np.mean(x)))
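
The pandas rolling sum above maps to a spark window restricted with rowsBetween; a sketch assuming 'col2' is the ordering column:

# 3-row rolling sum over 'col' within each 'col1' partition   # In spark
w = Window.partitionBy('col1').orderBy('col2').rowsBetween(-2, 0)
sdf = sdf.withColumn('col_rolling_sum', F.sum('col').over(w))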

Add/Remove/Rename/Typecast Columns

# Add Columns
df["column_name"] = "any literal"                         # In pandas
df["column_name_2"] = df["column_name"]                   # In pandas
from pyspark.sql import functions as F                    # In spark
sdf = sdf.withColumn('column_name', F.lit("any literal")) \
         .withColumn('column_name_2', F.col('column_name'))
# Remove Columns
df = df.drop(columns=["column_1", "column_2"])            # In pandas
sdf = sdf.drop("column_1", "column_2")                   # In spark
# Rename one Column
df.rename(columns={'old_col':'new_col'},inplace=True)   # In pandas
sdf = sdf.withColumnRenamed('old_col', 'new_col')       # In spark
# Rename all Columns
df.columns = ["col_1", "col_2", ... "col_n"]            # In pandas
sdf = sdf.toDF("col_1", "col_2", ... "col_n")           # In spark
# Typecast one column
df = df.astype({'col1': column_type})                   # In pandas
from pyspark.sql.types import *                         # In spark
sdf = sdf.withColumn("col1", sdf.col1.cast(column_type))
# Typecast multiple columns
df = df.astype(column_type)                             # In pandas
sdf = sdf.select([sdf.column.cast(column_type) for 
                             column in sdf.columns])   # In spark
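
Here column_type stands for a spark type object or a type string; for example, casting to integers (the column names are illustrative):

from pyspark.sql.types import IntegerType
sdf = sdf.withColumn("col1", sdf.col1.cast(IntegerType()))   # In spark
sdf = sdf.withColumn("col1", sdf.col1.cast("int"))           # type string also works
df = df.astype({"col1": "int64"})                            # In pandas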

Miscellaneous

# Concat two dataframes columnwise in spark
from pyspark.sql import functions as F
sdf1 = sdf1.withColumn("id", F.monotonically_increasing_id())
sdf2 = sdf2.withColumn("id", F.monotonically_increasing_id())
sdf3 = sdf2.join(sdf1, "id", "outer").drop("id")
# Concat two dataframes rowwise in spark
sdf3 = sdf1.union(sdf2)
# Concat two dataframes columnwise in pandas
df3 = pd.concat([df1, df2], axis=1)
# Concat two dataframes rowwise in pandas
df3 = pd.concat([df1, df2], axis=0)
# column to list in spark (two ways)
col = sdf.select("col").rdd.map(lambda x: x[0]).collect()
col = sdf.select("col").rdd.flatMap(lambda x: x).collect()
# column to list in pandas
col = df.col.tolist()
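
For small results, converting through pandas is another common route in spark; it collects everything to the driver, so use it only when the column fits in memory:

# column to list in spark via pandas conversion
col = sdf.select("col").toPandas()["col"].tolist()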
