Python/Python pyspark
Python pyspark : write, option, saveAsTable (spark dataframe을 AWS s3에 업로드하기)
CosmosProject
2021. 5. 19. 06:14
728x90
반응형
spark dataframe은 table에 삽입될 수도 있지만
AWS s3 server에 upload될 수도 있습니다.
from pyspark.sql import SparkSession
spark = SparkSession.builder\ # 1
.appName('Test_runner')\
.config('hive.mapred.mode', 'nonstrict')\
.config('hive.exec.dynamic.partition', 'true')\
.config('hive.exec.dynamic.partition.mode', 'nonstrict')\
.config('hive.exec.parallel', 'true')\
.config('hive.stats.fetch.column.stats', 'true')\
.config('hive.strict.checks.large.query', 'nonstrict')\
.enableHiveSupport()\
.getOrCreate()
query = '''
select *
from test_schema.test_table
'''
df_spark = spark.sql(query) # 2
spark.sql(''' # 3
drop table if exists test_schema.new_table
''')
df_spark.write.format('parquet').mode('overwrite').option('path', 's3://test_dir/sub_1'.saveAsTable('test_schema.new_table') # 4
1. SparkSession을 initialize합니다.
2. query를 돌려 query 결과를 얻습니다.
3. 2번에서 얻은 query결과를 s3 server에 upload하며 test_schema.new_table에도 넣어줄겁니다.
그러기 위해서 기존에 존재할 수 있는 test_schema.new_table를 drop해줍니다.
4. df_spark에 있는 spark dataframe을
parquet format으로 변환하여
덮어씌울건데(overwrite)
s3 server 상의 test_dir/sub_1이라는 directory 안에 parquet format으로 spark dataframe을 upload하며
test_schema.new_table이라는 테이블에 덮어씌웁니다.
728x90
반응형