且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

一个使用Java BlockingQueue实现的生产者和消费者

更新时间:2022-10-09 22:43:54

消费者

package consumer;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;


public class Consumer implements Runnable {

    /*
     * 用util.concurrent.BlockingQueue沟通生产者和消费者的桥梁
    */
    BlockingQueue<String> queue;
    String id;
    @SuppressWarnings("unused")
    private volatile boolean      isRunning               = true;
    public Consumer(BlockingQueue<String> queue, String id) {
        this.queue = queue;
        this.id = id;
    }
    
    public void stop() {
        isRunning = false;
    }
    
    @Override
    public void run() {
        System.out.println("Thread: " + id + " Consumer thread is running...");
        boolean isRunning = true;
        try {
            while (isRunning) {
                System.out.println("Thread: " + id + " fetch data from linkedQueue..." + " queue size: " + queue.size());
                /*
                 * 从队列里取出一个元素,2秒超时,如果两秒之后还没有东西可以取,则poll返回null
                 */
                String data = queue.poll(2, TimeUnit.SECONDS);
                if (null != data) {
                    System.out.println("Thread: " + id + " has consumed one data from queue: " + data
                            + "   Queue sise: " + queue.size());
                    // simulate data consumption
                    Thread.sleep(1000);
                } else {
                    isRunning = false;
                    // 消费者准备退出
                    System.out.println("Thread: " + id + " Consumer read queue timeout");
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        } finally {
            System.out.println("Thread: " + id + " consumer thread ends");
            
        }
    }

}

生产者

package consumer;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class Producer implements Runnable {
 
    BlockingQueue<String> queue;
    String id;
    public Producer(BlockingQueue<String> queue, String id) {
        this.queue = queue;
        this.id = id;
    }
 
    @Override
    public void run() {
        String data = null;
        
        try {
            while (isRunning) {
                System.out.println("PRODUCER: " + id + " is running");
                Thread.sleep(100);
 
                data = "data:" + count.incrementAndGet();
                System.out.println("Thread: " + id + " procedued data into queue: " + data + " ...");
                if (!queue.offer(data, 2, TimeUnit.SECONDS)) {
                    System.out.println("failed to put data into queue: " + data);
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        } finally {
            System.out.println("Thread: " + id + " quit from producer thread");
        }
    }
 
    public void stop() {
        isRunning = false;
    }
 
    private volatile boolean      isRunning               = true;
    private static AtomicInteger  count                   = new AtomicInteger();
 
}

测试代码

package consumer;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class ConsumerProducerTest {
 
    public static void main(String[] args) throws InterruptedException {

        BlockingQueue<String> queue = new LinkedBlockingQueue<String>(15);
 
        Producer producer1 = new Producer(queue, "PROD1");
        Producer producer2 = new Producer(queue, "PROD2");
        Producer producer3 = new Producer(queue, "PROD3");
        Consumer consumer1 = new Consumer(queue, "CONSUMER1");
        Consumer consumer2 = new Consumer(queue, "CONSUMER2");
 
        ExecutorService service = Executors.newCachedThreadPool();
        
        service.execute(producer1);
        service.execute(producer2);
        service.execute(producer3);
        service.execute(consumer1);
        service.execute(consumer2);
 
        Thread.sleep(3 * 1000);
        producer1.stop(); // 一定要先关闭生产者
        producer2.stop();
        producer3.stop();
        consumer1.stop();
        consumer2.stop();
 
        Thread.sleep(2000);

        service.shutdown();
    }
}