Kotlin协程应用的一些知识点
一、Kotlin协程与Java线程池的对比
说到这一点也许很多人就得出了结论了,Kotlin协程就是线程池,本质就是线程池,没什么大不了的,就是对线程池的封装。
额...这么说确实没错,但是不够全面,我觉得应该这么说:Kotlin协程基于Java线程池,但是高于Java线程池。协程的内部对线程池的操作有一些特有的优化策略。
关键点是尾部调用和任务偷取。
尾部调用:
场景:登录->获取用户信息->保存用户信息到DB
如果线程池的线程资源比较充裕,一个任务到来就分配到一个线程,上述的场景中明显是上下关联的依赖关系,需要先登录,再获取到用户信息,然后保存到数据库。
如果三个任务同时到来,分为ABC三个线程来分别执行,那么首先就需要执行A,BC线程就会被阻塞,从而导致资源的浪费。
而协程执行此类的场景就不会,它可以最大限度的保证一个连续的任务在同一个线程中执行,尽可能的节省线程资源。
核心代码在这里:kotlinx.coroutines.scheduling.CoroutineScheduler

内部的核心代码又调用到这里:kotlinx.coroutines.scheduling.WorkQueue

如果是添加到全局的任务队列中,也是一样的尾部调用

注:上面的全局队列与本地队列是 CoroutineScheduler 类与 Worker 中定义的,CoroutineScheduler中有两个全局队列,Worker中一个本地队列。
协程线程池会优先复用已有的线程任务,如果有就会把任务加到已有的work任务的本地队列里。否则会重新唤起或者创建线程。
这就是尾部调用机制,也被大家简称为尾调。
任务偷取:
上面的队列执行的方式定义与添加之后就可以使用队列while循环执行了。
代码如下:kotlinx.coroutines.scheduling.CoroutineScheduler.Worker
runWorker()

这些代码都容易看,就是遍历找任务执行的逻辑。关键就是内部findTask的骚操作:

看方法名就知道了,尝试偷取任务,这,协程你是真的骚。

逻辑为:如果自己的本地队列没任务了,自己对应的全局队列也没有任务了,去其他的Work里面找任务执行。
这就是任务偷取机制。这也是Kotlin协程与Java线程池有所区别的最大两个点。
二、Kotlin协程如何去除回调
为什么讲到这个,是因为真的很多人并不知道这一点,把Kotlin写出了Java的感觉,在协程里面还到处搞一些回调/高阶函数之类的,破坏了协程的作用域。
协程的优雅之处就是把异步调用变成类似同步的效果,去除了回调的逻辑,看起来也更容易理解,也不会破坏协程的作用域。
suspendCoroutine:
例如我们写一个接口回调:
interface OnSingleMethodCallback {
fun onValueCallback(value: String)
}
使用回调:
fun runMethodTask(callback: OnSingleMethodCallback) {
Thread {
Thread.sleep(1000)
callback.onValueCallback("abc")
}.start()
}
fun runMethodTask(){
runMethodTask(object : OnSingleMethodCallback {
override fun onValueCallback(value: String) {
YYLogUtils.w("value:$value")
}
})
}
如果我们使用 suspendCoroutine:
suspend fun runMethodTaskWithSuspend(): String {
return suspendCoroutine { continuation ->
runMethodTask(object : OnSingleMethodCallback {
override fun onValueCallback(value: String) {
continuation.resume(value)
}
})
}
}
但是我们也需要注意,这是协程的写法,方法也标明了 suspend ,所以只能在协程中使用。
如果是一个网络请求有成功与失败的回调,那么我们也能使用 suspendCancellableCoroutine 来达到效果:
interface ICallBack {
fun onSuccess(data: String)
fun onFailure(t: Throwable)
}
private fun request(callback: ICallBack) {
thread {
try {
callback.onSuccess("success")
} catch (e: Exception) {
callback.onFailure(e)
}
}
}
private fun requestDefault() {
request(object : ICallBack {
override fun onSuccess(data: String) {
// ...
}
override fun onFailure(t: Throwable) {
// ...
}
})
}
如果使用 suspendCancellableCoroutine 的话就变成这样:
private suspend fun requestWithSuspend(): String {
return suspendCancellableCoroutine { cancellableContinuation->
request(object : ICallBack {
override fun onSuccess(data: String) {
cancellableContinuation.resume(data)
}
override fun onFailure(t: Throwable) {
cancellableContinuation.resumeWithException(t)
}
})
}
}
同样需要注意是,这是协程的写法,方法也标明了 suspend ,所以只能在协程中使用。
其实为什么Retrofit的请求看似把异步的网络请求用成了同步一样,Retrofit的内部也是同样的处理。
Retrofit最终的处理逻辑在此:KotlinExtensions.awaitResponse

