Project author: ihipop

Project description:
A DI fork of https://github.com/weiboad/kafka-php
Language: PHP
Repository: git://github.com/ihipop/kafka-php-di.git
Created: 2018-12-19T07:30:59Z
Project community: https://github.com/ihipop/kafka-php-di

License: Apache License 2.0


Kafka-php

Chinese documentation

Kafka-php is a pure PHP Kafka client that currently supports Kafka versions greater than 0.8.x. The v0.2.x and v0.1.x releases of this project are incompatible with each other. If you are still using v0.1.x, refer to the Kafka PHP v0.1.x Document, although switching to v0.2.x is recommended. v0.2.x interacts with the Kafka broker asynchronously in PHP and is more stable and efficient than v0.1.x. Because the client is written in PHP, it needs no compiled extension, which reduces adoption and maintenance costs.

Requirements

  • Minimum PHP version: 5.5
  • Kafka version greater than 0.8
  • The consumer module needs kafka broker version greater than 0.9.0
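
The requirements above can be checked before wiring the client into an application. The following is a minimal sketch and not part of kafka-php: the function name `checkKafkaPhpRequirements` is hypothetical, the broker version has to be supplied by the caller (the sketch does not query the broker), and "greater than" is read here as "at least", which is an assumption.

```php
<?php
// Hypothetical helper (not part of kafka-php).
// Returns a list of unmet requirements; an empty array means all checks passed.
function checkKafkaPhpRequirements($phpVersion, $brokerVersion, $useConsumer)
{
    $problems = array();

    // Minimum PHP version: 5.5
    if (version_compare($phpVersion, '5.5.0', '<')) {
        $problems[] = 'PHP 5.5 or newer is required';
    }

    // Assumption: "greater than 0.8" is treated as "0.8 or newer".
    if (version_compare($brokerVersion, '0.8', '<')) {
        $problems[] = 'Kafka 0.8 or newer is required';
    }

    // Assumption: "greater than 0.9.0" is treated as "0.9.0 or newer".
    if ($useConsumer && version_compare($brokerVersion, '0.9.0', '<')) {
        $problems[] = 'The consumer module requires Kafka 0.9.0 or newer';
    }

    return $problems;
}
```

A caller would typically pass `PHP_VERSION` as the first argument and the same version string it gives to `setBrokerVersion()` as the second.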

Installation

Add the lib directory to the PHP include_path and use an autoloader like the one in the examples directory (the code follows the PEAR/Zend one-class-per-file convention).
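
For readers who do not use Composer, the one-class-per-file convention can be served by a small autoloader. This is a simplified sketch, not the autoloader shipped in the examples directory: `classToPath` is a hypothetical helper, and it converts every underscore to a directory separator, which is looser than the full PEAR/PSR-0 rules.

```php
<?php
// Hypothetical helper: map a class name to a relative file path,
// assuming one class per file.
function classToPath($className)
{
    // Namespace separators and PEAR-style underscores both become directory separators.
    $relative = str_replace(
        array('\\', '_'),
        DIRECTORY_SEPARATOR,
        ltrim($className, '\\')
    );

    return $relative . '.php';
}

// Resolve the file against the include_path and load it if it exists.
spl_autoload_register(function ($className) {
    $path = stream_resolve_include_path(classToPath($className));
    if ($path !== false) {
        require $path;
    }
});
```

With this in place, the lib directory would be added to the include path with `set_include_path()`, using whatever location the library was unpacked to.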

Composer Install

If you use Composer to manage your project's dependencies, add a dependency on nmred/kafka-php to your project's composer.json file. Here is a minimal example of a composer.json file:

    {
        "require": {
            "nmred/kafka-php": "0.2.*"
        }
    }

Configuration

Configuration properties are documented in the Configuration document.

Producer

Asynchronous mode

    <?php
    require '../vendor/autoload.php';
    date_default_timezone_set('PRC');

    use Monolog\Logger;
    use Monolog\Handler\StreamHandler;

    // Create the logger
    $logger = new Logger('my_logger');
    // Log to stdout (Monolog provides StreamHandler, not StdoutHandler)
    $logger->pushHandler(new StreamHandler('php://stdout'));

    $config = \Kafka\ProducerConfig::getInstance();
    $config->setMetadataRefreshIntervalMs(10000);
    $config->setMetadataBrokerList('10.13.4.159:9192');
    $config->setBrokerVersion('0.9.0.1');
    $config->setRequiredAck(1);
    $config->setIsAsyn(false);
    $config->setProduceInterval(500);

    // The callback returns the list of messages to produce
    $producer = new \Kafka\Producer(function() {
        return array(
            array(
                'topic' => 'test',
                'value' => 'test....message.',
                'key' => 'testkey',
            ),
        );
    });
    $producer->setLogger($logger);
    // Called with the result of a successful send
    $producer->success(function($result) {
        var_dump($result);
    });
    // Called with the error code when a send fails
    $producer->error(function($errorCode) {
        var_dump($errorCode);
    });
    $producer->send(true);

Synchronous mode

    <?php
    require '../vendor/autoload.php';
    date_default_timezone_set('PRC');

    use Monolog\Logger;
    use Monolog\Handler\StreamHandler;

    // Create the logger
    $logger = new Logger('my_logger');
    // Log to stdout (Monolog provides StreamHandler, not StdoutHandler)
    $logger->pushHandler(new StreamHandler('php://stdout'));

    $config = \Kafka\ProducerConfig::getInstance();
    $config->setMetadataRefreshIntervalMs(10000);
    $config->setMetadataBrokerList('127.0.0.1:9192');
    $config->setBrokerVersion('0.9.0.1');
    $config->setRequiredAck(1);
    $config->setIsAsyn(false);
    $config->setProduceInterval(500);

    $producer = new \Kafka\Producer();
    $producer->setLogger($logger);

    // Send 100 messages one at a time and dump each result
    for ($i = 0; $i < 100; $i++) {
        $result = $producer->send(array(
            array(
                'topic' => 'test1',
                'value' => 'test1....message.',
                'key' => '',
            ),
        ));
        var_dump($result);
    }
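
Both producer modes pass messages as a list of arrays with `topic`, `value`, and `key` entries. When several values go to the same topic, that list can be built by a small helper. The sketch below is illustrative only: `buildMessages` is a hypothetical function, not part of kafka-php, and it only constructs plain arrays in the shape used by the examples above.

```php
<?php
// Hypothetical helper (not part of kafka-php): build the list of message
// arrays in the topic/value/key shape used by the producer examples.
function buildMessages($topic, array $values, $key = '')
{
    $messages = array();
    foreach ($values as $value) {
        $messages[] = array(
            'topic' => $topic,
            'value' => (string) $value, // cast so every payload is a string
            'key'   => $key,
        );
    }

    return $messages;
}
```

In synchronous mode the result could be handed directly to the producer, for example `$producer->send(buildMessages('test1', array('first', 'second')));`.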

Consumer

    <?php
    require '../vendor/autoload.php';
    date_default_timezone_set('PRC');

    use Monolog\Logger;
    use Monolog\Handler\StreamHandler;

    // Create the logger
    $logger = new Logger('my_logger');
    // Log to stdout (Monolog provides StreamHandler, not StdoutHandler)
    $logger->pushHandler(new StreamHandler('php://stdout'));

    $config = \Kafka\ConsumerConfig::getInstance();
    $config->setMetadataRefreshIntervalMs(10000);
    $config->setMetadataBrokerList('10.13.4.159:9192');
    $config->setGroupId('test');
    $config->setBrokerVersion('0.9.0.1');
    $config->setTopics(array('test'));
    //$config->setOffsetReset('earliest');

    $consumer = new \Kafka\Consumer();
    $consumer->setLogger($logger);
    // The callback is invoked with the topic, partition, and message
    $consumer->start(function($topic, $part, $message) {
        var_dump($message);
    });
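
The consumer callback receives the topic, the partition, and the message. A handler that does more than `var_dump` can be built as a closure with the same three-argument signature. The following sketch counts messages per topic and partition; `makeCountingHandler` is a hypothetical name, and the code deliberately makes no assumption about the internal structure of `$message`.

```php
<?php
// Hypothetical factory (not part of kafka-php): returns a callback with the
// ($topic, $part, $message) signature used in the consumer example.
// Counts are written into the array passed by reference.
function makeCountingHandler(array &$counts)
{
    return function ($topic, $part, $message) use (&$counts) {
        $slot = $topic . ':' . $part;
        if (!isset($counts[$slot])) {
            $counts[$slot] = 0;
        }
        $counts[$slot]++;
    };
}
```

It would be used as `$counts = array(); $consumer->start(makeCountingHandler($counts));`, after which `$counts` holds one entry per topic and partition pair seen so far.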

Low-Level API

Refer to the Example.

QQ Group

531522091