달나라 노트

Airflow Operator : PostgresOperator (PostgreSQL query 실행) 본문

Airflow/Airflow Operator

Airflow Operator : PostgresOperator (PostgreSQL query 실행)

CosmosProject 2025. 10. 18. 12:47
728x90
반응형

 

 

 

Airflow의 PostgresOperator는 PostgreSQL database와 communication하는 것을 가능하게 해주는 operator입니다.

간단하게 말하면 PostgreSQL database에 query를 날려서 query를 수행시킬 수 있다는 것인데, AWS의 Redshift 등이 대표적입니다.

 

예시를 봅시다.

 

from airflow import DAG

dag = DAG(
    dag_id='test_dag',
    start_date=datetime.datetime(2022, 9, 27),
    schedule_interval='0 20 * * *'
)


from airflow.providers.postgres.operators.postgres import PostgresOperator  # 1

op_test_pg = PostgresOperator(  # 2
    dag=dag,  # 3
    task_id='test_operator_pg',  # 4
    postgres_conn_id='connection_id_to_postgresql_database',  # 5
    sql='''  # 6
    select  *
    from test_table
    ;
    ''',
    autocommit=True,  # 7
    params=None  # 8
)

 

 

# 1

PostgresOperator를 사용하기 위해선 import를 먼저 해야합니다.

PostgresOperator의 위치는 airflow 버전 등에 따라 달라질 수 있습니다.

 

# 2

PostgresOperator를 호출합니다.

 

# 3

Operator가 어느 dag에 속하는지 dag 정보를 전달합니다.

 

# 4

PostgresOperator의 task_id입니다. Operator의 이름입니다.

 

# 5

PostgreSQL database와 연결할 때 사용하는 connection id입니다.

이 또한 설정된 database에 따라 다릅니다.

 

# 6

PostgreSQL database에 전달할 SQL query입니다.

예시에서는 단순한 select 구문만 전달했지만 실제로는 drop, create, insert, delete, update, grant 등 모든 구문을 가진 query를 전달할 수 있습니다.

 

# 7

query 실행 후 autocommit을 할건지 아닌지를 결정합니다.

 

# 8

query에 전달할 parameter를 작성하는 부분입니다.

전달할 parameter가 없으면 None으로 적습니다.

 

 

 

 

from airflow import DAG

dag = DAG(
    dag_id='test_dag',
    start_date=datetime.datetime(2022, 9, 27),
    schedule_interval='0 20 * * *'
)


from airflow.providers.postgres.operators.postgres import PostgresOperator  # 1

op_test_pg = PostgresOperator(  # 2
    dag=dag,  # 3
    task_id='test_operator_pg',  # 4
    postgres_conn_id='connection_id_to_postgresql_database',  # 5
    sql='''  # 6
    select  *
    from test_table
    --
    where 1=1
    and dt between %(start_date)s AND %(end_date)s
    ;
    ''',
    autocommit=True,  # 7
    params={  # 8
        'start_date': '20251201',
        'end_date': '20251225',
    }
)

 

위 예시는 parameter를 전달하는 예시입니다.

 

# 8

params에 dictionary의 형태로 명시해주면 됩니다.

 

 

 

 

FYI

https://airflow.apache.org/docs/apache-airflow-providers-postgres/5.2.2/operators/postgres_operator_howto_guide.html

 

How-to Guide for PostgresOperator — apache-airflow-providers-postgres Documentation

 

airflow.apache.org

 

 

 

 

 

 

 

728x90
반응형
Comments