|
@@ -30,6 +30,7 @@ import net.mamoe.mirai.internal.network.protocol.packet.chat.receive.OnlinePushP
|
|
import net.mamoe.mirai.internal.network.toPacket
|
|
import net.mamoe.mirai.internal.network.toPacket
|
|
import net.mamoe.mirai.internal.utils.io.ProtocolStruct
|
|
import net.mamoe.mirai.internal.utils.io.ProtocolStruct
|
|
import net.mamoe.mirai.utils.*
|
|
import net.mamoe.mirai.utils.*
|
|
|
|
+import java.io.Closeable
|
|
import java.util.*
|
|
import java.util.*
|
|
import java.util.concurrent.ConcurrentLinkedQueue
|
|
import java.util.concurrent.ConcurrentLinkedQueue
|
|
import kotlin.reflect.KClass
|
|
import kotlin.reflect.KClass
|
|
@@ -40,7 +41,15 @@ internal typealias ProcessResult = Collection<Packet>
|
|
* Centralized processor pipeline for [MessageSvcPbGetMsg] and [OnlinePushPbPushTransMsg]
|
|
* Centralized processor pipeline for [MessageSvcPbGetMsg] and [OnlinePushPbPushTransMsg]
|
|
*/
|
|
*/
|
|
internal interface NoticeProcessorPipeline {
|
|
internal interface NoticeProcessorPipeline {
|
|
- fun registerProcessor(processor: NoticeProcessor)
|
|
|
|
|
|
+ fun interface DisposableRegistry : Closeable {
|
|
|
|
+ fun dispose()
|
|
|
|
+
|
|
|
|
+ override fun close() {
|
|
|
|
+ dispose()
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ fun registerProcessor(processor: NoticeProcessor): DisposableRegistry
|
|
|
|
|
|
/**
|
|
/**
|
|
* Process [data] into [Packet]s. Exceptions are wrapped into [ParseErrorPacket]
|
|
* Process [data] into [Packet]s. Exceptions are wrapped into [ParseErrorPacket]
|
|
@@ -69,7 +78,7 @@ internal value class MutableProcessResult(
|
|
val data: MutableCollection<Packet>
|
|
val data: MutableCollection<Packet>
|
|
)
|
|
)
|
|
|
|
|
|
-internal interface PipelineContext : BotAware {
|
|
|
|
|
|
+internal interface NoticePipelineContext : BotAware {
|
|
override val bot: QQAndroidBot
|
|
override val bot: QQAndroidBot
|
|
|
|
|
|
val attributes: TypeSafeMap
|
|
val attributes: TypeSafeMap
|
|
@@ -128,28 +137,32 @@ internal interface PipelineContext : BotAware {
|
|
val KEY_FROM_SYNC = TypeKey<Boolean>("fromSync")
|
|
val KEY_FROM_SYNC = TypeKey<Boolean>("fromSync")
|
|
val KEY_MSG_INFO = TypeKey<MsgInfo>("msgInfo")
|
|
val KEY_MSG_INFO = TypeKey<MsgInfo>("msgInfo")
|
|
|
|
|
|
- val PipelineContext.fromSync get() = attributes[KEY_FROM_SYNC]
|
|
|
|
|
|
+ val NoticePipelineContext.fromSync get() = attributes[KEY_FROM_SYNC]
|
|
|
|
|
|
/**
|
|
/**
|
|
* 来自 [MsgInfo] 的数据, 即 [MsgType0x210], [MsgType0x2DC] 的处理过程之中可以使用
|
|
* 来自 [MsgInfo] 的数据, 即 [MsgType0x210], [MsgType0x2DC] 的处理过程之中可以使用
|
|
*/
|
|
*/
|
|
- val PipelineContext.msgInfo get() = attributes[KEY_MSG_INFO]
|
|
|
|
|
|
+ val NoticePipelineContext.msgInfo get() = attributes[KEY_MSG_INFO]
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
-internal abstract class AbstractPipelineContext(
|
|
|
|
|
|
+internal abstract class AbstractNoticePipelineContext(
|
|
override val bot: QQAndroidBot, override val attributes: TypeSafeMap,
|
|
override val bot: QQAndroidBot, override val attributes: TypeSafeMap,
|
|
-) : PipelineContext {
|
|
|
|
|
|
+) : NoticePipelineContext {
|
|
private val consumers: Stack<Any> = Stack()
|
|
private val consumers: Stack<Any> = Stack()
|
|
|
|
|
|
override val isConsumed: Boolean get() = consumers.isNotEmpty()
|
|
override val isConsumed: Boolean get() = consumers.isNotEmpty()
|
|
override fun NoticeProcessor.markAsConsumed(marker: Any) {
|
|
override fun NoticeProcessor.markAsConsumed(marker: Any) {
|
|
|
|
+ traceLogging.info { "markAsConsumed: marker=$marker" }
|
|
consumers.push(marker)
|
|
consumers.push(marker)
|
|
}
|
|
}
|
|
|
|
|
|
override fun NoticeProcessor.markNotConsumed(marker: Any) {
|
|
override fun NoticeProcessor.markNotConsumed(marker: Any) {
|
|
if (consumers.peek() === marker) {
|
|
if (consumers.peek() === marker) {
|
|
consumers.pop()
|
|
consumers.pop()
|
|
|
|
+ traceLogging.info { "markNotConsumed: Y, marker=$marker" }
|
|
|
|
+ } else {
|
|
|
|
+ traceLogging.info { "markNotConsumed: N, marker=$marker" }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -157,17 +170,27 @@ internal abstract class AbstractPipelineContext(
|
|
|
|
|
|
override fun collect(packet: Packet) {
|
|
override fun collect(packet: Packet) {
|
|
collected.data.add(packet)
|
|
collected.data.add(packet)
|
|
|
|
+ traceLogging.info { "collect: $packet" }
|
|
}
|
|
}
|
|
|
|
|
|
override fun collect(packets: Iterable<Packet>) {
|
|
override fun collect(packets: Iterable<Packet>) {
|
|
this.collected.data.addAll(packets)
|
|
this.collected.data.addAll(packets)
|
|
|
|
+ traceLogging.info {
|
|
|
|
+ val list = packets.toList()
|
|
|
|
+ "collect: [${list.size}] ${list.joinToString()}"
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
abstract override suspend fun processAlso(data: ProtocolStruct, attributes: TypeSafeMap): ProcessResult
|
|
abstract override suspend fun processAlso(data: ProtocolStruct, attributes: TypeSafeMap): ProcessResult
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
-internal inline val PipelineContext.context get() = this
|
|
|
|
|
|
+internal inline val NoticePipelineContext.context get() = this
|
|
|
|
+
|
|
|
|
+private val traceLogging: MiraiLogger by lazy {
|
|
|
|
+ MiraiLogger.Factory.create(NoticeProcessorPipelineImpl::class, "NoticeProcessorPipeline")
|
|
|
|
+ .withSwitch(systemProp("mirai.network.notice.pipeline.log.full", false))
|
|
|
|
+}
|
|
|
|
|
|
internal open class NoticeProcessorPipelineImpl private constructor() : NoticeProcessorPipeline {
|
|
internal open class NoticeProcessorPipelineImpl private constructor() : NoticeProcessorPipeline {
|
|
/**
|
|
/**
|
|
@@ -175,24 +198,37 @@ internal open class NoticeProcessorPipelineImpl private constructor() : NoticePr
|
|
*/
|
|
*/
|
|
private val processors = ConcurrentLinkedQueue<NoticeProcessor>()
|
|
private val processors = ConcurrentLinkedQueue<NoticeProcessor>()
|
|
|
|
|
|
- override fun registerProcessor(processor: NoticeProcessor) {
|
|
|
|
|
|
+ override fun registerProcessor(processor: NoticeProcessor): NoticeProcessorPipeline.DisposableRegistry {
|
|
processors.add(processor)
|
|
processors.add(processor)
|
|
|
|
+ return NoticeProcessorPipeline.DisposableRegistry {
|
|
|
|
+ processors.remove(processor)
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
inner class ContextImpl(
|
|
inner class ContextImpl(
|
|
bot: QQAndroidBot, attributes: TypeSafeMap,
|
|
bot: QQAndroidBot, attributes: TypeSafeMap,
|
|
- ) : AbstractPipelineContext(bot, attributes) {
|
|
|
|
|
|
+ ) : AbstractNoticePipelineContext(bot, attributes) {
|
|
override suspend fun processAlso(data: ProtocolStruct, attributes: TypeSafeMap): ProcessResult {
|
|
override suspend fun processAlso(data: ProtocolStruct, attributes: TypeSafeMap): ProcessResult {
|
|
- return process(bot, data, this.attributes + attributes)
|
|
|
|
|
|
+ traceLogging.info { "processAlso: data=$data" }
|
|
|
|
+ return process(bot, data, this.attributes + attributes).also {
|
|
|
|
+ this.collected.data += it
|
|
|
|
+ traceLogging.info { "processAlso: result=$it" }
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
override suspend fun process(bot: QQAndroidBot, data: ProtocolStruct, attributes: TypeSafeMap): ProcessResult {
|
|
override suspend fun process(bot: QQAndroidBot, data: ProtocolStruct, attributes: TypeSafeMap): ProcessResult {
|
|
|
|
+ traceLogging.info { "process: data=$data" }
|
|
val context = ContextImpl(bot, attributes)
|
|
val context = ContextImpl(bot, attributes)
|
|
|
|
+
|
|
|
|
+ val diff = if (traceLogging.isEnabled) CollectionDiff<Packet>() else null
|
|
|
|
+ diff?.save(context.collected.data)
|
|
|
|
+
|
|
for (processor in processors) {
|
|
for (processor in processors) {
|
|
- kotlin.runCatching {
|
|
|
|
|
|
+
|
|
|
|
+ val result = kotlin.runCatching {
|
|
processor.process(context, data)
|
|
processor.process(context, data)
|
|
}.onFailure { e ->
|
|
}.onFailure { e ->
|
|
context.collect(
|
|
context.collect(
|
|
@@ -205,6 +241,16 @@ internal open class NoticeProcessorPipelineImpl private constructor() : NoticePr
|
|
),
|
|
),
|
|
)
|
|
)
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ diff?.run {
|
|
|
|
+ val diffPackets = subtractAndSave(context.collected.data)
|
|
|
|
+
|
|
|
|
+ traceLogging.info {
|
|
|
|
+ "Finished ${
|
|
|
|
+ processor.toString().replace("net.mamoe.mirai.internal.network.notice.", "")
|
|
|
|
+ }, success=${result.isSuccess}, consumed=${context.isConsumed}, diff=$diffPackets"
|
|
|
|
+ }
|
|
|
|
+ }
|
|
}
|
|
}
|
|
return context.collected.data
|
|
return context.collected.data
|
|
}
|
|
}
|
|
@@ -231,7 +277,7 @@ internal open class NoticeProcessorPipelineImpl private constructor() : NoticePr
|
|
* A processor handling some specific type of message.
|
|
* A processor handling some specific type of message.
|
|
*/
|
|
*/
|
|
internal interface NoticeProcessor {
|
|
internal interface NoticeProcessor {
|
|
- suspend fun process(context: PipelineContext, data: Any?)
|
|
|
|
|
|
+ suspend fun process(context: NoticePipelineContext, data: Any?)
|
|
}
|
|
}
|
|
|
|
|
|
internal abstract class AnyNoticeProcessor : SimpleNoticeProcessor<ProtocolStruct>(type())
|
|
internal abstract class AnyNoticeProcessor : SimpleNoticeProcessor<ProtocolStruct>(type())
|
|
@@ -240,13 +286,13 @@ internal abstract class SimpleNoticeProcessor<in T : ProtocolStruct>(
|
|
private val type: KClass<T>,
|
|
private val type: KClass<T>,
|
|
) : NoticeProcessor {
|
|
) : NoticeProcessor {
|
|
|
|
|
|
- final override suspend fun process(context: PipelineContext, data: Any?) {
|
|
|
|
|
|
+ final override suspend fun process(context: NoticePipelineContext, data: Any?) {
|
|
if (type.isInstance(data)) {
|
|
if (type.isInstance(data)) {
|
|
context.processImpl(data.uncheckedCast())
|
|
context.processImpl(data.uncheckedCast())
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- protected abstract suspend fun PipelineContext.processImpl(data: T)
|
|
|
|
|
|
+ protected abstract suspend fun NoticePipelineContext.processImpl(data: T)
|
|
|
|
|
|
companion object {
|
|
companion object {
|
|
@JvmStatic
|
|
@JvmStatic
|
|
@@ -255,11 +301,11 @@ internal abstract class SimpleNoticeProcessor<in T : ProtocolStruct>(
|
|
}
|
|
}
|
|
|
|
|
|
internal abstract class MsgCommonMsgProcessor : SimpleNoticeProcessor<MsgComm.Msg>(type()) {
|
|
internal abstract class MsgCommonMsgProcessor : SimpleNoticeProcessor<MsgComm.Msg>(type()) {
|
|
- abstract override suspend fun PipelineContext.processImpl(data: MsgComm.Msg)
|
|
|
|
|
|
+ abstract override suspend fun NoticePipelineContext.processImpl(data: MsgComm.Msg)
|
|
}
|
|
}
|
|
|
|
|
|
internal abstract class MixedNoticeProcessor : AnyNoticeProcessor() {
|
|
internal abstract class MixedNoticeProcessor : AnyNoticeProcessor() {
|
|
- final override suspend fun PipelineContext.processImpl(data: ProtocolStruct) {
|
|
|
|
|
|
+ final override suspend fun NoticePipelineContext.processImpl(data: ProtocolStruct) {
|
|
when (data) {
|
|
when (data) {
|
|
is PbMsgInfo -> processImpl(data)
|
|
is PbMsgInfo -> processImpl(data)
|
|
is MsgOnlinePush.PbPushMsg -> processImpl(data)
|
|
is MsgOnlinePush.PbPushMsg -> processImpl(data)
|
|
@@ -272,13 +318,13 @@ internal abstract class MixedNoticeProcessor : AnyNoticeProcessor() {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- protected open suspend fun PipelineContext.processImpl(data: MsgType0x210) {} // 528
|
|
|
|
- protected open suspend fun PipelineContext.processImpl(data: MsgType0x2DC) {} // 732
|
|
|
|
- protected open suspend fun PipelineContext.processImpl(data: PbMsgInfo) {}
|
|
|
|
- protected open suspend fun PipelineContext.processImpl(data: MsgOnlinePush.PbPushMsg) {}
|
|
|
|
- protected open suspend fun PipelineContext.processImpl(data: MsgComm.Msg) {}
|
|
|
|
- protected open suspend fun PipelineContext.processImpl(data: Structmsg.StructMsg) {}
|
|
|
|
- protected open suspend fun PipelineContext.processImpl(data: RequestPushStatus) {}
|
|
|
|
|
|
+ protected open suspend fun NoticePipelineContext.processImpl(data: MsgType0x210) {} // 528
|
|
|
|
+ protected open suspend fun NoticePipelineContext.processImpl(data: MsgType0x2DC) {} // 732
|
|
|
|
+ protected open suspend fun NoticePipelineContext.processImpl(data: PbMsgInfo) {}
|
|
|
|
+ protected open suspend fun NoticePipelineContext.processImpl(data: MsgOnlinePush.PbPushMsg) {}
|
|
|
|
+ protected open suspend fun NoticePipelineContext.processImpl(data: MsgComm.Msg) {}
|
|
|
|
+ protected open suspend fun NoticePipelineContext.processImpl(data: Structmsg.StructMsg) {}
|
|
|
|
+ protected open suspend fun NoticePipelineContext.processImpl(data: RequestPushStatus) {}
|
|
|
|
|
|
- protected open suspend fun PipelineContext.processImpl(data: DecodedNotifyMsgBody) {}
|
|
|
|
|
|
+ protected open suspend fun NoticePipelineContext.processImpl(data: DecodedNotifyMsgBody) {}
|
|
}
|
|
}
|