Sử dụng PySpark để cập nhật dữ liệu cho bảng có cấu trúc nested trên Hive
Trong quy trình xử lý big data hiện tại, Spark và Hive thường được sử dụng chung với nhau:
- Spark đóng vai trò là engine xử lý data
- Hive là datawarehouse lưu trữ dữ liệu đã xử lý từ Spark
Thông thường, việc cập nhật dữ liệu từ Spark vào bảng trên Hive khá đơn giản, chỉ mất 1 vài dòng code, tuy nhiên trong trường hợp khi bảng trên Hive có cấu trúc phức tạp như 1 trường có cấu trúc nested thì việc xử lý sẽ cần tốn công một chút. Bài viết này sẽ cung cấp 2 cách để giải quyết vấn đề này sử dụng Python interface của Spark (PySpark)
Cấu trúc bảng tham khảo
- Trên Hive, ta tạo một bảng với định nghĩa như sau. Có thể thấy trường
transaction_settings
có cấu trúc nested khá phức tạp: 1 array của struct, bên trong struct lại có 1 array của struct nữa.
CREATE TABLE transaction_history(
currency string
,description string
,transaction_settings array<struct<trans_condition:int, trans_params:array<struct<apply_time:string, amount:double>>, other_params:map<string, string>>>
,update_datetime timestamp
)
STORED AS ORC
TBLPROPERTIES('orc.compress'='SNAPPY')
- Nếu input trực tiếp dữ liệu vào bảng này theo kiểu thông thường (trực tiếp put data từ dataframe vào bảng hive) như ở dưới đây thì chắc chắn lỗi sẽ xảy ra, do spark không map được giữa cấu trúc và type của dataframe với bảng trong hive.
from datetime import datetime
l = [('USD/JPY', 'First transaction', [(1, [('20:00:00', 1000.0), ('21:00:00', 2000.0)], {'return_val': 2000}, datetime.now())]
l_df = sqlContext.createDataFrame(l, ['currency', 'description', 'transaction_settings', 'update_datetime'])
l_df.registerTempTable('l_db')
sqlContext.sql('insert into transaction_history select * from transaction_history')
Phương pháp 1: Tạo dataframe với cấu trúc nested một cách thủ công
- Ở phương pháp này, với mỗi một nested data structure ta sẽ tạo dựng một
Row
riêng. Trước tiên ta tạo hàm để chuyển đổi cấu trúc dữ liệu:
def map_structure(r):
Setting = Row('trans_condition', 'trans_param', 'other_params')
Param = Row('apply_time', 'amount')
currency = r[0]
description = r[1]
update_time = r[3]
trans_setting = r[2]
processed_trans_setting = []
for t in trans_settings:
params = map(lambda x: Param(*r), t[1])
processed_t = Setting(t[0], params, t[2])
processed_trans_setting.append(processed_t)
return (currency, description, processed_trans_setting, update_time)
- Dùng hàm đã tạo, ta có thể chuyển đổi cấu trúc dữ liệu của data thành dataframe có cấu trúc nested giống như yêu cầu và lưu vào bảng Hive một cách đơn giản
from datetime import datetime
l = [('USD/JPY', 'First transaction', [(1, [('20:00:00', 1000.0), ('21:00:00', 2000.0)], {'return_val': 2000}, datetime.now())]
process_l = map(map_structure, l)
l_df = sqlContext.createDataFrame(l, ['currency', 'description', 'transaction_settings', 'update_datetime'])
l_df.registerTempTable('l_db')
sqlContext.sql('insert into transaction_history select * from transaction_history')
Phương pháp 2: Định nghĩa schema của dataframe trước rồi sử dụng schema này khởi tạo dataframe
- Ở phương pháp này, ta sẽ định nghĩa schema của dataframe trước như sau
schema = StructType([
StructField("currency", StringType(), False),
StructField("description", StringType(), True),
StructField("transaction_settings", ArrayType(StructType([
StructField("trans_condition", IntegerType(), False),
StructField("trans_param", ArrayType(StructType([
StructField("apply_time", StringType(), False),
StructField("amount", DoubleType(), False)
]), False)
StructField("other_params", MapType(StringType(), StringType(), True))
]))),
StructField("update_datetime", TimestampType(), False)
])
- Sử dụng schema đã khởi tạo, từ dữ liệu đầu vào ta tạo dataframe và insert dữ liệu của dataframe vào bảng Hive
from datetime import datetime
l = [('USD/JPY', 'First transaction', [(1, [('20:00:00', 1000.0), ('21:00:00', 2000.0)], {'return_val': 2000}, datetime.now())]
l_rdd = sparkContext.parallelize(l)
l_df = sqlContext.createDataFrame(l_rdd, schema)
l_df.registerTempTable('l_db')
sqlContext.sql('insert into transaction_history select * from transaction_history')
Kết luận
Khi làm việc với dữ liệu thì việc phải xử lý những cấu trúc dữ liệu phức tạp tuy không thường gặp nhưng khó tránh khỏi. Bài viết cung cấp 2 phương pháp để xử lý những trường hợp như vậy khi sử dụng PySpark và Hive. Phương pháp thứ 2 giúp ta kiểm soát cấu trúc của dataframe 1 cách trực quan hơn và cũng dễ hiểu hơn nên là phương pháp được khuyến khích sử dụng.