项目作者: simPod

项目描述 :
PHP Kafka boilerplate wrapper around RdKafka
高级语言: PHP
项目地址: git://github.com/simPod/PhpKafka.git
创建时间: 2019-06-16T12:01:16Z
项目社区:https://github.com/simPod/PhpKafka

开源协议:MIT License

下载


PHP Kafka boilerplate wrapper around RdKafka

GitHub Actions
Code Coverage
Downloads
Packagist
Infection MSI

Installation

Add as Composer dependency:

  1. composer require simpod/kafka

Config Constants

Some config constants are provided like ConsumerConfig, ProducerConfig or CommonClientConfigs.

However, they are copied from Java API and not all are applicable to librdkafka. Consult with librdkafka documentation before use.

Clients

Consumer

KafkaConsumer boilerplate is available with startBatch() method (to suplement this example in librdkafka) and with start(). They also handle
termination signals for you.

Classic Consumer

  1. <?php
  2. declare(strict_types=1);
  3. namespace Your\AppNamespace;
  4. use RdKafka\Message;
  5. use SimPod\Kafka\Clients\Consumer\ConsumerConfig;
  6. use SimPod\Kafka\Clients\Consumer\KafkaConsumer;
  7. final class ExampleConsumer
  8. {
  9. public function run(): void
  10. {
  11. $kafkaConsumer = new KafkaConsumer($this->getConfig(), Logger::get());
  12. $kafkaConsumer->subscribe(['topic1']);
  13. $kafkaConsumer->start(
  14. 120 * 1000,
  15. static function (Message $message) use ($kafkaConsumer) : void {
  16. // Process message here
  17. $kafkaConsumer->commit($message); // Autocommit is disabled
  18. }
  19. );
  20. }
  21. private function getConfig(): ConsumerConfig
  22. {
  23. $config = new ConsumerConfig();
  24. $config->set(ConsumerConfig::BOOTSTRAP_SERVERS_CONFIG, '127.0.0.1:9092');
  25. $config->set(ConsumerConfig::ENABLE_AUTO_COMMIT_CONFIG, false);
  26. $config->set(ConsumerConfig::CLIENT_ID_CONFIG, gethostname());
  27. $config->set(ConsumerConfig::AUTO_OFFSET_RESET_CONFIG, 'earliest');
  28. $config->set(ConsumerConfig::GROUP_ID_CONFIG, 'consumer_group_name');
  29. return $config;
  30. }
  31. }

Batching Consumer

  1. <?php
  2. declare(strict_types=1);
  3. namespace Your\AppNamespace;
  4. use RdKafka\Message;
  5. use SimPod\Kafka\Clients\Consumer\ConsumerConfig;
  6. use SimPod\Kafka\Clients\Consumer\ConsumerRecords;
  7. use SimPod\Kafka\Clients\Consumer\KafkaConsumer;
  8. final class ExampleBatchConsumer
  9. {
  10. public function run(): void
  11. {
  12. $kafkaConsumer = new KafkaConsumer($this->getConfig());
  13. $kafkaConsumer->subscribe(['topic1']);
  14. $kafkaConsumer->startBatch(
  15. 200000,
  16. 120 * 1000,
  17. static function (Message $message): void {
  18. // Process record
  19. },
  20. static function (ConsumerRecords $consumerRecords) use ($kafkaConsumer) : void {
  21. // Process records batch
  22. $kafkaConsumer->commit($consumerRecords->getLast());
  23. }
  24. );
  25. }
  26. private function getConfig(): ConsumerConfig
  27. {
  28. $config = new ConsumerConfig();
  29. $config->set(ConsumerConfig::BOOTSTRAP_SERVERS_CONFIG, '127.0.0.1:9092');
  30. $config->set(ConsumerConfig::ENABLE_AUTO_COMMIT_CONFIG, false);
  31. $config->set(ConsumerConfig::CLIENT_ID_CONFIG, gethostname());
  32. $config->set(ConsumerConfig::AUTO_OFFSET_RESET_CONFIG, 'earliest');
  33. $config->set(ConsumerConfig::GROUP_ID_CONFIG, 'consumer_group_name');
  34. return $config;
  35. }
  36. }