Lập trình concurrency cơ bản với Go

Intro

Lập trình concurrency (đa luồng) là một chủ đề khó nhằn đối với mọi ngôn ngữ lập trình (đã có kha khá đầu sách viết về chủ đề này đối với mỗi ngôn ngữ). Với một ngôn ngữ sinh ra để xử lý các vấn đề về backend cho Google như Go, concurrency là một trong những vấn đề được chú trọng hàng đầu và được gói gọn trong câu slogan sau:

Do not communicate by sharing memory; instead, share memory by communicating.

Go khuyến khích cách tiếp cận mỗi thread chỉ access đến giá trị chia sẻ tại đúng một thời điểm nhất định và giá trị chia sẻ này được truyền giữa các thread thông qua các kênh giao tiếp. Có thể hình dung cơ chế này giống như việc chạy hai chương trình single-threaded trên 1 CPU và để 2 chương trình này trao đổi thông tin với nhau, quá trình trao đổi thông tin sẽ đóng vai trò đồng bộ hoá dữ liệu giữa 2 chương trình này.

Bài viết này sẽ giới thiệu các khái niệm cơ bản về concurrency trong Go

Goroutine

Goroutine là một lightweight thread model được quản lý bởi Go runtime. Cơ chế của goroutine khá là đơn giản: 1 function tồn tại một cách đa luồng với các goroutine khác trên cùng một không gian bộ nhớ.

Để khởi tạo một goroutine ta chỉ cần thêm phía trước một function call hay method call từ khoá go:

go f(x, y, z)

Dòng code phía trên sẽ khởi tạo một goroutine thực hiện lời gọi function f(x, y, z). Giá trị x, y, z sẽ được khởi tạo tại goroutine hiện tại (main thread) còn việc thực hiện hàm f sẽ được xảy ra tại goroutine mới được tạo ra. Khi function đã hoàn tất các công việc của mình, goroutine cũng sẽ tự động exit theo. Ta có thể tham khảo ví dụ vô cùng đơn giản dưới đây:

go list.Sort() // => Hàm sort sẽ được chạy song song với thread hiện tại

Ngoài ra, ta có thể khởi tạo một goroutine sử dụng một function literal như sau

go func() {
        time.Sleep(100)
        fmt.Println('Waiting...')
}() 

sync.Mutex

Các goroutine đều tồn tại trên cùng một không gian bộ nhớ, do đó việc truy cập đến một gía trị chung trong bộ nhớ cần phải được đồng bộ. Dù go không khuyến khích cách làm này, nhưng có những trường hợp đặc biệt khi ta chỉ muốn có goroutine duy nhất truy cập đến một giá trị trong một thời điểm để tránh conflict và không quan tâm đến việc communication với các goroutine khác.

Ta gọi cơ chế như vậy và mutual exclusion hay mutex. Tương tự như Java, Go cung cấp các công cự có sẵn để implment cơ chế mutex: sync.Mutex và 2 hàm LockUnlock của nó. Ta có thể tham khảo ví dụ sau:

package main

import (
	"fmt"
	"sync"
	"time"
)

type SafeCounter struct {
	v   map[string]int
	mux sync.Mutex
}

func (c *SafeCounter) Inc(key string) {
	c.mux.Lock()
	c.v[key]++
	c.mux.Unlock()
}

func (c *SafeCounter) Value(key string) int {
	c.mux.Lock()
	defer c.mux.Unlock()
	return c.v[key]
}

func main() {
	c := SafeCounter{v: make(map[string]int)}
	for i := 0; i < 1000; i++ {
		go (&c).Inc("test")
	}

	time.Sleep(time.Second)
	fmt.Println(c.Value("test"))
}

Ta đã tạo 1000 goroutine cùng update giá trị counter cho key test, và sử dụng cơ chế mutex để đảm bảo trong một thời điểm có duy nhất 1 goroutine đọc và ghi giá trị counter (thông qua LockUnlock).

Channel

Channel là một cấu trúc dữ liệu với type được định sẵn dùng để gửi và nhận giá trị thông qua toán tử channel <-

ch <- v    // Gửi v tới channel ch.
v := <-ch  // Nhận giá trị từ channel ch và gán nó vào biến v

Channel phải được khởi tạo trước khi sử dụng và giá trị gửi nhận của nó phải được định sẵn type

