项目作者: qntfy

项目描述 :
Frizzle for AWS Kinesis
高级语言: Go
项目地址: git://github.com/qntfy/frinesis.git
创建时间: 2019-01-23T22:53:32Z
项目社区:https://github.com/qntfy/frinesis

开源协议:MIT License

下载


frinesis

Travis Build Status
Coverage Status
MIT licensed
GitHub release
Go Report Card
GoDoc

An AWS Kinesis implementation of a Frizzle Sink.

In addition to the AWS Kinesis SDK for Go, Frinesis uses a modified version of
sendgridlabs/go-kinesis/batchproducer (under separate MIT license).

Frizzle is a magic message (Msg) bus designed for parallel processing w many goroutines.

  • Receive() messages from a configured Source
  • Do your processing, possibly Send() each Msg on to one or more Sink destinations
  • Ack() (or Fail()) the Msg to notify the Source that processing completed

Prereqs / Build instructions

Go mod

As of Go 1.11, frinesis uses go mod for dependency management.

Running the tests

Frinesis has integration tests which require a kinesis endpoint to test against. KINESIS_ENDPOINT environment variable is
used by tests. We test with a localstack instance (docker-compose.yml provided) but other
tools like kinesalite could also work.

  1. $ docker-compose up -d
  2. # takes a few seconds to initialize; can use a tool like wait-for-it.sh in scripting
  3. $ export KINESIS_ENDPOINT=localhost:4568
  4. $ go test -v --cover ./...

Configuration

Frinesis Sinks are configured using Viper.

  1. func InitSink(config *viper.Viper) (*Sink, error)
  2. InitSinkWithLogger(config *viper.Viper, logger *zap.Logger)

We typically initialize Viper through environment variables (but client can do whatever it wants,
just needs to provide the configured Viper object with relevant values). The application might
use a prefix before the below values.

Variable Required Description Default
AWS_REGION_NAME required region being used e.g. us-east-1
KINESIS_ENDPOINT optional if using a custom endpoint e.g. for local testing. Defaults to AWS standard internal and retrieving credentials from IAM if not set. http:// prefixed if no scheme set
KINESIS_FLUSH_TIMEOUT sink (optional) how long to wait for Kinesis Sink to flush remaining messages on close (use duration) 30s

Async Error Handling

Since records are sent in batch fashion, Kinesis may report errors asynchronously.
Errors can be recovered via channel returned by the Sink.Events() method.
In addition to the String() method required by frizzle, currently only errors are
returned by frinesis (no other event types) so all Events recovered will also conform
to error interface.

Contributing

Contributions welcome! Take a look at open issues.