PySpark is a robust framework for big data processing, offering two main abstractions: RDD (Resilient Distributed Dataset) and DataFrame. Transformations are operations applied to these datasets that produce new datasets without altering the original data. They are also lazy: Spark only records the lineage of transformations and executes them when an action such as collect() or show() is called. Here’s a detailed guide to common transformations in PySpark, with examples for both RDDs and DataFrames.
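All of the examples below assume an active SparkSession bound to the name spark. A minimal setup sketch (the app name is arbitrary):
from pyspark.sql import SparkSession
# Create (or reuse) a SparkSession; the RDD examples go through spark.sparkContext
spark = SparkSession.builder.appName("pyspark-transformations").getOrCreate()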
1. map() Transformation
- Description: Applies a function to each element.
RDD Example:
rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])
# Multiply each element by 2
mapped_rdd = rdd.map(lambda x: x * 2)
print(mapped_rdd.collect()) # Output: [2, 4, 6, 8, 10]
DataFrame Example:
df = spark.createDataFrame([(1,), (2,), (3,)], ["value"])
# Multiply each element by 2
df.withColumn("value_doubled", df["value"] * 2).show()
# Output:
# +-----+-------------+
# |value|value_doubled|
# +-----+-------------+
# |    1|            2|
# |    2|            4|
# |    3|            6|
# +-----+-------------+
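DataFrames have no direct map(); withColumn() is the usual column-wise equivalent. If you prefer to build the projection explicitly, select() produces the same result; a sketch using the same df:
from pyspark.sql.functions import col
# Equivalent projection: keep the original column and add the doubled one
df.select("value", (col("value") * 2).alias("value_doubled")).show()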
2. filter() Transformation
- Description: Filters elements or rows based on a condition.
RDD Example:
# Filter even numbers
filtered_rdd = rdd.filter(lambda x: x % 2 == 0)
print(filtered_rdd.collect()) # Output: [2, 4]
DataFrame Example:
# Filter rows where value is even
df.filter(df["value"] % 2 == 0).show()
# Output:
# +-----+
# |value|
# +-----+
# |    2|
# +-----+
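filter() also accepts a SQL expression string (where() is an alias for filter()), which some find more readable; a sketch with the same df:
# Same predicate, expressed as a SQL string
df.filter("value % 2 = 0").show()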
3. flatMap() Transformation
- Description: Maps each element to multiple outputs and flattens the result.
RDD Example:
rdd = spark.sparkContext.parallelize(["hello world", "apache spark"])
# Split each string into words
flat_mapped_rdd = rdd.flatMap(lambda x: x.split(" "))
print(flat_mapped_rdd.collect()) # Output: ['hello', 'world', 'apache', 'spark']
DataFrame Example:
df = spark.createDataFrame([("hello world",), ("apache spark",)], ["sentence"])
# Split sentences into words
from pyspark.sql.functions import explode, split
df.select(explode(split(df["sentence"], " ")).alias("word")).show()
# Output:
# +------+
# |  word|
# +------+
# | hello|
# | world|
# |apache|
# | spark|
# +------+
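To see why explode is needed here: split() alone yields one array per row, and explode() then turns each array element into its own row. A sketch of the intermediate result, using the same df:
from pyspark.sql.functions import split
# Without explode, each row holds the whole array of words
df.select(split(df["sentence"], " ").alias("words")).show(truncate=False)
# Output:
# +---------------+
# |words          |
# +---------------+
# |[hello, world] |
# |[apache, spark]|
# +---------------+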
4. groupByKey() Transformation
- Description: Groups data by key.
RDD Example:
rdd = spark.sparkContext.parallelize([("a", 1), ("b", 2), ("a", 3)])
# Group values by key
grouped_rdd = rdd.groupByKey().mapValues(list)
print(grouped_rdd.collect()) # Output (order may vary): [('a', [1, 3]), ('b', [2])]
DataFrame Example:
df = spark.createDataFrame([("a", 1), ("b", 2), ("a", 3)], ["key", "value"])
# Group values by key
from pyspark.sql.functions import collect_list
df.groupBy("key").agg(collect_list("value").alias("values")).show()
# Output (row order may vary):
# +---+------+
# |key|values|
# +---+------+
# |  a|[1, 3]|
# |  b|   [2]|
# +---+------+
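A caveat worth knowing: groupByKey() shuffles every individual value across the network before grouping, so for simple aggregations reduceByKey() (next section) is usually cheaper, since it combines values within each partition first. If you do need the full groups, you can still aggregate from them; a sketch summing the grouped values of the same rdd:
# Sum per key via the groups (works, but reduceByKey is more efficient for this)
sums_rdd = rdd.groupByKey().mapValues(sum)
print(sums_rdd.collect()) # Output (order may vary): [('a', 4), ('b', 2)]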
5. reduceByKey() Transformation
- Description: Aggregates values for each key.
RDD Example:
# Sum values by key
reduced_rdd = rdd.reduceByKey(lambda x, y: x + y)
print(reduced_rdd.collect()) # Output (order may vary): [('a', 4), ('b', 2)]
DataFrame Example:
# Sum values by key
df.groupBy("key").sum("value").show()
# Output (row order may vary):
# +---+----------+
# |key|sum(value)|
# +---+----------+
# |  a|         4|
# |  b|         2|
# +---+----------+
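The default sum(value) column name is awkward to reference downstream; agg() with an alias gives you control over it. A sketch with the same df (spark_sum is just a local alias so Python's built-in sum isn't shadowed):
from pyspark.sql.functions import sum as spark_sum
# Same aggregation, with a friendlier output column name
df.groupBy("key").agg(spark_sum("value").alias("total")).show()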
6. join() Transformation
- Description: Joins two datasets by key.
RDD Example:
rdd1 = spark.sparkContext.parallelize([("a", 1), ("b", 2)])
rdd2 = spark.sparkContext.parallelize([("a", 3), ("b", 4)])
# Join RDDs
joined_rdd = rdd1.join(rdd2)
print(joined_rdd.collect()) # Output (order may vary): [('a', (1, 3)), ('b', (2, 4))]
DataFrame Example:
df1 = spark.createDataFrame([("a", 1), ("b", 2)], ["key", "value1"])
df2 = spark.createDataFrame([("a", 3), ("b", 4)], ["key", "value2"])
# Join DataFrames
df1.join(df2, "key").show()
# Output (row order may vary):
# +---+------+------+
# |key|value1|value2|
# +---+------+------+
# |  a|     1|     3|
# |  b|     2|     4|
# +---+------+------+
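join() defaults to an inner join; the DataFrame API takes the join type as a third argument (and RDDs offer variants such as leftOuterJoin()). A sketch of a left outer join on the same df1 and df2:
# Keep every row from df1, matching df2 rows where possible
# (here every key matches, so the result equals the inner join)
df1.join(df2, "key", "left").show()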
7. distinct() Transformation
- Description: Removes duplicate elements or rows.
RDD Example:
rdd = spark.sparkContext.parallelize([1, 2, 2, 3, 3, 4])
# Get distinct elements
distinct_rdd = rdd.distinct()
print(distinct_rdd.collect()) # Output (order may vary): [1, 2, 3, 4]
DataFrame Example:
df = spark.createDataFrame([(1,), (2,), (2,), (3,)], ["value"])
# Drop duplicate rows
df.distinct().show()
# Output (row order may vary):
# +-----+
# |value|
# +-----+
# |    1|
# |    2|
# |    3|
# +-----+
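distinct() compares entire rows; dropDuplicates() can deduplicate on a subset of columns instead. A sketch with a hypothetical two-column DataFrame dup_df:
dup_df = spark.createDataFrame([(1, "a"), (1, "b"), (2, "c")], ["id", "tag"])
# Keep one row per id; which duplicate survives is not guaranteed
dup_df.dropDuplicates(["id"]).show()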
Key Differences Between RDD and DataFrame Transformations
- RDD Transformations: Low-level, functional operations on arbitrary Python objects, best suited for custom logic that doesn't fit a tabular model. They run exactly as written, with no query optimization.
- DataFrame Transformations: High-level, SQL-like operations on named columns. They are planned by the Catalyst optimizer and executed by the Tungsten engine, so they are generally faster than equivalent RDD code.
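To see the optimizer at work, call explain() on any DataFrame; a minimal sketch (the exact plan printed varies by Spark version):
# Inspect the physical plan Catalyst produces for a simple filter
df = spark.createDataFrame([(1,), (2,), (3,)], ["value"])
df.filter(df["value"] % 2 == 0).explain()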