Synchronized.kt 的源码:
/*** Executes the given function [block] while holding the monitor of the given object [lock].*/
public inline fun <R> synchronized(lock: Any, block: () -> R): R {contract {callsInPlace(block, InvocationKind.EXACTLY_ONCE)}@Suppress("NON_PUBLIC_CALL_FROM_PUBLIC_INLINE", "INVISIBLE_MEMBER")monitorEnter(lock)try {return block()}finally {@Suppress("NON_PUBLIC_CALL_FROM_PUBLIC_INLINE", "INVISIBLE_MEMBER")monitorExit(lock)}
JvmFlagAnnotations.kt 的源码:
/*** Marks the JVM backing field of the annotated property as `volatile`, meaning that writes to this field* are immediately made visible to other threads.*/
public actual annotation class Volatile/*** Marks the JVM backing field of the annotated property as `transient`, meaning that it is not* part of the default serialized form of the object.*/
public actual annotation class Transient/*** Marks the JVM method generated from the annotated function as `strictfp`, meaning that the precision* of floating point operations performed inside the method needs to be restricted in order to* achieve better portability.*/
public actual annotation class Strictfp/*** Marks the JVM method generated from the annotated function as `synchronized`, meaning that the method* will be protected from concurrent execution by multiple threads by the monitor of the instance (or,* for static methods, the class) on which the method is defined.*/
public actual annotation class Synchronized
如何使用 Synchronized 同步锁:
在Java中,给一个方法加锁 ,需要给方法加 synchronized 关键字
public synchronized void doSomething() {}
kotlin 中没有 synchronized 关键之,取而代之的是 @Synchronized 注解
class MyUtil {@Synchronizedfun doSomething() {}
Kotlin 在方法内,可以使用 block 块
class Util {val lock = Any()fun main() {synchronized(lock) {}}
Volatile 关键字
在 kotlin 中没有 volatile
关键字,但是有 @Volatile
class Util {@Volatilevar lock = Any()
在 kotlin 中的 Any 和 java 中的 Object 相似,每一个类都是从 Any 继承过来的,但是 Any 并没有声明 wait() , notify() 和 notifyAll() 方法,这就意味着,你不能在kotlin类中调用这些方法。但是你仍然能够使用java.lang.Object的实例作为lock,并且调用相关的方法。下面将会展示一个使用 Object 做为 lock 解决生产者和消费者的问题。
private val lock = Object()fun produce() = synchronized(lock) {while(items>=maxItems) {lock.wait()}Thread.sleep(Random.nextInt(100).toLong())items++println("Produced, count is$items:${Thread.currentThread()}")lock.notifyAll()}fun consume() = synchronized(lock) {while(items<=0) {lock.wait()}Thread.sleep(Random.nextInt(100).toLong())items--println("Consumed, count is$items:${Thread.currentThread()}")lock.notifyAll()}
- volatile变量具有可见性
- volatile不保证原子性
- volatile保证有序性
class TestVolatile1 {@Volatilevar stop = false;fun onInit(){if(stop){}}fun onStop(){stop = true;}}
场景2: 双重检查模式(DCL)
在 Java 中,我们的单例模式经常会这样写,第一次判空是为了不必要的同步操作,第二次判断是只有在MyLock实例==null的时候才会去new一个实例出来,当多线程调用时,当进行这两次判空时,我们需要保证instance的可见性。
public class MyLock {private static volatile MyLock instance = null;public static MyLock getInstance() {if(instance == null){synchronized (MyLock.class){if(instance == null){instance = new MyLock();}}}return instance;}}
在 Kotlin 中实现多线程同步的方式
“ 现有 Task1、Task2 等多个并行任务,如何等待全部执行完成后,执行 Task3。”
在 Kotlin 中我们有多种实现方式:
- Thread.join
- Synchronized
- ReentrantLock
- BlockingQueue
- CountDownLatch
- CyclicBarrier
- Future
- CompletableFuture
- Rxjava
- Coroutine
- Flow
我们先定义三个Task,模拟上述场景, Task3 基于 Task1、Task2 返回的结果拼接字符串,每个 Task 通过 sleep 模拟耗时:
val task1: () -> String = {sleep(2000)"Hello".also { println("task1 finished: $it") }
}val task2: () -> String = {sleep(2000)"World".also { println("task2 finished: $it") }
}val task3: (String, String) -> String = { p1, p2 ->sleep(2000)"$p1 $p2".also { println("task3 finished: $it") }
1. Thread.join()
Kotlin 兼容 Java,Java 的所有线程工具默认都可以使用。其中最简单的线程同步方式就是使用 Thread 的 join() :
fun test_join() {lateinit var s1: Stringlateinit var s2: Stringval t1 = Thread { s1 = task1() }val t2 = Thread { s2 = task2() }t1.start()t2.start()t1.join()t2.join()task3(s1, s2)
2. Synchronized
使用 synchronized 锁进行同步
@Testfun test_synchrnoized() {lateinit var s1: StringThread {synchronized(Unit) {s1 = task1()}}.start()val s2: String = task2()synchronized(Unit) {task3(s1, s2)}}
但是如果超过三个任务,使用 synchrnoized 这种写法就比较别扭了,为了同步多个并行任务的结果需要声明n个锁,并嵌套n个 synchronized。
3. ReentrantLock
ReentrantLock 是 JUC 提供的线程锁,可以替换 synchronized 的使用
@Testfun test_ReentrantLock() {lateinit var s1: Stringval lock = ReentrantLock()Thread {lock.lock()s1 = task1()lock.unlock()}.start()val s2: String = task2()lock.lock()task3(s1, s2)lock.unlock()}
ReentrantLock 的好处是,当有多个并行任务时是不会出现嵌套 synchrnoized 的问题,但仍然需要创建多个 lock 管理不同的任务。
4. BlockingQueue
阻塞队列内部也是通过 Lock 实现的,所以也可以达到同步锁的效果
@Testfun test_blockingQueue() {lateinit var s1: Stringval queue = SynchronousQueue<Unit>()Thread {s1 = task1()queue.put(Unit)}.start()val s2: String = task2()queue.take()task3(s1, s2)}
5. CountDownLatch
UC 中的锁大都基于 AQS 实现的,可以分为独享锁和共享锁。ReentrantLock 就是一种独享锁。相比之下,共享锁更适合本场景。 例如 CountDownLatch,它可以让一个线程一直处于阻塞状态,直到其他线程的执行全部完成:
@Testfun test_countdownlatch() {lateinit var s1: Stringlateinit var s2: Stringval cd = CountDownLatch(2)Thread() {s1 = task1()cd.countDown()}.start()Thread() {s2 = task2()cd.countDown()}.start()cd.await()task3(s1, s2)}
6. CyclicBarrier
CyclicBarrier 是 JUC 提供的另一种共享锁机制,它可以让一组线程到达一个同步点后再一起继续运行,其中任意一个线程未达到同步点,其他已到达的线程均会被阻塞。
与 CountDownLatch 的区别在于 CountDownLatch 是一次性的,而 CyclicBarrier 可以被重置后重复使用,这也正是 Cyclic 的命名由来,可以循环使用。
@Testfun test_CyclicBarrier() {lateinit var s1: Stringlateinit var s2: Stringval cb = CyclicBarrier(3)Thread {s1 = task1()cb.await()}.start()Thread() {s2 = task1()cb.await()}.start()cb.await()task3(s1, s2)}
7. CAS
AQS 内部通过自旋锁实现同步,自旋锁的本质是利用 CompareAndSwap 避免线程阻塞的开销。 因此,我们可以使用基于 CAS 的原子类计数,达到实现无锁操作的目的。
@Testfun test_cas() {lateinit var s1: Stringlateinit var s2: Stringval cas = AtomicInteger(2)Thread {s1 = task1()cas.getAndDecrement()}.start()Thread {s2 = task2()cas.getAndDecrement()}.start()while (cas.get() != 0) {}task3(s1, s2)}
While 循环空转看起来有些浪费资源,但是自旋锁的本质就是这样,所以 CAS 仅仅适用于一些cpu密集型的短任务同步。
8. Future
上面无论有锁操作还是无锁操作,都需要定义两个变量s1、s2记录结果非常不方便。 Java 1.5 开始,提供了 Callable 和 Future ,可以在任务执行结束时返回结果。
@Testfun test_future() {val future1 = FutureTask(Callable(task1))val future2 = FutureTask(Callable(task2))Executors.newCachedThreadPool().execute(future1)Executors.newCachedThreadPool().execute(future2)task3(future1.get(), future2.get())}
通过 future.get(),可以同步等待结果返回,写起来非常方便。
9. CompletableFuture
future.get() 虽然方便,但是会阻塞线程。 Java 8 中引入了 CompletableFuture ,他实现了 Future 接口的同时实现了 CompletionStage 接口。 CompletableFuture 可以针对多个 CompletionStage 进行逻辑组合、实现复杂的异步编程。 这些逻辑组合的方法以回调的形式避免了线程阻塞:
@Testfun test_CompletableFuture() {CompletableFuture.supplyAsync(task1).thenCombine(CompletableFuture.supplyAsync(task2)) { p1, p2 ->task3(p1, p2)}.join()}
10. RxJava
RxJava 提供的各种操作符以及线程切换能力同样可以帮助我们实现需求: zip 操作符可以组合两个 Observable 的结果;subscribeOn 用来启动异步任务
@Testfun test_Rxjava() {,Observable.fromCallable(Callable(task2)).subscribeOn(Schedulers.newThread()),BiFunction(task3)).test().await()}
11. Coroutine
前面那么多方式,其实都是 Java 的工具。 Coroutine 终于算得上是 Kotlin 特有的工具了:
@Testfun test_coroutine() {runBlocking {val c1 = async(Dispatchers.IO) {task1()}val c2 = async(Dispatchers.IO) {task2()}task3(c1.await(), c2.await())}}
12. Flow
Flow 就是 Coroutine 版的 RxJava,具备很多 RxJava 的操作符,例如 zip:
@Testfun test_flow() {val flow1 = flow<String> { emit(task1()) }val flow2 = flow<String> { emit(task2()) }runBlocking { { t1, t2 ->task3(t1, t2)}.flowOn(Dispatchers.IO).collect()}}
FlowOn 使得 Task 在异步计算并发射结果。
作为结论,在 Kotlin 上最好用的线程同步方案首推协程。