项目作者: tangxunye

项目描述 :
php 操作最新版本的kafka
高级语言: PHP
项目地址: git://github.com/tangxunye/php-kafka-swoole.git
创建时间: 2016-07-19T07:42:55Z
项目社区:https://github.com/tangxunye/php-kafka-swoole

开源协议:

下载


PHP-Rdkafka Demo

环境依赖

Kafka高级别消费者(kafkaHighConsumer)使用

  1. include 'kafkaHighConsumer.php';
  2. $kafkaobj = new kafkaHighConsumer();
  3. $kafkaobj->Main(function () {
  4. $this->topics = ['lowtest'];
  5. $this->group_id = 'lowtest';
  6. }, function ($message) {
  7. echo 'offset:' . $message->offset . 'partition' . $message->partition . "\n";
  8. });
  • $this->topics 是你的要订阅的topic
  • $this->group_id 是你的分组名称
  • Main方法的第一个参数是用的闭包设置参数,第二个参数是获取到的消息进行处理的方法

多线程高级消费者(taskKafkaConsumer)使用

支持一次性开启多个单一分组的consumer,当然最好consumer的个数不能大于producers的个数

  1. include 'taskKafkaConsumer.php';
  2. $kafkaobj = new taskKafkaConsumer(function () {
  3. $this->topics = ['lowtest'];
  4. $this->group_id = 'lowtest';
  5. $this->task_worker_num = 2;
  6. }, function ($message) {
  7. echo 'offset:' . $message->offset . 'partition' . $message->partition . "\n";
  8. });
  • $this->topics 是你的要订阅的topic
  • $this->group_id 是你的分组名称
  • $this->task_worker_num 启动consumer的个数
  • Main方法的第一个参数是用的闭包设置参数,第二个参数是获取到的消息进行处理的方法

Kafka生产者(kafkaProducer)使用

  1. include 'kafkaProducer.php';
  2. $kafkaobj = new kafkaProducer();
  3. $kafkaobj->Main('msg', function () {
  4. $this->topic = 'lowtest';
  5. });
  • $this->topic 设置送消息的topic(单一)
  • Main方法的第一个参数是你要发送的消息,第二个是配置参数的匿名函数

配置文件(kafkaProducer)

需要建立一个配置文件,指定你的BORCKERS。

  1. define('KAFKA_BORCKERS', 'ip地址:端口');