달나라 노트

Python pyspark : join, left join, right join, full outer join (spark dataframe join 하기) 본문

Python/Python pyspark

Python pyspark : join, left join, right join, full outer join (spark dataframe join 하기)

CosmosProject 2021. 5. 31. 15:07
728x90
반응형

 

 

 

pyspark dataframe도 여러 dataframe을 아래와 같은 4개의 join을 통해 합칠 수 있습니다.

 

(inner) join

left join

right join

full outer join

 

join의 결과는 일반적인 sql에서의 join과 동일합니다.

 

 

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import pandas as pd

spark = SparkSession.builder.getOrCreate()

df_item = pd.DataFrame({
    'id': [1, 2, 3],
    'name': ['apple', 'banana', 'tomato'],
    'price': [20000, 3500, 15000]
})
df_spark_item = spark.createDataFrame(df_item)
df_saprk_item.show()

df_qty = pd.DataFrame({
    'id': [1, 2, 4],
    'quantity': [3, 2, 5]
})
df_spark_qty = spark.createDataFrame(df_qty)
df_spark_qty.show()



-- Result
+---+--------+-------+
| id|    name|  price|
+---+--------+-------+
|  1|   apple|  20000|
|  2|  banana|   3500|
|  3|  tomato|  15000|
+---+--------+-------+

+---+----------+
| id|  quantity|
+---+----------+
|  1|         3|
|  2|         2|
|  4|         5|
+---+----------+

일단 위처럼 2개의 테스트용 spark dataframe을 만들어줍니다.

 

 

 

 


 

 

inner join

 

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import pandas as pd

spark = SparkSession.builder.getOrCreate()

df_item = pd.DataFrame({
    'id': [1, 2, 3],
    'name': ['apple', 'banana', 'tomato'],
    'price': [20000, 3500, 15000]
})
df_spark_item = spark.createDataFrame(df_item)
df_saprk_item.show()

df_qty = pd.DataFrame({
    'id': [1, 2, 4],
    'quantity': [3, 2, 5]
})
df_spark_qty = spark.createDataFrame(df_qty)
df_spark_qty.show()


print('-- inner join --')
df_joined = df_spark_item.join(df_spark_qty, [df_spark_item.id == df_spark_qty.id], 'inner') # 1
df_joined.show()





-- Result
+---+--------+-------+
| id|    name|  price|
+---+--------+-------+
|  1|   apple|  20000|
|  2|  banana|   3500|
|  3|  tomato|  15000|
+---+--------+-------+

+---+----------+
| id|  quantity|
+---+----------+
|  1|         3|
|  2|         2|
|  4|         5|
+---+----------+

-- inner join --
+---+--------+-------+---+----------+
| id|    name|  price| id|  quantity|
+---+--------+-------+---+----------+
|  1|   apple|  20000|  1|         3|
|  2|  banana|   3500|  2|         2|
+---+--------+-------+---+----------+

 

1. 먼저 join은 아래처럼 사용할 수 있습니다.

df_left.join(df_right, df_left.col_name_1 == df_right.col_name_2, 'join_method') # (1)

df_left.join(df_right, 'col_name', 'join_method') # (2)

 

 

- 기본적으로 left dataframe에 join을 적용하고 right dataframe을 명시하여 사용합니다.

 

- join의 2번째 인자로는 join의 기준이 될 column을 명시해줍니다.

(1) [df_left.col_name_1 == df_right.col_name_2] 의 형태로 명시할 경우 join의 기준이 될 column이름이 2개의 dataframe에서 달라도 join 기준 column으로 사용가능합니다.

(2) ['col_name'] 처럼 그냥 column 이름만 명시한 경우는 양쪽 dataframe의 join 기준 column이름이 동일한 경우 사용할 수 있습니다.

(join 조건을 대괄호로 묶어서 list로 전달하는 이유는 join 조건이 여러 개일 경우 list의 형태로 조건을 전달하기 때문입니다.

join 조건이 하나일 땐 list 표기 없이 그냥 조건만 전달해도 되지만 다중조건이 있는 경우가 많으므로 통일성있게 list에 담아서 표현하는 방식을 사용하겠습니다.

이에 대한 예시는 맨 아래에서 살펴봅시다.)

 

