背景:Condition 多线程条件并发控制,与Lock配合可以实现等待/通知模式;
1 condition 使用demo(生产者与消费者模型):
package org.lgx.bluegrass.bluegrasscoree.util.testcondition;import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;/*** @Description TODO* @Date 2022/11/25 16:19* @Author lgx* @Version 1.0*/
public class TestCondition {public static void main(String[] args) {// 声明一把lock锁Lock lock = new ReentrantLock();// 声明队列不为空的条件Condition notEmpty = lock.newCondition();// 声明队列不满的条件Condition notFull = lock.newCondition();// 声明队列的最大长度int maxSize = 10;List msg = new ArrayList<>();// 构造生产者Producer producer = new Producer(msg, lock, notEmpty, notFull, maxSize);// 构造消费者Consumer Consumer = new Consumer(msg, lock, notEmpty, notFull, maxSize);new Thread(producer).start();new Thread(Consumer).start();}}
// 生产者
class Producer implements Runnable {private List msg;private Lock lock;private Condition notEmpty;private Condition notFull;private Integer maxSize;public Producer(List msg, Lock lock, Condition notEmpty, Condition notFull, Integer maxSize) {this.msg = msg;this.lock = lock;this.notEmpty = notEmpty;this.notFull = notFull;this.maxSize = maxSize;}/*** 生产者产生数据模型**/@Overridepublic void run() {while (true) {// 获取lock 锁,如果获取失败则进入到AQS 同步阻塞队列(双向队列)lock.lock();try {while (msg.size() >= maxSize) {// 消息已满-- 需要阻塞System.out.println(" 消息已满-- 需要阻塞");notFull.await();}String msgStr = "写入消息" + UUID.randomUUID();msg.add(msgStr);System.out.println(msgStr);Thread.sleep(1000);// 生产者产生消息后通知对应的消费者notEmpty.signal();} catch (InterruptedException e) {e.printStackTrace();} finally {// 生产者释放锁lock.unlock();}}}}
/*** 消费者产生数据模型**/
class Consumer implements Runnable {private List msg;private Lock lock;private Condition notEmpty;private Condition notFull;private Integer maxSize;public Consumer(List msg, Lock lock, Condition notEmpty, Condition notFull, Integer maxSize) {this.msg = msg;this.lock = lock;this.notEmpty = notEmpty;this.notFull = notFull;this.maxSize = maxSize;}@Overridepublic void run() {while (true) {// 获取lock 锁,如果获取失败则进入到AQS 同步阻塞队列(双向队列)lock.lock();try {while (msg.isEmpty()) {// 消息队列为空-- 需要阻塞System.out.println("消息队列为空-- 需要阻塞:");notEmpty.await();}System.out.println("获取消息:" + msg.get(0));msg.remove(0);Thread.sleep(1000);// 消费者消费消息后通知对应的生产者notFull.signal();} catch (InterruptedException e) {e.printStackTrace();} finally {// 消费者释放锁lock.unlock();}}}}
2 生产者与消费者模型过程分析:
线程获取锁的过程,参考:JAVA并发编程–4.1理解ReentrantLock
2.1 生产者获取lock 锁, 生产消息,当队列满时,调用awaitt() 释放当前线程持有的锁,并阻塞当前线程:
AbstractQueuedSynchronizer.await():
public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();// 构建Condition单向链表,将当前节点加入到此单向链表中Node node = addConditionWaiter();// // 完全释放锁,返回当前线程对锁的重入次数int savedState = fullyRelease(node);int interruptMode = 0;while (!isOnSyncQueue(node)) {// 如果当前node 节点只在Condition单向链表 不在AQS 同步阻塞队列中,则返回false,进入此while 循环LockSupport.park(this);// 挂起档当前的线程if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;.// 当前线程中断则跳出循环}// 在AQS 同步队列中唤醒的node 节点去抢占锁if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node.nextWaiter != null) // clean up if cancelledunlinkCancelledWaiters();// 将Condition单向链表中年已经是取消状态的线程从队列中剔除if (interruptMode != 0)reportInterruptAfterWait(interruptMode);// 线程中断标记}
addConditionWaiter:
/*** Adds a new waiter to wait queue.* @return its new wait node*/private Node addConditionWaiter() {Node t = lastWaiter;// 最后一个等待节点 初始为null,后续线程进入时 t指向行单向链表的尾节点// If lastWaiter is cancelled, clean out.if (t != null && t.waitStatus != Node.CONDITION) {unlinkCancelledWaiters();// 清除失效节点t = lastWaiter;}// 构建一个新的节点 static final int CONDITION = -2;Node node = new Node(Thread.currentThread(), Node.CONDITION);if (t == null)// 第一次 t 为nullfirstWaiter = node;// firstWaiter指针指向新创建的nodeelse // 尾节点的下一节点指向新创建的node 节点;即将 Node 节点加入到单向链表中t.nextWaiter = node;lastWaiter = node;// lastWaiter 指针指向新创建的nodereturn node;}
第一次:ThreadA(单向链表构建示意)

第二个ThreadB(单向链表构建示意)

fullyRelease 完全释放锁 :
final int fullyRelease(Node node) {boolean failed = true;try {// 获取当前lock 的state (锁的次数)int savedState = getState();if (release(savedState)) {{// 释放锁failed = false;// 释放锁成功,失败标识置为falsereturn savedState;} else {// 释放失败抛出异常throw new IllegalMonitorStateException();}} finally {if (failed)// 如果释放锁失败,则证明释放锁过程中线程出现异常node.waitStatus = Node.CANCELLED;// 将当前condition 单向链表中的改节点置为取消状态}}
release(int arg):
public final boolean release(int arg) {if (tryRelease(arg)) {// 释放锁成功Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);// 唤醒AQS 中的头部节点去抢占锁return true;}return false;}
unparkSuccessor:
private void unparkSuccessor(Node node) {/** If status is negative (i.e., possibly needing signal) try* to clear in anticipation of signalling. It is OK if this* fails or if status is changed by waiting thread.*/int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);/** Thread to unpark is held in successor, which is normally* just the next node. But if cancelled or apparently null,* traverse backwards from tail to find the actual* non-cancelled successor.*/Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread);// 唤醒线程}
isOnSyncQueue:是否在AQS同步双向链表中:
/*** Returns true if a node, always one that was initially placed on* a condition queue, is now waiting to reacquire on sync queue.* @param node the node* @return true if is reacquiring*/final boolean isOnSyncQueue(Node node) {if (node.waitStatus == Node.CONDITION || node.prev == null)return false;// 当前节点的waitStatus 是CONDITION 或者当前节点的前置节点为空则标明在Condition 单向链表中if (node.next != null) // If has successor, it must be on queue 不在Condition 单向链表中 已定在AQS队列中return true;// 挡圈节点不为尾节点返回true/** node.prev can be non-null, but not yet on queue because* the CAS to place it on queue can fail. So we have to* traverse from tail to make sure it actually made it. It* will always be near the tail in calls to this method, and* unless the CAS failed (which is unlikely), it will be* there, so we hardly ever traverse much.*/return findNodeFromTail(node);}
findNodeFromTail 遍历AQS队列 寻找node 节点:
/*** Returns true if node is on sync queue by searching backwards from tail.* Called only when needed by isOnSyncQueue.* @return true if present*/private boolean findNodeFromTail(Node node) {Node t = tail;for (;;) {if (t == node)return true;if (t == null)return false;t = t.prev;}}
acquireQueued(node, savedState) 当前线程获取锁:
/*** Acquires in exclusive uninterruptible mode for thread already in* queue. Used by condition wait methods as well as acquire.** @param node the node* @param arg the acquire argument* @return {@code true} if interrupted while waiting*/final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {// 是否中断标识boolean interrupted = false;for (;;) {// 当前节点的前置节点是头结点,则尝试去获取锁final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {// 获取锁成功从AQS中移除改node 节点setHead(node);p.next = null; // help GCfailed = false;return interrupted;}// 抢占不到锁则挂起当前线程if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);// 从AQS 中移除失效节点}}
setHead(node):
private void setHead(Node node) {head = node;node.thread = null;node.prev = null;}
2.2 消费者获取lock 锁 ,在消费消息后,调用signal() 唤醒生产者:
消费者获取lock 锁, 消费消息,当队列为空时,也会调用awaitt() 释放当前线程持有的锁,并阻塞当前线程:
AbstractQueuedSynchronizer:
signal() 将当前condition队列中的一个头部元素转移至AQS队列中:
/*** Moves the longest-waiting thread, if one exists, from the* wait queue for this condition to the wait queue for the* owning lock.** @throws IllegalMonitorStateException if {@link #isHeldExclusively}* returns {@code false}*/public final void signal() {if (!isHeldExclusively())// 如果当前线程没有获取锁则抛出异常throw new IllegalMonitorStateException();Node first = firstWaiter;// 获取condition队列中的头部节点if (first != null)doSignal(first);// 转移改节点至AQS队列}
doSignal(Node first):
/*** Removes and transfers nodes until hit non-cancelled one or* null. Split out from signal in part to encourage compilers* to inline the case of no waiters.* @param first (non-null) the first node on condition queue*/private void doSignal(Node first) {do {if ( (firstWaiter = first.nextWaiter) == null)// condition队列中只有一个节点lastWaiter = null;first.nextWaiter = null;// 从condition队列中移除改node 节点} while (!transferForSignal(first) &&(first = firstWaiter) != null);}
transferForSignal(first):
/*** Transfers a node from a condition queue onto sync queue.* Returns true if successful.* @param node the node* @return true if successfully transferred (else the node was* cancelled before signal)*/final boolean transferForSignal(Node node) {/** If cannot change waitStatus, the node has been cancelled.*/// 设置node 的waitstate为0,设置失败意味改线程已经被取消if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))return false;/** Splice onto queue and try to set waitStatus of predecessor to* indicate that thread is (probably) waiting. If cancelled or* attempt to set waitStatus fails, wake up to resync (in which* case the waitStatus can be transiently and harmlessly wrong).*/// 将当前node 加入到同步阻塞队列中并返回之前AQS 中tail 节点Node p = enq(node);int ws = p.waitStatus;if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))// 如果waitStatus >0 (线程取消状态);或者设置node 的waitStatus 为SIGNAL 失败时 则唤醒之前AQS 中tail 节点线程;LockSupport.unpark(node.thread);// 优化方式此时唤醒可以使得AQS队列中及时的清除失效节点
消费者线程调用unlock() 方法从AQS 队列中唤醒线程去抢占锁。
3 await 和signal 过程:
(1)生产者(Producer ) 线程A ,线程B,去抢占锁;线程A获取到锁,线程B没有抢占到锁则进入AQS 队列;消费者线程C 没有抢占到锁则进入AQS 队列;
(2)线程A 执行任务后调用signal()/signalAll();此时condition 队列中中没有元素;
(3)线程A 在执行任务过程中,达到一定条件,则调用await() 方法;将当前的node 节点(new Node(Thread.currentThread(), Node.CONDITION))放入到condition 单向队列中;并且完全释放锁,并且挂起当前的线程;并且从AQS 同步队列中唤醒一个加入时间最早的Node去抢占锁;
(4)线程B 抢占到锁同线程A一样,在达到一定条件,则调用await() 方法;将当前的node 节点(new Node(Thread.currentThread(), Node.CONDITION))放入到condition 单向队列中;并且完全释放锁,并且挂起当前的线程;并且从AQS 同步队列中唤醒一个加入时间最早的Node去抢占锁;
(5)线程C(消费者) 抢占到锁,消费信息后,调用用signal()/signalAll();将位于condition 单向链表中的Node 一个/全部节点转移到AQS 队列中;
(6)线程C(消费者) 业务完成调unlock() 方法,从从AQS 同步队列中唤醒一个加入时间最早的Node去抢占锁;
(7)线程A(生产者) 抢占锁,如果抢占到锁则进行执行任务,抢占不到锁则被park,挂起当前线程,等锁的抢占;