달나라 노트

Airflow Operator : PythonOperator (airflow에서 python code 실행) 본문

Airflow/Airflow Operator

Airflow Operator : PythonOperator (airflow에서 python code 실행)

CosmosProject 2025. 10. 18. 13:06
728x90
반응형

 

 

 

 

Airflow의 PythonOperator는 내가 정의해둔 Python 함수를 실행해주는 역할을 하는 operator입니다.

 

예시를 봅시다.

 

from airflow import DAG

dag = DAG(
    dag_id='test_dag',
    start_date=datetime.datetime(2022, 9, 27),
    schedule_interval='0 20 * * *'
)


from airflow.operators.python_operator import PythonOperator  # 1

def test_function(**kwargs):  # 2
    import pandas as pd
    import openpyxl
    import os
    import sys
    import datetime
    
    print('this is test function')
    
    param_1 = kwargs['test_param_1']
    param_2 = kwargs['test_param_2']
    
    # write your code

op_test_python = PythonOperator(  # 3
    dag=dag,  # 4
    task_id='test_operator_pg',  # 5
    python_callable=test_function  # 6
    op_kwargs={  # 7
        'test_param_1': 'param1',
        'test_param_2': 'param2',
    }
)

 

 

# 1

PythonOperator를 사용하기 위해선 import를 먼저 해야합니다.

PythonOperator의 위치는 airflow 버전 등에 따라 달라질 수 있습니다.

 

# 2

PythonOperator에서 사용할 함수를 정의합니다.

PythonOperator가 작동하는 원리는 Airflow DAG 내에서 별도로 정의된 어떤 함수를 실행해주는 것입니다.

위 예시에서는 test_function()이라는 함수를 만들어놨고 이 함수 안에 실행하고 싶은 Python code를 담아두면 되는 것입니다.

위 예시의 test_function() 함수 내에서 볼 수 있듯이 함수 내에서는 일반 Python code처럼 library를 import하는 것도 가능하며 일반 Python 함수와 동일하게 사용할 수 있습니다.

(**kwargs에 대해선 7번에서 설명합니다.)

 

# 3

PythonOperator를 호출합니다.

 

# 4

Operator가 어느 dag에 속하는지 dag 정보를 전달합니다.

 

# 5

Operator의 task_id입니다. Operator의 이름입니다.

 

# 5

PostgreSQL database와 연결할 때 사용하는 connection id입니다.

이 또한 설정된 database에 따라 다를 수 있습니다.

 

# 6

PythonOperator가 실행할 함수 이름입니다.

위 예시에서는 test_function()이라는 함수를 실행시킬 것이므로 test_function을 전달하였습니다.

(주의할 점은 함수를 명시할 때 괄호는 빼고 test_function만 적어주어야 합니다. test_function()이라고 전달하면 안됩니다.)

 

# 7

필요 시 parameter를 전달할 수 있습니다.

parameter는 kwargs의 형태로 전달할 수 있으며 그냥 간단하게 parameter로 dictionary를 전달할 수 있으며 이 dictionary 안에 원하는 정보를 명시해서 전달해주면 됩니다.

만약 parameter를 전달할 필요가 없다면 그냥 None으로 명시하면 됩니다.

이렇게 전달할 parameter가 없는 경우에는 test_function(**kwargs)라고 정의된 부분도 그냥 test_function()이라고 바꿔 써도 됩니다.

 

 

 

FYI

https://airflow.apache.org/docs/apache-airflow-providers-standard/stable/operators/python.html

 

PythonOperator — apache-airflow-providers-standard Documentation

 

airflow.apache.org

 

 

 

 

 

 

728x90
반응형
Comments