欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 房产 > 建筑 > SynchronousQueue 与 LinkedBlockingQueue区别及应用场景

SynchronousQueue 与 LinkedBlockingQueue区别及应用场景

2025/2/5 11:27:13 来源:https://blog.csdn.net/cl939974883/article/details/145433308  浏览:    关键词:SynchronousQueue 与 LinkedBlockingQueue区别及应用场景

文章目录

  • 前言
  • 认识SynchronousQueue
  • 基本对比及比较
    • 1. **基本特性**
    • 2. **内部实现**
    • 3. **性能特点**
    • 4. **使用场景**
    • 5. **总结对比**
  • SynchronousQueue案例
    • JDK应用案例
    • 案例1:SynchronousQueue的简单用例
    • 案例2:SynchronousQueue公平锁、非公平锁案例
    • 案例3:搭配线程池使用
      • 线程池多线程+SynchronousQueue(公平无效)
      • 线程池多线程+SynchronousQueue(公平有效)
  • 资料获取

稿定智能设计202502032102

前言

博主介绍:✌目前全网粉丝3W+,csdn博客专家、Java领域优质创作者,博客之星、阿里云平台优质作者、专注于Java后端技术领域。

涵盖技术内容:Java后端、大数据、算法、分布式微服务、中间件、前端、运维等。

博主所有博客文件目录索引:博客目录索引(持续更新)

视频平台:b站-Coder长路

本章节配套源码:

  • gitee:https://gitee.com/changluJava/demo-exer/tree/master/JUC/src/main/java/demo11

认识SynchronousQueue

SynchronousQueue 是一个特殊的队列,它的核心特点是 不存储元素,每个插入操作必须等待一个对应的移除操作,反之亦然。这种特性使得它非常适合一些特定的应用场景。

这种机制非常适合处理大量短期异步任务的场景,例如 Web 服务器处理请求、短时计算任务等。

SynchronousQueueLinkedBlockingQueue 是 Java 并发包中两种不同的队列实现,它们的设计目标和使用场景有显著区别。以下是它们的主要区别。

关于SynchronousQueue的公平与非公平体现在多线程场景情况

  • 公平:多个线程进行put阻塞的话,按照谁先进行put会优先去执行任务。(底层是队列)
  • 非公平:多个线程进行put阻塞的话,并非是排队的,而是会以栈的结构先进后出。(底层是栈)

关于公平非公平源码解析:Java阻塞队列中的异类,SynchronousQueue底层实现原理剖析

BlockingQueue接口说明

BlockingQueue设计了很多的放数据和取数据的方法

操作抛出异常返回特定值阻塞阻塞一段时间
放数据addofferputoffer(e, time, unit)
取数据removepolltakepoll(time, unit)
查看数据(不删除)element()peek()不支持不支持

这几组方法的不同之处就是:

  1. 当队列满了,再往队列中放数据,add方法抛异常,offer方法返回false,put方法会一直阻塞(直到有其他线程从队列中取走数据),offer(e, time, unit)方法阻塞指定时间然后返回false。
  2. 当队列是空,再从队列中取数据,remove方法抛异常,poll方法返回null,take方法会一直阻塞(直到有其他线程往队列中放数据),poll(time, unit)方法阻塞指定时间然后返回null。
  3. 当队列是空,再去队列中查看数据(并不删除数据),element方法抛异常,peek方法返回null。

工作中使用最多的就是offer、poll阻塞指定时间的方法。


基本对比及比较

1. 基本特性

特性SynchronousQueueLinkedBlockingQueue
容量容量为 0,不存储元素。容量可配置(默认 Integer.MAX_VALUE),存储元素。
阻塞行为插入操作必须等待对应的移除操作,反之亦然。队列满时插入操作阻塞,队列空时移除操作阻塞。
适用场景直接传递任务,适合线程池任务调度。缓冲任务,适合生产者-消费者模式。

2. 内部实现

实现SynchronousQueueLinkedBlockingQueue
数据结构无存储结构,直接传递元素。基于链表实现,存储元素。
锁机制使用 CAS 或锁实现线程间的直接匹配。使用两把锁(putLocktakeLock)分离插入和移除操作。
公平性支持公平模式(FIFO)和非公平模式(默认)。默认非公平,但可以通过锁实现公平性。

3. 性能特点

性能SynchronousQueueLinkedBlockingQueue
吞吐量高吞吐量,适合直接传递任务的场景。吞吐量较低,因为需要维护队列结构。
延迟低延迟,任务直接传递给消费者。延迟较高,任务需要先入队再出队。
内存占用内存占用低,不存储元素。内存占用高,需要存储队列中的元素。

4. 使用场景

