Scala wrapper for kafka consumer and producer
Scala wrapper for kafka-clients v3.4.0
Kafka provides an official Java client out of the box, which could be used from
Scala code without any additional modifications.
The main disadvantage of using an official client directly is that it implies
a very specific threading model to the application. I.e. the consumer is not
thread safe and also expects a rebalance listener to do the operations in the
same thread.
This makes wrapping a client with Cats Effect
classes a bit more complicated than just calling IO { consumer.poll() }
unless
this is the only call, which is expected to be used.
Skafka does exactly that: a very thin wrapper over official Kafka client to
provide a ready-made Cats Effect API and handle some corner cases concerning
ConsumerRebalanceListener calls.
Comparing to more full-featured libraries such as
FS2 Kafka, it might be a little bit more
reliable, because there is little code/logic to hide the accidenital bugs in.
To summarize:
consumer.poll()
cats.effect.IO
, is a totally fine idea.Makes it easy to use your effect monad with help of cats-effect
Blocking calls are being executed on provided ExecutionContext
.
Simple case class
based configuration
Support of typesafe config
val producer = Producer.of[IO](config, ecBlocking)
val metadata: IO[RecordMetadata] = producer.use { producer =>
val record = ProducerRecord(topic = "topic", key = "key", value = "value")
producer.send(record).flatten
}
val consumer = Consumer.of[IO, String, String](config, ecBlocking)
val records: IO[ConsumerRecords[String, String]] = consumer.use { consumer =>
for {
_ <- consumer.subscribe(Nel("topic"), None)
records <- consumer.poll(100.millis)
} yield records
}
The example below demonstrates creation of Consumer
, but same can be done for Producer
as well.
using
ConsumerMetricsOf.withJavaClientMetrics
(or its alternativemetrics.exposeJavaClientMetrics
)
registers new Prometheus collector under the hood. Please use unique prefixes for each collector
to avoid duplicated metrics in Prometheus (i.e. runtime exception on registration).
Prefix can be set as parameter in:ConsumerMetricsOf.withJavaClientMetrics(prometheus, Some("the_prefix"))
import ConsumerMetricsOf.*
val config: ConsumerConfig = ???
val prometheus: CollectorRegistry = ???
val metrics: ConsumerMetrics[IO] = ???
for {
metrics <- metrics.exposeJavaClientMetrics(prometheus)
consumerOf = ConsumerOf.apply1(metrics1.some)
consumer <- consumerOf(config)
} yield ???
addSbtPlugin("com.evolution" % "sbt-artifactory-plugin" % "0.0.2")
libraryDependencies += "com.evolutiongaming" %% "skafka" % "15.0.0"
While Skafka provides an ability to use ConsumerRebalanceListener
functionality, not all of the method calls are supported.
See the following PRs for more details:
https://github.com/evolution-gaming/skafka/pull/150
https://github.com/evolution-gaming/skafka/pull/122
To our latest knowledge neither FS2 Kafka
supports all of the
methods / functionality.
The release process is based on Git tags and makes use of sbt-dynver to automatically obtain the version from the latest Git tag. The flow is defined in .github/workflows/release.yml
.
A typical release process is as follows:
vX.Y.Z
(example: v4.1.0
). Example: git tag v4.1.0 && git push origin v4.1.0
Releases
page, click Draft a new release
, select Choose a tag
, pick the tag you just createdGenerate release notes
. Release title will be automatically filled with the tag name. Change the description if neededPublish release