- join의 마지막 인자로서는 join_method를 입력합니다.

아래 내용을 보면 각 키워드별 어떤 join을 의미하는지 알 수 있습니다.

inner -> inner join을 의미

left -> left join을 의미

right -> right join을 의미

full / outer -> full outer join을 의미.

 

 

이제 위 예시의 결과를 봅시다.

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import pandas as pd

spark = SparkSession.builder.getOrCreate()

...

print('-- inner join --')
df_joined = df_spark_item.join(df_spark_qty, [df_spark_item.id == df_spark_qty.id], 'inner') # 1
df_joined.show()





-- Result
...

-- inner join --
+---+--------+-------+---+----------+
| id|    name|  price| id|  quantity|
+---+--------+-------+---+----------+
|  1|   apple|  20000|  1|         3|
|  2|  banana|   3500|  2|         2|
+---+--------+-------+---+----------+

보면 inner join으로 join을 실행하고, join 기준 column은 id입니다.

따라서 df_spark_item, df_spark_qty 둘 다에 있는 id를 가진 id = 1, 2인 행만 join되어 나온 것을 알 수 있습니다.

 

여기서 한 가지 특징이 있습니다.

join 기준 column의 이름이 2개의 dataframe에 대해 모두 id라는 이름을 가지고 있습니다.

그래서 결과에서도 id컬럼이 2개가 중복으로 있는 것을 볼 수 있죠.

 

보통 저렇게 동일한 이름을 가진 column이 하나의 dataframe에 있으면 좋지않습니다.

이 경우를 다룰 수 있는 방법은 여러 가지가 있는데 알아봅시다.

 

 

 

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import pandas as pd

spark = SparkSession.builder.getOrCreate()

df_item = pd.DataFrame({
    'id': [1, 2, 3],
    'name': ['apple', 'banana', 'tomato'],
    'price': [20000, 3500, 15000]
})
df_spark_item = spark.createDataFrame(df_item)

df_qty = pd.DataFrame({
    'id': [1, 2, 4],
    'quantity': [3, 2, 5]
})
df_spark_qty = spark.createDataFrame(df_qty)


print('-- inner join --')
df_joined = df_spark_item.join(df_spark_qty, [df_spark_item.id == df_spark_qty.id], 'inner') # 1
df_joined.show()

df_joined = df_spark_item.join(df_spark_qty, ['id'], 'inner') # 2
df_joined.show()

df_joined = df_spark_item.join(df_spark_qty, [df_spark_item.id == df_spark_qty.id], 'inner').drop(df_spark_qty.id) # 3
df_joined.show()





-- Result

-- inner join --
+---+--------+-------+---+----------+
| id|    name|  price| id|  quantity|
+---+--------+-------+---+----------+
|  1|   apple|  20000|  1|         3|
|  2|  banana|   3500|  2|         2|
+---+--------+-------+---+----------+

+---+--------+-------+----------+
| id|    name|  price|  quantity|
+---+--------+-------+----------+
|  1|   apple|  20000|         3|
|  2|  banana|   3500|         2|
+---+--------+-------+----------+

+---+--------+-------+----------+
| id|    name|  price|  quantity|
+---+--------+-------+----------+
|  1|   apple|  20000|         3|
|  2|  banana|   3500|         2|
+---+--------+-------+----------+

1. 이전 예시와 동일하게 id 컬럼이 중복된채로 return됩니다.

 

2. join사용 시 join 기준 column의 이름이 동일한 경우 column 이름으로만 명시해주면 이름이 동일한 2개의 기준 column 중 하나만 남은채로 결과가 return됩니다.

 

3. join 후 drop method를 이용하여 양쪽 dataframe 중 하나의 id column을 drop해줍니다. 그러면 결과에는 1개의 id column만 남게 되겠죠.

 

 

 




 

