PySpark DataFrame Transformations — Interview Workbook

Generated: 2025-08-15 16:49:45

This notebook is a hands-on interview prep guide covering the most common DataFrame transformations in PySpark, with:

  • Clean examples you can run
  • Expected outputs (described)
  • Multiple interview questions with answers for each topic
  • Extra deep-dive questions used in real interviews

You can run this notebook locally (with PySpark installed) or upload to Databricks/EMR/Glue.

0) Setup & Sample Data

We create a Spark session and a small dataset used throughout the notebook.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, avg, sum as _sum, count, countDistinct, desc, asc, lit
from pyspark.sql.functions import row_number, rank, dense_rank, lag, lead, coalesce
from pyspark.sql.window import Window

spark = SparkSession.builder.appName('PySparkTransformationsInterview').getOrCreate()

data = [
    (1, 'John', 28, 'HR', 4000, '2025-01', 'New York'),
    (2, 'Alice', 34, 'Finance', 5000, '2025-01', 'Chicago'),
    (3, 'Bob', 23, 'IT', 4500, '2025-02', 'New York'),
    (4, 'Eve', 29, 'Finance', 5500, '2025-02', 'Chicago'),
    (5, 'Tom', 31, 'IT', 3000, '2025-03', 'New York'),
    (6, 'Sam', 26, 'HR', None, '2025-03', 'Chicago'),
]

df = spark.createDataFrame(data, ['id','name','age','department','salary','month','city'])
df.show()

1) Filtering

Definition: Select rows from a DataFrame based on a condition using .filter() or .where().

Key tips:

  • Chain multiple conditions with & (AND), | (OR), ~ (NOT)
  • Handle nulls with .isNull() / .isNotNull()

Example: Employees older than 30 and non-null salary

df.filter((col('age') > 30) & col('salary').isNotNull()).show()

Example: Finance department OR salary > 5000

df.where((col('department') == 'Finance') | (col('salary') > 5000)).show()

Example: City contains ‘New’

df.filter(col('city').contains('New')).show()

Interview Questions (with answers):

  1. Filter Finance employees earning strictly more than 5000, ignoring null salaries.
   df.filter((col('department')=='Finance') & col('salary').isNotNull() & (col('salary')>5000))
  2. Difference between .filter() and .where()? — They are aliases (functionally identical) in PySpark.
  3. Filter rows where name starts with ‘A’ or ‘E’.
   df.filter(col('name').rlike('^(A|E)'))
  4. Exclude rows with null in any column.
   df.dropna(how='any')

2) Selecting Columns

Definition: Extract specific columns using .select(); add expressions with aliases.

Select fixed columns

df.select('name', 'salary').show()

Select with expression/alias

df.select(col('name'), (col('salary')*1.1).alias('salary_plus_10pct')).show()

Programmatically select numeric columns

numeric_cols = [f.name for f in df.schema.fields if f.dataType.typeName() in ('long','integer','double')]
df.select(*numeric_cols).show()

Interview Questions (with answers):

  1. Select all columns except city and month.
   keep = [c for c in df.columns if c not in {'city','month'}]
   df.select(*keep)
  2. What happens if you select a non-existent column? — An AnalysisException is raised (see the sketch after this list).
  3. Select columns whose names contain ‘id’ or end with ‘ary’.
   import re
   cols = [c for c in df.columns if re.search('id', c) or c.endswith('ary')]
   df.select(*cols)
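
A quick sketch of question 2 (note that the exception class moved: Spark 3.4+ exposes it from pyspark.errors, older releases from pyspark.sql.utils):

try:
    from pyspark.errors import AnalysisException      # Spark 3.4+
except ImportError:
    from pyspark.sql.utils import AnalysisException   # older releases

try:
    df.select('does_not_exist').show()
except AnalysisException as e:
    print('Selecting a missing column failed as expected:', type(e).__name__)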

3) Adding Columns

Definition: Create or modify columns using .withColumn().

Add computed columns

df_added = df.withColumn('bonus', col('salary') * 0.1) \
    .withColumn('is_senior', col('age') >= 30)
df_added.show()

Conditional logic

df_rules = df.withColumn(
    'adjusted_salary',
    when(col('department')=='IT', col('salary')*1.05)
    .when(col('department')=='Finance', col('salary')*1.02)
    .otherwise(col('salary'))
)
df_rules.show()

Interview Questions (with answers):

  1. Add total_comp = salary + bonus, where bonus is 10% of salary; treat null salary as 0.
   df.withColumn('bonus', coalesce(col('salary'), lit(0))*0.1) \
     .withColumn('total_comp', coalesce(col('salary'), lit(0)) + col('bonus'))
  2. Difference between .withColumn() and using .select() to add columns? — .withColumn() modifies/extends the DF without relisting all columns; .select() can add columns but requires listing everything you keep (see the sketch after this list).
  3. Set a default city 'Unknown' when city is null.
   df.withColumn('city', coalesce(col('city'), lit('Unknown')))
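
A minimal sketch contrasting the two approaches from question 2 (both produce the same bonus column):

# .withColumn() keeps every existing column and appends/overwrites one.
df.withColumn('bonus', col('salary') * 0.1).show()

# .select() can do the same, but you must carry the existing columns along yourself ('*' here).
df.select('*', (col('salary') * 0.1).alias('bonus')).show()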

4) Dropping Columns

Definition: Remove unnecessary columns using .drop().

Drop one or multiple columns

df.drop('month').show()
df.drop('month','city').show()

Keep only a whitelist

keep_cols = ['id','name','salary']
df.select(*keep_cols).show()

Interview Questions (with answers):

  1. Drop all columns that are entirely null.
   non_all_null = [c for c in df.columns if df.where(col(c).isNotNull()).limit(1).count() > 0]
   df.select(*non_all_null)
  2. Drop columns by a prefix, e.g., tmp_.
   drop_cols = [c for c in df.columns if c.startswith('tmp_')]
   df.drop(*drop_cols)

5) Grouping & Aggregating

Definition: Group rows and compute aggregates like sum/avg/count.

Average salary by department

df.groupBy('department').agg(avg('salary').alias('avg_salary')).show()

Multiple aggregates

df.groupBy('department').agg(
    _sum('salary').alias('total_salary'),
    avg('salary').alias('avg_salary'),
    count('*').alias('rows')
).show()

Distinct counts

df.groupBy('city').agg(countDistinct('department').alias('distinct_departments')).show()

Interview Questions (with answers):

  1. Top 3 departments by total salary.
   df.groupBy('department').agg(_sum('salary').alias('total_salary')) \
     .orderBy(desc('total_salary')).limit(3)
  2. Include departments that have no employees in the output. — Create a DataFrame of all departments and left join the aggregates onto it; fill nulls with 0.
   depts = spark.createDataFrame([(d,) for d in ['HR','Finance','IT','Marketing']], ['department'])
   agg = df.groupBy('department').agg(_sum('salary').alias('total_salary'))
   depts.join(agg, 'department', 'left').fillna({'total_salary':0})
  3. Calculate each department's percentage contribution to the overall salary.
   total = df.agg(_sum('salary').alias('t')).collect()[0]['t']
   df.groupBy('department').agg((_sum('salary')/lit(total)).alias('pct_of_total'))

6) Sorting

Definition: Arrange rows in specified order using .orderBy() / .sort().

Sort by salary descending, then name ascending

df.orderBy(desc('salary'), asc('name')).show()

Null handling: put nulls last

df.orderBy(col('salary').asc_nulls_last()).show()

Interview Questions (with answers):

  1. Stable sort by multiple keys (department asc, salary desc).
   df.orderBy(asc('department'), desc('salary'))
  2. How are nulls ordered by default? — Ascending puts nulls first; descending puts nulls last. Use asc_nulls_last() / desc_nulls_first() to control this (see the quick check below).
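
A quick check of those defaults on the sample data (Sam's null salary moves depending on the modifier):

# Default ascending order: the null salary appears first.
df.orderBy(asc('salary')).select('name', 'salary').show()

# Override the descending default (nulls last) to force nulls first.
df.orderBy(col('salary').desc_nulls_first()).select('name', 'salary').show()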

7) Joining

Definition: Combine two DataFrames on a key using inner, left, right, full, left_semi, left_anti.

emp_types = spark.createDataFrame([(1,'Full-time'),(2,'Part-time'),(7,'Contract')], ['id','employment_type'])

Inner join (only matching ids)

df.join(emp_types, 'id', 'inner').show()

Left join (all from left)

df.join(emp_types, 'id', 'left').show()

Left anti (in df but not in emp_types)

df.join(emp_types, 'id', 'left_anti').show()

Left semi (only keys that exist in right, but keep only left columns)

df.join(emp_types, 'id', 'left_semi').show()

Interview Questions (with answers):

  1. Find customers who have never placed an order. — Use a left_anti join of customers against orders on customer_id (see the sketch after this list).
  2. Difference between left_semi and left_anti? — left_semi keeps rows whose key exists in the right DataFrame; left_anti keeps rows whose key does not.
  3. Handle duplicate keys on either side? — Dedupe the keys before joining, or accept that duplicate keys multiply matching rows (a per-key Cartesian effect); the sketch below dedupes the lookup side first.
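
Sketches for questions 1 and 3 — the customers/orders DataFrames below are invented purely for illustration:

# Q1: left_anti keeps customers whose customer_id never appears in orders.
customers = spark.createDataFrame([(1, 'Acme'), (2, 'Globex'), (3, 'Initech')], ['customer_id', 'cust_name'])
orders = spark.createDataFrame([(101, 1), (102, 1), (103, 3)], ['order_id', 'customer_id'])
customers.join(orders, 'customer_id', 'left_anti').show()   # -> Globex only

# Q3: dedupe the lookup side before joining so duplicate keys don't multiply rows.
df.join(emp_types.dropDuplicates(['id']), 'id', 'left').show()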

8) Union

Definition: Concatenate two DataFrames vertically (same schema). For differing schemas, use unionByName with allowMissingColumns=True.

df_jan = df.filter(col('month')=='2025-01')
df_feb = df.filter(col('month')=='2025-02')

Simple union (schemas/order must match)

union_df = df_jan.select(df.columns).union(df_feb.select(df.columns))
union_df.show()

Union by name with missing columns

df_extra = spark.createDataFrame([(10, 'Zed')], ['id','name'])
union_flex = df.select('id','name','city').unionByName(df_extra, allowMissingColumns=True)   # Zed's city is null
union_flex.show()

Interview Questions (with answers):

  1. Union two monthly files where February has an extra column region. — Use unionByName(allowMissingColumns=True); the column missing on one side is filled with nulls (see the sketch after this list).
  2. How to ensure column order doesn’t break the union? — Always select columns in a consistent order before union, or use unionByName.
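
A sketch of question 1, faking the schema drift on the sample data (the region column is made up):

# January has no region column; February carries an extra one.
feb_with_region = df_feb.withColumn('region', lit('Midwest'))
combined = df_jan.unionByName(feb_with_region, allowMissingColumns=True)   # January's region becomes null
combined.show()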

9) Pivoting (and Unpivot/Melt)

Definition:

  • Pivot: long → wide
  • Unpivot (Melt): wide → long (Spark doesn’t have melt built-in; emulate with stack or use Pandas API on Spark)

Pivot: total salary by department x month

pivoted = df.groupBy('department').pivot('month').agg(_sum('salary'))
pivoted.show()

Fill nulls after pivot

pivoted.fillna(0).show()

Unpivot example using stack (assumes known month values)

months = ['2025-01','2025-02','2025-03']
expr = "stack({0}, {1}) as (month, total_salary)".format(
len(months),
', '.join([f"'{m}', {m}" for m in months])
)
unpivoted = pivoted.select('department', expr)
unpivoted.show()

Interview Questions (with answers):

  1. Pivot monthly sales, then fill missing months with zeros. — Use .pivot('month').agg(sum(...)).fillna(0).
  2. Unpivot unknown set of metric columns. — Build a stack expression dynamically or switch to Pandas API on Spark for melt().
  3. Performance tip: Provide an explicit list of pivot values to avoid an expensive distinct scan (see the sketch below).
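
A sketch of the performance tip: passing the month values explicitly to .pivot() lets Spark skip the extra job it would otherwise run to collect the distinct pivot values.

known_months = ['2025-01', '2025-02', '2025-03']
df.groupBy('department').pivot('month', known_months).agg(_sum('salary')).fillna(0).show()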

10) Window Functions

Definition: Perform calculations across sets of rows related to the current row, e.g., ranking, running totals, moving averages.

Rank employees by salary within each department

w_rank = Window.partitionBy('department').orderBy(desc('salary'))
df.withColumn('row_number', row_number().over(w_rank)) \
    .withColumn('dense_rank', dense_rank().over(w_rank)) \
    .withColumn('rank', rank().over(w_rank)) \
    .show()

Rolling average salary over the current row and the 2 preceding rows, ordered by month within department

w_roll = Window.partitionBy('department').orderBy('month').rowsBetween(-2, 0)
df.withColumn('rolling_avg_salary', avg('salary').over(w_roll)).show()

Lag/Lead examples

w_time = Window.partitionBy('department').orderBy('month')
df.withColumn('prev_salary', lag('salary', 1).over(w_time)) \
    .withColumn('next_salary', lead('salary', 1).over(w_time)) \
    .show()

Interview Questions (with answers):

  1. Assign a rank to employees within each department by salary; ties get the same rank. — Use dense_rank() over a partition by department order by salary desc.
  2. Compute a 3-month rolling average per department. — Window with rowsBetween(-2, 0) and avg('salary') ordered by month.
  3. Find employees whose salary increased compared to previous month. — Use lag('salary') and compare to current.
   w = Window.partitionBy('id').orderBy('month')
   df.withColumn('prev', lag('salary').over(w)) \
     .filter(col('salary') > col('prev'))

Bonus: End-to-End Exercise (with solution)

Problem:
You receive employee records monthly. Build a pipeline that:
1) Filters out rows with null salary,
2) Adds bonus = 0.1 * salary and total_comp = salary + bonus,
3) Aggregates average total_comp by department and month,
4) Pivots months as columns, filling nulls with 0,
5) Sorts departments by total of pivoted values descending.

Solution

clean = df.filter(col('salary').isNotNull())
enriched = clean.withColumn('bonus', col('salary')*0.1) \
    .withColumn('total_comp', col('salary') + col('bonus'))
agg = enriched.groupBy('department','month').agg(avg('total_comp').alias('avg_total_comp'))
pivoted = agg.groupBy('department').pivot('month').agg(_sum('avg_total_comp')).fillna(0)
# Python's builtin sum adds the pivoted Column objects row-wise (the Spark sum is aliased to _sum above).
result = pivoted.withColumn('row_total', sum([col(c) for c in pivoted.columns if c != 'department'])) \
    .orderBy(desc('row_total'))
result.show()

Extra Deep-Dive Questions (Short Answers)

  1. left_semi vs left_anti? — Semi keeps rows with matching keys in right; anti keeps rows without matches.
  2. Why prefer unionByName? — Safer across schema drift; avoids column order bugs.
  3. Pivot performance tips? — Provide explicit pivot values; reduce data beforehand; avoid high-cardinality pivots.
  4. Window vs GroupBy? — Window keeps row-level granularity; groupBy collapses rows per group.
  5. Null-safe equality in joins? — Use SQL's <=> operator, expr('a <=> b'), or Column.eqNullSafe() for null-safe comparisons (see the sketch below).
  6. Skew handling in joins? — Salting, broadcasting small tables (broadcast(df)), repartitioning on the join key (see the sketch below).
  7. Deterministic ordering caution — Without an orderBy, the order of output rows/files is not guaranteed.
