线程间通信(线程间的通信方式三种)
线程间通信(线程间的通信方式三种)
前言
开发中不免会遇到需要所有子线程执行完毕通知主线程处理某些逻辑的场景。
或者是线程 A 在执行到某个条件通知线程 B 执行某个操作。
可以通过以下几种方式实现:等待通知机制
等待通知模式是 Java 中比较经典的线程通信方式。
两个线程通过对同一对象调用等待 wait() 和通知 notify() 方法来进行通讯。
如两个线程交替打印奇偶数:publicclassTwoThreadWaitNotify{privateintstart=1;privatebooleanflag=false;publicstaticvoidmain(String[]args){ TwoThreadWaitNotifytwoThread=newTwoThreadWaitNotify(); Threadt1=newThread(newOuNum(twoThread)); t1.setName("A"); Threadt2=newThread(newJiNum(twoThread)); t2.setName("B"); t1.start(); t2.start(); }/** *偶数线程 */ publicstaticclassOuNumimplementsRunnable{privateTwoThreadWaitNotifynumber;publicOuNum(TwoThreadWaitNotifynumber){this.number=number; }@Override publicvoidrun(){while(number.start<=100){synchronized(TwoThreadWaitNotify.class){ System.out.println("偶数线程抢到锁了");if(number.flag){ System.out.println(Thread.currentThread().getName()+"+-+偶数"+number.start); number.start++; number.flag=false; TwoThreadWaitNotify.class.notify(); }else{try{ TwoThreadWaitNotify.class.wait(); }catch(InterruptedExceptione){ e.printStackTrace(); } } } } } }/** *奇数线程 */ publicstaticclassJiNumimplementsRunnable{privateTwoThreadWaitNotifynumber;publicJiNum(TwoThreadWaitNotifynumber){this.number=number; }@Override publicvoidrun(){while(number.start<=100){synchronized(TwoThreadWaitNotify.class){ System.out.println("奇数线程抢到锁了");if(!number.flag){ System.out.println(Thread.currentThread().getName()+"+-+奇数"+number.start); number.start++; number.flag=true; TwoThreadWaitNotify.class.notify(); }else{try{ TwoThreadWaitNotify.class.wait(); }catch(InterruptedExceptione){ e.printStackTrace(); } } } } } } }
输出结果:t2+-+奇数93 t1+-+偶数94 t2+-+奇数95 t1+-+偶数96 t2+-+奇数97 t1+-+偶数98 t2+-+奇数99 t1+-+偶数100
这里的线程 A 和线程 B 都对同一个对象 TwoThreadWaitNotify.class 获取锁,A 线程调用了同步对象的 wait() 方法释放了锁并进入 WAITING 状态。
B 线程调用了 notify() 方法,这样 A 线程收到通知之后就可以从 wait() 方法中返回。
这里利用了 TwoThreadWaitNotify.class 对象完成了通信。
有一些需要注意:
wait() 、notify()、notifyAll() 调用的前提都是获得了对象的锁(也可称为对象监视器)。
调用 wait() 方法后线程会释放锁,进入 WAITING 状态,该线程也会被移动到等待队列中。
调用 notify() 方法会将等待队列中的线程移动到同步队列中,线程状态也会更新为 BLOCKED
从 wait() 方法返回的前提是调用 notify() 方法的线程释放锁,wait() 方法的线程获得锁。
等待通知有着一个经典范式:
线程 A 作为消费者:
获取对象的锁。
进入 while(判断条件),并调用 wait() 方法。
当条件满足跳出循环执行具体处理逻辑。
线程 B 作为生产者:
获取对象锁。
更改与线程 A 共用的判断条件。
调用 notify() 方法。
伪代码如下://ThreadAsynchronized(Object){while(条件){Object.wait(); }//dosomething}//ThreadBsynchronized(Object){ 条件=false;//改变条件 Object.notify(); }join() 方法privatestaticvoidjoin()throwsInterruptedException{ Threadt1=newThread(newRunnable(){@Override publicvoidrun(){ LOGGER.info("running");try{ Thread.sleep(3000); }catch(InterruptedExceptione){ e.printStackTrace(); } } }); Threadt2=newThread(newRunnable(){@Override publicvoidrun(){ LOGGER.info("running2");try{ Thread.sleep(4000); }catch(InterruptedExceptione){ e.printStackTrace(); } } }); t1.start(); t2.start();//等待线程1终止 t1.join();//等待线程2终止 t2.join(); LOGGER.info("mainover"); }
输出结果:2018-03-1620:21:30.967[Thread-1]INFOc.c.actual.ThreadCommunication-running22018-03-1620:21:30.967[Thread-0]INFOc.c.actual.ThreadCommunication-running2018-03-1620:21:34.972[main]INFOc.c.actual.ThreadCommunication-mainover
在 t1.join() 时会一直阻塞到 t1 执行完毕,所以最终主线程会等待 t1 和 t2 线程执行完毕。
其实从源码可以看出,join() 也是利用的等待通知机制:
核心逻辑:while(isAlive()){wait(0); }
在 join 线程完成后会调用 notifyAll() 方法,是在 JVM 实现中调用,所以这里看不出来。volatile 共享内存
因为 Java 是采用共享内存的方式进行线程通信的,所以可以采用以下方式用主线程关闭 A 线程:publicclassVolatileimplementsRunnable{privatestaticvolatilebooleanflag=true;@Override publicvoidrun(){while(flag){ System.out.println(Thread.currentThread().getName()+"正在运行…"); } System.out.println(Thread.currentThread().getName()+"执行完毕"); }publicstaticvoidmain(String[]args)throwsInterruptedException{ VolatileaVolatile=newVolatile();newThread(aVolatile,"threadA").start(); System.out.println("main线程正在运行"); TimeUnit.MILLISECONDS.sleep(100); aVolatile.stopThread(); }privatevoidstopThread(){ flag=false; } }
输出结果:threadA正在运行…threadA正在运行…threadA正在运行…threadA正在运行…threadA执行完毕
这里的 flag 存放于主内存中,所以主线程和线程 A 都可以看到。
flag 采用 volatile 修饰主要是为了内存可见性,更多内容可以查看这里。CountDownLatch 并发工具
CountDownLatch 可以实现 join 相同的功能,但是更加的灵活。privatestaticvoidcountDownLatch()throwsException{intthread=3;longstart=System.currentTimeMillis();finalCountDownLatchcountDown=newCountDownLatch(thread);for(inti=0;i<thread;i++){newThread(newRunnable(){@Override publicvoidrun(){ LOGGER.info("threadrun");try{ Thread.sleep(2000); countDown.countDown(); LOGGER.info("threadend"); }catch(InterruptedExceptione){ e.printStackTrace(); } } }).start(); } countDown.await();longstop=System.currentTimeMillis(); LOGGER.info("mainovertotaltime={}",stop-start); }
输出结果:2018-03-1620:19:44.126[Thread-0]INFOc.c.actual.ThreadCommunication-threadrun2018-03-1620:19:44.126[Thread-2]INFOc.c.actual.ThreadCommunication-threadrun2018-03-1620:19:44.126[Thread-1]INFOc.c.actual.ThreadCommunication-threadrun2018-03-1620:19:46.136[Thread-2]INFOc.c.actual.ThreadCommunication-threadend2018-03-1620:19:46.136[Thread-1]INFOc.c.actual.ThreadCommunication-threadend2018-03-1620:19:46.136[Thread-0]INFOc.c.actual.ThreadCommunication-threadend2018-03-1620:19:46.136[main]INFOc.c.actual.ThreadCommunication-mainovertotaltime=2012
CountDownLatch 也是基于 AQS(AbstractQueuedSynchronizer) 实现的,更多实现参考 ReentrantLock 实现原理
初始化一个 CountDownLatch 时告诉并发的线程,然后在每个线程处理完毕之后调用 countDown() 方法。
该方法会将 AQS 内置的一个 state 状态 -1 。
最终在主线程调用 await() 方法,它会阻塞直到 state == 0 的时候返回。CyclicBarrier 并发工具privatestaticvoidcyclicBarrier()throwsException{ CyclicBarriercyclicBarrier=newCyclicBarrier(3);newThread(newRunnable(){@Override publicvoidrun(){ LOGGER.info("threadrun");try{ cyclicBarrier.await(); }catch(Exceptione){ e.printStackTrace(); } LOGGER.info("threadenddosomething"); } }).start();newThread(newRunnable(){@Override publicvoidrun(){ LOGGER.info("threadrun");try{ cyclicBarrier.await(); }catch(Exceptione){ e.printStackTrace(); } LOGGER.info("threadenddosomething"); } }).start();newThread(newRunnable(){@Override publicvoidrun(){ LOGGER.info("threadrun");try{ Thread.sleep(5000); cyclicBarrier.await(); }catch(Exceptione){ e.printStackTrace(); } LOGGER.info("threadenddosomething"); } }).start(); LOGGER.info("mainthread"); }
CyclicBarrier 中文名叫做屏障或者是栅栏,也可以用于线程间通信。
它可以等待 N 个线程都达到某个状态后继续运行的效果。
首先初始化线程参与者。
调用 await() 将会在所有参与者线程都调用之前等待。
直到所有参与者都调用了 await() 后,所有线程从 await() 返回继续后续逻辑。
运行结果:2018-03-1822:40:00.731[Thread-0]INFOc.c.actual.ThreadCommunication-threadrun2018-03-1822:40:00.731[Thread-1]INFOc.c.actual.ThreadCommunication-threadrun2018-03-1822:40:00.731[Thread-2]INFOc.c.actual.ThreadCommunication-threadrun2018-03-1822:40:00.731[main]INFOc.c.actual.ThreadCommunication-mainthread2018-03-1822:40:05.741[Thread-0]INFOc.c.actual.ThreadCommunication-threadenddosomething2018-03-1822:40:05.741[Thread-1]INFOc.c.actual.ThreadCommunication-threadenddosomething2018-03-1822:40:05.741[Thread-2]INFOc.c.actual.ThreadCommunication-threadenddosomething
可以看出由于其中一个线程休眠了五秒,所有其余所有的线程都得等待这个线程调用 await() 。
该工具可以实现 CountDownLatch 同样的功能,但是要更加灵活。甚至可以调用 reset() 方法重置 CyclicBarrier (需要自行捕获 BrokenBarrierException 处理) 然后重新执行。线程响应中断publicclassStopThreadimplementsRunnable{@Override publicvoidrun(){while(!Thread.currentThread().isInterrupted()){//线程执行具体逻辑 System.out.println(Thread.currentThread().getName()+"运行中…"); } System.out.println(Thread.currentThread().getName()+"退出…"); }publicstaticvoidmain(String[]args)throwsInterruptedException{ Threadthread=newThread(newStopThread(),"threadA"); thread.start(); System.out.println("main线程正在运行"); TimeUnit.MILLISECONDS.sleep(10); thread.interrupt(); } }
输出结果:threadA运行中…threadA运行中…threadA退出…
可以采用中断线程的方式来通信,调用了 thread.interrupt() 方法其实就是将 thread 中的一个标志属性置为了 true。
并不是说调用了该方法就可以中断线程,如果不对这个标志进行响应其实是没有什么作用(这里对这个标志进行了判断)。
但是如果抛出了 InterruptedException 异常,该标志就会被 JVM 重置为 false。线程池 awaitTermination() 方法
如果是用线程池来管理线程,可以使用以下方式来让主线程等待线程池中所有任务执行完毕:privatestaticvoidexecutorService()throwsException{ BlockingQueue<Runnable>queue=newLinkedBlockingQueue<>(10); ThreadPoolExecutorpoolExecutor=newThreadPoolExecutor(5,5,1,TimeUnit.MILLISECONDS,queue); poolExecutor.execute(newRunnable(){@Override publicvoidrun(){ LOGGER.info("running");try{ Thread.sleep(3000); }catch(InterruptedExceptione){ e.printStackTrace(); } } }); poolExecutor.execute(newRunnable(){@Override publicvoidrun(){ LOGGER.info("running2");try{ Thread.sleep(2000); }catch(InterruptedExceptione){ e.printStackTrace(); } } }); poolExecutor.shutdown();while(!poolExecutor.awaitTermination(1,TimeUnit.SECONDS)){ LOGGER.info("线程还在执行…"); } LOGGER.info("mainover"); }
求全责备的意思(那些最容易用错的成语,你中招了吗?)求全责备的意思(那些最容易用错的成语,你中招了吗?)几个最容易用错的成语,你中招了吗?首当其冲,危言危行,炙手可热,耳提面命,名噪一时看图有曾经被你错用过的成语吗?登堂入室释义比喻
舌系带手术(舌系带手术有没有必要?)舌系带手术(舌系带手术有没有必要?)收到一个问题,问,现在好多人给孩子做舌系带手术,听说50以上都是多此一举,是真的吗?舌系带俗称舌筋,即孩子张开口翘起舌头时在舌和口底之间的一个薄
石柱南宾中学(石柱县南宾中学好不好)石柱南宾中学(石柱县南宾中学好不好)用户88416043927479202105300929好消息南宾中学迁建甄子坪新区项目正式启动!!!近日,网友让青春暂放发布一则短视频石柱南宾
数码知识RenoAce怎么打开开发者选项开发者选项在哪里打开如今使用IT数码设备的小伙伴们是越来越多了,那么IT数码设备当中是有很多小技巧的,这些技巧很多小伙伴一般都是不知道如何来实用的,就好比最近就有很多小伙伴们想要知道RenoAce怎么
数码知识小米手机还原设置在哪怎么一键还原如今使用IT数码设备的小伙伴们是越来越多了,那么IT数码设备当中是有很多小技巧的,这些技巧很多小伙伴一般都是不知道如何来实用的,就好比最近就有很多小伙伴们想要知道小米手机还原设置在
数码知识小米cc9pro有呼吸灯吗呼吸灯在哪如今使用IT数码设备的小伙伴们是越来越多了,那么IT数码设备当中是有很多小技巧的,这些技巧很多小伙伴一般都是不知道如何来实用的,就好比最近就有很多小伙伴们想要知道小米cc9pro有
急诊科女超人于莺(急诊女超人于莺去哪了)急诊科女超人于莺(急诊女超人于莺去哪了)创36氪2018112810331111月28日,企鹅杏仁集团官方宣布,正式签订收购于莺科技及其北京水岸祐邻诊所,同时创始人于莺及其团队加入
身体塑性(塑形怎么做)身体塑性(塑形怎么做)随着年龄的增长,到了30岁以后肌肉就会以不同的速度流失,而到40岁以后肌肉流失的速度就会更快,如果我们不进行积极的干预,就会在中年以后倾向于变胖,身材也会慢慢
盗墓电影大全(15部盗墓题材电影推荐)盗墓电影大全(15部盗墓题材电影推荐)夺宝奇兵1夺宝奇兵(1981史蒂文斯皮尔伯格)如今看来,夺宝奇兵的种种特效已经非常小儿科了,但这却不能掩盖它的经典。夺宝奇兵可谓是一部完美的冒
怎么加小三对方会通过(加小三的微信该怎么和她聊天)怎么加小三对方会通过(加小三的微信该怎么和她聊天)玛丽莲。梦露曾说道你可以拥有一切,但不能同时。那些在妻子面前接电话嗯嗯啊啊遮遮掩掩时不时地笑出声来大半夜抱着手机不睡觉的男人百分百
彩妆技巧(化妆技巧步骤)彩妆技巧(化妆技巧步骤)MIMICOCO20210518115156我们知道,化妆是一门艺术,很多化妆老手都算不上高手,化妆时经常出错,对于新手来说,根本就是一件非常困难的事,很容