场景SynchronousQueueLinkedBlockingQueue
任务调度适合线程池任务调度(如 Executors.newCachedThreadPool)。适合需要缓冲任务的场景。
生产者-消费者适合生产者直接传递任务给消费者。适合生产者-消费者模式,任务需要缓冲。
流量控制无缓冲能力,严格限制生产者和消费者的同步。提供缓冲能力,适合流量控制。

5. 总结对比

对比项SynchronousQueueLinkedBlockingQueue
容量无容量,直接传递任务。有容量,可缓冲任务。
阻塞行为插入和移除操作必须成对出现。队列满时插入阻塞,队列空时移除阻塞。
性能高吞吐量,低延迟。吞吐量较低,延迟较高。
适用场景直接传递任务,适合线程池任务调度。缓冲任务,适合生产者-消费者模式。

选择使用哪种队列取决于具体的应用场景:

  • 如果需要直接传递任务且不需要缓冲,选择 SynchronousQueue
  • 如果需要缓冲任务并控制流量,选择 LinkedBlockingQueue

SynchronousQueue案例

如果你希望你的任务需要被快速处理,就可以使用这种队列。

JDK应用案例

Java线程池中的newCachedThreadPool(带缓存的线程池)底层就是使用SynchronousQueue实现的。

public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
}

弊端:可能会出现oom问题。如果你提交了太多的任务,导致创建了大量的线程,这些线程都在竞争CPU时间片,等待CPU调度,处理任务速度也会变慢,所以在使用过程中也要综合考虑。


案例1:SynchronousQueue的简单用例

image-20250202153336290

package demo10.cpu;import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;/*** @apiNote SynchronousQueue示例**/
public class SynchronousQueueDemo {public static void main(String[] args) throws InterruptedException {// 1. 创建SynchronousQueue队列BlockingQueue<Integer> synchronousQueue = new SynchronousQueue<>();// 2. 启动一个线程,往队列中放3个元素new Thread(() -> {try {System.out.println(Thread.currentThread().getName() + " 入队列 1");synchronousQueue.put(1);Thread.sleep(1);System.out.println(Thread.currentThread().getName() + " 入队列 2");synchronousQueue.put(2);Thread.sleep(1);System.out.println(Thread.currentThread().getName() + " 入队列 3");synchronousQueue.put(3);} catch (InterruptedException e) {e.printStackTrace();}}).start();// 3. 等待1000毫秒Thread.sleep(1000L);// 4. 再启动一个线程,从队列中取出3个元素new Thread(() -> {try {System.out.println(Thread.currentThread().getName() + " 出队列 " + synchronousQueue.take());Thread.sleep(1);System.out.println(Thread.currentThread().getName() + " 出队列 " + synchronousQueue.take());Thread.sleep(1);System.out.println(Thread.currentThread().getName() + " 出队列 " + synchronousQueue.take());} catch (InterruptedException e) {e.printStackTrace();}}).start();}}

image-20250202141622478

效果如下:第一个线程Thread-0往队列放入一个元素1后,就被阻塞了。直到第二个线程Thread-1从队列中取走元素1后,Thread-0才能继续放入第二个元素2。


案例2:SynchronousQueue公平锁、非公平锁案例

image-20250202153350415

为什么要多线程场景去测试?

