[Spark: The Definitive Guide] | April 23, 2022
Spark: The Definitive Guide 내용 정리
./bin/spark-submit \
--master local \
./examples/pi.py 10
Statically typed (정적 타입): 자료형이 고정된 언어. Java, Scala, C, C++ 등.
Dynamically typed (동적 타입): Python, JavaScript 등
정적 데이터셋의 data를 분석해 DataFrame을 생성
이 때 정적 데이터셋의 schema도 함께 생성
# Read static dataset
staticDataFrame = spark.read.format("csv")\
.option("header", "true")\
.option("inferSchema", "true")\
.load("/data/retail-data/by-day/*.csv")
staticDataFrame.createOrReplaceTempView("retail_data")
staticSchema = staticDataFrame.schema
staticSchema
>>>
StructType(List(StructField(InvoiceNo,StringType,true),StructField(InvoiceDate,TimestampType,true),...))
# Filter
from pyspark.sql.functions import window, col
staticDataFrame\
.selectExpr(
"CustomerId",
"(UnitPrice * Quantity) as total_cost",
"InvoiceDate")\
.groupBy(
col("CustomerId"), window(col("InvoiceDate"), "1 day"))\
.sum("total_cost")\
.show(5)
>>>
+----------+--------------------+-----------------+
|CustomerId| window| sum(total_cost)|
+----------+--------------------+-----------------+
| 16057.0|[2011-12-05 00:00...| -37.6|
| 14126.0|[2011-11-29 00:00...|643.6300000000001|
| 13500.0|[2011-11-16 00:00...|497.9700000000001|
| 17160.0|[2011-11-08 00:00...|516.8499999999999|
| 15608.0|[2011-11-11 00:00...| 122.4|
+----------+--------------------+-----------------+
only showing top 5 rows
read
method 대신 readStream
method 사용maxFilesPerTrigger
옵션: 한 번에 읽을 파일 수 설정
아래의 예시를 streaming 답게 만들어 주는 역할이지만, production 환경에서는 적용하는 것을 추천하지 않는다.
# Read streaming data
streamingDataFrame = spark.readStream\
.schema(staticSchema)\
.option("maxFilesPerTrigger", 1)\
.format("csv")\
.option("header", "true")\
.load("/data/retail-data/by-day/*.csv")
streamingDataFrame
>>> DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: timestamp, UnitPrice: double, CustomerID: double, Country: string]
streamingDataFrame.isStreaming
>>> True
기존 DataFrame 처리와 동일한 business logic을 적용
이 작업 역시 lazy operation이므로 data flow를 실행하기 위해 streaming action을 호출해야 한다.
# Filter (same as static example above)
purchaseByCustomerPerHour = streamingDataFrame\
.selectExpr(
"CustomerId",
"(UnitPrice * Quantity) as total_cost",
"InvoiceDate")\
.groupBy(
col("CustomerId"), window(col("InvoiceDate"), "1 day"))\
.sum("total_cost")
purchaseByCustomerPerHour.isStreaming
>>> True
Streaming action
Trigger가 실행된 다음, data를 갱신할 in-memory table에 데이터를 저장
아래의 예시에서는 파일마다 trigger 실행 (Production 환경에서 사용하는 것은 좋지 않다.)
Spark는 이전 집계값보다 더 큰 값이 발생한 경우에만 in-memory table을 갱신하므로, 언제나 가장 큰 값을 얻을 수 있다.
# Write to in-memory table
purchaseByCustomerPerHour.writeStream\
.format("memory")\ # "memory": in-memory table에 저장 ("console": 콘솔에 결과 출력)
.queryName("customer_purchases")\ # in-memory에 저장될 table name
.outputMode("complete")\ # 모든 count 수행 결과를 table에 저장
.start()
Stream이 시작되면 쿼리 실행 결과가 어떤 형태로 in-memory table에 기록되는지 확인 가능
더 많은 data를 읽을수록 (각 파일에 있는 data에 따라) 결과가 변경될 수 있고 변경되지 않을 수도 있다.
spark.sql("""
SELECT *
FROM customer_purchases
ORDER BY `sum(total_cost)` DESC
""")\
.show(5)
>>>
+----------+--------------------+------------------+
|CustomerId| window| sum(total_cost)|
+----------+--------------------+------------------+
| 17450.0|[2011-09-20 00:00...| 71601.44|
| null|[2011-11-14 00:00...| 55316.08|
| null|[2011-11-07 00:00...| 42939.17|
| null|[2011-03-29 00:00...| 33521.39999999998|
| null|[2011-12-08 00:00...|31975.590000000007|
+----------+--------------------+------------------+
only showing top 5 rows
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer()\
.setInputCol("day_of_week")\
.setOutputCol("day_of_week_index")
from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder()\
.setInputCol("day_of_week_index")\
.setOutputCol("day_of_week_encoded")
from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler()\
.setInputCols(["UnitPrice", "Quantity", "day_of_week_encoded"])\
.setOutputCol("features")
from pyspark.ml import Pipeline
transformationPipeline = Pipeline()\
.setStages([indexer, encoder, vectorAssembler])
fittedPipeline = transformationPipeline.fit(trainDataFrame)
transformedTraining = fittedPipeline.transform(trainDataFrame)
transformedTraining.cache()
Spark에서 머신러닝 모델을 학습시키는 과정은 크게 두 단계로 진행된다.
첫 번째 단계는 아직 학습되지 않은 모델을 초기화하고, 두 번째 단계는 해당 모델을 학습시킨다.
MLlib의 DataFrame API에서 제공하는 모든 algorithm은 항상 두 가지 유형으로 구성되어 있으며, 학습 전에는 Algorithm으로 학습 후에는 AlgorithmModel의 명명규칙을 따른다.
from pyspark.ml.clustering import KMeans
kmeans = KMeans()\
.setK(20)\
.setSeed(1L) # 1L only works in Python 2
kmModel = kmeans.fit(transformedTraining)
kmModel.computeCost(transformedTraining)
transformedTest = fittedPipeline.transform(testDataFrame)
kmModel.computCost(transformedTest)