项目作者: Nirovision

项目描述 :
SQS using FS2
高级语言: Scala
项目地址: git://github.com/Nirovision/fs2-sqs.git
创建时间: 2016-12-01T01:40:17Z
项目社区:https://github.com/Nirovision/fs2-sqs

开源协议:MIT License

下载


fs2-sqs

Build Status
Download

SQS using FS2.

Overview

  • Lightweight wrapper over the raw Java AWS SDK. Doesn’t wrap the types, so you
    gain the full power of the AWS SDK and aren’t forced around a leaky abstraction.
  • Unopinionated primitive building blocks.

Goals

  • Automatic, hands off ability to perform batching of messages to reduce costs
  • Dead lettering of failed messages

Quick examples

Included are explicit types for the sake of clarity

Publishing messages

  1. ...
  2. // Construct an infinite Stream SendMessageRequest's, with the same body "123"
  3. val messageRequestsStream: Stream[Task, SendMessageRequest] =
  4. Stream.constant(new SendMessageRequest(queueUrl, "123")).repeat
  5. // Construct a Publish pipe that can turn SendMessageRequest's into SendMessageResult's
  6. val publishPipe: Pipe[Task, SendMessageRequest, SendMessageResult] = FS2SQS.publishPipe(client)
  7. def loggingSink[A]: Sink[Task, A] = { s =>
  8. s.map { i =>
  9. println(i)
  10. }
  11. }
  12. // Compose our stream and pipe.
  13. val effect = messageRequestsStream
  14. .through(publishPipe)
  15. .to(loggingSink)
  16. .onError(e => Stream.emit(println("Error: " + e.getMessage)))
  17. // Lift our effect into a Task, and run it.
  18. effect.run.unsafeRun()

Consuming messages

  1. ...
  2. // Construct a request to get messages from SQS
  3. val messageRequest = new ReceiveMessageRequest(queueUrl)
  4. .withMaxNumberOfMessages(1)
  5. .withWaitTimeSeconds(10)
  6. // Construct an infinite stream of Messages from SQS
  7. val messagesStream: Stream[Task, Message] = FS2SQS.messageStream(client, messageRequest)
  8. // A sink that can acknowledge Messages using a MessageAction
  9. val ackSink: Sink[Task, (Message, (Message) => MessageAction)] = FS2SQS.ackSink(client)
  10. // A pipe that either deletes or requeues the message
  11. val workPipe: Pipe[Task, Message, (Message, (Message) => MessageAction)] = { messages =>
  12. messages.map { message =>
  13. if (message.getBody == "DOM") {
  14. (message, (m: Message) => Right(new DeleteMessageRequest(queueUrl, m.getReceiptHandle)))
  15. } else {
  16. (message, (m: Message) => Left(new SendMessageRequest(queueUrl, m.getBody)))
  17. }
  18. }
  19. }
  20. // Compose our stream, work pipe and ack sink
  21. val effect: Stream[Task, Unit] = messagesStream
  22. .through(workPipe)
  23. .through(ackSink)
  24. // Lift our effect into a Task, and run it.
  25. effect.run.unsafeRun()

Pipes, Streams and Sinks

The FS2 primitives are provided in FS2SQS.scala. You can use these as building blocks around SQS.

publishPipe Pipe[Task, SendMessageRequest, SendMessageResult]

Publishes messages to SQS.

messageStream Stream[Task, Message]

An infinite stream of SQS messages.

ackSink Sink[Task, (Message, (Message => MessageAction))]

A sink that accepts functions from Message => MessageAction. MessageAction is a type alias for
Either[SendMessageRequest, DeleteMessageRequest]. In other words, this sink accepts functions from Message to
either a SendMessageRequest (for requeuing), or DeleteMessageRequest (for successful acknowledgement).

Installation

As a library:

Just add this to your build.sbt

  1. "com.imageintelligence" %% "fs2-sqs" % "1.0.0"

As a project to work on

Clone the repository:

  1. git clone https://github.com/ImageIntelligence/fs2-sqs.git

Compile

  1. sbt compile

Test

  1. sbt test

Examples:

Please see the examples directory.