Tăng hiệu năng khi sử dụng join trong Apache Spark

Trong Apache Spark, khi muốn kết hợp 2 RDD (Resilient Distributed Dataset) với định dạng (key, value), tổng hợp những dữ liệu có cùng key rồi tính toán trên những giá trị của dữ liệu đó để có 1 data set cuối cùng cũng với định dạng (key, value) thì sử dụng hàm join là một trong những sự lựa chọn thông dụng nhất. Tuy nhiên, khác với khi sử dụng hàm join trong SQL hay các hệ quản trị cơ sỡ dữ liệu không phân tán khác, vì Spark theo cấu trúc phân tán (data được chia thành các partition ở trên nhiều node khác nhau trong cluster), nên khi sử dụng join khó tránh khỏi việc luân chuyển data giữa các node. Các dữ liệu có cùng key phân tán trên các partition ở các node khác nhau, sẽ được thu thập về các partition trên cùng một node (map) sau đó Spark sẽ thực hiện kết hợp các gía trị và đưa ra kết quả cuối cùng (reduce). Việc luân chuyển dữ liệu này được gọi là shuffle trong Spark và theo tài liệu của Spark thì nên hạn chế sử dụng những hàm gây ra shuffle vì shuffle thường làm chậm tốc độ xử lý và hiệu năng của chương trình (network overhead, memory overhead và disk write/read cho dữ liệu được shuffle). Tuy nhiên, khi xử lý dữ liệu như đã nói ở trên, việc dùng join thường là không tránh khỏi. Để có thể cải thiện hiệu năng và giảm ảnh hưởng của shuffle khi sử dụng join một cách nhiều nhất ta có thể áp dụng một số các phương pháp sau đây

1. Nếu join một RDD lớn với một RDD nhỏ thì ta có thể transform RDD nhỏ thành 1 lookup map rồi broadcast lên tất cả các node và sử dụng map-side join (dù dùng từ join nhưng ở đây ta không hề dùng đến hàm join).

Thay vì viết

bigRDD.join(smallRDD)

thì ta sẽ viết

val bcSmallRDD = sc.broadcast(smallRDD.collect.toMap) bigRDD.map { case(key, value) =>    val otherVal = bcSmallRDD.value.get(key)   (key, (value, otherVal)) }

Số dòng code có thể dài hơn nhưng hiệu năng của chương trình có thể được cải thiện rất nhiều. Trong một số trường hợp này, data của RDD lớn có thể không bị shuffle.

2. Khi join một RDD lớn với một RDD có độ lớn trung bình (không qua lớn nhưng không đủ để có thể lưu trong memory của một node) thì ta có thể sử dụng phương pháp sau.

Khi join thì tất cả những dữ liệu (key, value) trong RDD lớn mà không có dữ liệu với cùng key tương ứng trong RDD nhỏ hơn đều sẽ bị loại bỏ. Ta có thể lợi dùng điều này để dùng filter, tạo ra một RDD mới từ RDD lớn chỉ với những key có tồn tại trong RDD nhỏ. RDD mới này sẽ có lượng dữ liệu nhỏ hơn do vậy sẽ giảm số lượng data bị luân chuyển giữa các node khi join.

val bcKeys = sc.broadcast(mediumRDD.keys.collect.toSet) val filteredRDD = bigRDD.filter { case(key, value) => bcKeys.value.contains(key) } filteredRDD.join(mediumRDD)

Tuy nhiên, phương pháp này phụ thuộc vào số lượng các dữ liệu bị loại bỏ khi filter, nếu con số này không lớn thì phương pháp này cũng không đem lại hiệu quả nhiều

3. Partition data một cách hợp lý trước khi join

a. Chọn số partition cho mỗi RDD một cách hợp lý

