使用 PHP 操作最新版本的 Kafka
// Example: consume a topic through the kafkaHighConsumer wrapper.
include 'kafkaHighConsumer.php';

$kafkaobj = new kafkaHighConsumer();

// Setup callback: assigns the topic list and the consumer group id.
// NOTE(review): $this is presumably bound to the consumer object by Main() — confirm.
$configure = function () {
    $this->topics = ['lowtest'];
    $this->group_id = 'lowtest';
};

// Message callback: prints the offset and partition of the message it is given.
$onMessage = function ($message) {
    $line = 'offset:' . $message->offset . 'partition' . $message->partition . "\n";
    echo $line;
};

$kafkaobj->Main($configure, $onMessage);
支持一次性开启多个属于同一分组(group)的 consumer。注意:consumer 的个数最好不要大于 topic 的 partition 个数,否则多出的 consumer 分配不到分区,只会处于空闲状态。
// Example: consume a topic through the taskKafkaConsumer wrapper.
include 'taskKafkaConsumer.php';

// Setup callback: assigns the topic list, the consumer group id and task_worker_num.
// NOTE(review): $this is presumably bound to the consumer object by the constructor — confirm.
// NOTE(review): task_worker_num presumably sets how many consumer workers are started — confirm.
$configure = function () {
    $this->topics = ['lowtest'];
    $this->group_id = 'lowtest';
    $this->task_worker_num = 2;
};

// Message callback: prints the offset and partition of the message it is given.
$onMessage = function ($message) {
    $line = 'offset:' . $message->offset . 'partition' . $message->partition . "\n";
    echo $line;
};

// Both callbacks are handed to the constructor; no further call is made in this example.
$kafkaobj = new taskKafkaConsumer($configure, $onMessage);
// Example: send one message through the kafkaProducer wrapper.
include 'kafkaProducer.php';

$kafkaobj = new kafkaProducer();

// Setup callback: assigns the target topic.
// NOTE(review): $this is presumably bound to the producer object by Main() — confirm.
$configure = function () {
    $this->topic = 'lowtest';
};

// First argument is the message payload, second is the setup callback.
$kafkaobj->Main('msg', $configure);
需要建立一个配置文件,在其中定义常量 KAFKA_BORCKERS,用来指定你的 Kafka broker 地址(格式为「IP地址:端口」)。
// Kafka broker address constant. The value below is a placeholder
// ("IP address:port"); replace it with the address of your own broker.
// NOTE(review): the name is spelled KAFKA_BORCKERS (sic); the consumer/producer
// classes presumably look it up under exactly this name, so keep the spelling — confirm.
define('KAFKA_BORCKERS', 'ip地址:端口');