PHP消息队列之使用mysql存放队列

5887次阅读 286人点赞 作者: WuBin 发布时间: 2023-10-09 16:15:19
扫码到手机查看

解耦分析

视频课程地址:https://www.imooc.com/video/15165

首先通过图片来分析:

这里使用两个表,一个订单系统,专门处理订单,此时标记订单状态为未处理;

然后使用服务器的定时任务,每分钟执行一次,去处理队列中的状态为未处理的条,并将其状态改为处理中;

最后当,配送系统完成后,将该条状态改为处理完成。

如此便实现了解耦,避免某一个系统出现问题,从而也要修改另外一个系统。

首先创建一个数据表:

use mytest;
CREATE TABLE `order_queue` (
    `id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '订单的id号',
    `order_id` int(11) NOT NULL,
    `mobile` varchar(20) NOT NULL COMMENT '用户的手机号',
    `address` varchar(100) NOT NULL COMMENT '用户的地址',
    `created_at` datetime NOT NULL COMMENT '订单创建的时间',
    `updated_at` datetime NOT NULL COMMENT '处理完成的时间',
    `status` tinyint(2) NOT NULL COMMENT '当前状态,0未处理,1已处理,2处理中',
    PRIMARY KEY(`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

封装一个基本的数据库类

数据库连接类db.php:https://blog.csdn.net/u011415782/article/details/77864102

<?php
// 数据库连接类
class DB{
  //私有的属性
  private static $dbcon=false;
  private $host;
  private $port;
  private $user;
  private $pass;
  private $db;
  private $charset;
  private $link;
  //私有的构造方法
  private function __construct(){
    $this->host =  'localhost';
    $this->port =  '3306';
    $this->user =  'wubin';
    $this->pass =  'www.wubin.work';
    $this->db =  'mytest';
    $this->charset= 'utf8';
    //连接数据库
    $this->db_connect();
    //选择数据库
    $this->db_usedb();
    //设置字符集
    $this->db_charset();
   }
   //连接数据库
   private function db_connect(){
    $this->link=mysqli_connect($this->host.':'.$this->port,$this->user,$this->pass);
    if(!$this->link){
      echo "数据库连接失败<br>";
      echo "错误编码".mysqli_errno($this->link)."<br>";
      echo "错误信息".mysqli_error($this->link)."<br>";
      exit;
    }
   }
   //设置字符集
    private function db_charset(){
     mysqli_query($this->link,"set names {$this->charset}");
    }
    //选择数据库
   private function db_usedb(){
     mysqli_query($this->link,"use {$this->db}");
   }
   //私有的克隆
   private function __clone(){
     die('clone is not allowed');
   }
   //公用的静态方法
   public static function getIntance(){
     if(self::$dbcon==false){
      self::$dbcon=new self;
     }
     return self::$dbcon;
   }
   //执行sql语句的方法
    public function query($sql){
     $res=mysqli_query($this->link,$sql);
     if(!$res){
      echo "sql语句执行失败<br>";
      echo "错误编码是".mysqli_errno($this->link)."<br>";
      echo "错误信息是".mysqli_error($this->link)."<br>";
     }
     return $res;
   }
    //获得最后一条记录id
    public function getInsertid(){
     return mysqli_insert_id($this->link);
    }
   /**
    * 查询某个字段
    * @param
    * @return string or int
    */
    public function getOne($sql){
     $query=$this->query($sql);
      return mysqli_free_result($query);
    }
    //获取一行记录,return array 一维数组
    public function getRow($sql,$type="assoc"){
     $query=$this->query($sql);
     if(!in_array($type,array("assoc",'array',"row"))){
       die("mysqli_query error");
     }
     $funcname="mysqli_fetch_".$type;
     return $funcname($query);
    }
    //获取一条记录,前置条件通过资源获取一条记录
    public function getFormSource($query,$type="assoc"){
    if(!in_array($type,array("assoc","array","row")))
    {
      die("mysqli_query error");
    }
    $funcname="mysqli_fetch_".$type;
    return $funcname($query);
    }
    //获取多条数据,二维数组
    public function getAll($sql){
     $query=$this->query($sql);
     $list=array();
     while ($r=$this->getFormSource($query)) {
      $list[]=$r;
     }
     return $list;
    }

    public function selectAll($table,$where,$fields='*',$order='',$skip=0,$limit=1000)
    {
              if(is_array($where)){
                    foreach ($where as $key => $val) {
                        if (is_numeric($val)) {
                            $condition = $key.'='.$val;
                        }else{
                            $condition = $key.'=\"'.$val.'\"';
                        }
                    }
              } else {
                $condition = $where;
              }
              if (!empty($order)) {
                  $order = " order by ".$order;
              }
              $sql = "select $fields from $table where $condition $order limit $skip,$limit";
              $query = $this->query($sql);
              $list = array();
              while ($r= $this->getFormSource($query)) {
                  $list[] = $r;
              }
              return $list;
    }
     /**
     * 定义添加数据的方法
     * @param string $table 表名
     * @param string orarray $data [数据]
     * @return int 最新添加的id
     */
     public function insert($table,$data){
     //遍历数组,得到每一个字段和字段的值
     $key_str='';
     $v_str='';
     foreach($data as $key=>$v){
     //  if(empty($v)){
     //   die("error");
     // }
        //$key的值是每一个字段s一个字段所对应的值
        $key_str.=$key.',';
        $v_str.="'$v',";
     }
     $key_str=trim($key_str,',');
     $v_str=trim($v_str,',');
     //判断数据是否为空
     $sql="insert into $table ($key_str) values ($v_str)";
     $this->query($sql);
    //返回上一次增加操做产生ID值
     return $this->getInsertid();
   }
   /*
    * 删除一条数据方法
    * @param1 $table, $where=array('id'=>'1') 表名 条件
    * @return 受影响的行数
    */
    public function deleteOne($table, $where){
      if(is_array($where)){
        foreach ($where as $key => $val) {
          $condition = $key.'='.$val;
        }
      } else {
        $condition = $where;
      }
      $sql = "delete from $table where $condition";
      $this->query($sql);
      //返回受影响的行数
      return mysqli_affected_rows($this->link);
    }
    /*
    * 删除多条数据方法
    * @param1 $table, $where 表名 条件
    * @return 受影响的行数
    */
    public function deleteAll($table, $where){
      if(is_array($where)){
        foreach ($where as $key => $val) {
          if(is_array($val)){
            $condition = $key.' in ('.implode(',', $val) .')';
          } else {
            $condition = $key. '=' .$val;
          }
        }
      } else {
        $condition = $where;
      }
      $sql = "delete from $table where $condition";
      $this->query($sql);
      //返回受影响的行数
      return mysqli_affected_rows($this->link);
    }
   /**
    * [修改操作description]
    * @param [type] $table [表名]
    * @param [type] $data [数据]
    * @param [type] $where [条件]
    * @return [type]
    */
   public function update($table,$data,$where,$limit=0){
     //遍历数组,得到每一个字段和字段的值
     $str='';
    foreach($data as $key=>$v){
     $str.="$key='$v',";
    }
    $str=rtrim($str,',');
      if(is_array($where)){
        foreach ($where as $key => $val) {
          if(is_array($val)){
            $condition = $key.' in ('.implode(',', $val) .')';
          } else {
            $condition = $key. '=' .$val;
          }
        }
      } else {
        $condition = $where;
      }

        if (!empty($limit)) {
            $limit = " limit ".$limit;
        }else{
            $limit='';
        }
    //修改SQL语句
    $sql="update $table set $str where $condition $limit";
    $this->query($sql);
    //返回受影响的行数
    return mysqli_affected_rows($this->link);
   }
}
?>

模拟订单系统

order.php

<?php
// 这个文件是用来接受用户订单信息并写入队列的一个文件
include 'db.php';
// 对用户数据进行过滤

if(!empty($_GET['mobile'])) {
    // 这里应该首先是订单中心的处理流程...最终生成一个订单号

    // 这里假设操作成功生成了一个订单号
    $order_id = rand(10000, 99999);

    // 将生成的订单信息存入队列表中
    $insert_data = array(
        'order_id' => $order_id ,
        'mobile' => $_GET['mobile'],
        'created_at' => date('Y-m-d H:i:s', time()),
        'status' => 0,
    );

    // 把数据存放到队列表中
    $db = DB::getIntance();
    $res = $db->insert('order_queue', $insert_data);
    if($res) {
        echo $insert_data['order_id'] . "保存成功";
    } else {
        echo "保存失败";
    }

}

goods.php商品系统

<?php
// 配送系统处理队列中的订单并进行标记的文件 配送系统一般会有一个处理时间

include 'db.php';
$db = DB::getIntance();

//  * 1 先把要处理的记录更新为等待处理,这是一个锁的设定,避免其他程序对此数据进行处理
// 定义要锁定的数据 2是正在处理中的状态
// 将未处理的状态都改为等待处理状态
$waiting = array('status' => 0); // 等待处理的数据
$lock = array('status' => 2);
$res_lock = $db->update(
    'order_queue',
    $lock,
    $waiting,
    2 // 每次就处理2条
);

//  * 2 我们要选择出刚刚更新的这些数据,然后进行配送系统的处理
if($res_lock) {
    $res = $db->selectAll(
        'order_queue',
        $lock
    );
    // 选择出要处理的订单内容
    // ...
    // 然后由配货系统进行退货处理

    // 处理完成后把订单更新为已处理
    $success = array(
        'status' => 1, // 状态更新成已完成
        'updated_at' => date('Y-m-d H:i:s', time())
    );
    $res_last = $db->update('order_queue', $success, $lock);
    if($res_last) {
        echo 'success' . $res_last;
    } else {
        echo 'fail' . $res_last;
    }
} else {
    echo '全部已经完成';
}

//  * 3 吧这些处理过的程序更新为已完成

定时任务

#!/bin/bash

date "+%G-%m-%d %H:%M:S"
cd /home/pi/sites/imooc/queue_mysql/
php goods.php

# 部署方式
linux使用$ crontab -e 执行定时任务
m分 h时 dom日 mon月 dow周 command对应的命令
*/1 * * * * /home/pi/sites/imooc/queue_mysql/good.sh >> /home/pi/sites/imooc/queue_mysql/log.log 2>&1
*/1每分钟执行一次,*每月执行一次 ,cammand 写路径代表执行这个地址的文件 linux区分大小写 
>>后面操作是记录执行结果
2>&1 错误输出转化为标准输出
点赞 支持一下 觉得不错?客官您就稍微鼓励一下吧!
关键词:消息队列
推荐阅读
  • uniapp实现被浏览器唤起的功能

    当用户打开h5链接时候,点击打开app若用户在已经安装过app的情况下直接打开app,若未安装过跳到应用市场下载安装这个功能在实现上主要分为两种场景,从普通浏览器唤醒以及从微信唤醒。

    8186次阅读 521人点赞 发布时间: 2022-12-14 16:34:53 立即查看
  • Vue

    盘点Vue2和Vue3的10种组件通信方式

    Vue中组件通信方式有很多,其中Vue2和Vue3实现起来也会有很多差异;本文将通过选项式API组合式API以及setup三种不同实现方式全面介绍Vue2和Vue3的组件通信方式。

    3282次阅读 241人点赞 发布时间: 2022-08-19 09:40:16 立即查看
  • JS

    几个高级前端常用的API

    推荐4个前端开发中常用的高端API,分别是MutationObserver、IntersectionObserver、getComputedstyle、getBoundingClientRect、requ...

    13228次阅读 854人点赞 发布时间: 2021-11-11 09:39:54 立即查看
  • PHP

    【正则】一些常用的正则表达式总结

    在日常开发中,正则表达式是非常有用的,正则表达式在每个语言中都是可以使用的,他就跟JSON一样,是通用的。了解一些常用的正则表达式,能大大提高你的工作效率。

    12104次阅读 385人点赞 发布时间: 2021-10-09 15:58:58 立即查看
  • 【中文】免费可商用字体下载与考证

    65款免费、可商用、无任何限制中文字体打包下载,这些字体都是经过长期验证,经得住市场考验的,让您规避被无良厂商起诉的风险。

    10347次阅读 829人点赞 发布时间: 2021-07-05 15:28:45 立即查看
  • Vue

    Vue3开发一个v-loading的自定义指令

    在vue3中实现一个自定义的指令,有助于我们简化开发,简化复用,通过一个指令的调用即可实现一些可高度复用的交互。

    14248次阅读 1132人点赞 发布时间: 2021-07-02 15:58:35 立即查看
  • JS

    关于手机上滚动穿透问题的解决

    当页面出现浮层的时候,滑动浮层的内容,正常情况下预期应该是浮层下边的内容不会滚动;然而事实并非如此。在PC上使用css即可解决,但是在手机端,情况就变的比较复杂,就需要禁止触摸事件才可以。

    14180次阅读 1156人点赞 发布时间: 2021-05-31 09:25:50 立即查看
  • Vue

    Vue+html2canvas截图空白的问题

    在使用vue做信网单页专题时,有海报生成的功能,这里推荐2个插件:一个是html2canvas,构造好DOM然后转canvas进行截图;另外使用vue-canvas-poster(这个截止到2021年3月...

    27073次阅读 2131人点赞 发布时间: 2021-03-02 09:04:51 立即查看
  • Vue

    vue-router4过度动画无效解决方案

    在初次使用vue3+vue-router4时候,先后遇到了过度动画transition进入和退出分别无效的情况,搜遍百度没没找到合适解决方法,包括vue-route4有一些API都进行了变化,以前的一些操...

    23237次阅读 1791人点赞 发布时间: 2021-02-23 13:37:20 立即查看
交流 收藏 目录