欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 文旅 > 旅游 > 数据结构与算法——Java实现 31.阻塞队列

数据结构与算法——Java实现 31.阻塞队列

2025/4/20 20:48:13 来源:https://blog.csdn.net/m0_73983707/article/details/142712648  浏览:    关键词:数据结构与算法——Java实现 31.阻塞队列

                        —— 24.10.8

一、问题提出

目前队列存在的问题

1.很多场景要求分离生产者、消费者两个角色、它们需要由不同的线程来担当,而之前的实现根本没有考虑线程安全问题

2.poll方法,队列为空,那么在之前的实现里会返回null,如果就是硬要拿到一个元素呢?以现在的实现只能不断循环尝试

3.offer方法,队列为满,那么在之前的实现里会返回false,如果就是硬要塞入一个元素呢?以现在的实现只能不断循环尝试

4.指令交错,多个线程会造成混乱效果

二、解决方法

 为解决线程不安全问题,需要给线程加锁,使线程局部阻塞

用条件变量让 poll 或 offer 线程进入等待 状态,而不是不断循环尝试,让CPU空转


三、单锁实现

Java中两种锁的选择

synchronized:关键字,功能少

ReentrantLock:可重入锁,功能丰富

lock() 加锁

unlock() 解锁

lockInterruptibly() 加锁(可在阻塞时打断,提前唤醒


offer方法实现

if判断

问题

tailWaits 中唤醒的线程,会与新来的 offer 的线程争抢锁,谁能抢到是不一定的,如果后者先抢到,就会导致条件又发生变化

这种情况称之为虚假唤醒,唤醒后应该重新检查条件,看是不是得重新进入等待

    public void offer(String e) throws InterruptedException {// 加锁,可重入锁阻塞时可打断方法(可被强制唤醒)lock.lockInterruptibly();try {// 判断是否为满if (isFull()){// 队列满时,使offer线程阻塞,直到poll线程取走后,有位置时再恢复运行// tail.signal() 唤醒线程tailWaits.await();}array[tail] = e;if (++tail == array.length){tail = 0;}size++;}finally {// 解锁lock.unlock();}}
import java.util.Arrays;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;public class TestThreadUnsafe {private final String[] array = new String[10];// 尾指针private int tail = 0;// 元素个数private int size = 0;// 创建一个可重入锁对象ReentrantLock lock = new ReentrantLock();// 条件变量对象(集合线程)Condition tailWaits = lock.newCondition();public void offer(String e) throws InterruptedException {// 加锁,可重入锁阻塞时可打断方法(可被强制唤醒)lock.lockInterruptibly();try {// 判断是否为满if (isFull()){// 队列满时,使offer线程阻塞,直到poll线程取走后,有位置时再恢复运行// tail.signal() 唤醒线程tailWaits.await();}array[tail] = e;if (++tail == array.length){tail = 0;}size++;}finally {// 解锁lock.unlock();}}public void poll(String e) throws InterruptedException {}@Overridepublic String toString() {return Arrays.toString(array);}private boolean isFull(){return size == array.length;}private boolean isEmpty(){return size == 0;}public static void main(String[] args) throws InterruptedException{TestThreadUnsafe queue = new TestThreadUnsafe();for (int i = 0; i < 10; i++) {queue.offer("e"+i);}new Thread(()->{try {System.out.println(Thread.currentThread().getName()+"添加元素之前");queue.offer("e10");System.out.println(Thread.currentThread().getName()+"添加元素成功");} catch (InterruptedException e) {throw new RuntimeException(e);}},"t1").start();new Thread(()->{System.out.println("开始唤醒");try{queue.lock.lockInterruptibly();queue.tailWaits.signal();} catch (InterruptedException e) {throw new RuntimeException(e);}finally {queue.lock.unlock();}},"t2").start();}
}

while判断 

解决了虚假唤醒的问题

    @Overridepublic void offer(E e) throws InterruptedException {    // poll 等待队列非空lock.lockInterruptibly();try{while (isFull()){// 放在条件变量等待tailWaits.await();}array[tail] = e;if (++tail == array.length){tail = 0;}size++;// 唤醒等待线程headWaits.signal();}finally {lock.unlock();}}
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;public class BlockingQueue1<E> implements BlockingQueue<E> {private final E[] array;private int head;private int tail;private int size;// 根据容量创造一个数组public BlockingQueue1(int capacity) {array = (E[]) new Object[capacity];}// 加可重入锁private ReentrantLock lock = new ReentrantLock();// 配合poll方法条件变量,在队列头部删除private Condition headWaits = lock.newCondition();// 配合offer方法条件变量,在队列尾部加入private Condition tailWaits = lock.newCondition();// 判空private boolean isEmpty(){return head == tail;}// 判满private boolean isFull(){return size == array.length;}@Overridepublic String toString() {return "array=" + Arrays.toString(array);}@Overridepublic void offer(E e) throws InterruptedException {    // poll 等待队列非空lock.lockInterruptibly();try{while (isFull()){// 放在条件变量等待tailWaits.await();}array[tail] = e;if (++tail == array.length){tail = 0;}size++;// 唤醒等待线程headWaits.signal();}finally {lock.unlock();}}@Overridepublic boolean offer(E e, long timeout) throws InterruptedException {lock.lockInterruptibly();try{// 将毫秒时间转换为纳秒时间long t = TimeUnit.MILLISECONDS.toNanos(timeout);while (isFull()){if (t<=0){return false;}// 最多等待多少纳秒tailWaits.awaitNanos(t);}array[tail] = e;if (++tail == array.length){tail = 0;}size++;// 唤醒等待线程headWaits.signal();return true;}finally {lock.unlock();}}@Overridepublic E poll() throws InterruptedException {lock.lockInterruptibly();try {while (isEmpty()){headWaits.await();}E e = array[head];array[head] = null;if (++head == array.length){head = 0;}size--;tailWaits.signal();return e;}finally {lock.unlock();}}
}

 main函数

public class TestBlockingQueue1 {public static void main(String[] args) throws InterruptedException {BlockingQueue1<String> queue = new BlockingQueue1<>(3);Thread t1 = new Thread(() -> {try {System.out.println(System.currentTimeMillis()+" begin ");queue.offer("任务1");System.out.println(queue);queue.offer("任务2");System.out.println(queue);queue.offer("任务3");System.out.println(queue);queue.offer("任务4",5000);System.out.println(queue);System.out.println(System.currentTimeMillis() + " end ");} catch (InterruptedException e) {throw new RuntimeException(e);}},"生产者");t1.start();Thread.sleep(2000);queue.poll();}
}

 


四、双锁实现

单锁问题:

单锁实现的缺陷:两个线程用了同一把锁,一个执行时,另一个就需阻塞,而offer方法添加元素和poll方法取走元素使用了同一把锁,这样两个线程不能同时执行,两方法相互阻塞

解决方法:

offer方法主要操作尾指针,poll方法主要操作头指针,将offer方法和poll方法分别添加一个锁,用两把锁分别保护头指针和尾指针,从而分别保护offer和poll两个方法

代码实现

import java.util.Arrays;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;public class BlockingQueue2<E> implements BlockingQueue<E> {private final E[] array;private int head;private int tail;private int size;// tailLock给offer方法入队用,Condition分别创建两个等待的条件变量private ReentrantLock tailLock = new ReentrantLock();private Condition notEmpty = tailLock.newCondition();// headLock给poll方法出队用private ReentrantLock headLock = new ReentrantLock();private Condition notFull = headLock.newCondition();public BlockingQueue2(int capacity) {this.array = (E[]) new Object[capacity];}// 判空private boolean isEmpty(){return size == 0;}// 判满private boolean isFull(){return size == array.length;}@Overridepublic String toString() {return "array=" + Arrays.toString(array);}@Overridepublic void offer(E e) throws InterruptedException {// 加锁tailLock.lockInterruptibly();try{while (isFull()) {notEmpty.await();}array[tail] = e;if (++tail == array.length) {tail = 0;}size++;}finally {tailLock.unlock();}}@Overridepublic boolean offer(E e, long timeout) throws InterruptedException {return false;}@Overridepublic E poll() throws InterruptedException {// 加锁headLock.lockInterruptibly();try{while (isEmpty()){notEmpty.await();}E e = array[head];array[head] = null;if (++head == array.length) {head = 0;}size--;return e;}finally {headLock.unlock();}}public static void main(String[] args) throws InterruptedException {BlockingQueue2<String> queue = new BlockingQueue2<>(3);queue.offer("任务1");new Thread(()->{try {queue.offer("任务2");} catch (InterruptedException e) {throw new RuntimeException(e);}},"offer").start();new Thread(()->{try {System.out.println(queue.poll());} catch (InterruptedException e) {throw new RuntimeException(e);}},"poll").start();}
}

size自增/自减问题

size的自增自减不能保障安全,size自增自减在多个线程同时执行时可能遇到冲突

解决方法

用原子变量AtomicInteger类型保证安全

getAndIncrement 自增方法,能保证线程安全

getAndDecrement 自减方法,能保证线程安全

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词