example
GitHub Link
SchedulerAndDispatcher
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.reactor.asCoroutineDispatcher
import reactor.core.scheduler.Scheduler
import reactor.core.scheduler.Schedulers
/**
 * Holds the Reactor scheduler used for I/O-style work together with a
 * coroutine dispatcher backed by that same scheduler, so Reactor pipelines and
 * coroutines can share one thread pool.
 */
object SchedulerAndDispatcher {
    // Arguments passed to Schedulers.newBoundedElastic(threadCap, queuedTaskCap, name).
    private const val THREAD_CAP = 10
    private const val QUEUED_TASK_CAP = Int.MAX_VALUE
    private const val THREAD_NAME = "reactor-io"

    /** Bounded-elastic Reactor scheduler created with the caps and name above. */
    val IO_SCHEDULER: Scheduler =
        Schedulers.newBoundedElastic(THREAD_CAP, QUEUED_TASK_CAP, THREAD_NAME)

    /** [IO_SCHEDULER] exposed as a [CoroutineDispatcher]. */
    val IO_DISPATCHER: CoroutineDispatcher = IO_SCHEDULER.asCoroutineDispatcher()
}
TestRepository
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.awaitFirstOrNull
import org.springframework.stereotype.Repository
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
/**
 * Coroutine-facing facade over [TestReactiveRepository] and [TestJpaRepository].
 *
 * Reactive results are bridged to coroutines (`Mono` -> suspend call,
 * `Flux` -> [Flow]); the JPA-named repository is called directly.
 */
@Repository
class TestRepository(
    private val testReactiveRepository: TestReactiveRepository,
    private val testJpaRepository: TestJpaRepository
) {
    /** Awaits the first value of the reactive `findById()` result, or `null` if it completes empty. */
    suspend fun findById(): String? = testReactiveRepository.findById().awaitFirstOrNull()

    /**
     * Exposes the reactive `findAll()` result as a [Flow].
     *
     * Not a `suspend` function: it only wraps the publisher and returns; the
     * actual work happens when the returned flow is collected.
     */
    fun findAll(): Flow<String> = testReactiveRepository.findAll().asFlow()

    /**
     * Returns the value of [TestJpaRepository.findByIdOrNull].
     *
     * NOTE(review): the call runs on the caller's context with no dispatcher
     * switch. If the real repository blocks, callers presumably need to invoke
     * this on an I/O dispatcher — confirm.
     */
    suspend fun findByIdWithJpa(): String? = testJpaRepository.findByIdOrNull()

    /**
     * Returns the list produced by [TestJpaRepository.findAll].
     *
     * NOTE(review): same caveat as [findByIdWithJpa] regarding blocking calls.
     */
    suspend fun findAllWithJpa(): List<String> = testJpaRepository.findAll()
}
/**
 * Reactive repository stub: every call returns a publisher over fixed,
 * hard-coded strings.
 */
@Repository
class TestReactiveRepository {
    /** Returns a [Flux] that emits four fixed text fragments in order. */
    fun findAll(): Flux<String> {
        return Flux.just("this ", " is ", " coroutine", " flow")
    }

    /** Returns a [Mono] that emits a single fixed greeting. */
    fun findById(): Mono<String> {
        return Mono.just("hello world!!!")
    }
}
/**
 * Synchronous repository stub: every call returns fixed, hard-coded values
 * directly (no persistence API is used in this class).
 */
@Repository
class TestJpaRepository {
    /** Returns four fixed text fragments as a list. */
    fun findAll(): List<String> = listOf("this ", " is ", " coroutine ", " flow ")

    /**
     * Returns a fixed greeting. The return type is nullable, but this stub
     * always returns a value.
     */
    // Greeting spelled to match the one returned by TestReactiveRepository.findById().
    fun findByIdOrNull(): String? = "hello world!!!"
}
gRPC service
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.reactor.mono
import org.lognet.springboot.grpc.GRpcService
import reactor.core.publisher.Mono
/**
 * Reactive gRPC endpoint that answers a `Hello.Name` request with a greeting
 * built from the request value and two repository lookups.
 */
@GRpcService
class HelloGrpcService(private val testRepository: TestRepository) :
    ReactorHelloServiceGrpc.HelloServiceImplBase() {

    /**
     * Builds the response message for [request].
     *
     * The two repository lookups are started concurrently and then awaited;
     * the message is `"hello <name>"`, the collected `findAll()` list and the
     * `findById()` value, each on its own line.
     */
    override fun getHello(request: Mono<Hello.Name>): Mono<Hello.Response> = request
        .flatMap { requestName ->
            mono(SchedulerAndDispatcher.IO_DISPATCHER) {
                // `async` is called on the scope provided by `mono`, so both tasks
                // inherit IO_DISPATCHER and are children of this coroutine: they are
                // cancelled when the subscription is cancelled, and a failure in one
                // cancels the other instead of leaving it running detached.
                val getSuspend = async { testRepository.findById() }
                val getFlowAsList = async { testRepository.findAll().toList() }

                "hello ${requestName.value}\n${getFlowAsList.await()}\n${getSuspend.await()}"
            }
        }
        .map { result ->
            Hello.Response.newBuilder()
                .setMessage(result)
                .build()
        }
}