1. Giới thiệu


Là một design pattern được thiết kế dựa trên pattern Master/Slave.

Trong đó:

  • Producer(nhà sản xuất): tạo ra các items, các data. Các items, data sẽ được cho vào một buffer hay 1 queue.
  • Consumer (nhà tiêu thụ): Cùng thời điểm trên nhà tiêu thụ lấy các items, xử lý các data từ trong buffer hoặc queue.

Producer giống như là Master, Consumer thì đóng vai trò như slave. Số lượng Consumer thường là số nhiều.

Ví dụ về Producer – Consumer và một số proplem trong mô hình:

hình ảnh sau (Sưu tầm ảnh trên internet):

producer-consumer

Trong hình ảnh ở trên thì producer là máy bán bia, items là bia, customer là người uống.

Theo như hình số 1: Như bình thường máy rót bia sẽ rót đầy ly và người tiêu dùng lấy ly bia và uống. Nhưng nếu người tiêu dùng muốn uống nhiều nếu theo hình 1 thì uống xong người tiêu dùng phải chờ tiếp để rót bia sau đó uống. Như vậy thấy mất thời gian.

Theo như hình số 2: có 5 ly bia để rót, Producer sẽ rót liên tục vào các ly bia còn rỗng. Consumer sẽ lấy ly bia đã được rót và sử dụng. Ly nào sử dụng xong thì lai có ly mới sẽ được rót vào và luôn luôn trạng thái của số ly bia được rót là 5.

hình số 3, 4, 5 chính là các problem có trong mô hình producer – consumer.

  • hình số 3: Consumer sử dụng items, xử lý dữ liệu khi mà cái đó đang được sản xuất bởi producer.
  • hình số 4: Consumer phải chờ để sử dụng cho tới khi mà Producer sản xuất. Nếu Producer mà bị lỗi thì Consumer chờ vô hạn. Thật là kinh khủng khi mà người tiêu dùng phải chờ không biết tới khi nào có được sản phẩm để sử dụng. Poor Consumer!
  • hình số 5: Ngược lại nếu Consumer mà bận không tiêu thụ được ngay. Trong khi đó sản phẩm tồn đọng sẽ đứng ở dây truyền mãi. Producer lúc này phải chờ để tạo items hoặc data theo kế hoạch. Việc chờ lại trở lên vô tận nếu không có Consumer. Poor Producer!

2.Thiết lập mô hình Producer Consumer với JAVA.


Trong Java thì đã hỗ trợ mô hình Producer – Consumer. Đồng thời cung các class có khả năng tránh được các problem của mô hình Producer – Consumer.

Một bài toán như sau: tạo một chương trình theo mô hình Producer – Consumer trong đó producer có chức năng tạo ra các số tăng dần từ 1->100 tương tự data được sinh ra từ producer số lượng tối đa tồn tại số được sinh ra từ producer là 15 số. Consumer sẽ có 5 consumer. Mỗi Consumer sẽ lấy data được sinh ra từ producer và nhân với 2 sau đó in  ra màn hình.

2.1 Code Sample:

