Lập trình bất đồng bộ trong Scala với Future

Scala có thể sử dụng được tất cả các thư viện của Java nên ta hoàn toàn có thể sử dụng các tính năng về lập trình song song của Java (Thread, Runnable…) để phục vụ cho việc xử lý bất đồng bộ khi code Scala. Tuy nhiên, bản thân Scala cũng có những tính năng phục vụ riêng cho việc lập trình bất đối xứng với API ở mức abstract hơn, đồng thời dễ dàng tích hợp với các API mà Scala cung cấp.

Một tính năng có sẵn trong standard library của Scala là Future, ngoài ra còn có Async, thư viện cung cấp tính năng bổ trợ cho Future, và Akka, một thư viện rất mạnh phục vụ cho việc lập trình song song. Vì Akka khá phức tạp, xứng đáng có một bài viết riêng về nó, còn Async thì mới chỉ ở dưới dạng SIP (Scala Improvement Process) và chỉ ở dạng optional module,  nên nội dùng bài viết này sẽ chỉ tập trung giới thiệu về Future.

Future là gì?

Future là gì? Một object Future là một object mà giá trị của nó sẽ được khỏi tại tại một thời điểm nào đó trong tương lại. Giá trị này thường là kết quả của một quá trình tính toán nào đó.

Quá trình tính toán này nếu trả về giá trị kết quả thì ta nói là object Future này đã hoàn thành với giá trị đó, còn ngược lại nếu một Exception được trả về thì object Future được gọi là đã thất bại với Exception kia. Một đặc tính của Future là khi một Future khi đã hoàn thành dù là thành công hay thất bại thì nó sẽ không thể thay đổi được nữa (trở nên immutable).

Việc khởi tạo một Future cũng khá đơn giản như sau.

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

val f: Future[Int] = Future {
  16 * 8
}

Ở đây ta đã đóng gọi một phép nhân đơn giản vào trong một object Future với type Future[Int], dù trong thực tế không ai đóng gói một phép tính đơn giản như vậy mà thường là một quá trình tính toán mất thời gian như IO, network… Để sử dụng Future thì chúng ta cần có một ExecutionContext.

Ở đây ExecutionContext.global đã được sử dụng, ExecutionContext này sử dụng java.util.concurrent.ForkJoinPool để quản lý một thread pool thực hiện những công việc tính toán được đóng gói trong Future. Với một số tinh chỉnh, ta có thể sử dụng ExecutionContext.global trong hầu hết các trường hợp. Một số trường hợp đặc biệt (như là long running blocking code trong Future) thì một Execution Context tuỳ biến (wrapper cho Java Executor) có thể được sử dụng.

Future in Action

Ta hãy cùng xem một ví dụ đơn giản về Future:

import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global

def factorial(n: BigInt): BigInt = {
  if (n <= 1) 1
  else n * factorial(n - 1)
}

val futures = (0 to 9) map {
  i => Future {
    println(s"Input: $i")
    val s = factorial(i)
    println(s"Result for $i: $s")
    s.toString()
  }
}


val f = Future.reduce(futures) { (s1, s2) => s1 + ',' + s2 }

val n = Await.result(f, Duration.Inf)

println(n)

Đoạn code trên sẽ cho ra kết quả như sau:

Input: 1
Input: 0
Result for 0: 1
Result for 1: 1
Input: 2
Result for 2: 2
Input: 4
Input: 3
Result for 3: 6
Input: 5
Result for 4: 24
Result for 5: 120
Input: 6
Result for 6: 720
Input: 8
Result for 8: 40320
Input: 9
Result for 9: 362880
Input: 7
Result for 7: 5040
1,1,2,6,24,120,720,5040,40320,362880

Ở đây ta đã dùng 10 Future để thực hiện tính factorial cho mười số từ 0 đến 9. Quá trình tính cho mỗi số đều được chạy bất đồng bộ, như đã thấy ở output ở trên. Mỗi lần chạy thứ tự các dòng Input và Result sẽ là khác nhau, chỉ có thứ tự in của kết quả cuối là không đổi do hàm reduce xét lần lượt thứ tự các phần tử của chuỗi futures theo thứ tự khởi tạo. Ngoài ra, ta còn sử dụng Await.result  để block tiến trình chính cho đến khi Future f hoàn thành.

