开发者

Kotlin 协程之Channel的概念和基本使用详解

开发者 https://www.devze.com 2025-09-24 12:06 出处:网络 作者: XeonYu
目录前言launch / async 适合的场景Channel 的概念和基本使用概念Channel 的创建参数概览capacity(容量配置)onBufferOverflow(溢出策略)onUndeliveredElement(未送达回调)参数组合效果CapacityRENDEZVOUS(会合
目录
  • 前言
    • launch / async 适合的场景
  • Channel 的概念和基本使用
    • 概念
    • Channel 的创建
    • 参数概览
      • capacity(容量配置)
      • onBufferOverflow(溢出策略)
      • onUndeliveredElement(未送达回调)
    • 参数组合效果
    • Capacity
      • RENDEZVOUS(会合模式)
        • CONFLATED(只留最新值)
          • UNLIMITED(无限容量)
            • BUFFERED(有限容量)
              • 自定义容量
              • BufferOverflow 策略详解
                • SUSPEND(默认策略)
                  • DROP_LATEST
                    • DROP_OLDEST
                    • onUndeliveredElement 回调
                      • Channel 操作方式
                        • 阻塞操作(send/receive)
                          • 非阻塞操作(trySend/tryReceive)
                            • 操作对比
                              • 返回值类型
                              • Channel 状态管理
                                • API
                                  • Close(关闭操作)
                                    • Cancel(取消操作)
                                    • Channel 异常处理
                                      • ClosedSendChannelException
                                        • ClosedReceiveChannelException
                                          • CancellationException
                                            • 异常与状态关系
                                              • 异常处理技巧
                                                • 使用非阻塞操作避免异常
                                                • 健壮的异常处理
                                            • 总结
                                              • Channel 关键概念对比
                                                • 溢出策略对比
                                                  • 操作方式
                                                    • Channel 状态生命周期

                                                    前言

                                                    在 专栏 之前的文章中,我们已经知道了协程的启动、挂起、取消、异常以及常用的协程作用域等基础应用。

                                                    这些基础应用适合的场景是一次性任务,执行完就结束了的场景。

                                                    launch / async 适合的场景

                                                    • 网络请求
                                                    • 数据库查询
                                                    • 文件读写
                                                    • 并行计算任务
                                                    • 等等

                                                    而对于一些相对复杂的场景,例如:持续的数据流、需要在不同的协程之间传递数据、需要顺序或背压控制等场景,基础的 launch / async

                                                    就不够用了。

                                                    例如:

                                                    • 用户点击、输入等事件流的处理
                                                    • 生产者-消费者模型的需求:任务排队、日志流
                                                    • 高频数据源处理(相机帧、音频流等)

                                                    类似这种持续的、需要顺序控制、或者多个协程配合执行的场景,就需要用到 Channel 了。

                                                    Channel 的概念和基本使用

                                                    概念

                                                    顾名思义,Channel 有管道、通道的意思。Channel 跟 Java 中的 blockingQueue 很相似,区别在于 Channel 是挂起的,不是阻塞的。

                                                    Channel 的核心特点就是能够在不同的协程之间进行数据传递,并且能够控制数据传递的顺序。

                                                    使用起来很简单,基本就分为以下几步:

                                                    1. 创建 Channel
                                                    2. 通过 channel.send 发送数据
                                                    3. 通过 channel.receive 接收数据

                                                    整体的概念也比较简单形象,就是一根管道,一个口子发送数据,一个口子接收数据。

                                                    Channel 的创建

                                                    先来看下 Channel 的源码,可以看到会根据传入的参数选择不同的实现。

                                                    public fun <E> Channel(
                                                        capacity: Int = RENDEZVOUS,
                                                        onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
                                                        onUndeliveredElement: ((E) -> Unit)? = null
                                                    ): Channel<E> =
                                                        when (capacity) {
                                                            RENDEZVOUS -> {
                                                                if (onBufferOverflow == BufferOverflow.SUSPEND)
                                                                    BufferedChannel(RENDEZVOUS, onUndeliveredElement) // an efficient implementation of rendezvous channel
                                                                else
                                                                    ConflatedBufferedChannel(
                                                                        1,
                                                                        onBufferOverflow,
                                                                        onUndeliveredElement
                                                                    ) // support buffer overflow witpythonh buffered channel
                                                            }
                                                            CONFLATED -> {
                                                                require(onBufferOverflow == BufferOverflow.SUSPEND) {
                                                                    "CONFLATED capacity cannot be used with non-default onBufferOverflow"
                                                                }
                                                                ConflatedBufferedChannel(1, BufferOverflow.DROP_OLDEST, onUndeliveredElement)
                                                            }
                                                            UNLIMITED -> BufferedChannel(
                                                                UNLIMITED,
                                                                onUndeliveredElement
                                                            ) // ignores onBufferOverflow: it has buffer, but it never overflows
                                                            BUFFERED -> { // uses default capacity with SUSPEND
                                                                if (onBufferOverflow == BufferOverflow.SUSPEND) BufferedChannel(
                                                                    CHANNEL_DEFAULT_CAPACITY,
                                                                    onUndeliveredElement
                                                                )
                                                                else ConflatedBufferedChannel(1, onBufferOverflow, onUndeliveredElement)
                                                            }
                                                            else -> {
                                                                if (onBufferOverflow === BufferOverflow.SUSPEND) BufferedChannel(capacity, onUndeliveredElement)
                                                                else ConflatedBufferedChannel(capacity, onBufferOverflow, onUndeliveredElement)
                                                            }
                                                        }

                                                    参数概览

                                                    参数类型默认值描述
                                                    capacityIntRENDEZVOUS通道容量,决定缓冲区大小和行为模式
                                                    onBufferOverflowBufferOverflowSUSPEND缓冲区溢出时的处理策略
                                                    onUndeliveredElement((E) -> Unit)?null元素未能送达时的回调函数

                                                    capacity(容量配置)

                                                    capacity 参数决定了 Channel 的缓冲行为和容量大小:

                                                    • RENDEZVOUS(值为 0):无缓冲,发送者和接收者必须同时准备好
                                                    • CONFLATED(值为 -1):只保留最新的元素,旧元素会被覆盖
                                                    • UNLIMITED(值为 Int.MAX_VALUE):理论上就是无限容量,永不阻塞发送
                                                    • BUFFERED(值为 64):默认缓冲大小
                                                    • 自定义正整数:自己指定具体的缓冲区大小

                                                    onBufferOverflow(溢出策略)

                                                    当缓冲区满时的处理策略:

                                                    • SUSPEND:挂起发送操作,等待缓冲区有空间(默认)
                                                    • DROP_OLDEST:丢弃旧的元素,添加新元素
                                                    • DROP_LATEST:丢弃新元素,保留缓冲区中的现有元素

                                                    onUndeliveredElement(未送达回调)

                                                    当元素无法送达时的清理回调函数:

                                                    • null:不执行任何清理操作(默认)
                                                    • 自定义函数:用于资源清理、日志记录等,根据业务需求来定义

                                                    参数组合效果

                                                    capacityonBufferOverflow行为适用场景
                                                    RENDEZVOUSSUSPEND无缓冲,同步通信严格的生产者-消费者同步
                                                    BUFFEREDSUSPEND有限缓冲,满时挂起一般的异步处理,默认的缓冲数量是 64
                                                    UNLIMITEDSUSPEND缓冲长度为 Int.MAX_VALUE高吞吐量场景(生产上不建议使用,有内存方面的风险)
                                                    CONFLATEDDROP_OLDEST无缓冲,只保留最新值状态更新、实时数据
                                                    自定义大小SUSPEND固定大小,满时挂起批量处理、批量任务
                                                    自定义大小DROP_OLDEST固定大小,丢弃旧数据获取最近 N 个元素
                                                    自定义大小DROP_LATEST固定大小,拒绝新数据保护重要历史数据

                                                    Capacity

                                                    RENDEZVOUS(会合模式)

                                                    特点:

                                                    • 容量为 0,无缓冲区
                                                    • 发送者和接收者必须同时准备好才能完成数据传输
                                                    • 提供强同步保证,一手交钱一手交货

                                                    使用示例:

                                                    suspend fun demonstrateRendezvousChannel() {
                                                        // 创建 RENDEZVOUS Channel(默认容量为 0),默认什么都不传就是 rendezvous 模式,Channel<String>()
                                                        val rendezvousChannel = Channel<String>(Channel.RENDEZVOUS)
                                                        // 启动发送者协程
                                                        val senderJob = GlobalScope.launch {
                                                            println("[发送者] 准备发送消息...")
                                                            rendezvousChannel.send("Hello from RENDEZVOUS!")
                                                            println("[发送者] 消息已发送")
                                                            rendezvousChannel.send("Second message")
                                                            println("[发送者] 第二条消息已发送")
                                                            rendezvousChannel.close()
                                                        }
                                                        // 启动接收者协程
                                                        val receiverJob = GlobalScope.launch {
                                                            delay(1000) // 延迟1秒,发送者会等待接收者准备好
                                                            println("[接收者] 开始接收消息...")
                                                            for (message in rendezvousChannel) {
                                                                println("[接收者] 收到消息: $message")
                                                                delay(500) // 模拟处理时间
                                                            }
                                                            println("[接收者] Channel已关闭")
                                                        }
                                                        // 等待所有协程完成
                                                        joinAll(senderJob, receiverJob)
                                                    }

                                                    执行结果

                                                    Kotlin 协程之Channel的概念和基本使用详解

                                                    CONFLATED(只留最新值)

                                                    特点:

                                                    • 容量为 1,但会丢弃旧值
                                                    • 只保留最新的元素
                                                    • 发送操作永不阻塞
                                                    • 只能使用 BufferOverflow.SUSPEND 策略

                                                    源码分析:

                                                    CONFLATED -> {
                                                        require(onBufferOverflow == BufferOverflow.SUSPEND) {
                                                            "CONFLATED capacity cannot be used with non-default onBufferOverflow"
                                                        }
                                                        ConflatedBufferedChannel(1, BufferOverflow.DROP_OLDEST, onUndeliveredElement)
                                                    }

                                                    使用示例:

                                                    suspend fun demonstrateConflatedChannel() {
                                                        // 创建 CONFLATED Channel,相当于:Channel<String>(1, BufferOverflow.DROP_OLDEST)
                                                        val conflatedChannel = Channel<String>(Channel.CONFLATED)
                                                        // 快速发送多个消息
                                                        val senderJob = GlobalScope.launch {
                                                            repeat(5) { i ->
                                                                val message = "Update-$i"
                                                                conflatedChannel.send(message)
                                                                println("[发送者] 发送更新: $message")
                                                                delay(100) // 短暂延迟
                                                            }
                                                            conflatedChannel.close()
                                                        }
                                                        // 慢速接收者
                                                        val receiverJob = GlobalScope.launch {
                                                            delay(1000) // 延迟1秒,让发送者发送完所有消息
                                                            println("[接收者] 开始接收(只会收到最新的值)...")
                                                            for (message in conflatedChannel) {
                                                                println("[接收者] 收到: $message")
                                                            }
                                                        }
                                                        joinAll(senderJob, receiverJob)
                                                    }

                                                    Kotlin 协程之Channel的概念和基本使用详解

                                                    UNLIMITED(无限容量)

                                                    特点:

                                                    • 容量为 Int.MAX_VALUE,理论上无限容量
                                                    • 发送操作永不阻塞,但要注意内存使用
                                                    • 忽略 onBufferOverflow 参数
                                                    • 适用于高吞吐量场景,但生产环境需谨慎使用
                                                    suspend fun demonstrateUnlimitedChannel() {
                                                        val unlimitedChannel = Channel<String>(Channel.UNLIMITED)
                                                        val senderJob = GlobalScope.launch {
                                                            repeat(10) { i ->
                                                                val message = "Message-$i"
                                                                unlimitedChannel.send(message)
                                                                println("[发送者] 立即发送: $message")
                                                            }
                                                            unlimitedChannel.close()
                                                            println("[发送者] 所有消息已发送,Channel已关闭")
                                                        }
                                                        val receiverJob = GlobalScope.launch {
                                                            delay(1000) // 延迟1秒开始接收
                                                            println("[接收者] 开始慢速接收...")
                                                            for (message in unlimitedChannel) {
                                                                println("[接收者] 处理: $message")
                                                                delay(300) // 模拟处理时间
                                                            }
                                                        }
                                                        joinAll(senderJob, receiverJob)
                                                    }

                                                    Kotlin 协程之Channel的概念和基本使用详解

                                                    BUFFERED(有限容量)

                                                    特点:

                                                    • 使用默认容量 (CHANNEL_DEFAULT_CAPACITY,通常为 64)
                                                    • 在缓冲区满时根据 onBufferOverflow 策略处理

                                                    源码分析:

                                                    BUFFERED -> {
                                                        if (onBufferOverflow == BufferOverflow.SUSPEND)
                                                            BufferedChannel(CHANNEL_DEFAULT_CAPACITY, onUndeliveredElement)
                                                        else
                                                            ConflatedBufferedChannel(1, onBufferOverflow, onUndeliveredElement)
                                                    }

                                                    使用示例:

                                                    suspend fun demonstrateBufferedDefaultChannel() {
                                                        // 创建 BUFFERED Channel(默认容量为 64)
                                                        val bufferedChannel = Channel<String>(Channel.BUFFERED)
                                                        val senderJob = GlobalScope.launch {
                                                            repeat(100) { i ->
                                                                bufferedChannel.send("Message-$i")
                                                                println("[发送者] 发送 Message-$i")
                                                            }
                                                            bufferedChannel.close()
                                                        }
                                                        val receiverJob = GlobalScope.launch {
                                                            delay(1000) // 延迟接收
                                                            for (message in bufferedChannel) {
                                                                println("[接收者] 收到: $message")
                                                                delay(50)
                                                            }
                                                        }
                                                        joinAll(senderJob, receiverJob)
                                                    }

                                                    与下面自定义容量效果类似。

                                                    自定义容量

                                                    特点:

                                                    • 指定具体的缓冲区大小
                                                    • 根据 onBufferOverflow 策略处理溢出

                                                    源码分析:

                                                    else -> {
                                                        if (onBufferOverflow === BufferOverflow.SUSPEND)
                                                            BufferedChannel(capacity, onUndeliveredElement)
                                                        else
                                                            ConflatedBufferedChannel(capacity, onBufferOverflow, onUndeliveredElement)
                                                    }

                                                    使用示例:

                                                    suspend fun demonstrateBufferedChannel() {
                                                        // 创建容量为3的缓冲Channel
                                                        val bufferedChannel = Channel<Int>(capacity = 3)
                                                        // 启动发送者协程
                                                        val senderJob = GlobalScope.launch {
                                                            repeat(5) { i ->
                                                                println("[发送者] 发送数字: $i")
                                                                bufferedChannel.send(i)
                                                                println("[发送者] 数字 $i 已发送")
                                                            }
                                                            buffphperedChannel.close()
                                                            println("[发送者] Channel已关闭")
                                                        }
                                                        // 启动接收者协程,延迟接收以观察缓冲效果
                                                        val receiverJob = GlobalScope.launch {
                                                            delay(2000) // 延迟2秒开始接收
                                                            println("[接收者] 开始接收数字...")
                                                            for (number in bufferedChannel) {
                                                                println("[接收者] 收到数字: $number")
                                                                delay(800) // 模拟慢速处理
                                                            }
                                                        }
                                                        joinAll(senderJob, receiverJob)
                                                    }

                                                    可以看到,因为默认的溢出策略是 SUSPEND,所以当缓冲区满了时,发送者会被挂起,直到接收者处理完一个元素,才会继续发送。

                                                    Kotlin 协程之Channel的概念和基本使用详解

                                                    BufferOverflow 策略详解

                                                    当 Channel 的缓冲区满时,BufferOverflow 参数决定了如何处理新的发送请求:

                                                    SUSPEND(默认策略)

                                                    • 行为:当缓冲区满时,挂起发送操作直到有空间可用
                                                    • 特点:提供背压控制,防止生产者过快
                                                    • 使用场景:需要确保所有数据都被处理的场景
                                                    suspend fun demonstrateBasicOperations() {
                                                        //容量为 2,溢出策略为SUSPEND
                                                        val channel = Channel<String>(capacity = 2, onBufferOverflow = BufferOverflow.SUSPEND)
                                                        //发送的速度快
                                                        val job1 = GlobalScope.launch {
                                                            repeat(5) {
                                                                channel.send("Message-$it")
                                                                println("[发送者] 发送 Message-$it")
                                                            }
                                                            channel.close()
                                                        }
                                                        val job2 = GlobalScope.launch {
                                                            //除了用 channel.recrive 外,也可以直接 用 for 循环接收数据
                                                            for (message in channel) {
                                                                //接收的速度慢
                                                                delay(1000)
                                                                println("[接收者] 接收到: $message")
                                                            }
                                                        }
                                                        joinAll(job1, job2)
                                                    }

                                                    Kotlin 协程之Channel的概念和基本使用详解

                                                    DROP_LATEST

                                                    • 行为:当缓冲区满时,丢弃新元素,保留缓冲区中的现有元素
                                                    • 特点:保护已缓冲的数据不被覆盖
                                                    • 使用场景:保护重要的历史数据,防止新数据覆盖
                                                    • 性能特点:发送操作永不阻塞,但新数据可能被丢弃
                                                    suspend fun demonstrateBasicOperations() {
                                                        val channel = Channel<String>(capacity = 2, onBufferOverflow = BufferOverflow.DROP_LATEST)
                                                        val job1 = GlobalScope.launch {
                                                            repeat(5) {
                                                                channel.send("Message-$it")
                                                                println("[发送者] 发送 Message-$it")
                                                            }
                                                            channel.close()
                                                        }
                                                        val job2 = GlobalScope.launch {
                                                            for (message in channel) {
                                                                delay(1000)
                                                                println("[接收者] 接收到: $message")
                                                            }
                                                        }
                                                        joinAll(job1, job2)
                                                    }

                                                    可以看到,当缓冲区满时,会把新数据丢弃掉,因此,接收端只接收到了旧数据。

                                                    Kotlin 协程之Channel的概念和基本使用详解

                                                    DROP_OLDEST

                                                    • 行为:当缓冲区满时,丢弃旧的元素,添加新元素
                                                    • 特点:保持固定的内存使用,优先保留新数据
                                                    • 使用场景:实时数据流、最近N个元素
                                                    • 性能特点:发送操作永不阻塞,但可能丢失历史数据
                                                    suspend fun demonstrateBasicOperations() {
                                                        val channel = Channel<String>(capacity = 2, onBufferOverflow = BufferOverflow.DROP_OLDEST)
                                                        val job1 = GlobalScope.launch {
                                                            repeat(5) {
                                                                channel.send("Message-$it")
                                                                println("[发送者] 发送 Message-$it")
                                                            }
                                                            channel.close()
                                                        }
                                                        val job2 = GlobalScope.launch {
                                                            for (message in channel) {
                                                                delay(1000)
                                                                println("[接收者] 接收到: $message")
                                                            }
                                                        }
                                                        joinAll(job1, job2)
                                                    }

                                                    Kotlin 协程之Channel的概念和基本使用详解

                                                    需要注意的是,当缓冲区满了之后,1 和 2 被丢弃了,3 和 4 被放进去了。从这里可以看出,丢弃数据时,并不是把最早的旧数据丢掉,这里跟内部的实现有关。

                                                    onUndeliveredElement 回调

                                                    当元素无法送达时(如 Channel 被取消或关闭),会调用此回调函数

                                                    suspend fun demonstrateBasicOperations() {
                                                        val channel = Channel<String>(capacity = 2, onBufferOverflow = BufferOverflow.DROP_OLDEST) {
                                                            println("[Channel] 缓冲区已满,无法放到缓冲区,值:${it}")
                                                        }
                                                        // 演示基本的send和receive操作
                                                        val job1 = GlobalScope.launch {
                                                            repeat(5) {
                                                                channel.send("Message-$it")
                                                                println("[发送者] 发送 Message-$it")
                                                            }
                                                            channel.close()
                                                        }
                                                        val job2 = GlobalScope.launch {
                                                            for (message in channel) {
                                                                delay(1000)
                                                                println("[接收者] 接收到: $message")
                                                            }
                                                        }
                                                        joinAll(job1, job2)
                                                    }

                                                    Kotlin 协程之Channel的概念和基本使用详解

                                                    Channel 操作方式

                                                    Channel 提供了两种操作方式:阻塞操作和非阻塞操作。

                                                    阻塞操作(send/receive)

                                                    send()receive() 方法都是挂起方法,它们会阻塞当前协程,直到完成操作。

                                                    非阻塞操作(trySend/tryReceive)

                                                    trySend()tryReceive() 是 Channel 提供的非阻塞操作 API。与阻塞版本不同,这些方法会立即返回结果,不会挂起当前协程,也不会抛出异常。

                                                    操作对比

                                                    操作类型阻塞版本非阻塞版本行为差异
                                                    发送send()trySend()send() 会挂起直到有空间;trySend() 立即返回结果
                                                    接收receive()tryReceive()receive() 会挂起直到有数据;tryReceive() 立即返回结果

                                                    返回值类型

                                                    • trySend() 返回 ChannelResult<Unit>
                                                    • tryReceive() 返回 ChannelResult<T>

                                                    ChannelResult 是一个密封类,通过密封类中的成员 isSuccessgetOrNull() 可以判断操作是否成功。

                                                    Kotlin 协程之Channel的概念和基本使用详解

                                                    大部分场景下,send / receive + 合理的 Channel 配置就能解决问题,trySend/tryReceive 更多的是想达到如下效果:

                                                    • 避免不必要的协程挂起开销,希望立即得到结果
                                                    • 提供更精细的控制逻辑,如:超时处理、重试机制等
                                                    • 实现更好的错误处理和用户反馈,能更好地处理异常场景
                                                    runBlocking {
                                                        val channel = Channel<Int>(2)
                                                        val sendJob = launch {
                                                            repeat(5) {
                                                                delay(100)
                                                                val sendResult = channel.trySend(it)
                                                                sendResult.onSuccess {
                                                                    println("发送成功")
                                                                }.onFailure {
                                                                    println("发送失败")
                                                                }.onClosed {
                                                                    println("通道已关闭")
                                                                }
                                                            }
                                                        }
                                                        val receiveJob = launch {
                                                            for (i in channel) {
                                                                delay(300)
                                                                println("接收到数据:${i}")
                                                            }
                                                        }
                                                        joinAll(sendJob, receiveJob)
                                                    }

                                                    Kotlin 协程之Channel的概念和基本使用详解

                                                    Channel 状态管理

                                                    Channel 在其生命周期中会经历以下几个关键状态:

                                                    • 活跃状态(Active):可以正常发送和接收数据
                                                    • 发送端关闭(Closed for Send):不能发送新数据,但可以接收缓冲区中的数据
                                                    • 接收端关闭(Closed for Receive):不能接收数据,缓冲区已清空
                                                    • 取消状态(Cancelled):Channel 被取消,所有操作都会失败

                                                    API

                                                    • channel.close():关闭 Channel
                                                    • channel.isClosedForSend:判断发送端是否已关闭
                                                    • channel.isClosedForReceive:判断接收端是否已关闭
                                                    • channel.cancel():取消 Channel

                                                    Close(关闭操作)

                                                    • 调用 close() 后,isClosedForSend 立即变为 true
                                                    • 此时,缓冲区中的数据仍可被消费
                                                    • 只有当缓冲区清空后,isClosedForReceive 才变为 true

                                                    示例:

                                                        suspend fun demonstrateChannelClose() {
                                                        val channel = Channel<String>(1)
                                                        val producer = GlobalScope.launch {
                                                            try {
                                                                for (i in 1..5) {
                                                                    val message = "Message $i"
                                                                    println("准备发送: $message")
                                                                    channel.send(message)
                                                                    println("成功发送: $message")
                                                                    delay(100)
                                                                }
                                                            } catch (e: ClosedSendChannelException) {
                                                                println("生产者: Channel已关闭,无法发送数据 - ${e.message}")
                                                            }
                                                        }
                                                        val consumer = GlobalScope.launch {
                                                            try {
                                                                for (message in channel) {
                                                                    println("接收到: $message")
                                                                    delay(200)
                                                                }
                                                                println("消费者: Channel已关闭,退出接收循环")
                                                            } catch (e: Exception) {
                                                                println("消费者异常: ${e.message}")
                                                            }
                                                        }
                                                        delay(300) // 模拟让一些数据能够被接收到
                                                        // 检查Channel状态
                                                        println("关闭前状态:")
                                                        println("  isClosedForSend: ${channel.isClosedForSend}")
                                                        println("  isClosedForReceive: ${channel.isClosedForReceive}")
                                                        // 关闭Channel
                                                        println("\n正在关闭Channel...")
                                                        channel.close()
                                                        // 检查关闭后的状态
                                                        println("关闭后状态:")
                                                        println("  isClosedForSend: ${channel.isClosedForSend}")
                                                        println("  isClosedForReceive: ${channel.isClosedForReceive}")
                                                        // 等待协程完成
                                                        producer.join()
                                                        consumer.join()
                                                        println("最终状态:")
                                                        println("  isClosedForSend: ${channel.isClosedForSend}")
                                                        println("  isClosedForReceive: ${channel.isClosedForReceive}")
                                                    }

                                                    Kotlin 协程之Channel的概念和基本使用详解

                                                    Cancel(取消操作)

                                                    cancel() 方法用于强制取消 Channel,它会:

                                                    • 立即关闭发送和接收端
                                                    • 清空缓冲区中的所有数据
                                                    • 触发 onUndeliveredElement 回调(如果设置了)
                                                    suspend fun demonstrateChannelCancel() {
                                                        val channel = Channel<String>(capacity = 5) {
                                                            println("消息未被接收:${it}")
                                                        }
                                                        val producer = GlobalScope.launch {
                                                            try {
                                                                for (i in 1..8) {
                                                                    val message = "Message $i"
                                                                    println("尝试发送: $message")
                                                                    channel.send(message)
                                                                    println("成功发送: $message")
                                                                    delay(100)
                                                                }
                                                            } catch (e: CancellationException) {
                                                                println("生产者: Channel被取消 - ${e.message}")
                                                            }
                                                        }
                                                        val consumer = GlobalScope.launch {
                                                            try {
                                                                for (message in channel) {
                                                                    println("接收到: $message")
                                                                    delay(300)
                                                                }
                                                            } catch (e: CancellationException) {
                                                                println("消费者: 协程被取消 - ${e.message}")
                                                            }
                                                        }
                                                        delay(400) // 让一些操作执行
                                                        println("\n取消前状态:")
                                                        println("  isClosedForSend: ${channel.isClosedForSend}")
                                                        println("  isClosedForReceive: ${channel.isClosedForReceive}")
                                                        // 取消Channel
                                                        println("\n正在取消Channel...")
                                                        channel.cancel(CancellationException("主动取消Channel"))
                                                        println("取消后状态:")
                                                        println("  isClosedForSend: ${channel.isClosedForSend}")
                                                        println("  isClosedForReceive: ${channel.isClosedForReceive}")
                                                        // 等待协程完成
                                                        producer.join()
                                                        consumer.join()
                                                    }

                                                    Kotlin 协程之Channel的概念和基本使用详解

                                                    Channel 异常处理

                                                    在使用 Channel 的过程中,会遇到各种异常情况。主要包括以下几种类型:

                                                    ClosedSendChannelException

                                                    触发条件:

                                                    • 在已关闭的 Channel 上调用 send() 方法
                                                    • Channel 调用 close() 后,发送端立即关闭

                                                    示例:

                                                    suspehttp://www.devze.comnd fun demonstrateClosedSendException() {
                                                        val channel = Channel<String>()
                                                        // 关闭 Channel
                                                        channel.close()
                                                        try {
                                                            // 尝试在已关闭的 Channel 上发送数据
                                                            channel.send("This will throw exception")
                                                        } catch (e: ClosedSendChannelException) {
                                                            println("捕获异常: ${e.message}")
                                                            println("异常类型: ${e::class.simpleName}")
                                                        }
                                                    }

                                                    ClosedReceiveChannelException

                                                    触发条件:

                                                    • 从已关闭且缓冲区为空的 Channel 调用 receive() 方法
                                                    • isClosedForReceivetrue 时调用 receive()

                                                    示例:

                                                    suspend fun demonstrateClosedReceiveException() {
                                                        val channel = Channel<String>()
                                                        // 关闭 Channel
                                                        channel.close()
                                                        try {
                                                            // 尝试从已关闭且空的 Channel 接收数据
                                                            val message = channel.receive()
                                                            println("收到消息: $message")
                                                        } catch (e: ClosedReceiveChannelException) {
                                                            println("捕获异常: ${e.message}")
                                                            println("异常类型: ${e::class.simpleName}")
                                                        }
                                                    }

                                                    CancellationException

                                                    触发条件:

                                                    • Channel 被 cancel() 方法取消
                                                    • 父协程被取消,导致 Channel 操作被取消
                                                    • 超时或其他取消信号

                                                    示例:

                                                    suspend fun demonstrateCancellationException() {
                                                        val channel = Channel<String>()
                                                        val job = GlobalScope.launch {
                                                            try {
                                                                // 这个操作会被取消
                                                                channel.send("This will be cancelled")
                                                            } catch (e: CancellationException) {
                                                                println("发送操作被取消: ${e.message}")
                                                                throw e // 重新抛出 CancellationException
                                                            }
                                                        }
                                                        delay(100)
                                                        // 取消 Channel
                                                        channel.cancel(CancellationException("手动取消 Channel"))
                                                        try {
                                                            job.join()
                                                        } catch (e: CancellationException) {
                                                            println("协程被取消: ${e.message}")
                                                        }
                                                    }

                                                    异常与状态关系

                                                    Channel 状态send() 行为receive() 行为trySend() 行为tryReceive() 行为
                                                    活跃状态正常发送或挂起正常接收或挂起返回成功/失败结果返回成功/失败结果
                                                    发送端关闭抛出 ClosedSendChannelException正常接收缓冲区数据返回失败结果正常返回结果
                                                    接收端关闭抛出 ClosedSendChannelException抛出 ClosedReceiveChannelException返回失败结果返回失败结果
                                                    已取消抛出 cHXgmqkqCancellationException抛出 CancellationException返回失败结果返回失败结果

                                                    异常处理技巧

                                                    使用非阻塞操作避免异常

                                                    非阻塞操作不会抛出异常,而是返回结果对象:

                                                    suspend fun safeChannelOperations() {
                                                        val channel = Channel<String>()
                                                       编程客栈 // 安全的发送操作
                                                        val sendResult = channel.trySend("Safe message")
                                                        when {
                                                            sendResult.isSuccess -> println("发送成功")
                                                            sendResult.isFailure -> println("发送失败: ${sendResult.exceptionOrNull()}")
                                                            sendResult.isClosed -> println("Channel 已关闭")
                                                        }
                                                        // 安全的接收操作
                                                        val receiveResult = channel.tryReceive()
                                                        when {
                                                            receiveResult.isSuccess -> println("接收到: ${receiveResult.getOrNull()}")
                                                            receiveResult.isFailure -> println("接收失败: ${receiveResult.exceptionOrNull()}")
                                                            receiveResult.isClosed -> println("Channel 已关闭")
                                                        }
                                                    }

                                                    健壮的异常处理

                                                    suspend fun robustChannelUsage() {
                                                        val channel = Channel<String>()
                                                        val producer = GlobalScope.launch {
                                                            try {
                                                                repeat(5) { i ->
                                                                    if (channel.isClosedForSend) {
                                                                        println("Channel 已关闭,停止发送")
                                                                        break
                                                                    }
                                                                    channel.send("Message $i")
                                                                    delay(100)
                                                                }
                                                            } catch (e: ClosedSendChannelException) {
                                                                println("生产者: Channel 已关闭")
                                                            } catch (e: CancellationException) {
                                                                println("生产者: 操作被取消")
                                                                throw e // 重新抛出取消异常
                                                            } finally {
                                                                println("生产者: 清理资源")
                                                            }
                                                        }
                                                        val consumer = GlobalScope.launch {
                                                            try {
                                                                while (!channel.isClosedForReceive) {
                                                                    try {
                                                                        val message = channel.receive()
                                                                        println("消费者: 收到 $message")
                                                                    } catch (e: ClosedReceiveChannelException) {
                                                                        println("消费者: Channel 已关闭且无更多数据")
                                                                        break
                                                                    }
                                                                    delay(200)
                                                                }
                                                            } catch (e: CancellationException) {
                                                                println("消费者: 操作被取消")
                                                                throw e
                                                            } finally {
                                                                println("消费者: 清理资源")
                                                            }
                                                        }
                                                        delay(1000)
                                                        channel.close()
                                                        joinAll(producer, consumer)
                                                    }

                                                    总结

                                                    Channel 关键概念对比

                                                    特性RENDEZVOUSCONFLATEDBUFFEREDUNLIMITED自定义容量
                                                    容量0164Int.MAX_VALUE指定值
                                                    缓冲行为无缓冲,同步只保留最新值有限缓冲无限缓冲有限缓冲
                                                    发送阻塞缓冲满时缓冲满时
                                                    适用场景严格同步状态更新一般异步高吞吐量批量处理
                                                    内存风险中等可控

                                                    溢出策略对比

                                                    策略行为性能特点适用场景
                                                    SUSPEND挂起发送操作提供背压控制确保数据完整性
                                                    DROP_OLDEST丢弃旧元素发送不阻塞实时数据流
                                                    DROP_LATEST丢弃新元素发送不阻塞保护历史数据

                                                    操作方式

                                                    操作类型阻塞版本非阻塞版本异常处理返回值
                                                    发送send()trySend()抛出异常ChannelResult<Unit>
                                                    接收receive()tryReceive()抛出异常ChannelResult<T>
                                                    特点会挂起协程立即返回需要 try-catch通过结果对象判断

                                                    Channel 状态生命周期

                                                    状态描述send()receive()检查方法
                                                    活跃正常工作状态✅ 正常✅ 正常-
                                                    发送关闭调用 close() 后❌ 异常✅ 可接收缓冲区数据isClosedForSend
                                                    接收关闭缓冲区清空后❌ 异常❌ 异常isClosedForReceive
                                                    已取消调用 cancel() 后❌ 异常❌ 异常-

                                                    总体来说,Channel 是一种非常强大的协程通信机制,它可以帮助我们在协程之间进行安全、高效的通信。在使用 Channel时,我们需要注意异常处理、缓冲区容量、溢出策略等问题。

                                                    感谢阅读,如果对你有帮助请三连(点赞、收藏、加关注)支持。有任何疑问或建议,欢迎在评论区留言讨论。如需转载,请注明出处:喻志强的博客

                                                    到此这篇关于Kotlin 协程之Channel的概念和基本使用详解的文章就介绍到这了,更多相关Kotlin 协程Channel使用内容请搜索编程客栈(www.devze.com)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程客栈(www.devze.com)!

                                                    0

                                                    精彩评论

                                                    暂无评论...
                                                    验证码 换一张
                                                    取 消

                                                    关注公众号