JAVA并发编程--4.1理解Condition
创始人
2024-02-23 20:58:09
0

背景:Condition 多线程条件并发控制,与Lock配合可以实现等待/通知模式;

1 condition 使用demo(生产者与消费者模型):

package org.lgx.bluegrass.bluegrasscoree.util.testcondition;import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;/*** @Description TODO* @Date 2022/11/25 16:19* @Author lgx* @Version 1.0*/
public class TestCondition {public static void main(String[] args) {// 声明一把lock锁Lock lock = new ReentrantLock();// 声明队列不为空的条件Condition notEmpty = lock.newCondition();// 声明队列不满的条件Condition notFull = lock.newCondition();// 声明队列的最大长度int maxSize = 10;List msg = new ArrayList<>();// 构造生产者Producer producer = new Producer(msg, lock, notEmpty, notFull, maxSize);// 构造消费者Consumer Consumer = new Consumer(msg, lock, notEmpty, notFull, maxSize);new Thread(producer).start();new Thread(Consumer).start();}}
// 生产者
class Producer implements Runnable {private List msg;private Lock lock;private Condition notEmpty;private Condition notFull;private Integer maxSize;public Producer(List msg, Lock lock, Condition notEmpty, Condition notFull, Integer maxSize) {this.msg = msg;this.lock = lock;this.notEmpty = notEmpty;this.notFull = notFull;this.maxSize = maxSize;}/*** 生产者产生数据模型**/@Overridepublic void run() {while (true) {// 获取lock 锁,如果获取失败则进入到AQS 同步阻塞队列(双向队列)lock.lock();try {while (msg.size() >= maxSize) {// 消息已满-- 需要阻塞System.out.println(" 消息已满-- 需要阻塞");notFull.await();}String msgStr = "写入消息" + UUID.randomUUID();msg.add(msgStr);System.out.println(msgStr);Thread.sleep(1000);// 生产者产生消息后通知对应的消费者notEmpty.signal();} catch (InterruptedException e) {e.printStackTrace();} finally {// 生产者释放锁lock.unlock();}}}}
/*** 消费者产生数据模型**/
class Consumer implements Runnable {private List msg;private Lock lock;private Condition notEmpty;private Condition notFull;private Integer maxSize;public Consumer(List msg, Lock lock, Condition notEmpty, Condition notFull, Integer maxSize) {this.msg = msg;this.lock = lock;this.notEmpty = notEmpty;this.notFull = notFull;this.maxSize = maxSize;}@Overridepublic void run() {while (true) {// 获取lock 锁,如果获取失败则进入到AQS 同步阻塞队列(双向队列)lock.lock();try {while (msg.isEmpty()) {// 消息队列为空-- 需要阻塞System.out.println("消息队列为空-- 需要阻塞:");notEmpty.await();}System.out.println("获取消息:" + msg.get(0));msg.remove(0);Thread.sleep(1000);// 消费者消费消息后通知对应的生产者notFull.signal();} catch (InterruptedException e) {e.printStackTrace();} finally {// 消费者释放锁lock.unlock();}}}}

2 生产者与消费者模型过程分析:
线程获取锁的过程,参考:JAVA并发编程–4.1理解ReentrantLock
2.1 生产者获取lock 锁, 生产消息,当队列满时,调用awaitt() 释放当前线程持有的锁,并阻塞当前线程:
AbstractQueuedSynchronizer.await():

 public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();// 构建Condition单向链表,将当前节点加入到此单向链表中Node node = addConditionWaiter();//  // 完全释放锁,返回当前线程对锁的重入次数int savedState = fullyRelease(node);int interruptMode = 0;while (!isOnSyncQueue(node)) {// 如果当前node 节点只在Condition单向链表 不在AQS 同步阻塞队列中,则返回false,进入此while 循环LockSupport.park(this);// 挂起档当前的线程if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;.// 当前线程中断则跳出循环}//  在AQS 同步队列中唤醒的node 节点去抢占锁if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node.nextWaiter != null) // clean up if cancelledunlinkCancelledWaiters();//  将Condition单向链表中年已经是取消状态的线程从队列中剔除if (interruptMode != 0)reportInterruptAfterWait(interruptMode);// 线程中断标记}

addConditionWaiter:

