달나라 노트

Python psycopg2: access Redshift and run a query using Python


CosmosProject 2020. 12. 23. 00:20

 

 

import sys
import psycopg2
import pandas as pd

def query_runner(query):
    # Placeholder connection settings; replace them with your own cluster's values.
    dbname = 'sandbox'
    host = 'test_host'
    port = 1111 # port_number
    user_id = 'redshift_user_id'
    password = 'redshift_user_password'
    connection_info = "dbname='{}' host='{}' port={} user='{}' password='{}'".format(dbname, host, port, user_id, password)

    # Connect to the server, create a cursor, and run the query.
    connection = psycopg2.connect(connection_info)
    cursor = connection.cursor()
    cursor.execute(query)

    try:
        # Fetch all rows and take the column names from the cursor metadata.
        query_output = cursor.fetchall()
        column_names = [x[0] for x in cursor.description]
        df_output = pd.DataFrame(query_output, columns=column_names)
    except Exception as e:
        if str(e) == 'no results to fetch':
            # The query returned no result set: return an empty DataFrame.
            print(e)
            df_output = pd.DataFrame({})
        else:
            # Any other error: print the message and stop the program.
            print(e)
            sys.exit()

    connection.close()

    return df_output


query = '''
drop table if exists temp_table;
create temp table temp_table as
select id
from test_schema.raw_table
;

select tt.*
from test_schema.test_table as tt
join test_schema.raw_table as rt on rt.id = tt.id
;
'''

query_result = query_runner(query)

You can run a query from Python and get the result back.

The result is stored in the variable 'query_result' as a pandas DataFrame. Because the query string contains several statements, the rows come from the last one, the final select.

 

 

 

import psycopg2
import pandas as pd

def query_runner(query):
    dbname = 'sandbox'
    host = 'test_host'
    port = 1111 # port_number
    user_id = 'redshift_user_id'
    password = 'redshift_user_password'
    connection_info = "dbname='{}' host='{}' port={} user='{}' password='{}'".format(dbname, host, port, user_id, password)

    ...

The part above builds the connection string that psycopg2 needs in order to connect to the database.
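As a hedged variation on the hard-coded values, the same connection string can be assembled from environment variables so that credentials stay out of the source file. This is only a sketch: the helper name `build_connection_info` and the `REDSHIFT_*` variable names are assumptions made for illustration and are not part of the original function or of psycopg2.

```python
import os


def build_connection_info(env=None):
    """Build a connection string in the same format used by query_runner.

    The REDSHIFT_* names are hypothetical; use whatever names fit your setup.
    """
    env = os.environ if env is None else env
    return "dbname='{}' host='{}' port={} user='{}' password='{}'".format(
        env["REDSHIFT_DBNAME"],
        env["REDSHIFT_HOST"],
        int(env["REDSHIFT_PORT"]),  # fail early if the port is not a number
        env["REDSHIFT_USER"],
        env["REDSHIFT_PASSWORD"],
    )


# Example with an explicit mapping instead of the real environment.
example_env = {
    "REDSHIFT_DBNAME": "sandbox",
    "REDSHIFT_HOST": "test_host",
    "REDSHIFT_PORT": "1111",
    "REDSHIFT_USER": "redshift_user_id",
    "REDSHIFT_PASSWORD": "redshift_user_password",
}
connection_info = build_connection_info(example_env)
print(connection_info)
```

Values that contain a single quote or a backslash would need escaping in this string format. psycopg2.connect also accepts the same settings as keyword arguments (dbname, host, port, user, password), which avoids building the string by hand.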

 

 

 

 

 

import psycopg2
import pandas as pd

def query_runner(query):
    ...

    connection = psycopg2.connect(connection_info)
    cursor = connection.cursor()
    cursor.execute(query)

    ...

The connect method of psycopg2 connects you to the server using the information stored in the variable 'connection_info'.

The connection's cursor() method then returns a cursor, and the cursor's execute() method runs the query.

 

 

 

 

 

 

import sys
import psycopg2
import pandas as pd

def query_runner(query):
    ...

    try:
        query_output = cursor.fetchall()
        column_names = [x[0] for x in cursor.description]
        df_output = pd.DataFrame(query_output, columns=column_names)
    except Exception as e:
        if str(e) == 'no results to fetch':
            print(e)
            df_output = pd.DataFrame({})
        else:
            print(e)
            sys.exit()

    connection.close()

    return df_output

	...
    
    

After running the query with cursor.execute(query), you can retrieve the result rows with the cursor's 'fetchall()' method.

cursor.description holds information about the result columns, and the first item of each entry is the column name, so you can use these names as the column names of the DataFrame.
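The pairing of rows with column names can be tried without a Redshift cluster. The sketch below uses the standard-library `sqlite3` module purely as a stand-in (an assumption for illustration; the function in this post uses psycopg2). Both modules follow the Python DB-API, in which the first item of each `cursor.description` entry is the column name.

```python
import sqlite3

# In-memory SQLite database, used only as a stand-in for Redshift.
connection = sqlite3.connect(":memory:")
cursor = connection.cursor()
cursor.execute("create table raw_table (id integer, name text)")
cursor.executemany(
    "insert into raw_table (id, name) values (?, ?)",
    [(1, "a"), (2, "b")],
)

cursor.execute("select id, name from raw_table order by id")
query_output = cursor.fetchall()                    # list of row tuples
column_names = [x[0] for x in cursor.description]   # first item is the column name

# Pair every row with the column names. In query_runner,
# pd.DataFrame(query_output, columns=column_names) plays this role.
records = [dict(zip(column_names, row)) for row in query_output]
print(column_names)
print(records)

connection.close()
```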

 

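The 'try ~ except' block handles two situations: a query that returns no result (fetchall() raises 'no results to fetch', and an empty DataFrame is returned) and any other error (the message is printed and the program exits).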
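An alternative to comparing the exception message is to check `cursor.description` before fetching, since the DB-API defines it as `None` when the last statement produced no result set. The following is a minimal sketch under that assumption, not the approach used in this post. The helper name `fetch_rows_or_empty` is hypothetical, and `sqlite3` stands in for psycopg2 so the example runs anywhere.

```python
import sqlite3


def fetch_rows_or_empty(cursor):
    """Return (column_names, rows); both are empty if there is no result set."""
    if cursor.description is None:
        # Statements such as CREATE TABLE or DROP TABLE return no rows to fetch.
        return [], []
    column_names = [x[0] for x in cursor.description]
    return column_names, cursor.fetchall()


connection = sqlite3.connect(":memory:")
cursor = connection.cursor()

# A statement without a result set.
cursor.execute("create table temp_table (id integer)")
ddl_result = fetch_rows_or_empty(cursor)

# A statement with a result set.
cursor.execute("insert into temp_table (id) values (7)")
cursor.execute("select id from temp_table")
select_result = fetch_rows_or_empty(cursor)

print(ddl_result)
print(select_result)

connection.close()
```

Checking the cursor state first means the code does not depend on the exact wording of an error message, which could change between library versions.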

 

Lastly, close the connection to the server with 'connection.close()'.

The function then returns the resulting DataFrame.
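In query_runner, an error raised by cursor.execute(query) would skip 'connection.close()'. One possible way to guarantee the close is `contextlib.closing` from the standard library, sketched here. The names `run_and_close` and `connect_and_track` and the `connect` parameter are assumptions for illustration, and `sqlite3` is again only a stand-in for psycopg2.

```python
import sqlite3
from contextlib import closing


def run_and_close(connect, query):
    """Run one query and always close the cursor and connection, even on error.

    `connect` is any zero-argument callable that returns a DB-API connection,
    for example: lambda: psycopg2.connect(connection_info)
    """
    with closing(connect()) as connection:
        with closing(connection.cursor()) as cursor:
            cursor.execute(query)
            if cursor.description is None:
                return [], []  # the statement returned no result set
            column_names = [x[0] for x in cursor.description]
            return column_names, cursor.fetchall()


# Normal case, using an in-memory SQLite database as the stand-in.
columns, rows = run_and_close(lambda: sqlite3.connect(":memory:"), "select 1 as id")
print(columns, rows)

# Error case: keep a reference to the connection to confirm it was closed.
opened = []


def connect_and_track():
    connection = sqlite3.connect(":memory:")
    opened.append(connection)
    return connection


try:
    run_and_close(connect_and_track, "select * from missing_table")
except sqlite3.OperationalError as error:
    print("query failed:", error)
```

The error still reaches the caller, so the caller decides whether to log it, retry, or exit, while the connection is released in every case.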

 

 

 

 

 

 
