百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 编程字典 > 正文

JUC并发编程与高性能内存队列disruptor实战

toyiye 2024-05-25 20:11 23 浏览 0 评论

JUC并发实战

Synchonized与Lock

区别

  • Synchronized是Java的关键字,由JVM层面实现的,Lock是一个接口,有实现类,由JDK实现。
  • Synchronized无法获取锁的状态,Lock可以判断是否获取到了锁。
  • Synchronized自动释放锁,lock一般在finally中手动释放,如果不释放锁,会死锁。
  • Synchronized 线程1(获得锁,阻塞),线程2(等待,傻傻的等); lock锁不一定会等待下去(lock.tryLock())
  • Synchronized是可重入的,不可中断的,非公平锁。Lock, 可重入锁,可以判断锁,非公平锁。
  • Synchronized 适合锁少量的代码同步问题,Lock适合锁大量的同步代码。

代码示例

高铁票类synchronized实现TicketS.java,对于线程来说也属于资源类

package cn.itxs.synchronize;

public class TicketS {
    private int quantify = 20;
    public synchronized void sale(){
        if (quantify > 0) {
            System.out.println("当前线程"+Thread.currentThread().getName() + "卖出了第" + quantify-- + "张高铁票,剩余票数量为" + quantify);
        }
    }
}

高铁票类Lock实现TicketL.java

package cn.itxs.synchronize;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
?
public class TicketL {
    private int quantify = 20;
    private Lock lock = new ReentrantLock();
    public void sale(){
        lock.lock();
        try {
            if (quantify > 0) {
                System.out.println("当前线程"+Thread.currentThread().getName() + "卖出了第" + quantify-- + "张高铁票,剩余票数量为" + quantify);
            }
        }catch (Exception e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }
}

测试类,下面线程的使用采用lambda表达式写法,属于JDK8的特性之一

package cn.itxs.synchronize;
?
public class ThreadMain {
    public static void main(String[] args) {
        TicketS ticketS = new TicketS();
        System.out.println("ticketS-----------");
        new Thread(() -> {
            for (int i = 1; i < 30; i++) {
                ticketS.sale();
            }
        },"第一个线程").start();
?
        new Thread(() -> {
            for (int i = 1; i < 30; i++) {
                ticketS.sale();
            }
        },"第二个线程").start();
?
        new Thread(() -> {
            for (int i = 1; i < 30; i++) {
                ticketS.sale();
            }
        },"第三个线程").start();
?
        System.out.println("ticketL-----------");
?
        TicketL ticketL = new TicketL();
        new Thread(() -> {
            for (int i = 1; i < 30; i++) {
                ticketL.sale();
            }
        },"第一个线程").start();
?
        new Thread(() -> {
            for (int i = 1; i < 30; i++) {
                ticketL.sale();
            }
        },"第二个线程").start();
?
        new Thread(() -> {
            for (int i = 1; i < 30; i++) {
                ticketL.sale();
            }
        },"第三个线程").start();
    }
}

虚假唤醒

概述

