PySpark transformations

  • Each transformation has:
    1. Definition
    2. Example code + output
    3. Multiple interview questions
    4. Clear answers

I’ll also give extra deep-dive questions that interviewers love to ask to test both hands-on skills and conceptual clarity.


1. Filtering

Definition: Select rows from a DataFrame based on a condition.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

data = [
    (1, "John", 28, "HR", 4000),
    (2, "Alice", 34, "Finance", 5000),
    (3, "Bob", 23, "IT", 4500),
    (4, "Eve", 29, "Finance", 5500),
    (5, "Tom", 31, "IT", 3000),
]
df = spark.createDataFrame(data, ["id", "name", "age", "department", "salary"])

df.filter(col("age") > 30).show()
+---+-----+---+----------+------+
| id| name|age|department|salary|
+---+-----+---+----------+------+
|  2|Alice| 34|   Finance|  5000|
|  5|  Tom| 31|        IT|  3000|
+---+-----+---+----------+------+

Interview Questions & Answers

Q: How would you filter employees in Finance earning more than 5000, ignoring null salaries?

df.filter((col("department") == "Finance") & (col("salary") > 5000) & col("salary").isNotNull()).show()

Note that col("salary") > 5000 already excludes nulls (comparisons against null evaluate to null, which filter treats as false), so the explicit isNotNull() simply makes the intent obvious.

Q: How do you filter rows where a column contains the substring "New" (assuming the DataFrame has a city column)?

df.filter(col("city").contains("New")).show()

Q: What's the difference between .filter() and .where()?
A: Nothing. .where() is an alias for .filter(); they are functionally identical.
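
A quick check on the sample DataFrame (both also accept a SQL-style string condition):

df.filter(col("age") > 30).show()
df.where(col("age") > 30).show()   # same output and query plan
df.where("age > 30").show()        # SQL-string form works with either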

2. Selecting Columns

Definition: Extract only the required columns.

df.select("name", "salary").show()

Interview Q&A

Q: How do you select columns dynamically based on datatype?

from pyspark.sql.types import IntegerType

# isinstance is more reliable than comparing str(f.dataType), whose format varies across versions
numeric_cols = [f.name for f in df.schema.fields if isinstance(f.dataType, IntegerType)]
df.select(*numeric_cols).show()

Q: What happens if you select a non-existent column?
A: PySpark raises an AnalysisException.
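
You can catch it explicitly. A minimal sketch; the import path pyspark.sql.utils works across recent versions (Spark 3.4+ also exposes it as pyspark.errors.AnalysisException):

from pyspark.sql.utils import AnalysisException

try:
    df.select("not_a_column").show()
except AnalysisException as e:
    print(f"Bad column: {e}")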

3. Adding Columns

Definition: Add (or replace) a column with .withColumn().

df.withColumn("bonus", col("salary") * 0.1).show()

Interview Q&A

Q: How do you create a new column with different logic for different departments?

from pyspark.sql.functions import when

df.withColumn("adjusted_salary",
    when(col("department") == "IT", col("salary") * 1.1)
    .otherwise(col("salary"))
).show()

Q: Difference between .withColumn() and .select() for adding columns?
A: Neither modifies the DataFrame in place (DataFrames are immutable; both return a new one). .withColumn() adds or replaces a single column and keeps the rest automatically; .select() can also add columns, but you must re-list every column you want to keep.
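
For instance, the select-based equivalent of the bonus column above keeps the existing columns via "*":

df.select("*", (col("salary") * 0.1).alias("bonus")).show()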

4. Dropping Columns

Definition: Remove one or more columns.

df.drop("age").show()

Interview Q&A

Q: How do you drop multiple columns programmatically?

cols_to_drop = ["age", "city"]
df.drop(*cols_to_drop).show()

Note that .drop() silently ignores columns that don't exist, so this is safe even though the sample df has no city column.

5. Grouping & Aggregating

Definition: Group rows by one or more columns and compute aggregates per group.

from pyspark.sql.functions import avg

df.groupBy("department").agg(avg("salary").alias("avg_salary")).show()

Interview Q&A

Q: How do you find the top 3 departments by total salary?

from pyspark.sql.functions import sum as sum_  # aliased to avoid shadowing Python's built-in sum

df.groupBy("department").agg(sum_("salary").alias("total_salary")) \
  .orderBy(col("total_salary").desc()).limit(3).show()

6. Sorting

Definition: Order rows by one or more columns.

df.orderBy(col("salary").desc()).show()

Interview Q&A

Q: How do you sort by department ascending, then salary descending?

df.orderBy(col("department").asc(), col("salary").desc()).show()

7. Joining

Definition: Combine two DataFrames on a key column.

df2 = spark.createDataFrame([(1, "Full-time"), (2, "Part-time")], ["id", "emp_type"])
df.join(df2, "id", "left").show()

Interview Q&A

Q: How do you perform an anti join (rows in df with no match in df2)?

df.join(df2, "id", "left_anti").show()

8. Union

Definition: Append the rows of one DataFrame to another with the same schema.

df_extra = spark.createDataFrame([(6, "Sam", 26, "HR", 4200)], df.columns)
df.union(df_extra).show()

Interview Q&A

Q: How do you union DataFrames with different column orders?
A: .union() matches columns by position, not by name, so either reorder the columns first or use .unionByName(), which matches by name.
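
A quick sketch of both options, assuming df_extra arrives with its columns in a different order:

df.union(df_extra.select(df.columns)).show()  # realign positions first
df.unionByName(df_extra).show()               # match by name (Spark 2.3+)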

9. Pivoting

Definition: Turn the distinct values of a column into new columns.

# assumes a DataFrame with a "month" column (not present in the sample df above)
df.groupBy("department").pivot("month").sum("salary").show()

Interview Q&A

Q: How do you fill null values after a pivot?

df.groupBy("department").pivot("month").sum("salary").fillna(0).show()

10. Window Functions

Definition: Compute a value for each row over a window of related rows (e.g., all rows in the same department).

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

w = Window.partitionBy("department").orderBy(col("salary").desc())
df.withColumn("rank", row_number().over(w)).show()

Interview Q&A

Q: How do you calculate a 3-month rolling average salary per department?

from pyspark.sql.functions import avg

# assumes a "month" column; rowsBetween(-2, 0) covers the current row plus the two before it
w = Window.partitionBy("department").orderBy("month").rowsBetween(-2, 0)
df.withColumn("rolling_avg", avg("salary").over(w)).show()
