| 일 | 월 | 화 | 수 | 목 | 금 | 토 |
|---|---|---|---|---|---|---|
| 1 | 2 | 3 | 4 | 5 | 6 | 7 |
| 8 | 9 | 10 | 11 | 12 | 13 | 14 |
| 15 | 16 | 17 | 18 | 19 | 20 | 21 |
| 22 | 23 | 24 | 25 | 26 | 27 | 28 |
- Excel
- string
- matplotlib
- PANDAS
- Github
- PySpark
- Presto
- Tkinter
- list
- dataframe
- numpy
- hive
- math
- Apache
- 파이썬
- Kotlin
- django
- array
- PostgreSQL
- GIT
- Python
- Java
- gas
- Redshift
- Google Excel
- Google Spreadsheet
- SQL
- c#
- google apps script
- Today
- Total
달나라 노트
Airflow Operator : PythonOperator (airflow에서 python code 실행) 본문
Airflow Operator : PythonOperator (airflow에서 python code 실행)
CosmosProject 2025. 10. 18. 13:06
Airflow의 PythonOperator는 내가 정의해둔 Python 함수를 실행해주는 역할을 하는 operator입니다.
예시를 봅시다.
from airflow import DAG
dag = DAG(
dag_id='test_dag',
start_date=datetime.datetime(2022, 9, 27),
schedule_interval='0 20 * * *'
)
from airflow.operators.python_operator import PythonOperator # 1
def test_function(**kwargs): # 2
import pandas as pd
import openpyxl
import os
import sys
import datetime
print('this is test function')
param_1 = kwargs['test_param_1']
param_2 = kwargs['test_param_2']
# write your code
op_test_python = PythonOperator( # 3
dag=dag, # 4
task_id='test_operator_pg', # 5
python_callable=test_function # 6
op_kwargs={ # 7
'test_param_1': 'param1',
'test_param_2': 'param2',
}
)
# 1
PythonOperator를 사용하기 위해선 import를 먼저 해야합니다.
PythonOperator의 위치는 airflow 버전 등에 따라 달라질 수 있습니다.
# 2
PythonOperator에서 사용할 함수를 정의합니다.
PythonOperator가 작동하는 원리는 Airflow DAG 내에서 별도로 정의된 어떤 함수를 실행해주는 것입니다.
위 예시에서는 test_function()이라는 함수를 만들어놨고 이 함수 안에 실행하고 싶은 Python code를 담아두면 되는 것입니다.
위 예시의 test_function() 함수 내에서 볼 수 있듯이 함수 내에서는 일반 Python code처럼 library를 import하는 것도 가능하며 일반 Python 함수와 동일하게 사용할 수 있습니다.
(**kwargs에 대해선 7번에서 설명합니다.)
# 3
PythonOperator를 호출합니다.
# 4
Operator가 어느 dag에 속하는지 dag 정보를 전달합니다.
# 5
Operator의 task_id입니다. Operator의 이름입니다.
# 5
PostgreSQL database와 연결할 때 사용하는 connection id입니다.
이 또한 설정된 database에 따라 다를 수 있습니다.
# 6
PythonOperator가 실행할 함수 이름입니다.
위 예시에서는 test_function()이라는 함수를 실행시킬 것이므로 test_function을 전달하였습니다.
(주의할 점은 함수를 명시할 때 괄호는 빼고 test_function만 적어주어야 합니다. test_function()이라고 전달하면 안됩니다.)
# 7
필요 시 parameter를 전달할 수 있습니다.
parameter는 kwargs의 형태로 전달할 수 있으며 그냥 간단하게 parameter로 dictionary를 전달할 수 있으며 이 dictionary 안에 원하는 정보를 명시해서 전달해주면 됩니다.
만약 parameter를 전달할 필요가 없다면 그냥 None으로 명시하면 됩니다.
이렇게 전달할 parameter가 없는 경우에는 test_function(**kwargs)라고 정의된 부분도 그냥 test_function()이라고 바꿔 써도 됩니다.
FYI
https://airflow.apache.org/docs/apache-airflow-providers-standard/stable/operators/python.html
PythonOperator — apache-airflow-providers-standard Documentation
airflow.apache.org
'Airflow > Airflow Operator' 카테고리의 다른 글
| Airflow Operator : PrestoHook (Airflow에서 Presto query 실행) (0) | 2025.10.18 |
|---|---|
| Airflow Operator : HiveOperator (Airflow에서 Hive query 실행) (0) | 2025.10.18 |
| Airflow Operator : PostgresHook (PostgreSQL query 실행) (0) | 2025.10.18 |
| Airflow Operator : PythonVirtualenvOperator (airflow에서 python code 실행) (0) | 2025.10.18 |
| Airflow Operator : PostgresOperator (PostgreSQL query 실행) (0) | 2025.10.18 |