Răsfoiți Sursa

OutgoingMessagePipeline draft

Him188 4 ani în urmă
părinte
comite
7a6a784146
20 a modificat fișierele cu 1036 adăugiri și 120 ștergeri
  1. 45 0
      mirai-core-utils/src/commonMain/kotlin/Collections.kt
  2. 20 0
      mirai-core-utils/src/commonMain/kotlin/TimeSource.kt
  3. 5 4
      mirai-core-utils/src/commonMain/kotlin/TypeSafeMap.kt
  4. 9 9
      mirai-core/src/commonMain/kotlin/MiraiImpl.kt
  5. 17 2
      mirai-core/src/commonMain/kotlin/contact/AbstractContact.kt
  6. 5 81
      mirai-core/src/commonMain/kotlin/contact/SendMessageHandler.kt
  7. 2 1
      mirai-core/src/commonMain/kotlin/contact/util.kt
  8. 475 0
      mirai-core/src/commonMain/kotlin/network/message/OutgoingMessagePhasesCommon.kt
  9. 43 0
      mirai-core/src/commonMain/kotlin/network/message/OutgoingMessagePhasesGroup.kt
  10. 107 0
      mirai-core/src/commonMain/kotlin/network/message/OutgoingMessagePipeline.kt
  11. 67 0
      mirai-core/src/commonMain/kotlin/network/pipeline/Node.kt
  12. 32 0
      mirai-core/src/commonMain/kotlin/network/pipeline/PhaseConfiguration.kt
  13. 66 0
      mirai-core/src/commonMain/kotlin/network/pipeline/PipelineConfiguration.kt
  14. 74 0
      mirai-core/src/commonMain/kotlin/network/pipeline/PipelineConfigurationBuilder.kt
  15. 31 0
      mirai-core/src/commonMain/kotlin/network/pipeline/PipelineContext.kt
  16. 18 16
      mirai-core/src/commonMain/kotlin/network/protocol/packet/chat/MultiMsg.kt
  17. 2 2
      mirai-core/src/commonMain/kotlin/network/protocol/packet/chat/MusicSharePacket.kt
  18. 14 0
      mirai-core/src/commonMain/kotlin/network/protocol/packet/chat/SendMessageResponse.kt
  19. 2 1
      mirai-core/src/commonMain/kotlin/network/protocol/packet/chat/receive/MessageSvc.PbSendMsg.kt
  20. 2 4
      mirai-core/src/jvmTest/kotlin/message/data/MessageReceiptTest.kt

+ 45 - 0
mirai-core-utils/src/commonMain/kotlin/Collections.kt

@@ -0,0 +1,45 @@
+/*
+ * Copyright 2019-2021 Mamoe Technologies and contributors.
+ *
+ * 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
+ * Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
+ *
+ * https://github.com/mamoe/mirai/blob/dev/LICENSE
+ */
+
+@file:JvmMultifileClass
+@file:JvmName("MiraiUtils")
+
+package net.mamoe.mirai.utils
+
+
+public class ListIndexer @PublishedApi internal constructor(
+    private val lastIndex: Int
+) {
+    @PublishedApi
+    internal var currentIndex: Int = 0
+
+    public fun setNextIndex(index: Int) {
+        require(index <= lastIndex) { "it must follows index <= lastIndex" }
+        currentIndex = index
+    }
+
+    @PublishedApi
+    internal fun nextIndex(): Boolean {
+        val next = currentIndex + 1
+        if (next <= lastIndex) {
+            currentIndex = next
+            return true
+        }
+        return false
+    }
+}
+
+public inline fun <C, L> L.forEachWithIndexer(block: ListIndexer.(item: C) -> Unit) where L : List<C>, L : RandomAccess {
+    ListIndexer(lastIndex).run {
+        do {
+            block(get(currentIndex))
+        } while (nextIndex())
+    }
+}
+

+ 20 - 0
mirai-core-utils/src/commonMain/kotlin/TimeSource.kt

@@ -0,0 +1,20 @@
+/*
+ * Copyright 2019-2021 Mamoe Technologies and contributors.
+ *
+ * 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
+ * Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
+ *
+ * https://github.com/mamoe/mirai/blob/dev/LICENSE
+ */
+
+package net.mamoe.mirai.utils
+
+
+public interface TimeSource {
+    public fun currentTimeMillis(): Long
+    public fun currentTimeSeconds(): Long = currentTimeMillis() / 1000
+
+    public object System : TimeSource {
+        override fun currentTimeMillis(): Long = java.lang.System.currentTimeMillis()
+    }
+}

+ 5 - 4
mirai-core-utils/src/commonMain/kotlin/TypeSafeMap.kt

@@ -21,7 +21,8 @@ import kotlin.contracts.contract
 public value class TypeKey<T>(public val name: String) {
     override fun toString(): String = "Key($name)"
 
-    public inline infix fun to(value: T): TypeSafeMap = buildTypeSafeMap { set(this@TypeKey, value) }
+    public inline infix fun to(value: T): TypeSafeMap =
+        buildTypeSafeMap { set(this@TypeKey, value) }
 }
 
 /**
@@ -30,7 +31,8 @@ public value class TypeKey<T>(public val name: String) {
 public sealed interface TypeSafeMap {
     public val size: Int
 
-    public operator fun <T> get(key: TypeKey<T>): T
+    public operator fun <T> get(key: TypeKey<T>): T = getOrNull(key) ?: throw NoSuchElementException(key.toString())
+    public fun <T> getOrNull(key: TypeKey<T>): T?
     public operator fun <T> contains(key: TypeKey<T>): Boolean = get(key) != null
 
     public fun toMapBoxed(): Map<TypeKey<*>, Any?>
@@ -77,8 +79,7 @@ internal open class TypeSafeMapImpl(
         return "TypeSafeMapImpl(map=$map)"
     }
 
-    override operator fun <T> get(key: TypeKey<T>): T =
-        map[key.name]?.uncheckedCast() ?: throw NoSuchElementException(key.toString())
+    override fun <T> getOrNull(key: TypeKey<T>): T? = map[key.name]?.uncheckedCast()
 
     override operator fun <T> contains(key: TypeKey<T>): Boolean = get(key) != null
 

+ 9 - 9
mirai-core/src/commonMain/kotlin/MiraiImpl.kt

@@ -14,6 +14,7 @@ import io.ktor.client.engine.okhttp.*
 import io.ktor.client.features.*
 import io.ktor.client.request.*
 import io.ktor.client.request.forms.*
+import io.ktor.http.*
 import io.ktor.util.*
 import io.ktor.utils.io.core.*
 import kotlinx.coroutines.currentCoroutineContext
@@ -622,20 +623,19 @@ internal open class MiraiImpl : IMirai, LowLevelApiAccessor {
     }
 
     internal open suspend fun uploadMessageHighway(
-        bot: Bot,
-        sendMessageHandler: SendMessageHandler<*>,
-        message: Collection<ForwardMessage.INode>,
+        contact: AbstractContact,
+        nodes: Collection<ForwardMessage.INode>,
         isLong: Boolean,
-    ): String = with(bot.asQQAndroidBot()) {
-        message.forEach {
+    ): String = contact.bot.run {
+        nodes.forEach {
             it.messageChain.ensureSequenceIdAvailable()
         }
 
 
-        val data = message.calculateValidationData(
+        val data = nodes.calculateValidationData(
             client = client,
             random = Random.nextInt().absoluteValue,
-            sendMessageHandler,
+            contact,
             isLong,
         )
 
@@ -644,7 +644,7 @@ internal open class MiraiImpl : IMirai, LowLevelApiAccessor {
                 buType = if (isLong) 1 else 2,
                 client = bot.client,
                 messageData = data,
-                dstUin = sendMessageHandler.targetUin
+                dstUin = contact.uin
             ).sendAndExpect()
         }
 
@@ -664,7 +664,7 @@ internal open class MiraiImpl : IMirai, LowLevelApiAccessor {
                     msgUpReq = listOf(
                         LongMsg.MsgUpReq(
                             msgType = 3, // group
-                            dstUin = sendMessageHandler.targetUin,
+                            dstUin = contact.uin,
                             msgId = 0,
                             msgUkey = response.proto.msgUkey,
                             needCache = 0,

+ 17 - 2
mirai-core/src/commonMain/kotlin/contact/AbstractContact.kt

@@ -7,16 +7,31 @@
  * https://github.com/mamoe/mirai/blob/dev/LICENSE
  */
 
