Project author: jobcloud

Project description:
PHP Kafka producer / consumer library with PHP Avro support, based on php-rdkafka
Language: PHP
Repository: git://github.com/jobcloud/php-kafka-lib.git
Created: 2019-12-08T10:23:41Z
Project page: https://github.com/jobcloud/php-kafka-lib

License: MIT License


php-kafka-lib

Description

This is a library that makes it easier to use Kafka in your PHP project.

This library relies on arnaud-lb/php-rdkafka.
Avro support relies on flix-tech/avro-serde-php.
The documentation of the PHP extension can help you understand the internals of this library.

Requirements

  • php: ^8.0
  • ext-rdkafka: >=4.0.0
  • librdkafka: >=0.11.6 (if you use librdkafka < 1.x, please define your own error callback)
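
A custom error callback could look roughly like the sketch below. It assumes the builder exposes a withErrorCallback() method, which is not shown elsewhere in this README, and that the callback receives the client instance, an error code and a reason string, as php-rdkafka error callbacks do. The logging body is only an illustration.

```php
<?php

use Jobcloud\Kafka\Producer\KafkaProducerBuilder;

// Assumption: withErrorCallback() is available on the builder; check the
// builder interface of your installed version before relying on it.
$producer = KafkaProducerBuilder::create()
    ->withAdditionalBroker('localhost:9092')
    ->withErrorCallback(
        // Arguments as passed by php-rdkafka: client instance, error code, reason.
        static function ($kafka, int $errorCode, string $reason): void {
            // Illustration only: log the error instead of throwing.
            error_log(sprintf('Kafka error %d: %s', $errorCode, $reason));
        }
    )
    ->build();
```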

Warning: To use the transactional producer you'll need:

  • ext-rdkafka: >=4.1.0
  • librdkafka: >=1.4
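
The version requirements above can be checked at runtime with plain PHP. The following is a minimal sketch: checkKafkaRequirements() is a hypothetical helper, not part of this library, it only checks the lower bounds, and it does not inspect the librdkafka version.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: returns a list of problems, an empty list means the
 * given versions satisfy the lower bounds listed in the requirements.
 */
function checkKafkaRequirements(
    string $phpVersion,
    ?string $extRdkafkaVersion,
    bool $needTransactions = false
): array {
    $problems = [];
    if (version_compare($phpVersion, '8.0.0', '<')) {
        $problems[] = "php {$phpVersion} is older than 8.0";
    }
    if ($extRdkafkaVersion === null) {
        $problems[] = 'ext-rdkafka is not loaded';
        return $problems;
    }
    // The transactional producer needs a newer extension version.
    $minExt = $needTransactions ? '4.1.0' : '4.0.0';
    if (version_compare($extRdkafkaVersion, $minExt, '<')) {
        $problems[] = "ext-rdkafka {$extRdkafkaVersion} is older than {$minExt}";
    }
    return $problems;
}

// phpversion('rdkafka') returns false when the extension is not loaded.
$extVersion = phpversion('rdkafka');
$problems = checkKafkaRequirements(
    PHP_VERSION,
    $extVersion === false ? null : $extVersion,
    true
);
echo $problems === [] ? "OK\n" : implode("\n", $problems) . "\n";
```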

Installation

  composer require jobcloud/php-kafka-lib "~1.0"

Enable Avro support

If you need Avro support, run:

  composer require flix-tech/avro-serde-php "~1.4"

Usage

Producer

Kafka

Simple example

  <?php

  use Jobcloud\Kafka\Message\KafkaProducerMessage;
  use Jobcloud\Kafka\Producer\KafkaProducerBuilder;

  $producer = KafkaProducerBuilder::create()
      ->withAdditionalBroker('localhost:9092')
      ->build();

  $message = KafkaProducerMessage::create('test-topic', 0)
      ->withKey('asdf-asdf-asfd-asdf')
      ->withBody('some test message payload')
      ->withHeaders([ 'key' => 'value' ]);

  $producer->produce($message);

  // Shutdown producer, flush messages that are in queue. Give up after 20s
  $result = $producer->flush(20000);
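
The example stores the return value of flush() in $result without using it. Assuming flush() returns a librdkafka error code as an integer, as the underlying php-rdkafka call does, the result could be inspected roughly like this. describeFlushResult() is a hypothetical helper, not part of this library.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: turns the integer returned by flush() into a
 * readable status line. 0 is librdkafka's RD_KAFKA_RESP_ERR_NO_ERROR.
 */
function describeFlushResult(int $result): string
{
    if ($result === 0) {
        return 'flush ok';
    }
    // rd_kafka_err2str() comes from ext-rdkafka; without the extension
    // the raw error code is reported instead.
    $reason = function_exists('rd_kafka_err2str')
        ? rd_kafka_err2str($result)
        : 'error code ' . $result;

    return 'flush failed: ' . $reason;
}
```

With the producer from the example above, this would be called as echo describeFlushResult($producer->flush(20000));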

Transactional producer (needs php-rdkafka >= 4.1 and librdkafka >= 1.4)

  <?php

  use Jobcloud\Kafka\Message\KafkaProducerMessage;
  use Jobcloud\Kafka\Producer\KafkaProducerBuilder;
  use Jobcloud\Kafka\Exception\KafkaProducerTransactionRetryException;
  use Jobcloud\Kafka\Exception\KafkaProducerTransactionAbortException;
  use Jobcloud\Kafka\Exception\KafkaProducerTransactionFatalException;

  $producer = KafkaProducerBuilder::create()
      ->withAdditionalBroker('localhost:9092')
      ->build();

  $message = KafkaProducerMessage::create('test-topic', 0)
      ->withKey('asdf-asdf-asfd-asdf')
      ->withBody('some test message payload')
      ->withHeaders([ 'key' => 'value' ]);

  try {
      $producer->beginTransaction(10000);
      $producer->produce($message);
      $producer->commitTransaction(10000);
  } catch (KafkaProducerTransactionRetryException $e) {
      // something went wrong but you can retry the failed call (either beginTransaction or commitTransaction)
  } catch (KafkaProducerTransactionAbortException $e) {
      // you need to call $producer->abortTransaction(10000); and try again
  } catch (KafkaProducerTransactionFatalException $e) {
      // something went very wrong, re-create your producer, otherwise you could jeopardize the idempotency guarantees
  }

  // Shutdown producer, flush messages that are in queue. Give up after 20s
  $result = $producer->flush(20000);
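
The three catch branches above can be turned into a small retry routine. The sketch below is one possible reading of those comments, not the library's own method: retryCall() and produceInTransaction() are hypothetical helpers, the attempt limit is arbitrary, and only the producer methods and exception classes shown in the example are used. A fatal exception is deliberately not caught, so the caller can re-create the producer.

```php
<?php
declare(strict_types=1);

use Jobcloud\Kafka\Exception\KafkaProducerTransactionAbortException;
use Jobcloud\Kafka\Exception\KafkaProducerTransactionRetryException;

/** Hypothetical helper: repeats one call while it fails with a retryable error. */
function retryCall(callable $call, int $maxAttempts): void
{
    for ($attempt = 1; ; $attempt++) {
        try {
            $call();
            return;
        } catch (KafkaProducerTransactionRetryException $e) {
            if ($attempt >= $maxAttempts) {
                throw $e; // give up, let the caller decide
            }
        }
    }
}

/** Hypothetical helper: produces all messages inside one transaction. */
function produceInTransaction(
    object $producer,
    array $messages,
    int $timeoutMs = 10000,
    int $maxAttempts = 3
): void {
    for ($attempt = 1; ; $attempt++) {
        try {
            // Only the failed call is retried, not the whole transaction.
            retryCall(fn () => $producer->beginTransaction($timeoutMs), $maxAttempts);
            foreach ($messages as $message) {
                $producer->produce($message);
            }
            retryCall(fn () => $producer->commitTransaction($timeoutMs), $maxAttempts);
            return;
        } catch (KafkaProducerTransactionAbortException $e) {
            // Abort, then start the whole transaction again.
            $producer->abortTransaction($timeoutMs);
            if ($attempt >= $maxAttempts) {
                throw $e;
            }
        }
    }
}
```

With the producer and message from the example, this would be called as produceInTransaction($producer, [$message]); followed by the usual flush().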

Avro Producer

To create an Avro producer, add the Avro encoder.

  <?php

  use FlixTech\AvroSerializer\Objects\RecordSerializer;
  use Jobcloud\Kafka\Message\KafkaProducerMessage;
  use Jobcloud\Kafka\Message\Encoder\AvroEncoder;
  use Jobcloud\Kafka\Message\Registry\AvroSchemaRegistry;
  use Jobcloud\Kafka\Producer\KafkaProducerBuilder;
  use Jobcloud\Kafka\Message\KafkaAvroSchema;
  use FlixTech\SchemaRegistryApi\Registry\CachedRegistry;
  use FlixTech\SchemaRegistryApi\Registry\BlockingRegistry;
  use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry;
  use FlixTech\SchemaRegistryApi\Registry\Cache\AvroObjectCacheAdapter;
  use GuzzleHttp\Client;

  $cachedRegistry = new CachedRegistry(
      new BlockingRegistry(
          new PromisingRegistry(
              new Client(['base_uri' => 'jobcloud-kafka-schema-registry:9081'])
          )
      ),
      new AvroObjectCacheAdapter()
  );

  $registry = new AvroSchemaRegistry($cachedRegistry);
  $recordSerializer = new RecordSerializer($cachedRegistry);

  // if no version is defined, the latest version will be used
  // if no schema definition is defined, the appropriate version will be fetched from the registry
  $registry->addBodySchemaMappingForTopic(
      'test-topic',
      new KafkaAvroSchema('bodySchemaName' /*, int $version, AvroSchema $definition */)
  );
  $registry->addKeySchemaMappingForTopic(
      'test-topic',
      new KafkaAvroSchema('keySchemaName' /*, int $version, AvroSchema $definition */)
  );

  // if you are only encoding key or value, you can pass that mode as additional third argument
  // per default both key and body will get encoded
  $encoder = new AvroEncoder($registry, $recordSerializer /*, AvroEncoderInterface::ENCODE_BODY */);

  $producer = KafkaProducerBuilder::create()
      ->withAdditionalBroker('kafka:9092')
      ->withEncoder($encoder)
      ->build();

  $schemaName = 'testSchema';
  $version = 1;
  $message = KafkaProducerMessage::create('test-topic', 0)
      ->withKey('asdf-asdf-asfd-asdf')
      ->withBody(['name' => 'someName'])
      ->withHeaders([ 'key' => 'value' ]);

  $producer->produce($message);

  // Shutdown producer, flush messages that are in queue. Give up after 20s
  $result = $producer->flush(20000);

NOTE: To improve producer latency you can install the pcntl extension.
php-kafka-lib already has code in place for this, similar to what is described here:
https://github.com/arnaud-lb/php-rdkafka#performance--low-latency-settings

Consumer

Kafka High Level

  <?php

  use Jobcloud\Kafka\Consumer\KafkaConsumerBuilder;
  use Jobcloud\Kafka\Exception\KafkaConsumerConsumeException;
  use Jobcloud\Kafka\Exception\KafkaConsumerEndOfPartitionException;
  use Jobcloud\Kafka\Exception\KafkaConsumerTimeoutException;

  $consumer = KafkaConsumerBuilder::create()
      ->withAdditionalConfig(
          [
              'compression.codec' => 'lz4',
              'auto.commit.interval.ms' => 500
          ]
      )
      ->withAdditionalBroker('kafka:9092')
      ->withConsumerGroup('testGroup')
      ->withAdditionalSubscription('test-topic')
      ->build();

  $consumer->subscribe();

  while (true) {
      try {
          $message = $consumer->consume();
          // your business logic
          $consumer->commit($message);
      } catch (KafkaConsumerTimeoutException $e) {
          // no messages were read in a given time
      } catch (KafkaConsumerEndOfPartitionException $e) {
          // only occurs if enable.partition.eof is true (default: false)
      } catch (KafkaConsumerEndOfPartitionException $e) {
          // unreachable duplicate guard removed below
      } catch (KafkaConsumerConsumeException $e) {
          // Failed
      }
  }
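
The end-of-partition branch above only fires when enable.partition.eof is switched on. A minimal configuration sketch, reusing the builder calls from the example; the broker, group and topic names are the same placeholders, and passing the value as the string 'true' is an assumption about how the setting is best handed to librdkafka:

```php
<?php

use Jobcloud\Kafka\Consumer\KafkaConsumerBuilder;

$consumer = KafkaConsumerBuilder::create()
    ->withAdditionalConfig(
        [
            // librdkafka property; with it enabled, reaching the end of a
            // partition raises KafkaConsumerEndOfPartitionException.
            'enable.partition.eof' => 'true',
        ]
    )
    ->withAdditionalBroker('kafka:9092')
    ->withConsumerGroup('testGroup')
    ->withAdditionalSubscription('test-topic')
    ->build();
```

This can be useful for batch-style consumers that should stop once they have caught up, for example by leaving the loop in the end-of-partition catch branch.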

Kafka Low Level

  <?php

  use Jobcloud\Kafka\Consumer\KafkaConsumerBuilder;
  use Jobcloud\Kafka\Exception\KafkaConsumerConsumeException;
  use Jobcloud\Kafka\Exception\KafkaConsumerEndOfPartitionException;
  use Jobcloud\Kafka\Exception\KafkaConsumerTimeoutException;

  $consumer = KafkaConsumerBuilder::create()
      ->withAdditionalConfig(
          [
              'compression.codec' => 'lz4',
              'auto.commit.interval.ms' => 500
          ]
      )
      ->withAdditionalBroker('kafka:9092')
      ->withConsumerGroup('testGroup')
      ->withAdditionalSubscription('test-topic')
      ->withConsumerType(KafkaConsumerBuilder::CONSUMER_TYPE_LOW_LEVEL)
      ->build();

  $consumer->subscribe();

  while (true) {
      try {
          $message = $consumer->consume();
          // your business logic
          $consumer->commit($message);
      } catch (KafkaConsumerTimeoutException $e) {
          // no messages were read in a given time
      } catch (KafkaConsumerEndOfPartitionException $e) {
          // only occurs if enable.partition.eof is true (default: false)
      } catch (KafkaConsumerConsumeException $e) {
          // Failed
      }
  }

Avro Consumer

To create an Avro consumer, add the Avro decoder.

  <?php

  use FlixTech\AvroSerializer\Objects\RecordSerializer;
  use Jobcloud\Kafka\Consumer\KafkaConsumerBuilder;
  use Jobcloud\Kafka\Exception\KafkaConsumerConsumeException;
  use Jobcloud\Kafka\Exception\KafkaConsumerEndOfPartitionException;
  use Jobcloud\Kafka\Exception\KafkaConsumerTimeoutException;
  use Jobcloud\Kafka\Message\Decoder\AvroDecoder;
  use Jobcloud\Kafka\Message\KafkaAvroSchema;
  use Jobcloud\Kafka\Message\Registry\AvroSchemaRegistry;
  use FlixTech\SchemaRegistryApi\Registry\CachedRegistry;
  use FlixTech\SchemaRegistryApi\Registry\BlockingRegistry;
  use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry;
  use FlixTech\SchemaRegistryApi\Registry\Cache\AvroObjectCacheAdapter;
  use GuzzleHttp\Client;

  $cachedRegistry = new CachedRegistry(
      new BlockingRegistry(
          new PromisingRegistry(
              new Client(['base_uri' => 'jobcloud-kafka-schema-registry:9081'])
          )
      ),
      new AvroObjectCacheAdapter()
  );

  $registry = new AvroSchemaRegistry($cachedRegistry);
  $recordSerializer = new RecordSerializer($cachedRegistry);

  // if no version is defined, the latest version will be used
  // if no schema definition is defined, the appropriate version will be fetched from the registry
  $registry->addBodySchemaMappingForTopic(
      'test-topic',
      new KafkaAvroSchema('bodySchema' , 9 /* , AvroSchema $definition */)
  );
  $registry->addKeySchemaMappingForTopic(
      'test-topic',
      new KafkaAvroSchema('keySchema' , 9 /* , AvroSchema $definition */)
  );

  // If you are only encoding / decoding key or value, only register the schema(s) you need.
  // It is advised against doing that though, some tools might not play
  // nice if you don't fully encode your message
  $decoder = new AvroDecoder($registry, $recordSerializer);

  $consumer = KafkaConsumerBuilder::create()
      ->withAdditionalConfig(
          [
              'compression.codec' => 'lz4',
              'auto.commit.interval.ms' => 500
          ]
      )
      ->withDecoder($decoder)
      ->withAdditionalBroker('kafka:9092')
      ->withConsumerGroup('testGroup')
      ->withAdditionalSubscription('test-topic')
      ->build();

  $consumer->subscribe();

  while (true) {
      try {
          $message = $consumer->consume();
          // your business logic
          $consumer->commit($message);
      } catch (KafkaConsumerTimeoutException $e) {
          // no messages were read in a given time
      } catch (KafkaConsumerEndOfPartitionException $e) {
          // only occurs if enable.partition.eof is true (default: false)
      } catch (KafkaConsumerConsumeException $e) {
          // Failed
      }
  }

Additional information

This library replaces messaging-lib.
See Migration.md for help with migrating.