
Mastering ExternalTaskSensor in Apache Airflow: The Right Way to Calculate Execution Delta | by Casey Cheng | May 2023

External Task Sensors are like gatekeepers — they stop bad data from trickling downstream. Image by Freepik.

Orchestrating a data pipeline is a fragile endeavor. In a data pipeline, we can have thousands of tasks running concurrently, and they are often dependent on one another. If we're not careful, a single point of failure can have a domino-like effect that trickles downstream and messes up the entire pipeline.

Apache Airflow introduced the External Task Sensor to put an end to these issues. While it's an extremely powerful feature, it also comes with a degree of complexity.

In this introductory piece, I hope to untangle some of the confusion surrounding the External Task Sensor and show how we can use it to improve the reliability of our data pipelines — making sense of sensors!

Meet Jamie, a rookie chef at Airflow Bakery. She's new. Her only responsibility is to make a new batch of cookie dough every hour.

Jamie's responsibilities shown in a "DAG" format. Chef (F) icon by Freepik.

And then we have Gordon Damnsie, the cookie master. Gordon takes the dough from Jamie and turns it into award-winning cookies.

Gordon's responsibilities shown in a "DAG" format. Chef (M) icon by Freepik.

One fine day, Gordon swoops in to grab the freshest dough he can find and bakes cookies. But when he takes a bite — yuck! "Bad" would've been an understatement. Gordon quickly discovers the root cause: stale dough, left over from a week ago.

Gordon, visibly frustrated, tosses the cookies into the bin. After he composes himself, he slowly turns to Jamie and asks, "Why is the dough not fresh?"

"I had to stop making it, Chef. There was a problem with the raw ingredients," Jamie replies, trying to stay calm in the face of Gordon's anger. Unfortunately, the bad cookies had already been served to customers, and they no longer trust the bakery's food quality.

This slight detour is a cautionary tale about the importance of validating the freshness of data sources. In the story, Gordon's success depends on Jamie, but they work independently without communicating with each other. They "trust" that the other person will do their job flawlessly. But as any data practitioner knows, everything that can go wrong will go wrong in a data pipeline.

Ideally, Gordon should check with Jamie whether she made dough recently. Once he has confirmed it, he knows the dough is fresh and can proceed to bake his cookies. Otherwise, he should stop baking and figure out what went wrong.

You see, what Gordon needs… is an external task sensor.

An external task sensor checks whether other people completed their assigned task. It senses the completion of an external task, hence the name.

In the context of Airflow, Jamie and Gordon are DAGs. They have specific tasks that they need to complete.

When we add an External Task Sensor, it becomes the middleman that coordinates between the two independent DAGs. The sensor will check on Jamie at a specific time to see if she has completed her task.

If Jamie successfully completes her task, the sensor will notify Gordon so that he can carry on with his downstream tasks.

The external task sensor — check_dough() returns a success after verifying that make_dough() ran successfully. Chef (F) and Chef (M) icons by Freepik.

If Jamie fails to complete her task, the sensor stops Gordon from doing any tasks that depend on the failed task.

The external task sensor — check_dough() returns a failure after verifying that make_dough() did not run successfully. Chef (F) and Chef (M) icons by Freepik.

Having this extra layer of validation essentially stops stale data from trickling further downstream and polluting the rest of our pipeline with dirty, inaccurate data.

Airflow makes it very easy to create an External Task Sensor — just import it. The syntax will look something like this:

from datetime import timedelta

from airflow.sensors.external_task import ExternalTaskSensor

ext_task_sensor = ExternalTaskSensor(
    dag=gordon_tasks,
    task_id='check_dough_freshness',
    external_dag_id='jamie_tasks',
    external_task_id='make_dough',
    email=['gordon.damnsie@gmail.com', 'jamie@gmail.com'],
    execution_delta=timedelta(minutes=30),
    # execution_date_fn=my_function,
    timeout=1800,
    poke_interval=300,
    mode='reschedule'
)

Here's what each of them means:

  1. dag is the current DAG object. Since Gordon is the one who wants to check whether Jamie made dough, this should point to Gordon's DAG.
  2. task_id is the unique name for this External Task Sensor.
  3. external_dag_id is the name of the DAG you want to check. In this case, Jamie's DAG.
  4. external_task_id is the name of the specific task you want to check. Ideally, we should always specify this. Otherwise, the sensor will check for the completion of the entire DAG instead of just one specific task. In other words, Gordon will do nothing until Jamie finishes chopping onions, washing dishes, and restocking the pantry, even though we only want to know whether she made dough. Or worse, if any one of these irrelevant tasks fails, the sensor will unnecessarily pause the entire pipeline.
  5. email is the list of people you want Airflow to notify when the External Task Sensor fails. Keep in mind that for this to work, you need to have the SMTP settings properly configured in the Airflow configuration file.
  6. execution_delta is arguably the most confusing part of External Task Sensors, but also the most important. So I'm dedicating an entire section to it below. Keep scrolling!
  7. execution_date_fn and execution_delta are very similar. We can only use one of them at a time. Sometimes it's easier to use this rather than execution_delta. I'm also giving it its own section below.
  8. timeout limits how long a sensor can stay alive. When we create a sensor, it consumes resources by occupying one worker slot. If the target task never completes, these sensors will keep checking indefinitely while hogging the worker slot. Over time, we can run into a Sensor Deadlock, where all worker slots become occupied by useless sensors and no tasks can run anymore. Therefore, it's best practice to set a maximum time limit for the checks.
  9. poke_interval is the duration the sensor waits before checking again if the previous check fails. The rationale is that we don't want the sensor to check excessively like a madman, since that adds unnecessary load to the server. On the flip side, checking too infrequently means the sensor will wait longer than necessary, delaying the pipeline. The trick is to find the sweet spot based on the expected run time of the external task.
  10. mode is how we want the sensor to behave. It can be set to "poke" or "reschedule".
    When set to "poke", the sensor goes to sleep on failure and wakes up at the next poke interval to try again. It's like being on standby. The sensor is more reactive, but since it's on standby, the worker slot stays occupied throughout the whole process.
    When set to "reschedule", the sensor checks once. If the check fails, the sensor schedules another check at a later time but terminates itself for now, freeing up the worker slot. Airflow recommends using "reschedule" if the poke interval is greater than 60 seconds.

Alright, that's almost every parameter we need to know about the External Task Sensor. Granted, this list is not exhaustive, but knowing these 10 parameters will be more than enough to set up our External Task Sensor properly for practically all use cases.

For completeness' sake, I'll include Airflow's official documentation for those who are eager to explore it in more detail.

In the section above, I glossed over two parameters because they're arguably the most infamous, annoying, and confusing part of External Task Sensors. But I think it's time we tackle them.

So what are execution_delta and execution_date_fn?

Building on our analogy, external_task_id tells the sensor to check whether Jamie completed the make_dough() task. But she makes a lot of dough — once every hour. Are we checking whether she baked in the past hour, yesterday, or last week?

This ambiguity confuses External Task Sensors, and that's why Airflow came up with two ways for us to communicate this information. Both execution_delta and execution_date_fn are meant to tell the sensor the specific time of the task.

  1. execution_delta expresses time on a relative basis, e.g.: "Did Jamie bake 30 minutes ago?" It accepts a datetime.timedelta object as its argument, e.g.: datetime.timedelta(minutes=30).
  2. execution_date_fn expresses time on an absolute basis, e.g.: "Did Jamie bake on the 3rd of May 2023 at 4:30 pm?" It accepts a callable Python function as its argument. This function should return the execution date of the task we want to check on, e.g.: datetime.datetime(year=2023, month=5, day=3, hour=16, minute=30).

Since both of them convey the same information, Airflow only allows us to use one or the other, but not both at the same time.

I usually use execution_delta as the de facto choice. But there are scenarios where it's too complicated to calculate the execution_delta. In those cases, I'd use execution_date_fn instead.

How to calculate execution_delta?

The term execution_delta is short for the delta (a.k.a. difference) of execution dates (a.k.a. the previous runtimes of our tasks). In other words, execution_delta = the current (sensor's) DAG's execution date minus the external DAG's execution date.

The formula for execution_delta. Image by author.

I'd like to highlight the key word here — "previous".

Some of you might be wondering… why does Airflow want the time difference of the previous runs, and not the current runs? This used to confuse the heck out of me when I first started using Airflow.

It turns out there's a perfectly good reason. However, I don't want to derail from the topic at hand, so I'll cover it in a later section (here). For now, let's just accept the formula as-is and see how we'd apply it.

Suppose that Jamie makes dough every hour (e.g.: 13:00, 14:00, 15:00, …). Gordon also makes cookies every hour, but he makes them at the 30th minute of each hour (e.g.: 13:30, 14:30, 15:30, …).

At 14:30 sharp, Gordon gets ready to bake his cookies. Before he starts, he needs to check whether Jamie made fresh dough recently. The latest run of make_dough() would be 14:00.

This time series shows the task dependencies between Jamie and Gordon. Gordon always checks whether Jamie completed her task half an hour ago. Chef (F) and Chef (M) icons by Freepik.

Given that both Gordon's and Jamie's tasks are scheduled hourly, their execution dates (a.k.a. previous runs) for the 14:30 run would be…

  • Gordon's execution date = 14:30 − 1 hour = 13:30
  • Jamie's execution date = 14:00 − 1 hour = 13:00

We can plug these values into the formula, and voilà!

The execution_delta comes out to be datetime.timedelta(minutes=30) for this particular run. Image by author.
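If it helps to see the arithmetic, here is the same calculation as a quick Python sketch (the concrete dates are purely illustrative; only the 30-minute gap matters):

from datetime import datetime, timedelta

# Execution dates (i.e. the previous runs) for the 14:30 run
gordon_exec_date = datetime(2023, 5, 3, 13, 30)  # illustrative date, 13:30
jamie_exec_date = datetime(2023, 5, 3, 13, 0)    # illustrative date, 13:00

execution_delta = gordon_exec_date - jamie_exec_date
print(execution_delta)  # 0:30:00, i.e. timedelta(minutes=30)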

You can do the same calculation for different runs of the tasks to get their respective execution_delta.

When calculating execution deltas, it's helpful to lay them out in a format like this. We want to calculate the execution deltas for multiple runs, not just one, so that we can make sure they are all the same! Image by author.

In this (cherry-picked) example, all the execution_delta values turn out to be exactly the same. We can pass this to our External Task Sensor and everything will work.

from datetime import timedelta

from airflow.sensors.external_task import ExternalTaskSensor

ext_task_sensor = ExternalTaskSensor(
    dag=gordon_tasks,
    task_id='check_dough_freshness',
    external_dag_id='jamie_tasks',
    external_task_id='make_dough',
    email=['gordon.damnsie@gmail.com', 'jamie@gmail.com'],
    execution_delta=timedelta(minutes=30),  # Pass the execution delta here
    timeout=1800,
    poke_interval=300,
    mode='reschedule'
)

But!

The execution_delta can sometimes differ across runs. This usually happens when the schedule intervals of the two DAGs are different (e.g.: daily vs. weekly, daily vs. monthly, …).

For example, let's say that Jamie makes her dough weekly on Sunday at 14:00, but Gordon makes his cookies daily at 14:30.

The arrow between Jamie's task and Gordon's sensor represents the execution delta. The execution delta gets longer over the week until it resets again on Sunday. Chef (F) and Chef (M) icons by Freepik.

If we do the same calculations, you will see that the execution deltas differ for every run.

Note that execution deltas can vary across runs. Image by author.

This becomes a problem because execution_delta only accepts a single timedelta object as its argument. We can't pass in a different value of execution_delta for every run.

In cases like this, we need execution_date_fn.

How to calculate execution_date_fn?

The execution_date_fn is just a regular Python function. As with all Python functions, it takes some argument(s) and returns some output(s). But the beauty of using a function is the ability to return a different output based on the function's inputs and logic.

In the case of execution_date_fn, Airflow passes the current task's execution date as an argument and expects the function to return the external task's execution date. Note that these execution dates must be expressed in UTC time.

def my_exec_date_fn(gordon_exec_date):
    # Add your logic here.
    return jamie_exec_date

ext_task_sensor = ExternalTaskSensor(
    dag=gordon_tasks,
    task_id='check_dough_freshness',
    external_dag_id='jamie_tasks',
    external_task_id='make_dough',
    email=['gordon.damnsie@gmail.com', 'jamie@gmail.com'],
    execution_date_fn=my_exec_date_fn,  # Pass the function here.
    timeout=1800,
    poke_interval=300,
    mode='reschedule'
)

Based on our earlier case study, our execution_date_fn would need to do the following…

My Airflow is configured to local time (GMT+8), so I need to deduct 8 hours to get the UTC time. Image by author.

One naive approach would be to hardcode every single run, until the end of time.

from datetime import datetime

# The naive approach (This is bad practice. Don't do this.)
def my_exec_date_fn(gordon_exec_date):
    if gordon_exec_date == datetime(year=2023, month=3, day=14, hour=6, minute=30):
        jamie_exec_date = datetime(year=2023, month=3, day=5, hour=6, minute=0)
    elif gordon_exec_date == datetime(year=2023, month=3, day=15, hour=6, minute=30):
        jamie_exec_date = datetime(year=2023, month=3, day=5, hour=6, minute=0)
    elif gordon_exec_date == datetime(year=2023, month=3, day=16, hour=6, minute=30):
        jamie_exec_date = datetime(year=2023, month=3, day=5, hour=6, minute=0)
    elif gordon_exec_date == datetime(year=2023, month=3, day=17, hour=6, minute=30):
        jamie_exec_date = datetime(year=2023, month=3, day=5, hour=6, minute=0)
    ...

    return jamie_exec_date

This works, but it's definitely not the most efficient approach.

A better approach is to look for consistent patterns and use them to programmatically derive the outputs. Usually, a good place to look for patterns is the execution_delta, since it contains the relationship between the execution dates (we talked about this here).

Additionally, we can also look at datetime attributes, such as the day of the week. If we really think about it, our External Task Sensor will always be pointing to a Sunday, because Jamie only makes dough on Sundays. As we move through the week, Gordon's task date gets further and further away from that Sunday, until it resets again the next Sunday. Then it repeats.

This shows the time difference between the current runs for simplicity's sake. execution_date_fn looks at previous runs, but we'll see the same patterns there too. Chef (F) and Chef (M) icons by Freepik.

This suggests that the day of the week can be useful in coming up with our execution_date_fn. So let's add the day of the week to our table. I'll label Monday as 1 and Sunday as 7, as per the ISO 8601 standard.

The numbers in brackets are the day of the week, where Monday is 1 and Sunday is 7. Image by author.

By labeling them, it becomes immediately clear that…

  • The execution_delta starts at 6 days on a Saturday.
  • The execution_delta increases by 1 day each day, up to a maximum of 12 days every Friday.
  • The execution_delta then resets back to 6 days on a Saturday.

We can re-create that relationship in a Python function and assign this execution_date_fn to our External Task Sensor.

from datetime import timedelta

def my_exec_date_fn(gordon_exec_date):
    day_of_week = gordon_exec_date.isoweekday()

    if day_of_week in (6, 7):
        time_diff = timedelta(days=day_of_week, minutes=30)
        jamie_exec_date = gordon_exec_date - time_diff
    elif day_of_week in (1, 2, 3, 4, 5):
        time_diff = timedelta(days=day_of_week + 7, minutes=30)
        jamie_exec_date = gordon_exec_date - time_diff

    return jamie_exec_date

ext_task_sensor = ExternalTaskSensor(
    dag=gordon_tasks,
    task_id='check_dough_freshness',
    external_dag_id='jamie_tasks',
    external_task_id='make_dough',
    email=['gordon.damnsie@gmail.com', 'jamie@gmail.com'],
    execution_date_fn=my_exec_date_fn,
    timeout=1800,
    poke_interval=300,
    mode='reschedule'
)

There we have it — our very own execution_date_fn. With a bit of creativity, execution_date_fn can cater to any scenario.
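As a quick sanity check (not part of the DAG files themselves), we can feed my_exec_date_fn a week's worth of hypothetical Gordon execution dates and confirm that every result lands on a Sunday at 06:00 UTC (14:00 in the story's local time):

from datetime import datetime

# Hypothetical Gordon execution dates covering a full week (13-19 March 2023, in UTC)
for day in range(13, 20):
    gordon_exec_date = datetime(2023, 3, day, 6, 30)
    jamie_exec_date = my_exec_date_fn(gordon_exec_date)
    print(gordon_exec_date, '->', jamie_exec_date, jamie_exec_date.strftime('%A'))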

Up until this point, we've covered everything you need to know to get started with the External Task Sensor. In this section, I thought it'd be nice to collate everything we've learned and see how the pieces fit together in our data pipelines.

First of all, we'll create the Jamie DAG in a file called jamie_dag.py.

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.external_task import ExternalTaskSensor

# Define task 1
def make_dough():
    # include your secret recipe here!
    return dough

# Create DAG
jamie_tasks = DAG(
    dag_id='jamie_tasks',
    description='Jamie to-do list. (a.k.a. making dough only)',
    schedule_interval='5 3 * * *',
    ...
)

# Include task 0 in DAG (as a starting point)
start = DummyOperator(
    dag=jamie_tasks,
    task_id='start'
)

# Include task 1 in DAG
make_dough = PythonOperator(
    dag=jamie_tasks,
    task_id='make_dough',
    python_callable=make_dough,
    ...
)

# Create dependencies (deciding the sequence of tasks to run)
start >> make_dough

Then, we'll create the Gordon DAG in another file called gordon_dag.py.

from datetime import timedelta

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.external_task import ExternalTaskSensor

# Define task 1
def bake_cookies():
    # include your secret recipe here!
    return cookies

# Define task 2
def make_money():
    # include your money-making technique step by step here.
    return money

# Define execution_date_fn for sensor 1
def my_exec_date_fn(gordon_exec_date):
    day_of_week = gordon_exec_date.isoweekday()

    if day_of_week in (6, 7):
        time_diff = timedelta(days=day_of_week, minutes=30)
        jamie_exec_date = gordon_exec_date - time_diff
    elif day_of_week in (1, 2, 3, 4, 5):
        time_diff = timedelta(days=day_of_week + 7, minutes=30)
        jamie_exec_date = gordon_exec_date - time_diff

    return jamie_exec_date

# Create DAG
gordon_tasks = DAG(
    dag_id='gordon_tasks',
    description='List of things that Gordon needs to do.',
    schedule_interval='5 3 * * *',
    ...
)

# Include task 0 in DAG (as a starting point)
start = DummyOperator(
    dag=gordon_tasks,
    task_id='start'
)

# Include task 1 in DAG
bake_cookies = PythonOperator(
    dag=gordon_tasks,
    task_id='bake_cookies',
    python_callable=bake_cookies,
    ...
)

# Include task 2 in DAG
make_money = PythonOperator(
    dag=gordon_tasks,
    task_id='make_money',
    python_callable=make_money,
    ...
)

# Create sensor 1
check_dough_freshness = ExternalTaskSensor(
    dag=gordon_tasks,
    task_id='check_dough_freshness',
    external_dag_id='jamie_tasks',
    external_task_id='make_dough',
    email=['gordon.damnsie@gmail.com', 'jamie@gmail.com'],
    execution_date_fn=my_exec_date_fn,
    timeout=1800,
    poke_interval=300,
    mode='reschedule'
)

# Create dependencies (deciding the sequence of tasks to run)
(start
 >> check_dough_freshness
 >> bake_cookies
 >> make_money)

Note that the External Task Sensor lives in gordon_dag.py and not jamie_dag.py, since we want Gordon to be checking on Jamie, not the other way around. Gordon's DAG is the current DAG and Jamie's is the external DAG.

And… there we have it!

We've created our very first External Task Sensor, check_dough_freshness. This sensor will poke Jamie's make_dough() task to see whether it returned a Success or a Fail. If it failed, bake_cookies() and make_money() will not run.

Dates in Apache Airflow are confusing because there is so much date-related terminology, such as start_date, end_date, schedule_interval, execution_date, and so on. It's a mess, really. But let's try to figure it out with a story.

Suppose our boss wants to know the sales performance of his company. He wants this data to be refreshed every day at 12 midnight for the next 6 months.

First, we write a sophisticated SQL query that generates the sales performance data. It takes 6 hours to run the query.

  • task_start is the start time of a task.
  • task_end is the end time of a task.
  • task_duration is the time it takes to run the task.
A single task. Image by author.

Every day, we need to run this task at 12 midnight.

A single task, scheduled at 12 am, which runs for 6 hours. Image by author.

To automate this query, we create an Airflow DAG and specify the start_date and end_date. Airflow will execute the DAG as long as today's date falls within this period.

An Airflow DAG. Image by author.

Then, we put the task into the Airflow DAG.

We need this data refreshed once a day at 12 midnight. So, we set the schedule_interval to "0 0 * * *", which is the CRON equivalent of daily at 12 midnight.

The schedule_interval essentially adds a delay between consecutive schedules, telling Airflow to run the task only at a specific time, since we don't want the task to re-run as soon as it finishes.
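As a rough sketch of how these settings come together (the dag_id and exact dates here are illustrative, not taken from the original story), such a DAG might be declared like this:

from datetime import datetime

from airflow import DAG

sales_report = DAG(
    dag_id='sales_report',                # hypothetical name for the story's DAG
    start_date=datetime(2023, 1, 1),      # Airflow considers the DAG live from this date...
    end_date=datetime(2023, 6, 30),       # ...until roughly 6 months later
    schedule_interval='0 0 * * *',        # daily at 12 midnight
)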

  • interval_start refers to the start time of a particular schedule interval.
  • interval_end refers to the end time of a particular schedule interval.
Note that interval_start and interval_end can overlap. The interval_end of the previous schedule interval will be the same as the interval_start of the next schedule interval. Image by author.

Here comes the most mind-blowing part — although seemingly counterintuitive, the Airflow Scheduler triggers a DAG run at the end of its schedule interval, rather than at the beginning of it.

This means that Airflow won't do anything during the first-ever schedule interval. Our query will run for the first time on the 2nd of Jan 2023 at 12 am.

The colored bars are like data. All the "yellow" data only gets summarized on the 2nd of Jan. Image by author.

This is because Airflow was originally created as an ETL tool. It's built on the idea that data from a period of time gets summarized at the end of that interval.

For example, if we wanted to know the sales of cookies for the 1st of January, we wouldn't create a sales report on the 1st of January at 1 pm, because the day hasn't ended yet and the sales numbers would be incomplete. Instead, we'd only process the data when the clock strikes 12 midnight. Today, we would be processing yesterday's data.

Why is this important?

Since we're summarizing the previous interval's data, the sales report we produce on the 2nd of Jan describes the 1st of Jan's sales, not the 2nd of Jan's sales.

For that reason, Airflow finds it more meaningful to refer to this run as the 1st of Jan run, even though it's executed on the 2nd. To better differentiate the dates, Airflow gives a special name to the start of a schedule interval — execution_date.

Although we run the "yellow" task on the 2nd of Jan, its execution date is actually the 1st of Jan. Image by author.
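In other words, for an interval-based schedule the execution date is simply the run time minus one schedule interval. A quick sketch with plain datetime arithmetic (using the illustrative dates from the story) makes the relationship concrete:

from datetime import datetime, timedelta

schedule_interval = timedelta(days=1)          # "0 0 * * *" is a daily interval
run_time = datetime(2023, 1, 2, 0, 0)          # the DAG actually runs on the 2nd of Jan...

execution_date = run_time - schedule_interval  # ...but its execution_date is the 1st of Jan
print(execution_date)                          # 2023-01-01 00:00:00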

This is also why we always take the difference of the "previous" runs when calculating execution_delta — it's the delta of the execution_dates, which essentially represent the "previous" runs.

External Task Sensors are like gatekeepers. They stop bad data from going downstream by making sure that tasks are executed in a specific order and that the necessary dependencies are met before proceeding with subsequent tasks.

For those who have never used External Task Sensors before, I hope this article was able to convey their importance and convince you to start using them. For those who have been using them, I hope some of the insights here help deepen your understanding.

Thanks for your time, and have a great day.