Tuy nhiên việc block tiến trình chính trong nhiều trường hợp có thể làm ảnh hưởng đến chương trình. Trong những trường hợp như vậy, ta có thể sử dụng callback.

Callback với Future

Để có thể xử lý kết quả của Future một cách bất đồng bộ (không làm block tiến trình chính) ta có thể đăng ký callback với Future. Khi Future hoàn thành thì hàm callback này sẽ được gọi một cách bất đồng bộ. Cách đăng ký callback thông dụng nhất là cung cấp một function với type Try[T] => Unit cho method onComplete của Future.

Dưới đây là một ví dụ về calback với onComplete:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success, Try}

def factorial(n: BigInt): BigInt = {
  if (n <= 1) 1
  else n * factorial(n - 1)
}

val futures = (0 to 9) map {
  i => Future { factorial(i) }
}

val fs = futures map {
  _ flatMap { x =>
    if (x <= 10000) Future.successful(x)
    else Future.failed(new RuntimeException(s"$x > 10000"))
  }
}

def doComplete: Function[Try[BigInt], Unit] = {
  case s @ Success(_) => println(s)
  case f @ Failure(_) => println(f)
}

fs map (_ onComplete doComplete)

Kết quả của đoạn code trên sẽ là như sau (mỗi lần thứ tự các dòng kết quá sẽ khác nhau do tính chất bất đồng bộ của Future).

Success(1)
Success(2)
Success(6)
Success(24)
Success(120)
Success(720)
Success(5040)
Failure(java.lang.RuntimeException: 40320 > 10000)
Failure(java.lang.RuntimeException: 362880 > 10000)
Success(1)

Ở đây ta sử dụng lại đoạn code tính factorial cho 10 số dùng Future, tuy nhiên có thêm phần check kết quả nếu như lớn hơn 10000 thì ta sẽ cho kết quả của Future là thất bại (đóng gói một Exception rồi trả về). Trong đoạn code ở trên, với mỗi Future trong chuỗi fs, chúng ta đều đăng ký một callback cho method onComplete là hàm doComplete. Ngoài ra, ta cũng thấy Future, tương tự như Option, Try, Either... hay các collection như List... là một cấu trúc dữ liệu mang tính chất Monad nên Future cung cấp những hàm như flatMap, filter...; một Future cũng thể được sử dụng với cấu trúc for comprehension của Scala.

Ngoài method onComplete sử dụng để đăng ký callback xử lý cả 2 trường hợp Future thành công lẫn thất bại, Future còn cung cấp 2 method onSuccess và onFailure để đăng ký callback xử lý riêng cho từng trường hợp. Sử dụng onSuccessonFailure thì đoạn code ở trên sẽ trở thành như sau:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

def factorial(n: BigInt): BigInt = {
  if (n <= 1) 1
  else n * factorial(n - 1)
}

val futures = (0 to 9) map {
  i => Future { factorial(i) }
}

val fs = futures map {
  _ flatMap { x =>
    if (x <= 10000) Future.successful(x)
    else Future.failed(new RuntimeException(s"$x > 10000"))
  }
}

fs map { f =>
  f.onSuccess { case res => println(res) }
  f.onFailure { case err => println("Error: " + err.getMessage) }
}

Và do onSuccessonFailure khác với onComplete sẽ lấy trực tiếp các kết quả và exception được đóng gói trong Success(...)Failure(...) nên ta sẽ có kết quả như sau:

1
2
6
24
120
720
5040
Error: 40320 > 10000
Error: 362880 > 10000
1

Kết luận

Với API dễ sử dụng, Future là một tính năng rất hữu dụng khi lập trình song song, bất đồng bộ trong Scala. Tuy nhiên do thiếu những công cụ để quản lý các tiến trình bất đồng bộ, xử lý Exception một cách hiệu quả... nên Future không thích hợp với những hệ thống song song lớn. Với những hệ thống này, Akka là một lựa chọn tốt hơn.

Bài viết đã giới thiệu các tính năng cơ bản về Future, tuy nhiên còn một số tính năng chưa được đề cập đến như sử dụng Promise với Future, optimize global Execution Context hay blocking code trong Future... Bạn đọc có thể tìm hiểu thêm ở các đường link reference đi kèm với bài viết này.

Tham khảo