+@file:Suppress("NOTHING_TO_INLINE")
+
 package net.mamoe.mirai.internal.contact
 
 import net.mamoe.mirai.contact.Contact
+import net.mamoe.mirai.contact.Group
+import net.mamoe.mirai.contact.User
 import net.mamoe.mirai.internal.QQAndroidBot
 import net.mamoe.mirai.utils.childScopeContext
+import kotlin.contracts.contract
 import kotlin.coroutines.CoroutineContext
 
-internal abstract class AbstractContact(
+internal sealed class AbstractContact(
     final override val bot: QQAndroidBot,
     parentCoroutineContext: CoroutineContext,
 ) : Contact {
     final override val coroutineContext: CoroutineContext = parentCoroutineContext.childScopeContext()
-}
+}
+
+internal inline fun Contact.impl(): AbstractContact {
+    contract { returns() implies (this@impl is AbstractContact) }
+    return this as AbstractContact
+}
+
+internal val Contact.groupCodeOrNull: Long? get() = if (this is Group) this.groupCode else null
+internal val Contact.groupUinOrNull: Long? get() = if (this is Group) this.uin else null
+internal val Contact.userIdOrNull: Long? get() = if (this is User) this.id else null
+internal val Contact.uin: Long get() = if (this is Group) this.uin else this.id

+ 5 - 81
mirai-core/src/commonMain/kotlin/contact/SendMessageHandler.kt

@@ -27,9 +27,7 @@ import net.mamoe.mirai.internal.network.protocol.data.proto.MsgComm
 import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacket
 import net.mamoe.mirai.internal.network.protocol.packet.chat.FileManagement
 import net.mamoe.mirai.internal.network.protocol.packet.chat.MusicSharePacket
-import net.mamoe.mirai.internal.network.protocol.packet.chat.image.ImgStore
 import net.mamoe.mirai.internal.network.protocol.packet.chat.receive.*
-import net.mamoe.mirai.internal.network.protocol.packet.sendAndExpect
 import net.mamoe.mirai.message.MessageReceipt
 import net.mamoe.mirai.message.data.*
 import net.mamoe.mirai.utils.castOrNull
@@ -100,7 +98,7 @@ internal abstract class SendMessageHandler<C : Contact> {
                 }
 
                 if (!contains(IgnoreLengthCheck)) {
-                    verityLength(this, contact)
+                    checkLength(this, contact)
                 }
 
                 this
@@ -235,7 +233,7 @@ internal abstract class SendMessageHandler<C : Contact> {
         chain: MessageChain,
     ): String = with(contact) {
         return getMiraiImpl().uploadMessageHighway(
-            bot, this@SendMessageHandler,
+            contact.impl(),
             listOf(
                 ForwardMessage.Node(
                     senderId = bot.id,
@@ -277,9 +275,8 @@ internal suspend fun <C : Contact> SendMessageHandler<C>.transformSpecialMessage
         }
 
         val resId = getMiraiImpl().uploadMessageHighway(
-            bot = contact.bot,
-            sendMessageHandler = this,
-            message = forward.nodeList,
+            contact = contact.impl(),
+            nodes = forward.nodeList,
             isLong = false,
         )
         return RichMessage.forwardMessage(
@@ -374,16 +371,7 @@ internal open class GroupSendMessageHandler(
         get() = contact.botAsMember.nameCardOrNick
 
     override suspend fun conversionMessageChain(chain: MessageChain): MessageChain = chain.map { element ->
-        when (element) {
-            is OfflineGroupImage -> {
-                contact.fixImageFileId(element)
-                element
-            }
-            is FriendImage -> {
-                contact.updateFriendImageForGroupMessage(element)
-            }
-            else -> element
-        }
+        TODO("removed")
     }.toMessageChain()
 
     override suspend fun constructSourceForSpecialMessage(
@@ -404,68 +392,4 @@ internal open class GroupSendMessageHandler(
             originalMessage = finalMessage
         )
     }
-
-    companion object {
-        private suspend fun GroupImpl.fixImageFileId(image: OfflineGroupImage) {
-            if (image.fileId == null) {
-                val response: ImgStore.GroupPicUp.Response = ImgStore.GroupPicUp(
-                    bot.client,
-                    uin = bot.id,
-                    groupCode = this.id,
-                    md5 = image.md5,
-                    size = 1,
-                ).sendAndExpect(bot)
-
-                when (response) {
-                    is ImgStore.GroupPicUp.Response.Failed -> {
-                        image.fileId = 0 // Failed
-                    }
-                    is ImgStore.GroupPicUp.Response.FileExists -> {
-                        image.fileId = response.fileId.toInt()
-                    }
-                    is ImgStore.GroupPicUp.Response.RequireUpload -> {
-                        image.fileId = response.fileId.toInt()
-                    }
-                }
-            }
-        }
-
-        /**
-         * Ensures server holds the cache
-         */
-        private suspend fun GroupImpl.updateFriendImageForGroupMessage(image: FriendImage): OfflineGroupImage {
-            bot.network.run {
-                val response = ImgStore.GroupPicUp(
-                    bot.client,
-                    uin = bot.id,
-                    groupCode = id,
-                    md5 = image.md5,
-                    size = image.size
-                ).sendAndExpect()
-                return OfflineGroupImage(
-                    imageId = image.imageId,
-                    width = image.width,
-                    height = image.height,
-                    size = if (response is ImgStore.GroupPicUp.Response.FileExists) {
-                        response.fileInfo.fileSize
-                    } else {
-                        image.size
-                    },
-                    imageType = image.imageType
-                ).also { img ->
-                    when (response) {
-                        is ImgStore.GroupPicUp.Response.FileExists -> {
-                            img.fileId = response.fileId.toInt()
-                        }
-                        is ImgStore.GroupPicUp.Response.RequireUpload -> {
-                            img.fileId = response.fileId.toInt()
-                        }
-                        is ImgStore.GroupPicUp.Response.Failed -> {
-                            img.fileId = 0
-                        }
-                    }
-                }
-            }
-        }
-    }
 }

+ 2 - 1
mirai-core/src/commonMain/kotlin/contact/util.kt

@@ -33,7 +33,8 @@ internal fun Contact.logMessageSent(message: Message) {
 
 internal fun MessageChain.countImages(): Int = this.count { it is Image }
 
-internal fun MessageChain.verityLength(
+@Throws(MessageTooLargeException::class)
+internal fun MessageChain.checkLength(
     originalMessage: Message, target: Contact,
 ): Int {
     val chain = this

+ 475 - 0
mirai-core/src/commonMain/kotlin/network/message/OutgoingMessagePhasesCommon.kt

@@ -0,0 +1,475 @@
+/*
+ * Copyright 2019-2021 Mamoe Technologies and contributors.
+ *
+ * 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
+ * Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
+ *
+ * https://github.com/mamoe/mirai/blob/dev/LICENSE
+ */
+
+package net.mamoe.mirai.internal.network.message
+
+import kotlinx.coroutines.CompletableDeferred
+import kotlinx.coroutines.async
+import kotlinx.coroutines.cancel
+import net.mamoe.mirai.contact.BotIsBeingMutedException
+import net.mamoe.mirai.contact.Group
+import net.mamoe.mirai.contact.MessageTooLargeException
+import net.mamoe.mirai.event.broadcast
+import net.mamoe.mirai.event.events.GroupMessagePostSendEvent
+import net.mamoe.mirai.event.events.GroupMessagePreSendEvent
+import net.mamoe.mirai.event.events.MessagePostSendEvent
+import net.mamoe.mirai.event.events.MessagePreSendEvent
+import net.mamoe.mirai.internal.contact.*
+import net.mamoe.mirai.internal.getMiraiImpl
+import net.mamoe.mirai.internal.message.*
+import net.mamoe.mirai.internal.network.message.MessagePipelineContext.Companion.KEY_CAN_SEND_AS_FRAGMENTED
+import net.mamoe.mirai.internal.network.message.MessagePipelineContext.Companion.KEY_CAN_SEND_AS_LONG
+import net.mamoe.mirai.internal.network.message.MessagePipelineContext.Companion.KEY_CAN_SEND_AS_SIMPLE
+import net.mamoe.mirai.internal.network.message.MessagePipelineContext.Companion.KEY_FINAL_MESSAGE_CHAIN
+import net.mamoe.mirai.internal.network.message.MessagePipelineContext.Companion.KEY_MESSAGE_SOURCE_RESULT
+import net.mamoe.mirai.internal.network.message.MessagePipelineContext.Companion.KEY_ORIGINAL_MESSAGE
+import net.mamoe.mirai.internal.network.pipeline.*
+import net.mamoe.mirai.internal.network.pipeline.PipelineContext.Companion.KEY_EXECUTION_RESULT
+import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacket
+import net.mamoe.mirai.internal.network.protocol.packet.chat.FileManagement
+import net.mamoe.mirai.internal.network.protocol.packet.chat.MusicSharePacket
+import net.mamoe.mirai.internal.network.protocol.packet.chat.SendMessageResponse
+import net.mamoe.mirai.internal.network.protocol.packet.chat.image.ImgStore
+import net.mamoe.mirai.internal.network.protocol.packet.chat.receive.MessageSvcPbSendMsg
+import net.mamoe.mirai.internal.network.protocol.packet.sendAndExpect
+import net.mamoe.mirai.message.MessageReceipt
+import net.mamoe.mirai.message.data.*
+import net.mamoe.mirai.utils.Either
+import net.mamoe.mirai.utils.Either.Companion.fold
+import net.mamoe.mirai.utils.cast
+
+
+internal typealias MessagePipelineConfigurationBuilder<C> = PipelineConfigurationBuilder<MessagePipelineContext<C>, MessageChain, MessageReceipt<C>>
+
+@Suppress("unused") // provides type C
+internal inline fun <C : AbstractContact> OutgoingMessagePhases<C>.buildPhaseConfiguration(
+    block: MessagePipelineConfigurationBuilder<C>.() -> Node.Finish<MessageReceipt<C>>,
+): PipelineConfiguration<MessagePipelineContext<C>, MessageChain, MessageReceipt<C>> =
+    MessagePipelineConfigurationBuilder<C>().apply { block() }.build()
+
+internal fun main() {
+    OutgoingMessagePhasesGroup.run {
+        buildPhaseConfiguration {
+            Begin then
+                    Preconditions then
+                    MessageToMessageChain then
+                    OutgoingMessagePhasesCommon.BroadcastPreSendEvent(::GroupMessagePreSendEvent) then
+                    CheckLength then
+                    EnsureSequenceIdAvailable then
+                    UploadForwardMessages then
+                    FixGroupImages then
+
+                    Savepoint(1) then
+
+                    ConvertToLongMessage onFailureJumpTo 1 then
+                    StartCreatePackets then
+                    OutgoingMessagePhasesCommon.CreatePacketsForMusicShare(specialMessageSourceStrategy) then
+                    OutgoingMessagePhasesCommon.CreatePacketsForFileMessage(specialMessageSourceStrategy) then
+                    // TODO: 2021/8/15 fallback here
+                    LogMessageSent() then
+                    SendPacketsAndCreateReceipt() onFailureJumpTo 1 then
+
+                    Finish finally
+
+                    OutgoingMessagePhasesCommon.BroadcastPostSendEvent(::GroupMessagePostSendEvent) finally
+                    CloseContext() finally
+                    ThrowExceptions()
+        }
+    }
+}
+
+@DslMarker
+internal annotation class PhaseMarker
+
+@DslMarker
+internal annotation class FinallyMarker
+
+internal interface OutgoingMessagePhases<in C : AbstractContact>
+
+@Suppress("PropertyName", "FunctionName")
+internal abstract class OutgoingMessagePhasesCommon {
+    companion object : OutgoingMessagePhasesCommon()
+
+    @PhaseMarker
+    val Begin = object : Phase<MessagePipelineContext<AbstractContact>, Message, Message>("Begin") {
+        override suspend fun MessagePipelineContextRaw.doPhase(input: Message): Message {
+            return input
+        }
+    }
+
+    @PhaseMarker
+    val Preconditions = object : Phase<MessagePipelineContext<AbstractContact>, Message, Message>("Preconditions") {
+        override suspend fun MessagePipelineContextRaw.doPhase(input: Message): Message {
+            require(!input.isContentEmpty()) { "message is empty" }
+
+            return input
+        }
+    }
+
+    @PhaseMarker
+    val MessageToMessageChain =
+        object : Phase<MessagePipelineContext<AbstractContact>, Message, MessageChain>("MessageToMessageChain") {
+            override suspend fun MessagePipelineContextRaw.doPhase(input: Message): MessageChain {
+                return input.toMessageChain()
+            }
+        }
+
+    @PhaseMarker
+    val CheckLength =
+        object : Phase<MessagePipelineContext<AbstractContact>, MessageChain, MessageChain>("CheckLength") {
+            override suspend fun MessagePipelineContextRaw.doPhase(input: MessageChain): MessageChain {
+                if (input.contains(IgnoreLengthCheck)) return input
+
+                input.takeSingleContent<ForwardMessage>()?.let { forward ->
+                    checkForwardLength(forward)
+                }
+
+                input.checkLength(attributes[KEY_ORIGINAL_MESSAGE], contact)
+
+                return input
+            }
+
+            fun MessagePipelineContextRaw.checkForwardLength(forward: ForwardMessage) {
+                check(forward.nodeList.size <= 200) {
+                    throw MessageTooLargeException(
+                        contact, forward, forward,
+                        "ForwardMessage allows up to 200 nodes, but found ${forward.nodeList.size}"
+                    )
+                }
+            }
+        }
+
+    @PhaseMarker
+    class BroadcastPreSendEvent<C : AbstractContact> @PhaseMarker constructor(
+        private val constructor: (C, Message) -> MessagePreSendEvent
+    ) : Phase<MessagePipelineContext<C>, MessageChain, MessageChain>("BroadcastPreSendEvent") {
+        override suspend fun MessagePipelineContext<C>.doPhase(input: MessageChain): MessageChain {
+            constructor(contact, input).broadcast()
+            return input
+        }
+    }
+
+    class BroadcastPostSendEvent<C : AbstractContact> @PhaseMarker constructor(
+        private val constructor: (C, MessageChain, Throwable?, MessageReceipt<C>?) -> MessagePostSendEvent<in C>
+    ) : Node.Finally<MessagePipelineContext<C>>("BroadcastPreSendEvent") {
+        override suspend fun MessagePipelineContext<C>.doFinally() {
+            val result = attributes[KEY_EXECUTION_RESULT]
+            if (result.isFailure) return
+            val chain = attributes[KEY_FINAL_MESSAGE_CHAIN]
+            constructor(contact, chain, result.exceptionOrNull(), result.getOrNull()?.cast()).broadcast()
+        }
+    }
+
+    @PhaseMarker
+    fun <T> LogMessageSent() = object : Phase<MessagePipelineContext<AbstractContact>, T, T>("LogMessageSent") {
+        override suspend fun MessagePipelineContext<AbstractContact>.doPhase(input: T): T {
+            contact.logMessageSent(attributes[KEY_ORIGINAL_MESSAGE])
+            return input
+        }
+    }
+
+    @PhaseMarker
+    val UploadForwardMessages =
+        object : Phase<MessagePipelineContext<AbstractContact>, MessageChain, MessageChain>("UploadForwardMessages") {
+            override suspend fun MessagePipelineContextRaw.doPhase(input: MessageChain): MessageChain {
+                return input.replaced<ForwardMessage> { uploadForward(it) }
+            }
+
+            private suspend fun MessagePipelineContextRaw.uploadForward(forward: ForwardMessage): ForwardMessageInternal {
+                val resId = getMiraiImpl().uploadMessageHighway(
+                    contact = contact,
+                    nodes = forward.nodeList,
+                    isLong = false,
+                )
+                return RichMessage.forwardMessage(
+                    resId = resId,
+                    timeSeconds = time.currentTimeSeconds(),
+                    forwardMessage = forward,
+                )
+            }
+        }
+
+    @PhaseMarker
+    val FixGroupImages =
+        object : Phase<MessagePipelineContext<GroupImpl>, MessageChain, MessageChain>("FixGroupImages") {
+            override suspend fun MessagePipelineContext<GroupImpl>.doPhase(input: MessageChain): MessageChain {
+                input.forEach {
+                    if (it is OfflineGroupImage) contact.fixImageFileId(it)
+                }
+                input.replaced<FriendImage> {
+                    contact.updateFriendImageForGroupMessage(it)
+                }
+                return input
+            }
+
+            suspend fun GroupImpl.fixImageFileId(image: OfflineGroupImage) {
+                if (image.fileId == null) {
+                    val response: ImgStore.GroupPicUp.Response = ImgStore.GroupPicUp(
+                        bot.client,
+                        uin = bot.id,
+                        groupCode = this.id,
+                        md5 = image.md5,
+                        size = 1,
+                    ).sendAndExpect(bot)
+
+                    when (response) {
+                        is ImgStore.GroupPicUp.Response.Failed -> {
+                            image.fileId = 0 // Failed
+                        }
+                        is ImgStore.GroupPicUp.Response.FileExists -> {
+                            image.fileId = response.fileId.toInt()
+                        }
+                        is ImgStore.GroupPicUp.Response.RequireUpload -> {
+                            image.fileId = response.fileId.toInt()
+                        }
+                    }
+                }
+            }
+
+            /**
+             * Ensures server holds the cache
+             */
+            suspend fun GroupImpl.updateFriendImageForGroupMessage(image: FriendImage): OfflineGroupImage {
+                val response = ImgStore.GroupPicUp(
+                    bot.client,
+                    uin = bot.id,
+                    groupCode = id,
+                    md5 = image.md5,
+                    size = if (image is OnlineFriendImageImpl) image.delegate.fileLen else 0
+                ).sendAndExpect(bot.network)
+                return OfflineGroupImage(image.imageId).also { img ->
+                    when (response) {
+                        is ImgStore.GroupPicUp.Response.FileExists -> {
+                            img.fileId = response.fileId.toInt()
+                        }
+                        is ImgStore.GroupPicUp.Response.RequireUpload -> {
+                            img.fileId = response.fileId.toInt()
+                        }
+                        is ImgStore.GroupPicUp.Response.Failed -> {
+                            img.fileId = 0
+                        }
+                    }
+                }
+            }
+        }
+
+    @PhaseMarker
+    val EnsureSequenceIdAvailable =
+        object :
+            Phase<MessagePipelineContext<AbstractContact>, MessageChain, MessageChain>("EnsureSequenceIdAvailable") {
+            override suspend fun MessagePipelineContextRaw.doPhase(input: MessageChain): MessageChain {
+                input.findIsInstance<QuoteReply>()?.source?.ensureSequenceIdAvailable()
+                return input
+            }
+        }
+
+    @PhaseMarker
+    val ConvertToLongMessage =
+        object : Phase<MessagePipelineContext<AbstractContact>, MessageChain, MessageChain>("ConvertToLongMessage") {
+            override suspend fun MessagePipelineContextRaw.doPhase(input: MessageChain): MessageChain {
+                if (ForceAsLongMessage in input) {
+                    return convertToLongMessageImpl(input)
+                }
+
+                when {
+                    attributes[KEY_CAN_SEND_AS_SIMPLE] -> { // fastest
+                        attributes[KEY_CAN_SEND_AS_SIMPLE] = false
+                        return input
+                    }
+                    attributes[KEY_CAN_SEND_AS_LONG] && DontAsLongMessage !in input -> {
+                        attributes[KEY_CAN_SEND_AS_LONG] = false
+                        return convertToLongMessageImpl(input)
+                    }
+                    attributes[KEY_CAN_SEND_AS_FRAGMENTED] -> { // slowest
+                        attributes[KEY_CAN_SEND_AS_FRAGMENTED] = false
+                        return input
+                    }
+                    else -> {
+                        error("Failed to send message: all strategies tried out.")
+                    }
+                }
+            }
+
+            suspend fun MessagePipelineContextRaw.convertToLongMessageImpl(chain: MessageChain): MessageChain {
+                val resId = uploadLongMessageHighway(chain)
+                return chain + RichMessage.longMessage(
+                    brief = chain.takeContent(27),
+                    resId = resId,
+                    timeSeconds = time.currentTimeSeconds()
+                ) // LongMessageInternal replaces all contents and preserves metadata
+            }
+
+            suspend fun MessagePipelineContextRaw.uploadLongMessageHighway(
+                chain: MessageChain,
+            ): String {
+                return getMiraiImpl().uploadMessageHighway(
+                    contact,
+                    listOf(
+                        ForwardMessage.Node(
+                            senderId = bot.id,
+                            time = time.currentTimeSeconds().toInt(),
+                            messageChain = chain,
+                            senderName = bot.nick
+                        )
+                    ),
+                    true
+                )
+            }
+        }
+
+    @PhaseMarker
+    val StartCreatePackets =
+        object :
+            Phase<MessagePipelineContext<AbstractContact>, MessageChain, List<OutgoingPacket>?>("CreateMessagePackets") {
+            override suspend fun MessagePipelineContext<AbstractContact>.doPhase(input: MessageChain): List<OutgoingPacket>? {
+                attributes[KEY_FINAL_MESSAGE_CHAIN] = input
+                return null
+            }
+        }
+
+    abstract class CreatePacketsPhase<in C : AbstractContact>(
+        name: String
+    ) : Phase<MessagePipelineContext<C>, List<OutgoingPacket>?, List<OutgoingPacket>?>(name) {
+        override suspend fun MessagePipelineContext<C>.doPhase(input: List<OutgoingPacket>?): List<OutgoingPacket>? {
+            return doPhase(attributes[KEY_FINAL_MESSAGE_CHAIN])
+        }
+
+        protected abstract suspend fun MessagePipelineContext<C>.doPhase(chain: MessageChain): List<OutgoingPacket>?
+    }
+
+    @PhaseMarker
+    class CreatePacketsForMusicShare<in C : AbstractContact> @PhaseMarker constructor(
+        private val specialMessageSourceStrategy: SpecialMessageSourceStrategy<C>
+    ) : CreatePacketsPhase<C>("CreatePacketsForMusicShare") {
+        override suspend fun MessagePipelineContext<C>.doPhase(chain: MessageChain): List<OutgoingPacket>? {
+            val musicShare = chain[MusicShare] ?: return null
+            attributes[KEY_MESSAGE_SOURCE_RESULT] =
+                CompletableDeferred(specialMessageSourceStrategy.constructSourceForSpecialMessage(context, chain, 3116))
+            return listOf(
+                MusicSharePacket(
+                    bot.client, musicShare, contact.id,
+                    targetKind = if (contact is GroupImpl) MessageSourceKind.GROUP else MessageSourceKind.FRIEND // always FRIEND
+                )
+            )
+        }
+    }
+
+    interface SpecialMessageSourceStrategy<in C : AbstractContact> {
+        suspend fun constructSourceForSpecialMessage(
+            context: MessagePipelineContext<C>,
+            finalMessage: MessageChain,
+            fromAppId: Int,
+        ): OnlineMessageSource.Outgoing
+    }
+
+    @PhaseMarker
+    class CreatePacketsForFileMessage<in C : AbstractContact> @PhaseMarker constructor(
+        private val specialMessageSourceStrategy: SpecialMessageSourceStrategy<C>
+    ) : CreatePacketsPhase<C>("CreatePacketsForFileMessage") {
+        override suspend fun MessagePipelineContext<C>.doPhase(chain: MessageChain): List<OutgoingPacket>? {
+            val file = chain[FileMessage] ?: return null
+            file.checkIsImpl()
+            attributes[KEY_MESSAGE_SOURCE_RESULT] =
+                contact.async { specialMessageSourceStrategy.constructSourceForSpecialMessage(context, chain, 2021) }
+            return listOf(FileManagement.Feed(bot.client, contact.id, file.busId, file.id))
+        }
+    }
+
+    @PhaseMarker
+    abstract class CreatePacketsFallback<C : AbstractContact> : CreatePacketsPhase<C>("CreatePacketsFallback")
+
+    @PhaseMarker
+    fun <C : AbstractContact> SendPacketsAndCreateReceipt() =
+        object :
+            Phase<MessagePipelineContext<C>, List<OutgoingPacket>?, MessageReceipt<C>>("EnsureSequenceIdAvailable") {
+            override suspend fun MessagePipelineContext<C>.doPhase(input: List<OutgoingPacket>?): MessageReceipt<C> {
+                checkNotNull(input) { "Internal error: packets are null." }
+                val finalMessage = attributes[KEY_FINAL_MESSAGE_CHAIN]
+
+                for (packet in input) {
+                    val resp = packet.sendAndExpect<SendMessageResponse>(bot)
+                    when (resp) {
+                        is MusicSharePacket.Response -> {
+                            resp.pkg.checkSuccess("send music share")
+                        }
+                        is MessageSvcPbSendMsg.Response.MessageTooLarge -> {
+                            throw MessageTooLargeException(
+                                contact, attributes[KEY_ORIGINAL_MESSAGE], finalMessage,
+                                "Message '${finalMessage.content.take(10)}' is too large."
+                            )
+                        }
+                        is MessageSvcPbSendMsg.Response.Failed -> {
+                            val contact = contact
+                            when (resp.errorCode) {
+                                120 -> if (contact is Group) throw BotIsBeingMutedException(contact)
+                            }
+                            error("Send message failed: $resp")
+                        }
+                        is MessageSvcPbSendMsg.Response.SUCCESS -> {
+                        }
+                    }
+                }
+
+                val sourceAwait = attributes[KEY_MESSAGE_SOURCE_RESULT].await()
+
+                try {
+                    sourceAwait.ensureSequenceIdAvailable()
+                } catch (e: Exception) {
+                    logger.warning(
+                        "Timeout awaiting sequenceId for message(${finalMessage.content.take(10)}). Some features may not work properly",
+                        e
+                    )
+                }
+
+                return sourceAwait.createMessageReceipt(contact, true)
+            }
+
+        }
+
+    @PhaseMarker
+    fun <Ctx : MessagePipelineContext<AbstractContact>> CloseContext() = object : Node.Finally<Ctx>("CloseContext") {
+        override suspend fun Ctx.doFinally() {
+            cancel() // coroutine scope
+        }
+    }
+
+    @PhaseMarker
+    fun <Ctx : MessagePipelineContext<AbstractContact>> ThrowExceptions() =
+        object : Node.Finally<Ctx>("ThrowExceptions") {
+            override suspend fun Ctx.doFinally() {
+                attributes[KEY_EXECUTION_RESULT].onFailure {
+                    exceptionCollector.collectThrow(it)
+                }
+            }
+        }
+}
+
+@OverloadResolutionByLambdaReturnType
+internal inline fun <reified T : SingleMessage> MessageChain.replaced(
+    replacer: (message: T) -> Either<SingleMessage, Iterable<SingleMessage>>
+): MessageChain {
+    if (!this.anyIsInstance<T>()) return this
+
+    return buildMessageChain(this.size) {
+        for (singleMessage in this@replaced) {
+            if (singleMessage is T) {
+                replacer(singleMessage).fold(
+                    onLeft = { add(it) },
+                    onRight = { addAll(it) }
+                )
+            }
+        }
+    }
+}
+
+@OverloadResolutionByLambdaReturnType
+@JvmName("replaced1")
+internal inline fun <reified T : SingleMessage> MessageChain.replaced(
+    replacer: (message: T) -> SingleMessage
+): MessageChain = replaced<T> { Either(replacer(it)) }

+ 43 - 0
mirai-core/src/commonMain/kotlin/network/message/OutgoingMessagePhasesGroup.kt

@@ -0,0 +1,43 @@
+/*
+ * Copyright 2019-2021 Mamoe Technologies and contributors.
+ *
+ * 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
+ * Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
+ *
+ * https://github.com/mamoe/mirai/blob/dev/LICENSE
+ */
+
+package net.mamoe.mirai.internal.network.message
+
+import net.mamoe.mirai.event.nextEventOrNull
+import net.mamoe.mirai.internal.contact.GroupImpl
+import net.mamoe.mirai.internal.message.OnlineMessageSourceToGroupImpl
+import net.mamoe.mirai.internal.network.notice.group.GroupMessageProcessor
+import net.mamoe.mirai.message.data.MessageChain
+import net.mamoe.mirai.message.data.OnlineMessageSource
+import net.mamoe.mirai.utils.currentTimeSeconds
+
+internal object OutgoingMessagePhasesGroup : OutgoingMessagePhasesCommon(), OutgoingMessagePhases<GroupImpl> {
+    val specialMessageSourceStrategy: SpecialMessageSourceStrategy<GroupImpl> =
+        object : SpecialMessageSourceStrategy<GroupImpl> {
+            override suspend fun constructSourceForSpecialMessage(
+                context: MessagePipelineContext<GroupImpl>,
+                finalMessage: MessageChain,
+                fromAppId: Int
+            ): OnlineMessageSource.Outgoing = context.run {
+                val receipt: GroupMessageProcessor.SendGroupMessageReceipt =
+                    nextEventOrNull(3000) { it.fromAppId == fromAppId }
+                        ?: GroupMessageProcessor.SendGroupMessageReceipt.EMPTY
+
+                return OnlineMessageSourceToGroupImpl(
+                    contact,
+                    internalIds = intArrayOf(receipt.messageRandom),
+                    providedSequenceIds = intArrayOf(receipt.sequenceId),
+                    sender = bot,
+                    target = contact,
+                    time = currentTimeSeconds().toInt(),
+                    originalMessage = finalMessage
+                )
+            }
+        }
+}

+ 107 - 0
mirai-core/src/commonMain/kotlin/network/message/OutgoingMessagePipeline.kt

@@ -0,0 +1,107 @@
+/*
+ * Copyright 2019-2021 Mamoe Technologies and contributors.
+ *
+ * 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
+ * Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
+ *
+ * https://github.com/mamoe/mirai/blob/dev/LICENSE
+ */
+
+package net.mamoe.mirai.internal.network.message
+
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.Deferred
+import net.mamoe.mirai.contact.Contact
+import net.mamoe.mirai.internal.contact.AbstractContact
+import net.mamoe.mirai.internal.contact.broadcastMessagePreSendEvent
+import net.mamoe.mirai.internal.network.component.ComponentKey
+import net.mamoe.mirai.internal.network.message.MessagePipelineContext.Companion.KEY_CAN_SEND_AS_FRAGMENTED
+import net.mamoe.mirai.internal.network.message.MessagePipelineContext.Companion.KEY_CAN_SEND_AS_LONG
+import net.mamoe.mirai.internal.network.message.MessagePipelineContext.Companion.KEY_CAN_SEND_AS_SIMPLE
+import net.mamoe.mirai.internal.network.notice.BotAware
+import net.mamoe.mirai.internal.network.pipeline.PipelineConfiguration
+import net.mamoe.mirai.internal.network.pipeline.PipelineContext
+import net.mamoe.mirai.internal.utils.subLogger
+import net.mamoe.mirai.message.MessageReceipt
+import net.mamoe.mirai.message.data.MessageChain
+import net.mamoe.mirai.message.data.OnlineMessageSource
+import net.mamoe.mirai.utils.*
+import kotlin.coroutines.CoroutineContext
+import kotlin.reflect.KClass
+
+/**
+ * Steps:
+ *
+ * 1. Preconditions - Check length
+ * 2. PreSendEvent - [Contact.broadcastMessagePreSendEvent]
+ * 3. Logging - Log sent
+ * 4. Pre transformation
+ * 5. Transform special elements
+ * 6. Fix images
+ * 7. Convert to long message
+ * 8. Ensure sequence id available
+ * 9. Post transformation
+ * 10. Send packet
+ *
+ * @since 2.8-M1
+ */
+internal interface OutgoingMessagePipeline {
+    suspend fun <C : AbstractContact> sendMessage(contact: C, message: MessageChain): MessageReceipt<C>
+
+    companion object : ComponentKey<OutgoingMessagePipeline>
+}
+
+internal class OutgoingMessagePipelineImpl(
+    private val pipelineConfigurations: Map<KClass<out AbstractContact>, MessagePipelineConfiguration<out AbstractContact>>, // must be exhaustive ---- covering all AbstractContact
+) : OutgoingMessagePipeline {
+    override suspend fun <C : AbstractContact> sendMessage(contact: C, message: MessageChain): MessageReceipt<C> {
+        val context = MessagePipelineContextImpl<AbstractContact>(contact)
+        return pipelineConfigurations[contact::class]?.execute(context, message)?.cast()
+            ?: error("Internal error: Could")
+    }
+}
+
+internal typealias MessagePipelineConfiguration<T> = PipelineConfiguration<MessagePipelineContext<T>, MessageChain, MessageReceipt<T>>
+
+internal interface MessagePipelineContext<out C : AbstractContact> : PipelineContext, BotAware, CoroutineScope {
+    val contact: C
+    val time: TimeSource
+
+    override val bot get() = contact.bot
+
+    companion object {
+        @JvmField
+        val KEY_CAN_SEND_AS_SIMPLE = TypeKey<Boolean>("canSendAsSimple")
+
+        @JvmField
+        val KEY_CAN_SEND_AS_LONG = TypeKey<Boolean>("canSendAsLong")
+
+        @JvmField
+        val KEY_CAN_SEND_AS_FRAGMENTED = TypeKey<Boolean>("canSendAsFragmented")
+
+        @JvmField
+        val KEY_ORIGINAL_MESSAGE = TypeKey<MessageChain>("originalMessage")
+
+        @JvmField
+        val KEY_FINAL_MESSAGE_CHAIN = TypeKey<MessageChain>("finalMessageChain")
+
+        @JvmField
+        val KEY_MESSAGE_SOURCE_RESULT = TypeKey<Deferred<OnlineMessageSource.Outgoing>>("messageSourceResult")
+    }
+}
+
+internal typealias MessagePipelineContextRaw = MessagePipelineContext<AbstractContact>
+
+internal class MessagePipelineContextImpl<out C : AbstractContact>(
+    override val contact: C,
+    override val coroutineContext: CoroutineContext = contact.coroutineContext.childScopeContext()
+        .addNameHierarchically("MessagePipelineContext"),
+    override val logger: MiraiLogger = contact.bot.logger.subLogger("MessagePipelineContext"), // TODO: 2021/8/15 use contact's logger
+    override val attributes: MutableTypeSafeMap = buildTypeSafeMap {
+        set(KEY_CAN_SEND_AS_FRAGMENTED, true)
+        set(KEY_CAN_SEND_AS_LONG, true)
+        set(KEY_CAN_SEND_AS_SIMPLE, true)
+    },
+    override val exceptionCollector: ExceptionCollector = ExceptionCollector(),
+    override val time: TimeSource = TimeSource.System
+) : MessagePipelineContext<C>

+ 67 - 0
mirai-core/src/commonMain/kotlin/network/pipeline/Node.kt

@@ -0,0 +1,67 @@
+/*
+ * Copyright 2019-2021 Mamoe Technologies and contributors.
+ *
+ * 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
+ * Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
+ *
+ * https://github.com/mamoe/mirai/blob/dev/LICENSE
+ */
+
+package net.mamoe.mirai.internal.network.pipeline
+
+import net.mamoe.mirai.internal.network.pipeline.Node.*
+import net.mamoe.mirai.utils.recoverCatchingSuppressed
+
+/**
+ * @see Finish
+ * @see Finally
+ * @see SavePoint
+ * @see JumpToSavepointOnFailure
+ * @see Phase
+ */
+internal sealed class Node<in C : PipelineContext, in In, out Out>(
+    val name: String
+) {
+    class Finish<FinalOut>(name: String = "Finish") : Node<PipelineContext, FinalOut, FinalOut>(name)
+    abstract class Finally<C : PipelineContext>(name: String) : Node<C, Any?, Nothing>(name) {
+        abstract suspend fun C.doFinally()
+    }
+
+    internal class SavePoint<C : PipelineContext, T>(val id: Any) : Node<C, T, T>("Savepoint $id")
+
+    internal class JumpToSavepointOnFailure<C : PipelineContext, AIn, AOut>(
+        val delegate: Phase<C, AIn, AOut>,
+        val targetSavepointId: Any,
+    ) : Node<C, AIn, AOut>(delegate.name)
+}
+
+/**
+ * Runnable [Node]
+ */
+internal abstract class Phase<in C : PipelineContext, in In, out Out>(
+    name: String
+) : Node<C, In, Out>(name) {
+    abstract suspend fun C.doPhase(input: In): Out
+}
+
+internal suspend inline fun <C : PipelineContext, In, Out> Phase<C, In, Out>.doPhase(
+    context: C,
+    input: In
+): Out {
+    return context.run { doPhase(input) }
+}
+
+internal class RecoverablePhase<C : PipelineContext, AIn, AOut>(
+    val delegate: Phase<C, AIn, AOut>,
+    val onFailure: Array<Phase<C, AIn, AOut>>,
+) : Phase<C, AIn, AOut>(delegate.name) {
+    override suspend fun C.doPhase(input: AIn): AOut {
+        val context = this
+
+        return onFailure.fold(kotlin.runCatching {
+            delegate.doPhase(context, input)
+        }) { acc, phase ->
+            acc.recoverCatchingSuppressed { phase.doPhase(context, input) }
+        }.getOrThrow()
+    }
+}

+ 32 - 0
mirai-core/src/commonMain/kotlin/network/pipeline/PhaseConfiguration.kt

@@ -0,0 +1,32 @@
+/*
+ * Copyright 2019-2021 Mamoe Technologies and contributors.
+ *
+ * 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
+ * Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
+ *
+ * https://github.com/mamoe/mirai/blob/dev/LICENSE
+ */
+
+package net.mamoe.mirai.internal.network.pipeline
+
+internal class PhaseConfiguration<C : PipelineContext, AIn, AOut>(
+    private val phase: Phase<C, AIn, AOut>
+) {
+    private val recovers: MutableList<Phase<C, AIn, AOut>> = mutableListOf()
+
+    @PhaseConfigurationDsl
+    fun onFailure(phase: Phase<C, AIn, AOut>): Nothing? {
+        recovers.add(phase)
+        return null
+    }
+
+    @PhaseConfigurationDsl
+    fun onFailureJumpTo(id: Any): Node.JumpToSavepointOnFailure<C, AIn, AOut> { // savepoint id
+        return Node.JumpToSavepointOnFailure(phase, id)
+    }
+
+    fun toPhase(): Phase<C, AIn, AOut> = RecoverablePhase(phase, recovers.toTypedArray())
+}
+
+@DslMarker
+internal annotation class PhaseConfigurationDsl

+ 66 - 0
mirai-core/src/commonMain/kotlin/network/pipeline/PipelineConfiguration.kt

@@ -0,0 +1,66 @@
+/*
+ * Copyright 2019-2021 Mamoe Technologies and contributors.
+ *
+ * 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
+ * Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
+ *
+ * https://github.com/mamoe/mirai/blob/dev/LICENSE
+ */
+
+package net.mamoe.mirai.internal.network.pipeline
+
+import net.mamoe.mirai.utils.cast
+import net.mamoe.mirai.utils.forEachWithIndexer
+import net.mamoe.mirai.utils.uncheckedCast
+
+internal class PipelineConfiguration<C : PipelineContext, InitialIn, FinalOut> {
+    private val _nodes: ArrayList<Node<C, *, *>> = ArrayList() // must be ordered
+    val nodes: List<Node<C, *, *>> get() = _nodes
+
+    fun addNode(node: Node<C, *, *>) {
+        _nodes.add(node)
+    }
+
+    suspend fun execute(context: C, initialIn: InitialIn): FinalOut {
+        var value: Any? = initialIn
+
+        /**
+         * Run [Node.Finally]s and throw [e] with [PipelineContext.exceptionCollector].
+         */
+        suspend fun fail(e: Throwable): Nothing {
+            _nodes.forEach { node ->
+                if (node is Node.Finally) {
+                    node.run { context.doFinally() }
+                }
+            }
+            context.exceptionCollector.collectThrow(e)
+        }
+
+        _nodes.forEachWithIndexer { node ->
+            context.attributes[PipelineContext.KEY_EXECUTION_RESULT] = Result.success(value)
+            when (node) {
+                is Phase<*, *, *> -> {
+                    try {
+                        value = node.cast<Phase<C, Any?, Any?>>().doPhase(context, value)
+                    } catch (e: Throwable) {
+                        fail(e)
+                    }
+                }
+                is Node.Finish -> return value.uncheckedCast()
+                is Node.SavePoint -> {
+                    // nothing to do
+                }
+                is Node.Finally -> node.run { context.doFinally() }
+                is Node.JumpToSavepointOnFailure -> {
+                    try {
+                        node.delegate.cast<Phase<C, Any?, Any?>>().doPhase(context, value)
+                    } catch (e: Throwable) {
+                        context.exceptionCollector.collect(e)
+                        setNextIndex(_nodes.indexOfFirst { it is Node.SavePoint && it.id == node.targetSavepointId })
+                    }
+                }
+            }
+        }
+        error("There is no finishing phase.")
+    }
+}

+ 74 - 0
mirai-core/src/commonMain/kotlin/network/pipeline/PipelineConfigurationBuilder.kt

@@ -0,0 +1,74 @@
+/*
+ * Copyright 2019-2021 Mamoe Technologies and contributors.
+ *
+ * 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
+ * Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
+ *
+ * https://github.com/mamoe/mirai/blob/dev/LICENSE
+ */
+
+package net.mamoe.mirai.internal.network.pipeline
+
+import net.mamoe.mirai.internal.network.message.buildPhaseConfiguration
+
+@DslMarker
+internal annotation class CriticalPointMarker
+
+/**
+ * @see buildPhaseConfiguration
+ */
+internal class PipelineConfigurationBuilder<C : PipelineContext, InitialIn, FinalOut>(
+    val configuration: PipelineConfiguration<C, InitialIn, FinalOut> = PipelineConfiguration()
+) {
+
+    @Suppress("PropertyName")
+    @CriticalPointMarker
+    val Finish = Node.Finish<FinalOut>()
+
+
+    @Suppress("FunctionName")
+    @CriticalPointMarker
+    fun <T> Savepoint(id: Any): Node.SavePoint<C, T> {
+        require(configuration.nodes.none { it is Node.SavePoint<*, *> && it.id == id }) {
+            "There is already a savepoint with id '$id'."
+        }
+        Node.SavePoint<C, T>(id).let {
+            configuration.addNode(it)
+            return it
+        }
+    }
+
+    infix fun <AIn, AOut, BOut, Next : Node<C, AOut, BOut>> Node<C, AIn, AOut>.then(next: Next): Next {
+        configuration.addNode(this)
+        return next
+    }
+
+    infix fun <FinalOut> Node.Finish<FinalOut>.finally(finally: Node.Finally<C>): Node.Finish<FinalOut> {
+        configuration.addNode(finally)
+        return this
+    }
+
+    @BuilderInference
+    inline operator fun <AIn, AOut> Phase<C, AIn, AOut>.invoke(@BuilderInference action: PhaseConfiguration<C, AIn, AOut>.() -> Node<C, AIn, AOut>?): Node<C, AIn, AOut> {
+        PhaseConfiguration(this).run {
+            action()?.let { return it }
+            return toPhase()
+        }
+    }
+
+    /**
+     * Fast path for [PhaseConfiguration.onFailureJumpTo]
+     */
+    @PhaseConfigurationDsl
+    infix fun <AIn, AOut> Phase<C, AIn, AOut>.onFailureJumpTo(id: Any): Node.JumpToSavepointOnFailure<C, AIn, AOut> { // savepoint id
+        return Node.JumpToSavepointOnFailure(this, id)
+    }
+
+
+    fun build() = configuration
+}
+
+internal inline fun <C : PipelineContext, InitialIn, FinalOut> buildPhaseConfiguration(
+    block: PipelineConfigurationBuilder<C, InitialIn, FinalOut>.() -> Node.Finish<FinalOut>,
+): PipelineConfiguration<C, InitialIn, FinalOut> =
+    PipelineConfigurationBuilder<C, InitialIn, FinalOut>().apply { block() }.build()

+ 31 - 0
mirai-core/src/commonMain/kotlin/network/pipeline/PipelineContext.kt

@@ -0,0 +1,31 @@
+/*
+ * Copyright 2019-2021 Mamoe Technologies and contributors.
+ *
+ * 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
+ * Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
+ *
+ * https://github.com/mamoe/mirai/blob/dev/LICENSE
+ */
+
+package net.mamoe.mirai.internal.network.pipeline
+
+import net.mamoe.mirai.utils.ExceptionCollector
+import net.mamoe.mirai.utils.MiraiLogger
+import net.mamoe.mirai.utils.MutableTypeSafeMap
+import net.mamoe.mirai.utils.TypeKey
+
+internal interface PipelineContext {
+    val logger: MiraiLogger
+    val attributes: MutableTypeSafeMap
+    val exceptionCollector: ExceptionCollector
+
+    companion object {
+        /**
+         * For final phases
+         */
+        @JvmField
+        val KEY_EXECUTION_RESULT = TypeKey<Result<Any?>>("executionResult")
+    }
+}
+
+internal inline val <T : PipelineContext> T.context get() = this

+ 18 - 16
mirai-core/src/commonMain/kotlin/network/protocol/packet/chat/MultiMsg.kt

@@ -13,7 +13,9 @@ package net.mamoe.mirai.internal.network.protocol.packet.chat
 
 import kotlinx.io.core.ByteReadPacket
 import net.mamoe.mirai.internal.QQAndroidBot
-import net.mamoe.mirai.internal.contact.SendMessageHandler
+import net.mamoe.mirai.internal.contact.AbstractContact
+import net.mamoe.mirai.internal.contact.groupCodeOrNull
+import net.mamoe.mirai.internal.contact.userIdOrNull
 import net.mamoe.mirai.internal.message.MessageSourceInternal
 import net.mamoe.mirai.internal.message.contextualBugReportException
 import net.mamoe.mirai.internal.message.toRichTextElems
@@ -49,7 +51,7 @@ internal class MessageValidationData(
 internal fun Collection<ForwardMessage.INode>.calculateValidationData(
     client: QQAndroidClient,
     random: Int,
-    handler: SendMessageHandler<*>,
+    contact: AbstractContact,
     isLong: Boolean,
 ): MessageValidationData {
     val offeredSourceIds = mutableSetOf<Int>()
@@ -66,32 +68,32 @@ internal fun Collection<ForwardMessage.INode>.calculateValidationData(
         return client.atomicNextMessageSequenceId()
     }
 
-    val msgList = map { chain ->
+    val msgList = map { node ->
         MsgComm.Msg(
             msgHead = MsgComm.MsgHead(
-                fromUin = chain.senderId,
-                toUin = if (isLong) {
-                    handler.targetUserUin ?: 0
-                } else 0,
-                msgSeq = calculateMsgSeq(chain),
-                msgTime = chain.time,
+                fromUin = node.senderId,
+                toUin = if (isLong) contact.userIdOrNull ?: 0 else 0,
+                msgSeq = calculateMsgSeq(node),
+                msgTime = node.time,
                 msgUid = 0x01000000000000000L or random.toLongUnsigned(),
                 mutiltransHead = MsgComm.MutilTransHead(
                     status = 0,
                     msgId = 1
                 ),
                 msgType = 82, // troop
-                groupInfo = handler.run { chain.groupInfo },
+                groupInfo = MsgComm.GroupInfo(
+                    groupCode = contact.groupCodeOrNull ?: 0,
+                    groupCard = node.senderName
+                ),
                 isSrcMsg = false
             ),
             msgBody = ImMsgBody.MsgBody(
                 richText = ImMsgBody.RichText(
-                    elems = chain.messageChain.toMessageChain()
-                        .toRichTextElems(
-                            handler.contact,
-                            withGeneralFlags = false,
-                            isForward = true,
-                        ).toMutableList()
+                    elems = node.messageChain.toMessageChain().toRichTextElems(
+                        contact,
+                        withGeneralFlags = false,
+                        isForward = true,
+                    ).toMutableList()
                 )
             )
         )

+ 2 - 2
mirai-core/src/commonMain/kotlin/network/protocol/packet/chat/MusicSharePacket.kt

@@ -30,8 +30,8 @@ internal object MusicSharePacket :
 
     class Response(
         val pkg: OidbSso.OIDBSSOPkg,
-    ) : Packet {
-        val response by lazy {
+    ) : Packet, SendMessageResponse {
+        val response: OidbCmd0xb77.RspBody by lazy {
             pkg.bodybuffer.loadAs(OidbCmd0xb77.RspBody.serializer())
         }
 

+ 14 - 0
mirai-core/src/commonMain/kotlin/network/protocol/packet/chat/SendMessageResponse.kt

@@ -0,0 +1,14 @@
+/*
+ * Copyright 2019-2021 Mamoe Technologies and contributors.
+ *
+ * 此源代码的使用受 GNU AFFERO GENERAL PUBLIC LICENSE version 3 许可证的约束, 可以在以下链接找到该许可证.
+ * Use of this source code is governed by the GNU AGPLv3 license that can be found through the following link.
+ *
+ * https://github.com/mamoe/mirai/blob/dev/LICENSE
+ */
+
+package net.mamoe.mirai.internal.network.protocol.packet.chat
+
+import net.mamoe.mirai.internal.network.Packet
+
+internal interface SendMessageResponse : Packet

+ 2 - 1
mirai-core/src/commonMain/kotlin/network/protocol/packet/chat/receive/MessageSvc.PbSendMsg.kt

@@ -33,6 +33,7 @@ import net.mamoe.mirai.internal.network.protocol.data.proto.MsgSvc
 import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacket
 import net.mamoe.mirai.internal.network.protocol.packet.OutgoingPacketFactory
 import net.mamoe.mirai.internal.network.protocol.packet.buildOutgoingUniPacket
+import net.mamoe.mirai.internal.network.protocol.packet.chat.SendMessageResponse
 import net.mamoe.mirai.internal.utils.io.serialization.readProtoBuf
 import net.mamoe.mirai.internal.utils.io.serialization.writeProtoBuf
 import net.mamoe.mirai.message.data.*
@@ -44,7 +45,7 @@ import kotlin.math.absoluteValue
 import kotlin.random.Random
 
 internal object MessageSvcPbSendMsg : OutgoingPacketFactory<MessageSvcPbSendMsg.Response>("MessageSvc.PbSendMsg") {
-    sealed class Response : Packet {
+    sealed class Response : Packet, SendMessageResponse {
         object SUCCESS : Response() {
             override fun toString(): String = "MessageSvcPbSendMsg.Response.SUCCESS"
         }

+ 2 - 4
mirai-core/src/jvmTest/kotlin/message/data/MessageReceiptTest.kt

@@ -11,7 +11,6 @@ package net.mamoe.mirai.internal.message.data
 
 import kotlinx.coroutines.CompletableDeferred
 import kotlinx.coroutines.Deferred
-import net.mamoe.mirai.Bot
 import net.mamoe.mirai.internal.AbstractTestWithMiraiImpl
 import net.mamoe.mirai.internal.MockBot
 import net.mamoe.mirai.internal.contact.*
@@ -31,9 +30,8 @@ import kotlin.test.assertSame
 
 internal class MessageReceiptTest : AbstractTestWithMiraiImpl() {
     override suspend fun uploadMessageHighway(
-        bot: Bot,
-        sendMessageHandler: SendMessageHandler<*>,
-        message: Collection<ForwardMessage.INode>,
+        contact: AbstractContact,
+        nodes: Collection<ForwardMessage.INode>,
         isLong: Boolean,
     ): String {
         return "id"