  • 虚假唤醒是指当一定的条件触发时会唤醒很多在阻塞态的线程,但只有部分的线程唤醒是有用的,其余线程的唤醒是多余的;比如说卖货,如果本来没有货物,突然进了一件货物,这时所有的顾客都被通知了,但是只能一个人买,所以对其他人都是做了无用的通知。

代码示例

计算类,提供加一减一的0和1结果,Counter.java

package cn.itxs.counter;
?
public class Counter {
    private int count = 0;
?
    public synchronized void addCount() throws InterruptedException {
        if (count > 0){
            //线程开始等待
            this.wait();
        }
        count++;
        System.out.println("当前线程为" + Thread.currentThread().getName() + ",count=" + count);
        //通知其他线程
        this.notifyAll();
    }
?
    public synchronized void subtractCount() throws InterruptedException {
        if (count == 0){
            //线程开始等待
            this.wait();
        }
        count--;
        System.out.println("当前线程为" + Thread.currentThread().getName() + ",count=" + count);
        //通知其他线程
        this.notifyAll();
    }
}

测试类

package cn.itxs.counter;
?
public class CounterMain {
    public static void main(String[] args) {
        Counter counter = new Counter();
        new Thread(() -> {
            for (int i = 1; i < 10; i++) {
                try {
                    counter.addCount();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"第一个线程").start();
?
        new Thread(() -> {
            for (int i = 1; i < 10; i++) {
                try {
                    counter.subtractCount();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"第二个线程").start();
?
        new Thread(() -> {
            for (int i = 1; i < 10; i++) {
                try {
                    counter.addCount();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"第三个线程").start();
?
        new Thread(() -> {
            for (int i = 1; i < 10; i++) {
                try {
                    counter.subtractCount();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"第四个线程").start();
    }
}

从上面分析可以知道导致虚假唤醒的原因主要就是一个线程直接在if代码块中被唤醒了,这时它已经跳过了if判断。我们只需要将if判断改为while,这样线程就会被重复判断而不再会跳出判断代码块,从而不会产生虚假唤醒这种情况了。

Callable

Callable任务可拿到一个Future对象,表示异步计算的结果,它提供了检查是否计算完成的方法,以等待计算的完成,并检索计算的结果,通过Future对象可以了解任务执行情况,可以取消任务的执行,还可以获取执行结果。

  • Runnable和Callable的区别Callable规定的方法是call(),Runnable规定的接口是run();Callable的任务执行后可返回值,而Runnable的任务是不能有返回值的;call方法可以抛出异常,run方法不可以

实现Callable接口资源类MessageThread.java

package cn.itxs.collection;
?
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
?
public class MessageThread implements Callable<Integer> {
    @Override
    public Integer call() throws Exception {
        System.out.println("hello callable!");
        TimeUnit.SECONDS.sleep(3);
        return 100;
    }
}

测试类

package cn.itxs.collection;
?
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
?
public class CallableMain {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        MessageThread messageThread = new MessageThread();
        FutureTask futureTask = new FutureTask(messageThread);
        new Thread(futureTask,"FutureTaskTest").start();
        Integer res = (Integer)futureTask.get();
        System.out.println(res);
    }
}

异步回调

package cn.itxs.asyncall;
?
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
?
public class AsynCallMain {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + ",async call void");
        });
        System.out.println("等待线程异步执行");
        completableFuture.get();
?
        CompletableFuture<String> completableFutureR = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //int i = 1/0; //取消注释后下面e输出详细信息,返回值为bad
            System.out.println(Thread.currentThread().getName() + ",async call return");
            return "good";
        });
        System.out.println("等待线程异步执行");
        System.out.println(completableFutureR.whenComplete((s, e) -> {
            System.out.println("s=" + s + ",e=" + e);
        }).exceptionally((e) -> {
            System.out.println(e.getMessage());
            return "bad";
        }).get());
    }
}

Lock+Condition

代码示例

先将上一小节改造为Lock+Condition版本下CounterL.java

package cn.itxs.counter;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class CounterL {
    private int count = 0;
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    public void addCount() throws InterruptedException {
        lock.lock();
        try {
            while (count > 0){
                //线程开始等待
                condition.await();
            }
            count++;
            System.out.println("当前线程为" + Thread.currentThread().getName() + ",count=" + count);
            //通知其他线程
            condition.signalAll();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }

    public void subtractCount() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0){
                //线程开始等待
                condition.await();
            }
            count--;
            System.out.println("当前线程为" + Thread.currentThread().getName() + ",count=" + count);
            //通知其他线程
            condition.signalAll();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }
}

和上面的执行结果是一样,但Lock+Condition可以实现精准的唤醒

CounterA.java

package cn.itxs.counter;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class CounterA {
    private int count = 1;
    private Lock lock = new ReentrantLock();
    private Condition condition1 = lock.newCondition();
    private Condition condition2 = lock.newCondition();
    private Condition condition3 = lock.newCondition();

    public void testMethod1() throws InterruptedException {
        lock.lock();
        try {
            while (count != 1){
                //线程开始等待
                condition1.await();
            }
            count = 2;
            System.out.println("当前线程为" + Thread.currentThread().getName() + ",testMethod1 count=" + count);
            //通知其他线程
            condition2.signal();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }

    public void testMethod2() throws InterruptedException {
        lock.lock();
        try {
            while (count != 2){
                //线程开始等待
                condition2.await();
            }
            count = 3;
            System.out.println("当前线程为" + Thread.currentThread().getName() + ",testMethod2 count=" + count);
            //通知其他线程
            condition3.signal();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }

    public void testMethod3() throws InterruptedException {
        lock.lock();
        try {
            while (count != 3){
                //线程开始等待
                condition3.await();
            }
            count = 1;
            System.out.println("当前线程为" + Thread.currentThread().getName() + ",testMethod3 count=" + count);
            //通知其他线程
            condition1.signal();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }
}

测试类

package cn.itxs.counter;

public class CounterAMain {

    public static void main(String[] args) {
        CounterA counterA = new CounterA();
        new Thread(() -> {
            for (int i = 1; i < 10; i++) {
                try {
                    counterA.testMethod1();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"第一个线程").start();

        new Thread(() -> {
            for (int i = 1; i < 10; i++) {
                try {
                    counterA.testMethod2();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"第二个线程").start();

        new Thread(() -> {
            for (int i = 1; i < 10; i++) {
                try {
                    counterA.testMethod3();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"第三个线程").start();
    }
}

锁的常识

package cn.itxs.lock;

import java.util.concurrent.TimeUnit;

public class Sport {
    public synchronized void playBasketBall(){
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + "打篮球");
    }

    public synchronized void swimming(){
        System.out.println(Thread.currentThread().getName() + "去游泳");
    }

    //普通方法
    public void dancing(){
        System.out.println(Thread.currentThread().getName() + "去跳舞");
    }

    public synchronized void singing(){
        System.out.println(Thread.currentThread().getName() + "去K歌");
    }

    //静态同步方法
    public static synchronized void skating(){
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + "去滑冰");
    }

    //静态同步方法
    public static synchronized void climbing(){
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + "去登山");
    }

    public synchronized void shooting(){
        System.out.println(Thread.currentThread().getName() + "去射击");
    }
}

测试类

package cn.itxs.lock;

public class LockDemo {
    public static void main(String[] args) {
        Sport sport = new Sport();
        Sport sport1 = new Sport();
        Sport sport2 = new Sport();
        Sport sport3 = new Sport();
        Sport sport4 = new Sport();
        new Thread(() -> {
            sport.playBasketBall();
        },"第一个线程").start();

        new Thread(() -> {
            sport.swimming();
        },"第二个线程").start();

        new Thread(() -> {
            sport.dancing();
        },"第三个线程").start();

        new Thread(() -> {
            sport1.swimming();
        },"第四个线程").start();

        new Thread(() -> {
            sport2.skating();
        },"第五个线程").start();

        new Thread(() -> {
            sport3.climbing();
        },"第六个线程").start();

        new Thread(() -> {
            sport3.shooting();
        },"第七个线程").start();
    }
}

从上面的结果我们可以知道synchronized锁的是方法的调用者,对于同一对象同步方法谁先拿到锁先执行,而不同对象如sport和sport1是两个对象相当于两把锁,互不相干;对于static同步方法锁的是class,两个对象的类class只有一个,相同对象的类的静态同步方法也是谁先拿到锁先执行;对于同一对象的静态同步方法和同步方法属于class和对象也是两把锁,互不相干。

并发集合类

CopyOnWriteArrayList

package cn.itxs.collection;

import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;

public class ListDemo {
    public static void main(String[] args) {
        //ArrayList不是线程安全
        //List<String> list = new ArrayList<String>();
        //List<String> list = new Vector<>(); //第一种方法,这种是集合方法加了synchronized变为同步方法
        //List<String> list = Collections.synchronizedList(new ArrayList<String>()); //第二种方法,将ArrayList通过Collections工具类转为同步集合
        List<String> list = new CopyOnWriteArrayList<>();
        for (int i = 0; i < 50; i++) {
            new Thread(() -> {
                list.add(UUID.randomUUID().toString());
                System.out.println(list);
            },String.valueOf(i)+"线程").start();
        }
    }
}

CopyOnWriteArraySet

package cn.itxs.collection;

import java.util.*;
import java.util.concurrent.CopyOnWriteArraySet;

public class SetDemo {
    public static void main(String[] args) {
        //HashSet不是线程安全
        //Set<String> set = new HashSet<>();
        //Set<String> set = Collections.synchronizedSet(new HashSet<String>()); //将HashSet通过Collections工具类转为同步集合
        Set<String> set= new CopyOnWriteArraySet<>();
        for (int i = 0; i < 50; i++) {
            new Thread(() -> {
                set.add(UUID.randomUUID().toString());
                System.out.println(set);
            },String.valueOf(i)+"线程").start();
        }
    }
}

ConcurrentHashMap

package cn.itxs.collection;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

public class MapDemo {
    public static void main(String[] args) {
        //HashMap不是线程安全
        //Map<String,String> map = new HashMap<>();
        Map<String,String> map = new ConcurrentHashMap<>();
        for (int i = 0; i < 50; i++) {
            new Thread(() -> {
                map.put(UUID.randomUUID().toString(),UUID.randomUUID().toString());
                System.out.println(map);
            },String.valueOf(i)+"线程").start();
        }
    }
}

并发编程辅助类

CountDowmLatch

CountDownLatch一般被称作"计数器",当数量达到了某个点之后计数结束,才能继续往下走,可用于流程控制**,大流程分成多个子流程,然后大流程在子流程全部结束之前不动(子流程最好是相互独立的,除非能很好的控制两个流程的关联关系),子流程全部结束后大流程开始操作。**

package cn.itxs.tool;

import java.util.concurrent.CountDownLatch;

public class CDLMain {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(10);
        for (int i = 1; i <= 10; i++) {
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + "进入核酸检测排队区域");
                countDownLatch.countDown();
            },String.valueOf(i)).start();
        }
        countDownLatch.await();
        System.out.println("开始进行一组核酸监测");
    }
}

CyclicBarrier

CyclicBarrier通过它可以实现让一组线程等待至某个状态之后再全部同时执行。叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。

package cn.itxs.tool;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;

public class CBMain {
    public static void main(String[] args) throws InterruptedException {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5,() -> {
            System.out.println("集齐五福兑取大奖");
        });

        for (int i = 1; i < 6; i++) {
            final int count = i;
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + ",获取到第" + count + "种福");
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }).start();
        }

        TimeUnit.SECONDS.sleep(5);

        for (int i = 1; i < 6; i++) {
            final int count = i;
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + ",获取到第" + count + "种福");
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

Semaphore

Semaphore通常叫它信号量, 可以用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理的使用资源.通常用于那些资源有明确访问数量限制的场景,常用于限流 。比如:数据库连接池,同时进行连接的线程有数量限制,连接不能超过一定的数量,当连接达到了限制数量后,后面的线程只能排队等前面的线程释放了数据库连接才能获得数据库连接。

package cn.itxs.tool;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreMain {
    public static void main(String[] args) {
        //最多同时处理4个请求
        Semaphore semaphore = new Semaphore(4);
        for (int i = 1; i <= 20; i++) {
            new Thread(() -> {
                try {
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName() + "请求处理开始");
                    TimeUnit.SECONDS.sleep(3);
                    System.out.println(Thread.currentThread().getName() + "请求处理结束");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    semaphore.release();
                }
            },String.valueOf(i)).start();
        }
    }
}

队列

集合接口包含队列接口,常见的队列有阻塞队列和同步队列。

阻塞队列

阻塞队列存在四组API,分别对应着四种队列的阻塞情况。

package cn.itxs.queue;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BlockIngQueueMain {
    public static void main(String[] args) throws InterruptedException {
        System.out.println("异常抛出测试----------");
        BlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(3);
        arrayBlockingQueue.add("hello");
        arrayBlockingQueue.add("world");
        arrayBlockingQueue.add("java");
        //arrayBlockingQueue.add("queue"); //这里取消注释则会抛Queue full异常
        System.out.println(arrayBlockingQueue.element()); //获取队顶元素但不出队
        System.out.println(arrayBlockingQueue.remove());
        System.out.println(arrayBlockingQueue.remove());
        System.out.println(arrayBlockingQueue.remove());
        //System.out.println(arrayBlockingQueue.remove()); //这里取消注释且队列没有元素会抛NoSuchElementException异常
        //System.out.println(arrayBlockingQueue.element());  //这里取消注释会抛且队列没有元素NoSuchElementException异常

        System.out.println("返回值测试----------");
        BlockingQueue<String> arrayBlockingQueueR = new ArrayBlockingQueue<>(3);
        System.out.println(arrayBlockingQueueR.offer("hello"));
        System.out.println(arrayBlockingQueueR.offer("world"));
        System.out.println(arrayBlockingQueueR.offer("java"));
        System.out.println(arrayBlockingQueueR.offer("queue"));
        System.out.println(arrayBlockingQueueR.peek());
        System.out.println(arrayBlockingQueueR.poll());
        System.out.println(arrayBlockingQueueR.poll());
        System.out.println(arrayBlockingQueueR.poll());
        System.out.println(arrayBlockingQueueR.poll());
        System.out.println(arrayBlockingQueueR.peek());

        System.out.println("超时等待timeoout时间测试----------");
        BlockingQueue<String> arrayBlockingQueueT = new ArrayBlockingQueue<>(3);
        System.out.println(arrayBlockingQueueT.offer("hello"));
        System.out.println(arrayBlockingQueueT.offer("world"));
        System.out.println(arrayBlockingQueueT.offer("java"));
        System.out.println(arrayBlockingQueueT.offer("queue",3, TimeUnit.SECONDS));
        System.out.println(arrayBlockingQueueT.poll());
        System.out.println(arrayBlockingQueueT.poll());
        System.out.println(arrayBlockingQueueT.poll());
        System.out.println(arrayBlockingQueueT.poll(3, TimeUnit.SECONDS));

        System.out.println("一直阻塞测试----------");
        BlockingQueue<String> arrayBlockingQueueB = new ArrayBlockingQueue<>(3);
        arrayBlockingQueueB.put("hello");
        arrayBlockingQueueB.put("world");
        arrayBlockingQueueB.put("java");
        //arrayBlockingQueueB.put("queue");  //这里取消注释会一直阻塞
        System.out.println(arrayBlockingQueueB.take());
        System.out.println(arrayBlockingQueueB.take());
        System.out.println(arrayBlockingQueueB.take());
        System.out.println(arrayBlockingQueueB.take());  //当元素为空时一直阻塞
    }
}

同步队列

在同步队列中只有出队以后才允许入队,否则一直处于阻塞状态。

package cn.itxs.queue;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class SynchronousQueueMain {
    public static void main(String[] args) {
        BlockingQueue<String> synchronousQueue = new SynchronousQueue<>();
        new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName()+"put hello");
                synchronousQueue.put("hello");
                System.out.println(Thread.currentThread().getName()+"put world");
                synchronousQueue.put("world");
                System.out.println(Thread.currentThread().getName()+"put java");
                synchronousQueue.put("java");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"入队线程").start();

        new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName()+"take hello");
                System.out.println(synchronousQueue.take());
                TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName()+"take world");
                System.out.println(synchronousQueue.take());
                TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName()+"take java");
                System.out.println(synchronousQueue.take());
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"出队线程").start();
    }
}

CAS

