加入收藏 | 设为首页 | 会员中心 | 我要投稿 云计算网_泰州站长网 (http://www.0523zz.com/)- 视觉智能、AI应用、CDN、行业物联网、智能数字人!
当前位置: 首页 > 站长学院 > PHP教程 > 正文

php中socket服务的模型下的编程方法

发布时间:2022-02-23 18:59:40 所属栏目:PHP教程 来源:互联网
导读:前面我们花了一段时间来搭建高性能的socket服务,可以同时处理大量的连接,但这是在没有具体业务的情况下。 如果我们启用了一个单进程的server,但里面的一个业务耗时1秒,那么在这1秒内是阻塞的,后续的请求会等待,如果并发三个请求,那么三个请求的执行时
  前面我们花了一段时间来搭建高性能的socket服务,可以同时处理大量的连接,但这是在没有具体业务的情况下。
 
  如果我们启用了一个单进程的server,但里面的一个业务耗时1秒,那么在这1秒内是阻塞的,后续的请求会等待,如果并发三个请求,那么三个请求的执行时间会分别昌1秒,2秒,3秒.提高并发的方法有以下几种:
 
  1:多启动进程,提高并发数
 
  2:优化业务,减少耗时间相当于减少阻塞时间,提高并发数
 
  3:异步编程,避免阻塞,提高并发数
 
  这里我们重点介绍第三种方法,以访问第三方http为例。
 
  代码如下:
 
  //同步读取
  function get_data_blocking(){
      $socket = stream_socket_client("tcp://test.raventech.cn:80", $errno, $errstr, 6);
      fwrite($socket, "GET /sleep1.php HTTP/1.0/r/nHost: test.raventech.cn/r/nAccept: */*/r/n/r/n");
      $str = "";
      while (!feof($socket)) {
          $str .= fgets($socket, 1024);
      }
      fclose($socket);
      return $str;
  }
  
  //异步读取
  function get_data_unblocking(){
      $socket = stream_socket_client("tcp://test.raventech.cn:80", $errno, $errstr, 6);
      stream_set_blocking($socket, 0);
      fwrite($socket, "GET /sleep1.php HTTP/1.0/r/nHost: test.raventech.cn/r/nAccept: */*/r/n/r/n");
      $write  = NULL;
      $except = NULL;
      while( $socket ){
          $read   = array($socket);
          $num_changed_streams = stream_select($read, $write, $except, 0);
          if ( $num_changed_streams > 0 ) {
              foreach($read as $r){
                  $str = fread($r,2048);
                  fclose($socket);
                  $socket = false;
                  return $str;
              }
          }
          usleep(100);
      }
  }
  
  //真正的异步读取--利用server的IO复用事件来提高并发
  class Get_data_event{
  
      public $onMessage = null;
      private $str='';
  
      function __construct(&$server){
          $socket = stream_socket_client("tcp://test.xtgxiso.cn:80", $errno, $errstr, 6);
          stream_set_blocking($socket, 0);
          fwrite($socket, "GET /sleep1.php HTTP/1.0/r/nHost: test.xtgxiso.cn/r/nAccept: */*/r/n/r/n");
          $server->add_socket($socket, array($this, 'read'));
      }
  
      public function read($socket){
          while (1) {
              $buffer = fread($socket, 1024);
              if ($buffer === '' || $buffer === false) {
                  break;
              }
              $this->str .= $buffer;
          }
          if( $this->onMessage && $this->str ) {
              call_user_func($this->onMessage, $this->str);
          }
          $this->str = '';
          return false;
      }
  
  }
  
  /**
   * 单进程IO复用select
   */
  class Xtgxiso_server
  {
      public $socket = false;
      public $master = array();
      public $onConnect = null;
      public $onMessage = null;
      public $other_socket_callback = array();
  
      function __construct($host="0.0.0.0",$port=1215)
      {
          $this->socket = stream_socket_server("tcp://".$host.":".$port,$errno, $errstr);
          if (!$this->socket) die($errstr."--".$errno);
          stream_set_blocking($this->socket,0);
          $id = (int)$this->socket;
          $this->master[$id] = $this->socket;
      }
  
      public function add_socket($socket,$callback){
          $id = (int)$socket;
          $this->master[$id] = $socket;
          $this->other_socket_callback[$id] = $callback;
      }
  
      public function run(){
          $read = $this->master;
          $receive = array();
          echo  "start run.../n";
          while ( 1 ) {
              $read = $this->master;
              //echo  "waiting.../n";
              $mod_fd = @stream_select($read, $_w = NULL, $_e = NULL, 60);
              if ($mod_fd === FALSE) {
                  break;
              }
              foreach ( $read as $k => $v ) {
                  $id = (int)$v;
                  if ( $v === $this->socket ) {
                      //echo "new conn/n";
                      $conn = stream_socket_accept($this->socket);
                      if ($this->onConnect) {
                          call_user_func($this->onConnect, $conn);
                      }
                      $id = (int)$conn;
                      $this->master[$id] = $conn;
                  } else if ( @$this->other_socket_callback[$id] ){
                      call_user_func_array($this->other_socket_callback[$id], array($v));
                  } else {
                      //echo "read data/n";
                      if ( !isset($receive[$k]) ){
                          $receive[$k]="";
                      }
                      $buffer = fread($v, 1024);
                      //echo $buffer."/n";
                      if ( strlen($buffer) === 0 ) {
                          if ( $this->onClose ){
                              call_user_func($this->onClose,$v);
                          }
                          fclose($v);
                          $id = (int)$v;
                          unset($this->master[$id]);
                      } else if ( $buffer === FALSE ) {
                          if ( $this->onClose ){
                              call_user_func($this->onClose, $this->master[$key_to_del]);
                          }
                          fclose($v);
                          $id = (int)$v;
                          unset($this->master[$id]);
                      } else {
                          $pos = strpos($buffer, "/r/n/r/n");
                          if ( $pos === false) {
                              $receive[$k] .= $buffer;
                              //echo "received:".$buffer.";not all package,continue recdiveing/n";
                          }else{
                              $receive[$k] .= trim(substr ($buffer,0,$pos+4));
                              $buffer = substr($buffer,$pos+4);
                              if($this->onMessage) {
                                  call_user_func($this->onMessage,$v,$receive[$k]);
                              }
                              $receive[$k]='';
                          }
                      }
                  }
              }
              usleep(10000);
          }
      }
  }
  
  
  $server =  new Xtgxiso_server();
  
  $server->onConnect = function($conn){
      echo "onConnect -- accepted " . stream_socket_get_name($conn,true) . "/n";
  };
   
  $server->onClose = function($conn){
      echo "onClose --" . "/n";
  };
  
  $server->run();
  第三方服务sleep1.php的代码比较简单:
 
  sleep(1);//模拟耗时
  echo "OK";
  通过以上代码示例,我们分别注释运行 同步读取,异步读取,真正异步,来观察server的并发.测试方法可以写个test.html来模拟三个并发.
 
  <script src="http://127.0.0.1:1215/?id=1"></script>
  <script src="http://127.0.0.1:1215/?id=2"></script>
  <script src="http://127.0.0.1:1215/?id=3"></script>
  通过测试发现,真正异步的是并发的,每个请求耗时1秒,这样我们总算明白什么是真正的非阻塞异步编程了,关键就在共用IO复用.

(编辑:云计算网_泰州站长网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    热点阅读