left join, right join

 

 

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import pandas as pd

spark = SparkSession.builder.getOrCreate()

df_item = pd.DataFrame({
    'id': [1, 2, 3],
    'name': ['apple', 'banana', 'tomato'],
    'price': [20000, 3500, 15000]
})
df_spark_item = spark.createDataFrame(df_item)
df_saprk_item.show()

df_qty = pd.DataFrame({
    'id': [1, 2, 4],
    'quantity': [3, 2, 5]
})
df_spark_qty = spark.createDataFrame(df_qty)
df_spark_qty.show()


print('-- left join --')
df_joined = df_spark_item.join(df_spark_qty, [df_spark_item.id == df_spark_qty.id], 'left') # 1
df_joined.show()

print('-- right join --')
df_joined = df_spark_item.join(df_spark_qty, [df_spark_item.id == df_spark_qty.id], 'right') # 1
df_joined.show()





-- Result
+---+--------+-------+
| id|    name|  price|
+---+--------+-------+
|  1|   apple|  20000|
|  2|  banana|   3500|
|  3|  tomato|  15000|
+---+--------+-------+

+---+----------+
| id|  quantity|
+---+----------+
|  1|         3|
|  2|         2|
|  4|         5|
+---+----------+

-- inner join --
+---+--------+-------+-----+----------+
| id|    name|  price|   id|  quantity|
+---+--------+-------+-----+----------+
|  1|   apple|  20000|    1|         3|
|  2|  banana|   3500|    2|         2|
|  3|  tomato|  15000| null|      null|
+---+--------+-------+-----+----------+

-- right join --
+-----+--------+-------+----+----------+
|   id|    name|  price|  id|  quantity|
+-----+--------+-------+----+----------+
|    1|   apple|  20000|   1|         3|
|    2|  banana|   3500|   2|         2|
| null|    null|   null|   4|         5|
+-----+--------+-------+----+----------+

left join, right join은 위처럼 join method만 바꿔줌으로서 사용 가능합니다.

그 외에는 inner join과 모두 동일합니다.

 

 

 

 

 


 

full outer join

 

 

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import pandas as pd

spark = SparkSession.builder.getOrCreate()

df_item = pd.DataFrame({
    'id': [1, 2, 3],
    'name': ['apple', 'banana', 'tomato'],
    'price': [20000, 3500, 15000]
})
df_spark_item = spark.createDataFrame(df_item)
df_saprk_item.show()

df_qty = pd.DataFrame({
    'id': [1, 2, 4],
    'quantity': [3, 2, 5]
})
df_spark_qty = spark.createDataFrame(df_qty)
df_spark_qty.show()


print('-- full outer join --')
df_joined = df_spark_item.join(df_spark_qty, [df_spark_item.id == df_spark_qty.id], 'outer') # 1
df_joined.show()

df_joined = df_spark_item.join(df_spark_qty, [df_spark_item.id == df_spark_qty.id], 'full') # 1
df_joined.show()





-- Result
+---+--------+-------+
| id|    name|  price|
+---+--------+-------+
|  1|   apple|  20000|
|  2|  banana|   3500|
|  3|  tomato|  15000|
+---+--------+-------+

+---+----------+
| id|  quantity|
+---+----------+
|  1|         3|
|  2|         2|
|  4|         5|
+---+----------+

-- inner join --
+-----+--------+-------+-----+----------+
|   id|    name|  price|   id|  quantity|
+-----+--------+-------+-----+----------+
|    1|   apple|  20000|    1|         3|
|    2|  banana|   3500|    2|         2|
|    3|  tomato|  15000| null|      null|
| null|    null|   null|    4|         5|
+-----+--------+-------+-----+----------+

+-----+--------+-------+-----+----------+
|   id|    name|  price|   id|  quantity|
+-----+--------+-------+-----+----------+
|    1|   apple|  20000|    1|         3|
|    2|  banana|   3500|    2|         2|
|    3|  tomato|  15000| null|      null|
| null|    null|   null|    4|         5|
+-----+--------+-------+-----+----------+