ch := make(chan int) // Khởi tạo một channel với giá trị gửi nhận là int

Mặc định việc gửi và nhận sẽ block cho đến khi qua trình còn lại hoàn tất (gửi rồi mới được nhận, nhận xong rồi thì mới được gửi tiếp). Cơ chế này giúp cho các goroutine có thể sử dụng channel để đồng bộ mà không cần phải sử dụng lock. Ta có thể tham khảo ví dụ sau:

package main

import "fmt"

func sum(s []int, c chan int) {
	sum := 0
	for _, v := range s {
		sum += v
	}
	c <- sum // send sum to c
}

func main() {
	s := []int{7, 2, 8, -9, 4, 0, 10, 12}

	c := make(chan int)
	go sum(s[:len(s)/2], c)
	go sum(s[len(s)/2:], c)
	x, y := <-c, <-c // receive from c

	fmt.Println(x, y, x+y)
}

Đoạn code trên thực hiện tính tổng của một dãy số, bằng cách chia nó làm hai, tính tổng từng phần trong mỗi goroutine riêng biệt, sau đó mỗi goroutine sẽ gửi kết quả vào channel c. Ở main thread, giá trị gửi vào channel c sẽ được nhận và lần lượt gán vào hai biến x và y.

Buffered channel

Mặc định thì channel sẽ chỉ có thể nhận được 1 giá trị rồi sẽ bị block, nó phải gửi giá trị này đi mới có thể nhận tiếp được giá trị khác (unbuffered channel). Tuy nhiên với buffered channel, việc gửi giá trị đến channel sẽ chỉ bị block khi buffer của channel này đã bị đầy, việc nhận giá trị từ buffer sẽ bị block khi buffer bị trống.

Để khởi tạo 1 buffered channel, ta dùng cú pháp như sau

ch := make(chan int, 100) // Tạo 1 channel với buffer có dung lượng 100

Ta có thể đóng 1 channel khi không còn có giá trị nào để gửi vào nữa sử dụng hàm close. Ta cũng có thể test xem channel đã bị đóng hay chưa sử dụng phương thức sau:

v, ok := <-ch // ok sẽ là false nếu như channel đã bị đóng và không còn giá trị nào có thể nhận được nữa

Ngoài ra, ta cũng có thể loop để nhận hết giá trị từ channel cho tới khi nó empty bằng cách như sau

for i := range c {
        fmt.Println(i)
}

Sử dụng buffered channel, ta có thể lập trình cơ chế semaphore (giới hạn truy cập đến giá trị chia sẻ cho nhiều goroutine). Tham khảo ví dụ code sau:

var sem = make(chan int, MaxNumRequest)

func Serve(queue chan *Request) {
        for req := range queue {
                sem <- 1
                go func(req *Request) {
                        process(req)
                        <-sem
                }(req)
    }
}

Đoạn code ở trên có thể hiểu đơn giản có chức năng giới hạn xử lý request cho một server. Khi số request chưa vượt quá ```MaxNumRequest```, với mỗi request ta sẽ khởi tạo một goroutine để xử lý request đó. Nếu số request vượt quá giới hạn này, do ta channel ```sem``` với tác dụng như là một semaphore sẽ block (do buffer bị đầy) cho đến khi quá trình xử lý các request hiện tại hoàn tất, có slot trống (giải phóng từ channel)

Select

Sử dụng select, một goroutine có thể chờ nhận giá trị gửi từ nhiều channel để xử lý.

select {
case c <- x:
	x, y = y, x+y
case <-quit:
	fmt.Println("quit")
	return
default:
	fmt.Println("Waiting...")
	time.Sleep(50 * time.Millisecond)
}

select sẽ block cho đến khi 1 trong những case của nó có thể thực hiện được (có giá trị trả về từ channel). Trong trường hợp các case đều có giá trị trả về thì select sẽ chọn random. Ngược lại, khi không có giá trị gì trả về và block default được định nghĩa thì đoạn code trong block này sẽ được thưc hiện

Kết luận

Bài viết đã lướt qua các khái niệm cơ bản khi lập trình concurrency với Go. Sử dụng các khái niệm cơ bản này, ta có thể implement các cơ chế lập trình phức tạp hơn như là channels of channels, parallelism,...

Tham khảo