介绍PHP实现生产者与消费者
发布时间:2022-07-19 14:03:13 所属栏目:PHP教程 来源:互联网
导读:前言 PHP中使用Kafka需要RdKafka扩展,而RdKafka依赖于librdkafka,所以这两个我们都需要安装,具体安装方法自行百度,本篇不做说明了。 生产者(测试) 创建消费者需要步骤: 生产者配置参数 创建生产者实例 创建主题实例(依赖生产者) 生产主题消息 推送消
前言 PHP中使用Kafka需要RdKafka扩展,而RdKafka依赖于librdkafka,所以这两个我们都需要安装,具体安装方法自行百度,本篇不做说明了。 生产者(测试) 创建消费者需要步骤: 生产者配置参数 创建生产者实例 创建主题实例(依赖生产者) 生产主题消息 推送消息 具体代码如下: $conf = new RdKafkaConf(); // 绑定服务节点 $conf->set('metadata.broker.list', '127.0.0.1:32772'); // 创建生产者 // 推送消息,如果不调用此函数,消息不会被发送且会丢失 $result = $kafka->flush(5000); if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) { throw new RuntimeException('Was unable to flush, messages might be lost!'); } 消费者 创建一个消费者需要几个步骤: 消费者配置参数 应用配置参数创建消费者实例 订阅对应主题 拉取数据 提交位移 具体代码如下: $conf = new RdKafkaConf(); // 绑定消费者组 $conf->set('group.id', 'ceshi'); // 绑定服务节点,多个用,分隔 $conf->set('metadata.broker.list', '127.0.0.1:32787'); // 设置自动提交为false $conf->set('enable.auto.commit', 'false'); // 设置当前消费者拉取数据时的偏移量, 可选参数: // earliest: 如果消费者组是新创建的,从头开始消费,否则从消费者组当前消费位移开始。 // latest:如果消费者组是新创建的,从最新偏移量开始,否则从消费者组当前消费位移开始。 $conf->set('auto.offset.reset', 'earliest'); // 创建消费者实例 $consumer = new RdKafkaKafkaConsumer($conf); // 消费者订阅主题,数组形式 $consumer->subscribe(['topic1','topic2']); while (true) { // 消费数据,阻塞5秒(5秒内有数据就消费,没有数据等待5秒进入下一轮循环) $message = $consumer->consume(5000); switch ($message->err) { $consumer->commit($message); break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: echo "No more messages; will wait for moren"; break; case RD_KAFKA_RESP_ERR__TIMED_OUT: echo "Timed outn"; break; default: throw new Exception($message->errstr(), $message->err); break; // 对消费者指定分区,注意此方式不能与subscribe一同使用 $consumer->assign([ new RdKafkaTopicPartition("topic", 0), new RdKafkaTopicPartition("topic", 1), ]); (编辑:云计算网_泰州站长网) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |