EventProcessor

事件流程处理器。推送一个事件 Event 并得到结果。 EventProcessor 在功能上可以认为是一组 EventListener 的统一处理单位。

协程上下文

当通过 EventProcessor.push 推送一个事件并得到一个事件处理链时, 这其中的每一个事件处理器所处上下文会通过 Flow.flowOn 运行在 EventDispatcherConfiguration.coroutineContext 中。

val app = launchApplication(...) {
eventDispatcher {
coroutineContext = context1 // 配置事件调度器的统一上下文
}
}

app.eventDispatcher.register { context ->
withContext(context2) { // 在事件处理逻辑内切换上下文
// ...
}
EventResult.empty()
}

val flow = app.eventDispatcher.push(event)
// 上游的逻辑会通过 flowOn 运行在 context1 中
.onEach { ... } // 会切换到 context3 上
.flowOn(context3) // 将上游调度器切换至 context3
.onEach { ... } // 会在 collect 所在的默认(当前)环境中
.collect { ... } // 在默认(当前)环境收集结果

参考上述示例,协程上下文的使用“优先级”可“近似地”参考为 context2 context1 context3

Java API

EventProcessors 的静态API中提供了一些非挂起的可供 Java 友好调用的API, 例如:

List<EventResult> resultList = EventProcessors.pushAndCollectToListAsync(processor, event, scope);
// ...
Flux<EventResult> resultList = EventProcessors.pushAndAsFlux(processor, event, scope);
// ...

其中提供了一些异步或响应式相关的转化、处理API。 对于它们各自的说明、限制、要求则在 push 的基础上参考它们的文档说明。

Author

ForteScarlet

Inheritors

Functions

Link copied to clipboard
abstract fun push(event: Event): Flow<EventResult>

推送一个事件, 得到内部所有事件依次将其处理后得到最终的结果流。

Link copied to clipboard
open fun pushAndLaunch(scope: CoroutineScope, event: Event): Job

通过 scope 将事件推送并异步处理,不关心事件的结果。

Inherited functions

Link copied to clipboard

推送事件并将结果转化为 Flux. 需要项目环境中存在 kotlinx-coroutines-reactor 依赖。

Link copied to clipboard

推送事件并将结果转化为 Stream 后返回。

Link copied to clipboard
suspend fun EventProcessor.pushAndCollect(event: Event, collector: FlowCollector<EventResult>? = null)

将事件推送并收集处理。

Link copied to clipboard

推送事件并将结果收集为 C 后返回 CompletableFuture.

推送事件并将结果使用 Collector 收集为 R 后返回 CompletableFuture.

Link copied to clipboard

推送事件并将结果收集为 C 后返回。

推送事件并将结果使用 Collector 收集为 R 后返回。

Link copied to clipboard

推送事件并将结果收集为 List 后返回 CompletableFuture.

Link copied to clipboard

推送事件并将结果收集为 List 后返回。

Link copied to clipboard

将事件推送并异步处理。

Link copied to clipboard
inline fun EventProcessor.pushAndLaunchThen(scope: CoroutineScope, event: Event, crossinline useFlow: (Flow<EventResult>) -> Unit): Job

将事件推送并异步处理。