TemplateMsgSendJob.php 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. <?php
  2. namespace App\Jobs;
  3. use App\Modules\OfficialAccount\Models\WechatTemplateMsgs;
  4. use App\Modules\OfficialAccount\Services\ForceSubscribeService;
  5. use Illuminate\Bus\Queueable;
  6. use Illuminate\Contracts\Queue\ShouldQueue;
  7. use Illuminate\Foundation\Bus\Dispatchable;
  8. use Illuminate\Queue\InteractsWithQueue;
  9. use Illuminate\Queue\SerializesModels;
  10. use Redis;
  /**
   * Queued job that fans a WeChat template-message task out to its recipients.
   *
   * For one template-message task it:
   *   1. re-loads the task record and aborts if it is missing, deleted or stopped;
   *   2. sets the record's status to '8' and snapshots it into Redis;
   *   3. pages through the users returned by ForceSubscribeService and dispatches
   *      one SendTemplate job per user on the 'rabbitmq' connection;
   *   4. stores the number of users processed on the record and refreshes the
   *      Redis snapshot.
   */
  class TemplateMsgSendJob implements ShouldQueue
  {
      use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

      /**
       * Paging stops once the running user offset exceeds this value.
       */
      const MAX_USER_OFFSET = 2500000;

      /**
       * Task description handed to the job. handle() reads the keys appid,
       * template_id, send_time, template_content, redirect_url, id,
       * distribution_channel_id, subscribe_time, sex, balance, category_id
       * and order_type from it.
       *
       * @var mixed
       */
      protected $data;

      /**
       * Create a new job instance.
       *
       * @param mixed $data Array-accessible task description (see $data).
       * @return void
       */
      public function __construct($data)
      {
          \Log::info('========初始化一个实例');
          $this->data = $data;
      }

      /**
       * Execute the job.
       *
       * Exceptions are logged and swallowed rather than re-thrown: a queue
       * retry would run the whole fan-out again and re-dispatch users that
       * were already queued.
       *
       * NOTE(review): if an exception occurs after the status was set to '8',
       * nothing here resets it — confirm whether another process recovers
       * such tasks.
       *
       * @return void
       */
      public function handle()
      {
          try {
              $wechatTemplateMsgs = $this->data;
              $appid = $wechatTemplateMsgs['appid'];
              $template_id = $wechatTemplateMsgs['template_id'];
              $send_time = $wechatTemplateMsgs['send_time'];
              $template_content = $wechatTemplateMsgs['template_content'];
              $redirect_url = $wechatTemplateMsgs['redirect_url'];
              $task_id = $wechatTemplateMsgs['id'];

              $templateMsgs = WechatTemplateMsgs::wechatTemplateMsgsById($task_id);
              if (!$templateMsgs || $templateMsgs['del_flag'] == 1 || $templateMsgs['status'] == '4') {
                  \Log::info('========模板消息已被停止或删除,不发送==========task_id = '.$task_id);
                  return;
              }

              // Switch the task to status '8' before fanning out; per the log
              // message this is the "sending" state in which users cannot
              // delete, edit or stop the task.
              \Log::info('========现将模板消息设置成发送状态 "8" 用户不能操作(删除,编辑,停止) = '.$task_id);
              $templateMsgs['status'] = '8';
              $templateMsgs->save();
              $this->cacheTemplateMsg($templateMsgs);

              \Log::info('-----------------模板消息参数--------------appid = '.$appid.', channelid = '.$wechatTemplateMsgs['distribution_channel_id']);
              \Log::info('========发送的模板消息内容========== template_content');
              \Log::info($template_content);

              // The content with every ".DATA" occurrence removed is identical
              // for all recipients, so compute it once.
              $renderedContent = str_replace(".DATA", "", $template_content);

              // 1. Page through the users that match the task's filter criteria.
              $skip = 0;
              while ($skip <= self::MAX_USER_OFFSET) {
                  \Log::info('-------------模板消息-----分页取强关用户,目前页数----------'.$skip);
                  $users = ForceSubscribeService::forceUserCountByTemplatePrams(
                      $appid,
                      $wechatTemplateMsgs['distribution_channel_id'],
                      $wechatTemplateMsgs['subscribe_time'],
                      $wechatTemplateMsgs['sex'],
                      $wechatTemplateMsgs['balance'],
                      $wechatTemplateMsgs['category_id'],
                      $wechatTemplateMsgs['order_type'],
                      $skip
                  );
                  $userCount = count($users);
                  \Log::info('-----------------实际发送模板消息用户数量-------------'.$userCount);

                  if ($userCount === 0) {
                      \Log::info('========================发送模板消息_没找到用户===================='.$task_id);
                      break;
                  }

                  // 2. Dispatch one SendTemplate job per user of this page.
                  // NOTE(review): 'last_task' is assigned to the final user of
                  // EVERY page, not only to the final user overall — confirm
                  // that the SendTemplate consumer expects this when a task
                  // spans more than one page.
                  $lastIndex = $userCount - 1;
                  for ($i = 0; $i < $userCount; $i++) {
                      $isLast = ($i === $lastIndex);

                      $payload = [
                          'statusstr'        => time(),
                          'openid'           => $users[$i]['openid'],
                          'appid'            => $appid,
                          'template_id'      => $template_id,
                          'template_content' => $renderedContent,
                          'type'             => $isLast ? 'last_task' : 'one_task',
                          'url'              => $redirect_url,
                          'task_id'          => $task_id,
                          'send_time'        => $send_time,
                      ];

                      // Seconds until the scheduled send time. Negative when
                      // send_time is already in the past or cannot be parsed
                      // (strtotime() returns false); the raw value is logged,
                      // the queue delay is clamped to zero.
                      $timeDiff = strtotime($send_time) - time();
                      $delay = max(0, $timeDiff);

                      if ($isLast) {
                          \Log::info('------------------发送最后一条模板消息----------task_id = '.$task_id.', 时间差 = '.$timeDiff);
                      } else {
                          \Log::info('========================[[[[send template message]]]]====================');
                          \Log::info($renderedContent);
                          \Log::info('========================发送模板消息 不是最后一条模板=================task_id = '.$task_id.'. openid = '.$payload['openid'].', 时间差 = '.$timeDiff);
                      }

                      $this->queueTemplateMessage($payload, $send_time, $delay);
                  }

                  $skip += $userCount;
              }

              // Store the number of users processed on the task and refresh
              // the Redis snapshot.
              \Log::info('========================更新模板消息并存redis====================task_id = '.$task_id);
              $templateMsgs['user_num'] = $skip;
              $templateMsgs->save();
              $this->cacheTemplateMsg($templateMsgs);
          } catch (\Exception $e) {
              // Logged at error level so that a failed fan-out is visible.
              \Log::error('========================循环用户发送模板消息时报错====================');
              \Log::error($e->getMessage());
          }
      }

      /**
       * Write the JSON-encoded task record into the Redis hash
       * 'send_wechat_msg:task_id:{id}' under the field 'wechat_msg'.
       *
       * @param mixed $templateMsgs Task record as returned by
       *                            WechatTemplateMsgs::wechatTemplateMsgsById().
       * @return void
       */
      private function cacheTemplateMsg($templateMsgs)
      {
          Redis::hset(
              'send_wechat_msg:task_id:'.$templateMsgs['id'],
              'wechat_msg',
              json_encode($templateMsgs->toArray())
          );
      }

      /**
       * Dispatch a delayed SendTemplate job for one recipient on the
       * 'send_template_list' queue of the 'rabbitmq' connection.
       *
       * @param array $payload   Per-user message data.
       * @param mixed $send_time Scheduled send time, forwarded unchanged.
       * @param int   $delay     Non-negative delay in seconds.
       * @return void
       */
      private function queueTemplateMessage(array $payload, $send_time, $delay)
      {
          $job = (new SendTemplate([
              'send_time' => $send_time,
              'data'      => $payload,
          ]))
              ->onConnection('rabbitmq')
              ->delay($delay)
              ->onQueue('send_template_list');

          dispatch($job);
      }
  }