所以我们照着Retrofit学就行了。
三、Kotlin协程分发器
有没有同学全部用 http://Dispatchers.IO 切换线程调度的。
http://Dispatchers.IO / Dispatchers.Default 的异同:
两者都是协程分发器,http://Dispatchers.IO 侧重于任务本身是阻塞型的,比如文件、数据库、网络等操作等。并不那么占用CPU
而Dispatchers.Default 则偏向那些可能会长时间占用CPU的任务。比如人脸特征提取,图片压缩处理,视频的合成等。
他们的线程池的实现也是不同的
协程线程池在设计的时候,针对两者在线程的调度策略上有所不同。
所有任务分成纯CPU任务和非纯CPU任务两种,对应着核心线程和非核心线程。
入队的逻辑是 http://Dispatchers.IO 的任务放入 globalBlockingQueue 队列,而 Dispatchers.Default 的任务放入的是 globalBlockingQueue 队列。
所有线程在执行前都先尝试成为核心线程,核心线程可以从两种任务中任意选择执行,非核心线程只能执行非纯CPU任务。核心线程如果选择执行非纯CPU任务会变成非核心线程。
所以真的有人从来没用过 Dispatchers.Default 吗?
四、使用协程有什么好处?怎么用?
看到过网上的一些Java线程池比协程线程池执行逻辑更快的文章,其实意义不大,协程最大的优势是会更加的方便,可以很方便的把一些碎片化的方法加入协程,同时它可以去掉回调地狱还能更加方便的实现并发与排队执行的效果。
比如这样的一个场景,在主线程计算薪水,我们根据时薪与工作时长计算总共的薪水,内部有复杂的判断,是否是签约员工,是否迟到了,迟到了扣钱,扣除五险一金,连续工作的奖励,推荐的奖励,顾客打赏,等等一系列的复杂逻辑,我们就可以随意加入协程中。
private fun calculateSalary(): String {
// 省略100行代码
return "1000"
}
private suspend fun calculateSalary2() = withContext(Dispatchers.Default) {
// 省略100行代码
"2000"
}
private suspend fun calculateSalary3() = coroutineScope {
// 省略100行代码
"3000"
}
下面看看代码的优化:
class CalculateFaceUtil private constructor() : CoroutineScope by MainScope() {
//... 单例
/**
* 计算并找到最匹对的人脸信息
*
* 使用协程异步的并发的双端遍历查询最大值
*/
fun getTopFace(
list: List<FaceRegisterInfo>,
faceEngine: FaceEngine,
faceFeature: FaceFeature,
action: (similar: Float, index: Int) -> Unit
) {
// 、、、其他逻辑
val middlePosition = list.size / 2
launch(Dispatchers.IO) {
val topface1 = async {
val tempFaceFeature = FaceFeature()
val faceSimilar = FaceSimilar()
var maxSimilar = 0f
var maxSimilarIndex = -1
for (i in 0 until middlePosition) {
tempFaceFeature.featureData = list[i].featureData
//调用SDK比对两个 FaceFeature 人脸特征,返回相似度
faceEngine.compareFaceFeature(faceFeature, tempFaceFeature, faceSimilar)
//拿到相似度的对象,获取得分(每一次都会全部遍历,如果有相同的图片还是会取到最后的)
if (faceSimilar.score > maxSimilar) {
maxSimilar = faceSimilar.score
maxSimilarIndex = i
}
}
TopFace(maxSimilar, maxSimilarIndex)
}
val topface2 = async {
val tempFaceFeature = FaceFeature()
val faceSimilar = FaceSimilar()
var maxSimilar = 0f
var maxSimilarIndex = -1
for (i in middlePosition until list.size) {
tempFaceFeature.featureData = list[i].featureData
//调用SDK比对两个 FaceFeature 人脸特征,返回相似度
faceEngine.compareFaceFeature(faceFeature, tempFaceFeature, faceSimilar)
//拿到相似度的对象,获取得分(每一次都会全部遍历,如果有相同的图片还是会取到最后的)
if (faceSimilar.score > maxSimilar) {
maxSimilar = faceSimilar.score
maxSimilarIndex = i
}
}
TopFace(maxSimilar, maxSimilarIndex)
}
//并发查找并找到最大值
val face1 = topface1.await()
val face2 = topface2.await()
if (face1 != null && face2 != null) {
//回调到主线程
withContext(Dispatchers.Main) {
//优先返回后面的数据
if (face2.similar > face1.similar) {
action(face2.similar, face2.index)
} else {
action(face1.similar, face1.index)
}
}
}
}
}
}
场景:ViewModel中调用这个工具类,查找较大集合中最匹配的人脸,使用头尾双端遍历找到最大值。
我们以这一个使用场景为例,逻辑没问题,但是协程的使用有优化的空间。
1.计算最好使用 Dispatchers.Default , 这是小问题。 2.viewModel中viewModelScope协程作用域中调用全局的协程作用域,这...感觉不太好,推荐使用下面的方式,继承父布局的协程作用域。
suspend fun getTopFace() = coroutineScope {
async(Dispatchers.Default) {
}
async(Dispatchers.Default) {
}
// 。。。
}
3.使用高阶函数回调,如果是协程中最好是可以铺平回调
suspend fun getTopFace(
list: List<FaceRegisterInfo>,
faceEngine: FaceEngine,
faceFeature: FaceFeature,
): TopFace = suspendCoroutine { continuation ->
async(Dispatchers.Default) {
}
async(Dispatchers.Default) {
}
// 。。。
continuation.resume(TopFace(maxSimilar, maxSimilarIndex))
}
之前的viewModel中使用:
private fun searchFace(
frFace: FaceFeature, requestId: Int,
orignData: ByteArray?, faceInfo: FaceInfo?, width: Int, height: Int
) {
viewModelScope.launch {
//通过FaceServer找到最匹配的人脸
CalculateFaceUtil.getInstance().getTopFace(frFace) { compareResult ->
// 。。。逻辑
}
}
}
在回调里面写逻辑,后面的逻辑就破坏了协程作用域,那么又要使用工具类开启一个新的协程,这样就很不好。
现在的viewModel中使用:
private fun searchFace(
frFace: FaceFeature, requestId: Int,
orignData: ByteArray?, faceInfo: FaceInfo?, width: Int, height: Int
) {
viewModelScope.launch {
//通过FaceServer找到最匹配的人脸
val topFace = CalculateFaceUtil.getInstance().getTopFace(frFace)
// 。。。逻辑
withContext(Dispatchers.IO){
//上传到网络逻辑
}
}
}
修改之后就可以直接在一个作用域中切换线程的调度。
还有一个比较典型的例子就是网络请求用的很多的协程处理类,很多人喜欢把网络请求的结果再封装一层,指定成功或失败。例如:
sealed class OkResult<out T : Any> {
data class Success<out T : Any>(val data: T) : OkResult<T>()
data class Error(val exception: Exception) : OkResult<Nothing>()
//检测成功与失败
fun checkResult(success: (T) -> Unit, error: (String?) -> Unit) {
if (this is Success) {
success(data)
} else if (this is Error) {
error(exception.message)
}
}
//只是检测成功
fun checkSuccess(success: (T) -> Unit) {
if (this is Success) {
success(data)
}
}
}
由于也是使用高阶函数回调的,那么就会遇到同样的问题。
viewModelScope.launch {
val result = mSyncRepository.syncAttendance(attendance, token)
result.checkSuccess {
//数据库操作
AttendanceDBHelper.updateAttendance(attendance)
}
}
场景是网络请求提交考勤数据,然后保存到数据库里,那么这样的方法数据库的操作就只能在主线程了,不能切换线程了,所以推荐使用去除回调的方式:
//检查并返回是成功还是失败(在协程中使用直接返回铺平回调)
suspend fun isSuccess(): Boolean {
return suspendCoroutine { continuation ->
continuation.resume(this is Success)
}
}
修改之后
val result = mSyncRepository.syncAttendance(attendance, token)
if(result.isSuccess()){
//数据库操作
withContext(Dispatchers.IO){
AttendanceDBHelper.updateAttendance(attendance)
}
}
这样修改之后相对就会好一些。
总结
Java线程池与Kotlin协程线程池的总结:
Java线程池:核心线程+队列+其他线程
首先使用核心线程执行任务,一旦核心线程满了,就把任务加到队列中,内部根据不同的调度实现来判断是否开启其他线程来执行队列的任务。
协程线程池:全局队列+本地队列
先尝试添加到本地队列(尾部调用机制),再添加到全局队列,协程线程池从队列中找任务(任务偷取机制)执行,PS:内部又一系列的CUP任务与非CUP任务的转换逻辑
Java线程池与Kotlin协程线程池的区别:
Java线程池比较开发,可以选择系统不同的线程策略,也可以自定义线程池,不同的组合可以实现不同的效果,没有区分任务是否阻塞的属性。
协程的线程池是专供协程使用,没有那么开放,内部的任务区分是否阻塞的属性,会放到不同的队列中,CoroutineScheduler类中的两个全局队列 globalCpuQueue(存非阻塞的任务) , globalBlockingQueue(存阻塞的任务)。感觉调度会更加的合理。
suspendCoroutine 与 http://Dispatchers.IO 的总结:
在协程中善用去除回调的方式,尽量把异步的逻辑同步化,不破坏协程的作用域,同时善用线程调度器,区分CUP任务与非CUP任务。最大化的优化线程池效率。