1. Lựa chọn RDD Operation thích hợp

  • RDD hỗ trợ 2 loại operation: transformationaction. Khi kết hợp các transformationaction khác nhau, có thể cho cùng một kết quả. Tuy nhiên hiệu năng của chúng lại khác nhau, nên việc lựa chọn operation một cách thích hợp có thể cải thiện hiệu năng của chương trình.

  • Sử dụng reduceByKey hoặc aggregateByKey sẽ cho hiệu năng tốt hơn groupByKey

Ví dụ với bài toán đếm từ, sau đây là 2 cách dùng groupByKeyreduceByKey

val words = Array("one", "two", "two", "three", "three", "three")
val wordPairsRDD = sc.parallelize(words).map(word => (word, 1))
val wordCountsWithReduce = wordPairsRDD.reduceByKey(_ + _).collect()
val wordCountsWithGroup = wordPairsRDD.groupByKey() .map(t => (t._1, t._2.sum)) .collect()

Cả 2 cách đều cho kết quả đúng, nhưng reduceByKey sẽ hoạt động tốt hơn trên tập dữ liệu lớn.

reduceByKey sẽ kết hợp kết quả trên mỗi partition trước khi shuffle dữ liệu (đọc từ tất cả partition để tìm tất cả các value của các key, gộp dữ liệu cùng key vào cùng 1 partition và tính toán kết quả). Shuffle operation là một phép toán phức tạp, liên quan đến disk I/O và network I/O.

  • Tránh sử dụng pattern flatMap-join-groupBy

Khi bạn có 2 tập dữ liệu đã group theo key và muốn join chúng và giữ chúng vẫn được group thì có thể dùng cogroup

  • Đừng sử dụng collect() với tập dữ liệu lớn

Collect sẽ copy tất cả element của tập dữ liệu vào driver program và sẽ dễ bị OutOfMemoryError. Hãy *filter *dữ liệu trước và có thể sử dụng take() để chắc chắn số element được trả về.

  • Filter dữ liệu sớm nhất có thể. Nó sẽ giảm được kích thước dữ liệu được shuffle

2. Xác định số partition hợp lý

Khi mà kích thước task quá lớn dễ dẫn đến OutOfMemoryError, để giảm kích thước mỗi task ta có thể tăng số lượng task cùng xử lý. Có thể tăng số partition bằng cách thiết lập thuộc tính spark.default.parallelism hoặc truyền các tham số của các hàm.

1. groupByKey([numTasks])
2. reduceByKey(func, [numTasks])
3. aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
4. sortByKey([ascending], [numTasks])
5. join(otherDataset, [numTasks])
6. cogroup(otherDataset, [numTasks])
7. coalesce(numPartitions)
8. repartition(numPartitions)
9. repartitionAndSortWithinPartitions(partitioner)

Hoặc sử dụng hàm partitionBy

val partitioner = new HashPartitioner(NUM_PARTITIONS)
A.partitionBy(partitioner)

3. Cache

Một trong những khả năng quan trọng của Spark là persisting (hay caching). Khi RDD được persist sau khi tính toán trong action đầu, nó sẽ được lưu trên memory của các node và sẽ được sử dụng lại trong action khác.

Spark sẽ không xử lý cho đến khi action được gọi. Như hình trên, RDD1 và RDD2 sẽ được tính toán 2 lần khi Job1 và Job2. Với dữ liệu lớn thì rất ảnh hưởng đến hiệu năng.

Nếu mình persist RDD2 thì sau Job1 thì RDD2 được tính toán và cache; không phải tính lại trong Job2.

Tham khảo 1 số level cache:RDD Persistence Storage Level

Một số chú ý khi chọn level cache:

  • Với RDD không lớn nên chọn MEMORY_ONLY, nó cho phép operation xử lý nhanh nhất có thể
  • Đừng chọn MEMORY_AND_DISK trừ khi tập dữ liệu đã được tính toán phức tạp (khi mà tính toán lại sẽ chậm hơn đọc từ đĩa). Còn nếu không có thể cho phép 1 số partition có thể không được cache và phải tính toán lại khi cần, có thể sẽ nhanh hơn đọc từ đĩa

Mặc dù Spark tự động persist 1 số dữ liệu trung gian trong các shuffle operation, ngay cả khi người dùng không gọi, nhưng nên dùng hàm persist với các RDD đã được tính toán khi có kế hoạch dùng lại chúng.

4. Hiểu Spark parameter và thiết lập giá trị tốt nhất cho nó

Sau đây là 1 số parameter quan trọng, nếu thiết lập giá trị hợp lý có thể cải thiện hiệu năng

  • num-executors
  • driver-memory
  • executor-memory
  • executor-cores
  • spark.shuffle.consolidateFiles: Mặc định là false (khi hệ thống dùng filesystem là ext4 hay xfs thì nên thiết lập là true)
  • conf spark.shuffle.memoryFraction: Phần Java heap dùng để tập hợp dữ liệu khi shuffle (khi spark.shuffle.spill = true) trên memory
  • conf spark.storage.memoryFraction: Phần Java heap dùng cho Spark memory cache

Tham khảo