import pandas as pd
import numpy as np
from pydataset import data
import matplotlib.pyplot as plt
import seaborn as sns
import pyspark
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType, DoubleType, StringType
from datetime import datetime
from dateutil.relativedelta import relativedelta
current_time = datetime.now()
last_year = datetime.now() - relativedelta(years=1)
print('Today:', current_time.strftime("%Y-%m-%d"))
print('LY:', last_year.strftime("%Y-%m-%d"))
This notebook provides some basic code snippets to perform common DataFrame manipulations, transformations, and actions using PySpark.
For pandas users, I've included some code snippets to perform some of the most common initial EDA
you are probably used to performing upon reading in data.
# Create spark session
spark = pyspark.sql.SparkSession.builder.getOrCreate()
# Import mpg dataset into a pandas DataFrame
pdf = data('mpg')
# Convert pandas DF into a Spark DataFrame
sdf = spark.createDataFrame(pdf)
# Create PySpark DataFrame from a table using a SQL Query.
query = f"""
SELECT * FROM table_name
"""
df = spark.sql(query)
# Row and column counts, like pandas .shape.
print(sdf.count(), len(sdf.columns))
# Just like pandas .dtypes
sdf.dtypes
# OR view columns and spark data types this way.
sdf.printSchema()
# Print DataFrame columns.
sdf.columns
# Display the first five rows.
sdf.show(5)
# In Databricks
display(sdf.limit(5))
# View descriptive statistics for our Spark DF
sdf.describe().show()
# In Databricks
display(sdf.describe())
# Make it more readable using your good friend pandas.
sdf.describe().toPandas().set_index('summary').T
(
sdf
.groupBy('model')
.count()
.show()
)
You can use .orderBy() the same as .sort(). I'll also demo it below, combined with the .when() function, to show how you can perform a manual sort.
(
sdf
.groupBy('model')
.count()
.sort('count', ascending=False)
.show()
)
# In Databricks
display(
sdf
.groupBy('model')
.count()
.sort('count', ascending=False)
)
Or:
display(
sdf
.groupBy('model')
.count()
.sort(desc('count'))
)
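Here's a minimal sketch of that manual sort, assuming I want classes listed in a custom order (the order itself is just for illustration): build the ordering expression with .when() and pass it to .orderBy().
# Manual sort: the .when() chain builds a numeric sort key for .orderBy() to use.
(
sdf
.groupBy('class')
.count()
.orderBy( when( col('class') == 'compact', 1 )
.when( col('class') == 'subcompact', 2 )
.when( col('class') == 'midsize', 3 )
.otherwise(4) )
.show()
)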
# I can change the data type of a PySpark column using .withColumn() and .cast() methods
df.withColumn( 'col_name', df['col_name'].cast(DoubleType()) ) # For a decimal number
df.withColumn( 'col_name', df['col_name'].cast(IntegerType()) ) # For an integer
df.withColumn( 'col_name', df['col_name'].cast(StringType()) ) # For a string
# Select multiple columns from the Spark DataFrame.
sdf.select(['year', 'manufacturer', 'model']).show(5)
I can even chain .alias()
onto my columns to rename them on the fly.
# Use the col and expr functions with the alias method to create a new DF.
sdf.select(
col('hwy').alias('highway_mileage'),
col('cty').alias('city_mileage'),
col('trans').alias('transmission'),
expr('(hwy + cty) / 2').alias('average_mileage')
).show(5)
Just like in pandas, you can use df['col', 'col'] or df[['col', 'col']] to create a subset when you pass a list to the indexing operators [].
sdf['manufacturer', 'model'].show(5)
# In Databricks
display(sdf['manufacturer', 'model'].limit(5))
# Create a new column in a copy of our Spark DataFrame.
# Note: This does not change the original DataFrame.
(
sdf
.withColumn( 'make_&_model', concat(sdf['manufacturer'], lit(' '), sdf['model']) )
.show(5)
)
# I can create a column to flag all automatic transmissions if I like.
(
sdf
.withColumn( 'is_auto', when( col('trans').startswith('a'), 1 )
.otherwise(0) )
.show(5)
)
# I can use the lit() function to insert literal string values, too.
(
sdf
.withColumn( 'transmission', when( col('trans').startswith('a'), lit('automatic') )
.otherwise('manual') )
.show(5)
)
# In Databricks
display(
sdf
.withColumn( 'transmission', when( col('trans').startswith('a'), lit('automatic') )
.otherwise('manual') )
.limit(5)
)
PySpark doesn't have one of my favorite pandas functions, .map()
, but I can still create a new column based on the values in an existing column; I'll just use the .when()
function.
(
sdf
.withColumn('fuel_efficiency', when( col('hwy') < 10, lit('terrible') )
.when( col('hwy') < 15, lit('bad') )
.when( col('hwy') < 20, lit('ok') )
.when( col('hwy') < 25, lit('good') )
.when( col('hwy') < 30, lit('really good') )
.otherwise('great')
)
.show()
)
# Rename a column in a copy of our Spark DF.
# Note: Again, this does not mutate the original DF.
sdf.withColumnRenamed('manufacturer', 'make').show(5)
# This does not mutate my original DataFrame; I would have to reassign if I want to do that.
sdf.drop('fl').show(5)
# I can drop as many columns as I want.
sdf.drop('fl', 'cyl').show(5)
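If I want the dropped columns to stay gone, I have to reassign the result; sdf_trimmed is just a name I'm making up for this example.
# Reassign (or assign to a new name) to keep the version without the dropped columns.
sdf_trimmed = sdf.drop('fl', 'cyl')
sdf_trimmed.columns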
You can filter rows with .where() or .filter(), and the column string methods .contains(), .startswith(), and .endswith() are handy inside those filters. Some common patterns:
df.where( (condition1) & (condition2) )
df.where(condition1).where(condition2)
df.where( (condition1) & ~(condition2) )
df.where( (condition1) | (condition2) )
df.where("col_name = value")
df.where("col_name <= value")
df.where( col('col_name') == value )
df.where( df['col_name'] == value )
df.where( df.col_name == value )
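For example, those string helpers look like this on the mpg data (the match strings assume the usual values like 'auto(l5)', 'a4', and 'manual(m5)'):
# Substring, prefix, and suffix matching on string columns.
sdf.where( col('trans').contains('auto') ).show(5)
sdf.where( col('model').startswith('a4') ).show(5)
sdf.where( col('trans').endswith('(m5)') ).show(5)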
# Filter Spark DataFrame to be a subset of compact cars only. I could also use .where() here.
sdf.filter(sdf['class'] == 'compact').show(5)
# Filter with multiple conditions using .where(). .filter() will do the exact same thing.
sdf.where( (sdf['class'] == 'compact') & (sdf.year > 2000) ).show(5)
# In Databricks
display(
sdf.where( (sdf['class'] == 'compact') & (sdf.year > 2000) )
.limit(5)
)
# I can even combine them and filter with multiple conditions using .filter() & .where() if I like.
sdf.filter(sdf['class'] == 'compact').where(sdf.year > 2000).show(5)
sdf.where( col('class').isin('compact', 'subcompact') ).show()
# Use the .like() operator like you would in SQL.
sdf.where( col('trans').like('%auto%') ).show(10)
I can use the .agg() method to run a function, like .round(), on an aggregate, or even perform more than one aggregation on one or more columns at a time. I can clean up the resulting column names with the .alias() method. Pass .agg() a dict to apply one or more aggregation functions to one or more columns:
sdf
.groupBy('col_name')
.agg({'col1': 'agg_func', 'col2': 'agg_func'})
sdf
.select(stddev("Sales").alias('std'))
.select(format_number('std',2).alias('std_2digits'))
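The 'Sales' column above is a placeholder from a generic example; on the mpg data the same pattern might look like this.
# Standard deviation of highway mileage, formatted to two decimal places.
(
sdf
.select( stddev('hwy').alias('std') )
.select( format_number('std', 2).alias('std_2digits') )
.show()
)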
I can simply use .select()
with countDistinct()
if I want to know how many unique values I have in a column. I can even throw in a .alias()
to clean up my column name.
sdf.select(countDistinct('trans').alias('transmission_type_count')).show()
# I can use the distinct and count methods if I want to return the integer.
sdf.select('trans').distinct().count()
sdf.groupBy('class').count().show()
(
sdf
.groupBy('trans')
.agg( count('trans').alias('row_count') )
.show()
)
I may want to grab a metric to use somewhere else.
# Save average city mileage to a variable.
avg_city = sdf.select(avg('cty').alias('average_city'))
avg_city.show()
Maybe I want to clean up the formatting of my metric.
avg_city.select(format_number('average_city', 2).alias('average_city')).show()
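If I need the metric as a plain Python number, say to reuse it elsewhere, I can pull it out of the one-row result. The variable name here is just for illustration.
# .first() returns a Row; index it by column name to get the plain Python value.
avg_city_value = avg_city.first()['average_city']
print(avg_city_value)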
sdf.groupBy('trans').agg({'cty': 'avg', 'hwy': 'avg'}).show()
I can clean up my values using .round()
, too. Yep, I'll chain on .alias()
for kicks. Squeaky clean!
(
sdf
.groupBy('trans')
.agg( round(avg('cty'), 2).alias('city_average')
,round(avg('hwy'), 2).alias('highway_average')
)
.show()
)
model_pivot = (
sdf
.withColumn( 'transmission_type', when( col('trans').startswith('a'), 'automatic' )
.otherwise('manual') )
.groupBy('transmission_type')
.pivot('class')
.agg( countDistinct('model').alias('unique_models') )
)
model_pivot.show()
This is one way I can handle unwanted Null values. In this context, it makes the most sense.
# I can replace my Null values here with 0 because it means there are no models with a certain transmission type.
model_pivot.fillna(0).show()
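.fillna() also accepts a dict if I only want to fill specific columns; the class columns named here are just examples from this pivot.
# Fill nulls only in the named pivot columns, leaving any others untouched.
model_pivot.fillna({'compact': 0, 'suv': 0}).show()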
# Create a column that ranks the average mileage by class of vehicle. .rank() skips numbers after rows with a tie.
(
sdf
.groupBy('manufacturer', 'class')
.agg( avg('hwy').alias('average_highway_mileage') )
.withColumn( 'mileage_rank', rank().over(Window.partitionBy('class').orderBy(desc('average_highway_mileage'))) )
.show()
)
# Grab all of the top ranking vehicles for each class.
(
sdf
.groupBy('manufacturer', 'class')
.agg( avg('hwy').alias('average_highway_mileage') )
.withColumn( 'mileage_rank', rank().over(Window.partitionBy('class').orderBy(desc('average_highway_mileage'))) )
.where(col('mileage_rank') == 1)
.show()
)
# What happens when I filter for mileage_rank when there is a tie in a class?
(
sdf
.groupBy('manufacturer', 'class')
.agg( avg('hwy').alias('average_highway_mileage') )
.withColumn( 'mileage_rank', rank().over(Window.partitionBy('class').orderBy(desc('average_highway_mileage'))) )
.where(col('mileage_rank') == 3)
.show()
)
# .dense_rank() does not skip numbers after rows with a tie like .rank() does.
(
sdf
.groupBy('manufacturer', 'model', 'class')
.agg( avg('hwy').alias('average_highway_mileage') )
.withColumn( 'mileage_rank', dense_rank().over(Window.partitionBy('class').orderBy(desc('average_highway_mileage'))) )
.show()
)
# I'm creating a temporary view to query. You might already have a database table to query, in which case you wouldn't need this step.
sdf.createOrReplaceTempView('sql_mpg')
# For a simple query, I can just pass the sql query directly into the spark.sql() function.
results = spark.sql('SELECT * FROM sql_mpg WHERE class = "compact"')
# That easily, I'm working with a PySpark DataFrame.
results.show(5)