Boosting Spark Union Operator Performance: Optimization Tips for Improved Query Speed | by Chengzhi Zhao | Apr, 2023

Demystifying Spark Performance of the Union Operator
The union operator is one of the set operators that merge two input data frames into one. Union is a convenient operation in Apache Spark for combining rows that share the same order of columns. One frequently used pattern is applying different transformations to subsets of the data and then unioning them back together.
The ways of using the union operation in Spark are widely discussed. However, a hidden fact that gets much less attention is the performance caveat associated with the union operator. If we don't understand this caveat, we may fall into the trap of doubling the execution time to get the result.
In this story, we will focus on the Apache Spark DataFrame union operator with examples, show you the physical query plan, and share techniques for optimization.
Like SQL in a relational database (RDBMS), union is a straightforward way to combine rows. One important thing to note when dealing with the union operator is to ensure the rows follow the same structure:
- The number of columns should be identical. The union operation won't silently work or fill in NULL when the number of columns differs between data frames.
- The column data types should match, and columns are resolved by position. The column names should follow the same sequence in each data frame; however, that is not mandatory. The column names of the first data frame are picked as the default, so a mixed-up column order can silently produce an undesired result. Spark's unionByName is intended to resolve this issue (see the sketch just after this list).
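Here is a minimal sketch of the difference; the column names and rows are made up purely for illustration. union resolves columns by position, while unionByName matches them by name:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

## Two data frames with the same columns declared in a different order
df_a = spark.createDataFrame([("Ada", "Lovelace")], ["first_name", "last_name"])
df_b = spark.createDataFrame([("Hopper", "Grace")], ["last_name", "first_name"])

## union resolves columns by position, so df_b's last names land under first_name
df_a.union(df_b).show()

## unionByName matches columns by name and keeps the values aligned
df_a.unionByName(df_b).show()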
In Spark, the operation unionAll is simply an alias for union; it does not remove duplicates. We need to add distinct after performing union to get SQL-like UNION behavior without duplication.
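As a quick, hypothetical illustration (reusing the spark session from the sketch above), union keeps duplicate rows, and appending distinct gives the SQL-style deduplicated union:
## union (and its alias unionAll) keeps duplicate rows
dedup_a = spark.createDataFrame([(1,), (2,)], ["value"])
dedup_b = spark.createDataFrame([(2,), (3,)], ["value"])

dedup_a.union(dedup_b).show()             ## rows: 1, 2, 2, 3 (duplicates retained)
dedup_a.union(dedup_b).distinct().show()  ## rows: 1, 2, 3 (SQL UNION behavior)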
We can also chain multiple data frames together to produce a final data frame.
df = df1.union(df2).union(df3)
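When the data frames to combine arrive as a list, one common PySpark idiom (not from the original story, but standard library usage) is to fold them with functools.reduce:
from functools import reduce
from pyspark.sql import DataFrame

dfs = [df1, df2, df3]
df = reduce(DataFrame.union, dfs)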
One typical pattern for using the union operator is splitting a single data frame into multiple ones, applying different transformations to each, and finally combining them into the final one.
Here is an example: we have two large tables (fact tables) that need to be joined, and the best way to join them in Spark is a SortMergeJoin. Once we have the joined data frame, we split it into four subsets. Each subset uses a different transformation, and eventually we combine those four data frames into the final one.
A Spark data frame leverages the Catalyst optimizer, which takes the data frame code you wrote and performs code analysis, logical optimization, physical planning, and code generation. Catalyst tries to create an optimal plan that executes your Spark job efficiently.
In recent years, a lot of optimization work has gone into Catalyst to improve the performance of Spark join operations. The join operation covers more scenarios than the union operation, which has led to less effort being put into optimizing union.
Unless users apply union to completely different data sources, the union operator faces a potential performance bottleneck: Catalyst isn't "smart" enough to identify the shared data frame and reuse it.
In this case, Spark treats each data frame as a separate branch and performs everything from the root multiple times. In our example, the join of the two big tables will be performed four times! That is a significant bottleneck.
It is easy to reproduce a non-optimized physical query plan for the union operator in Spark. We will do the following:
- Create two data frames with values from 1 to 1,000,000. Let's call them df1 and df2.
- Perform an inner join on df1 and df2.
- Split the joined result into two data frames: one containing only the odd numbers, the other the even numbers.
- Add a transformation with a field called magic_value, which is generated by two dummy transformations.
- Union the odd and even number data frames.
## Create two data frames from 1 to 1000000. Let's call them df1 and df2
from pyspark.sql.types import IntegerType

df1 = spark.createDataFrame([i for i in range(1000000)], IntegerType())
df2 = spark.createDataFrame([i for i in range(1000000)], IntegerType())

## Perform inner join on df1 and df2
df = df1.join(df2, how="inner", on="value")

## Split the joined result into two data frames: one only contains the odd numbers, the other one the even numbers
df_odd = df.filter(df.value % 2 == 1)
df_even = df.filter(df.value % 2 == 0)

## Add a transformation with a field called magic_value which is generated by two dummy transformations
df_odd = df_odd.withColumn("magic_value", df.value + 1)
df_even = df_even.withColumn("magic_value", df.value / 2)

## Union the odd and even number data frames
df_odd.union(df_even).count()
Here is a high-level view of what the DAG looks like. If we read the DAG bottom-up, one thing that stands out is that the join happened twice, and the upstream branches look almost identical.
We can now see where Spark needs to optimize the union operator more extensively: much time is wasted on unnecessary recomputation when the data source could be reused.
Here is the physical plan, which has 50 stages scheduled with AQE enabled. Looking at ids 13 and 27, we can see that Spark performed the join twice, recomputing each branch from scratch.
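If you want to confirm this without opening the Spark UI, the physical plan can also be printed directly with explain(). Assuming the non-cached code above, both branches of the union show their own SortMergeJoin:
## With no cache in place, the join (and the scans feeding it) appears once per branch
df_odd.union(df_even).explain()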
Now we can see this potential bottleneck. How could we resolve it? One option is to double the number of executors to run more concurrent tasks, but there is a better way: hint to Catalyst so it can reuse the joined data frame from memory.
To resolve this performance issue with the union operation, we can explicitly call cache to persist the joined data frame in memory, so Catalyst knows it has a shortcut to fetch the data instead of going back to the source.
Where should we add cache()? The recommended place is the data frame after the join is completed and before the filtering.
Let's see it in action:
# ...........................
## Perform inner join on df1 and df2
df = df1.join(df2, how="inner", on="value")

## Add cache here
df.cache()

## Split the joined result into two data frames: one only contains the odd numbers, the other one the even numbers
df_odd = df.filter(df.value % 2 == 1)
# ...........................
Here is the query plan: InMemoryTableScan is now present, so we can reuse the cached data frame and save the rest of the computation.
Now the physical plan is reduced to only 32 stages, and if we check, ids 1 and 15 both leverage the InMemoryTableScan. This could save far more time if we split the original data frame into more, smaller datasets and then union them.
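One small follow-up worth noting (not covered in the original story): once the unioned result has been materialized, the cached join can be released so executor memory is freed for later work.
## Release the cached join result when it is no longer needed
df.unpersist()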
I hope this story provides some insight into why the union operation can sometimes become a bottleneck for your Spark performance. Due to the lack of optimization in Catalyst for the union operator, users need to be aware of this caveat to develop Spark code more effectively.
Adding a cache can save time in our example, but it won't help if the union is performed on two completely different data sources, since there is no shared stage to cache.
This story was inspired by Kazuaki Ishizaki's talk "Goodbye Hell of Unions in Spark SQL" and by my experience handling a similar issue in my own projects.
P.S. If you are interested in how to handle the data skew side of Spark performance, I have another story on TDS about it.