Study Anything ๐ง
๋ฐ์ดํฐ ์ ์ฒ๋ฆฌ - PySpark์์ SQL ์ฌ์ฉํ๊ธฐ ๋ณธ๋ฌธ
๋ฐ์ดํฐ ์ ์ฒ๋ฆฌ - PySpark์์ SQL ์ฌ์ฉํ๊ธฐ
์ 2022. 3. 12. 22:48์ ๋ฒ ํฌ์คํธ์์ ์ค๋ช ํ๋ฏ์ด ์ด๋ฒ ํ๋ก์ ํธ์์๋ ๋ฐฐ๊ตฌ ์ ์๋ค์ ๋ฅ๋ ฅ์น๋ฅผ ํตํด ํ ๋ณ ๊ฒฝ๊ธฐ์์ ์ด๋ค ํ์ด ์น๋ฆฌํ ์ง๋ฅผ ์์ธกํ๋ ๋ชจ๋ธ์ ๋ง๋ค ๊ฒ์ด๋ค.
๊ทธ๋ฌ๊ธฐ ์ํด์๋ ์ ์ ๋ฅ๋ ฅ์น๋ฅผ ํ๋จํ ์ ์๋ ์งํ๊ฐ ํ์ํ๋ฐ, ์ด์ ์ ๋ชจ์๋ ๋จ์ ๋ฐ์ดํฐ๋ก๋ ํ ๋์ ํ์ ํ๊ธฐ ์ด๋ ต๋ค.
ํ์ฌ ๋ฐ์ดํฐ์ ์ปฌ๋ผ์๋ ์ด๋ฆ, ์์ ํ, ํฌ์ง์ , ์ด ์ ์ ์ธ์๋ ๊ณต๊ฒฉ ์๋, ๊ณต๊ฒฉ ์ฑ๊ณต, ๋ฆฌ์๋ธ ์๋, ๋ฆฌ์๋ธ ์ ํ, ๋ฆฌ์๋ธ ์คํจ, ์๋ธ ์ฑ๊ณต, ์ธํธ ์ฑ๊ณต, ๋ธ๋กํน ์ฑ๊ณต, ๋๊ทธ ์ฑ๊ณต ์ด ์๋ค.
์ด ์ค ๊ณต๊ฒฉ ์งํ๋ก ์ฌ์ฉํ ์ ์๋ ๊ณต๊ฒฉ ์ฑ๊ณต๋ฅ ๊ณผ ์๋น ์งํ๋ก ์ฌ์ฉํ ์ ์๋ ๋ฆฌ์๋ธ ํจ์จ์ ๊ตฌํ๊ณ , ์ฐธ์ฌํ ์ธํธ ์๋ฅผ ํตํด ์ธํธ๋น ์๋ธ, ์ธํธ, ๋ธ๋กํน, ๋๊ทธ ๊ฐ์๋ฅผ ๊ตฌํด์ ์๋กญ๊ฒ ๋ฐ์ดํฐ๋ฅผ ๋ง๋ค ๊ฒ์ด๋ค.
์์ ๋ด์ฉ์ผ๋ก ๋ฐ์ดํฐ ์ ์ฒ๋ฆฌ๋ฅผ ํด ๋ณผ ํ ๋ฐ, ์ด๋ค ๋๊ตฌ๋ฅผ ์ฌ์ฉํ ๊น ๊ณ ๋ฏผํ๋ค๊ฐ ํ์ด์คํํฌ๋ฅผ ์ฌ์ฉํ๋ ์คํํฌ์์ ์ ๊ณตํ๋ SQL์ ์ฌ์ฉํด๋ณด๊ธฐ๋ก ํ๋ค.
๋จผ์ , ์คํํฌ๋ฅผ ์ฌ์ฉํ๊ธฐ ์ํด ์ฌ์ฉํ ๋ผ์ด๋ธ๋ฌ๋ฆฌ๋ค์ import ํ๊ณ ์ธ์ ์ ์ด๊ธฐํํ๋ค.
# spark ์์ฑ
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
ํฌ๋กค๋งํด์ ๋ง๋ ์ ์ ๋ฐ์ดํฐ๋ฅผ ํ๋ก์ ์ ์ฅํ๊ณ ๋ถ๋ฌ์จ๋ค.
csv ํ์ผ์ ํ๋ก์ ์ ์ฅํ๊ณ ์ฅฌํผํฐ ๋ ธํธ๋ถ์ ํตํด ํ์ด์คํํฌ๋ฅผ ์ฌ์ฉํ๋ ๋ด์ฉ์ ์ด์ ์ ํฌ์คํธ๋ก ์์ฑํด๋์๋ค.
data = spark.read.csv("hdfs://localhost:9000/data/player.csv", header="true", inferSchema="true")
๋ถ๋ฌ์จ ๋ฐ์ดํฐ๋ฅผ ํ์ธํด๋ณด์๋ค. show() ๋ฉ์๋๋ฅผ ์์ฑ๊ฐ ์ง์ ์์ด ์ฌ์ฉํ๋ฉด ์์ 20์ค์ ๋ด์ฉ์ ํ์ธํ ์ ์๋ค.
๋ฐ์ดํฐ๋ฅผ ํ ์ด๋ธ๋ก ์ฌ์ฉํ๊ธฐ ์ํด ์ ์ธํด์ค๋ค.
data.createOrReplaceTempView("player")
๊ธฐ์กด ๋ฐ์ดํฐ ๋ด์ฉ์ ํ ๋๋ก ๊ณต๊ฒฉ ์ฑ๊ณต๋ฅ , ๋ฆฌ์๋ธ ํจ์จ, ์ธํธ๋น ์๋ธ/์ธํธ/๋ธ๋กํน/๋๊ทธ ๋ฅผ ๊ณ์ฐํด์ ๊ตฌํ๊ณ ํ์ธํด๋ณธ๋ค.
# ๊ณต์ฑ, ๊ณตํจ(๋ณด๋ฅ), ๋ฆฌํจ, ์ธํธ๋น ์๋ธ/์ธํธ/๋ธ๋กํน/๋๊ทธ ๊ณ์ฐ
player_rate = spark.sql("""
select name,team,pos,score,error,setcount,round(at_succ/at_try,2) as at_srate,
round((rs_corr-rs_fail)/rs_try,2) as rs_rate, round(sv_succ/setcount,2) as sv,
round(set_succ/setcount,2) as set,round(bl_succ/setcount,2) as bl,
round(dg_succ/setcount,2) as dg
from player
""")
player_rate.show()
๋ง์ฐฌ๊ฐ์ง๋ก ๊ฐ๊ณตํ ๋ฐ์ดํฐ๋ฅผ ํ ์ด๋ธ๋ก ์ฌ์ฉํ๊ธฐ ์ํด ์ ์ธํด์ฃผ์๋ค.
player_rate.createOrReplaceTempView("player_rate")
SQL์ WHERE ์ ์ ์ด์ฉํด ๊ฐ ๊ตฌ๋จ๋ณ๋ก ๋ฐ์ดํฐ๋ฅผ ๋๋ ์ฃผ์๋ค.
hgun_rate = spark.sql("""
select *
from player_rate
where team like 'ํ๋๊ฑด์ค'
""")
hgun_rate.show(5)
๊ฐ๊ณตํ ์ ์ ๋ฐ์ดํฐ์ ๊ตฌ๋จ ๋ฐ์ดํฐ๋ฅผ ํ๋ก์ csv ํ์ผ๋ก ์ ์ฅํ๊ณ ํ์ธํด๋ณธ๋ค.
# ์ ์ ๋ฐ์ดํฐ ์ ์ฅ - ํ๋ก
player_rate.write.format('csv').save('hdfs://localhost:9000/data/player_r.csv',header='true')
data = spark.read.csv("hdfs://localhost:9000/data/player_r.csv", header='true', inferSchema='true')
data.show(5)
# ๊ตฌ๋จ๋ณ ๋ฐ์ดํฐ ์ ์ฅ - ํ๋ก
hgun_rate.write.format('csv').save('hdfs://localhost:9000/data/hgun_r.csv',header='true')
dogong_rate.write.format('csv').save('hdfs://localhost:9000/data/dogong_r.csv',header='true')
gscal_rate.write.format('csv').save('hdfs://localhost:9000/data/gscal_r.csv',header='true')
insam_rate.write.format('csv').save('hdfs://localhost:9000/data/insam_r.csv',header='true')
ibk_rate.write.format('csv').save('hdfs://localhost:9000/data/ibk_r.csv',header='true')
hguk_rate.write.format('csv').save('hdfs://localhost:9000/data/hguk_r.csv',header='true')
pepper_rate.write.format('csv').save('hdfs://localhost:9000/data/pepper_r.csv',header='true')
# ๋ฐ์ดํฐ ์ ์ ์ฅ๋๋์ง ํ์ธ
data = spark.read.csv("hdfs://localhost:9000/data/pepper_r.csv", header='true', inferSchema="true")
data.show(5)
๋ฐ์ดํฐ๊ฐ ์ ์ ์ฅ๋์๋์ง ํ๋ก์์ ํ์ธํด๋ณด์๋ค.
์ด๋ ๊ฒ ๋ฐ์ดํฐ๋ฅผ ๊ฐ๊ณตํด์ ํ๋ก์ ์ ์ฅํ๋ ๊ฒ๊น์ง ์๋ฃํ๊ณ , ์์ผ๋ก๋ ์ด ๋ฐ์ดํฐ๋ค์ ํตํด ๊ฒฝ๊ธฐ ๊ฒฐ๊ณผ๋ฅผ ์์ธกํ ์ ์๋ ๋ชจ๋ธ์ ๋ง๋ค์ด ๋ณผ ์์ ์ด๋ค.
'ํ๋ก์ ํธ > [DA] ๋ฐ์ดํฐ ๋ถ์ : ๋ฐฐ๊ตฌ' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
๋ฐ์ดํฐ ์์ง (์น ํฌ๋กค๋ง) (3) - ๊ฒฝ๊ธฐ ์ธ๋ถ ๋ฐ์ดํฐ (0) | 2022.03.27 |
---|---|
๋ฐ์ดํฐ ์์ง (์น ํฌ๋กค๋ง) (2) - ๊ฒฝ๊ธฐ ๋ฐ์ดํฐ (0) | 2022.03.26 |
๋ฐ์ดํฐ ์ ์ฒ๋ฆฌ - ๊ฒฐ์ธก๊ฐ ์ฒ๋ฆฌ (0) | 2022.03.13 |
ํ๋ก์ ํธ ์ค๋ช & ๋ฐ์ดํฐ ์์ง (์น ํฌ๋กค๋ง), ์ ์ฅ - ์ ์ ๋ฐ์ดํฐ (0) | 2022.03.11 |