| 일 | 월 | 화 | 수 | 목 | 금 | 토 |
|---|---|---|---|---|---|---|
| 1 | 2 | 3 | 4 | 5 | 6 | 7 |
| 8 | 9 | 10 | 11 | 12 | 13 | 14 |
| 15 | 16 | 17 | 18 | 19 | 20 | 21 |
| 22 | 23 | 24 | 25 | 26 | 27 | 28 |
- string
- Redshift
- PANDAS
- c#
- GIT
- Tkinter
- matplotlib
- Java
- Python
- gas
- PySpark
- django
- Excel
- SQL
- hive
- list
- Github
- numpy
- math
- Kotlin
- Apache
- array
- PostgreSQL
- Presto
- 파이썬
- Google Spreadsheet
- google apps script
- dataframe
- Google Excel
- Today
- Total
달나라 노트
Airflow Operator : PostgresOperator (PostgreSQL query 실행) 본문
Airflow Operator : PostgresOperator (PostgreSQL query 실행)
CosmosProject 2025. 10. 18. 12:47
Airflow의 PostgresOperator는 PostgreSQL database와 communication하는 것을 가능하게 해주는 operator입니다.
간단하게 말하면 PostgreSQL database에 query를 날려서 query를 수행시킬 수 있다는 것인데, AWS의 Redshift 등이 대표적입니다.
예시를 봅시다.
from airflow import DAG
dag = DAG(
dag_id='test_dag',
start_date=datetime.datetime(2022, 9, 27),
schedule_interval='0 20 * * *'
)
from airflow.providers.postgres.operators.postgres import PostgresOperator # 1
op_test_pg = PostgresOperator( # 2
dag=dag, # 3
task_id='test_operator_pg', # 4
postgres_conn_id='connection_id_to_postgresql_database', # 5
sql=''' # 6
select *
from test_table
;
''',
autocommit=True, # 7
params=None # 8
)
# 1
PostgresOperator를 사용하기 위해선 import를 먼저 해야합니다.
PostgresOperator의 위치는 airflow 버전 등에 따라 달라질 수 있습니다.
# 2
PostgresOperator를 호출합니다.
# 3
Operator가 어느 dag에 속하는지 dag 정보를 전달합니다.
# 4
PostgresOperator의 task_id입니다. Operator의 이름입니다.
# 5
PostgreSQL database와 연결할 때 사용하는 connection id입니다.
이 또한 설정된 database에 따라 다릅니다.
# 6
PostgreSQL database에 전달할 SQL query입니다.
예시에서는 단순한 select 구문만 전달했지만 실제로는 drop, create, insert, delete, update, grant 등 모든 구문을 가진 query를 전달할 수 있습니다.
# 7
query 실행 후 autocommit을 할건지 아닌지를 결정합니다.
# 8
query에 전달할 parameter를 작성하는 부분입니다.
전달할 parameter가 없으면 None으로 적습니다.
from airflow import DAG
dag = DAG(
dag_id='test_dag',
start_date=datetime.datetime(2022, 9, 27),
schedule_interval='0 20 * * *'
)
from airflow.providers.postgres.operators.postgres import PostgresOperator # 1
op_test_pg = PostgresOperator( # 2
dag=dag, # 3
task_id='test_operator_pg', # 4
postgres_conn_id='connection_id_to_postgresql_database', # 5
sql=''' # 6
select *
from test_table
--
where 1=1
and dt between %(start_date)s AND %(end_date)s
;
''',
autocommit=True, # 7
params={ # 8
'start_date': '20251201',
'end_date': '20251225',
}
)
위 예시는 parameter를 전달하는 예시입니다.
# 8
params에 dictionary의 형태로 명시해주면 됩니다.
FYI
How-to Guide for PostgresOperator — apache-airflow-providers-postgres Documentation
airflow.apache.org
'Airflow > Airflow Operator' 카테고리의 다른 글
| Airflow Operator : PrestoHook (Airflow에서 Presto query 실행) (0) | 2025.10.18 |
|---|---|
| Airflow Operator : HiveOperator (Airflow에서 Hive query 실행) (0) | 2025.10.18 |
| Airflow Operator : PostgresHook (PostgreSQL query 실행) (0) | 2025.10.18 |
| Airflow Operator : PythonVirtualenvOperator (airflow에서 python code 실행) (0) | 2025.10.18 |
| Airflow Operator : PythonOperator (airflow에서 python code 실행) (0) | 2025.10.18 |