 /*** Adds a new waiter to wait queue.* @return its new wait node*/private Node addConditionWaiter() {Node t = lastWaiter;// 最后一个等待节点 初始为null,后续线程进入时 t指向行单向链表的尾节点// If lastWaiter is cancelled, clean out.if (t != null && t.waitStatus != Node.CONDITION) {unlinkCancelledWaiters();// 清除失效节点t = lastWaiter;}// 构建一个新的节点 static final int CONDITION = -2;Node node = new Node(Thread.currentThread(), Node.CONDITION);if (t == null)// 第一次 t 为nullfirstWaiter = node;// firstWaiter指针指向新创建的nodeelse // 尾节点的下一节点指向新创建的node 节点;即将 Node 节点加入到单向链表中t.nextWaiter = node;lastWaiter = node;// lastWaiter 指针指向新创建的nodereturn node;}

第一次:ThreadA(单向链表构建示意)
在这里插入图片描述
第二个ThreadB(单向链表构建示意)
在这里插入图片描述
fullyRelease 完全释放锁 :

 final int fullyRelease(Node node) {boolean failed = true;try {// 获取当前lock 的state (锁的次数)int savedState = getState();if (release(savedState)) {{// 释放锁failed = false;// 释放锁成功,失败标识置为falsereturn savedState;} else {// 释放失败抛出异常throw new IllegalMonitorStateException();}} finally {if (failed)// 如果释放锁失败,则证明释放锁过程中线程出现异常node.waitStatus = Node.CANCELLED;// 将当前condition 单向链表中的改节点置为取消状态}}

release(int arg):

public final boolean release(int arg) {if (tryRelease(arg)) {// 释放锁成功Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);// 唤醒AQS 中的头部节点去抢占锁return true;}return false;}

unparkSuccessor:

 private void unparkSuccessor(Node node) {/** If status is negative (i.e., possibly needing signal) try* to clear in anticipation of signalling.  It is OK if this* fails or if status is changed by waiting thread.*/int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);/** Thread to unpark is held in successor, which is normally* just the next node.  But if cancelled or apparently null,* traverse backwards from tail to find the actual* non-cancelled successor.*/Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread);// 唤醒线程}

isOnSyncQueue:是否在AQS同步双向链表中:

/*** Returns true if a node, always one that was initially placed on* a condition queue, is now waiting to reacquire on sync queue.* @param node the node* @return true if is reacquiring*/final boolean isOnSyncQueue(Node node) {if (node.waitStatus == Node.CONDITION || node.prev == null)return false;// 当前节点的waitStatus  是CONDITION  或者当前节点的前置节点为空则标明在Condition 单向链表中if (node.next != null) // If has successor, it must be on queue 不在Condition 单向链表中 已定在AQS队列中return true;// 挡圈节点不为尾节点返回true/** node.prev can be non-null, but not yet on queue because* the CAS to place it on queue can fail. So we have to* traverse from tail to make sure it actually made it.  It* will always be near the tail in calls to this method, and* unless the CAS failed (which is unlikely), it will be* there, so we hardly ever traverse much.*/return findNodeFromTail(node);}

findNodeFromTail 遍历AQS队列 寻找node 节点:

 /*** Returns true if node is on sync queue by searching backwards from tail.* Called only when needed by isOnSyncQueue.* @return true if present*/private boolean findNodeFromTail(Node node) {Node t = tail;for (;;) {if (t == node)return true;if (t == null)return false;t = t.prev;}}

acquireQueued(node, savedState) 当前线程获取锁:

 /*** Acquires in exclusive uninterruptible mode for thread already in* queue. Used by condition wait methods as well as acquire.** @param node the node* @param arg the acquire argument* @return {@code true} if interrupted while waiting*/final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {// 是否中断标识boolean interrupted = false;for (;;) {// 当前节点的前置节点是头结点,则尝试去获取锁final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {// 获取锁成功从AQS中移除改node 节点setHead(node);p.next = null; // help GCfailed = false;return interrupted;}// 抢占不到锁则挂起当前线程if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);// 从AQS 中移除失效节点}}

setHead(node):

 private void setHead(Node node) {head = node;node.thread = null;node.prev = null;}

2.2 消费者获取lock 锁 ,在消费消息后,调用signal() 唤醒生产者:
消费者获取lock 锁, 消费消息,当队列为空时,也会调用awaitt() 释放当前线程持有的锁,并阻塞当前线程:
AbstractQueuedSynchronizer:
signal() 将当前condition队列中的一个头部元素转移至AQS队列中:

/*** Moves the longest-waiting thread, if one exists, from the* wait queue for this condition to the wait queue for the* owning lock.** @throws IllegalMonitorStateException if {@link #isHeldExclusively}*         returns {@code false}*/public final void signal() {if (!isHeldExclusively())// 如果当前线程没有获取锁则抛出异常throw new IllegalMonitorStateException();Node first = firstWaiter;// 获取condition队列中的头部节点if (first != null)doSignal(first);// 转移改节点至AQS队列}

doSignal(Node first):

 /*** Removes and transfers nodes until hit non-cancelled one or* null. Split out from signal in part to encourage compilers* to inline the case of no waiters.* @param first (non-null) the first node on condition queue*/private void doSignal(Node first) {do {if ( (firstWaiter = first.nextWaiter) == null)// condition队列中只有一个节点lastWaiter = null;first.nextWaiter = null;// 从condition队列中移除改node 节点} while (!transferForSignal(first) &&(first = firstWaiter) != null);}

transferForSignal(first):

 /*** Transfers a node from a condition queue onto sync queue.* Returns true if successful.* @param node the node* @return true if successfully transferred (else the node was* cancelled before signal)*/final boolean transferForSignal(Node node) {/** If cannot change waitStatus, the node has been cancelled.*/// 设置node 的waitstate为0,设置失败意味改线程已经被取消if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))return false;/** Splice onto queue and try to set waitStatus of predecessor to* indicate that thread is (probably) waiting. If cancelled or* attempt to set waitStatus fails, wake up to resync (in which* case the waitStatus can be transiently and harmlessly wrong).*/// 将当前node 加入到同步阻塞队列中并返回之前AQS 中tail 节点Node p = enq(node);int ws = p.waitStatus;if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))// 如果waitStatus  >0 (线程取消状态);或者设置node 的waitStatus   为SIGNAL 失败时 则唤醒之前AQS 中tail 节点线程;LockSupport.unpark(node.thread);// 优化方式此时唤醒可以使得AQS队列中及时的清除失效节点

消费者线程调用unlock() 方法从AQS 队列中唤醒线程去抢占锁。

3 await 和signal 过程:
(1)生产者(Producer ) 线程A ,线程B,去抢占锁;线程A获取到锁,线程B没有抢占到锁则进入AQS 队列;消费者线程C 没有抢占到锁则进入AQS 队列;
(2)线程A 执行任务后调用signal()/signalAll();此时condition 队列中中没有元素;
(3)线程A 在执行任务过程中,达到一定条件,则调用await() 方法;将当前的node 节点(new Node(Thread.currentThread(), Node.CONDITION))放入到condition 单向队列中;并且完全释放锁,并且挂起当前的线程;并且从AQS 同步队列中唤醒一个加入时间最早的Node去抢占锁;
(4)线程B 抢占到锁同线程A一样,在达到一定条件,则调用await() 方法;将当前的node 节点(new Node(Thread.currentThread(), Node.CONDITION))放入到condition 单向队列中;并且完全释放锁,并且挂起当前的线程;并且从AQS 同步队列中唤醒一个加入时间最早的Node去抢占锁;
(5)线程C(消费者) 抢占到锁,消费信息后,调用用signal()/signalAll();将位于condition 单向链表中的Node 一个/全部节点转移到AQS 队列中;
(6)线程C(消费者) 业务完成调unlock() 方法,从从AQS 同步队列中唤醒一个加入时间最早的Node去抢占锁;
(7)线程A(生产者) 抢占锁,如果抢占到锁则进行执行任务,抢占不到锁则被park,挂起当前线程,等锁的抢占;

相关内容

热门资讯

米兰:警方逮捕一名在地铁站尾随... 据意大利媒体报道,奋斗在意大利综合编译:近日,意大利宪兵破获两起针对未成年女孩的性侵案件,逮捕了一名...
广州海事法院成功执行和解一宗船... “谢谢田法官,这面锦旗我终于送到了。”近日,60多岁的刘奶奶专门来到广州海事法院,将一面印着“秉公执...
每年最高节税5400元!还有这... 每年最多省税5400元,国家鼓励你开一个“养老小金库”!注意,2025年度缴费截至12月31日! 你...
影院取消放映场,镇江消协调解:... 扬子晚报网12月20日讯(通讯员 陈红生 记者 万凌云 姜天圣)近日,消费者董女士投诉反映,在镇江一...
山西运城公安侦破两起燃气公司员... 又到一年取暖季,千家万户燃起天然气化作一股股暖流,守护着冬日里的烟火气。然而,在利益的驱使下,加之法...
120万的保时捷卡宴只卖60万... 12月18日,海南正式宣布封关,随着海南自贸港“零关税”进口车政策正式落地,很多网友发现,海南的进口...
海南封关,税收政策有哪些变化 12月18日,海南正式实施全岛封关。全岛封关运作是海南自贸港建设的标志性工程,是进一步扩大开放的重要...
六代刑侦人十八年追凶 犯罪嫌疑... 寇松 中青报·中青网记者 胡宁 初冬的深夜,首都机场公安局办公大楼前,20多个身影注视着大门的方向。...
关注增值税!2026年继续执行... 2026年执行的增值税优惠政策 基于行政行为的公定力、确定力、拘束力和执行力等四效力原则,税海涛声认...
辅警工作近6年因有纹身被辞退,... 红星新闻记者从一审判决书中看到,原告刘某在诉讼中称,自己于2019年9月入职被告单位,任警务辅助人员...