  • CAS:Compare And Swap,直译为比较并交换;CAS是CPU并发原语,由CPU实现;通过比较当前内存中的值和主内存中的值,如果这个值是期望的,那么则执行,如果不是就一直循环下去。
  • CAS也称为自旋锁****,****在一个(死)循环【for(;;)】里不断进行CAS操作,直到成功为止(自旋操作),实际上CAS也是一种乐观。
  • 缺点循环会耗时。一次只能保证一个共享变量的原子性。ABA问题
package cn.itxs.cas;

import java.util.concurrent.atomic.AtomicInteger;

public class CASMain {
    public static void main(String[] args) {
        AtomicInteger atomicInteger = new AtomicInteger(100);
        System.out.println(atomicInteger.getAndIncrement()); //原子递增
        System.out.println(atomicInteger.get());
        //如果我期望的值达到了,那么就更新,否则,就不更新
        System.out.println(atomicInteger.compareAndSet(101, 200));
        System.out.println(atomicInteger.get());
        System.out.println(atomicInteger.compareAndSet(101, 300));
        System.out.println(atomicInteger.get());
        System.out.println("ABA-----");
        System.out.println(atomicInteger.compareAndSet(200, 300));
        System.out.println(atomicInteger.get());
        System.out.println(atomicInteger.compareAndSet(300, 200));
        System.out.println(atomicInteger.get());
        System.out.println(atomicInteger.compareAndSet(200, 0));
        System.out.println(atomicInteger.get());
    }
}

package cn.itxs.cas;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicStampedReference;

public class ARMain {
    public static void main(String[] args) {
        AtomicStampedReference atomicStampedReference = new AtomicStampedReference(101,1);
        new Thread(()->{
            System.out.println(Thread.currentThread().getName() + "版本号为:"+atomicStampedReference.getStamp());
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            atomicStampedReference.compareAndSet(101, 102,
                    atomicStampedReference.getStamp(),
                    atomicStampedReference.getStamp()+1);

            System.out.println(Thread.currentThread().getName() + "版本号为:"+atomicStampedReference.getStamp());

            atomicStampedReference.compareAndSet(102, 101,
                    atomicStampedReference.getStamp(),
                    atomicStampedReference.getStamp()+1);

            System.out.println(Thread.currentThread().getName() + "版本号为:"+atomicStampedReference.getStamp());

        },"A").start();


        //和乐观锁的原理相同
        new Thread(()->{
            int stamp = atomicStampedReference.getStamp();
            System.out.println(Thread.currentThread().getName() + "版本号为:"+stamp);

            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(atomicStampedReference.
                    compareAndSet(101, 105,stamp,stamp+1));

            System.out.println(Thread.currentThread().getName() + "版本号为:"+atomicStampedReference.getStamp());
        },"B").start();
    }
}

并发理论

JMM

概述

