달나라 노트

Airflow Operator : PythonVirtualenvOperator (airflow에서 python code 실행) 본문

Airflow/Airflow Operator

Airflow Operator : PythonVirtualenvOperator (airflow에서 python code 실행)

CosmosProject 2025. 10. 18. 13:20
728x90
반응형



 

Airflow의 PythonVirtualenvOperator는 내가 정의해둔 Python 함수를 실행해주는 역할을 하는 operator입니다.

일반 PythonOperator와 다른 점은 가상 환경에서 Python 함수를 실행할 수 있다는 것이며 따라서 어떤 특정 version의 library를 사용하고 싶을 때 유용합니다.

 

예시를 봅시다.

 

from airflow import DAG

dag = DAG(
    dag_id='test_dag',
    start_date=datetime.datetime(2022, 9, 27),
    schedule_interval='0 20 * * *'
)


from airflow.operators.python import PythonVirtualenvOperator  # 1

def test_function():  # 2
    import pandas as pd
    import openpyxl
    import os
    import sys
    import datetime
    
    print('this is test function')
    
    # write your code

op_test_python = PythonVirtualenvOperator(  # 3
    dag=dag,  # 4
    task_id='test_operator_pg',  # 5
    requirements=[  # 6
        'pandas',
        'numpy==1.0.5',
        'openpyxl==2.9.18',
    ],
    python_callable=test_function  # 7
)

 

 

# 1

PythonVirtualenvOperator를 사용하기 위해선 import를 먼저 해야합니다.

PythonVirtualenvOperator의 위치는 airflow 버전 등에 따라 달라질 수 있습니다.

 

# 2

PythonVirtualenvOperator에서 사용할 함수를 정의합니다.

PythonVirtualenvOperator가 작동하는 원리는 Airflow DAG 내에서 별도로 정의된 어떤 함수를 실행해주는 것입니다.

위 예시에서는 test_function()이라는 함수를 만들어놨고 이 함수 안에 실행하고 싶은 Python code를 담아두면 되는 것입니다.

위 예시의 test_function() 함수 내에서 볼 수 있듯이 함수 내에서는 일반 Python code처럼 library를 import하는 것도 가능하며 일반 Python 함수와 동일하게 사용할 수 있습니다.

 

# 3

PythonVirtualenvOperator를 호출합니다.

 

# 4

Operator가 어느 dag에 속하는지 dag 정보를 전달합니다.

 

# 5

Operator의 task_id입니다. Operator의 이름입니다.

 

# 6

PythonVirtualenvOperator에서 사용할 다양할 library의 version을 명시합니다.

이 부분이 PythonVirtualenvOperator의 핵심입니다.

프로그램을 만들다보면 다양한 기능들의 충돌이 있을 수 있고 이를 막기 위해선 알맞은 version의 library를 사용해야할 때가 존재합니다.

이때 requirements에 특정 version을 원하는 library를 명시하여 내가 원하는 version의 library를 사용할 수 있습니다.

(단, requirements에 명시되지 않은 library라고 해서 사용이 불가한 것은 아닙니다.)

 

# 7

PythonOperator가 실행할 함수 이름입니다.

위 예시에서는 test_function()이라는 함수를 실행시킬 것이므로 test_function을 전달하였습니다.

(주의할 점은 함수를 명시할 때 괄호는 빼고 test_function만 적어주어야 합니다. test_function()이라고 전달하면 안됩니다.)

 

 

 

 

 

FYI

https://airflow.apache.org/docs/apache-airflow-providers-standard/stable/operators/python.html

 

PythonOperator — apache-airflow-providers-standard Documentation

 

airflow.apache.org

 

 

 

 

728x90
반응형
Comments