Introduction
Spark does not execute transformations immediately; instead, it records the operations and executes them only when an action is triggered.
Many programmers make mistakes when using Spark DataFrames in AWS Glue jobs, such as:
Performing Expensive Computations Multiple Times
Mistake: Calling actions multiple times on the same dataset without caching.
Example:
df_filtered = df.filter(df["age"] > 30)
print(df_filtered.count()) # Trigger computation
df_filtered.show() # Trigger computation again
In this code, count() and show() each trigger the full computation, so the filter is evaluated twice.
Don’t worry if you are not yet sure why this recomputation happens; the next sections explain it.
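If the filtered DataFrame is reused like this, one common fix is to cache it so the filter runs only once. A minimal sketch, reusing the df from the example above:
df_filtered = df.filter(df["age"] > 30).cache()  # mark the filtered result for caching
print(df_filtered.count())  # first action: runs the filter and fills the cache
df_filtered.show()  # second action: reads from the cache, no recomputation
df_filtered.unpersist()  # free the cached data when you are done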
Transformations (Lazy)
Transformations are operations that do not execute immediately. Instead, they return a reference to a new DataFrame, and the actual execution happens only when an action is triggered.
In Spark, when you load data from S3 or a database, Spark divides it into multiple partitions.
For operations like groupBy(), join(), and sum(),
Spark has to shuffle data between these partitions, which can be expensive in terms of computation and memory.
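If you want to see how many partitions Spark created for a DataFrame, you can inspect the underlying RDD. A quick sketch (the S3 path is just an example):
df = spark.read.csv("s3://bucket/transactions.csv", header=True, inferSchema=True)
print(df.rdd.getNumPartitions())  # number of partitions Spark is using for this DataFrame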
Based on whether a shuffle is required, transformations are divided into two types.
Types of Transformations
Narrow Transformations (No Data Shuffle)
Narrow transformations are operations that do not require data movement between partitions.
Examples of Narrow Transformations
Transformation | Description |
---|---|
map() | Transforms each row individually. |
filter() | Filters rows without moving data. |
select() | Selects specific columns. |
drop() | Removes specific columns. |
withColumn() | Adds or modifies a column. |
from pyspark.sql.functions import col
df_filtered = df.filter(col("amount") > 1000) # No data shuffling
df_mapped = df_filtered.withColumn("discount", col("amount") * 0.1) # Still no shuffling
Since each of these transformations works on individual records, no data movement between partitions is required.
Wide Transformations (With Data Shuffle)
As the name suggests, a wide transformation is an operation where data from multiple partitions has to be reshuffled (moved between nodes) before the computation can run. This is typically required for aggregations, sorting, and joins.
Examples of Wide Transformations
Transformation | Description |
---|---|
groupBy() | Groups rows based on a column, causing shuffling. |
orderBy() | Requires sorting across partitions, causing shuffling. |
join() | If keys are not co-located, Spark has to reshuffle data. |
repartition() | Explicitly moves data to different partitions. |
df_grouped = df.groupBy("category").sum("amount") # Data needs to be reshuffled
df_sorted = df_grouped.orderBy("sum(amount)") # Data is shuffled again
These operations need data from multiple partitions, which makes them expensive to compute.
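One way to check whether a shuffle is involved is to call explain() and look for an Exchange step in the physical plan. A quick sketch, reusing the DataFrames from the snippets above:
df_mapped.explain()  # narrow transformations only: no Exchange step in the plan
df_grouped.explain()  # groupBy introduces an Exchange (shuffle) step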
Why Avoid Shuffling?
- Expensive in terms of computation and memory.
- Can cause performance bottlenecks if not optimized.
- Can lead to out-of-memory (OOM) errors in large datasets.
Optimization Tips to Minimize Shuffling
- Use narrow transformations whenever possible (avoid unnecessary groupBy(), join(), orderBy()).
- Use broadcast() for small DataFrames in joins to avoid shuffling (see the sketch after this list).
- Cache intermediate results if they are used multiple times.
- Use partitioning strategies (coalesce() for reducing partitions, repartition() only when needed).
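For the broadcast() tip above, here is a minimal sketch of a broadcast join (large_df, small_df, and the join key are hypothetical):
from pyspark.sql.functions import broadcast
# small_df is assumed to fit comfortably in each executor's memory
joined = large_df.join(broadcast(small_df), on="category")  # small_df is copied to every executor, so large_df is not shuffled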
Actions in Apache Spark
In Apache Spark, Actions are operations that trigger the execution of a computation and return a result. Unlike Transformations (which are lazy), Actions force Spark to compute, and they also execute all the transformations defined before the action, since transformations run only when an action is called.
List of Common Spark Actions
Action | Description | Returns |
---|---|---|
show(n) | Displays the first n rows of a DataFrame. | Data on console |
collect() | Returns all elements from the DataFrame/RDD. | List/Array |
take(n) | Retrieves n elements. | List/Array |
count() | Returns the number of elements. | Integer |
first() | Returns the first element. | Single row/element |
head(n) | Similar to take(n), retrieves n rows. | List/Array |
foreach(func) | Applies a function to each element but does not return a value. | None (Void) |
reduce(func) | Aggregates elements using a binary function (e.g., sum, max). | Single value |
saveAsTextFile(path) | Saves RDD to a text file. | None (Writes to storage) |
Example of Actions in Spark
Example 1: Using Actions with DataFrame
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("SparkActions").getOrCreate()
df = spark.read.csv("s3://bucket/transactions.csv", header=True, inferSchema=True)
# ACTIONS:
df.show(5) # Show first 5 rows
print(df.count()) # Count total rows
print(df.first()) # Get first row
data = df.collect() # Get all data as a list
Example 2: Using Actions with RDD
rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])
# ACTIONS:
print(rdd.collect()) # [1, 2, 3, 4, 5]
print(rdd.count()) # 5
print(rdd.reduce(lambda a, b: a + b)) # Sum: 15
Key Differences Between Actions & Transformations
Aspect | Transformations | Actions |
---|---|---|
Execution | Lazy (not executed immediately) | Triggers computation |
Return Type | Returns a new RDD/DataFrame | Returns a value or saves results |
Optimization | Can be optimized via DAG | Forces execution |
When to Use Actions?
- To retrieve data (collect(), show(), take(n))
- To count records (count(), first())
- To save output (saveAsTextFile(path)) (see the sketch after this list)
- To aggregate data (reduce(func), foreach(func))
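For the saving and side-effect cases, a small sketch using the rdd from Example 2 above (the output path is just a placeholder):
rdd.saveAsTextFile("s3://bucket/output/numbers")  # writes one text file per partition
rdd.foreach(lambda x: print(x))  # runs on the executors; output goes to executor logs, not the driver console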
Best Practices for Using Actions
- Avoid collect() on large datasets (it can cause memory overflow).
- Use take(n) instead of collect() if you only need a few rows.
- Leverage foreach() for side effects (like logging).
- Optimize with caching (persist() or cache() before actions if the data is used multiple times), as shown in the sketch below.
Key Takeaways
- Actions trigger execution of Spark computations.
- They return values (e.g., count(), collect()) or perform operations (e.g., saveAsTextFile()).
- Be careful with collect() on large datasets (it can crash the Spark driver).
- Optimize execution with caching and proper transformations before actions.
Directed Acyclic Graph (DAG) in Apache Spark
In Apache Spark, a DAG (Directed Acyclic Graph) is a logical execution plan that represents the sequence of computations performed on data. It is the backbone of Spark’s execution model.
I will explain more about DAGs in upcoming blogs. Stay tuned!
Please comment if you would like a real-world example of using Actions efficiently in a Glue job.