Aggregations in PySpark involve performing summary computations on data, such as calculating sums, averages, counts, or other statistical measures. These operations are often used to gain insights from datasets, such as finding the total sales by region, average grades by student, or counts by category.


How Aggregations Work

  1. Group Data: Data is grouped using one or more columns.
  2. Apply Aggregation Functions: Operations like sum, avg, count, max, and min are applied to the grouped data.
  3. Return Results: The results of the aggregation are returned as a new DataFrame.

Syntax for Aggregations

1. Basic Aggregations

Basic aggregations involve applying aggregate functions (like sum, avg, count, etc.) directly to an entire DataFrame without grouping the data. These operations provide overall summary statistics for the dataset.


How It Works:

  1. No Grouping: The entire DataFrame is treated as a single group.
  2. Aggregation Functions: Functions like sum, avg, count, max, and min are applied to specific columns.
  3. Results: The output is a single row or scalar values representing the overall summary.
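
As a minimal sketch (assuming a DataFrame df with a numeric Revenue column, like the one created in the first example below), select() and agg() are interchangeable for ungrouped aggregations:

from pyspark.sql.functions import sum

# Both forms return a single-row DataFrame with the overall total
df.select(sum("Revenue").alias("Total Revenue")).show()
df.agg(sum("Revenue").alias("Total Revenue")).show()  # shorthand for df.groupBy().agg(...)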

Case Scenarios for Basic Aggregations

1. Calculating Total Revenue

Scenario: A company wants to calculate the total revenue generated from all sales.

from pyspark.sql import SparkSession
from pyspark.sql.functions import sum

# Sample DataFrame
data = [("2023-01-01", 1000), ("2023-01-02", 1500), ("2023-01-03", 2000)]
columns = ["Date", "Revenue"]

spark = SparkSession.builder.appName("BasicAggregation").getOrCreate()
df = spark.createDataFrame(data, columns)

# Total Revenue
df.select(sum("Revenue").alias("Total Revenue")).show()

Output:

+-------------+
|Total Revenue|
+-------------+
|         4500|
+-------------+
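
One thing to keep in mind: importing sum directly from pyspark.sql.functions shadows Python's built-in sum. A common alternative convention is to import the module under an alias such as F:

from pyspark.sql import functions as F

# Same aggregation, without shadowing Python's built-in sum()
df.select(F.sum("Revenue").alias("Total Revenue")).show()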

2. Finding Maximum Salary

Scenario: HR wants to find the highest salary paid across all employees.

from pyspark.sql.functions import max

# Sample DataFrame
data = [("Alice", 5000), ("Bob", 6000), ("Charlie", 7000)]
columns = ["Name", "Salary"]

df = spark.createDataFrame(data, columns)

# Maximum Salary
df.select(max("Salary").alias("Highest Salary")).show()

Output:

+--------------+
|Highest Salary|
+--------------+
|          7000|
+--------------+

3. Counting Total Rows

Scenario: A data engineer wants to count the number of rows in a dataset to validate data loading.

from pyspark.sql.functions import count

# Counting total rows
df.select(count("*").alias("Total Rows")).show()

Output:

+----------+
|Total Rows|
+----------+
|         3|
+----------+
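
For a simple row-count check like this, the DataFrame count() action is an equivalent shortcut that returns a plain Python integer rather than a DataFrame:

# Equivalent row-count check using the count() action
total_rows = df.count()
print(total_rows)  # 3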

4. Average Salary

Scenario: A manager wants to calculate the average salary across all employees.

from pyspark.sql.functions import avg

# Average Salary
df.select(avg("Salary").alias("Average Salary")).show()

Output:

+--------------+
|Average Salary|
+--------------+
|        6000.0|
+--------------+

5. Combining Multiple Aggregations

Scenario: Management wants a single report showing total salary, highest salary, and average salary.

from pyspark.sql.functions import sum, max, avg

# Combine multiple aggregations
df.select(
    sum("Salary").alias("Total Salary"),
    max("Salary").alias("Highest Salary"),
    avg("Salary").alias("Average Salary")
).show()

Output:

+------------+--------------+--------------+
|Total Salary|Highest Salary|Average Salary|
+------------+--------------+--------------+
|       18000|          7000|        6000.0|
+------------+--------------+--------------+
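
As a variant, agg() also accepts a dictionary mapping column names to aggregate function names. This form is terser, but it produces auto-generated column names such as sum(Salary) and allows only one aggregation per column:

# Dictionary form of agg(); the result column is named sum(Salary) by default
df.agg({"Salary": "sum"}).show()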

2. Grouped Aggregations

Grouped aggregations involve dividing a dataset into groups based on one or more columns, then applying aggregation functions like sum, avg, count, etc., to each group independently. This is commonly used to generate insights like totals, averages, or counts within categories or subcategories.


How It Works

  1. Group Data: Use the groupBy() function to specify one or more columns for grouping.
  2. Apply Aggregation Functions: Apply aggregate functions like sum(), avg(), count(), min(), and max() to each group.
  3. Output Results: Each group is treated separately, and the results of the aggregation are returned as a new DataFrame.

Case Scenarios for Grouped Aggregations

1. Total Sales by Product

Scenario: A retail company wants to calculate total sales for each product.

from pyspark.sql import SparkSession
from pyspark.sql.functions import sum

# Create a Spark session
spark = SparkSession.builder.appName("GroupedAggregation").getOrCreate()

# Sample DataFrame
data = [("ProductA", 100), ("ProductB", 200), ("ProductA", 150), ("ProductB", 300)]
columns = ["Product", "Sales"]

df = spark.createDataFrame(data, columns)

# Group by Product and calculate total sales
df.groupBy("Product").agg(sum("Sales").alias("Total Sales")).show()

Output:

+--------+-----------+
| Product|Total Sales|
+--------+-----------+
|ProductA|        250|
|ProductB|        500|
+--------+-----------+
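
A small caveat: aliases that contain spaces (such as Total Sales) must be wrapped in backticks when referenced later in string expressions or SQL, as in this hedged snippet:

totals = df.groupBy("Product").agg(sum("Sales").alias("Total Sales"))

# Backticks are required because the column name contains a space
totals.filter("`Total Sales` > 300").show()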

2. Average Salary by Department

Scenario: An organization wants to find the average salary of employees in each department.

from pyspark.sql.functions import avg

# Sample DataFrame
data = [("HR", 5000), ("Finance", 6000), ("HR", 5500), ("Finance", 6500)]
columns = ["Department", "Salary"]

df = spark.createDataFrame(data, columns)

# Group by Department and calculate average salary
df.groupBy("Department").agg(avg("Salary").alias("Average Salary")).show()

Output:

+----------+--------------+
|Department|Average Salary|
+----------+--------------+
|        HR|        5250.0|
|   Finance|        6250.0|
+----------+--------------+

3. Count of Employees by Department

Scenario: HR wants to count the number of employees in each department.

from pyspark.sql.functions import count

# Count employees by department
df.groupBy("Department").agg(count("*").alias("Employee Count")).show()

Output:

+----------+--------------+
|Department|Employee Count|
+----------+--------------+
|        HR|             2|
|   Finance|             2|
+----------+--------------+

4. Highest Sale by Region

Scenario: A company wants to identify the highest sale in each region.

from pyspark.sql.functions import max

# Sample DataFrame
data = [("East", 1000), ("West", 2000), ("East", 1500), ("West", 2500)]
columns = ["Region", "Sales"]

df = spark.createDataFrame(data, columns)

# Group by Region and calculate maximum sales
df.groupBy("Region").agg(max("Sales").alias("Highest Sale")).show()

Output:

+------+------------+
|Region|Highest Sale|
+------+------------+
|  East|        1500|
|  West|        2500|
+------+------------+

5. Multi-Aggregations for Each Group

Scenario: A manager wants a comprehensive report for each department, showing total salary, average salary, and the number of employees.

from pyspark.sql.functions import sum, avg, count

# Re-create the Department/Salary DataFrame (df was overwritten by the Region/Sales example above)
data = [("HR", 5000), ("Finance", 6000), ("HR", 5500), ("Finance", 6500)]
columns = ["Department", "Salary"]
df = spark.createDataFrame(data, columns)

# Multi-aggregations
df.groupBy("Department").agg(
    sum("Salary").alias("Total Salary"),
    avg("Salary").alias("Average Salary"),
    count("*").alias("Employee Count")
).show()

Output:

+----------+------------+--------------+--------------+
|Department|Total Salary|Average Salary|Employee Count|
+----------+------------+--------------+--------------+
|        HR|        10500|        5250.0|             2|
|   Finance|        12500|        6250.0|             2|
+----------+------------+--------------+--------------+

6. Aggregations on Multiple Columns

Scenario: Calculate total and average revenue for each product and region combination.

# Sample DataFrame
data = [
    ("East", "ProductA", 100),
    ("West", "ProductB", 200),
    ("East", "ProductA", 150),
    ("West", "ProductB", 300)
]
columns = ["Region", "Product", "Revenue"]

df = spark.createDataFrame(data, columns)

# Group by Region and Product and calculate aggregations
df.groupBy("Region", "Product").agg(
    sum("Revenue").alias("Total Revenue"),
    avg("Revenue").alias("Average Revenue")
).show()

Output:

+------+--------+-------------+---------------+
|Region| Product|Total Revenue|Average Revenue|
+------+--------+-------------+---------------+
|  East|ProductA|          250|          125.0|
|  West|ProductB|          500|          250.0|
+------+--------+-------------+---------------+

Key Notes

  1. Aggregation Functions:
    • sum: Sum of values in the group.
    • avg: Average value in the group.
    • count: Count of rows in the group.
    • max: Maximum value in the group.
    • min: Minimum value in the group.
  2. Combining Aggregations: You can apply multiple aggregations to the same group using .agg().
  3. Multiple Columns in groupBy: Use multiple columns in groupBy() to create nested groups.
  4. Performance: For large-scale aggregations, make sure the data is partitioned efficiently (see the sketch below).
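
As a hedged illustration of note 4, repartitioning on the grouping key before a wide aggregation can co-locate rows of the same group; whether it actually helps depends on data volume and cluster configuration:

from pyspark.sql.functions import sum

# Department/Salary data from the earlier examples
data = [("HR", 5000), ("Finance", 6000), ("HR", 5500), ("Finance", 6500)]
df = spark.createDataFrame(data, ["Department", "Salary"])

# Sketch only: repartition on the grouping key, then aggregate
df.repartition("Department") \
    .groupBy("Department") \
    .agg(sum("Salary").alias("Total Salary")) \
    .show()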

3. Using groupBy with Specific Functions

You can apply specific shortcut methods (sum, avg, count, max, min) directly on the grouped data returned by groupBy(), without calling agg().

# Group by Department and calculate total salary (uses the Department/Salary DataFrame)
df.groupBy("Department").sum("Salary").show()
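
Note that this shortcut form generates a default column name such as sum(Salary); assuming the same Department/Salary DataFrame, it can be renamed afterwards if a friendlier header is needed:

# Rename the auto-generated sum(Salary) column
df.groupBy("Department").sum("Salary") \
    .withColumnRenamed("sum(Salary)", "Total Salary") \
    .show()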

4. Window Functions for Aggregation

Window functions allow aggregation over a specific window of data without grouping the entire dataset.

We will cover these in more detail on the Window Functions in PySpark page.

from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col

# Re-create the Department/Salary DataFrame used in the grouped examples
data = [("HR", 5000), ("Finance", 6000), ("HR", 5500), ("Finance", 6500)]
columns = ["Department", "Salary"]
df = spark.createDataFrame(data, columns)

# Define a window partitioned by Department and ordered by Salary
window_spec = Window.partitionBy("Department").orderBy(col("Salary").desc())

# Add a rank based on salary within each department
df.withColumn("Rank", rank().over(window_spec)).show()
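
In the same spirit, an aggregate function can be evaluated over the window itself, which attaches a per-department total to every row without collapsing the rows (a sketch using the same Department/Salary DataFrame):

from pyspark.sql.window import Window
from pyspark.sql.functions import sum

# No orderBy is needed for a plain per-partition sum
dept_window = Window.partitionBy("Department")
df.withColumn("Department Total", sum("Salary").over(dept_window)).show()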

5. Pivot Tables

Perform pivoting to spread the values of one column across new columns while aggregating another. The snippet below assumes an employee DataFrame with Name, Department, and Salary columns.

# Pivot by Department and calculate total salary per employee
df.groupBy("Name").pivot("Department").sum("Salary").show()
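
For a self-contained sketch, here is the same pivot on a small, hypothetical employee DataFrame so the Name column actually exists; each distinct Department value becomes its own column. Passing the expected values to pivot() is optional but avoids an extra pass over the data:

# Hypothetical sample data with Name, Department, and Salary
data = [("Alice", "HR", 5000), ("Bob", "Finance", 6000),
        ("Alice", "Finance", 1000), ("Charlie", "HR", 5500)]
emp_df = spark.createDataFrame(data, ["Name", "Department", "Salary"])

# Listing the pivot values up front avoids an extra scan of the data
emp_df.groupBy("Name").pivot("Department", ["HR", "Finance"]).sum("Salary").show()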

6. RDD Aggregations

If working with RDDs, you can use transformations like reduceByKey, aggregateByKey, or fold.

# Sample RDD of (department, salary) pairs
rdd = spark.sparkContext.parallelize([("Sales", 5000), ("Sales", 4000), ("Marketing", 6000)])

# Calculate total salary by department
print(rdd.reduceByKey(lambda x, y: x + y).collect())  # e.g. [('Sales', 9000), ('Marketing', 6000)]
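
The text above also mentions aggregateByKey; as a hedged sketch, it can compute a per-key average by carrying a (sum, count) accumulator through the aggregation:

rdd = spark.sparkContext.parallelize(
    [("Sales", 5000), ("Sales", 4000), ("Marketing", 6000)]
)

# zeroValue = (running sum, running count)
sum_count = rdd.aggregateByKey(
    (0, 0),
    lambda acc, value: (acc[0] + value, acc[1] + 1),   # merge a value into the accumulator
    lambda a, b: (a[0] + b[0], a[1] + b[1])            # merge two accumulators
)

averages = sum_count.mapValues(lambda sc: sc[0] / sc[1])
print(averages.collect())  # e.g. [('Sales', 4500.0), ('Marketing', 6000.0)]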

Common Aggregation Functions

PySpark provides a wide range of functions for aggregation:

  • sum: Sum of values
  • avg: Average of values
  • count: Count of rows
  • max: Maximum value
  • min: Minimum value
  • countDistinct: Count of distinct values
  • stddev: Standard deviation
  • variance: Variance
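
A few of these (countDistinct, stddev, variance) were not used in the examples above; here is a short sketch against the Department/Salary DataFrame:

from pyspark.sql.functions import countDistinct, stddev, variance

df.agg(
    countDistinct("Department").alias("Distinct Departments"),
    stddev("Salary").alias("Salary StdDev"),
    variance("Salary").alias("Salary Variance")
).show()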
