달나라 노트

Python pyspark : sql (spark에서 hive 쿼리 돌리기) 본문

Python/Python pyspark

Python pyspark : sql (spark에서 hive 쿼리 돌리기)

CosmosProject 2021. 5. 19. 06:02
728x90
반응형

 

 

 

hive에서 직접 쿼리를 돌릴때보단 spark에서 돌리는게 좀 더 빠릅니다.

물론 결과 데이터가 크면 용량 초과 에러가 뜰 순 있지만요.

 

from pyspark.sql import SparkSession

spark = SparkSession.builder\
    .appName('Test_runner')\
    .config('hive.mapred.mode', 'nonstrict')\
    .config('hive.exec.dynamic.partition', 'true')\
    .config('hive.exec.dynamic.partition.mode', 'nonstrict')\
    .config('hive.exec.parallel', 'true')\
    .config('hive.stats.fetch.column.stats', 'true')\
    .config('hive.strict.checks.large.query', 'nonstrict')\
    .enableHiveSupport()\
    .getOrCreate()

query = '''
select  *
from test_schema.test_table
'''

df_spark = spark.sql(query)

위처럼 SparkSession을 initialize하는데 SparkSession에서 config method가 여러 개 명시되었습니다.

spark query를 hive에서 실행시킬 것이기 때문에 hive에서 query를 돌릴 때 제가 원하는 여러 가지 옵션을 config method로 설정해주는 것입니다.

예를 들어 config('hive.mapred.mode', 'nonstrict') 이 설정은 hive에서 아래와 같은 구문을 실행시키는 것과 동일한 효과를 가집니다.

 

hive.mapred.mode = nonstrict;

 

그러면 이것은 무엇을 의미하느냐?

실행시키는 query에서 partition이 있는 table이 있는 경우 partition 컬럼의 조건이 where 구문에 들어가지 않아도 query를 실행하라는 의미입니다.

 

반대로 아래처럼 설정하면 query에서 partition table을 이용할 경우 반드시 partition column의 조건을 where에 명시해줘야 query가 실행됩니다.

hive.mapred.mode = strict;

 

이런식으로 config method를 이용해서 hive query를 실행할 때 hive에 대해 여러 셋팅값을 설정해주는 것입니다.

 

 

 

 

그리고 생성한 spark class의 sql method를 이용해서 query를 돌리고 그 결과를 spark dataframe의 형태로 얻을 수 있습니다.

 

여기서 주의할 점은 query에 세미콜론(;)이 있으면 에러가 뜹니다.

반드시 세미콜론은 모두 없애주어야 합니다.

 

 

 

 

다중 쿼리 돌리기

 

위 예시처럼 단순히 select 구문 1개가 아니라 여러 개의 query를 순차적으로 한번에 돌리는 방법을 알아봅시다.

 

 

from pyspark.sql import SparkSession

spark = SparkSession.builder\
    .appName('Test_runner')\
    .config('hive.mapred.mode', 'nonstrict')\
    .config('hive.exec.dynamic.partition', 'true')\
    .config('hive.exec.dynamic.partition.mode', 'nonstrict')\
    .config('hive.exec.parallel', 'true')\
    .config('hive.stats.fetch.column.stats', 'true')\
    .config('hive.strict.checks.large.query', 'nonstrict')\
    .enableHiveSupport()\
    .getOrCreate()

query = ''' # 1
drop view if exists temp_id;
create temporary view temp_id as
select	id
from test_schema.test_table
;


select  *
from test_schema.test_table_2 as tt2
join temp_id as ti
on ti.id = tt2.id
;
'''


list_queries = query.split(';') # 2
list_queries.remove('\n') # 3

list_results = []
for q in list_queries: # 4
    list_results.append(spark.sql(q))

1. 이전 예시와는 다르게 총 3개의 query가 존재합니다.

- 혹시라도 존재할 temp_id라는 view를 drop

- temp_id라는 이름의 temporary view 생성

