加入收藏 | 设为首页 | 会员中心 | 我要投稿 云计算网_泰州站长网 (http://www.0523zz.com/)- 视觉智能、AI应用、CDN、行业物联网、智能数字人!
当前位置: 首页 > 站长学院 > PHP教程 > 正文

介绍PHP实现生产者与消费者

发布时间:2022-07-19 14:03:13 所属栏目:PHP教程 来源:互联网
导读:前言 PHP中使用Kafka需要RdKafka扩展,而RdKafka依赖于librdkafka,所以这两个我们都需要安装,具体安装方法自行百度,本篇不做说明了。 生产者(测试) 创建消费者需要步骤: 生产者配置参数 创建生产者实例 创建主题实例(依赖生产者) 生产主题消息 推送消
  前言
  PHP中使用Kafka需要RdKafka扩展,而RdKafka依赖于librdkafka,所以这两个我们都需要安装,具体安装方法自行百度,本篇不做说明了。
 
  生产者(测试)
  创建消费者需要步骤:
 
  生产者配置参数
  创建生产者实例
  创建主题实例(依赖生产者)
  生产主题消息
  推送消息
  具体代码如下:
 
 
  $conf = new RdKafkaConf();
 
  // 绑定服务节点
 
  $conf->set('metadata.broker.list', '127.0.0.1:32772');
 
   
 
  // 创建生产者
 
  // 推送消息,如果不调用此函数,消息不会被发送且会丢失
 
  $result = $kafka->flush(5000);
 
   
 
  if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
 
      throw new RuntimeException('Was unable to flush, messages might be lost!');
 
  }
 
  消费者
  创建一个消费者需要几个步骤:
 
  消费者配置参数
  应用配置参数创建消费者实例
  订阅对应主题
  拉取数据
  提交位移
  具体代码如下:
 
 
  $conf = new RdKafkaConf();
 
  // 绑定消费者组
 
  $conf->set('group.id', 'ceshi');
 
  // 绑定服务节点,多个用,分隔
 
  $conf->set('metadata.broker.list', '127.0.0.1:32787');
 
  // 设置自动提交为false
 
  $conf->set('enable.auto.commit', 'false');
 
  // 设置当前消费者拉取数据时的偏移量, 可选参数:
 
  // earliest: 如果消费者组是新创建的,从头开始消费,否则从消费者组当前消费位移开始。
 
  // latest:如果消费者组是新创建的,从最新偏移量开始,否则从消费者组当前消费位移开始。
 
  $conf->set('auto.offset.reset', 'earliest');
 
   
 
  // 创建消费者实例
 
  $consumer = new RdKafkaKafkaConsumer($conf);
 
  // 消费者订阅主题,数组形式
 
  $consumer->subscribe(['topic1','topic2']);
 
  while (true) {
 
      // 消费数据,阻塞5秒(5秒内有数据就消费,没有数据等待5秒进入下一轮循环)
 
      $message = $consumer->consume(5000);
 
      switch ($message->err) {
 
              $consumer->commit($message);
 
              break;
 
          case RD_KAFKA_RESP_ERR__PARTITION_EOF:
 
              echo "No more messages; will wait for moren";
 
              break;
 
          case RD_KAFKA_RESP_ERR__TIMED_OUT:
 
              echo "Timed outn";
 
              break;
 
          default:
 
              throw new Exception($message->errstr(), $message->err);
 
              break;
 
 
  // 对消费者指定分区,注意此方式不能与subscribe一同使用
 
  $consumer->assign([
 
      new RdKafkaTopicPartition("topic", 0),
 
      new RdKafkaTopicPartition("topic", 1),
 
  ]);

(编辑:云计算网_泰州站长网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    热点阅读