[Spark: The Definitive Guide] | April 17, 2022
Spark: The Definitive Guide 내용 정리
main()
함수를 실행대화형 모드(python의 경우 command: pyspark
)로 spark를 시작하면 SparkSession이 자동으로 생성된다.
spark
>>> <pyspark.sql.session.SparkSession object at 0x7fe4d2281280>
이 숫자들은 distributed collection이다.
my_range = spark.range(1000).toDF("number")
my_range
>>> DataFrame[number: bigint]
my_range.toPandas()
>>>
number
0 0
1 1
2 2
3 3
4 4
.. ...
995 995
996 996
997 997
998 998
999 999
[1000 rows x 1 columns]
아래의 코드를 실행해도 결과는 출력되지 않는다.
추상적인 transformation만 지정한 상태이기 때문에, action을 호출하지 않으면 실제 transformation을 수행하지 않는다.
even_num = my_range.where("number % 2 = 0")
Pipelining이 자동으로 수행된다.
즉, DataFrame에 여러 필터를 지정하는 경우 모든 작업이 메모리에서 일어난다.
Example: DataFrame의 Predicate Pushdown(조건절 푸시다운)
복잡한 spark job이 원시 data에서 하나의 row만 가져오는 filter를 가지고 있다면, 필요한 record 하나만 읽는 것이 가장 효율적이다.
spark는 이 filter를 data source로 위임하는 최적화 작업을 자동으로 수행한다.
예를 들어 데이터 저장소가 database라면 where 절의 처리를 database에 위임하고, spark는 하나의 record만 받는다.
아래의 예시에서 spark Job은 filter (narrow transformation)를 수행한 후 partition 별로 record 수를 count (wide transformation) 한다.
그리고 각 언어에 적합한 native 객체에 결과를 모은다.
이 때 spark가 제공하는 spark UI로 cluster에서 실행 중인 spark job을 monitoring 할 수 있다.
even_num.count()
>>> 500
http://localhost:4040
)inferSchema
: DataFrame의 schema 정보를 알아내는 schema inference 기능 사용 가능
각 column의 data type을 추론하기 위해 적은 양의 data를 읽는다.
header
: true인 경우 파일의 첫 row를 header로 지정data = spark\
.read\
.option("inferSchema", "true")\
.option("header", "true")\
.csv("/data/summary.csv")
take
action을 호출하면, head
명령과 같은 결과를 확인할 수 있다.read
⇒ DataFrame ⇒ take(n)
⇒ Array(Row(…), Row(…))data.take(2)
>>>
[Row(DEST_COUNTRY_NAME=u'United States', ORIGIN_COUNTRY_NAME=u'Romania', count=15),
Row(DEST_COUNTRY_NAME=u'United States', ORIGIN_COUNTRY_NAME=u'Ireland', count=344)]
sort
method는 DataFrame을 변경하지 않는다.sort
method를 사용하면, 이전 DataFrame을 변환한 새로운 DataFrame을 생성해 return 한다.read (narrow transformation)
⇒ DataFrame ⇒ sort (wide transformation)
⇒ DataFrame ⇒ take(n)
⇒ Array(…)data.sort("count").explain()
>>>
== Physical Plan ==
*(2) Sort [count#130 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(count#130 ASC NULLS FIRST, 200) # 200은 partition 개수
+- *(1) FileScan csv [DEST_COUNTRY_NAME#128,ORIGIN_COUNTRY_NAME#129,count#130] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/data/flight-data/csv/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>
read (narrow transformation)
⇒ DataFrame (1 partition) ⇒ sort (wide transformation)
⇒ DataFrame (5 partitions) ⇒ take(n)
⇒ Array(…)spark.conf.set("spark.sql.shuffle.partitions", "5")
data.sort("count").take(2)
>>>
[Row(DEST_COUNTRY_NAME=u'United States', ORIGIN_COUNTRY_NAME=u'Singapore', count=1),
Row(DEST_COUNTRY_NAME=u'Moldova', ORIGIN_COUNTRY_NAME=u'United States', count=1)]
createOrReplaceTempView
: 모든 DataFrame을 table이나 view(임시 table)로 등록한다.
아래의 코드 실행 하면 SQL query를 실행할 수 있게 된다.
data.createOrReplaceTempView("data_table")
spark.sql
은 새로운 DataFrame을 return (spark
는 SparkSession
의 변수)# SQL
sqlWay = spark.sql("""
SELECT DEST_COUNTRY_NAME, count(1)
FROM data_table
GROUP BY DEST_COUNTRY_NAME
""")
sqlWay.explain()
>>>
== Physical Plan ==
*(2) HashAggregate(keys=[DEST_COUNTRY_NAME#128], functions=[count(1)])
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#128, 5)
+- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#128], functions=[partial_count(1)])
+- *(1) FileScan csv [DEST_COUNTRY_NAME#128] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/data/flight-data/csv/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>
# DataFrame
dataFrameWay = data\
.groupBy("DEST_COUNTRY_NAME")\
.count()
dataFrameWay.explain()
>>>
== Physical Plan ==
*(2) HashAggregate(keys=[DEST_COUNTRY_NAME#128], functions=[count(1)])
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#128, 5)
+- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#128], functions=[partial_count(1)])
+- *(1) FileScan csv [DEST_COUNTRY_NAME#128] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/data/flight-data/csv/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>