일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
- c#
- PySpark
- django
- array
- google apps script
- dataframe
- Github
- math
- GIT
- string
- gas
- Excel
- Tkinter
- Google Excel
- matplotlib
- SQL
- Java
- numpy
- Google Spreadsheet
- Apache
- list
- PANDAS
- Python
- Redshift
- 파이썬
- Kotlin
- hive
- PostgreSQL
- Mac
- Today
- Total
달나라 노트
Python pyhive : access hive and run query using python, python으로 hive query 돌리기 본문
Python pyhive : access hive and run query using python, python으로 hive query 돌리기
CosmosProject 2022. 1. 20. 19:26
pyhive 라이브러리는 python에서 서버에 접근하여 Hive SQL query를 실행할 수 있도록 해줍니다.
이번에는 pyhive 라이브러리를 이용해서 Hive query를 돌리는 예시를 봐봅시다.
아래 코드가 가장 기본적인 pyhive 사용 예시입니다.
from pyhive import hive
import pandas as pd
hive_con = hive.Connection(
host='test_host',
port=10000,
username='test_user_id',
password='test_user_password',
database='default',
auth='LDAP'
)
cursor = hive_con.cursor()
test_query = '''
select tt.id,
tt.name
from test_schema.test_table as tt
;
'''
cursor.execute(test_query.replace(';', ''))
try:
output = cursor.fetchall()
except:
output = None
output_df = pd.DataFrame(output, columns=['id', 'name'])
순서대로 살펴봅시다.
from pyhive import hive
import pandas as pd
hive_con = hive.Connection(
host='test_host',
port=10000,
username='test_user_id',
password='test_user_password',
database='default',
auth='LDAP'
)
...
pyhive는 Hive query를 실행할 때 필요하고, pandas는 query의 결과를 DataFrame으로 저장할 때 필요하므로 둘 다 import해줍니다.
Hive에서 query를 실행하려면 먼저 hive의 Connection method를 이용해서 서버에 접속을 해야합니다.
서버와의 connection을 생성해주는 부분이죠.
connection에 들어가는 정보들을 하나씩 살펴봅시다.
host='test_host',
hive 연결에 필요한 host인데 이건 서버마다 다르므로 hive query를 실행시킬 서버의 host를 넣으면 됩니다.
port=10000,
hive에 연결할 때 사용할 port값인데 기본값이 10000이라 보통 10000을 입력하면 됩니다.
(에러가 발생할 경우 서버관리자에게 물어봐야합니다.)
username='test_user_id',
password='test_user_password',
hive 서버에 접속할 때 사용할 user의 id와 비밀번호입니다.
이것 또한 사용자에 따라 다르므로 본인에게 맞는 값을 입력하면 됩니다.
database='default',
hive 서버 database 기본값입니다.
보통 default를 적으면 됩니다.
auth='LDAP'
hive 서버 연결 시 인증 방식을 적어주는 곳입니다.
기본적으로 hive는 LDAP 인증방식을 사용하고 있으니 LDAP을 적어주면 됩니다.
from pyhive import hive
import pandas as pd
hive_con = hive.Connection(
host='test_host',
port=10000,
username='test_user_id',
password='test_user_password',
database='default',
auth='LDAP'
)
cursor = hive_con.cursor()
이렇게 hive 서버와 연결할 때 필요한 정보들을 담은 객체를 hive_con이라는 변수에 할당하였습니다.
그리고 hive_con에 있는 cursor method를 불러옵니다.
cursor method는 query를 직접 실행하거나 query 결과를 가져오는 등의 역할을 합니다.
마치 여러분이 실제 hive 쿼리를 직접 실행할 때 마우스 커서로 실행버튼을 누르고 결과값을 복사하는 등의 역할과 비슷하죠.
...
test_query = '''
select tt.id,
tt.name
from test_schema.test_table as tt
;
'''
...
실행할 query입니다.
별도의 변수에 저장해놨는데 별도에 파일에 있는 것을 open method로 불러와도 됩니다.
(Python open 관련 내용 = https://cosmosproject.tistory.com/169)
from pyhive import hive
import pandas as pd
hive_con = hive.Connection(
host='test_host',
port=10000,
username='test_user_id',
password='test_user_password',
database='default',
auth='LDAP'
)
cursor = hive_con.cursor()
test_query = '''
select tt.id,
tt.name
from test_schema.test_table as tt
;
'''
cursor.execute(test_query.replace(';', ''))
위에서 설정한 cursor의 execute method를 이용하여 query를 실행합니다.
execute method는 parameter로서 SQL query를 받습니다.
근데 위에서 한 가지 주의해야할 것은 test_query를 그대로 전달하지 않고 replace를 이용해서 세미콜론(;)을 공백으로 바꾼 채로 전달했다는 것입니다.
보통 query를 실행할 때에는 세미콜론까지 적어줘도 상관없지만 pyhive를 이용해서 query를 실행시킬 땐 마지막에 세미콜론이 없어야합니다.
from pyhive import hive
import pandas as pd
...
cursor.execute(test_query.replace(';', ''))
try:
output = cursor.fetchall()
except:
output = None
...
cursor.execute method로 query를 실행한 후 그 결과를 가져오는 method는 fetchall 입니다.
그래서 위 부분의 try ~ except 구문을 보면 cursor.fetchall() method를 이용해서 output이라는 변수에 query의 결과를 할당하고 있죠.
근데 왜 굳이 try ~ except 구문을 이용해서 query결과를 할당하는지 궁금할겁니다.
그 이유는 query의 결과가 없을수도 있기 때문입니다.
query의 결과가 없는 경우 fetchall method로 query결과를 가져오려고 할 때 error가 발생할 수 있으므로,
try ~ except 구문을 이용하여 fetchall에서 에러 발생 시 output 변수에 None을 할당하여 query 결과가 없다는걸 보여주려는 것이죠.
from pyhive import hive
import pandas as pd
...
test_query = '''
select tt.id,
tt.name
from test_schema.test_table as tt
;
'''
...
output_df = pd.DataFrame(output, columns=['id', 'name'])
그리고 마지막으로 output에 저장된 query결과를 DataFrame으로 만들어서 output_df라는 변수에 저장하고 있습니다.
여기서 또한 주의할 점은 결과 data에는 컬럼의 이름이 포함되어있지 않습니다.
그래서 DataFrame으로 만들 때 columns 옵션에 ['id', 'name']과 같이 직접 column 이름을 입력해줬습니다.
이제 예시를 좀 바꿔봅시다.
아래 예시가 이전에 봤던 예시와 달라진 점은 실행할 query가 1개에서 여러 개로 바뀐 것입니다.
from pyhive import hive
import pandas as pd
hive_con = hive.Connection(
host='test_host',
port=10000,
username='test_user_id',
password='test_user_password',
database='default',
auth='LDAP'
)
cursor = hive_con.cursor()
test_query = '''
drop table if exists temp_table;
create temporary table temp_table as
select id
from test_schema.raw_table
;
select tt.id,
tt.name
from test_schema.test_table as tt
join temp_table as rt on rt.id = tt.id
;
'''
cursor.execute(test_query.replace(';', ''))
try:
output = cursor.fetchall()
except:
output = None
output_df = pd.DataFrame(output, columns=['id', 'name'])
원래 실행할 query는 그냥 select 구문 하나였는데 이제는 drop table, create temp table, select 등으로 총 3개의 query가 있습니다.
(세미콜론(;)으로 구분된 쿼리가 총 세 부분 존재하죠.)
위 코드를 그냥 실행하면 error가 발생합니다.
그 이유는 다음과 같습니다.
1. 처음에 봤던 예시에서 설명했듯이 pyhive에서 query를 실행할 때 query 구문에 세미콜론(;)이 있으면 안됩니다.
2. 따라서 위 예시에서와 같이 다중 query는 한번에 실행하면 error가 발생합니다.
이 내용을 해결하기 위해선 test_query 변수에 저장된 query를 세미콜론을 기준으로 각 단위별로 나누고 순차적으로 실행하는 것입니다.
이걸 실제 풀어서 써보면
drop table if exists temp_table
create temporary table temp_table as
select id
from test_schema.raw_table
select tt.id,
tt.name
from test_schema.test_table as tt
join temp_table as rt on rt.id = tt.id
이렇게 query를 세미콜론을 기준으로 나눠서 3개로 분리하여 1개씩 순차적으로 execute method의 parameter로 전달하여 1개씩 query를 실행하면 된다는 의미입니다.
위처럼 query를 나누는 방식은 여러 방법이 있겠지만 저는 아래처럼 test_query를 세 부분으로 나눠서 list에 저장하는 형식으로 진행하였습니다.
from pyhive import hive
import pandas as pd
hive_con = hive.Connection(
host='test_host',
port=10000,
username='test_user_id',
password='test_user_password',
database='default',
auth='LDAP'
)
cursor = hive_con.cursor()
test_query = '''
drop table if exists temp_table;
create temporary table temp_table as
select id
from test_schema.raw_table
;
select tt.id,
tt.name
from test_schema.test_table as tt
join temp_table as rt on rt.id = tt.id
;
'''
query_list = test_query.split(';\n')
query_list = [str(q).strip() for q in query_list]
query_list = list(filter(lambda q: q != '', query_list))
for q in query_list:
cursor.execute(q.replace(';', ''))
try:
output = cursor.fetchall()
except:
output = None
output_df = pd.DataFrame(output, columns=['id', 'name'])
순서를 설명해보면 다음과 같습니다.
query_list = test_query.split(';\n')
원본 query가를 보면 각각의 query가 세미콜론(;) 다음 줄바꿈으로 구분되어있습니다.
따라서 세미콜론과 줄바꿈을 나타내는 \n을 기준으로 query를 나눠서 list로 만듭니다.
[
'\ndrop table if exists temp_table',
'create temporary table temp_table as\nselect id\nfrom test_schema.raw_table\n',
'\nselect tt.id,\n tt.name\nfrom test_schema.test_table as tt\njoin temp_table as rt on rt.id = tt.id\n',
''
]
여기까지 해서 query_list를 print해보면 위와 같습니다.
query가 세 개의 부분으로 잘 나눠졌네요.
다만 query 양쪽에 줄바꿈 표시인 \n도 보이고 맨 마지막 요소로는 공백('') 도 존재하는 것 같습니다.
query_list = [str(q).strip() for q in query_list]
두 번째 부분입니다.
위에서 나눠진 query들을 봤을 때 query 구문 앞 뒤로 줄바꿈(\n)같은 불필요한 공백들이 존재했습니다.
따라서 strip method를 이용해서 query의 양쪽 끝에 있는 공백들을 다 없애주는겁니다.
(strip method 관련 내용 = https://cosmosproject.tistory.com/148)
여기까지 해서 query_list를 print해보면 다음과 같습니다.
[
'drop table if exists temp_table',
'create temporary table temp_table as\nselect id\nfrom test_schema.raw_table',
'select tt.id,\n tt.name\nfrom test_schema.test_table as tt\njoin temp_table as rt on rt.id = tt.id',
''
]
이전과 비교하면 각각의 query 구문 양쪽 끝에 존재하던 불필요한 공백(줄바꿈 등)들이 사라졌습니다.
하지만 아직 부족합니다.
가장 마지막 요소를 보시면 따옴표 2개가 보이시나요?
그냥 공백이 query_list의 일부로 포함되어 있습니다.
이렇게 그냥 공백을 execute method로 실행시키면 error가 발생할거라서 이것도 제거해줍시다.
query_list = list(filter(lambda q: q != '', query_list))
filter method를 이용해서 공백인 부분은 모두 제외하였습니다.
여기까지 해서 query_list를 print해보면 다음과 같습니다.
[
'drop table if exists temp_table',
'create temporary table temp_table as\nselect id\nfrom test_schema.raw_table',
'select tt.id,\n tt.name\nfrom test_schema.test_table as tt\njoin temp_table as rt on rt.id = tt.id'
]
저희가 원하던대로 원본 query를 세미콜론 기준으로 세 부분으로 나누었죠.
from pyhive import hive
import pandas as pd
...
select tt.id,
tt.name
from test_schema.test_table as tt
join temp_table as rt on rt.id = tt.id
;
'''
query_list = test_query.split(';\n')
query_list = [str(q).strip() for q in query_list]
query_list = list(filter(lambda q: q != '', query_list))
for q in query_list:
cursor.execute(q.replace(';', ''))
...
그리고 이렇게 나눠진 query를 for loop를 이용해서 순차적으로 execute method에 전달해주면 됩니다.
그 후는 처음에 봤던 예시와 동일합니다.