  • Java Memory Model缩写为JMM,直译为Java内存模型,定义了一套在多线程读写共享数据时(成员变量、数组)时,对数据的可见性、有序性和原子性的规则和保障;JMM用来屏蔽掉各种硬件和操作系统的内存访问差异,以实现让Java程序在各平台下都能够达到一致的内存访问效果。
  • JMM是一种规范,目的是解决由于多线程通过共享内存进行通信时,存在的本地内存数据不一致、编译器对代码指令重排序、处理器对代码乱序执行、CPU切换线程等带来的问题。

并发与并行

  • 并发:指的是多个事情,在同一时间段内同时发生了; 并发的多个任务之间是互相抢占资源的。
  • 并行:指的是多个事情,在同一时间点上同时发生了;并行的多个任务之间是不互相抢占资源的。
  • 只有在多CPU的情况中,才会发生并行。否则,看似同时发生的事情,其实都是并发执行的。

现代计算机内存模型

现代计算机处理器与存储设备运算速度完全不在同一量级上,至少相差几个数量级,如果让处理器等待计算机存储设备那么这样处理器的优势就不会体现出来。为了提高处理性能实现高并发,在处理器和存储设备之间加入了高速缓存(cache)来作为缓冲。将CPU运算需使用到的数据先复制到缓存中,让CPU运算能够快速进行;当CPU运算完成之后,再将缓存中的结果写回主内存,这样CPU运算就不用依赖等待主内存的读写操作了。

  • 高速缓存设置为多级缓存,其目的为了解决CPU运算速度与内存读写速度不匹配的矛盾;在CPU和内存之间,引入了L1高速缓存、L2高速缓存、L3高速缓存****每一级缓存中所存储的数据全部都是下一级缓存中的一部分,当CPU需要数据时,先从缓存中取,加快读写速度,提高CPU利用率。存储层次金字塔的结构:寄存器 → L1缓存 → L2缓存 → L3缓存 → 主内存 → 本地磁盘 → 远程数据库。越往上访问速度越快、成本越高,空间更小。越往下访问速度越慢、成本越低,空间越大。
  • 每个处理器都有自己的高速缓存,同时又共同操作同一块主内存,当多个处理器同时操作主内存时,可能将导致各自的的缓存数据不一致,为了解决这个问题****主要提供了两种解决办法:总线锁:在多 cpu 下,当其中一个处理器要对共享内存进行操作的时候,在总线上发出一个 LOCK# 信号,这个信号使得其他处理器无法通过总线来访问到共享内存中的数据,总线锁定把 CPU 和内存之间的通信锁住了,这使得锁定期间,其他处理器不能操作其他内存地址的数据,总线锁定的开销比较大,这种机制显然是不合适的。总线锁的力度太大了,最好的方法就是控制锁的保护粒度,只需要保证对于被多个 CPU 缓存的同一份数据是一致的就可以了。缓存锁:相比总线锁,缓存锁即降低了锁的力度。核心机制是基于缓存一致性协议来实现的。为了达到数据访问的一致,需要各个处理器在访问缓存时遵循一些协议,在读写时根据协议来操作,常见的协议有 MSI、MESI、MOSI 等,最常见的为Intel的MESI协议是四种状态的缩写,用来修饰缓存行的状态。M:被修改,该缓存行的数据被修改了,和主存数据不一致。监听所有想要修改此缓存行对应的内存数据的操作,该操作必须等缓存行数据更新到主内存中,状态变成 S (Shared)共享状态之后执行。E:独享,该缓存行和内存数据一致,数据只在本缓存中;监听所有读取此缓存行对应的内存数据的操作,如果发生这种操作,Cache Line 缓存状态从独占转为共享状态。S:分享,缓存行和内存数据一致,数据位于多个缓存中,监听其他缓存使该缓存行失效或者独享该缓存行的操作,如果检测到这种操作,将该缓存行变成无效。I:无效的,该缓存行的数据无效,没有监听,处于失效状态的缓存行需要去主存读取数据。

