Comparison with Reactive
comparison with reactive libraries
| Feature |
RxJava |
Reactor |
Coroutine |
| 여러번 발행 가능한 스트림 |
Flowable |
Flux |
Flow |
| 한번만 발행 가능한 단일 value 스트림 |
Maybe |
Mono |
suspend fun |
| 시작 스케줄러 지정 |
.subscribeOn(Scheduler) |
.subscribeOn(Scheduler) |
CoroutineScope(Dispatcher) |
| 중간 스케줄러 지정 |
.observeOn(Scheduler) |
.publishOn(Scheduler) |
Flow: flowOn(Dispatcher) </br> suspend fun: CoroutineScope(Dispatcher) |
| block |
.blockingGet() |
.block() |
runBlocking { / coroutine codes / } |
| 변환 |
.map {} |
.map {} |
Flow: .map {} </br> suspend fun: just using imperative codes |
| 필터 |
.filter {} |
.filter {} |
Flow : .filter {} </br> suspend fun: just using imperative codes |
| List로 변환 |
Flowable.toList() |
Flux.collectList() |
Flow.toList() |
reactor flatMap vs coroutine suspend
Mono
Mono.just(1000)
.subscribeOn(Schedulers.boundedElastic())
.flatMap { money -> monoBuy(money) }
.flatMap { result -> monoRefund(result) }
.subscribe({
log.info("success $it")
}, {
log.info("failed $it")
})
suspend
val money = 1000
try {
val receipt = suspendBuy(money)
val result = suspendRefund(receipt)
log.info("success $result")
} catch (e: Exception) {
log.info("failed $e")
}
reactor subscribeOn vs coroutine withContext
reactor subscribeOn
Mono.fromCallable { "inside reactor scheduler" }
.subscribeOn(Schedulers.boundedElastic())
coroutine withContext
withContext(Dispatchers.IO) {
"inside coroutine dispatcher"
}
reactor subscribe vs coroutine launch
reactor subscribe
Mono.fromCallable { "inside mono" }
.subscribe { message -> log.info("$message") }
coroutine launch
CoroutineScope(Dispatchers.IO).launch {
log.info("inside coroutine")
}
reactor zip vs coroutine async
reactor zip
val cat = Mono.just("cat")
.subscribeOn(Schedulers.parallel())
val dog = Mono.just("dog")
.subscribeOn(Schedulers.parallel())
val hamster = Mono.just("hamster")
.subscribeOn(Schedulers.parallel())
val result = Mono.zip(job1, job2, job3).block()
println(result.t1)
println(result.t2)
println(result.t3)
coroutine async
val cat = CoroutineScope(Dispatchers.IO).async { "cat" }
val dog = CoroutineScope(Dispatchers.IO).async { "dog" }
val hamster = CoroutineScope(Dispatchers.IO).async { "hamster" }
println(cat.await())
println(dog.await())
println(hamster.await())
reactor parallel flux vs coroutine parallel flow
reactor parallel flux
Flux.range(1,10)
.parallel(10)
.runOn(Schedulers.newParallel("parallel", 10))
.map { num ->
sleep(1000)
"$num received"
}.subscribe { log.info("$it") }
coroutine parallel flow
val dispatcher = Executors.newFixedThreadPool(10).asCoroutineDispatcher()
CoroutineScope(dispatcher).launch {
(1..10).asFlow()
.map { num ->
this.async {
sleep(1000)
"$num received"
}
}
.buffer(10)
.map { deferred -> deferred.await() }
.collect {
log.info("$it")
}
}
handling error
reactor
fun `reactor error handling test`() {
Mono.just("").map {
throw IllegalStateException("reactor mono error!!")
}.map {
"unreached"
}.onErrorResume {
Mono.just("$it")
}.block().also { println("result: $it") }
}
coroutine
fun `coroutine error handling test`() {
val result = runBlocking {
try {
throwError()
} catch (e: Exception) {
"error $e"
}
}
println("result :$result")
}
private suspend fun throwError() {
delay(10)
throw IllegalStateException()
}