Google Cloud Composer
Apache Airflow is a popular solution for orchestrating data pipelines. Google Cloud Composer is Google Cloud’s fully managed workflow orchestration service built on Apache Airflow. It lets you author, schedule, and monitor pipelines.
Apache Airflow DAG
Airflow is widely used and easy to get started with, but the subtleties of DAG (Directed Acyclic Graph) and task concurrency can still be intimidating, because an Airflow installation involves several components and configuration settings. Understanding and applying concurrency settings improves the fault tolerance, scalability, and resource utilisation of your data pipelines. This guide covers Airflow concurrency at four levels:
- The Composer Environment
- Installation of Airflow
- DAG
- Task
Each section includes a visualisation showing which parameters to change so that your Airflow tasks behave exactly as you intend. Let’s get started!
Environment-level parameters for Cloud Composer 2
This level is the Composer environment itself: the complete Google Cloud service. It includes all the managed infrastructure needed to run Airflow and integrates with other Google Cloud services such as Cloud Monitoring and Cloud Logging. The Airflow installation, DAGs, and tasks inherit the configuration set at this level.
Minimum and maximum number of workers
When you create a Google Cloud Composer environment, you define the minimum and maximum number of Airflow workers as well as the worker size (CPU, memory, and storage). These settings determine the default value of worker_concurrency.
Worker concurrency
Typically, a worker with one CPU can handle twelve tasks at once. On Cloud Composer 2, the default worker concurrency is:
- Airflow 2.3.3 and later: the minimum of 32, 12 * worker_CPU, and 8 * worker_memory (in GB).
- Airflow versions before 2.3.3: 12 * worker_CPU.
For example:
Small Composer environment:
- worker_cpu = 0.5
- worker_mem = 2 GB
- worker_concurrency = min(32, 12 * 0.5, 8 * 2) = min(32, 6, 16) = 6
Medium Composer environment:
- worker_cpu = 2
- worker_mem = 7.5 GB
- worker_concurrency = min(32, 12 * 2, 8 * 7.5) = min(32, 24, 60) = 24
Large Composer environment:
- worker_cpu = 4
- worker_mem = 15 GB
- worker_concurrency = min(32, 12 * 4, 8 * 15) = min(32, 48, 120) = 32
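The default formula can be written as a small Python helper to check the three example environments. This is a sketch, not Composer's code: the function name is hypothetical, and rounding fractional results down is an assumption, since the text does not say how non-integer values are handled.

```python
import math


def default_worker_concurrency(worker_cpu: float, worker_mem_gb: float,
                               airflow_version: tuple) -> int:
    """Hypothetical helper mirroring Composer 2's default [celery]worker_concurrency."""
    if airflow_version >= (2, 3, 3):
        # Airflow 2.3.3+: smallest of the fixed cap, the CPU-based and the memory-based limit
        value = min(32, 12 * worker_cpu, 8 * worker_mem_gb)
    else:
        # Older Airflow versions: CPU-based limit only
        value = 12 * worker_cpu
    # Assumption: fractional results are rounded down to a whole number of slots
    return math.floor(value)


for name, cpu, mem in [("small", 0.5, 2), ("medium", 2, 7.5), ("large", 4, 15)]:
    print(name, default_worker_concurrency(cpu, mem, (2, 5, 3)))
# small 6
# medium 24
# large 32
```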
Autoscaling of workers
Two settings affect concurrency performance and the autoscaling behaviour of your environment:
- The minimum number of Airflow workers
- The [celery]worker_concurrency parameter
Google Cloud Composer monitors the task queue and starts additional workers to pick up waiting tasks. If [celery]worker_concurrency is set to a high value, each worker can accept many tasks, so in some cases the queue never fills up and autoscaling never happens.
For example, in a Cloud Composer environment with two Airflow workers, [celery]worker_concurrency set to 100, and 200 tasks in the queue, each worker picks up 100 tasks. The queue is left empty, so autoscaling is not triggered. If some of these tasks take a long time to finish, results can be delayed, because other tasks may have to wait for free worker slots.
Put another way, Composer calculates its scaling target by adding up all queued and running tasks, dividing the total by [celery]worker_concurrency, and rounding the result up (ceiling). For example, with 11 tasks in the running state, 8 tasks in the queued state, and [celery]worker_concurrency set to 6, the target number of workers is ceiling((11 + 8) / 6) = ceiling(3.17) = 4. Composer then scales the environment toward four workers.
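The scaling rule above can be written as a short function. This is a sketch of the arithmetic in the text, not Composer's autoscaler code. The function name is hypothetical, and clamping the result to the environment's minimum and maximum worker counts is an assumption based on the limits you set when creating the environment.

```python
import math


def target_worker_count(running: int, queued: int, worker_concurrency: int,
                        min_workers: int, max_workers: int) -> int:
    """Sketch of the scaling target: ceil((running + queued) / worker_concurrency).

    Assumption: the result is kept within the environment's min/max worker limits.
    """
    raw_target = math.ceil((running + queued) / worker_concurrency)
    return max(min_workers, min(max_workers, raw_target))


# 11 running + 8 queued tasks, worker_concurrency = 6
print(target_worker_count(11, 8, 6, min_workers=1, max_workers=6))  # 4
# 200 tasks already running on 2 workers with worker_concurrency = 100: no scale-up
print(target_worker_count(200, 0, 100, min_workers=2, max_workers=6))  # 2
```

The second call shows why a very high worker_concurrency hides the backlog: the target never rises above the workers you already have.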
Airflow installation-level settings
This is the Airflow installation managed by Google Cloud Composer. It includes every Airflow component: the workers, web server, scheduler, DAG processor, and metadata database. Settings that are not configured explicitly at this level are inherited from the Composer environment level.
[celery]worker_concurrency: Google Cloud Composer’s defaults suit most use cases, but you may want to tune this value for your environment.
core.parallelism: the maximum number of task instances that can run simultaneously within an Airflow installation. Setting parallelism=0 means unlimited.
core.max_active_runs_per_dag: the maximum number of active DAG runs per DAG.
core.max_active_tasks_per_dag: the maximum number of task instances allowed to run concurrently in each DAG.
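To see how these limits stack, the sketch below computes a rough upper bound on how many tasks of a single DAG can run at once. It is a simplified model, not Airflow's scheduler logic: it ignores pools, queues, max_active_runs_per_dag, and per-task limits, and the function name is hypothetical. It treats parallelism=0 as unlimited and counts total worker slots as workers * worker_concurrency.

```python
import math


def max_running_tasks_for_dag(parallelism: int, max_active_tasks_per_dag: int,
                              workers: int, worker_concurrency: int) -> int:
    """Simplified upper bound on concurrently running tasks for one DAG."""
    installation_cap = math.inf if parallelism == 0 else parallelism  # 0 = unlimited
    worker_slots = workers * worker_concurrency  # total Celery slots in the environment
    # The tightest of the three limits wins
    return min(installation_cap, max_active_tasks_per_dag, worker_slots)


print(max_running_tasks_for_dag(32, 16, 3, 6))  # 16: the per-DAG limit binds
print(max_running_tasks_for_dag(32, 16, 2, 6))  # 12: worker slots bind
```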
Queues
When you use the CeleryExecutor, you can specify which Celery queue each task is sent to. Because queue is a BaseOperator attribute, any task can be assigned to any queue. The environment’s default queue is defined by the default_queue option in airflow.cfg (in the [operators] section in Airflow 2). It specifies both the queue that tasks are assigned to when none is given and the queue Airflow workers listen to when they start.
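In a DAG you would route a task by passing queue="..." to its operator. The sketch below models only the routing rule in pure Python: a task with no queue goes to the default queue, and a worker sees only the queues it listens on. The task names and the "gpu" queue are hypothetical; "default" is Airflow's stock default_queue value.

```python
from collections import defaultdict

DEFAULT_QUEUE = "default"  # Airflow's stock default_queue value


def route_tasks(tasks: dict, default_queue: str = DEFAULT_QUEUE) -> dict:
    """Group task_ids by queue; `tasks` maps task_id -> queue name or None."""
    by_queue = defaultdict(list)
    for task_id, queue in tasks.items():
        # Tasks that do not set a queue fall back to the default queue
        by_queue[queue or default_queue].append(task_id)
    return dict(by_queue)


def tasks_visible_to_worker(routed: dict, listens_on: list) -> list:
    """Task_ids a worker can pick up, given the queues it listens on."""
    return [task_id for queue in listens_on for task_id in routed.get(queue, [])]


routed = route_tasks({"extract": None, "train_model": "gpu", "load": None})
print(routed)  # {'default': ['extract', 'load'], 'gpu': ['train_model']}
print(tasks_visible_to_worker(routed, ["default"]))  # ['extract', 'load']
```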
Airflow Pools
Airflow pools limit how many tasks in a given set can run at the same time. In the UI (Menu -> Admin -> Pools) you can manage the list of pools, giving each one a name and a number of worker slots. There you can also choose whether deferred tasks count toward the pool’s occupied slots.
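The slot accounting can be sketched as a toy model. This is not Airflow's Pool implementation: the class name is hypothetical, every task is assumed to take one slot (real tasks can request more through pool_slots), and running and queued tasks are counted as occupying slots, with deferred tasks added only when the "include deferred" option is on.

```python
from dataclasses import dataclass


@dataclass
class PoolModel:
    """Toy model of an Airflow pool's slot accounting (one slot per task)."""
    name: str
    slots: int
    include_deferred: bool = False  # mirrors the UI option for deferred tasks

    def occupied(self, task_states: list) -> int:
        counted = {"running", "queued"}
        if self.include_deferred:
            counted.add("deferred")  # deferred tasks count only when enabled
        return sum(1 for state in task_states if state in counted)

    def open_slots(self, task_states: list) -> int:
        return self.slots - self.occupied(task_states)


states = ["running", "running", "queued", "deferred", "success"]
print(PoolModel("api_pool", 5).open_slots(states))                         # 2
print(PoolModel("api_pool", 5, include_deferred=True).open_slots(states))  # 1
```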
DAG-level configuration
The DAG is the core concept in Airflow: it groups tasks together and organises them with dependencies and relationships that define how they should run.
max_active_runs: the maximum number of active runs of the DAG. Once this limit is reached, the scheduler stops creating new active DAG runs. If not set, it falls back to core.max_active_runs_per_dag.
max_active_tasks: the maximum number of task instances allowed to run concurrently across all active runs of the DAG. If not set, it falls back to core.max_active_tasks_per_dag.
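In a DAG file these are arguments to the DAG object, for example DAG(dag_id="example", max_active_runs=1, max_active_tasks=4, ...). The pure-Python sketch below only models the fallback rule. The helper name is hypothetical, and the values of 16 are Airflow 2's stock defaults for the [core] options; your Composer environment may override them.

```python
from typing import Optional

# Airflow 2's stock defaults; a Composer environment may override these.
CORE_DEFAULTS = {"max_active_runs_per_dag": 16, "max_active_tasks_per_dag": 16}


def resolve_dag_limits(max_active_runs: Optional[int] = None,
                       max_active_tasks: Optional[int] = None,
                       core: Optional[dict] = None) -> dict:
    """Effective DAG-level limits: DAG arguments win, otherwise fall back to [core]."""
    core = CORE_DEFAULTS if core is None else core
    return {
        "max_active_runs": (core["max_active_runs_per_dag"]
                            if max_active_runs is None else max_active_runs),
        "max_active_tasks": (core["max_active_tasks_per_dag"]
                             if max_active_tasks is None else max_active_tasks),
    }


print(resolve_dag_limits(max_active_runs=1))
# {'max_active_runs': 1, 'max_active_tasks': 16}
```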
Task-level configuration
About Airflow tasks
A Task Instance may be in any of the following states:
- none: The task has not yet been queued for execution because its dependencies are not yet met.
- scheduled: The scheduler has determined that the task’s dependencies are met and that it should run.
- queued: The task has been assigned to an executor and is waiting for a worker.
- running: The task is being executed on a worker (or by a local/synchronous executor).
- success: The task finished without errors.
- restarting: The task was externally requested to restart while it was running.
- failed: The task hit an error during execution and did not complete.
- skipped: The task was skipped because of branching, LatestOnly, or a similar reason.
- upstream_failed: An upstream task failed, and the trigger rule says this task needed it.
- up_for_retry: The task failed but has retry attempts left and will be rescheduled.
- up_for_reschedule: The task is a sensor running in reschedule mode.
- deferred: The task has been deferred to a trigger.
- removed: The task has disappeared from the DAG since the run started.
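If you track task histories in your own tooling (for example in tests or monitoring scripts), the states above and the ideal path described next can be captured in a small enum. This is a hypothetical sketch whose string values follow the list above; it is not Airflow's own state class.

```python
from enum import Enum


class TaskState(str, Enum):
    """Task instance states from the list above (sketch, not Airflow's enum)."""
    NONE = "none"
    SCHEDULED = "scheduled"
    QUEUED = "queued"
    RUNNING = "running"
    SUCCESS = "success"
    RESTARTING = "restarting"
    FAILED = "failed"
    SKIPPED = "skipped"
    UPSTREAM_FAILED = "upstream_failed"
    UP_FOR_RETRY = "up_for_retry"
    UP_FOR_RESCHEDULE = "up_for_reschedule"
    DEFERRED = "deferred"
    REMOVED = "removed"


# The ideal sequence a task instance moves through
HAPPY_PATH = [TaskState.NONE, TaskState.SCHEDULED, TaskState.QUEUED,
              TaskState.RUNNING, TaskState.SUCCESS]


def follows_happy_path(history: list) -> bool:
    """True if a recorded list of state names is exactly the ideal sequence."""
    return [TaskState(state) for state in history] == HAPPY_PATH
```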
Ideally, a task moves from none to scheduled, queued, running, and finally success. Unless overridden, tasks inherit the concurrency settings defined at the DAG or Airflow level. Task-specific settings include:
pool: the pool the task runs in. Pools can be used to limit how many tasks run in parallel.
max_active_tis_per_dag: the maximum number of instances of this task that can run concurrently across all DAG runs.
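On an operator these are arguments such as pool="api_pool", pool_slots=1, and max_active_tis_per_dag=1. The sketch below is a simplified pure-Python model of how the two task-level limits gate a new task instance; it is not the scheduler's real check, and the function name and "load" task are hypothetical.

```python
from typing import Optional


def can_start(task_id: str, running_by_task: dict,
              max_active_tis_per_dag: Optional[int],
              pool_open_slots: int, pool_slots: int = 1) -> bool:
    """Simplified check of the task-level limits before a task instance is queued.

    running_by_task maps task_id -> running instances across all DAG runs.
    """
    running = running_by_task.get(task_id, 0)
    if max_active_tis_per_dag is not None and running >= max_active_tis_per_dag:
        return False  # this task already has its maximum number of instances running
    return pool_open_slots >= pool_slots  # the task's pool must have enough free slots


print(can_start("load", {"load": 2}, 2, pool_open_slots=5))  # False
print(can_start("load", {"load": 1}, 2, pool_open_slots=5))  # True
```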
Deferrable Triggers and Operators
Standard operators and sensors occupy a full worker slot for as long as they run, even when they are idle. For example, if you only have 100 worker slots available for task execution and 100 DAGs are waiting on a sensor that is running but idle, you cannot run any other tasks, even though your entire Airflow cluster is effectively idle.
Deferrable operators can help in this situation.
An operator written to be deferrable can recognise when it has to wait, suspend itself, free up the worker, and hand the job of resuming it to something called a trigger. While it is suspended (deferred), it does not occupy a worker slot, so your cluster wastes far fewer resources on idle operators and sensors. Note that deferred tasks do not occupy pool slots by default; you can change this setting on a given pool if you want them to.
Triggers are small, asynchronous pieces of Python code designed to run together in a single Python process; because they are asynchronous, they can all coexist efficiently. Here is an overview of how the process works:
- When a task instance (a running operator) reaches a point where it has to wait, it defers itself with a trigger tied to the event that should resume it. This frees up the worker to run other tasks.
- Airflow registers the new trigger instance, and a triggerer process picks it up.
- The trigger runs until it fires, at which point the scheduler reschedules its source task.
- The scheduler queues the task to resume on a worker node.
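Why many triggers can share one process is easiest to see with plain asyncio. In Airflow, a real trigger subclasses BaseTrigger and implements an async run() that yields a TriggerEvent. The sketch below does not use Airflow at all: fake_trigger and run_triggerer are hypothetical stand-ins showing that 100 concurrent waits in one event loop take roughly as long as a single wait.

```python
import asyncio
import time


async def fake_trigger(trigger_id: int, wait_seconds: float) -> str:
    """Stand-in for a trigger: waits without blocking, then 'fires'."""
    await asyncio.sleep(wait_seconds)  # awaiting yields control to the other triggers
    return f"trigger-{trigger_id} fired"


async def run_triggerer(n_triggers: int, wait_seconds: float) -> list:
    """Run many triggers concurrently in one event loop, like a single triggerer process."""
    return await asyncio.gather(*(fake_trigger(i, wait_seconds) for i in range(n_triggers)))


if __name__ == "__main__":
    start = time.perf_counter()
    events = asyncio.run(run_triggerer(100, 0.5))
    elapsed = time.perf_counter() - start
    # About 0.5 s in total rather than 100 * 0.5 s, because the waits overlap
    print(len(events), f"{elapsed:.2f}s")
```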
Sensor Modes
Because sensors are idle most of the time, they can run in two different modes that let you use them more efficiently:
- poke (default): the sensor occupies a worker slot for its entire runtime.
- reschedule: the sensor occupies a worker slot only while it is checking, and sleeps for a set interval between checks.
Running sensors in reschedule mode partly solves the problem, because it restricts them to running at fixed intervals. However, this mode is inflexible: time is the only reason it allows for resuming.
As an alternative, some sensors let you set deferrable=True, which hands the waiting over to the separate triggerer component and improves resource efficiency even further.
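A rough way to compare the three approaches is to estimate how many worker-slot seconds a sensor consumes while it waits. The model below is a hypothetical back-of-the-envelope sketch, not Airflow behaviour. It assumes each check takes check_seconds and that a deferrable sensor runs on a worker twice (once before deferring and once after resuming). It ignores scheduling latency and triggerer overhead. Note that "deferrable" is a label in this model, not an Airflow mode value.

```python
import math


def worker_slot_seconds(mode: str, wait_seconds: float, poke_interval: float,
                        check_seconds: float) -> float:
    """Estimated worker-slot time a sensor uses while waiting `wait_seconds`."""
    if mode == "poke":
        return wait_seconds  # holds a slot for the whole wait
    if mode == "reschedule":
        checks = math.floor(wait_seconds / poke_interval) + 1
        return checks * check_seconds  # holds a slot only during each check
    if mode == "deferrable":
        return 2 * check_seconds  # assumption: one run before deferring, one after resuming
    raise ValueError(f"unknown mode: {mode}")


for mode in ("poke", "reschedule", "deferrable"):
    # One-hour wait, checking every 60 s, each check taking 2 s
    print(mode, worker_slot_seconds(mode, 3600, 60, 2))
```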
Difference between deferrable=True and mode='reschedule' in sensors
Sensors in Airflow wait for certain conditions to be met before downstream tasks run. To manage idle time, sensors offer two options: mode='reschedule' and deferrable=True. The mode='reschedule' parameter belongs to Airflow’s BaseSensorOperator: if the condition is not met, the sensor releases its worker slot and reschedules itself for a later check. In contrast, deferrable=True is not a BaseOperator or BaseSensorOperator parameter; it is a convention implemented by individual operators and sensors. When it is set, the operator uses Airflow’s built-in deferral mechanism to hand its waiting over to a trigger instead of holding a worker slot. Because each operator implements it separately, the exact behaviour can vary from one operator to another.
| Aspect | mode='reschedule' | deferrable=True |
|---|---|---|
| Behaviour | Repeatedly reschedules itself until the condition is met | Suspends itself while waiting and resumes when its trigger fires |
| Resource usage | Higher (takes a worker slot for each check) | Lower (frees the worker slot while waiting) |
| Typical use case | Conditions expected to change over time (e.g. file creation) | Waiting for external events or resources (e.g. API response) |
| Implementation | Built-in rescheduling functionality | Requires the operator to implement deferral (a trigger plus resume logic) and a running triggerer |