  • 如何发现数据是否失效?总线的嗅探机制每个处理器通过嗅探在总线上传播的数据来检查自己缓存的值是不是过期了,当处理器发现自己缓存行对应的内存地址被修改,就会将当前处理器的缓存行设置成无效状态,当处理器对这个数据进行修改操作的时候,会重新从系统内存中把数据读到处理器缓存里。
  • 嗅探缺点:总线风暴由于Volatile的MESI缓存一致性协议,需要不断的从主内存嗅探和CAS不断循环,无效交互会导致总线带宽达到峰值。所以不要大量使用Volatile,至于什么时候去使用Volatile什么时候使用锁,根据场景区分。

本地内存与主内存

  • JMM定义了线程和主内存之间的抽象关系:线程之间的共享变量存储在主内存中,每个线程都有一个私有的本地内存或者叫工作内存,本地内存中存储了该线程以读/写共享变量的副本。
  • 但本地内存是JMM的一个抽象概念,并不真实存在;它涵盖了缓存,写缓冲区,寄存器以及其他的硬件和编译器优化。

  • 主内存和本地内存的交互线程的本地内存中保存了被该线程使用到的变量的主内存副本拷贝,线程对变量的所有操作都必须在本地内存中进行,而不能直接读写主内存中的变量。不同的线程之间也无法直接访问对方本地内存中的变量,线程间变量值的传递均需要通过主内存来完成。下面AB线程为例:A线程先把本地内存的值写入主内存。B线程从主内存中去读取出A线程写的值。

原子操作

在此交互过程中,Java内存模型定义了8种操作来完成,虚拟机实现必须保证每一种操作都是原子的、不可再拆分的(double和long类型例外)

  • read** 读取,作用于主内存把变量从主内存中读取到本本地内存。**
  • load** 加载,主要作用本地内存,把从主内存中读取的变量加载到本地内存的变量副本中**
  • use** 使用,主要作用本地内存,把工作内存中的一个变量值传递给执行引擎,每当虚拟机遇到一个需要使用变量的值的字节码指令时将会执行这个操作。、**
  • assign** 赋值 作用于工作内存的变量,它把一个从执行引擎接收到的值赋值给工作内存的变量,每当虚拟机遇到一个给变量赋值的字节码指令时执行这个操作。**
  • store** 存储 作用于工作内存的变量,把工作内存中的一个变量的值传送到主内存中,以便随后的write的操作。**
  • write** 写入 作用于主内存的变量,它把store操作从工作内存中一个变量的值传送到主内存的变量中。**
  • lock** 锁定 :作用于主内存的变量,把一个变量标识为一条线程独占状态。**
  • unlock** 解锁:作用于主内存变量,把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定。**

三大特性

一个线程在执行的过程中不仅会用到CPU资源,还会用到IO,IO的速度远远比不上CPU的运算速度;当一个线程要请求IO的时候可以放弃CPU资源,这个时候其他线程就可以使用CPU,这就提高了CPU的利用率,当然线程之间的切换也会有额外的资源消耗,但多线程带来回报更大。而有了多线程就存在线程安全的问题,在Java并发编程中的一种思路就是通过原子性、可见性和有序性这三大特性切入点去考虑;在并发编程中,必须同时保证程序的原子性、有序性和可见性才能够保证程序的正确性。

  • 原子性定义:一个操作或多个操作做为一个整体,要么全部执行并且必定成功执行,要么不执行;简单理解就是程序的执行是一步到位的。一个或者多个操作在 CPU 执行的过程中不被中断的特性。由于线程的切换,导致多个线程同时执行同一段代码,带来的原子性问题。就如我们常见i++也并不是原子操作;i++分为三步,第一步先读取x的值,第二步进行x+1,第三步x+1的结果写入到内存中。还有如我们前面将单例设计模式的双层检测锁时的instance = new DoubleCheckSingleton() 这一行代码jvm内部执行3补步,1先申请堆内存,2对象初始化,3对象指向内存地址;2和3由于jvm有指令重排序优化所以存在3先执行可能会导致instance还没有初始化完成,其他线程就得到了这个instance不完整单例对象的引用值而报错。在java当中,直接的读取操作和赋值(常量)属于原子性操作。对于原本不具有原子性的操作我们可以通过synchronized关键字或者Lock接口来保证同一时间只有一个线程执行同一串代码,从而也具有了原子性。
  • 有序性定义:程序的执行是存在一定顺序的。在Java内存模型中,为了提高性能和程序的执行效率,编译器和处理器会对程序指令做重排序。在单线程中,重排序不会影响程序的正确性;as-if-serial原则是指不管编译器和CPU如何重排序,必须保证单线程情况下程序的结果是正确的;但在并发编程中,却有可能得出错误的结果。在java当中使用volatile关键字、synchronized关键字或Lock接口来保证有序性。
  • 可见性定义:指在共享变量被某一个线程修改之后,另一个线程访问的时候能够立刻得到修改以后的值。如果多个线程同时读取一个变量的时候,会在每个高速缓存中都拷贝一份数据到工作内存,内存彼此之间是不可见的。缓存不能及时刷新导致了可见性问题。JSR-133 内存模型使用happens-before原则来阐释线程之间的可见性。如果一个操作对另一个操作存在可见性,那么他们之间必定符合happens-before原则:程序顺序规则:一个线程内,按照代码顺序,书写在前面的操作先行发生于书写在后面的操作。监视器锁规则:一个unLock操作先行发生于后面对同一个锁的lock操作。volatile域规则:对一个变量的写操作先行发生于后面对这个变量的读操作。传递性规则:如果操作A先行发生于操作B,而操作B又先行发生于操作C,则可以得出操作A先行发生于操作C。在java当中普通的、未加修饰的共享变量是不能保证可见性的。我们照样可以通过synchronized关键字和Lock接口来保证可见性,同样也能利用volatile实现。

volatile和synchronized

两者区别

