달나라 노트

Airflow Operator : PostgresHook (PostgreSQL query 실행) 본문

Airflow/Airflow Operator

Airflow Operator : PostgresHook (PostgreSQL query 실행)

CosmosProject 2025. 10. 18. 13:34
728x90
반응형

 

 

 

Airflow에서 PostgresHook은 PostgreSQL database와 연결하여 PostgreSQL query를 실행하는 등의 작업을 해줍니다.

PostgresOperator와 차이점이 무엇이며, PostgresHook은 어떨 때 쓰면 용이한지 알아봅시다.

 

 

 

PostgresHook은 특히 PythonOperator와 같이 사용하면 용이하므로 아래 예시도 PythonOperator와 같이 사용하는 상황을 가정해보았습니다.

 

from airflow import DAG
from airflow.operators.python import PythonVirtualenvOperator

dag = DAG(
    dag_id='test_dag',
    start_date=datetime.datetime(2022, 9, 27),
    schedule_interval='0 20 * * *'
)


def test_function():
    import pandas as pd
    from airflow.providers.postgres.hooks.postgres import PostgresHook  # 1
    
    postgres_hook = PostgresHook('postgres_connection_id')  # 2
    
    query_test = '''  # 3
    select  name
            , id
            , quantity
    from temp_table_1
    ;
    '''
    
    result_first_row = postgres_hook.get_first(sql=query_test)  # 4
    result = postgres_hook.get_records(sql=query_test)  # 5
    
op_test_python = PythonVirtualenvOperator(
    dag=dag,
    task_id='test_operator_pg',
    requirements=[
        'pandas',
    ],
    python_callable=test_function
)

 

# 1

PostgresHook을 import합니다.

PostgresHook의 위치는 airflow 버전 등에 따라 달라질 수 있습니다.

 

# 2

PostgresHook의 사용을 위해 PostgreSQL database와 연결할 connection을 전달하며 postgres_hook class를 만듭니다.

 

# 3

PostgresHook을 이용하여 실행시킬 query를 작성하여 변수에 담아둡니다.

예시에서는 단순한 select 구문만 전달했지만 실제로는 drop, create, insert, delete, update, grant 등 모든 구문을 가진 query를 전달할 수 있습니다.

 

# 4

query를 postgres_hook으로 실행하는데 get_first method를 사용했습니다.

query를 실행하려면 sql이라는 parameter에 실행할 query를 전달해야하고,

get_first method는 query의 실행 결과 중 첫 번째 row 1줄만 가져오라는 의미입니다.

 

# 5

query를 postgres_hook으로 실행하는데 get_records method를 사용했습니다.

query를 실행하려면 sql이라는 parameter에 실행할 query를 전달해야하고,

get_records method는 query의 실행 결과 전부를 가져오라는 의미입니다.

 

 

PostgresOperator는 단순히 어떤 query를 수행하는 것이며, query 결과를 받아와서 원하는 추가적인 작업을 하기에는 어려울 수 있습니다.

그러나 PostgresHook을 이용하면 위처럼 PythonVirtualenvOperator 또는 PythonOperator와 동시에 사용하여 어떤 query를 수행하고 query 결과를 가져와서 어떠한 연산을 할 수 있습니다.

 

 

 

 

 

PostgresHook의 결과로 나오는 데이터는 아래와 같이 생겼습니다.

(단순 예시 데이터이므로 참고만 해주시면 됩니다.)

[
    ('apple', 1596138, 2),
    ('peach', 1235934, 1),
    ('banana', 9763156, 2),
    ('watermelon', 3198462, 10),
    ('grape', 7613512, 3),
]

 

query의 결과는 어떠한 표의 형태일텐데

이 결과의 1개 행이 1개의 tuple에 담겨있으며,

모든 row가 바깥의 list 속에 담겨있는 형태입니다.

 

이러한 형태는 pandas를 이용하여 DataFrame으로도 바꿀 수 있기에 위에서 언급한 추가적인 연산이 쉽게 가능합니다.

 

아래 예시는 PostgresHook의 결과를 DataFrame으로 바꾸는 예시입니다.

 

from airflow import DAG
from airflow.operators.python import PythonVirtualenvOperator

dag = DAG(
    dag_id='test_dag',
    start_date=datetime.datetime(2022, 9, 27),
    schedule_interval='0 20 * * *'
)


def test_function():
    import pandas as pd
    from airflow.providers.postgres.hooks.postgres import PostgresHook
    
    postgres_hook = PostgresHook('postgres_connection_id')
    
    query_test = '''
    select  name
            , id
            , quantity
    from temp_table_1
    ;
    '''
    
    result = postgres_hook.get_records(sql=query_test)  # 1
    df_result = pd.DataFrame(  # 2
        result,
        columns=[
            'name',
            'id',
            'quantity',
        ]
    )
    
op_test_python = PythonVirtualenvOperator(
    dag=dag,
    task_id='test_operator_pg',
    requirements=[
        'pandas',
    ],
    python_callable=test_function
)

 

# 1

PostgresHook으로 query를 실행하고 모든 결과를 얻어옵니다.

 

# 2

query 결과를 DataFrame으로 변환합니다.

주의할 점은 PostgresHook의 결과값에는 column 정보가 없으므로 위처럼 column을 이름을 명시해주는게 좋습니다.

 

 

 

 

FYI

https://airflow.apache.org/docs/apache-airflow/1.10.6/_api/airflow/hooks/postgres_hook/index.html

 

airflow.hooks.postgres_hook — Airflow Documentation

 

airflow.apache.org

 

 

 

 

 

 

 

728x90
반응형
Comments