달나라 노트

Airflow Operator : PrestoHook (Airflow에서 Presto query 실행) 본문

Airflow/Airflow Operator

Airflow Operator : PrestoHook (Airflow에서 Presto query 실행)

CosmosProject 2025. 10. 18. 14:21
728x90
반응형

 

 

 

 

 

Airflow에서 PrestoHook은 Presto database와 연결하여 PrestoHook query를 실행하는 등의 작업을 해줍니다.

 

PrestoHook은 특히 PythonOperator와 같이 사용하면 용이하므로 아래 예시도 PythonOperator와 같이 사용하는 상황을 가정해보았습니다.

 

from airflow import DAG
from airflow.operators.python import PythonVirtualenvOperator

dag = DAG(
    dag_id='test_dag',
    start_date=datetime.datetime(2022, 9, 27),
    schedule_interval='0 20 * * *'
)


def test_function():
    import pandas as pd
    from airflow.providers.presto.hooks.presto import PrestoHook  # 1
    
    presto_hook = PrestoHook('presto_connection_id')  # 2
    
    query_test = '''  # 3
    select  name
            , id
            , quantity
    from temp_table_1
    ;
    '''
    
    result_first_row = presto_hook.get_first(query_test)  # 4
    result = presto_hook.get_records(query_test)  # 5
    
op_test_python = PythonVirtualenvOperator(
    dag=dag,
    task_id='test_operator_pg',
    requirements=[
        'pandas',
    ],
    python_callable=test_function
)

 

# 1

PrestoHook을 import합니다.

PrestoHook의 위치는 airflow 버전 등에 따라 달라질 수 있습니다.

 

# 2

PrestoHook의 사용을 위해 Presto database와 연결할 connection을 전달하며 presto_hook class를 만듭니다.

 

# 3

PrestoHook을 이용하여 실행시킬 query를 작성하여 변수에 담아둡니다.

예시에서는 단순한 select 구문만 전달했지만 실제로는 drop, create, insert, delete, update, grant 등 모든 구문을 가진 query를 전달할 수 있습니다.

 

# 4

query를 PrestoHook으로 실행하는데 get_first method를 사용했습니다.

query를 실행하려면 실행할 query를 전달해야하고,

get_first method는 query의 실행 결과 중 첫 번째 row 1줄만 가져오라는 의미입니다.

 

# 5

query를 postgres_hook으로 실행하는데 get_records method를 사용했습니다.

query를 실행하려면 실행할 query를 전달해야하고,

get_records method는 query의 실행 결과 전부를 가져오라는 의미입니다.

 

 

 

 

 

 

PrestoHook의 결과로 나오는 데이터는 아래와 같이 생겼습니다.

(단순 예시 데이터이므로 참고만 해주시면 됩니다.)

[
    ('apple', 1596138, 2),
    ('peach', 1235934, 1),
    ('banana', 9763156, 2),
    ('watermelon', 3198462, 10),
    ('grape', 7613512, 3),
]

 

query의 결과는 어떠한 표의 형태일텐데

이 결과의 1개 행이 1개의 tuple에 담겨있으며,

모든 row가 바깥의 list 속에 담겨있는 형태입니다.

 

이러한 형태는 pandas를 이용하여 DataFrame으로도 바꿀 수 있기에 위에서 언급한 추가적인 연산이 쉽게 가능합니다.

 

아래 예시는 PrestoHook의 결과를 DataFrame으로 바꾸는 예시입니다.

 

from airflow import DAG
from airflow.operators.python import PythonVirtualenvOperator

dag = DAG(
    dag_id='test_dag',
    start_date=datetime.datetime(2022, 9, 27),
    schedule_interval='0 20 * * *'
)


def test_function():
    import pandas as pd
    from airflow.providers.presto.hooks.presto import PrestoHook
    
    presto_hook = PrestoHook('presto_connection_id')
    
    query_test = '''
    select  name
            , id
            , quantity
    from temp_table_1
    ;
    '''
    
    result = presto_hook.get_records(query_test)  # 1
    df_result = pd.DataFrame(  # 2
        result,
        columns=[
            'name',
            'id',
            'quantity',
        ]
    )
    
op_test_python = PythonVirtualenvOperator(
    dag=dag,
    task_id='test_operator_pg',
    requirements=[
        'pandas',
    ],
    python_callable=test_function
)

 

# 1

PrestoHook으로 query를 실행하고 모든 결과를 얻어옵니다.

 

# 2

query 결과를 DataFrame으로 변환합니다.

주의할 점은 PrestoHook의 결과값에는 column 정보가 없으므로 위처럼 column을 이름을 명시해주는게 좋습니다.

 

 

 

 

 

 

 

추가로 PrestoHook은 좀 특이하게 ;으로 구문된 다중 query를 그냥 전달하면 에러를 일으킬 때가 있습니다.

from airflow import DAG
from airflow.operators.python import PythonVirtualenvOperator

dag = DAG(
    dag_id='test_dag',
    start_date=datetime.datetime(2022, 9, 27),
    schedule_interval='0 20 * * *'
)


def test_function():
    import pandas as pd
    from airflow.providers.presto.hooks.presto import PrestoHook
    
    presto_hook = PrestoHook('presto_connection_id')
    
    query_test = '''
    drop table if exists test_table_1;
    create table test_table_1 as
    select  *
    from main_tabe
    ;
    select  name
            , id
            , quantity
    from test_table_1
    ;
    '''
    
    list_query = query_test.split(';')  # 1
    for q in list_query:  # 2
        result = presto_hook.get_records(query_test)
    
op_test_python = PythonVirtualenvOperator(
    dag=dag,
    task_id='test_operator_pg',
    requirements=[
        'pandas',
    ],
    python_callable=test_function
)

 

그럴 때에는 위처럼 query를 세미콜론(;)으로 나눠서 for loop를 이용해 각각 실행하는 것이 해결책이 될 때가 있습니다.

 

 

 

 

 

FYI

https://airflow.apache.org/docs/apache-airflow-providers-presto/stable/_api/airflow/providers/presto/hooks/presto/index.html#airflow.providers.presto.hooks.presto.PrestoHook

 

airflow.providers.presto.hooks.presto — apache-airflow-providers-presto Documentation

 

airflow.apache.org

 

 

 

 

 

 

 

 

 

 

code template
code template
code template
code template
code template
code template
code template
code template
code template
code template
code template
code template
code template
code template
code template
code template
code template
code template
code template
code template
code template
code template
code template
code template
code template
code template
code template
code template
code template
code template
code template
code template
code template
code template
code template
code template
code template
code template
code template
code template
code template

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

728x90
반응형
Comments