  • volatile只能修饰实例变量和类变量,而synchronized可以修饰方法,以及代码块。
  • volatile保证数据的可见性,但是不保证原子性(多线程进行写操作,不保证线程安全);而synchronized是一种排他(互斥)的机制。
  • volatile用于禁止指令重排序:例如可以解决单例双重检查对象初始化代码执行乱序问题;volatile是通过内存屏障去完成的禁止指令重排序。
  • volatile可以看做是轻量版的synchronized,volatile不保证原子性,但是如果是对一个共享变量进行多个线程的赋值,而没有其他的操作,那么就可以用volatile来代替synchronized,因为赋值本身是有原子性的,而volatile又保证了可见性,所以就可以保证线程安全了。

synchronized

  • 在Java中任何一个对象都有一个monitor与之关联,当且一个monitor被持有后,这个对象处于锁定状态;尝试获得锁就是尝试获取对象所对应的monitor的所有权。
  • synchronized主要原理和思路通过monitor里面设计一个计数器,synchronized关键字在底层编译后的jvm指令中会有monitorenter(加锁)和monitorexit(释放锁)两个指令来实现锁的使用,每个对象都有一个关联的monitor,比如一个对象实例就有一个monitor,一个类的Class对象也有一个monitor,如果要对这个对象加锁,那么必须获取这个对象关联的monitor的lock锁;计数器从0开始;如果一个线程要获取monitor的锁,就看看他的计数器是不是0,如果是0的话,那么说明没人获取锁,他就可以获取锁了,然后对计数器加1加锁成功。
  • 而对象头是synchronized实现锁的基础,因为synchronized申请锁、上锁、释放锁都与对象头有关。对象头其中一个重要部分Mark Word存储对象的hashCode、锁信息或分代年龄或GC标志等信息,锁总共有四个状态,分别为无锁状态、偏向锁、轻量级锁、重量级锁,锁的类型和状态在对象头Mark Word中都有记录,在申请锁、锁升级等过程中JVM都需要读取对象的Mark Word数据。java对象主要组成如下:

而对象头Mark Word组成如下:

volatile

volatile原理有volatile修饰的共享变量进行写操作的时候会多出Lock前缀的指令,该指令在多核处理器下会引发两件事情。

  • 将当前处理器缓存行数据刷写到系统主内存。
  • 这个刷写回主内存的操作会使其他CPU缓存的该共享变量内存地址的数据无效。

这样就保证了多个处理器的缓存是一致的,对应的处理器发现自己缓存行对应的内存地址被修改,就会将当前处理器缓存行设置无效状态,当处理器对这个数据进行修改操作的时候会重新从主内存中把数据读取到缓存里。例如在Jdk7的并发包里新增了一个队列集合类LinkedTransferQueue,它在使用volatile变量的时候,会采用一种将字节追加到64字节的方法来提高性能。那为什么追加到64字节能够优化性能呢?这是因为在很多处理器中它们的L1、L2、L3缓存的高速缓存行都是64字节宽,不支持填充缓存行,例如,现在有两个不足64字节的变量AB,那么在AB变量写入缓存行时会将AB变量的部分数据一起写入一个缓存行中,那么在CPU1和CPU2想同时访问AB变量时是无法实现的,也就是想同时访问一个缓存行的时候会引起冲突,如果可以填充到64字节,AB两个变量会分别写入到两个缓存行中,这样就可以并发,同时进行变量访问,从而提高效率。

Disruptor实战

概述

Disruptor是LMAX公司LMAX Development Team开源的高性能内存队列,是一个高性能线程间消息传递库,提供并发环缓冲数据结构的库;它的设计目的是在异步事件处理体系结构中提供低延迟、高吞吐量的工作队列。它能够让开发人员只需写单线程代码,就能够获得非常强悍的性能表现,同时避免了写并发编程的难度和坑; 其本质思想在于多线程未必比单线程跑的快。

缓存行

  • CPU 为了更快的执行代码,当从内存中读取数据时并不是只读自己想要的部分, 而是读取足够的字节来填入高速缓存行。根据不同的 CPU ,高速缓存行大小不同,有32个字节和64个字节处。这样,当CPU访问相邻的数据时,就不必每次都从内存中读取,提高了速度,这是因为访问内存要比访问高速缓存用的时间多得多。这个缓存是CPU内部自己的缓存,内部的缓存单位是行,叫做缓存行。
  • 当CPU尝试访问某个变量时,会先在L1 Cache中查找,如果命中缓存则直接使用;如果没有找到,就去下一级,一直到内存,随后将该变量所在的一个Cache行大小的区域复制到Cache中。查找的路线越长,速度也越慢,因此频繁访问的数据应当保持在L1Cache中。另外,一个变量的大小往往小于一个Cache行的大小,这时就有可能把多个变量放到一个Cache行中。下面代码举例数组命中缓存行和随机读写执行耗时差异:
package cn.itxs.disruptor;
?
public class CacheMain {
    private static final int ARR_SIZE = 20000;
    public static void main(String[] args) {
        int[][] arrInt = new int[ARR_SIZE][ARR_SIZE];
        long startTime = System.currentTimeMillis();
        // 第一种情况为顺序访问,一次访问后,后面的多次访问都可以命中缓存
        for (int i = 0; i < ARR_SIZE; i++) {
            for (int j = 0; j < ARR_SIZE; j++) {
                arrInt[i][j] = i * j;
            }
        }
        long endTime = System.currentTimeMillis();
        System.out.println("顺序访问耗时" + (endTime - startTime) + "毫秒");
?
        startTime = System.currentTimeMillis();
        // 第二情况为随机访问,每次都无法命中缓存行
        for (int i = 0; i < ARR_SIZE; i++) {
            for (int j = 0; j < ARR_SIZE; j++) {
                arrInt[j][i] = i * j;
            }
        }
        endTime = System.currentTimeMillis();
        System.out.println("随机访问耗时" + (endTime - startTime) + "毫秒");
?
    }
}

伪共享

当CPU执行完后还需要将数据回写到主内存上以便于其它线程可以从主内存中获取最新的数据。假设两个线程都加载了相同的CacheLine即缓存行数据