- 생성한 temp_id를 test_table_2와 join하여 원하는 결과를 얻음

 

이처럼 다중쿼리가 적혀있죠.

 

보통 다중쿼리를 사용하면 단위별로 쿼리를 관리할 수 있게 되어서 select 속의 select 속의 select ... 등의 subquery로 관리할때보다 훨씬 편합니다.

 

다만 이런 경우 보통 temporary table(create temporary table)을 사용하는데 위 예시에서 temporary view(create temporary view)를 사용한 것은 hive에서 spark를 통해 다중쿼리를 돌릴 때 create temporary table이 지원되지 않는 경우가 있기 때문입니다.

 

 

 

2. split method를 이용해 쿼리를 세미콜론(;)을 기준으로 나누고 list의 형태로 저장합니다.

 

 

 

3. list_queries에서 줄바꿈('\n')을 제거하는 이유는 query를 split할 때 원본 query text의 어느 부분에서 세미콜론으로 구분된 줄바꿈이 list에 들어갈 수 있기 때문입니다.

이런 경우 줄바꿈('\n')이라는 텍스트가 쿼리 중 하나로 돌게되고 그러면 에러가뜨겠죠.

따라서 이런 에러를 방지하기 위해 제거하는 것입니다.

(기타 다른 쿼리 내용과 관계없는 text가 들어갈 가능성이 있거나 한다면 모두 제거해줘야합니다.)

 

위 예시에서 이게 무슨소린지 파악해봅시다.

query = '''
drop view if exists temp_id;
create temporary view temp_id as
select	id
from test_schema.test_table
;


select  *
from test_schema.test_table_2 as tt2
join temp_id as ti
on ti.id = tt2.id
;
'''


list_queries = query.split(';')


-- Result
list_queries = [
'''
drop view if exists temp_id''',

'''
create temporary view temp_id as
select	id
from test_schema.test_table
''',

'''


select  *
from test_schema.test_table_2 as tt2
join temp_id as ti
on ti.id = tt2.id
''',

'''\n'''
]

Result는 query를 ;을 기준으로 split했을 때 list_queires에 담기는 query들의 모양이 어떠한지 나타내봤습니다.

원본 query자체가 세미콜론(;) 앞 뒤로 줄바꿈이 존재하기 때문에 나눠진 쿼리도 세미콜론을 기준으로 나뉜 후 결과값에 줄바꿈까지 포함된 것을 볼 수 있습니다.

여기서 가장 주목할건 마지막 부분입니다.

 

list_queries의 마지막 요소로 '''\n''' (줄바꿈)이 있습니다.

이것은 query의 마지막 부분이 아래처럼 끝났기 때문에 세미콜론(;) 뒤에 있는 줄바꿈이 마지막 요소로서 split된겁니다.

query = '''
...
on ti.id = tt2.id
; # <<-- 이 부분과
''' # <<-- 이 부분에 줄바꿈이 있기 때문에 마지막 세미콜론의 split의 결과로 줄바꿈이 생기게 됩니다.

쿼리문과 함께 줄바꿈/띄어쓰기등이 동시에 있는 것은 상관없습니다만

줄바꿈만 있는 것은 query 실행 시 error를 일으킵니다.

 

그래서 아래처럼 remove로 줄바꿈인 요소를 list에서 삭제해주는거죠.

list_queries.remove('\n')

 

이처럼 줄바꿈, 공백 등 기타 쿼리와 관련없는 내용으로만 이뤄진 내용이 split된다면 반드시 지워줘야 에러가 나지 않습니다.

 

 

 

4. 반복문을 통해 순차적으로 쿼리를 돌려주며 결과를 list에 추가해줍니다.

여기서는 가장 마지막 값이 우리가 원하는 최종 select 구문에 대한 결과겠죠.

 

- PS

drop view if exists ~~;

위처럼 return값이 없는 쿼리는 결과로서 empty dataframe이 반환됩니다.

 

 

 

 

 

 

728x90
반응형
Comments