  • 单线程你没法直接连续put多个,因为put和take操作是相对的,put了一个之后,只有take了另一个才能再进行put操作。
package demo11;import java.util.Arrays;
import java.util.concurrent.SynchronousQueue;/*** 公平 & 非公平 案例测试* 设置初始化的SynchronousQueue参数即可*/
public class SynchronousQueueFairAndNotFairDemo {public static void main(String[] args) throws InterruptedException {// 1. 创建SynchronousQueue队列,可设置是否公平SynchronousQueue<Integer> synchronousQueue = new SynchronousQueue<>(false);// 放置三个元素 放置过程中各自等待500msfor (int i = 0; i < 10; i++) {// 2. 启动一个线程,往队列中放1个元素int finalI = i;new Thread(() -> {try {System.out.println(Thread.currentThread().getName() + "开始入队列" + finalI);synchronousQueue.put(finalI);
//                    System.out.println(Thread.currentThread().getName() + " 入队列" + finalI + "成功");} catch (InterruptedException e) {e.printStackTrace();}}).start();Thread.sleep(500);}// 3. 等待1000毫秒
//        Thread.sleep(1000L);// 取元素的时候各自间隔500msint[] arr = new int[10];for (int i = 0; i < 10; i++) {// 4. 启动一个线程,往队列中放1个元素int finalI = i;new Thread(() -> {try {arr[finalI] = synchronousQueue.take();System.out.println(Thread.currentThread().getName() + " 出队列 " + arr[finalI]);} catch (InterruptedException e) {e.printStackTrace();}}).start();Thread.sleep(500);}System.out.println(Arrays.toString(arr));// 7. 等待1000毫秒Thread.sleep(1000L);}
}

测试1:当设置非公平情况

SynchronousQueue<Integer> synchronousQueue = new SynchronousQueue<>(false);

image-20250202153656442

测试2:当设置公平情况

SynchronousQueue<Integer> synchronousQueue = new SynchronousQueue<>(true);

image-20250202153735555


案例3:搭配线程池使用

线程池多线程+SynchronousQueue(公平无效)

image-20250202173128343

配合线程池多线程场景,SynchronousQueue设置为公平无效,测试源码如下:

package demo11;import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;/*** 线程池多线程场景(核心数、最大线程数非1场景)* 无公平可言原因:线程池底层走的是offer操作,并非是put操作(可见案例2中的demo场景,可实现公平是走的put操作实现)* 适合场景:cpu密集型,在多线程场景通过线程池是无法实现按照submit的提交顺序去处理逻辑的(原因如上)。*/
public class SynchronousQueueFairAndNotFairPoolDemo1 {public static void main(String[] args) throws InterruptedException {int cpuCores = Runtime.getRuntime().availableProcessors();ThreadPoolExecutor executor = new ThreadPoolExecutor(cpuCores + 1, cpuCores + 1,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(true),new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {try {// 阻塞等待executor.getQueue().put(r);} catch (InterruptedException var4) {throw new RejectedExecutionException("Unexpected InterruptedException", var4);}}});// 倒计数int jobNum = 100;CountDownLatch countDownLatch = new CountDownLatch(jobNum);final AtomicInteger count = new AtomicInteger(0);// 记录任务开始时间long startTime = System.currentTimeMillis();//设计提交任务new Thread(()->{// 可改为queue队列int i = 0;while (i < jobNum) {int finalI = i;// submitjob的线程(非主线程)会进入到阻塞当中(保证按照顺序来执行)// 线程池底层使用的是队列的offer、pollexecutor.submit(()->{System.out.println("CPU执行任务:" + finalI + ", 计数 =>" + count.incrementAndGet());countDownLatch.countDown();});i++;}}).start();System.out.println("main主线程开始干活");countDownLatch.await();executor.shutdown();System.out.println("任务全部完成");// 记录任务结束时间long endTime = System.currentTimeMillis();// 计算任务执行时间long duration = endTime - startTime;System.out.println("任务执行总耗时: " + duration + " 毫秒");}
}

image-20250202173220376


线程池多线程+SynchronousQueue(公平有效)

image-20250202173234582

package demo11;import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;/*** 线程池单线程场景(核心数、最大线程数1场景)* 可实现公平效果(无论是否设置公平参数)* 适合场景:任务按照submitjob去依次提交任务 & 去除线程池同样也可实现,可见下面处理方式*/
public class SynchronousQueueFairAndNotFairPoolDemo2 {public static void main(String[] args) throws InterruptedException {ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(), // 填写false、true在核心、最大线程数为1 1情况下效果一致new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {try {executor.getQueue().put(r);} catch (InterruptedException var4) {throw new RejectedExecutionException("Unexpected InterruptedException", var4);}}});// 倒计数int jobNum = 100;CountDownLatch countDownLatch = new CountDownLatch(jobNum);final AtomicInteger count = new AtomicInteger(0);//设计提交任务new Thread(()->{// 可改为queue队列int i = 0;while (i < jobNum) {int finalI = i;// 处理方式一:submitjob的线程(非主线程)会进入到阻塞当中(保证按照顺序来执行)executor.submit(()->{System.out.println("CPU执行任务:" + finalI + ", 计数 =>" + count.incrementAndGet());countDownLatch.countDown();});// or 处理方式二:思考:是否如果原本就在新建线程中,是否无需使用线程池去submitjob提交?因为原本当前就是阻塞进行的
//                System.out.println("执行任务:" + finalI);i++;}}).start();// 7. 等待1000毫秒System.out.println("main主线程开始干活");countDownLatch.await();executor.shutdown();System.out.println("任务全部完成");}
}

效果:

image-20250202174350317

资料获取

大家点赞、收藏、关注、评论啦~

精彩专栏推荐订阅:在下方专栏👇🏻

  • 长路-文章目录汇总(算法、后端Java、前端、运维技术导航):博主所有博客导航索引汇总
  • 开源项目Studio-Vue—校园工作室管理系统(含前后台,SpringBoot+Vue):博主个人独立项目,包含详细部署上线视频,已开源
  • 学习与生活-专栏:可以了解博主的学习历程
  • 算法专栏:算法收录

更多博客与资料可查看👇🏻获取联系方式👇🏻,🍅文末获取开发资源及更多资源博客获取🍅


版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com