加入收藏 | 设为首页 | 会员中心 | 我要投稿 云计算网_泰州站长网 (http://www.0523zz.com/)- 视觉智能、AI应用、CDN、行业物联网、智能数字人!
当前位置: 首页 > 站长学院 > PHP教程 > 正文

PHP与RabbitMQ实现消息队列的完整代码

发布时间:2022-02-25 04:41:20 所属栏目:PHP教程 来源:互联网
导读:本篇文章给大家带来的内容是关于PHP和RabbitMQ实现消息队列的完整代码,有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助。 先安装PHP对应的RabbitMQ,这里用的是 php_amqp 不同的扩展实现方式会有细微的差异. php扩展地址: http://pecl.php.
  本篇文章给大家带来的内容是关于PHP和RabbitMQ实现消息队列的完整代码,有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助。
 
  先安装PHP对应的RabbitMQ,这里用的是 php_amqp 不同的扩展实现方式会有细微的差异.
 
  php扩展地址: http://pecl.php.net/package/amqp
 
  具体以官网为准 http://www.rabbitmq.com/getstarted.html
 
  介绍:
 
  config.php 配置信息
 
  BaseMQ.php MQ基类
 
  ProductMQ.php 生产者类
 
  ConsumerMQ.php 消费者类
 
  Consumer2MQ.php 消费者2(可有多个)
 
  config.php
 
  <?php
  
  return [
  
      //配置
  
      'host' => [
  
          'host' => '127.0.0.1',
  
          'port' => '5672',
  
          'login' => 'guest',
  
          'password' => 'guest',
  
          'vhost'=>'/',
  
      ],
  
      //交换机
  //Cuoxin.com
      'exchange'=>'word',
  
      //路由
  
      'routes' => [],
  
  ];
  BaseMQ.php
 
  <?php
  
  /**
  
   * Created by PhpStorm.
  
   * User: pc
  
   * Date: 2018/12/13
  
   * Time: 14:11
  
   */
  
  
  
  namespace MyObjSummary/rabbitMQ;
  
  
  
  /** Member
  
   *      AMQPChannel
  
   *      AMQPConnection
  
   *      AMQPEnvelope
  
   *      AMQPExchange
  
   *      AMQPQueue
  
   * Class BaseMQ
  
   * @package MyObjSummary/rabbitMQ
  
   */
  
  class BaseMQ
  
  {
  
      /** MQ Channel
  
       * @var /AMQPChannel
  
       */
  
      public $AMQPChannel ;
  
  
  
      /** MQ Link
  
       * @var /AMQPConnection
  
       */
  
      public $AMQPConnection ;
  
  
  
      /** MQ Envelope
  
       * @var /AMQPEnvelope
  
       */
  
      public $AMQPEnvelope ;
  
  
  
      /** MQ Exchange
  
       * @var /AMQPExchange
  
       */
  
      public $AMQPExchange ;
  
  
  
      /** MQ Queue
  
       * @var /AMQPQueue
  
       */
  
      public $AMQPQueue ;
  
  
  
      /** conf
  
       * @var
  
       */
  
      public $conf ;
  
  
  
      /** exchange
  
       * @var
  
       */
  
      public $exchange ;
  
  
  
      /** link
  
       * BaseMQ constructor.
  
       * @throws /AMQPConnectionException
  
       */
  
      public function __construct()
  
      {
  
          $conf =  require 'config.php' ;
  
          if(!$conf)
  
              throw new /AMQPConnectionException('config error!');
  
          $this->conf     = $conf['host'] ;
  
          $this->exchange = $conf['exchange'] ;
  
          $this->AMQPConnection = new /AMQPConnection($this->conf);
  
          if (!$this->AMQPConnection->connect())
  
              throw new /AMQPConnectionException("Cannot connect to the broker!/n");
  
      }
  
  
  
      /**
  
       * close link
  
       */
  
      public function close()
  
      {
  
          $this->AMQPConnection->disconnect();
  
      }
  
  
  
      /** Channel
  
       * @return /AMQPChannel
  
       * @throws /AMQPConnectionException
  
       */
  
      public function channel()
  
      {
  
          if(!$this->AMQPChannel) {
  
              $this->AMQPChannel =  new /AMQPChannel($this->AMQPConnection);
  
          }
  
          return $this->AMQPChannel;
  
      }
  
  
  
      /** Exchange
  
       * @return /AMQPExchange
  
       * @throws /AMQPConnectionException
  
       * @throws /AMQPExchangeException
  
       */
  
      public function exchange()
  
      {
  
          if(!$this->AMQPExchange) {
  
              $this->AMQPExchange = new /AMQPExchange($this->channel());
  
              $this->AMQPExchange->setName($this->exchange);
  
          }
  
          return $this->AMQPExchange ;
  
      }
  
  
  
      /** queue
  
       * @return /AMQPQueue
  
       * @throws /AMQPConnectionException
  
       * @throws /AMQPQueueException
  
       */
  
      public function queue()
  
      {
  
          if(!$this->AMQPQueue) {
  
              $this->AMQPQueue = new /AMQPQueue($this->channel());
  
          }
  
          return $this->AMQPQueue ;
  
      }
  
  
  
      /** Envelope
  
       * @return /AMQPEnvelope
  
       */
  
      public function envelope()
  
      {
  
          if(!$this->AMQPEnvelope) {
  
              $this->AMQPEnvelope = new /AMQPEnvelope();
  //Cuoxin.com
          }
  
          return $this->AMQPEnvelope;
  
      }
  
  }
  ProductMQ.php
 
  <?php
  
  //生产者 P
  
  namespace MyObjSummary/rabbitMQ;
  
  require 'BaseMQ.php';
  
  class ProductMQ extends BaseMQ
  
  {
  
      private $routes = ['hello','word']; //路由key
  
  
  
      /**
  
       * ProductMQ constructor.
  
       * @throws /AMQPConnectionException
  
       */
  
      public function __construct()
  
      {
  
         parent::__construct();
  
      }
  
  
  
      /** 只控制发送成功 不接受消费者是否收到
  
       * @throws /AMQPChannelException
  
       * @throws /AMQPConnectionException
  
       * @throws /AMQPExchangeException
  
       */
  
      public function run()
  
      {
  
          //频道
  
          $channel = $this->channel();
  
          //创建交换机对象
  
          $ex = $this->exchange();
  
          //消息内容
  
          $message = 'product message '.rand(1,99999);
  
          //开始事务
  
          $channel->startTransaction();
  
          $sendEd = true ;
  
          foreach ($this->routes as $route) {
  
              $sendEd = $ex->publish($message, $route) ;
  
              echo "Send Message:".$sendEd."/n";
  
          }
  
          if(!$sendEd) {
  
              $channel->rollbackTransaction();
  
          }
  
          $channel->commitTransaction(); //提交事务
  
          $this->close();
  
          die ;
  
      }
  
  }
  
  try{
  
      (new ProductMQ())->run();
  
  }catch (/Exception $exception){
  
      var_dump($exception->getMessage()) ;
  
  }
  ConsumerMQ.php
 
  <?php
  
  //消费者 C
  
  namespace MyObjSummary/rabbitMQ;
  
  require 'BaseMQ.php';
  
  class ConsumerMQ extends BaseMQ
  
  {
  
      private  $q_name = 'hello'; //队列名
  
      private  $route  = 'hello'; //路由key
  
  
  
      /**
  
       * ConsumerMQ constructor.
  
       * @throws /AMQPConnectionException
  
       */
  
      public function __construct()
  
      {
  
          parent::__construct();
  
      }
  
  
  
      /** 接受消息 如果终止 重连时会有消息
  
       * @throws /AMQPChannelException
  
       * @throws /AMQPConnectionException
  
       * @throws /AMQPExchangeException
  
       * @throws /AMQPQueueException
  
       */
  
      public function run()
  
      {
  
  
  
          //创建交换机
  
          $ex = $this->exchange();
  
          $ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型
  
          $ex->setFlags(AMQP_DURABLE); //持久化
  
          //echo "Exchange Status:".$ex->declare()."/n";
  
  
  
          //创建队列
  
          $q = $this->queue();
  
          //var_dump($q->declare());exit();
  
          $q->setName($this->q_name);
  
          $q->setFlags(AMQP_DURABLE); //持久化
  
          //echo "Message Total:".$q->declareQueue()."/n";
  
  
  
          //绑定交换机与队列,并指定路由键
  
          echo 'Queue Bind: '.$q->bind($this->exchange, $this->route)."/n";
  
  
  
          //阻塞模式接收消息
  
          echo "Message:/n";
  
          while(True){
  
              $q->consume(function ($envelope,$queue){
  
                  $msg = $envelope->getBody();
  
                  echo $msg."/n"; //处理消息
  
                  $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答
  
              });
  
              //$q->consume('processMessage', AMQP_AUTOACK); //自动ACK应答
  
          }
  
          $this->close();
  
      }
  
  }
  
  try{
  
      (new ConsumerMQ)->run();
  
  }catch (/Exception $exception){
  
      var_dump($exception->getMessage()) ;
  
  }
 

(编辑:云计算网_泰州站长网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    热点阅读