  • 数据 A、B、C 被加载到同一个 Cache line,假设线程 1 在 core1 中修改 A,线程 2 在 core2 中修改 B。
  • 线程 1 首先对 A 进行修改,这时 core1 会告知其它 CPU 核,当前引用同一地址的 Cache line 已经无效,随后 core2 发起修改 B,会导致 core1 将数据回写到主内存中,core2 这时会重新从主内存中读取该 Cache line 数据。
  • 可见,如果同一个CacheLine的内容被多个线程读取,就会产生相互竞争,频繁回写主内存,降低了性能。

核心概念

  • Ring Buffer**:环形缓冲区通常被认为是Disruptor的主要点。但从3.0开始Ring Buffer只负责存储和更新通过Disruptor移动的数据(事件),对于一些高级用例,它甚至可以完全被用户替换。**
  • Sequence**:Disruptor使用序列作为一种方法来识别特定组件的位置。每个消费者(事件处理器)维护一个序列,就像中断器本身一样。大多数并发代码依赖于这些Sequence值的移动,因此Sequence支持AtomicLong的许多当前特性。事实上,两者之间唯一的真正区别是,Sequence包含了额外的功能,以防止Sequence和其他值之间的错误共享。**
  • Sequencer**:是Disruptor的真正核心。该接口的两种实现(单一生产者和多生产者)实现了所有并行算法,以便在生产者和消费者之间快速、正确地传递数据。**
  • Sequence Barrier**:产生了一个序列屏障,其中包含了对Sequencer中发布的主序列的引用和任何依赖消费者的序列的引用。它包含确定是否有任何事件可供使用者处理的逻辑。**
  • Wait Strategy**:等待策略决定了消费者将如何等待事件被生产者放置到破坏者,如SleepingWaitStrategyYieldingWaitStrategyBlockingWaitStrategyBusySpinWaitStrategy等。**
  • Event**:从生产者传递到消费者的数据单位。事件没有特定的代码表示,因为它完全由用户定义。**
  • Event Processor**:处理来自Disruptor的事件的主事件循环,并拥有消费者序列的所有权。有一种称为BatchEventProcessor的表示,它包含事件循环的有效实现,并将回调到使用过的EventHandler接口的提供实现。**
  • Event Handler**:由用户实现的接口,代表Disruptor的消费者。**
  • Producer**:这是用户代码调用Disruptor来排队事件。**

设计要点

  • 内存分配更加合理,使用RingBuffer数据结构,数组元素在初始化时一次性全部创建,提升缓存命中率。
  • 对象循环利用,避免频繁 GC。
  • 能够避免伪共享,提升缓存利用率。Disruptor为了解决伪共享问题,使用的方法是缓存行填充,这是一种以空间换时间的策略,主要思想就是通过往对象中填充无意义的变量,来保证整个对象独占缓存行。而JDK8之后也提供了一个@Contended注解,使用它就可以进行自动填充,使用时需要在启动时增加一个JVM参数。
  • 采用无锁算法,避免频繁加锁、解锁的性能消耗。支持批量消费,消费者可以无锁方式消费多个消息。
  • 有相对更多的等待策略实现。

示例代码(多生产者多消费者)

pom文件引入disruptor的依赖

    <dependency>
      <groupId>com.lmax</groupId>
      <artifactId>disruptor</artifactId>
      <version>3.4.4</version>
    </dependency>

事件类LongEvent.java

package cn.itxs.disruptor;
?
public class LongEvent
{
    private long value;
?
    public void set(long value)
    {
        this.value = value;
    }
?
    @Override
    public String toString() {
        return "LongEvent{" +
                "value=" + value +
                '}';
    }
}

事件工厂类EventFactory.java

package cn.itxs.disruptor;
?
import com.lmax.disruptor.EventFactory;
?
public class LongEventFactory implements EventFactory<LongEvent> {
?
    @Override
    public LongEvent newInstance() {
        return new LongEvent();
    }
}

事件处理实现类,也即是消费者,这里实现EventHandler接口,也即是每个消费者都消费相同数量的生产者数据,LongEventHandler.java

package cn.itxs.disruptor;
?
import com.lmax.disruptor.EventHandler;
?
public class LongEventHandler implements EventHandler<LongEvent> {
    public static long count = 0;
?
    @Override
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch) {
        count ++;
        System.out.println("[" + Thread.currentThread().getName() + "]" + event + "消费序号:" + sequence + ",event=" + event.toString());
    }
}

测试类

package cn.itxs.disruptor;
?
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
?
import java.util.concurrent.*;
?
public class DisruptorMain {
    public static void main(String[] args) throws InterruptedException {
        // The factory for the event
        LongEventFactory factory = new LongEventFactory();
?
        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 1024*1024;
?
        // Construct the Disruptor
        Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, Executors.defaultThreadFactory(),
                ProducerType.MULTI, new SleepingWaitStrategy());
?
        // Connect the handlers
        LongEventHandler h1 = new LongEventHandler();
        LongEventHandler h2 = new LongEventHandler();
        disruptor.handleEventsWith(h1, h2);
?
        // Start the Disruptor, starts all threads running
        disruptor.start();
?
        // Get the ring buffer from the Disruptor to be used for publishing.
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
?
        //================================================================================================
        final int threadCount = 3;
        CyclicBarrier barrier = new CyclicBarrier(threadCount);
        ExecutorService service = Executors.newCachedThreadPool();
        for (long i = 0; i < threadCount; i++) {
            final long threadNum = i;
            service.submit(()-> {
                System.out.printf("Thread %s ready to start!\n", threadNum );
                try {
                    barrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
?
                for (int j = 0; j < 2; j++) {
                    final int seq = j;
                    ringBuffer.publishEvent((event, sequence) -> {
                        event.set(seq);
                        System.out.println(threadNum + "线程生产了序号为" + sequence + ",消息为" + seq);
                    });
                }
            });
        }                 
        service.shutdown();
        TimeUnit.SECONDS.sleep(3);
        System.out.println(LongEventHandler.count);
    }
}       

事件处理实现类实现WorkHandler接口,也即是多个消费者合起来消费一份生产者数据,LongEventHandler.java

