thinkphp 使用消息队列更新数据实例
这是一个请求数据并更新数据的实例,由于更新内容太大,一次性运行容易导致达到并发上限,所以使用队列处理
1.time.php 一个定时执行文件
<?php
namespace app\admin\controller;
use app\common\controller\AdminController;
use think\facade\Db;
use think\cache\driver\Redis;
use think\facade\Queue;
class Time extends AdminController
{
public function index()
{
ignore_user_abort();
set_time_limit(0);
$count = Db::name('analysis_patent_retrieval')
->count();
$page = bcdiv($count,'500')+1;
for ($i = 0;$i<$page;$i=$i+1){
// 延迟时间(秒)
$delay = 10;
dump($i);
$res = Queue::push("app\queue\AprQueue",['i'=>$i],'apr');
dump($res);
}
}
}
2.AprQueue.php 队列处理逻辑,在app下创建一个queue的文件,把这个队列执行php放进去
<?php
namespace app\queue;
use app\common\model\AnalysisPatentRetrieval;
use app\common\model\CustomRelate;
use app\common\model\Retrieve;
use Elasticsearch\ClientBuilder;
use think\facade\Db;
use think\queue\Job;
class AprQueue
{
public function fire(Job $job, $data){
$isDone = $this->doJob($data);
if ($isDone) {
// 任务执行成功 删除任务
$job->delete();
} else {
if ($job->attempts() > 3) {
// 任务重试3次后 删除任务
$job->delete();
}
}
}
/**
* 队列执行 业务处理
* @param $data
* @return bool
*/
private function doJob($data)
{
echo 'start11';
$i = $data['i'];
$data = Db::name('analysis_patent_retrieval')
->field('id,public_account')
->limit($i*500,500)
->select()
->toArray();
foreach ($data as $k => $v) {
//post请求数据并更新,这里就不写详细内容了,你可以根据自己的情况来写
}
echo 'end';
return true;
}
}
3.在config配置queue文件
return [
'default' => 'redis',
'connections' => [
'sync' => [
'type' => 'sync',
],
'database' => [
'type' => 'database',
'queue' => 'default',
'table' => 'jobs',
'connection' => null,
],
'redis' => [
'type' => 'redis',
'queue' => 'default',
'host' => '127.0.0.1',
'port' => 6379,
'password' => '11111111',
'select' => 6,
'timeout' => 0,
'persistent' => false,
],
],
'failed' => [
'type' => 'none',
'table' => 'failed_jobs',
],
];
4.在宝塔执行守护进程
5.然后把time.php放到定时任务执行就可以了