full outer join도 join method만 변경하여 사용 가능하지만,

full outer join은 join method로서 'full', 'outer' 모두 사용 가능합니다.

 

 

 


 

 

inner join 다중조건

 

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import pandas as pd

spark = SparkSession.builder.getOrCreate()

df_item = pd.DataFrame({
    'id': [1, 2, 3],
    'name': ['apple', 'banana', 'tomato'],
    'price': [20000, 3500, 15000],
    'valid': [1, 1, 1]
})
df_spark_item = spark.createDataFrame(df_item)
df_saprk_item.show()

df_qty = pd.DataFrame({
    'id': [1, 2, 4],
    'quantity': [3, 2, 5],
    'valid': [1, 0, 0]
})
df_spark_qty = spark.createDataFrame(df_qty)
df_spark_qty.show()


print('-- inner join --')
print('ex 1')
df_joined = df_spark_item.join(df_spark_qty,
                               [df_spark_item.id == df_spark_qty.id, df_spark_item.valid == df_spark_qty.valid],
                               'inner') # 1
df_joined.show()


print('ex 2')
df_joined = df_spark_item.join(df_spark_qty,
                               [df_spark_item.id == df_spark_qty.id, df_spark_item.valid == df_spark_qty.valid],
                               'inner').drop(df_spark_qty.id).drop(df_spark_qty.valid) # 1
df_joined.show()


print('ex 3')
df_joined = df_spark_item.join(df_spark_qty, ['id', 'valid'], 'inner') # 2
df_joined.show()


print('ex 4')
df_joined = df_spark_item.join(df_spark_qty, ['id'], 'inner') # 3
df_joined.show()





-- Result
+---+--------+-------+------+
| id|    name|  price| valid|
+---+--------+-------+------+
|  1|   apple|  20000|     1|
|  2|  banana|   3500|     1|
|  3|  tomato|  15000|     1|
+---+--------+-------+------+

+---+----------+------+
| id|  quantity| valid|
+---+----------+------+
|  1|         3|     1|
|  2|         2|     0|
|  4|         5|     0|
+---+----------+------+

-- inner join --
ex 1
+---+--------+-------+------+---+----------+------+
| id|    name|  price| valid| id|  quantity| valid|
+---+--------+-------+------+---+----------+------+
|  1|   apple|  20000|     1|  1|         3|     1|
+---+--------+-------+------+---+----------+------+

ex 2
+---+--------+-------+------+----------+
| id|    name|  price| valid|  quantity|
+---+--------+-------+------+----------+
|  1|   apple|  20000|     1|         3|
+---+--------+-------+------+----------+

ex 3
+---+--------+-------+------+----------+
| id|    name|  price| valid|  quantity|
+---+--------+-------+------+----------+
|  1|   apple|  20000|     1|         3|
+---+--------+-------+------+----------+

ex 4
+---+--------+-------+------+----------+------+
| id|    name|  price| valid|  quantity| valid|
+---+--------+-------+------+----------+------+
|  1|   apple|  20000|     1|         3|     1|
|  2|  banana|   3500|     1|         2|     0|
+---+--------+-------+------+----------+------+

만약 join 조건을 2개 이상 사용하려면 join 조건들을 list에 모두 담아 전달하면 됩니다.

 

 

1. 2개의 dataframe의 id column이 동일함과 동시에 valid column이 동일한 행만을 join한다는 조건입니다.

결과를 보면 id, valid column이 모두 존재하는 것을 볼 수 있습니다.

 

2. 1번과 동일하지만 join 후 df_spark_qty의 id, valid column을 drop함으로써 결과 dataframe에 중복된 이름의 column이 없도록 한 것입니다.

 

3. 1번과 동일하지만 join 조건을 column이름으로만 명시하여 id, valid column이 결과 dataframe에 중복되어 return되지 않도록 한 것입니다.

 

4. 조건을 1개만 명시할 때에도 list에 담아 전달할 수 있습니다.

 

 

 

 

 

 

728x90
반응형
Comments