| 일 | 월 | 화 | 수 | 목 | 금 | 토 |
|---|---|---|---|---|---|---|
| 1 | 2 | 3 | 4 | |||
| 5 | 6 | 7 | 8 | 9 | 10 | 11 |
| 12 | 13 | 14 | 15 | 16 | 17 | 18 |
| 19 | 20 | 21 | 22 | 23 | 24 | 25 |
| 26 | 27 | 28 | 29 | 30 |
- 파이썬
- Kotlin
- Apache
- SQL
- Excel
- c#
- string
- Redshift
- gas
- Tkinter
- PySpark
- Google Spreadsheet
- math
- Java
- google apps script
- django
- Python
- array
- matplotlib
- dataframe
- PostgreSQL
- Github
- Presto
- hive
- numpy
- list
- GIT
- PANDAS
- Google Excel
- Today
- Total
달나라 노트
Airflow Operator : PostgresHook (PostgreSQL query 실행) 본문
Airflow Operator : PostgresHook (PostgreSQL query 실행)
CosmosProject 2025. 10. 18. 13:34
Airflow에서 PostgresHook은 PostgreSQL database와 연결하여 PostgreSQL query를 실행하는 등의 작업을 해줍니다.
PostgresOperator와 차이점이 무엇이며, PostgresHook은 어떨 때 쓰면 용이한지 알아봅시다.
PostgresHook은 특히 PythonOperator와 같이 사용하면 용이하므로 아래 예시도 PythonOperator와 같이 사용하는 상황을 가정해보았습니다.
from airflow import DAG
from airflow.operators.python import PythonVirtualenvOperator
dag = DAG(
dag_id='test_dag',
start_date=datetime.datetime(2022, 9, 27),
schedule_interval='0 20 * * *'
)
def test_function():
import pandas as pd
from airflow.providers.postgres.hooks.postgres import PostgresHook # 1
postgres_hook = PostgresHook('postgres_connection_id') # 2
query_test = ''' # 3
select name
, id
, quantity
from temp_table_1
;
'''
result_first_row = postgres_hook.get_first(sql=query_test) # 4
result = postgres_hook.get_records(sql=query_test) # 5
op_test_python = PythonVirtualenvOperator(
dag=dag,
task_id='test_operator_pg',
requirements=[
'pandas',
],
python_callable=test_function
)
# 1
PostgresHook을 import합니다.
PostgresHook의 위치는 airflow 버전 등에 따라 달라질 수 있습니다.
# 2
PostgresHook의 사용을 위해 PostgreSQL database와 연결할 connection을 전달하며 postgres_hook class를 만듭니다.
# 3
PostgresHook을 이용하여 실행시킬 query를 작성하여 변수에 담아둡니다.
예시에서는 단순한 select 구문만 전달했지만 실제로는 drop, create, insert, delete, update, grant 등 모든 구문을 가진 query를 전달할 수 있습니다.
# 4
query를 postgres_hook으로 실행하는데 get_first method를 사용했습니다.
query를 실행하려면 sql이라는 parameter에 실행할 query를 전달해야하고,
get_first method는 query의 실행 결과 중 첫 번째 row 1줄만 가져오라는 의미입니다.
# 5
query를 postgres_hook으로 실행하는데 get_records method를 사용했습니다.
query를 실행하려면 sql이라는 parameter에 실행할 query를 전달해야하고,
get_records method는 query의 실행 결과 전부를 가져오라는 의미입니다.
PostgresOperator는 단순히 어떤 query를 수행하는 것이며, query 결과를 받아와서 원하는 추가적인 작업을 하기에는 어려울 수 있습니다.
그러나 PostgresHook을 이용하면 위처럼 PythonVirtualenvOperator 또는 PythonOperator와 동시에 사용하여 어떤 query를 수행하고 query 결과를 가져와서 어떠한 연산을 할 수 있습니다.
PostgresHook의 결과로 나오는 데이터는 아래와 같이 생겼습니다.
(단순 예시 데이터이므로 참고만 해주시면 됩니다.)
[
('apple', 1596138, 2),
('peach', 1235934, 1),
('banana', 9763156, 2),
('watermelon', 3198462, 10),
('grape', 7613512, 3),
]
query의 결과는 어떠한 표의 형태일텐데
이 결과의 1개 행이 1개의 tuple에 담겨있으며,
모든 row가 바깥의 list 속에 담겨있는 형태입니다.
이러한 형태는 pandas를 이용하여 DataFrame으로도 바꿀 수 있기에 위에서 언급한 추가적인 연산이 쉽게 가능합니다.
아래 예시는 PostgresHook의 결과를 DataFrame으로 바꾸는 예시입니다.
from airflow import DAG
from airflow.operators.python import PythonVirtualenvOperator
dag = DAG(
dag_id='test_dag',
start_date=datetime.datetime(2022, 9, 27),
schedule_interval='0 20 * * *'
)
def test_function():
import pandas as pd
from airflow.providers.postgres.hooks.postgres import PostgresHook
postgres_hook = PostgresHook('postgres_connection_id')
query_test = '''
select name
, id
, quantity
from temp_table_1
;
'''
result = postgres_hook.get_records(sql=query_test) # 1
df_result = pd.DataFrame( # 2
result,
columns=[
'name',
'id',
'quantity',
]
)
op_test_python = PythonVirtualenvOperator(
dag=dag,
task_id='test_operator_pg',
requirements=[
'pandas',
],
python_callable=test_function
)
# 1
PostgresHook으로 query를 실행하고 모든 결과를 얻어옵니다.
# 2
query 결과를 DataFrame으로 변환합니다.
주의할 점은 PostgresHook의 결과값에는 column 정보가 없으므로 위처럼 column을 이름을 명시해주는게 좋습니다.
FYI
https://airflow.apache.org/docs/apache-airflow/1.10.6/_api/airflow/hooks/postgres_hook/index.html
airflow.hooks.postgres_hook — Airflow Documentation
airflow.apache.org
'Airflow > Airflow Operator' 카테고리의 다른 글
| Airflow Operator : PrestoHook (Airflow에서 Presto query 실행) (0) | 2025.10.18 |
|---|---|
| Airflow Operator : HiveOperator (Airflow에서 Hive query 실행) (0) | 2025.10.18 |
| Airflow Operator : PythonVirtualenvOperator (airflow에서 python code 실행) (0) | 2025.10.18 |
| Airflow Operator : PythonOperator (airflow에서 python code 실행) (0) | 2025.10.18 |
| Airflow Operator : PostgresOperator (PostgreSQL query 실행) (0) | 2025.10.18 |