Các RDD trong Spark sẽ được chia thành các partition, mỗi một core CPU trong cluster sẽ có khả năng xử lý một partition trong 1 thời điểm (tương ứng với một task). Nếu không muốn sử dụng giá trị default của Spark, ta có thể tuỳ chỉnh số partition của một RDD. Nếu điều chỉnh số partition quá lớn ta sẽ có quá nhiều task so với số CPU trong cluster dẫn đến việc xử lý chậm, còn nếu để số partition quá nhỏ thì sẽ không tận dụng được tính phân tán của Spark, một CPU sẽ phải xử lý 1 số quá lớn dữ liệu (có thể dẫn đến Out Of Memory) trong khi các CPU khác không có việc để làm. Khi sử dụng join, nếu 1 RDD có n partition, 1 RDD có m partition thì số partition sinh ra trong lúc Spark thực hiện join sẽ là **n*m. **Do vậy, việc lựa chọn số partition cho mỗi table có ảnh hưởng khá lớn đến performance của join.

b. Chọn partitioner hợp lý cho mỗi RDD và thực hiện partition mỗi RDD trước khi join

Sau khi đã chọn được số partition phù hợp thì ta sẽ sử dụng Partitioner để chia data trong RDD thành các partition. Đối với các định dạng dữ liệu (key, value) thì Partitioner mặc định sẽ là HashPartitioner (sử dụng kết quả của method hashCode của giá trị key để phân chia data thành các partition, các partition có cùng kết quả này sẽ được shuffle về cùng 1 partition). Khi thực hiện join, nếu Spark thấy 2 RDD chưa được partition theo partitioner nào thì Spark sẽ hash tất cả các key của cả 2 RDD và di chuyển những dữ liệu cùng key về cùng 1 node, sau đó join chúng lại. Tuy nhiên, nếu các RDD này đã được partition với 1 Partitioner, thì Spark sẽ tận dụng thông tin này và hạn chế việc di chuyển dữ liệu. Nếu chúng ta partition RDD1 với 1 Hash Partitioner và cũng partition RDD 2  với cùng partitioner đó, thì trong trường hợp tốt nhất, các dữ liệu cùng keys sẽ đều đã nằm trên các partition trên cùng 1 node và việc shuffle dữ liệu sẽ không xảy ra. Trong trường hợp xấu hơn, thì sẽ chỉ có dữ liệu của RDD 2 bị shuffle đến các node có chứa partition của RDD 1 (do vậy ta nên để RDD 2 là RDD nhỏ hơn khi join).

val partitioner = new HashPartitioner(64) val finalRDD = largeRDD.partitionBy(partitioner) .join(smallRDD.partitionBy(partitioner))

Tuy nhiên, chúng ta cũng cần phải chú ý đến việc chọn lựa key cho các RDD. Nếu số lượng các key giống nhau quá nhiều, thì dù số lượng các partition đã định có lớn thì các dữ liệu cùng key cũng sẽ chỉ tập trung về 1 vài partition nhất định có thể dẫn đến việc 1 số partition quá lơn, còn các partition khác không có dữ liệu gì. Điều này dễ gây ra lỗi Out Of Memory hoặc khiến tốc độ xử lý bị chậm đi.

4. Sử dùng DataFrame thay vì sử dụng RDD

Trong các phiên bản sau của Spark (từ 1.4.0 trở đi), SparkSQL cùng với DataFrame đã được cải tiến rất nhiều và thường được đánh giá có hiệu năng tốt hơn RDD. Ta có thể convert RDD thành DataFrame sau đó thực hiện join.

val bigDF = bigRDD.toDF("k", "v") val smallDF = smallDF.toDF("k", "v") bigDF.join(smallDF, "k")

Sử dụng DataFrame có thể đem lại hiệu năng tốt hơn, tuy nhiên logic của code sẽ trở nên phức tạp hơn do phải chuyển đổi dạng dữ liệu qua lại giữa RDD và DataFrame. Ngoài ra, khi sử dụng join trong DataFrame, ta cũng sẽ cần phải tìm hiểu các cách tối ưu riêng cho hàm này trên DataFrame.

Kết luận

Join là một tính năng thông dụng và hữu ích với bất kì một công cụ xử lý dữ liệu nào, Spark cũng không phải là ngoại lệ. Tuy nhiên, nếu không được sử dụng một cách hợp lý và đúng phương pháp thì join có thể trở thành bottleneck trong tiến trình Spark. Bài viết đã cố gắng cung cấp các thông tin để người dùng hiểu rõ hơn về join đồng thời cung cấp 1 số các giải pháp để người dùng có thể sử dụng join một cách hiệu quả hơn khi lập trình ứng dụng Spark.

Tham khảo: