且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

JAVA并发处理经验(四)并行模式与算法1:生产者消费与单例模式

更新时间:2022-06-02 05:44:12

一、前言

并行模式与算法是并发中常见的问题。例如一些常用的模式化处理,我们也会接触。但是在现在多核下,合理安排好并发保证数据安全一些基本的常识我们还是需要了解的。

二、并行模式与算法

2.1单例模式

借助内部类实现单例,只是在类加载的时候被创建一次,达到单例模式。
适应场景:工具类,实体初始化等
/**
 * Created by ycy on 16/1/15.
 * 单例模式
 */
public class SingletonDemo {
    private SingletonDemo() {
        System.out.println("singleton is create");

    }
    private static class SinggletonHodel {
        private static SingletonDemo instance = new SingletonDemo();
    }

    public static synchronized SingletonDemo getInstance() {

        return SinggletonHodel.instance;
    }
    public static void main(String[] args) {
                    System.out.println(SingletonDemo.getInstance());
    }
}

2.2 不变模式

将一些恒定的值在第一次加载类的时候固化,保证多线程环境下的安全;
适应 场景:我们java的基本类型都是这么玩耍的
public final class Product{
    private final String no;
    private final String name;
    private final double price;
    
    public Product(String no,String name,double price){
        super();
        this.no=no;
        this.name=name;
        this.price=price;
    }

    public String getNo() {
        return no;
    }

    public String getName() {
        return name;
    }

    public double getPrice() {
        return price;
    }
}


2.3 生产者消费者模式 

具体研究google的disrupt框架,专业处理并发。这里给出阻塞的queue实现。(阻塞的性能没有disrupt快)

2.3.1 生产者与消费用用的数据模型

package pattern;

import javax.print.attribute.standard.RequestingUserName;

/**
 * Created by ycy on 16/1/15.
 */
public final class PCData {
    private final int intDate;//数据
    public PCData(int d){
        intDate=d;
    }
    public PCData(String d){
        intDate=Integer.valueOf(d);
    }

    public int getIntDate() {
        return intDate;
    }
   @Override
    public String toString(){
        return "date:"+intDate;
    }


}

2.3.2 生产者 通过block queue的offer插入数据进入队列

package pattern;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Created by ycy on 16/1/15.
 */
public class Producer implements  Runnable{
<h3>
</h3>    private volatile  boolean isRunning=true;
    private BlockingQueue<PCData> queue;//内存缓冲区
    private  static AtomicInteger count=new AtomicInteger();//总数.原子操作
    private static final int SLEEPTIME=1000;

    public Producer(BlockingQueue<PCData> queue){
        this.queue=queue;
    }
    public void run() {
    PCData data=null;
        Random r=new Random();
        System.out.println("start produer id=" +Thread.currentThread().getId());
        try{
            while (true){
                Thread.sleep(r.nextInt(SLEEPTIME));
                data=new PCData(count.incrementAndGet());//构造任务数据
                System.out.println(data+" is put into queue");
                if (!queue.offer(data,2, TimeUnit.SECONDS)){
                    System.out.println("failed to put data:"+data);//提交数据到缓冲区

                }
            }
        }catch (InterruptedException e){
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
    }
    public void stop(){
        isRunning=false;
    }
}

2.3.3 通过block的take提取数据模型,处理数据
package pattern;

import java.text.MessageFormat;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.locks.Condition;

/**
 * Created by ycy on 16/1/15.
 */
public class Consumer implements  Runnable{
private BlockingQueue<PCData> queue;//缓冲 去
private static final int SLEEPTIME=1000;
    public Consumer(BlockingQueue<PCData> queue){
        this.queue=queue;
    }

    public void run() {
        System.out.println("start Consume id ="+Thread.currentThread().getId());
        Random r=new Random();//随机等待时间

        try{
            while (true){
                PCData data=queue.take();//提前任务
                if (null!=data){
                    int re=data.getIntDate()*data.getIntDate();//计算平方
                    System.out.println(MessageFormat.format("{0}*{1}={2}",data.getIntDate(),data.getIntDate(),re));
                }
            }
        }catch (InterruptedException e){
            e.printStackTrace();;
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        //建立缓冲区
        BlockingQueue<PCData> queue = new LinkedBlockingDeque<PCData>(10);
        Producer p1 = new Producer(queue);
        Producer p2 = new Producer(queue);
        Producer p3 = new Producer(queue);

        Consumer consumer1 = new Consumer(queue);
        Consumer consumer2 = new Consumer(queue);
        Consumer consumer3 = new Consumer(queue);

        ExecutorService service = Executors.newCachedThreadPool();//建立线程池
        service.execute(p1);
        service.execute(p2);
        service.execute(p3);

        service.execute(consumer1);
        service.execute(consumer2);
        service.execute(consumer3);

        Thread.sleep(10*1000);

        p1.stop();
        p2.stop();
        p3.stop();

        Thread.sleep(3000);
        service.shutdown();

    }
}