Data Engineering 101 – Getting Started with Python Operator in Apache Airflow

Lakshay Arora 13 Dec, 2023 • 6 min read

Overview

  • We understand Python Operator in Apache Airflow with an example
  • We will also discuss the concept of Variables in Apache Airflow

Introduction

Apache Airflow is a must-have tool for Data Engineers. It makes it easier to create and monitor all your workflows. When you have multiple workflows, chances are that you are using the same databases and the same file paths in several of them. Using variables is one of the most efficient ways to define such shared information across different workflows.

We will cover the concept of variables in this article and an example of a Python Operator in Apache Airflow.


This article is a continuation of Data Engineering 101 – Getting Started with Apache Airflow, where we covered the features and components of Apache Airflow, the installation steps, and created a basic DAG. So if you are a complete beginner to Apache Airflow, I would recommend you go through that article first.

What is Apache Airflow?

Apache Airflow is a workflow engine that easily schedules and runs your complex data pipelines. It makes sure that each task of your data pipeline gets executed in the correct order and that each task gets the required resources.

It also provides a user-friendly interface to monitor your workflows and fix any issues that may arise.


Start Airflow

We have already discussed the installation steps in the previous article of this series.

To start the Airflow webserver, open the terminal and run the following command. The default port is 8080; if you are already using that port for something else, you can change it.

airflow webserver -p 8080

Now, start the Airflow scheduler using the following command in a different terminal. It monitors all your workflows and triggers them as you have scheduled them.

airflow scheduler

Now, make sure that you have a folder named dags in the Airflow home directory, where you will define your DAGs. Then open the web browser, go to http://localhost:8080/admin/, and you will see something like this:

[Image: Airflow dashboard]

Python Operator in Apache Airflow

An operator describes a single task of a workflow, and Airflow provides different operators for many different kinds of tasks, for example the BashOperator, PythonOperator, EmailOperator, MySqlOperator, etc. In the last article, we learned how to use the BashOperator to get live cricket scores, and in this one, we will see how to use the PythonOperator.

Let’s have a look at the following example. A complete code sketch that puts these steps together is shown after the list:

  1. Importing the Libraries

    Let’s start with importing the libraries that we need. We will use the PythonOperator this time.

  2. Defining DAG Arguments

    For each DAG, we need to pass one arguments dictionary. Here is a description of some of the arguments that you can pass:

    • owner: The name of the owner of the workflow. It should be alphanumeric and can have underscores, but should not contain any spaces.
    • depends_on_past: If each run of your workflow depends upon the data of the past run, mark it as True; otherwise mark it as False.
    • start_date: The start date of your workflow.
    • email: Your email ID, so that you can receive an email whenever any task fails for any reason.
    • retry_delay: If any task fails, how long it should wait before retrying it.
  3. Defining the Python Function

    Now, we will define the Python function that will print a string passed as an argument. This function will later be used by the PythonOperator.

  4. Defining DAG

    Now, we will create a DAG object and pass the dag_id, which is the name of the DAG (make sure you have not created any DAG with this name before). Pass the arguments that we defined earlier, and add a description and a schedule_interval, which will run the DAG after the specified interval of time.

  5. Defining the Task

    We just have one task for our workflow:

    print: In this task, we will print “Apache Airflow is a must-have tool for Data Engineers” on the terminal using the Python function.

    We will pass the task_id to the PythonOperator object. You will see this name on the nodes of the Graph View of your DAG. Pass the name of the Python function you want to run to the argument python_callable, pass the arguments that your function uses to the parameter op_kwargs as a dictionary, and finally pass the DAG object to which you want to link this task.

  6. Run the DAG

    Now, when you refresh your Airflow dashboard, you will see your new DAG in the list.

    Click on the DAG and open the graph view, and you will see something like this. Each of the steps in the workflow will be in a separate box. In this workflow, we only have one step, that is print. Run the workflow and wait until its border turns dark green, which is an indication that it completed successfully.

    Click on the node “print” to get more details about this step and then click on Logs and you will see the output like this.
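Putting the steps together, here is a minimal code sketch of the DAG described above. The dag_id, owner, function name, and email address are illustrative placeholders rather than the exact values from the original article, and the PythonOperator import path assumes an Airflow 1.10-style installation (in Airflow 2.x the PythonOperator lives in airflow.operators.python):

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator  # Airflow 2.x: from airflow.operators.python import PythonOperator

# Step 2: default arguments shared by all tasks of the DAG
default_args = {
    'owner': 'lakshay',                       # illustrative owner name
    'depends_on_past': False,
    'start_date': datetime(2020, 12, 1),
    'email': ['youremail@example.com'],       # placeholder email ID
    'retry_delay': timedelta(minutes=5),
}

# Step 3: the Python function that the PythonOperator will call
def print_message(message):
    print(message)

# Step 4: the DAG object
dag = DAG(
    dag_id='python_operator_demo',            # illustrative DAG name
    default_args=default_args,
    description='A simple DAG using the PythonOperator',
    schedule_interval=timedelta(days=1),
)

# Step 5: the single task of the workflow
print_task = PythonOperator(
    task_id='print',
    python_callable=print_message,
    op_kwargs={'message': 'Apache Airflow is a must-have tool for Data Engineers'},
    dag=dag,
)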

What are Variables in Apache Airflow?

We know that Airflow can be used to create and manage complex workflows, and we can run multiple workflows at the same time. There is a good chance that most of your workflows use the same database or the same file path. Now, suppose you make a change, like changing the path of the directory where you save files or changing the database configuration. In that case, you don’t want to go and update each of the DAGs separately.

Airflow provides a solution for this: you can create variables that store and retrieve data at runtime across multiple DAGs. So if any major change occurs, you can just edit your variable and your workflows are good to go.

How to create Variables?

Open the Airflow dashboard, click on Admin in the top menu, and then click on Variables.

[Image: The Variables page in the Airflow dashboard]

Now, click on Create to create a new variable, and a window like this will open. Add the key and value and submit it. Here, I am creating a variable with the key data_path and the value set to the path of a text file.

[Image: Creating the data_path variable]
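If you prefer the command line over the UI, the same variable can also be created with the Airflow CLI. This is only a sketch; the exact syntax depends on your Airflow version (check airflow variables --help), and the file path below is just a placeholder:

# Airflow 2.x
airflow variables set data_path /path/to/your/file.txt

# Airflow 1.10.x
airflow variables --set data_path /path/to/your/file.txt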

Now, we will create a DAG where we will find out the word count of the text data present in this file. Whenever you want to use a variable in your code, you need to import the Variable model. Let’s see how to do this:
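A minimal sketch of the import and lookup, assuming the data_path key created above:

from airflow.models import Variable

# Fetch the value stored under the key "data_path" at runtime
data_path = Variable.get("data_path")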

Then, we will define the function that will take the path from the variable, read the file, and calculate the word count.
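A sketch of what that function might look like, reusing the Variable import from above; the function name count_words is illustrative:

def count_words():
    # Read the file whose path is stored in the "data_path" variable
    data_path = Variable.get("data_path")
    with open(data_path, 'r') as f:
        text = f.read()
    word_count = len(text.split())
    print(f'Word count: {word_count}')
    return word_count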

The rest of the steps are the same as before: define the DAG and its tasks, and your workflow is ready to run.
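For completeness, a sketch of the remaining wiring, reusing the default_args dictionary and the DAG and PythonOperator imports from the earlier example; the DAG and task names are illustrative:

word_count_dag = DAG(
    dag_id='word_count_demo',
    default_args=default_args,
    description='Count words in the file referenced by the data_path variable',
    schedule_interval=timedelta(days=1),
)

count_task = PythonOperator(
    task_id='count_words',
    python_callable=count_words,
    dag=word_count_dag,
)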

You can see the results in the log. Now you can use this variable in any other DAG, and you can edit it whenever you want; all the DAGs that use it will pick up the updated value.


Conclusion

In this article, we understood how to use the Python Operator in Apache Airflow, as well as the concept of variables and how to create them. In the upcoming article, we will create a machine learning project and automate its workflow using Apache Airflow.

Frequently Asked Questions

Q1. What is the difference between Apache and Airflow?

Apache is like a big umbrella for many open-source projects. Airflow is a specific tool under this umbrella that helps manage and automate workflows.

Q2. Is Apache Airflow an ETL tool?

Yes, Apache Airflow is a tool that’s good at helping with ETL, which stands for Extract, Transform, and Load. It makes moving and transforming data between systems much easier.

Q3. What are the two types of Airflow?

Two commonly used Airflow executors are:
LocalExecutor: Runs tasks on a single machine.
CeleryExecutor: Spreads tasks across multiple machines, making it suitable for handling large amounts of work.
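The executor is selected in the airflow.cfg configuration file; a minimal sketch of the relevant setting:

# airflow.cfg (only the relevant line shown)
[core]
executor = LocalExecutor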


If you have any questions related to this article, do let me know in the comments section below.
