PySpark DataFrame Transformations — Interview Workbook
Generated: 2025-08-15 16:49:45
This notebook is a hands-on interview prep guide covering the most common DataFrame transformations in PySpark, with:
- Clean examples you can run
- Expected outputs (described)
- Multiple interview questions with answers for each topic
- Extra deep-dive questions used in real interviews
You can run this notebook locally (with PySpark installed) or upload to Databricks/EMR/Glue.
0) Setup & Sample Data
We create a Spark session and a small dataset used throughout the notebook.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, avg, sum as _sum, count, countDistinct, desc, asc, lit
from pyspark.sql.functions import row_number, rank, dense_rank, lag, lead, coalesce
from pyspark.sql.window import Window
spark = SparkSession.builder.appName('PySparkTransformationsInterview').getOrCreate()
data = [
(1, 'John', 28, 'HR', 4000, '2025-01', 'New York'),
(2, 'Alice',34, 'Finance', 5000, '2025-01', 'Chicago'),
(3, 'Bob', 23, 'IT', 4500, '2025-02', 'New York'),
(4, 'Eve', 29, 'Finance', 5500, '2025-02', 'Chicago'),
(5, 'Tom', 31, 'IT', 3000, '2025-03', 'New York'),
(6, 'Sam', 26, 'HR', None, '2025-03', 'Chicago'),
]
df = spark.createDataFrame(data, ['id','name','age','department','salary','month','city'])
df.show()
1) Filtering
Definition: Select rows from a DataFrame based on a condition using .filter() or .where().
Key tips:
- Chain multiple conditions with & (AND), | (OR), ~ (NOT)
- Handle nulls with .isNull() / .isNotNull()
Example: Employees older than 30 and non-null salary
df.filter((col('age') > 30) & col('salary').isNotNull()).show()
Example: Finance department OR salary > 5000
df.where((col('department') == 'Finance') | (col('salary') > 5000)).show()
Example: City contains 'New'
df.filter(col('city').contains('New')).show()
Interview Questions (with answers):
- Filter Finance employees earning strictly more than 5000, ignoring null salaries.
df.filter((col('department')=='Finance') & col('salary').isNotNull() & (col('salary')>5000))
- Difference between .filter() and .where()? — They are aliases (functionally identical) in PySpark.
- Filter rows where name starts with 'A' or 'E'.
df.filter(col('name').rlike('^(A|E)'))
- Exclude rows with null in any column.
df.dropna(how='any')
2) Selecting Columns
Definition: Extract specific columns using .select(); add expressions with aliases.
Select fixed columns
df.select('name', 'salary').show()
Select with expression/alias
df.select(col('name'), (col('salary')*1.1).alias('salary_plus_10pct')).show()
Programmatically select numeric columns
numeric_cols = [f.name for f in df.schema.fields if f.dataType.typeName() in ('long','integer','double')]
df.select(*numeric_cols).show()
Interview Questions (with answers):
- Select all columns except city and month.
keep = [c for c in df.columns if c not in {'city','month'}]
df.select(*keep)
- What happens if you select a non-existent column? — An AnalysisException is raised (see the sketch after this list).
- Select columns whose names contain 'id' or end with 'ary'.
import re
cols = [c for c in df.columns if (re.search('id', c)) or c.endswith('ary')]
df.select(*cols)
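A minimal sketch of that failure mode ('salry' is a deliberately misspelled, hypothetical column name):
from pyspark.sql.utils import AnalysisException  # newer PySpark also exposes this under pyspark.errors
try:
    df.select('salry')
except AnalysisException as e:
    print('Cannot resolve column:', e)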
3) Adding Columns
Definition: Create or modify columns using .withColumn().
Add computed columns
df_added = df.withColumn('bonus', col('salary') * 0.1) \
.withColumn('is_senior', col('age') >= 30)
df_added.show()
Conditional logic
df_rules = df.withColumn(
'adjusted_salary',
when(col('department')=='IT', col('salary')*1.05)
.when(col('department')=='Finance', col('salary')*1.02)
.otherwise(col('salary'))
)
df_rules.show()
Interview Questions (with answers):
- Add total_comp = salary + bonus, where bonus is 10% of salary; treat null salary as 0.
df.withColumn('bonus', coalesce(col('salary'), lit(0))*0.1) \
.withColumn('total_comp', coalesce(col('salary'), lit(0)) + col('bonus'))
- Difference between .withColumn() and using .select() to add columns? — .withColumn() modifies/extends the DataFrame without relisting all columns; .select() can add columns but requires listing everything you keep (see the sketch after this list).
- Set a default city 'Unknown' when city is null.
df.withColumn('city', coalesce(col('city'), lit('Unknown')))
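A small sketch contrasting the two approaches on the sample df (salary_x2 is just an illustrative column name):
df.withColumn('salary_x2', col('salary') * 2).show()           # extends the DataFrame without relisting columns
df.select('*', (col('salary') * 2).alias('salary_x2')).show()  # same result, but you must carry '*' yourself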
4) Dropping Columns
Definition: Remove unnecessary columns using .drop().
Drop one or multiple columns
df.drop('month').show()
df.drop('month','city').show()
Keep only a whitelist
keep_cols = ['id','name','salary']
df.select(*keep_cols).show()
Interview Questions (with answers):
- Drop all columns that are entirely null.
non_all_null = [c for c in df.columns if df.where(col(c).isNotNull()).limit(1).count() > 0]
df.select(*non_all_null)
- Drop columns by a prefix, e.g., tmp_.
drop_cols = [c for c in df.columns if c.startswith('tmp_')]
df.drop(*drop_cols)
5) Grouping & Aggregating
Definition: Group rows and compute aggregates like sum/avg/count.
Average salary by department
df.groupBy('department').agg(avg('salary').alias('avg_salary')).show()
Multiple aggregates
df.groupBy('department').agg(
_sum('salary').alias('total_salary'),
avg('salary').alias('avg_salary'),
count('*').alias('rows')
).show()
Distinct counts
df.groupBy('city').agg(countDistinct('department').alias('distinct_departments')).show()
Interview Questions (with answers):
- Top 3 departments by total salary.
df.groupBy('department').agg(_sum('salary').alias('total_salary')) \
.orderBy(desc('total_salary')).limit(3)
- Include departments that have no rows, using a static department list. — Create a DataFrame of all departments, left join the aggregates onto it, and fill nulls with 0.
depts = spark.createDataFrame([(d,) for d in ['HR','Finance','IT','Marketing']], ['department'])
agg = df.groupBy('department').agg(_sum('salary').alias('total_salary'))
depts.join(agg, 'department', 'left').fillna({'total_salary':0})
- Calculate percent contribution of each department to the overall salary.
total = df.agg(_sum('salary').alias('t')).collect()[0]['t']
df.groupBy('department').agg((_sum('salary')/lit(total)).alias('pct_of_total'))
6) Sorting
Definition: Arrange rows in a specified order using .orderBy() / .sort().
Sort by salary descending, then name ascending
df.orderBy(desc('salary'), asc('name')).show()
Null handling: put nulls last
df.orderBy(col('salary').asc_nulls_last()).show()
Interview Questions (with answers):
- Stable sort by multiple keys (department asc, salary desc).
df.orderBy(asc('department'), desc('salary'))
- How are nulls ordered by default? — Ascending puts nulls first; descending puts them last. Use asc_nulls_last() / desc_nulls_first() to control placement (see the sketch after this list).
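A quick sketch of null placement on the sample df (Sam's salary is null, so his row is the one that moves):
df.orderBy(asc('salary')).show()                     # ascending default: the null salary sorts first
df.orderBy(col('salary').asc_nulls_last()).show()    # push the null salary to the end
df.orderBy(col('salary').desc_nulls_first()).show()  # descending, but force the null to the top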
7) Joining
Definition: Combine two DataFrames on a key using the inner, left, right, full, left_semi, or left_anti join types.
emp_types = spark.createDataFrame([(1,'Full-time'),(2,'Part-time'),(7,'Contract')], ['id','employment_type'])
Inner join (only matching ids)
df.join(emp_types, 'id', 'inner').show()
Left join (all from left)
df.join(emp_types, 'id', 'left').show()
Left anti (in df but not in emp_types)
df.join(emp_types, 'id', 'left_anti').show()
Left semi (only keys that exist in right, but keep only left columns)
df.join(emp_types, 'id', 'left_semi').show()
Interview Questions (with answers):
- Find customers who have never placed an order. — left_anti join of customers against orders on customer_id (see the sketch after this list).
- Difference between left_semi and left_anti? — left_semi keeps rows whose key exists in the right DataFrame; left_anti keeps rows whose key does not.
- Handle duplicate keys on either side? — Consider deduplicating keys before the join, or understand that joins will multiply rows (a Cartesian effect on duplicate keys).
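A minimal sketch of the "customers without orders" pattern; customers and orders are hypothetical DataFrames, not part of the sample data above:
customers = spark.createDataFrame([(1, 'Ann'), (2, 'Raj'), (3, 'Mei')], ['customer_id', 'name'])
orders = spark.createDataFrame([(101, 1), (103, 3)], ['order_id', 'customer_id'])
customers.join(orders, 'customer_id', 'left_anti').show()  # only Raj (customer_id 2) remains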
8) Union
Definition: Concatenate two DataFrames vertically (same schema). For differing schemas, use unionByName with allowMissingColumns=True.
df_jan = df.filter(col('month')=='2025-01')
df_feb = df.filter(col('month')=='2025-02')
Simple union (schemas/order must match)
union_df = df_jan.select(df.columns).union(df_feb.select(df.columns))
union_df.show()
Union by name with missing columns
df_extra = spark.createDataFrame([(10, 'Zed')], ['id','name'])
union_flex = df.select('id','name').unionByName(df_extra, allowMissingColumns=True)
union_flex.show()
Interview Questions (with answers):
- Union two monthly files where February has an extra column region. — Use unionByName(allowMissingColumns=True) after aligning common columns (see the sketch after this list).
- How to ensure column order doesn't break the union? — Always select columns in a consistent order before the union, or use unionByName.
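A small sketch of the February case with hypothetical jan/feb extracts; only the region column differs:
jan = spark.createDataFrame([(1, 'John', 4000)], ['id', 'name', 'salary'])
feb = spark.createDataFrame([(2, 'Alice', 5000, 'Midwest')], ['id', 'name', 'salary', 'region'])
jan.unionByName(feb, allowMissingColumns=True).show()  # January rows get region = null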
9) Pivoting (and Unpivot/Melt)
Definition:
- Pivot: long → wide
- Unpivot (Melt): wide → long (Spark has no built-in melt before 3.4; emulate it with stack or use the Pandas API on Spark)
Pivot: total salary by department x month
pivoted = df.groupBy('department').pivot('month').agg(_sum('salary'))
pivoted.show()
Fill nulls after pivot
pivoted.fillna(0).show()
Unpivot example using stack (assumes known month values)
months = ['2025-01','2025-02','2025-03']
stack_expr = "stack({0}, {1}) as (month, total_salary)".format(
    len(months),
    ', '.join([f"'{m}', `{m}`" for m in months])   # backtick the month columns since their names contain '-'
)
unpivoted = pivoted.selectExpr('department', stack_expr)   # selectExpr parses the stack() SQL expression
unpivoted.show()
Interview Questions (with answers):
- Pivot monthly sales, then fill missing months with zeros. — Use .pivot('month').agg(sum(...)).fillna(0).
- Unpivot an unknown set of metric columns. — Build a stack expression dynamically or switch to the Pandas API on Spark for melt() (see the sketch after this list).
- Performance tip: Provide an explicit list of pivot values to avoid an expensive distinct scan.
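A minimal sketch of both answers, reusing the pivoted DataFrame above and assuming every column other than department is a month metric:
# Unknown metric columns: discover them at runtime, then build the stack expression as before
metric_cols = [c for c in pivoted.columns if c != 'department']
dyn_expr = "stack({0}, {1}) as (month, total_salary)".format(
    len(metric_cols), ', '.join([f"'{c}', `{c}`" for c in metric_cols]))
pivoted.selectExpr('department', dyn_expr).show()
# Explicit pivot values: skips the extra distinct scan Spark would otherwise run over month
df.groupBy('department').pivot('month', ['2025-01', '2025-02', '2025-03']).agg(_sum('salary')).show()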
10) Window Functions
Definition: Perform calculations across sets of rows related to the current row, e.g., ranking, running totals, moving averages.
Rank employees by salary within each department
w_rank = Window.partitionBy('department').orderBy(desc('salary'))
df.withColumn('row_number', row_number().over(w_rank)) \
.withColumn('dense_rank', dense_rank().over(w_rank)) \
.withColumn('rank', rank().over(w_rank)) \
.show()
3-row rolling average salary (current row plus the two preceding rows), ordered by month within department
w_roll = Window.partitionBy('department').orderBy('month').rowsBetween(-2, 0)
df.withColumn('rolling_avg_salary', avg('salary').over(w_roll)).show()
Lag/Lead examples
w_time = Window.partitionBy('department').orderBy('month')
df.withColumn('prev_salary', lag('salary', 1).over(w_time)) \
.withColumn('next_salary', lead('salary', 1).over(w_time)) \
.show()
Interview Questions (with answers):
- Assign a rank to employees within each department by salary; ties get the same rank. — Use dense_rank() over a window partitioned by department and ordered by salary descending.
- Compute a 3-month rolling average per department. — Window with rowsBetween(-2, 0) and avg('salary'), ordered by month.
- Find employees whose salary increased compared to the previous month. — Use lag('salary') and compare it to the current value.
w = Window.partitionBy('id').orderBy('month')
df.withColumn('prev', lag('salary').over(w)) \
.filter(col('salary') > col('prev'))
Bonus: End-to-End Exercise (with solution)
Problem:
You receive employee records monthly. Build a pipeline that:
1) Filters out rows with null salary,
2) Adds bonus = 0.1 * salary and total_comp = salary + bonus,
3) Aggregates average total_comp by department and month,
4) Pivots months as columns, filling nulls with 0,
5) Sorts departments by the total of the pivoted values, descending.
Solution
clean = df.filter(col('salary').isNotNull())
enriched = clean.withColumn('bonus', col('salary')*0.1) \
.withColumn('total_comp', col('salary') + col('bonus'))
agg = enriched.groupBy('department','month').agg(avg('total_comp').alias('avg_total_comp'))
pivoted = agg.groupBy('department').pivot('month').agg(_sum('avg_total_comp')).fillna(0)
result = pivoted.withColumn('row_total', sum([col(c) for c in pivoted.columns if c != 'department'])) \
.orderBy(desc('row_total'))
result.show()
Extra Deep-Dive Questions (Short Answers)
- left_semi vs left_anti? — Semi keeps rows with matching keys in the right; anti keeps rows without matches.
- Why prefer unionByName? — Safer across schema drift; avoids column-order bugs.
- Pivot performance tips? — Provide explicit pivot values; reduce data beforehand; avoid high-cardinality pivots.
- Window vs GroupBy? — Window keeps row-level granularity; groupBy collapses rows per group.
- Null-safe equality in joins? — Use SQL <=> or the DataFrame expression expr('a <=> b') for null-safe comparisons (see the sketch below).
- Skew handling in joins? — Salting, broadcasting small tables (broadcast(df)), repartitioning on the join key.
- Deterministic ordering caution. — Without an orderBy, output partition/file order is not guaranteed.
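A minimal sketch combining the null-safe equality and broadcast ideas; dim is a hypothetical small lookup table, and eqNullSafe is the DataFrame counterpart of SQL's <=>:
from pyspark.sql.functions import broadcast
dim = spark.createDataFrame([('New York', 'NY'), ('Chicago', 'IL'), (None, 'N/A')], ['city', 'state'])
# broadcast() hints Spark to replicate the small side; eqNullSafe treats null == null as a match
df.join(broadcast(dim), df['city'].eqNullSafe(dim['city']), 'left').show()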