Project author: rouzwawi

Project description:
gRPC with Kotlin Coroutines
Language: Kotlin
Repository: git://github.com/rouzwawi/grpc-kotlin.git
Created: 2018-03-08T16:21:13Z
Project community: https://github.com/rouzwawi/grpc-kotlin

License: Apache License 2.0


gRPC Kotlin - Coroutine based gRPC for Kotlin


gRPC Kotlin is a protoc plugin for generating native Kotlin bindings using coroutine primitives for gRPC services.


Why?

The asynchronous nature of bidirectional streaming rpc calls in gRPC makes them a bit hard to implement
and read. Getting your head around the StreamObserver<T> instances can be a bit tricky at times. Especially
with the method argument being the response observer and the return value being the request observer, it all
feels a bit backwards compared to what a plain old synchronous version of the handler would look like.
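
To make the "backwards" shape concrete, here is a minimal sketch of an observer-style bidirectional handler. The StreamObserver interface below is a local stand-in that mirrors the three callbacks of io.grpc.stub.StreamObserver so the snippet runs without gRPC on the classpath. CollectingObserver and runObserverDemo are hypothetical helpers, and plain strings stand in for the generated message types.

```kotlin
// Local stand-in mirroring the callbacks of io.grpc.stub.StreamObserver (assumption for this sketch).
interface StreamObserver<T> {
    fun onNext(value: T)
    fun onError(t: Throwable)
    fun onCompleted()
}

// Observer-style bidirectional handler: the argument carries the responses,
// and the return value receives the requests.
fun greetBidirectional(responseObserver: StreamObserver<String>): StreamObserver<String> =
    object : StreamObserver<String> {
        private var count = 0 // per-call state has to live inside the observer

        override fun onNext(value: String) {
            responseObserver.onNext("Yo #${count++} $value")
        }

        override fun onError(t: Throwable) {
            responseObserver.onError(t)
        }

        override fun onCompleted() {
            responseObserver.onCompleted()
        }
    }

// Hypothetical helper that records replies so the flow can be inspected.
class CollectingObserver : StreamObserver<String> {
    val replies = mutableListOf<String>()
    var completed = false

    override fun onNext(value: String) { replies.add(value) }
    override fun onError(t: Throwable) { throw t }
    override fun onCompleted() { completed = true }
}

// Plays the role of the gRPC runtime: it pushes requests into the returned observer.
fun runObserverDemo(): List<String> {
    val responses = CollectingObserver()
    val requests = greetBidirectional(responses)
    requests.onNext("Eve")
    requests.onNext("Fred")
    requests.onCompleted()
    return responses.replies
}
```

Calling runObserverDemo() returns [Yo #0 Eve, Yo #1 Fred]. Even in this small case the handler logic is split across callbacks and the counter has to be kept as observer state.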

In situations where you’d want to coordinate several request and response messages in one call, you’ll end up
having to manage some tricky state and synchronization between the observers. There are reactive bindings
for gRPC which make this easier. But I think we can do better!

Enter Kotlin Coroutines! By generating native Kotlin stubs that allow us to use suspend functions and
Channel, we can write our handler and client code in an idiomatic and easy to read Kotlin style.

Quick start

Note: This has been tested with gRPC 1.25.0, protobuf 3.10.0, kotlin 1.3.61 and coroutines 1.3.3.

Add a gRPC service definition to your project

greeter.proto

    syntax = "proto3";

    package org.example.greeter;

    option java_package = "org.example.greeter";
    option java_multiple_files = true;

    message GreetRequest {
        string greeting = 1;
    }

    message GreetReply {
        string reply = 1;
    }

    service Greeter {
        rpc Greet (GreetRequest) returns (GreetReply);
        rpc GreetServerStream (GreetRequest) returns (stream GreetReply);
        rpc GreetClientStream (stream GreetRequest) returns (GreetReply);
        rpc GreetBidirectional (stream GreetRequest) returns (stream GreetReply);
    }

Run the protoc plugin to get the generated code; see the Maven configuration and Gradle configuration sections.

Server

After compilation, you’ll find the generated Kotlin code in the same package as the generated Java
code: a service base class named GreeterImplBase and a file with extension functions for the
client stub named GreeterStubExt.kt. Both the service base class and the client stub extensions
use suspend and Channel<T> instead of the typical StreamObserver<T> interfaces.

All functions have the suspend modifier so they can call into any suspending code, including the
core coroutine primitives like delay and async.

All the server streaming calls return a ReceiveChannel<TReply> and can easily be implemented using
produce<TReply>.

All client streaming calls receive an argument of ReceiveChannel<TRequest> where they can receive()
messages from the caller.
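
Independent of gRPC, the two channel shapes described above can be tried out with kotlinx-coroutines alone. The following is a minimal sketch under that assumption: plain strings stand in for the generated GreetRequest and GreetReply messages, and greetings, summarize and streamingDemo are hypothetical names used only for illustration.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.runBlocking

// Shaped like a server streaming handler: one request in, a channel of replies out.
// Note: produce is an experimental API in coroutines 1.3.x, so the compiler may ask for an opt-in.
fun CoroutineScope.greetings(name: String): ReceiveChannel<String> = produce {
    send("Hello $name!")
    send("Greetings $name!")
    // the channel is closed automatically when this block returns
}

// Shaped like a client streaming handler: drain the request channel, return one reply.
suspend fun summarize(requests: ReceiveChannel<String>): String {
    val seen = mutableListOf<String>()
    for (request in requests) {
        seen.add(request)
    }
    return "Hi to all of $seen!"
}

// Wires the producer into the consumer, the way a caller and a handler would be connected.
fun streamingDemo(): String = runBlocking {
    summarize(greetings("Bob"))
}
```

streamingDemo() returns "Hi to all of [Hello Bob!, Greetings Bob!]!". The for loop ends once the producer block has finished and its channel is closed.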

Here’s an example server that demonstrates how each type of endpoint is implemented.

    import kotlinx.coroutines.*
    import kotlinx.coroutines.channels.ReceiveChannel
    import kotlinx.coroutines.channels.produce
    import java.util.concurrent.Executors.newFixedThreadPool

    class GreeterImpl : GreeterImplBase(
        coroutineContext = newFixedThreadPool(4).asCoroutineDispatcher()
    ) {

        // unary rpc
        override suspend fun greet(request: GreetRequest): GreetReply {
            return GreetReply.newBuilder()
                .setReply("Hello " + request.greeting)
                .build()
        }

        // server streaming rpc
        override fun greetServerStream(request: GreetRequest) = produce<GreetReply> {
            send(GreetReply.newBuilder()
                .setReply("Hello ${request.greeting}!")
                .build())
            send(GreetReply.newBuilder()
                .setReply("Greetings ${request.greeting}!")
                .build())
        }

        // client streaming rpc
        override suspend fun greetClientStream(requestChannel: ReceiveChannel<GreetRequest>): GreetReply {
            val greetings = mutableListOf<String>()

            for (request in requestChannel) {
                greetings.add(request.greeting)
            }

            return GreetReply.newBuilder()
                .setReply("Hi to all of $greetings!")
                .build()
        }

        // bidirectional rpc
        override fun greetBidirectional(requestChannel: ReceiveChannel<GreetRequest>) = produce<GreetReply> {
            var count = 0

            for (request in requestChannel) {
                val n = count++
                launch {
                    delay(1000)
                    send(GreetReply.newBuilder()
                        .setReply("Yo #$n ${request.greeting}")
                        .build())
                }
            }
        }
    }

Client

Extension functions for the original Java stubs are generated that use suspend functions, Deferred<TReply>
and SendChannel<TRequest>.

    import io.grpc.ManagedChannelBuilder
    import kotlinx.coroutines.delay
    import kotlinx.coroutines.launch
    import kotlinx.coroutines.runBlocking

    // helper that builds a GreetRequest; not part of the generated code
    fun req(greeting: String): GreetRequest =
        GreetRequest.newBuilder().setGreeting(greeting).build()

    fun main(args: Array<String>) {
        val localhost = ManagedChannelBuilder.forAddress("localhost", 8080)
            .usePlaintext()
            .build()
        val greeter = GreeterGrpc.newStub(localhost)

        runBlocking {
            // === Unary call =============================================================================
            val unaryResponse = greeter.greet(req("Alice"))
            println("unary reply = ${unaryResponse.reply}")

            // === Server streaming call ==================================================================
            val serverResponses = greeter.greetServerStream(req("Bob"))
            for (serverResponse in serverResponses) {
                println("server response = ${serverResponse.reply}")
            }

            // === Client streaming call ==================================================================
            val manyToOneCall = greeter.greetClientStream()
            manyToOneCall.send(req("Caroline"))
            manyToOneCall.send(req("David"))
            manyToOneCall.close()
            val oneReply = manyToOneCall.await()
            println("single reply = ${oneReply.reply}")

            // === Bidirectional call =====================================================================
            val bidiCall = greeter.greetBidirectional()
            launch {
                var n = 0
                for (greetReply in bidiCall) {
                    println("r$n = ${greetReply.reply}")
                    n++
                }
                println("no more replies")
            }

            delay(200)
            bidiCall.send(req("Eve"))

            delay(200)
            bidiCall.send(req("Fred"))

            delay(200)
            bidiCall.send(req("Gina"))

            bidiCall.close()
        }
    }

gRPC Context propagation

gRPC has a thread-local Context which is used to carry scoped values across API boundaries. With Kotlin coroutines
possibly being dispatched on multiple threads, the thread-local nature of Context needs some special care. This is
solved by two details in the generated Kotlin code.

First, all the generated service *ImplBase classes implement CoroutineScope. This allows you to use any of
the top level coroutine primitives such as launch, async and produce in your service implementation while still
keeping them within the context of your service code. The actual CoroutineContext that is used can be set through the
base class constructor, but defaults to Dispatchers.Default.

    abstract class MyServiceImplBase(
        coroutineContext: CoroutineContext = Dispatchers.Default
    )

Second, in the getter for CoroutineScope.coroutineContext, an additional context key is added to the
CoroutineContext that manages the gRPC Context attach() and detach() calls when dispatching coroutine
continuations. This ensures that the gRPC context is always propagated across different coroutine boundaries,
and eliminates the need to manually carry it across in user code.
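
The general mechanism can be sketched with ThreadContextElement from kotlinx-coroutines. This is not the generated code itself, only an illustration of the idea: a plain ThreadLocal stands in for the thread-local storage behind the gRPC Context, and currentUser, UserContextElement and propagationDemo are hypothetical names.

```kotlin
import kotlinx.coroutines.ThreadContextElement
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking
import java.util.concurrent.Executors
import kotlin.coroutines.CoroutineContext

// Stand-in for the thread-local storage behind io.grpc.Context (assumption for this sketch).
val currentUser = ThreadLocal<String?>()

// Installs `user` on whichever thread resumes the coroutine and restores the previous
// value afterwards, analogous to the attach() and detach() calls on a gRPC Context.
class UserContextElement(private val user: String) : ThreadContextElement<String?> {
    companion object Key : CoroutineContext.Key<UserContextElement>

    override val key: CoroutineContext.Key<UserContextElement>
        get() = Key

    override fun updateThreadContext(context: CoroutineContext): String? {
        val previous = currentUser.get()
        currentUser.set(user)
        return previous
    }

    override fun restoreThreadContext(context: CoroutineContext, oldState: String?) {
        currentUser.set(oldState)
    }
}

// Reads the thread-local from two coroutines that may run on different pool threads.
fun propagationDemo(): List<String?> {
    val pool = Executors.newFixedThreadPool(4)
    try {
        return runBlocking(pool.asCoroutineDispatcher() + UserContextElement("alice")) {
            val first = async { currentUser.get() }
            val second = async { currentUser.get() }
            listOf(first.await(), second.await())
        }
    } finally {
        pool.shutdown()
    }
}
```

Both reads in propagationDemo() see "alice" regardless of which pool thread runs them, because the element is part of the CoroutineContext that the child coroutines inherit.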

Here’s a simple example that makes calls to other services concurrently and expects an authenticated user to be present
in the gRPC Context. The two accesses to the context key may execute on different threads in the CoroutineContext but
the accesses work as expected.

    val authenticatedUser = Context.key<User>("authenticatedUser")

    override suspend fun greet(request: GreetRequest): GreetReply {
        val motd = async { messageOfTheDay.getMessage() }
        val weatherReport = async { weather.getWeatherReport(authenticatedUser.get().location) }

        val reply = buildString {
            append("Hello ${authenticatedUser.get().fullName}")
            append("---")
            append("Today's weather report: ${weatherReport.await()}")
            append("---")
            append(motd.await())
        }

        return GreetReply.newBuilder()
            .setReply(reply)
            .build()
    }

For another example of gRPC Context usage, see the code in ContextBasedGreeterTest

Thanks to wfhartford for contributing!

Exception handling

The generated server code follows the standard exception propagation for Kotlin coroutines as described
in the Exception handling documentation. This means that it’s safe to throw exceptions from within
the server implementation code. These will propagate up the coroutine scope and be translated to
responseObserver.onError(Throwable) calls. The preferred way to respond with a status code is to
throw a StatusException.

Note that you should not call close(Throwable) or close() from within the ProducerScope<T>
blocks you get from produce as the producer will automatically be closed when all sub-contexts are
closed (or if an exception is thrown).
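
As a minimal sketch of responding with a status code, the helper below throws a StatusException built from io.grpc.Status. The names requireGreeting and statusCodeOf are hypothetical and exist only for illustration; in a real service the throw would sit inside a handler such as greet.

```kotlin
import io.grpc.Status
import io.grpc.StatusException

// Hypothetical validation helper that could be called from a handler such as greet().
fun requireGreeting(greeting: String): String {
    if (greeting.isBlank()) {
        // Throwing lets the exception propagate up the coroutine scope of the call.
        throw Status.INVALID_ARGUMENT
            .withDescription("greeting must not be blank")
            .asException()
    }
    return greeting
}

// Hypothetical helper that reports which status code a block ended with.
fun statusCodeOf(block: () -> Unit): Status.Code =
    try {
        block()
        Status.Code.OK
    } catch (e: StatusException) {
        e.status.code
    }
```

For example, statusCodeOf { requireGreeting("") } evaluates to Status.Code.INVALID_ARGUMENT, while a non-blank greeting yields Status.Code.OK.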

Maven configuration

Add the grpc-kotlin-gen plugin to your protobuf-maven-plugin configuration (see compile-custom goal)

    <properties>
      <kotlin.version>1.3.61</kotlin.version>
      <kotlinx-coroutines.version>1.3.3</kotlinx-coroutines.version>
      <grpc.version>1.25.0</grpc.version>
      <protobuf.version>3.10.0</protobuf.version>
      <grpc-kotlin.version>0.1.4</grpc-kotlin.version>
    </properties>

    <dependencies>
      <dependency>
        <groupId>org.jetbrains.kotlin</groupId>
        <artifactId>kotlin-stdlib</artifactId>
        <version>${kotlin.version}</version>
      </dependency>
      <dependency>
        <groupId>org.jetbrains.kotlinx</groupId>
        <artifactId>kotlinx-coroutines-core</artifactId>
        <version>${kotlinx-coroutines.version}</version>
      </dependency>
      <dependency>
        <groupId>io.grpc</groupId>
        <artifactId>grpc-netty</artifactId>
        <version>${grpc.version}</version>
      </dependency>
      <dependency>
        <groupId>io.grpc</groupId>
        <artifactId>grpc-protobuf</artifactId>
        <version>${grpc.version}</version>
      </dependency>
      <dependency>
        <groupId>io.grpc</groupId>
        <artifactId>grpc-stub</artifactId>
        <version>${grpc.version}</version>
      </dependency>
    </dependencies>

    <build>
      <extensions>
        <extension>
          <groupId>kr.motd.maven</groupId>
          <artifactId>os-maven-plugin</artifactId>
          <version>1.5.0.Final</version>
        </extension>
      </extensions>
      <plugins>
        <plugin>
          <groupId>org.xolstice.maven.plugins</groupId>
          <artifactId>protobuf-maven-plugin</artifactId>
          <version>0.6.1</version>
          <configuration>
            <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
          </configuration>
          <executions>
            <execution>
              <goals><goal>compile</goal></goals>
            </execution>
            <execution>
              <id>grpc-java</id>
              <goals><goal>compile-custom</goal></goals>
              <configuration>
                <pluginId>grpc-java</pluginId>
                <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
              </configuration>
            </execution>
            <execution>
              <id>grpc-kotlin</id>
              <goals><goal>compile-custom</goal></goals>
              <configuration>
                <pluginId>grpc-kotlin</pluginId>
                <pluginArtifact>io.rouz:grpc-kotlin-gen:${grpc-kotlin.version}:exe:${os.detected.classifier}</pluginArtifact>
              </configuration>
            </execution>
          </executions>
        </plugin>

        <!-- make sure to add the generated source directories to the kotlin-maven-plugin -->
        <plugin>
          <artifactId>kotlin-maven-plugin</artifactId>
          <groupId>org.jetbrains.kotlin</groupId>
          <version>${kotlin.version}</version>
          <executions>
            <execution>
              <id>compile</id>
              <goals><goal>compile</goal></goals>
              <configuration>
                <sourceDirs>
                  <sourceDir>${project.basedir}/src/main/kotlin</sourceDir>
                  <sourceDir>${project.basedir}/target/generated-sources/protobuf/grpc-kotlin</sourceDir>
                  <sourceDir>${project.basedir}/target/generated-sources/protobuf/grpc-java</sourceDir>
                  <sourceDir>${project.basedir}/target/generated-sources/protobuf/java</sourceDir>
                </sourceDirs>
              </configuration>
            </execution>
          </executions>
        </plugin>
      </plugins>
    </build>

Gradle configuration

Add the grpc-kotlin-gen plugin to the plugins section of protobuf-gradle-plugin

    def protobufVersion = '3.10.0'
    def grpcVersion = '1.25.0'
    def grpcKotlinVersion = '0.1.4'

    protobuf {
        protoc {
            // The artifact spec for the Protobuf Compiler
            artifact = "com.google.protobuf:protoc:${protobufVersion}"
        }
        plugins {
            grpc {
                artifact = "io.grpc:protoc-gen-grpc-java:${grpcVersion}"
            }
            grpckotlin {
                artifact = "io.rouz:grpc-kotlin-gen:${grpcKotlinVersion}"
            }
        }
        generateProtoTasks {
            all()*.plugins {
                grpc {}
                grpckotlin {}
            }
        }
    }

Add the kotlin dependencies

    def kotlinVersion = '1.3.61'
    def kotlinCoroutinesVersion = '1.3.3'

    dependencies {
        compile "org.jetbrains.kotlin:kotlin-stdlib:$kotlinVersion"
        compile "org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlinCoroutinesVersion"
    }

Examples

This is a list of example gRPC services and clients written using this project

RPC method type reference

Unary call

rpc Greet (GreetRequest) returns (GreetReply);

Service

A suspendable function which returns a single message.

    override suspend fun greet(request: GreetRequest): GreetReply {
        // return GreetReply message
    }

Client

Suspendable call returning a single message.

    val response: GreetReply = stub.greet( /* GreetRequest */ )

Client streaming call

rpc GreetClientStream (stream GreetRequest) returns (GreetReply);

Service

A suspendable function which returns a single message, and receives messages from a ReceiveChannel<T>.

    override suspend fun greetClientStream(requestChannel: ReceiveChannel<GreetRequest>): GreetReply {
        // receive request messages
        val firstRequest = requestChannel.receive()

        // or iterate all request messages
        for (request in requestChannel) {
            // ...
        }

        // return GreetReply message
    }

Client

Using send() and close() on SendChannel<T>.

    val call: ManyToOneCall<GreetRequest, GreetReply> = stub.greetClientStream()
    call.send( /* GreetRequest */ )
    call.send( /* GreetRequest */ )
    call.close() // don't forget to close the send channel

    val responseMessage = call.await()

Server streaming call

rpc GreetServerStream (GreetRequest) returns (stream GreetReply);

Service

Using produce and send() to send a stream of messages.

    override fun greetServerStream(request: GreetRequest) = produce<GreetReply> {
        send( /* GreetReply message */ )
        send( /* GreetReply message */ )
        // ...
    }

Note that close() or close(Throwable) should not be used, see Exception handling.

In kotlinx-coroutines-core:1.0.0 produce is marked with @ExperimentalCoroutinesApi. In order
to use it, mark your server class with @UseExperimental(ExperimentalCoroutinesApi::class) and
add the -Xuse-experimental=kotlin.Experimental compiler flag.

Client

Using receive() on ReceiveChannel<T> or iterating with a for loop.

    val responses: ReceiveChannel<GreetReply> = stub.greetServerStream( /* GreetRequest */ )

    // await individual responses
    val responseMessage = responses.receive()

    // or iterate all responses
    for (responseMessage in responses) {
        // ...
    }

Full bidirectional streaming call

rpc GreetBidirectional (stream GreetRequest) returns (stream GreetReply);

Service

Using produce and send() to send a stream of messages. Receiving messages from a ReceiveChannel<T>.

    override fun greetBidirectional(requestChannel: ReceiveChannel<GreetRequest>) = produce<GreetReply> {
        // receive request messages
        val firstRequest = requestChannel.receive()
        send( /* GreetReply message */ )

        val more = requestChannel.receive()
        send( /* GreetReply message */ )

        // ...
    }

Note that close() or close(Throwable) should not be used, see Exception handling.

In kotlinx-coroutines-core:1.0.0 produce is marked with @ExperimentalCoroutinesApi. In order
to use it, mark your server class with @UseExperimental(ExperimentalCoroutinesApi::class) and
add the -Xuse-experimental=kotlin.Experimental compiler flag.

Client

Using both a SendChannel<T> and a ReceiveChannel<T> to interact with the call.

    val call: ManyToManyCall<GreetRequest, GreetReply> = stub.greetBidirectional()
    launch {
        for (responseMessage in call) {
            log.info(responseMessage)
        }
        log.info("no more replies")
    }

    call.send( /* GreetRequest */ )
    call.send( /* GreetRequest */ )
    call.close() // don't forget to close the send channel