PySpark is a robust framework for big data processing, offering two main abstractions: the RDD (Resilient Distributed Dataset) and the DataFrame. Transformations are operations applied to these datasets that produce new datasets without altering the originals. They are also lazy: Spark records them in a lineage graph and only executes them when an action such as collect() or show() is called. Here is a detailed guide to the most common transformations, with examples for both RDDs and DataFrames.
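
All of the examples below assume an active SparkSession bound to the name spark. A minimal setup sketch (the application name is arbitrary):

from pyspark.sql import SparkSession

# Create (or reuse) a SparkSession; the RDD examples use spark.sparkContext
spark = SparkSession.builder.appName("transformations-demo").getOrCreate()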

1. map() Transformation

  • Description: Applies a function to every element, producing exactly one output element per input.

RDD Example:

rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])

# Multiply each element by 2
mapped_rdd = rdd.map(lambda x: x * 2)
print(mapped_rdd.collect())  # Output: [2, 4, 6, 8, 10]

DataFrame Example:

df = spark.createDataFrame([(1,), (2,), (3,)], ["value"])

# Multiply each element by 2
df.withColumn("value_doubled", df["value"] * 2).show()
# Output:
# +-----+-------------+
# |value|value_doubled|
# +-----+-------------+
# |    1|            2|
# |    2|            4|
# |    3|            6|
# +-----+-------------+
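
DataFrames have no map() method of their own; column expressions like the one above are the idiomatic equivalent. If you genuinely need an arbitrary Python function per row, one option is to drop down to the underlying RDD (a sketch; the column name is illustrative):

# Apply an arbitrary per-row function via the underlying RDD of Row objects
doubled_df = df.rdd.map(lambda row: (row["value"] * 2,)).toDF(["value_doubled"])
doubled_df.show()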

2. filter() Transformation

  • Description: Filters elements or rows based on a condition.

RDD Example:

# Filter even numbers
filtered_rdd = rdd.filter(lambda x: x % 2 == 0)
print(filtered_rdd.collect())  # Output: [2, 4]

DataFrame Example:

# Filter rows where value is even
df.filter(df["value"] % 2 == 0).show()
# Output:
# +-----+
# |value|
# +-----+
# |    2|
# +-----+
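
filter() also accepts a SQL expression string, and where() is an exact alias, so both of the following are equivalent to the call above:

# Equivalent filters: a SQL expression string and the where() alias
df.filter("value % 2 = 0").show()
df.where(df["value"] % 2 == 0).show()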

3. flatMap() Transformation

  • Description: Maps each element to zero or more outputs and flattens the results into a single dataset.

RDD Example:

rdd = spark.sparkContext.parallelize(["hello world", "apache spark"])

# Split each string into words
flat_mapped_rdd = rdd.flatMap(lambda x: x.split(" "))
print(flat_mapped_rdd.collect())  # Output: ['hello', 'world', 'apache', 'spark']

DataFrame Example:

df = spark.createDataFrame([("hello world",), ("apache spark",)], ["sentence"])

from pyspark.sql.functions import explode, split

# Split sentences into words
df.select(explode(split(df["sentence"], " ")).alias("word")).show()
# Output:
# +------+
# |  word|
# +------+
# | hello|
# | world|
# |apache|
# | spark|
# +------+
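
The same flattening can be written as a single SQL expression with selectExpr(), which some readers find easier to scan:

# Same result via a SQL expression
df.selectExpr("explode(split(sentence, ' ')) AS word").show()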

4. groupByKey() Transformation

  • Description: Groups data by key.

RDD Example:

rdd = spark.sparkContext.parallelize([("a", 1), ("b", 2), ("a", 3)])

# Group values by key
grouped_rdd = rdd.groupByKey().mapValues(list)
print(grouped_rdd.collect())  # Output (order may vary): [('a', [1, 3]), ('b', [2])]

DataFrame Example:

df = spark.createDataFrame([("a", 1), ("b", 2), ("a", 3)], ["key", "value"])

from pyspark.sql.functions import collect_list

# Group values by key
df.groupBy("key").agg(collect_list("value").alias("values")).show()
# Output:
# +---+------+
# |key|values|
# +---+------+
# |  a|[1, 3]|
# |  b|   [2]|
# +---+------+
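
A caveat worth knowing: groupByKey() ships every value for a key across the network before anything is aggregated. When all you need is a per-key aggregate, prefer reduceByKey() (next section), which combines values within each partition first. If you do group first, mapValues() can then fold each list:

# Aggregating after a group; reduceByKey (below) does this more efficiently
summed_rdd = rdd.groupByKey().mapValues(sum)
print(summed_rdd.collect())  # Output (order may vary): [('a', 4), ('b', 2)]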

5. reduceByKey() Transformation

  • Description: Aggregates values for each key.

RDD Example:

# Sum values by key (reusing the key-value rdd from section 4)
reduced_rdd = rdd.reduceByKey(lambda x, y: x + y)
print(reduced_rdd.collect())  # Output (order may vary): [('a', 4), ('b', 2)]

DataFrame Example:

# Sum values by key (reusing df from section 4)
df.groupBy("key").sum("value").show()
# Output:
# +---+----------+
# |key|sum(value)|
# +---+----------+
# |  a|         4|
# |  b|         2|
# +---+----------+
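
For a cleaner column name than sum(value), use agg() with an explicit alias (a minor stylistic variant of the call above):

from pyspark.sql import functions as F

# Same aggregation, but with a named result column
df.groupBy("key").agg(F.sum("value").alias("total")).show()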

6. join() Transformation

  • Description: Joins two datasets by key.

RDD Example:

rdd1 = spark.sparkContext.parallelize([("a", 1), ("b", 2)])
rdd2 = spark.sparkContext.parallelize([("a", 3), ("b", 4)])

# Join RDDs
joined_rdd = rdd1.join(rdd2)
print(joined_rdd.collect())  # Output (order may vary): [('a', (1, 3)), ('b', (2, 4))]

DataFrame Example:

df1 = spark.createDataFrame([("a", 1), ("b", 2)], ["key", "value1"])
df2 = spark.createDataFrame([("a", 3), ("b", 4)], ["key", "value2"])

# Join DataFrames
df1.join(df2, "key").show()
# Output:
# +---+------+------+
# |key|value1|value2|
# +---+------+------+
# |  a|     1|     3|
# |  b|     2|     4|
# +---+------+------+
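
Both APIs perform an inner join by default. The DataFrame join() also takes a how parameter for other join types, for example a left outer join, which keeps every row of df1 and null-fills unmatched keys:

# Left outer join: keep all rows of df1, null-filling where df2 has no match
df1.join(df2, on="key", how="left").show()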

7. distinct() Transformation

  • Description: Removes duplicate elements or rows.

RDD Example:

rdd = spark.sparkContext.parallelize([1, 2, 2, 3, 3, 4])

# Get distinct elements
distinct_rdd = rdd.distinct()
print(distinct_rdd.collect())  # Output (order may vary): [1, 2, 3, 4]

DataFrame Example:

df = spark.createDataFrame([(1,), (2,), (2,), (3,)], ["value"])

# Drop duplicate rows
df.distinct().show()
# Output:
# +-----+
# |value|
# +-----+
# |    1|
# |    2|
# |    3|
# +-----+
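
distinct() compares entire rows. To de-duplicate on a subset of columns, DataFrames also offer dropDuplicates(), which takes an optional list of column names:

# Equivalent to distinct() here; pass a subset such as ["value"] on wider tables
df.dropDuplicates(["value"]).show()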

Key Differences Between RDD and DataFrame Transformations

  • RDD Transformations: Provide low-level, functional operations on arbitrary Python objects and suit custom logic that does not map cleanly onto column expressions.
  • DataFrame Transformations: Offer high-level, SQL-like APIs that Spark's Catalyst optimizer can rewrite and optimize, so they generally run faster; the sketch below contrasts the two styles on the same task.
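
A minimal word-count sketch contrasting the two styles (variable names are illustrative):

from pyspark.sql.functions import explode, split

lines = ["hello world", "hello spark"]

# RDD style: an explicit functional pipeline
counts_rdd = (spark.sparkContext.parallelize(lines)
              .flatMap(lambda line: line.split(" "))
              .map(lambda word: (word, 1))
              .reduceByKey(lambda x, y: x + y))
print(counts_rdd.collect())  # Output (order may vary): [('hello', 2), ('world', 1), ('spark', 1)]

# DataFrame style: declarative queries the Catalyst optimizer can rearrange
lines_df = spark.createDataFrame([(line,) for line in lines], ["line"])
(lines_df.select(explode(split("line", " ")).alias("word"))
         .groupBy("word")
         .count()
         .show())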