Đầu tiên, bạn có biết Spark là gì chưa nhỉ.
Nếu chưa biết Spark là gì thì bạn nên tìm hiểu Spark, cũng như làm quen với nó trước nhé.
Bạn có thể tìm hiểu Spark qua link sau: Các bài viết liên quan đến Spark
I. Vậy, Spark DataFrame là gì ấy nhỉ ?
Ngày xửa, ngày xưa, khi Spark ver 1.3 ra đời, Spark đã đẻ thêm tính năng có tên là Spark DataFrame. Vậy nó có gì hay ho nhỉ ?
- Có thể thiết lập Schema cho Spark RDD và có thể tạo Object DataFrame.
Chưa thấy hay lắm nhỉ, dùng Spark RDD có sao đâu...
Thế bạn thao tác dữ liệu chỉ sử dụng RDD thấy gặp vấn đề gì phức tạp không ?
Viết code có khó khăn không ? Rồi vấn đề về hiệu năng ?
- Giống như viết SQL, đầy đủ chức năng như select, where ... đặc biệt là join với các DataFrame khác.
- Sử dụng các method như filter, select để trích xuất dữ liệu theo cột, hàng.
- Xử gọn các loại data như Log ... với groupBy → agg
- Thêm 1 cột dễ dàng với UDF(User Defined Function)
- Giống như SQL, Spark DataFrame đã hỗ trợ Pivot (Spark 1.6 trở lên) rất hữu ích cho việc lập bảng biểu, báo cáo.
Tóm lại là dễ xài, đơn giản hơn RDD mà hiệu suất, khả năng optimized truy vấn tốt hơn RDD.
Chính vì vậy với các trường hợp thông thường, các bạn nên xài DataFrame.
II. Tạo DataFrame như thế nào nhỉ ?
Bài viết này sẽ sử dụng file log này để thực hành nhé. Link dowload. Cấu trúc file Log gồm 3 cột : Date, User_ID, Campaign_ID
click.at user.id campaign.id
4/27/2015 20:40 144012 Campaign077
4/27/2015 00:27 24485 Campaign063
4/27/2015 00:28 24485 Campaign063
4/27/2015 00:33 24485 Campaign038
4/27/2015 01:00 24485 Campaign063
4/27/2015 16:10 145066 Campaign103
Cách 1: Tạo từ RDD
Nếu bạn đã có RDD với tên column và type tương ứng (TimestampType, IntegerType, StringType)thì bạn có thể dễ dàng tạo DataFrame bằng
sqlContext.createDataFrame(my_rdd, my_schema)
Với printSchema(), dtypes
sẽ in thông tin của schema
và count()
trả về số record
Và nếu chỉ muốn in n record đầu tiên thì sử dụng show(n)
fields = [StructField("access_time", TimestampType(), True), StructField("userID", IntegerType(), True), StructField("campaignID", StringType(), True)]
schema = StructType(fields)
whole_log_df = sqlContext.createDataFrame(whole_log, schema)
print whole_log_df.count()
print whole_log_df.printSchema()
print whole_log_df.dtypes
print whole_log_df.show(5)
#327430
#root
# |-- access_time: timestamp (nullable = true)
# |-- userID: integer (nullable = true)
# |-- campaignID: string (nullable = true)
#
#[('access_time', 'timestamp'), ('userID', 'int'), ('campaignID', 'string')]
#
#+--------------------+------+-----------+
#| access_time|userID| campaignID|
#+--------------------+------+-----------+
#|2015-04-27 20:40:...|144012|Campaign077|
#|2015-04-27 00:27:...| 24485|Campaign063|
#|2015-04-27 00:28:...| 24485|Campaign063|
#|2015-04-27 00:33:...| 24485|Campaign038|
#|2015-04-27 01:00:...| 24485|Campaign063|
Cách 2: Tạo trực tiếp từ file CSV
from pyspark.shell import spark
from pyspark.sql.types import *
# Định nghĩa Schema
struct = StructType([
StructField('a', StringType(), False),
StructField('b', StringType(), False),
StructField('c', StringType(), False)
])
# Tạo DataFrame từ file CSV
df_data = spark.read.csv('click_data_sample', struct)
Ngoài cách trên còn rất nhiều cách khác, có thể kể đến là gọi cổ 1 thằng thuộc họ nhà Spark Package tên là spark-csv ra xài, hỗ trợ nhiều method hữu ích, dễ chơi, dễ trúng thưởng hơn.
Lưu ý là nếu không định nghĩa schema thì tất cả các column sẽ có kiểu string. Nhưng nếu dùng thêm thằng em inferSchema
thì mọi chuyện sẽ êm xuôi. ^^ Không tin đơn giản như vậy ư, bạn thử như code dưới xem ^^
whole_log_df_2 = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("/user/hadoop/click_data_sample.csv")
print whole_log_df_2.printSchema()
print whole_log_df_2.show(5)
#root
# |-- click.at: string (nullable = true)
# |-- user.id: string (nullable = true)
# |-- campaign.id: string (nullable = true)
#
#+-------------------+-------+-----------+
#| click.at|user.id|campaign.id|
#+-------------------+-------+-----------+
#|2015-04-27 20:40:40| 144012|Campaign077|
#|2015-04-27 00:27:55| 24485|Campaign063|
#|2015-04-27 00:28:13| 24485|Campaign063|
#|2015-04-27 00:33:42| 24485|Campaign038|
#|2015-04-27 01:00:04| 24485|Campaign063|
whole_log_df_3 = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("click_data_sample.csv")
print whole_log_df_3.printSchema()
#root
# |-- click.at: timestamp (nullable = true)
# |-- user.id: integer (nullable = true)
# |-- campaign.id: string (nullable = true)
Nhân tiện đây, đôi lúc tên column không được chuẩn như Lê Duẩn thì bạn có thể dễ dàng thay đổi tên column bằng withColumnRenamed
.
Và cũng tiện đây, có 1 lưu ý là về cơ bản DataFrame là imutable
(ông cha ta hay gọi với cái tên thuần Tàu là bất biến
) nên hễ có thay đổi nội dung của DataFrame thì 1 DataFrame mới sẽ được tạo để lưu trữ những thay đổi đó.
whole_log_df_4 = whole_log_df_3.withColumnRenamed("click.at", "access_time")\
.withColumnRenamed("user.id", "userID")\
.withColumnRenamed("campaign.id", "campaignID")
print whole_log_df_4.printSchema()
#root
# |-- access_time: timestamp (nullable = true)
# |-- userID: integer (nullable = true)
# |-- campaignID: string (nullable = true)
Cách 3: Giao lưu trực tiếp từ file json
Bằng cách sử dụng sqlContext.read.json
. Mỗi dòng của file json sẽ được coi là 1 object. Trong trường hợp object thiếu data thì sẽ null tại đó.
# test_json.json gồm 3 dòng như dưới, dòng cuối không có "campaignID"
#
#{"access_time": "2015-04-27 20:40:40", "userID": "24485", "campaignID": "Campaign063"}
#{"access_time": "2015-04-27 00:27:55", "userID": "24485", "campaignID": "Campaign038"}
#{"access_time": "2015-04-27 00:27:55", "userID": "24485"}
df_json = sqlContext.read.json("test_json.json")
df_json.printSchema()
df_json.show(5)
#root
# |-- access_time: string (nullable = true)
# |-- campaignID: string (nullable = true)
# |-- userID: string (nullable = true)
#
#+-------------------+-----------+------+
#| access_time| campaignID|userID|
#+-------------------+-----------+------+
#|2015-04-27 20:40:40|Campaign063| 24485|
#|2015-04-27 00:27:55|Campaign038| 24485|
#|2015-04-27 00:27:55| null| 24485|
#+-------------------+-----------+------+
Cách 4: Giao thông trực tiếp từ parquet
Nếu bạn chưa biết parquet là gì thì tham khảo tại đây nhé. Với rất nhiều ưu điểm, mình sẽ giới thiệu vào 1 bài nào đó ở lúc nào đó nhé. ^^
Và cách đọc, đơn giản như đan rổ là sử dụng sqlContext.read.parquet
. Như đoạn code bên dưới, cả folder chứa file parquet sẽ được đọc. Quá nhanh gọn lẹ phải không.
sqlContext.read.parquet("/user/hadoop/parquet_folder/")
III. Thao tác với DataFrame
1. Thử query bằng SQL
Bằng cách sử dụng registerTempTable
, bạn sẽ có một table được tham chiếu đến Dataframe đó, bạn có thể sử dụng tên table này để viết query SQL. Nếu bạn sử dụng sqlContext.sql('query SQL')
thì giá trị trả về cũng là Dataframe.
Có 1 lưu ý là: Bạn cũng có thể viết subquery nhưng subquery cần được gán Alias, nếu không toạch lô (Syntax error) đấy.
#SQL query
whole_log_df.registerTempTable("whole_log_table")
print sqlContext.sql(" SELECT * FROM whole_log_table where campaignID == 'Campaign047' ").count()
#18081
print sqlContext.sql(" SELECT * FROM whole_log_table where campaignID == 'Campaign047' ").show(5)
#+--------------------+------+-----------+
#| access_time|userID| campaignID|
#+--------------------+------+-----------+
#|2015-04-27 05:26:...| 14151|Campaign047|
#|2015-04-27 05:26:...| 14151|Campaign047|
#|2015-04-27 05:26:...| 14151|Campaign047|
#|2015-04-27 05:27:...| 14151|Campaign047|
#|2015-04-27 05:28:...| 14151|Campaign047|
#+--------------------+------+-----------+
#Trường hợp thêm biến số vào trong câu SQL
for count in range(1, 3):
print "Campaign00" + str(count)
print sqlContext.sql("SELECT count(*) as access_num FROM whole_log_table where campaignID == 'Campaign00" + str(count) + "'").show()
#Campaign001
#+----------+
#|access_num|
#+----------+
#| 2407|
#+----------+
#
#Campaign002
#+----------+
#|access_num|
#+----------+
#| 1674|
#+----------+
#Trường hợp Sub Query:
print sqlContext.sql("SELECT count(*) as first_count FROM (SELECT userID, min(access_time) as first_access_date FROM whole_log_table GROUP BY userID) subquery_alias WHERE first_access_date < '2015-04-28'").show(5)
#+------------+
#|first_count |
#+------------+
#| 20480|
#+------------+
2. Tìm kiếm sử dụng filter, select
Đối với DataFrame , tìm kiếm kèm điều kiện rất đơn giản. Giống với câu query ở trên nhưng filter, select
dễ dàng hơn rất nhiều. Vậy filter
và select
khác nhau thế nào ?
Cùng là để tìm kiếm nhưng filter
trả về những row thoả mãn điều kiện, trong đó select
lấy dữ liệu theo column.
#Ví dụ filter
print whole_log_df.filter(whole_log_df["access_time"] < "2015-04-28").count()
#41434
print whole_log_df.filter(whole_log_df["access_time"] > "2015-05-01").show(3)
#+--------------------+------+-----------+
#| access_time|userID| campaignID|
#+--------------------+------+-----------+
#|2015-05-01 22:11:...|114157|Campaign002|
#|2015-05-01 23:36:...| 93708|Campaign055|
#|2015-05-01 22:51:...| 57798|Campaign046|
#+--------------------+------+-----------+
#Ví dụ select
print whole_log_df.select("access_time", "userID").show(3)
#+--------------------+------+
#| access_time|userID|
#+--------------------+------+
#|2015-04-27 20:40:...|144012|
#|2015-04-27 00:27:...| 24485|
#|2015-04-27 00:28:...| 24485|
#+--------------------+------+
3. Sử dụng groupBy
groupBy có chức năng giống với reduceByKey của RDD, nhưng nó còn cung cấp 1 rổ method. Ở đây mình sẽ run thử code count, agg.
groupBy→count
Ví dụ sau sẽ lấy key là campaignID
và tiến hành groupBy
. Sau đó dùng count()
để lấy số record ứng với mỗi key.
print whole_log_df.groupBy("campaignID").count().sort("count", ascending=False).show(5)
#+-----------+-----+
#| campaignID|count|
#+-----------+-----+
#|Campaign116|22193|
#|Campaign027|19206|
#|Campaign047|18081|
#|Campaign107|13295|
#|Campaign131| 9068|
#+-----------+-----+
print whole_log_df.groupBy("campaignID", "userID").count().sort("count", ascending=False).show(5)
#+-----------+------+-----+
#| campaignID|userID|count|
#+-----------+------+-----+
#|Campaign047| 30292| 633|
#|Campaign086|107624| 623|
#|Campaign047|121150| 517|
#|Campaign086| 22975| 491|
#|Campaign122| 90714| 431|
#+-----------+------+-----+
groupBy→pivot
Pivot thì từ Spark v1.6 trở lên được đưa vào, có chức năng giống với pivot trong SQL. Thử áp dụng xem sao nhỉ :
Trước khi pivot (agged_df
)
-
Số hàng = số
UserID(=75,545) * campainID(=133)
-
Số cột =
3
Sau khi pivot(pivot_df
)
- Số hàng = số
UserID(=75,545)
- Số cột =
UserID + số CampainID = 1 + 133 = 134
Tất nhiên là bạn phải groupBy(cột giữ nguyên).pivot(cột muốn chuyển sang ngang).sum()
agged_df = whole_log_df.groupBy("userID", "campaignID").count()
print agged_df.show(3)
#+------+-----------+-----+
#|userID| campaignID|count|
#+------+-----------+-----+
#|155812|Campaign107| 4|
#|103339|Campaign027| 1|
#|169114|Campaign112| 1|
#+------+-----------+-----+
#Những cell không có giá trị -> null
pivot_df = agged_df.groupBy("userID").pivot("campaignID").sum("count")
print pivot_df.printSchema()
#root
# |-- userID: integer (nullable = true)
# |-- Campaign001: long (nullable = true)
# |-- Campaign002: long (nullable = true)
# ..
# |-- Campaign133: long (nullable = true)
#TH muốn add 0 vào cell NULL
pivot_df2 = agged_df.groupBy("userID").pivot("campaignID").sum("count").fillna(0)
4. Thêm cột sử dụng UDF
Trong Spark DataFrame có thể sử dụng UDF, với ứng dụng chính là thêm cột. Như đã nói ở trên, bản chất DataFrame là immutable nên khi thêm cột thì 1 DataFrame mới sẽ được tạo.
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import DoubleType
def add_day_column(access_time):
return int(access_time.strftime("%Y%m%d"))
my_udf = UserDefinedFunction(add_day_column, IntegerType())
print whole_log_df.withColumn("access_day", my_udf("access_time")).show(5)
#+--------------------+------+-----------+----------+
#| access_time|userID| campaignID|access_day|
#+--------------------+------+-----------+----------+
#|2015-04-27 20:40:...|144012|Campaign077| 20150427|
#|2015-04-27 00:27:...| 24485|Campaign063| 20150427|
#|2015-04-27 00:28:...| 24485|Campaign063| 20150427|
#|2015-04-27 00:33:...| 24485|Campaign038| 20150427|
#|2015-04-27 01:00:...| 24485|Campaign063| 20150427|
#+--------------------+------+-----------+----------+
Cũng có thể sử dụng lambda
my_udf2 = UserDefinedFunction(lambda x: x + 5, IntegerType())
print whole_log_df.withColumn("userID_2", my_udf2("userID")).show(5)
#+--------------------+------+-----------+--------+
#| access_time|userID| campaignID|userID_2|
#+--------------------+------+-----------+--------+
#|2015-04-27 20:40:...|144012|Campaign077| 144017|
#|2015-04-27 00:27:...| 24485|Campaign063| 24490|
#|2015-04-27 00:28:...| 24485|Campaign063| 24490|
#|2015-04-27 00:33:...| 24485|Campaign038| 24490|
#|2015-04-27 01:00:...| 24485|Campaign063| 24490|
#+--------------------+------+-----------+--------+
Ngược lại, muốn xóa cột thì sử dụng df.drop()
print whole_log_df.drop("userID").show(3)
#+--------------------+-----------+
#| access_time| campaignID|
#+--------------------+-----------+
#|2015-04-27 20:40:...|Campaign077|
#|2015-04-27 00:27:...|Campaign063|
#|2015-04-27 00:28:...|Campaign063|
#+--------------------+-----------+
5. Join 2 DataFrame
Tính năng rất quan trọng khi xử lý dữ liệu chính là join.
Mình sẽ sử dụng data đầu vào ban đầu để tạo ra 1 DataFrame mới chứa những userID có count >100
heavy_user_df1 = whole_log_df.groupBy("userID").count()
heavy_user_df2 = heavy_user_df1.filter(heavy_user_df1 ["count"] >= 100)
print heavy_user_df2 .printSchema()
print heavy_user_df2 .show(3)
print heavy_user_df2 .count()
#root
# |-- userID: integer (nullable = true)
# |-- count: long (nullable = false)
#
#+------+-----+
#|userID|count|
#+------+-----+
#| 84231| 134|
#| 13431| 128|
#|144432| 113|
#+------+-----+
#
#177
Được heavy_user_df2
rồi tiến hành join (mặc định là inner join).
Các kiểu join bao gồm : inner, outer, left_outer, rignt_outer
joinded_df = whole_log_df.join(heavy_user_df2, whole_log_df["userID"] == heavy_user_df2["userID"], "inner").drop(heavy_user_df2["userID"]).drop("count")
print joinded_df.printSchema()
print joinded_df.show(3)
print joinded_df.count()
#root
# |-- access_time: timestamp (nullable = true)
# |-- campaignID: string (nullable = true)
# |-- userID: integer (nullable = true)
#None
#+--------------------+-----------+------+
#| access_time| campaignID|userID|
#+--------------------+-----------+------+
#|2015-04-27 02:07:...|Campaign086| 13431|
#|2015-04-28 00:07:...|Campaign086| 13431|
#|2015-04-29 06:01:...|Campaign047| 13431|
#+--------------------+-----------+------+
#
#38729
6. Lấy data theo cột trong DataFrame
- Lấy lable của cột (
df.columns
) -> Trả về list tên cột (not DataFrame ) - Lấy riêng 1 cột (
df.select("userID").map(lambda x: x[0]).collect()
) -> Trả về list userID (not RDD/Dataframe) - Lấy cột distinct, chỉ cần thêm .distinct()
print whole_log_df.columns
#['access_time', 'userID', 'campaignID']
print whole_log_df.select("userID").map(lambda x: x[0]).collect()[:5]
#[144012, 24485, 24485, 24485, 24485]
print whole_log_df.select("userID").distinct().map(lambda x:x[0]).collect()[:5]
#[4831, 48631, 143031, 39631, 80831]
7. Từ DataFrame , tạo RDD
Có 2 cách chính là:
- Sử dụng
map()
: mỗi hàng của DataFrame được chuyển sang RDD theo dạng list - Sử dụng
.rdd
: mỗi hàng của DataFrame được chuyển sang RDD Row Object (Tức là mỗi hàng sẽ là 1 Object) Tiếp theo sử dụng.asDict()
với Row Object để chuyển về RDD Key-Value.
#convert to rdd by ".map"
print whole_log_df.groupBy("campaignID").count().map(lambda x: [x[0], x[1]]).take(5)
#[[u'Campaign033', 786], [u'Campaign034', 3867], [u'Campaign035', 963], [u'Campaign036', 1267], [u'Campaign037', 1010]]
# rdd -> normal list can be done with "collect".
print whole_log_df.groupBy("campaignID").count().map(lambda x: [x[0], x[1]]).collect()[:5]
#[[u'Campaign033', 786], [u'Campaign034', 3867], [u'Campaign035', 963], [u'Campaign036', 1267], [u'Campaign037', 1010]]
#convert to rdd by ".rdd" will return "Row" object
print whole_log_df.groupBy("campaignID").rdd.take(3)
#[Row(campaignID=u'Campaign033', count=786), Row(campaignID=u'Campaign034', count=3867), Row(campaignID=u'Campaign035', count=963)]
#`.asDict()` will convert to Key-Value RDD from Row object
print whole_log_df.groupBy("campaignID").rdd.map(lambda x:x.asDict()).take(3)
#[{'count': 786, 'campaignID': u'Campaign033'}, {'count': 3867, 'campaignID': u'Campaign034'}, {'count': 963, 'campaignID': u'Campaign035'}]
IV. Tổng kết
Trên đây mình đã giới thiệu với các bạn những kiến thức cơ bản về DataFrame, và tất nhiên là bạn có thể làm nhiều điều hơn(ngoài phạm vi bài viết này) với DataFrame. Nhưng kiến thức cơ bản không bao giờ thừa phải không.Hẹn gặp lại các bạn ở các bài viết sau.