package cn.itxs.disruptor;
?
import com.lmax.disruptor.WorkHandler;
?
public class LongEventHandlerWorker implements WorkHandler<LongEvent> {
    public static long count = 0;
?
    @Override
    public void onEvent(LongEvent longEvent) throws Exception {
        count ++;
        System.out.println("[" + Thread.currentThread().getName() + "]" + "event=" + longEvent.toString());
    }
}

测试类

package cn.itxs.disruptor;
?
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
?
import java.util.concurrent.*;
?
public class DisruptorWorkerMain {
    public static void main(String[] args) throws InterruptedException {
        // The factory for the event
        LongEventFactory factory = new LongEventFactory();
?
        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 1024*1024;
?
        // Construct the Disruptor
        Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, Executors.defaultThreadFactory(),
                ProducerType.MULTI, new SleepingWaitStrategy());
?
        // Connect the handlers
?
        // 创建10个消费者来处理同一个生产者发的消息(这10个消费者不重复消费消息)
        LongEventHandlerWorker[] longEventHandlerWorkers = new LongEventHandlerWorker[4];
        for (int i = 0; i < longEventHandlerWorkers.length; i++) {
            longEventHandlerWorkers[i] = new LongEventHandlerWorker();
        }
        disruptor.handleEventsWithWorkerPool(longEventHandlerWorkers);
?
        // Start the Disruptor, starts all threads running
        disruptor.start();
?
        // Get the ring buffer from the Disruptor to be used for publishing.
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
?
        //================================================================================================
        final int threadCount = 3;
        CyclicBarrier barrier = new CyclicBarrier(threadCount);
        ExecutorService service = Executors.newCachedThreadPool();
        for (long i = 0; i < threadCount; i++) {
            final long threadNum = i;
            service.submit(()-> {
                System.out.printf("Thread %s ready to start!\n", threadNum );
                try {
                    barrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
?
                for (int j = 0; j < 2; j++) {
                    final int seq = j;
                    ringBuffer.publishEvent((event, sequence) -> {
                        event.set(seq);
                        System.out.println(threadNum + "线程生产了序号为" + sequence + ",消息为" + seq);
                    });
                }
            });
        }
?
        service.shutdown();
        TimeUnit.SECONDS.sleep(3);
        System.out.println(LongEventHandlerWorker.count);
    }
}

文章来自IT小神博客 http://www.itxiaoshen.cn

相关推荐

为何越来越多的编程语言使用JSON(为什么编程)

JSON是JavascriptObjectNotation的缩写,意思是Javascript对象表示法,是一种易于人类阅读和对编程友好的文本数据传递方法,是JavaScript语言规范定义的一个子...

何时在数据库中使用 JSON(数据库用json格式存储)

在本文中,您将了解何时应考虑将JSON数据类型添加到表中以及何时应避免使用它们。每天?分享?最新?软件?开发?,Devops,敏捷?,测试?以及?项目?管理?最新?,最热门?的?文章?,每天?花?...

MySQL 从零开始:05 数据类型(mysql数据类型有哪些,并举例)

前面的讲解中已经接触到了表的创建,表的创建是对字段的声明,比如:上述语句声明了字段的名称、类型、所占空间、默认值和是否可以为空等信息。其中的int、varchar、char和decimal都...

JSON对象花样进阶(json格式对象)

一、引言在现代Web开发中,JSON(JavaScriptObjectNotation)已经成为数据交换的标准格式。无论是从前端向后端发送数据,还是从后端接收数据,JSON都是不可或缺的一部分。...

深入理解 JSON 和 Form-data(json和formdata提交区别)

在讨论现代网络开发与API设计的语境下,理解客户端和服务器间如何有效且可靠地交换数据变得尤为关键。这里,特别值得关注的是两种主流数据格式:...

JSON 语法(json 语法 priority)

JSON语法是JavaScript语法的子集。JSON语法规则JSON语法是JavaScript对象表示法语法的子集。数据在名称/值对中数据由逗号分隔花括号保存对象方括号保存数组JS...

JSON语法详解(json的语法规则)

JSON语法规则JSON语法是JavaScript对象表示法语法的子集。数据在名称/值对中数据由逗号分隔大括号保存对象中括号保存数组注意:json的key是字符串,且必须是双引号,不能是单引号...

MySQL JSON数据类型操作(mysql的json)

概述mysql自5.7.8版本开始,就支持了json结构的数据存储和查询,这表明了mysql也在不断的学习和增加nosql数据库的有点。但mysql毕竟是关系型数据库,在处理json这种非结构化的数据...

JSON的数据模式(json数据格式示例)

像XML模式一样,JSON数据格式也有Schema,这是一个基于JSON格式的规范。JSON模式也以JSON格式编写。它用于验证JSON数据。JSON模式示例以下代码显示了基本的JSON模式。{"...

前端学习——JSON格式详解(后端json格式)

JSON(JavaScriptObjectNotation)是一种轻量级的数据交换格式。易于人阅读和编写。同时也易于机器解析和生成。它基于JavaScriptProgrammingLa...

什么是 JSON:详解 JSON 及其优势(什么叫json)

现在程序员还有谁不知道JSON吗?无论对于前端还是后端,JSON都是一种常见的数据格式。那么JSON到底是什么呢?JSON的定义...

PostgreSQL JSON 类型:处理结构化数据

PostgreSQL提供JSON类型,以存储结构化数据。JSON是一种开放的数据格式,可用于存储各种类型的值。什么是JSON类型?JSON类型表示JSON(JavaScriptO...

JavaScript:JSON、三种包装类(javascript 包)

JOSN:我们希望可以将一个对象在不同的语言中进行传递,以达到通信的目的,最佳方式就是将一个对象转换为字符串的形式JSON(JavaScriptObjectNotation)-JS的对象表示法...

Python数据分析 只要1分钟 教你玩转JSON 全程干货

Json简介:Json,全名JavaScriptObjectNotation,JSON(JavaScriptObjectNotation(记号、标记))是一种轻量级的数据交换格式。它基于J...

比较一下JSON与XML两种数据格式?(json和xml哪个好)

JSON(JavaScriptObjectNotation)和XML(eXtensibleMarkupLanguage)是在日常开发中比较常用的两种数据格式,它们主要的作用就是用来进行数据的传...

取消回复欢迎 发表评论:

请填写验证码