달나라 노트

Python pyhive : access hive and run query using python, python으로 hive query 돌리기 본문

Python/Python ETC

Python pyhive : access hive and run query using python, python으로 hive query 돌리기

CosmosProject 2022. 1. 20. 19:26
728x90
반응형

 

 

 

pyhive 라이브러리는 python에서 서버에 접근하여 Hive SQL query를 실행할 수 있도록 해줍니다.

 

이번에는 pyhive 라이브러리를 이용해서 Hive query를 돌리는 예시를 봐봅시다.

 

 

 

아래 코드가 가장 기본적인 pyhive 사용 예시입니다.

from pyhive import hive
import pandas as pd

hive_con = hive.Connection(
    host='test_host',
    port=10000,
    username='test_user_id',
    password='test_user_password',
    database='default',
    auth='LDAP'
)
cursor = hive_con.cursor()

test_query = '''
select tt.id,
       tt.name
from test_schema.test_table as tt
;
'''

cursor.execute(test_query.replace(';', ''))

try:
    output = cursor.fetchall()
except:
    output = None

output_df = pd.DataFrame(output, columns=['id', 'name'])

 

순서대로 살펴봅시다.

 

 

 

 

from pyhive import hive
import pandas as pd

hive_con = hive.Connection(
    host='test_host',
    port=10000,
    username='test_user_id',
    password='test_user_password',
    database='default',
    auth='LDAP'
)

...

pyhive는 Hive query를 실행할 때 필요하고, pandas는 query의 결과를 DataFrame으로 저장할 때 필요하므로 둘 다 import해줍니다.

 

Hive에서 query를 실행하려면 먼저 hive의 Connection method를 이용해서 서버에 접속을 해야합니다.

서버와의 connection을 생성해주는 부분이죠.

 

connection에 들어가는 정보들을 하나씩 살펴봅시다.

 

host='test_host',

hive 연결에 필요한 host인데 이건 서버마다 다르므로 hive query를 실행시킬 서버의 host를 넣으면 됩니다.

 

port=10000,

hive에 연결할 때 사용할 port값인데 기본값이 10000이라 보통 10000을 입력하면 됩니다.

(에러가 발생할 경우 서버관리자에게 물어봐야합니다.)


username='test_user_id',
password='test_user_password',

hive 서버에 접속할 때 사용할 user의 id와 비밀번호입니다.

이것 또한 사용자에 따라 다르므로 본인에게 맞는 값을 입력하면 됩니다.

 

database='default',

hive 서버 database 기본값입니다.

보통 default를 적으면 됩니다.

 

auth='LDAP'

hive 서버 연결 시 인증 방식을 적어주는 곳입니다.

기본적으로 hive는 LDAP 인증방식을 사용하고 있으니 LDAP을 적어주면 됩니다.

 

 

 

 

 

 

from pyhive import hive
import pandas as pd

hive_con = hive.Connection(
    host='test_host',
    port=10000,
    username='test_user_id',
    password='test_user_password',
    database='default',
    auth='LDAP'
)
cursor = hive_con.cursor()

이렇게 hive 서버와 연결할 때 필요한 정보들을 담은 객체를 hive_con이라는 변수에 할당하였습니다.

그리고 hive_con에 있는 cursor method를 불러옵니다.

 

cursor method는 query를 직접 실행하거나 query 결과를 가져오는 등의 역할을 합니다.

마치 여러분이 실제 hive 쿼리를 직접 실행할 때 마우스 커서로 실행버튼을 누르고 결과값을 복사하는 등의 역할과 비슷하죠.

 

 

 

...

test_query = '''
select tt.id,
       tt.name
from test_schema.test_table as tt
;
'''

...

실행할 query입니다.

별도의 변수에 저장해놨는데 별도에 파일에 있는 것을 open method로 불러와도 됩니다.

