| 일 | 월 | 화 | 수 | 목 | 금 | 토 |
|---|---|---|---|---|---|---|
| 1 | 2 | 3 | 4 | 5 | 6 | 7 |
| 8 | 9 | 10 | 11 | 12 | 13 | 14 |
| 15 | 16 | 17 | 18 | 19 | 20 | 21 |
| 22 | 23 | 24 | 25 | 26 | 27 | 28 |
- PostgreSQL
- math
- c#
- list
- array
- numpy
- Presto
- Kotlin
- Github
- django
- Python
- PANDAS
- Apache
- SQL
- string
- 파이썬
- hive
- Tkinter
- matplotlib
- GIT
- Excel
- Google Excel
- Google Spreadsheet
- Redshift
- PySpark
- Java
- google apps script
- gas
- dataframe
- Today
- Total
달나라 노트
Airflow Operator : PythonVirtualenvOperator (airflow에서 python code 실행) 본문
Airflow Operator : PythonVirtualenvOperator (airflow에서 python code 실행)
CosmosProject 2025. 10. 18. 13:20
Airflow의 PythonVirtualenvOperator는 내가 정의해둔 Python 함수를 실행해주는 역할을 하는 operator입니다.
일반 PythonOperator와 다른 점은 가상 환경에서 Python 함수를 실행할 수 있다는 것이며 따라서 어떤 특정 version의 library를 사용하고 싶을 때 유용합니다.
예시를 봅시다.
from airflow import DAG
dag = DAG(
dag_id='test_dag',
start_date=datetime.datetime(2022, 9, 27),
schedule_interval='0 20 * * *'
)
from airflow.operators.python import PythonVirtualenvOperator # 1
def test_function(): # 2
import pandas as pd
import openpyxl
import os
import sys
import datetime
print('this is test function')
# write your code
op_test_python = PythonVirtualenvOperator( # 3
dag=dag, # 4
task_id='test_operator_pg', # 5
requirements=[ # 6
'pandas',
'numpy==1.0.5',
'openpyxl==2.9.18',
],
python_callable=test_function # 7
)
# 1
PythonVirtualenvOperator를 사용하기 위해선 import를 먼저 해야합니다.
PythonVirtualenvOperator의 위치는 airflow 버전 등에 따라 달라질 수 있습니다.
# 2
PythonVirtualenvOperator에서 사용할 함수를 정의합니다.
PythonVirtualenvOperator가 작동하는 원리는 Airflow DAG 내에서 별도로 정의된 어떤 함수를 실행해주는 것입니다.
위 예시에서는 test_function()이라는 함수를 만들어놨고 이 함수 안에 실행하고 싶은 Python code를 담아두면 되는 것입니다.
위 예시의 test_function() 함수 내에서 볼 수 있듯이 함수 내에서는 일반 Python code처럼 library를 import하는 것도 가능하며 일반 Python 함수와 동일하게 사용할 수 있습니다.
# 3
PythonVirtualenvOperator를 호출합니다.
# 4
Operator가 어느 dag에 속하는지 dag 정보를 전달합니다.
# 5
Operator의 task_id입니다. Operator의 이름입니다.
# 6
PythonVirtualenvOperator에서 사용할 다양할 library의 version을 명시합니다.
이 부분이 PythonVirtualenvOperator의 핵심입니다.
프로그램을 만들다보면 다양한 기능들의 충돌이 있을 수 있고 이를 막기 위해선 알맞은 version의 library를 사용해야할 때가 존재합니다.
이때 requirements에 특정 version을 원하는 library를 명시하여 내가 원하는 version의 library를 사용할 수 있습니다.
(단, requirements에 명시되지 않은 library라고 해서 사용이 불가한 것은 아닙니다.)
# 7
PythonOperator가 실행할 함수 이름입니다.
위 예시에서는 test_function()이라는 함수를 실행시킬 것이므로 test_function을 전달하였습니다.
(주의할 점은 함수를 명시할 때 괄호는 빼고 test_function만 적어주어야 합니다. test_function()이라고 전달하면 안됩니다.)
FYI
https://airflow.apache.org/docs/apache-airflow-providers-standard/stable/operators/python.html
PythonOperator — apache-airflow-providers-standard Documentation
airflow.apache.org
'Airflow > Airflow Operator' 카테고리의 다른 글
| Airflow Operator : PrestoHook (Airflow에서 Presto query 실행) (0) | 2025.10.18 |
|---|---|
| Airflow Operator : HiveOperator (Airflow에서 Hive query 실행) (0) | 2025.10.18 |
| Airflow Operator : PostgresHook (PostgreSQL query 실행) (0) | 2025.10.18 |
| Airflow Operator : PythonOperator (airflow에서 python code 실행) (0) | 2025.10.18 |
| Airflow Operator : PostgresOperator (PostgreSQL query 실행) (0) | 2025.10.18 |