1、首先安装kafka扩展
目前成都创新互联已为成百上千家的企业提供了网站建设、域名、网站空间、网站托管、企业网站设计、滑县网站维护等服务,公司将坚持客户导向、应用为本的策略,正道将秉承"和谐、参与、激情"的文化,与客户和合作伙伴齐心协力一起成长,共同发展。
#安装librdkafka: 版本: https://github.com/edenhill/librdkafka/releases/tag/v0.9.2 $ git clone https://github.com/edenhill/librdkafka.git $ ./configure $ make $ sudo make install #安装 rdkafka.so 版本:https://github.com/arnaud-lb/php-rdkafka/releases/tag/3.0.1 $ git clone https://github.com/arnaud-lb/php-rdkafka.git $ cd php-rdkafka $ phpize $ ./configure $ make all -j 5 $ sudo make install
2、生产者代码示例
$rcf = new RdKafka\Conf();
$rcf->set('group.id', 'test'); //topicname
$cf = new RdKafka\TopicConf();
$cf->set('offset.store.method', 'broker');
$cf->set('auto.offset.reset', 'smallest');
$rk = new RdKafka\Producer($rcf);
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers("127.0.0.1"); //brokeraddr
$topic = $rk->newTopic("test", $cf); //topicname
for($i = 0; $i < 10; $i++) {
$topic->produce(0,0,'test' . $i);
}
3、消费者代码示例
$rcf = new RdKafka\Conf();
$rcf->set('group.id', 'test');
$rcf->set('broker.version.fallback', '0.8.2'); //brokername,kafkaversion
$cf = new RdKafka\TopicConf();
$cf->set('auto.offset.reset', 'smallest');
$cf->set('auto.commit.enable', true);
$rk = new RdKafka\Consumer($rcf);
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers("127.0.0.1"); //brokeraddr
$topic = $rk->newTopic("test", $cf); //topicname,topicobject
$topic->consumeStart(0,10); //partition,offset
$msg = $topic->consume(0, 1000); //partition,timeout
var_dump($msg);
网页题目:php连接kafka
浏览地址:http://scyingshan.cn/article/gccjch.html