EventChannel.kt 31 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780
  1. /*
  2. * Copyright 2019-2023 Mamoe Technologies and contributors.
  3. *
  4. * 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
  5. * Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
  6. *
  7. * https://github.com/mamoe/mirai/blob/dev/LICENSE
  8. */
  9. @file:JvmMultifileClass
  10. @file:JvmName("EventChannelKt")
  11. package net.mamoe.mirai.event
  12. import kotlinx.coroutines.*
  13. import kotlinx.coroutines.channels.Channel
  14. import kotlinx.coroutines.channels.ClosedSendChannelException
  15. import kotlinx.coroutines.channels.SendChannel
  16. import kotlinx.coroutines.flow.Flow
  17. import kotlinx.coroutines.flow.SharedFlow
  18. import kotlinx.coroutines.flow.filter
  19. import kotlinx.coroutines.sync.Mutex
  20. import net.mamoe.mirai.Bot
  21. import net.mamoe.mirai.IMirai
  22. import net.mamoe.mirai.event.ConcurrencyKind.CONCURRENT
  23. import net.mamoe.mirai.event.ConcurrencyKind.LOCKED
  24. import net.mamoe.mirai.event.events.BotEvent
  25. import net.mamoe.mirai.internal.event.JvmMethodListenersInternal
  26. import net.mamoe.mirai.utils.*
  27. import java.util.function.Consumer
  28. import kotlin.coroutines.CoroutineContext
  29. import kotlin.coroutines.EmptyCoroutineContext
  30. import kotlin.reflect.KClass
  31. /**
  32. * 事件通道.
  33. *
  34. * 事件通道是监听事件的入口, 但不负责广播事件. 要广播事件, 使用 [Event.broadcast] 或 [IMirai.broadcastEvent].
  35. *
  36. * ## 获取事件通道
  37. *
  38. * [EventChannel] 不可自行构造, 只能通过 [GlobalEventChannel], [BotEvent], 或基于一个通道的过滤等操作获得.
  39. *
  40. * ### 全局事件通道
  41. *
  42. * [GlobalEventChannel] 是单例对象, 表示全局事件通道, 可以获取到在其中广播的所有事件.
  43. *
  44. * ### [BotEvent] 事件通道
  45. *
  46. * 若只需要监听某个 [Bot] 的事件, 可通过 [Bot.eventChannel] 获取到这样的 [EventChannel].
  47. *
  48. * ## 通道操作
  49. *
  50. * ### 对通道的操作
  51. * - 过滤通道: 通过 [EventChannel.filter]. 例如 `filter { it is BotEvent }` 得到一个只能监听到 [BotEvent] 的事件通道.
  52. * - 转换为 Kotlin 协程 [Channel]: [EventChannel.forwardToChannel]
  53. * - 添加 [CoroutineContext]: [context], [parentJob], [parentScope], [exceptionHandler]
  54. *
  55. * ### 创建事件监听
  56. * - [EventChannel.subscribe] 创建带条件的一个事件监听器.
  57. * - [EventChannel.subscribeAlways] 创建一个总是监听事件的事件监听器.
  58. * - [EventChannel.subscribeOnce] 创建一个只监听单次的事件监听器.
  59. *
  60. * ### 监听器生命周期
  61. *
  62. * 阅读 [EventChannel.subscribe] 以获取监听器生命周期相关信息.
  63. *
  64. * ## 与 kotlinx-coroutines 交互
  65. *
  66. * mirai [EventChannel] 设计比 kotlinx-coroutines 的 [Flow] 稳定版更早.
  67. * [EventChannel] 的功能与 [Flow] 类似, 不过 [EventChannel] 在 [subscribe] (类似 [Flow.collect]) 时有优先级判定, 也允许[拦截][Event.intercept].
  68. *
  69. * ### 通过 [Flow] 接收事件
  70. *
  71. * 使用 [EventChannel.asFlow] 获得 [Flow], 然后可使用 [Flow.collect] 等操作.
  72. *
  73. * ### 转发事件到 [SendChannel]
  74. *
  75. * 使用 [EventChannel.forwardToChannel] 可将事件转发到指定 [SendChannel].
  76. */
  77. @NotStableForInheritance // since 2.12, before it was `final class`.
  78. public abstract class EventChannel<out BaseEvent : Event> @MiraiInternalApi public constructor(
  79. public val baseEventClass: KClass<out BaseEvent>,
  80. /**
  81. * 此事件通道的默认 [CoroutineScope.coroutineContext]. 将会被添加给所有注册的事件监听器.
  82. */
  83. public val defaultCoroutineContext: CoroutineContext,
  84. ) {
  85. /**
  86. * 创建事件监听并将监听结果转发到 [channel]. 当 [Channel.send] 抛出 [ClosedSendChannelException] 时停止 [Listener] 监听和转发.
  87. *
  88. * 返回创建的会转发监听到的所有事件到 [channel] 的[事件监听器][Listener]. [停止][Listener.complete] 该监听器会停止转发, 不会影响目标 [channel].
  89. *
  90. * 若 [Channel.send] 挂起, 则监听器也会挂起, 也就可能会导致事件广播过程挂起.
  91. *
  92. * 示例:
  93. *
  94. * ```
  95. * val eventChannel: EventChannel<BotEvent> = ...
  96. * val channel = Channel<BotEvent>() // kotlinx.coroutines.channels.Channel
  97. * eventChannel.forwardToChannel(channel, priority = ...)
  98. *
  99. * // 其他地方
  100. * val event: BotEvent = channel.receive() // 挂起并接收一个事件
  101. * ```
  102. *
  103. * @see subscribeAlways
  104. * @see Channel
  105. * @since 2.10
  106. */
  107. public fun forwardToChannel(
  108. channel: SendChannel<@UnsafeVariance BaseEvent>,
  109. coroutineContext: CoroutineContext = EmptyCoroutineContext,
  110. priority: EventPriority = EventPriority.MONITOR,
  111. ): Listener<@UnsafeVariance BaseEvent> {
  112. return subscribe(baseEventClass, coroutineContext, priority = priority) {
  113. try {
  114. channel.send(it)
  115. ListeningStatus.LISTENING
  116. } catch (_: ClosedSendChannelException) {
  117. ListeningStatus.STOPPED
  118. }
  119. }
  120. }
  121. /**
  122. * 通过 [Flow] 接收此通道内的所有事件.
  123. *
  124. * ```
  125. * val eventChannel: EventChannel<BotEvent> = ...
  126. * val flow: Flow<BotEvent> = eventChannel.asFlow()
  127. *
  128. * flow.collect { // it
  129. * //
  130. * }
  131. *
  132. * flow.filterIsInstance<GroupMessageEvent>.collect { // it: GroupMessageEvent
  133. * // 处理事件 ...
  134. * }
  135. *
  136. * flow.filterIsInstance<FriendMessageEvent>.collect { // it: FriendMessageEvent
  137. * // 处理事件 ...
  138. * }
  139. * ```
  140. *
  141. * 类似于 [SharedFlow], [EventChannel.asFlow] 返回的 [Flow] 永远都不会停止. 因此上述示例 [Flow.collect] 永远都不会正常 (以抛出异常之外的) 结束.
  142. *
  143. * 通过 [asFlow] 接收事件相当于通过 [subscribeAlways] 以 [EventPriority.MONITOR] 监听事件.
  144. *
  145. * **注意**: [context], [parentJob] 等控制 [EventChannel.defaultCoroutineContext] 的操作对 [asFlow] 无效. 因为 [asFlow] 并不创建协程.
  146. *
  147. * @see Flow
  148. * @since 2.12
  149. */
  150. public abstract fun asFlow(): Flow<BaseEvent>
  151. // region transforming operations
  152. /**
  153. * 添加一个过滤器. 过滤器将在收到任何事件之后, 传递给通过 [EventChannel.subscribe] 注册的监听器之前调用.
  154. *
  155. * 若 [filter] 返回 `true`, 该事件将会被传给监听器. 否则将会被忽略, **监听器继续监听**.
  156. *
  157. * ## 线性顺序
  158. * 多个 [filter] 的处理是线性且有顺序的. 若一个 [filter] 已经返回了 `false` (代表忽略这个事件), 则会立即忽略, 而不会传递给后续过滤器.
  159. *
  160. * 示例:
  161. * ```
  162. * GlobalEventChannel // GlobalEventChannel 会收到全局所有事件, 事件类型是 Event
  163. * .filterIsInstance<BotEvent>() // 过滤, 只接受 BotEvent
  164. * .filter { event: BotEvent ->
  165. * // 此时的 event 一定是 BotEvent
  166. * event.bot.id == 123456 // 再过滤 event 的 bot.id
  167. * }
  168. * .subscribeAlways { event: BotEvent ->
  169. * // 现在 event 是 BotEvent, 且 bot.id == 123456
  170. * }
  171. * ```
  172. *
  173. * ## 过滤器挂起
  174. * [filter] 允许挂起协程. **过滤器的挂起将被认为是事件监听器的挂起**.
  175. *
  176. * 过滤器挂起是否会影响事件处理,
  177. * 取决于 [subscribe] 时的 [ConcurrencyKind] 和 [EventPriority].
  178. *
  179. * ## 过滤器异常处理
  180. * 若 [filter] 抛出异常, 将被包装为 [ExceptionInEventChannelFilterException] 并重新抛出.
  181. *
  182. * @see filterIsInstance 过滤指定类型的事件
  183. */
  184. @JvmSynthetic
  185. public fun filter(filter: suspend (event: BaseEvent) -> Boolean): EventChannel<BaseEvent> {
  186. return FilterEventChannel(this, filter)
  187. }
  188. /**
  189. * [EventChannel.filter] 的 Java 版本.
  190. *
  191. * 添加一个过滤器. 过滤器将在收到任何事件之后, 传递给通过 [EventChannel.subscribe] 注册的监听器之前调用.
  192. *
  193. * 若 [filter] 返回 `true`, 该事件将会被传给监听器. 否则将会被忽略, **监听器继续监听**.
  194. *
  195. * ## 线性顺序
  196. * 多个 [filter] 的处理是线性且有顺序的. 若一个 [filter] 已经返回了 `false` (代表忽略这个事件), 则会立即忽略, 而不会传递给后续过滤器.
  197. *
  198. * 示例:
  199. * ```
  200. * GlobalEventChannel // GlobalEventChannel 会收到全局所有事件, 事件类型是 Event
  201. * .filterIsInstance(BotEvent.class) // 过滤, 只接受 BotEvent
  202. * .filter(event ->
  203. * // 此时的 event 一定是 BotEvent
  204. * event.bot.id == 123456 // 再过滤 event 的 bot.id
  205. * )
  206. * .subscribeAlways(event -> {
  207. * // 现在 event 是 BotEvent, 且 bot.id == 123456
  208. * })
  209. * ```
  210. *
  211. * ## 过滤器阻塞
  212. * [filter] 允许阻塞线程. **过滤器的阻塞将被认为是事件监听器的阻塞**.
  213. *
  214. * 过滤器阻塞是否会影响事件处理,
  215. * 取决于 [subscribe] 时的 [ConcurrencyKind] 和 [EventPriority].
  216. *
  217. * ## 过滤器异常处理
  218. * 若 [filter] 抛出异常, 将被包装为 [ExceptionInEventChannelFilterException] 并重新抛出.
  219. *
  220. * @see filterIsInstance 过滤指定类型的事件
  221. *
  222. * @since 2.2
  223. */
  224. @Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
  225. @kotlin.internal.LowPriorityInOverloadResolution
  226. public fun filter(filter: (event: BaseEvent) -> Boolean): EventChannel<BaseEvent> {
  227. return filter { runBIO { filter(it) } }
  228. }
  229. /**
  230. * 过滤事件的类型. 返回一个只包含 [E] 类型事件的 [EventChannel]
  231. * @see filter 获取更多信息
  232. */
  233. @JvmSynthetic
  234. public inline fun <reified E : Event> filterIsInstance(): EventChannel<E> =
  235. filterIsInstance(E::class)
  236. /**
  237. * 过滤事件的类型. 返回一个只包含 [E] 类型事件的 [EventChannel]
  238. * @see filter 获取更多信息
  239. */
  240. public fun <E : Event> filterIsInstance(kClass: KClass<out E>): EventChannel<E> {
  241. return filter { kClass.isInstance(it) }.cast()
  242. }
  243. /**
  244. * 过滤事件的类型. 返回一个只包含 [E] 类型事件的 [EventChannel]
  245. * @see filter 获取更多信息
  246. */
  247. public fun <E : Event> filterIsInstance(clazz: Class<out E>): EventChannel<E> =
  248. filterIsInstance(clazz.kotlin)
  249. /**
  250. * 创建一个新的 [EventChannel], 该 [EventChannel] 包含 [`this.coroutineContext`][defaultCoroutineContext] 和添加的 [coroutineContexts].
  251. * [coroutineContexts] 会覆盖 [defaultCoroutineContext] 中的重复元素.
  252. *
  253. * 此操作不会修改 [`this.coroutineContext`][defaultCoroutineContext], 只会创建一个新的 [EventChannel].
  254. */
  255. public abstract fun context(vararg coroutineContexts: CoroutineContext): EventChannel<BaseEvent>
  256. /**
  257. * 创建一个新的 [EventChannel], 该 [EventChannel] 包含 [this.coroutineContext][defaultCoroutineContext] 和添加的 [coroutineExceptionHandler]
  258. * @see context
  259. */
  260. @Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
  261. @kotlin.internal.LowPriorityInOverloadResolution
  262. public fun exceptionHandler(coroutineExceptionHandler: CoroutineExceptionHandler): EventChannel<BaseEvent> {
  263. return context(coroutineExceptionHandler)
  264. }
  265. /**
  266. * 创建一个新的 [EventChannel], 该 [EventChannel] 包含 [`this.coroutineContext`][defaultCoroutineContext] 和添加的 [coroutineExceptionHandler]
  267. * @see context
  268. */
  269. public fun exceptionHandler(coroutineExceptionHandler: (exception: Throwable) -> Unit): EventChannel<BaseEvent> {
  270. return context(CoroutineExceptionHandler { _, throwable ->
  271. coroutineExceptionHandler(throwable)
  272. })
  273. }
  274. /**
  275. * 创建一个新的 [EventChannel], 该 [EventChannel] 包含 [`this.coroutineContext`][defaultCoroutineContext] 和添加的 [coroutineExceptionHandler]
  276. * @see context
  277. * @since 2.12
  278. */
  279. @Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
  280. @kotlin.internal.LowPriorityInOverloadResolution
  281. public fun exceptionHandler(coroutineExceptionHandler: Consumer<Throwable>): EventChannel<BaseEvent> {
  282. return exceptionHandler { coroutineExceptionHandler.accept(it) }
  283. }
  284. /**
  285. * 将 [coroutineScope] 作为这个 [EventChannel] 的父作用域.
  286. *
  287. * 实际作用为创建一个新的 [EventChannel],
  288. * 该 [EventChannel] 包含 [`this.coroutineContext`][defaultCoroutineContext] 和添加的 [CoroutineScope.coroutineContext],
  289. * 并以 [CoroutineScope] 中 [Job] (如果有) [作为父 Job][parentJob]
  290. *
  291. * @see parentJob
  292. * @see context
  293. *
  294. * @see CoroutineScope.globalEventChannel `GlobalEventChannel.parentScope()` 的扩展
  295. */
  296. public fun parentScope(coroutineScope: CoroutineScope): EventChannel<BaseEvent> {
  297. return context(coroutineScope.coroutineContext)
  298. }
  299. /**
  300. * 指定协程父 [Job]. 之后在此 [EventChannel] 下创建的事件监听器都会成为 [job] 的子任务, 当 [job] 被取消时, 所有的事件监听器都会被取消.
  301. *
  302. * 注意: 监听器不会失败 ([Job.cancel]). 监听器处理过程的异常都会被捕获然后交由 [CoroutineExceptionHandler] 处理, 因此 [job] 不会因为子任务监听器的失败而被取消.
  303. *
  304. * @see parentScope
  305. * @see context
  306. */
  307. public fun parentJob(job: Job): EventChannel<BaseEvent> {
  308. return context(job)
  309. }
  310. // endregion
  311. // region subscribe
  312. /**
  313. * 创建一个事件监听器, 监听事件通道中所有 [E] 及其子类事件.
  314. *
  315. * 每当 [事件广播][Event.broadcast] 时, [handler] 都会被执行.
  316. *
  317. *
  318. * ## 创建监听
  319. * 调用本函数:
  320. * ```
  321. * eventChannel.subscribe<E> { /* 会收到此通道中的所有是 E 的事件 */ }
  322. * ```
  323. *
  324. * ## 生命周期
  325. *
  326. * ### 通过协程作用域管理监听器
  327. * 本函数将会创建一个 [Job], 成为 [parentJob] 中的子任务. 可创建一个 [CoroutineScope] 来管理所有的监听器:
  328. * ```
  329. * val scope = CoroutineScope(SupervisorJob())
  330. *
  331. * val scopedChannel = eventChannel.parentScope(scope) // 将协程作用域 scope 附加到这个 EventChannel
  332. *
  333. * scopedChannel.subscribeAlways<MemberJoinEvent> { /* ... */ } // 启动监听, 监听器协程会作为 scope 的子任务
  334. * scopedChannel.subscribeAlways<MemberMuteEvent> { /* ... */ } // 启动监听, 监听器协程会作为 scope 的子任务
  335. *
  336. * scope.cancel() // 停止了协程作用域, 也就取消了两个监听器
  337. * ```
  338. *
  339. * 这个函数返回 [Listener], 它是一个 [CompletableJob]. 它会成为 [parentJob] 或 [parentScope] 的一个 [子任务][Job]
  340. *
  341. * ### 停止监听
  342. * 如果 [handler] 返回 [ListeningStatus.STOPPED] 监听器将被停止.
  343. *
  344. * 也可以通过 [subscribe] 返回值 [Listener] 的 [Listener.complete]
  345. *
  346. * ## 监听器调度
  347. * 监听器会被创建一个协程任务, 语义上在 [parentScope] 下运行.
  348. * 通过 Kotlin [默认协程调度器][Dispatchers.Default] 在固定的全局共享线程池里执行, 除非有 [coroutineContext] 指定.
  349. *
  350. * 默认在 [handler] 中不能处理阻塞任务. 阻塞任务将会阻塞一个 Kotlin 全局协程调度线程并可能导致严重问题.
  351. * 请通过 `withContext(Dispatchers.IO) { }` 等方法执行阻塞工作.
  352. *
  353. * ## 异常处理
  354. *
  355. * **监听过程抛出的异常是需要尽可能避免的, 因为这将产生不确定性.**
  356. *
  357. * 当参数 [handler] 处理事件抛出异常时, 只会从监听方协程上下文 ([CoroutineContext]) 寻找 [CoroutineExceptionHandler] 处理异常, 即如下顺序:
  358. * 1. 本函数参数 [coroutineContext]
  359. * 2. [EventChannel.defaultCoroutineContext]
  360. * 3. 若以上步骤无法获取 [CoroutineExceptionHandler], 则只会在日志记录异常.
  361. * 因此建议先指定 [CoroutineExceptionHandler] (可通过 [EventChannel.exceptionHandler]) 再监听事件, 或者在监听事件中捕获异常.
  362. *
  363. * 因此, 广播方 ([Event.broadcast]) 不会知晓监听方产生的异常, 其 [Event.broadcast] 过程也不会因监听方产生异常而提前结束.
  364. *
  365. * ***备注***: 在 2.11 以前, 发生上述异常时还会从广播方和有关 [Bot] 协程域获取 [CoroutineExceptionHandler]. 因此行为不稳定而在 2.11 变更为上述过程.
  366. *
  367. * 事件处理时抛出异常不会停止监听器.
  368. *
  369. * 建议在事件处理中 (即 [handler] 里) 处理异常,
  370. * 或在参数 [coroutineContext] 中添加 [CoroutineExceptionHandler], 或通过 [EventChannel.exceptionHandler].
  371. *
  372. * ## 并发安全性
  373. * 基于 [concurrency] 参数, 事件监听器可以被允许并行执行.
  374. *
  375. * - 若 [concurrency] 为 [ConcurrencyKind.CONCURRENT], [handler] 可能被并行调用, 需要保证并发安全.
  376. * - 若 [concurrency] 为 [ConcurrencyKind.LOCKED], [handler] 会被 [Mutex] 限制, 串行异步执行.
  377. *
  378. * ## 衍生监听方法
  379. *
  380. * 这些方法仅 Kotlin 可用.
  381. *
  382. * - [syncFromEvent]: 挂起当前协程, 监听一个事件, 并尝试从这个事件中**获取**一个值
  383. * - [nextEvent]: 挂起当前协程, 直到监听到特定类型事件的广播并通过过滤器, 返回这个事件实例.
  384. *
  385. * @param coroutineContext 在 [defaultCoroutineContext] 的基础上, 给事件监听协程的额外的 [CoroutineContext].
  386. * @param concurrency 并发类型. 查看 [ConcurrencyKind]
  387. * @param priority 监听优先级,优先级越高越先执行
  388. * @param handler 事件处理器. 在接收到事件时会调用这个处理器. 其返回值意义参考 [ListeningStatus]. 其异常处理参考上文
  389. *
  390. * @return 监听器实例. 此监听器已经注册到指定事件上, 在事件广播时将会调用 [handler]
  391. *
  392. *
  393. * @see selectMessages 以 `when` 的语法 '选择' 即将到来的一条消息.
  394. * @see whileSelectMessages 以 `when` 的语法 '选择' 即将到来的所有消息, 直到不满足筛选结果.
  395. *
  396. * @see subscribeAlways 一直监听
  397. * @see subscribeOnce 只监听一次
  398. *
  399. * @see subscribeMessages 监听消息 DSL
  400. */
  401. @JvmSynthetic
  402. public inline fun <reified E : Event> subscribe(
  403. coroutineContext: CoroutineContext = EmptyCoroutineContext,
  404. concurrency: ConcurrencyKind = LOCKED,
  405. priority: EventPriority = EventPriority.NORMAL,
  406. noinline handler: suspend E.(E) -> ListeningStatus,
  407. ): Listener<E> = subscribe(E::class, coroutineContext, concurrency, priority, handler)
  408. /**
  409. * 与 [subscribe] 的区别是接受 [eventClass] 参数, 而不使用 `reified` 泛型. 通常推荐使用具体化类型参数.
  410. *
  411. * @return 监听器实例. 此监听器已经注册到指定事件上, 在事件广播时将会调用 [handler]
  412. * @see subscribe
  413. */
  414. @JvmSynthetic
  415. public fun <E : Event> subscribe(
  416. eventClass: KClass<out E>,
  417. coroutineContext: CoroutineContext = EmptyCoroutineContext,
  418. concurrency: ConcurrencyKind = LOCKED,
  419. priority: EventPriority = EventPriority.NORMAL,
  420. handler: suspend E.(E) -> ListeningStatus,
  421. ): Listener<E> = subscribeInternal(
  422. eventClass,
  423. createListener0(coroutineContext, concurrency, priority) { it.handler(it); }
  424. )
  425. /**
  426. * 创建一个事件监听器, 监听事件通道中所有 [E] 及其子类事件.
  427. * 每当 [事件广播][Event.broadcast] 时, [handler] 都会被执行.
  428. *
  429. * 可在任意时候通过 [Listener.complete] 来主动停止监听.
  430. *
  431. * @param concurrency 并发类型默认为 [CONCURRENT]
  432. * @param coroutineContext 在 [defaultCoroutineContext] 的基础上, 给事件监听协程的额外的 [CoroutineContext]
  433. * @param priority 处理优先级, 优先级高的先执行
  434. *
  435. * @return 监听器实例. 此监听器已经注册到指定事件上, 在事件广播时将会调用 [handler]
  436. *
  437. * @see subscribe 获取更多说明
  438. */
  439. @JvmSynthetic
  440. public inline fun <reified E : Event> subscribeAlways(
  441. coroutineContext: CoroutineContext = EmptyCoroutineContext,
  442. concurrency: ConcurrencyKind = CONCURRENT,
  443. priority: EventPriority = EventPriority.NORMAL,
  444. noinline handler: suspend E.(E) -> Unit,
  445. ): Listener<E> = subscribeAlways(E::class, coroutineContext, concurrency, priority, handler)
  446. /**
  447. * @see subscribe
  448. * @see subscribeAlways
  449. */
  450. @JvmSynthetic
  451. public fun <E : Event> subscribeAlways(
  452. eventClass: KClass<out E>,
  453. coroutineContext: CoroutineContext = EmptyCoroutineContext,
  454. concurrency: ConcurrencyKind = CONCURRENT,
  455. priority: EventPriority = EventPriority.NORMAL,
  456. handler: suspend E.(E) -> Unit,
  457. ): Listener<E> = subscribeInternal(
  458. eventClass,
  459. createListener0(coroutineContext, concurrency, priority) { it.handler(it); ListeningStatus.LISTENING }
  460. )
  461. /**
  462. * 创建一个事件监听器, 监听事件通道中所有 [E] 及其子类事件, 只监听一次.
  463. * 当 [事件广播][Event.broadcast] 时, [handler] 会被执行.
  464. *
  465. * 可在任意时候通过 [Listener.complete] 来主动停止监听.
  466. *
  467. * @param coroutineContext 在 [defaultCoroutineContext] 的基础上, 给事件监听协程的额外的 [CoroutineContext]
  468. * @param priority 处理优先级, 优先级高的先执行
  469. *
  470. * @see subscribe 获取更多说明
  471. */
  472. @JvmSynthetic
  473. public inline fun <reified E : Event> subscribeOnce(
  474. coroutineContext: CoroutineContext = EmptyCoroutineContext,
  475. priority: EventPriority = EventPriority.NORMAL,
  476. noinline handler: suspend E.(E) -> Unit,
  477. ): Listener<E> = subscribeOnce(E::class, coroutineContext, priority, handler)
  478. /**
  479. * @see subscribeOnce
  480. */
  481. public fun <E : Event> subscribeOnce(
  482. eventClass: KClass<out E>,
  483. coroutineContext: CoroutineContext = EmptyCoroutineContext,
  484. priority: EventPriority = EventPriority.NORMAL,
  485. handler: suspend E.(E) -> Unit,
  486. ): Listener<E> = subscribeInternal(
  487. eventClass,
  488. createListener0(coroutineContext, ConcurrencyKind.LOCKED, priority) { it.handler(it); ListeningStatus.STOPPED }
  489. )
  490. // endregion
  491. /**
  492. * 注册 [ListenerHost] 中的所有 [EventHandler] 标注的方法到这个 [EventChannel]. 查看 [EventHandler].
  493. *
  494. * @param coroutineContext 在 [defaultCoroutineContext] 的基础上, 给事件监听协程的额外的 [CoroutineContext]
  495. *
  496. * @see subscribe
  497. * @see EventHandler
  498. * @see ListenerHost
  499. */
  500. @JvmOverloads
  501. public fun registerListenerHost(
  502. host: ListenerHost,
  503. coroutineContext: CoroutineContext = EmptyCoroutineContext,
  504. ) {
  505. val jobOfListenerHost: Job?
  506. val coroutineContext0 = if (host is SimpleListenerHost) {
  507. val listenerCoroutineContext = host.coroutineContext
  508. val listenerJob = listenerCoroutineContext[Job]
  509. val rsp = listenerCoroutineContext.minusKey(Job) +
  510. coroutineContext +
  511. (listenerCoroutineContext[CoroutineExceptionHandler] ?: EmptyCoroutineContext)
  512. val registerCancelHook = when {
  513. listenerJob === null -> false
  514. // Registering cancellation hook is needless
  515. // if [Job] of [EventChannel] is same as [Job] of [SimpleListenerHost]
  516. (rsp[Job] ?: this.defaultCoroutineContext[Job]) === listenerJob -> false
  517. else -> true
  518. }
  519. jobOfListenerHost = if (registerCancelHook) {
  520. listenerCoroutineContext[Job]
  521. } else {
  522. null
  523. }
  524. rsp
  525. } else {
  526. jobOfListenerHost = null
  527. coroutineContext
  528. }
  529. for (method in host.javaClass.declaredMethods) {
  530. method.getAnnotation(EventHandler::class.java)?.let {
  531. val listener =
  532. JvmMethodListenersInternal.registerEventHandler(method, host, this, it, coroutineContext0)
  533. // For [SimpleListenerHost.cancelAll]
  534. jobOfListenerHost?.invokeOnCompletion { exception ->
  535. listener.cancel(
  536. when (exception) {
  537. is CancellationException -> exception
  538. is Throwable -> CancellationException(null, exception)
  539. else -> null
  540. }
  541. )
  542. }
  543. }
  544. }
  545. }
  546. // region Java API
  547. /**
  548. * Java API. 查看 [subscribeAlways] 获取更多信息.
  549. *
  550. * ```java
  551. * eventChannel.subscribeAlways(GroupMessageEvent.class, (event) -> { });
  552. * ```
  553. *
  554. * @see subscribe
  555. * @see subscribeAlways
  556. */
  557. @JvmOverloads
  558. @Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
  559. @kotlin.internal.LowPriorityInOverloadResolution
  560. public fun <E : Event> subscribeAlways(
  561. eventClass: Class<out E>,
  562. coroutineContext: CoroutineContext = EmptyCoroutineContext,
  563. concurrency: ConcurrencyKind = CONCURRENT,
  564. priority: EventPriority = EventPriority.NORMAL,
  565. handler: Consumer<E>,
  566. ): Listener<E> = subscribeInternal(
  567. eventClass.kotlin,
  568. createListener0(coroutineContext, concurrency, priority) { event ->
  569. runInterruptible(Dispatchers.IO) { handler.accept(event) }
  570. ListeningStatus.LISTENING
  571. }
  572. )
  573. /**
  574. * Java API. 查看 [subscribe] 获取更多信息.
  575. *
  576. * ```java
  577. * eventChannel.subscribe(GroupMessageEvent.class, (event) -> {
  578. * return ListeningStatus.LISTENING;
  579. * });
  580. * ```
  581. *
  582. * @see subscribe
  583. */
  584. @JvmOverloads
  585. @Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
  586. @kotlin.internal.LowPriorityInOverloadResolution
  587. public fun <E : Event> subscribe(
  588. eventClass: Class<out E>,
  589. coroutineContext: CoroutineContext = EmptyCoroutineContext,
  590. concurrency: ConcurrencyKind = CONCURRENT,
  591. priority: EventPriority = EventPriority.NORMAL,
  592. handler: java.util.function.Function<E, ListeningStatus>,
  593. ): Listener<E> = subscribeInternal(
  594. eventClass.kotlin,
  595. createListener0(coroutineContext, concurrency, priority) { event ->
  596. runInterruptible(Dispatchers.IO) { handler.apply(event) }
  597. }
  598. )
  599. /**
  600. * Java API. 查看 [subscribeOnce] 获取更多信息.
  601. *
  602. * ```java
  603. * eventChannel.subscribeOnce(GroupMessageEvent.class, (event) -> { });
  604. * ```
  605. *
  606. * @see subscribe
  607. * @see subscribeOnce
  608. */
  609. @JvmOverloads
  610. @Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
  611. @kotlin.internal.LowPriorityInOverloadResolution
  612. public fun <E : Event> subscribeOnce(
  613. eventClass: Class<out E>,
  614. coroutineContext: CoroutineContext = EmptyCoroutineContext,
  615. concurrency: ConcurrencyKind = CONCURRENT,
  616. priority: EventPriority = EventPriority.NORMAL,
  617. handler: Consumer<E>,
  618. ): Listener<E> = subscribeInternal(
  619. eventClass.kotlin,
  620. createListener0(coroutineContext, concurrency, priority) { event ->
  621. runInterruptible(Dispatchers.IO) { handler.accept(event) }
  622. ListeningStatus.STOPPED
  623. }
  624. )
  625. // endregion
  626. // region deprecated
  627. /**
  628. * 创建事件监听并将监听结果发送在 [Channel]. 将返回值 [Channel] [关闭][Channel.close] 时将会同时关闭事件监听.
  629. *
  630. * ## 已弃用
  631. *
  632. * 请使用 [forwardToChannel] 替代.
  633. *
  634. * @param capacity Channel 容量. 详见 [Channel] 构造.
  635. *
  636. * @see subscribeAlways
  637. * @see Channel
  638. */
  639. @Deprecated(
  640. "Please use forwardToChannel instead.",
  641. replaceWith = ReplaceWith(
  642. "Channel<BaseEvent>(capacity).apply { forwardToChannel(this, coroutineContext, priority) }",
  643. "kotlinx.coroutines.channels.Channel"
  644. ),
  645. level = DeprecationLevel.ERROR,
  646. )
  647. @DeprecatedSinceMirai(warningSince = "2.10", errorSince = "2.14")
  648. @MiraiExperimentalApi
  649. public fun asChannel(
  650. capacity: Int = Channel.RENDEZVOUS,
  651. coroutineContext: CoroutineContext = EmptyCoroutineContext,
  652. @Suppress("UNUSED_PARAMETER") concurrency: ConcurrencyKind = CONCURRENT,
  653. priority: EventPriority = EventPriority.NORMAL,
  654. ): Channel<out BaseEvent> =
  655. Channel<BaseEvent>(capacity).apply { forwardToChannel(this, coroutineContext, priority) }
  656. // endregion
  657. // region impl
  658. // protected, to hide from users
  659. @MiraiInternalApi
  660. protected abstract fun <E : Event> registerListener(eventClass: KClass<out E>, listener: Listener<E>)
  661. // to overcome visibility issue
  662. @OptIn(MiraiInternalApi::class)
  663. internal fun <E : Event> registerListener0(eventClass: KClass<out E>, listener: Listener<E>) {
  664. return registerListener(eventClass, listener)
  665. }
  666. @OptIn(MiraiInternalApi::class)
  667. private fun <L : Listener<E>, E : Event> subscribeInternal(eventClass: KClass<out E>, listener: L): L {
  668. registerListener(eventClass, listener)
  669. return listener
  670. }
  671. /**
  672. * Creates [Listener] instance using the [listenerBlock] action.
  673. */
  674. // @Contract("_ -> new") // always creates new instance
  675. @MiraiInternalApi
  676. protected abstract fun <E : Event> createListener(
  677. coroutineContext: CoroutineContext,
  678. concurrencyKind: ConcurrencyKind,
  679. priority: EventPriority,
  680. listenerBlock: suspend (E) -> ListeningStatus,
  681. ): Listener<E>
  682. // to overcome visibility issue
  683. @OptIn(MiraiInternalApi::class)
  684. internal fun <E : Event> createListener0(
  685. coroutineContext: CoroutineContext,
  686. concurrencyKind: ConcurrencyKind,
  687. priority: EventPriority,
  688. listenerBlock: suspend (E) -> ListeningStatus,
  689. ): Listener<E> = createListener(coroutineContext, concurrencyKind, priority, listenerBlock)
  690. // endregion
  691. }
  692. // used by mirai-core
  693. @OptIn(MiraiInternalApi::class)
  694. internal open class FilterEventChannel<BaseEvent : Event>(
  695. private val delegate: EventChannel<BaseEvent>,
  696. private val filter: suspend (event: BaseEvent) -> Boolean,
  697. ) : EventChannel<BaseEvent>(delegate.baseEventClass, delegate.defaultCoroutineContext) {
  698. private fun <E : Event> intercept(block: suspend (E) -> ListeningStatus): suspend (E) -> ListeningStatus {
  699. return { ev ->
  700. val filterResult = try {
  701. @Suppress("UNCHECKED_CAST")
  702. baseEventClass.isInstance(ev) && filter(ev as BaseEvent)
  703. } catch (e: Throwable) {
  704. if (e is ExceptionInEventChannelFilterException) throw e // wrapped by another filter
  705. throw ExceptionInEventChannelFilterException(ev, this, cause = e)
  706. }
  707. if (filterResult) block.invoke(ev)
  708. else ListeningStatus.LISTENING
  709. }
  710. }
  711. override fun asFlow(): Flow<BaseEvent> = delegate.asFlow().filter(filter)
  712. override fun <E : Event> registerListener(eventClass: KClass<out E>, listener: Listener<E>) {
  713. delegate.registerListener0(eventClass, listener)
  714. }
  715. override fun <E : Event> createListener(
  716. coroutineContext: CoroutineContext,
  717. concurrencyKind: ConcurrencyKind,
  718. priority: EventPriority,
  719. listenerBlock: suspend (E) -> ListeningStatus
  720. ): Listener<E> = delegate.createListener0(coroutineContext, concurrencyKind, priority, intercept(listenerBlock))
  721. override fun context(vararg coroutineContexts: CoroutineContext): EventChannel<BaseEvent> {
  722. return delegate.context(*coroutineContexts)
  723. }
  724. }