package testProject; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; class ProducerTest { BlockingQueue dataQueue = null; public ProducerTest(BlockingQueue dataQueue) { this.dataQueue = dataQueue; } // create Integer Object then add to queue. If after 1000ms that data not // used then that data is deleted. public void production() throws InterruptedException { for (int i = 0; i < 100; i++) { dataQueue.offer(new Integer(i), ProducerConsumerTest.MAX_TIME_OUT_OFFER, TimeUnit.MILLISECONDS); } } } class ConsumerTest implements Callable { BlockingQueue dataQueue = null; public ConsumerTest(BlockingQueue dataQueue) { this.dataQueue = dataQueue; } @Override public Boolean call() { System.out.println((new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")).format(new Date()) + " Consumer Begin" ); boolean isStop = false; try { while (!isStop) { Integer count = dataQueue.poll(ProducerConsumerTest.MAX_TIME_OUT_POOL, TimeUnit.MILLISECONDS); if (count == null) { isStop = true; } else { System.out.println((new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")).format(new Date()) + " " + Thread.currentThread().getName() + " count = " + count * 2); } } return true; } catch (Exception e) { return false; } finally { System.out.println((new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")).format(new Date()) + " Consumer End" ); } } } public class ProducerConsumerTest { public static final int MAX_QUEUE = 15; public static final int MAX_CONSUMER = 5; public static final int MAX_TIME_OUT_OFFER = 1000; public static final int MAX_TIME_OUT_POOL = 6000; public static void main(String[] args) { // queue for run block queue is json data BlockingQueue dataQueue = new LinkedBlockingQueue<>(MAX_QUEUE); // initialize consumer threads ExecutorService executor = Executors.newFixedThreadPool(MAX_CONSUMER); // start consumer List<Future> futureList = new ArrayList<>(); try { for (int i = 1; i <= MAX_CONSUMER; i++) { Future f = executor.submit(new ConsumerTest(dataQueue)); futureList.add(f); } // start producer ProducerTest producer = new ProducerTest(dataQueue); producer.production(); // check consumer result boolean hasError = false; for (Future consumerRes : futureList) { if (consumerRes.get() == Boolean.FALSE) { hasError = true; } } if (hasError) { System.out.println("There was an error"); } } catch (ExecutionException | InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } finally { System.out.println((new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")).format(new Date()) + " Finished!"); executor.shutdown(); } } }

2.2 Kết quả:

2016/05/20 23:53:42 Consumer Begin pool-1-thread-5 2016/05/20 23:53:42 Consumer Begin pool-1-thread-4 2016/05/20 23:53:42 pool-1-thread-4 count = 2 2016/05/20 23:53:42 Consumer Begin pool-1-thread-3 2016/05/20 23:53:42 pool-1-thread-4 count = 4 2016/05/20 23:53:42 pool-1-thread-3 count = 6 2016/05/20 23:53:42 Consumer Begin pool-1-thread-2 2016/05/20 23:53:42 pool-1-thread-3 count = 10 2016/05/20 23:53:42 pool-1-thread-3 count = 14 2016/05/20 23:53:42 pool-1-thread-2 count = 12 2016/05/20 23:53:42 pool-1-thread-3 count = 16 2016/05/20 23:53:42 pool-1-thread-3 count = 20 2016/05/20 23:53:42 pool-1-thread-3 count = 22 2016/05/20 23:53:42 pool-1-thread-3 count = 24 2016/05/20 23:53:42 Consumer Begin pool-1-thread-1 .... .... 2016/05/20 23:53:42 pool-1-thread-3 count = 196 2016/05/20 23:53:42 pool-1-thread-5 count = 148 2016/05/20 23:53:42 pool-1-thread-4 count = 198 2016/05/20 23:53:42 pool-1-thread-1 count = 190 2016/05/20 23:53:42 pool-1-thread-2 count = 168 2016/05/20 23:53:48 Consumer End pool-1-thread-1 2016/05/20 23:53:48 Consumer End pool-1-thread-3 2016/05/20 23:53:48 Consumer End pool-1-thread-5 2016/05/20 23:53:48 Consumer End pool-1-thread-2 2016/05/20 23:53:48 Consumer End pool-1-thread-4 2016/05/20 23:53:48 Finished!

Chương trình trên tạo ra 1 Producer và 5 Consumer, Dữ liệu được sinh ra từ Producer là số Integer từ 0 -> 99. Mỗi một item sẽ cho vào 1 Queue với độ lớn 15. Bên Consumer sẽ lấy từng item trong Queue và nhân với 2 rồi in ra màn hình.

Mỗi Consumer là một Thread trong 1 Pool xử lý.

2.3 Xử lý trên giải quyết được các Problem thế nào?

  • Proplem 1: Consumer sử dụng items, xử lý dữ liệu khi mà item, data đó đang được sản xuất bởi producer. Giải quyết: Dùng cơ chế đồng bộ dữ liệu. Cụ thể trong chương trình trên sử dụng: LinkedBlockingQueue - Theo như kết quả trên thì các Items được sinh ra và xử lý không hề bị lỗi. Và các số được in ra từ 0 -> 99 nhân với 2. Vậy dữ liệu được toàn vẹn khi được sinh ra từ Producer.

  • Problem 2: Consumer chờ vô tận khi Producer không sản xuất nữa. Giải quyết: Dùng timeout việc get dữ liệu từ queue hoặc buffer trên Consumer. - Theo kết quả trên thì Consumer kết thúc sau khi count xong là 6s nhờ vào đoạn code sau: - Integer count = dataQueue.poll(ProducerConsumerTest.MAX_TIME_OUT_POOL, TimeUnit.MILLISECONDS);

  • Producer chỉ sinh dữ liệu tới 99. Sau đó kết thúc. Consumer xử lý hết dữ liệu trong queue thì chờ thêm thời gian timeout của poll. Nếu không có dữ liệu tiếp thì kết thúc.

  • Problem 3:  Producer chờ vô tận đến khi Consumer tiêu thụ items hoặc xử lý data. Giải quyết: Producer khi sản sinh ra items hoặc data sẽ thiết lập time out cho việc add vào buffer hoặc queue. - - dataQueue.offer(new Integer(i), ProducerConsumerTest.MAX_TIME_OUT_OFFER, TimeUnit.MILLISECONDS);

  • Sửa code thêm đoạn sau: - if (count == null) { isStop = true; } else { System.out.println((new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")).format(new Date()) + " " + Thread.currentThread().getName() + " count = " + count * 2); } // add to check problem 3 if(count > 40) { return true; }

  • Kết Quả: - 2016/05/21 00:05:06 Consumer End pool-1-thread-5 2016/05/21 00:05:06 Consumer End pool-1-thread-3 2016/05/21 00:05:06 pool-1-thread-4 count = 56 2016/05/21 00:05:06 Consumer End pool-1-thread-2 2016/05/21 00:05:06 pool-1-thread-4 count = 88 2016/05/21 00:05:06 Consumer End pool-1-thread-4 2016/05/21 00:05:06 pool-1-thread-1 count = 74 2016/05/21 00:05:06 pool-1-thread-1 count = 90 2016/05/21 00:05:06 Consumer End pool-1-thread-1 2016/05/21 00:05:45 Finished!

  • Consumer kết thúc lúc 00:05:06 nhưng chương trình kết thúc lúc 00:05:45. Tức là mất khoảng 39s. Lý do: thì đã xử lý tới count = 90. Tức data:45 đã được xử lý. như vậy 46 items đã được xử lý, còn lại 54 item chưa được xử lý, trong queue có 15 item. Vậy producer còn có 54 -15 = 39 items. Mỗi item có time out = 1s -> hệ thống kết thúc sau 39s.

  • Theo như kết quả trên cho dù Consumer bận (đi vắng) thì hệ thống cũng dừng. Producer không phải chờ vô tận.

3. Kết luận:


Phù hợp xây dựng các ứng dụng multi-thread trong JAVA mà không cần phải lo lắng về Synchronized dữ liệu.

Việc sử dụng Producer-Consumer trong trường hợp Producer sẽ tạo ra thành phần giống nhau về cấu trúc của data để cho vào queue, và multi- Consumer sẽ thực hiện xử lý các phần data đó.

Điểm yếu: ngoài trừ Java được đóng gói 1 số class đã có chức năng Synchronized dữ liệu, thì các ngôn ngữ khác để thiết lập mô hình producer- consumer tương đối phức tạp, cần tới các hàm tương tự Synchronized, Notify, Wait để giải quyết các problem trên.

4. Tham Khảo:


https://en.wikipedia.org/wiki/Producer-consumer_problem
[Java Concurrent] Tìm hiểu về BlockingQueue