[Spark: The Definitive Guide] | May 14, 2022
Spark: The Definitive Guide 내용 정리
StructField
type filed로 구성된 StructType
objectspark.read.format("json").load("/data/flight-data.json").schema
>>> StructType(List(StructField(DEST_COUNTRY_NAME,StringType,true),StructField(ORIGIN_COUNTRY_NAME,StringType,true),StructField(count,LongType,true)))
col
함수 or column
함수: column을 생성하고 참조하는 간단한 방법
Column은 column name을 catalog에 저장된 정보와 비교하기 전까지 unresolved 상태이다.
Analyzer 단계에서 column과 table을 분석한다. (chapter 4 참조)
from pyspark.sql.functions import col, column
col("someColumnName")
column("someColumnName")
>>> Column<someColumnName>
column을 refer 하는 데 사용하는 col
method는 join 시에 유용하다.
예를 들어 DataFrame의 특정 column을 다른 DataFrame의 join 대상 column에서 참조하기 위해 사용한다.
col
method를 사용해 explicit 하게 column을 정의하면, spark는 analyzer 단계에서 column resolve 절차를 생략할 수 있다.// scala
df.col("count")
expr
함수로 가장 간단히 사용할 수 있다. (column reference도 가능)아래의 3가지 모두 동일한 transformation 과정을 거친다.
expr("someCol - 5")
col("someCol") - 5
expr("someCol") - 5
printSchema
method: DataFrame의 전체 column 정보를 확인할 수 있다.columns
property를 사용한다.spark.read.format("json").load("/data/flight-data.json").columns
>>> ['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count']
Row object를 직접 생성하려면 DataFrame의 schema와 같은 순서로 값을 명시해야 한다.
from pyspark.sql import Row
myRow = Row("Hello", None, 1, False)
Row data에 접근하려면 원하는 위치를 지정하면 된다.
Scala나 java에서는 helper method를 사용하거나 명시적으로 data type을 지정해야 하지만, python이나 R에서는 올바른 data type으로 자동 변환된다.
myRow(0) // Any type
myRow(0).asInstanceOf[String]
myRow.getString(0)
myRow[2]
>>> 1
# 방법 1
df = spark.read.format("json").load("/data/flight-data.json")
# 방법 2
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, StringType, LongType
mySchema = StructType([
StructField("some", StringType(), True),
StructField("col", StringType(), True),
StructField("names", LongType(), False)
])
myRow = Row("Hello", None, 1)
myDf = spark.createDataFrame([myRow], mySchema)
myDf.show()
>>>
+-----+----+-----+
| some| col|names|
+-----+----+-----+
|Hello|null| 1|
+-----+----+-----+
select
method는 string column name을 parameter로 받는다.# SELECT col1, col2 FROM table LIMIT 2
df.select("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").show(2)
>>>
+-----------------+-------------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|
+-----------------+-------------------+
| United States| Romania|
| United States| Croatia|
+-----------------+-------------------+
from pyspark.sql.functions import expr, col, column
df.select(
expr("DEST_COUNTRY_NAME"),
col("DEST_COUNTRY_NAME"),
column("DEST_COUNTRY_NAME"))\
.show(2)
>>>
+-----------------+-----------------+-----------------+
|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|
+-----------------+-----------------+-----------------+
| United States| United States| United States|
| United States| United States| United States|
+-----------------+-----------------+-----------------+
df.select(expr("DEST_COUNTRY_NAME as destination").alias("DEST_COUNTRY_NAME")).show(2)
>>>
+-----------------+
|DEST_COUNTRY_NAME|
+-----------------+
| United States|
| United States|
+-----------------+
selectExpr
method는 string expressions를 사용한다.# SELECT *, (col1 = col2) as col3 FROM table LIMIT 2
df.selectExpr(
"*", # all original columns
"(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry")\
.show(2)
>>>
+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
| United States| Romania| 15| false|
| United States| Croatia| 1| false|
+-----------------+-------------------+-----+-------------+
# SELECT avg(col1), count(distinct(col2)) FROM table LIMIT 2
df.selectExpr("avg(count)", "count(distinct(DEST_COUNTRY_NAME))").show(2)
>>>
+-----------+---------------------------------+
| avg(count)|count(DISTINCT DEST_COUNTRY_NAME)|
+-----------+---------------------------------+
|1770.765625| 132|
+-----------+---------------------------------+
# SELECT *, 1 as One FROM table LIMIT 2
from pyspark.sql.functions import lit
df.select(expr("*"), lit(1).alias("One")).show(2)
>>>
+-----------------+-------------------+-----+---+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|One|
+-----------------+-------------------+-----+---+
| United States| Romania| 15| 1|
| United States| Croatia| 1| 1|
+-----------------+-------------------+-----+---+
withColumn(column_name, expression)
df.withColumn("withinCountry", expr("ORIGIN_COUNTRY_NAME == DEST_COUNTRY_NAME"))\
.show(2)
>>>
+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
| United States| Romania| 15| false|
| United States| Croatia| 1| false|
+-----------------+-------------------+-----+-------------+
withColumnRenamed(original_name, new_name)
df.columns
>>>
['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count']
df.withColumnRenamed("DEST_COUNTRY_NAME", "dest").columns
>>>
['dest', 'ORIGIN_COUNTRY_NAME', 'count']
-
) 같은 reserved characters를 column name로 사용하려면 backtick(`
)을 사용해 escaping 해야 하는 경우가 있다.# SELECT col1, col2 as col3 FROM table LIMIT 2
dfWithLongColName.selectExpr(
"`This Long Column-Name`",
"`This Long Column-Name` as `new col`")\
.show(2)
>>>
+---------------------+-------+
|This Long Column-Name|new col|
+---------------------+-------+
| Romania|Romania|
| Croatia|Croatia|
+---------------------+-------+
대소문자를 구분하게 하려면 아래와 같이 설정한다.
set spark.sql.caseSensitive true
# SELECT *, cast(col1 as string) AS col2 FROM table
df.withColumn("col2", col("col1").cast("string"))
Spark는 filter의 순서와 상관없이 동시에 모든 filtering 작업을 수행하기 때문에 항상 유용한 것은 아니다.
여러 개의 AND filter를 지정하려면 차례대로 filter를 연결해야 하고, 판단은 spark에 맡겨야 한다.
# SELECT * FROM table WHERE col1 < 2 LIMIT 2
df.filter(col("count") < 2).show(2)
df.where("count < 2").show(2)
>>>
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
| United States| Croatia| 1|
| United States| Singapore| 1|
+-----------------+-------------------+-----+
# SELECT * FROM table WHERE col1 < 2 AND col2 != 'something' LIMIT 2
df.where(col("count") < 2).where(col("ORIGIN_COUNTRY_NAME") != "Croatia").show(2)
df.where("count < 2 and ORIGIN_COUNTRY_NAME != 'Croatia'").show(2)
>>>
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
| United States| Singapore| 1|
| Moldova| United States| 1|
+-----------------+-------------------+-----+
from pyspark.sql import Row
schema = df.schema
newRows = [
Row("New Country", "Other Country", 5L), # L only works on python 2
Row("New Country 2", "Other Country 3", 1L)
]
parallelizedRows = spark.sparkContext.parallelize(newRows)
newDF = spark.createDataFrame(parallelizedRows, schema)
df.union(newDF)\
.where("count = 1")\
.where(col("ORIGIN_COUNTRY_NAME") != "United States")\
.show()
>>>
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
| United States| Croatia| 1|
| United States| Singapore| 1|
| United States| Gibraltar| 1|
| United States| Cyprus| 1|
| United States| Estonia| 1|
| United States| Lithuania| 1|
| United States| Bulgaria| 1|
| United States| Georgia| 1|
| United States| Bahrain| 1|
| United States| Papua New Guinea| 1|
| United States| Montenegro| 1|
| United States| Namibia| 1|
| New Country 2| Other Country 3| 1|
+-----------------+-------------------+-----+
from pyspark.sql.functions import desc, asc
df.orderBy(expr("count desc")).show(2)
>>>
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
| Moldova| United States| 1|
| United States| Croatia| 1|
+-----------------+-------------------+-----+
# SELECT * FROM table ORDER BY col1 DESC, col2 ASC LIMIT 2
df.orderBy(col("count").desc(), col("DEST_COUNTRY_NAME").asc()).show(2)
>>>
+-----------------+-------------------+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+-----------------+-------------------+------+
| United States| United States|370002|
| United States| Canada| 8483|
+-----------------+-------------------+------+
df.rdd.getNumPartitions()
>>> 1
df.repartition(5)
df.repartition(col("DEST_COUNTRY_NAME"))
df.repartition(5, col("DEST_COUNTRY_NAME"))
df.repartition(5, col("DEST_COUNTRY_NAME")).coalesce(2)
collect
: 전체 DataFrame의 모든 data를 수집take
: 상위 N개의 row returnshow
: 여러 rows를 보기 좋게 출력toLocalIterator
: iterator로, 모든 partition의 data를 driver에 전달 (data set의 partition을 차례로 반복 처리 가능)
그러나 연산을 병렬로 수행하지 않고, 차례로 수행하기 때문에 매우 큰 처리 비용이 발생