(Python open 관련 내용 = https://cosmosproject.tistory.com/169)

 

 

 

 

 

 

 

from pyhive import hive
import pandas as pd

hive_con = hive.Connection(
    host='test_host',
    port=10000,
    username='test_user_id',
    password='test_user_password',
    database='default',
    auth='LDAP'
)
cursor = hive_con.cursor()

test_query = '''
select tt.id,
       tt.name
from test_schema.test_table as tt
;
'''

cursor.execute(test_query.replace(';', ''))

위에서 설정한 cursorexecute method를 이용하여 query를 실행합니다.

execute method는 parameter로서 SQL query를 받습니다.

 

근데 위에서 한 가지 주의해야할 것은 test_query를 그대로 전달하지 않고 replace를 이용해서 세미콜론(;)을 공백으로 바꾼 채로 전달했다는 것입니다.

 

보통 query를 실행할 때에는 세미콜론까지 적어줘도 상관없지만 pyhive를 이용해서 query를 실행시킬 땐 마지막에 세미콜론이 없어야합니다.

 

 

 

 

 

from pyhive import hive
import pandas as pd

...

cursor.execute(test_query.replace(';', ''))

try:
    output = cursor.fetchall()
except:
    output = None

...

cursor.execute method로 query를 실행한 후 그 결과를 가져오는 method는 fetchall 입니다.

그래서 위 부분의 try ~ except 구문을 보면 cursor.fetchall() method를 이용해서 output이라는 변수에  query의 결과를 할당하고 있죠.

 

근데 왜 굳이 try ~ except 구문을 이용해서 query결과를 할당하는지 궁금할겁니다.

그 이유는 query의 결과가 없을수도 있기 때문입니다.

 

query의 결과가 없는 경우 fetchall method로 query결과를 가져오려고 할 때 error가 발생할 수 있으므로,

try ~ except 구문을 이용하여 fetchall에서 에러 발생 시 output 변수에 None을 할당하여 query 결과가 없다는걸 보여주려는 것이죠.

 

 

 

 

 

 

from pyhive import hive
import pandas as pd

...

test_query = '''
select tt.id,
       tt.name
from test_schema.test_table as tt
;
'''

...

output_df = pd.DataFrame(output, columns=['id', 'name'])

그리고 마지막으로 output에 저장된 query결과를 DataFrame으로 만들어서 output_df라는 변수에 저장하고 있습니다.

여기서 또한 주의할 점은 결과 data에는 컬럼의 이름이 포함되어있지 않습니다.

그래서 DataFrame으로 만들 때 columns 옵션에 ['id', 'name']과 같이 직접 column 이름을 입력해줬습니다.

 

 

 

 

 

 

 

 

 

 

 

 

 

이제 예시를 좀 바꿔봅시다.

아래 예시가 이전에 봤던 예시와 달라진 점은 실행할 query가 1개에서 여러 개로 바뀐 것입니다.

from pyhive import hive
import pandas as pd

hive_con = hive.Connection(
    host='test_host',
    port=10000,
    username='test_user_id',
    password='test_user_password',
    database='default',
    auth='LDAP'
)
cursor = hive_con.cursor()

test_query = '''
drop table if exists temp_table;
create temporary table temp_table as
select id
from test_schema.raw_table
;

select tt.id,
       tt.name
from test_schema.test_table as tt
join temp_table as rt on rt.id = tt.id
;
'''

cursor.execute(test_query.replace(';', ''))

try:
    output = cursor.fetchall()
except:
    output = None

output_df = pd.DataFrame(output, columns=['id', 'name'])

 

원래 실행할 query는 그냥 select 구문 하나였는데 이제는 drop table, create temp table, select 등으로 총 3개의 query가 있습니다.

(세미콜론(;)으로 구분된 쿼리가 총 세 부분 존재하죠.)

 

위 코드를 그냥 실행하면 error가 발생합니다.

 

그 이유는 다음과 같습니다.

1. 처음에 봤던 예시에서 설명했듯이 pyhive에서 query를 실행할 때 query 구문에 세미콜론(;)이 있으면 안됩니다.

2. 따라서 위 예시에서와 같이 다중 query는 한번에 실행하면 error가 발생합니다.

 

 

이 내용을 해결하기 위해선 test_query 변수에 저장된 query를 세미콜론을 기준으로 각 단위별로 나누고 순차적으로 실행하는 것입니다.

 

이걸 실제 풀어서 써보면

drop table if exists temp_table
create temporary table temp_table as
select id
from test_schema.raw_table
select tt.id,
       tt.name
from test_schema.test_table as tt
join temp_table as rt on rt.id = tt.id

이렇게 query를 세미콜론을 기준으로 나눠서 3개로 분리하여 1개씩 순차적으로 execute method의 parameter로 전달하여 1개씩 query를 실행하면 된다는 의미입니다.

 

 

 

 

 

위처럼 query를 나누는 방식은 여러 방법이 있겠지만 저는 아래처럼 test_query를 세 부분으로 나눠서 list에 저장하는 형식으로 진행하였습니다.

from pyhive import hive
import pandas as pd

hive_con = hive.Connection(
    host='test_host',
    port=10000,
    username='test_user_id',
    password='test_user_password',
    database='default',
    auth='LDAP'
)
cursor = hive_con.cursor()

test_query = '''
drop table if exists temp_table;
create temporary table temp_table as
select id
from test_schema.raw_table
;

select tt.id,
       tt.name
from test_schema.test_table as tt
join temp_table as rt on rt.id = tt.id
;
'''

query_list = test_query.split(';\n')
query_list = [str(q).strip() for q in query_list]
query_list = list(filter(lambda q: q != '', query_list))

for q in query_list:
    cursor.execute(q.replace(';', ''))

try:
    output = cursor.fetchall()
except:
    output = None

output_df = pd.DataFrame(output, columns=['id', 'name'])

순서를 설명해보면 다음과 같습니다.

 

query_list = test_query.split(';\n')

원본 query가를 보면 각각의 query가 세미콜론(;) 다음 줄바꿈으로 구분되어있습니다.

따라서 세미콜론과 줄바꿈을 나타내는 \n을 기준으로 query를 나눠서 list로 만듭니다.

 

[
'\ndrop table if exists temp_table',
'create temporary table temp_table as\nselect id\nfrom test_schema.raw_table\n',
'\nselect tt.id,\n       tt.name\nfrom test_schema.test_table as tt\njoin temp_table as rt on rt.id = tt.id\n',
''
]

여기까지 해서 query_list를 print해보면 위와 같습니다.

query가 세 개의 부분으로 잘 나눠졌네요.

다만 query 양쪽에 줄바꿈 표시인 \n도 보이고 맨 마지막 요소로는 공백('') 도 존재하는 것 같습니다.

 

 

 

query_list = [str(q).strip() for q in query_list]

두 번째 부분입니다.

위에서 나눠진 query들을 봤을 때 query 구문 앞 뒤로 줄바꿈(\n)같은 불필요한 공백들이 존재했습니다.

따라서 strip method를 이용해서 query의 양쪽 끝에 있는 공백들을 다 없애주는겁니다.

(strip method 관련 내용 = https://cosmosproject.tistory.com/148)

 

여기까지 해서 query_list를 print해보면 다음과 같습니다.

[
'drop table if exists temp_table',
'create temporary table temp_table as\nselect id\nfrom test_schema.raw_table',
'select tt.id,\n tt.name\nfrom test_schema.test_table as tt\njoin temp_table as rt on rt.id = tt.id',
''
]

 

이전과 비교하면 각각의 query 구문 양쪽 끝에 존재하던 불필요한 공백(줄바꿈 등)들이 사라졌습니다.

 

하지만 아직 부족합니다.

가장 마지막 요소를 보시면 따옴표 2개가 보이시나요?

그냥 공백이 query_list의 일부로 포함되어 있습니다.

이렇게 그냥 공백을 execute method로 실행시키면 error가 발생할거라서 이것도 제거해줍시다.

 

 

 

query_list = list(filter(lambda q: q != '', query_list))

filter method를 이용해서 공백인 부분은 모두 제외하였습니다.

여기까지 해서 query_list를 print해보면 다음과 같습니다.

[
'drop table if exists temp_table',
'create temporary table temp_table as\nselect id\nfrom test_schema.raw_table',
'select tt.id,\n       tt.name\nfrom test_schema.test_table as tt\njoin temp_table as rt on rt.id = tt.id'
]

저희가 원하던대로 원본 query를 세미콜론 기준으로 세 부분으로 나누었죠.

 

 

 

 

from pyhive import hive
import pandas as pd

...

select tt.id,
       tt.name
from test_schema.test_table as tt
join temp_table as rt on rt.id = tt.id
;
'''

query_list = test_query.split(';\n')
query_list = [str(q).strip() for q in query_list]
query_list = list(filter(lambda q: q != '', query_list))

for q in query_list:
    cursor.execute(q.replace(';', ''))

...

그리고 이렇게 나눠진 query를 for loop를 이용해서 순차적으로 execute method에 전달해주면 됩니다.

 

그 후는 처음에 봤던 예시와 동일합니다.

 

 

 

 

 

 

728x90
반응형
Comments