项目作者: ikhoury

项目描述 :
Redis work queue processor
高级语言: Java
项目地址: git://github.com/ikhoury/rstreamer.git
创建时间: 2019-07-28T20:14:03Z
项目社区:https://github.com/ikhoury/rstreamer

开源协议:Apache License 2.0

下载


Rstreamer: Redis work queue processor

CircleCI
Quality Gate Status
Maven Central

This library can be used to implement applications that need to process messages from redis queues.

Setup

The project is build using maven with JDK 11. Run mvn install to have the artefact available in your local repostiory.

Concepts

A WorkSubscription describes a group of interested message handlers that want to process items from a redis queue.
The subscription is activated using the SubscriptionManager. It runs background threads that poll for tasks and process them using workers.
RStreamer is more efficient than regular polling using jedis because of the following:

  • Several tasks can be fetched in one batch which is far more network efficient and allows workers to process this batch in one run.
  • Each WorkSubscription has one thread dedicated to fetching tasks from redis.
  • Workers process tasks asynchronously on a dedicated thread pool for each subscription.
  • Tasks can be processed concurrently. The concurrency level is controlled using Leases which effectively back-pressure the polling thread, making it wait until capacity is available before fetching more tasks from the queue.

Subscription Manager

All subscriptions must be added to the SubscriptionManager prior to activation.
The manager depends on a RedisBatchPoller, the application driver for polling tasks from redis.
Its current implementation is JedisBatchPoller, which uses the jedis driver to communicate with redis.
To ensure graceful shutdown, call deactivateSubscriptions() before exiting your application to stop polling from redis and finish processing outstanding tasks.

Configuration

BatchPollerConfig

A RedisBatchPoller is configured using a BatchPollerConfig.

  1. BatchPollerConfig batchPollerConfig = BatchPollerConfigBuilder.defaultBatchPollerConfig()
  2. .withHost("myhost")
  3. .withPort(6379)
  4. .build();

The default config assumes you are connecting to the default redis port on localhost.

ReliableBatchPollerConfig

A ReliableBatchPoller is configured using a ReliableBatchPollerConfig.

  1. ReliableBatchPollerConfig reliableBatchPollerConfig = defaultReliableBatchPollerConfig()
  2. .withRetryAttempts(3)
  3. .withFailureRateThreshold(70)
  4. .build();

A sample of the operations are evaluated. If the sample’s failure rate is above the set threshold,
the circuit breaker will open for a while and stop the caller from making requests to an unresponsive server.

SubscriptionManagerConfig

SubscriptionManager is configured using SubscriptionManagerConfig.
This config holds a LeaseConfig and a PollingConfig.

  1. SubscriptionManagerConfig config = SubscriptionManageConfigBuilder.defaultSubscriptionManagerConfig()
  2. .with(
  3. LeaseConfigBuilder.defaultLeaseConfig()
  4. .withAvailableLeases(5)
  5. )
  6. .with(
  7. PollingConfigBuilder.defaultPollingConfig()
  8. .withBatchSize(50)
  9. .withBatchSizeThreshold(20)
  10. )
  11. .build();

LeaseConfig

The number of available leases controls the concurrency level for each subscription.
The larger the number, the more tasks (or group of tasks) can be processed in parallel before blocking the polling thread.

PollingConfig

It is more efficient to single poll than to batch poll a queue with very few items. Single polling uses redis’s blocking operation and hence can wait on the server side until an item is inserted. On the other hand, batch polling will continuously try to fetch a list of items. Therefore, a batchSizeThreshold parameter is used to specify the minimum number of items that must be fetched in a batch in order to continue batch polling in the next round.

Sample snippets

Worker

  1. public class SampleWorker implements Worker {
  2. @Override
  3. public void processSingleItem(String item) {
  4. System.out.println("Got item: " + item);
  5. }
  6. }

Batch Worker

  1. public class SampleBatchWorker implements BatchWorker {
  2. @Override
  3. public void processMultipleItems(List<String> items) {
  4. System.out.println("Got " + items.size() + " items");
  5. }
  6. @Override
  7. public void processSingleItem(String item) {
  8. System.out.println("Got item: " + item);
  9. }
  10. }

Work Subscription

  1. List<Worker> workers = new ArrayList();
  2. workers.add(new SampleWorker());
  3. workers.add(new SampleBatchWorker());
  4. WorkSubscription subscription = new WorkSubscription("my:task:queue", workers);

Batch Poller

  1. BatchPollerConfig batchPollerConfig = defaultBatchPollerConfig()
  2. .withHost("java-0")
  3. .build();
  4. RedisBatchPoller batchPoller = new JedisBatchPoller(batchPollerConfig, subscriptionCount);

Reliable Batch Poller

  1. ReliableBatchPollerConfig reliableBatchPollerConfig = defaultReliableBatchPollerConfig()
  2. .withRetryAttempts(3)
  3. .withFailureRateThreshold(70)
  4. .build();
  5. RedisBatchPoller reliableBatchPoller = new Resilience4jReliableBatchPoller(batchPoller, reliableBatchPollerConfig, subscriptionCount);

Subscription Manager

  1. SubscriptionManager subscriptionManager = new SubscriptionManager(createSubscriptionManagerConfig(), createRedisPoller());
  2. subscriptionManager.addSubscription(subscription);
  3. subscriptionManager.activateSubscriptions();
  4. Runtime.getRuntime().addShutdownHook(new Thread(subscriptionManager::deactivateSubscriptions));