KFMessageSend.php 3.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. <?php
  2. namespace App\Console\Commands\WechatPlatform;
  3. use App\Jobs\WechatPlatform\GZHSendKFMessage;
  4. use App\Service\Util\Support\Trace\TraceContext;
  5. use App\Service\WechatPlatform\WechatPlatformConstService;
  6. use Illuminate\Console\Command;
  7. use Illuminate\Support\Facades\DB;
  8. class KFMessageSend extends Command
  9. {
  10. /**
  11. * The name and signature of the console command.
  12. *
  13. * @var string
  14. */
  15. protected $signature = 'WechatPlatform:KFMessageSend {--pk= : wechat_kf_messages.id}';
  16. /**
  17. * The console command description.
  18. *
  19. * @var string
  20. */
  21. protected $description = 'Command description';
  22. /**
  23. * Execute the console command.
  24. */
  25. public function handle()
  26. {
  27. $pk = $this->option('pk');
  28. $traceContext = new TraceContext();
  29. $now = date('Y-m-d H:i:s');
  30. DB::table('wechat_kf_messages')
  31. ->where([
  32. ['status', '=', WechatPlatformConstService::KF_MESSAGE_STATUS_PRE_SEND],
  33. ['is_enabled', '=', 1],
  34. ['gzh_ids', '<>', '']
  35. ])->whereNotNull('send_at')
  36. ->when($pk, function ($query, $pk){
  37. return $query->where('id', $pk);
  38. }, function ($query) use ($now) {
  39. return $query->where('send_at', '<=', $now)
  40. ->where('send_at', '>', date('Y-m-d H:i:s', strtotime('-15 minute')));
  41. })->orderBy('id')
  42. ->chunk(100, function ($items) use ($traceContext, $now){
  43. DB::table('wechat_kf_messages')
  44. ->whereIn('id', $items->pluck('id'))
  45. ->update([
  46. 'status' => WechatPlatformConstService::KF_MESSAGE_STATUS_SENDING,
  47. 'updated_at' => $now
  48. ]);
  49. foreach ($items as $item) {
  50. myLog('KFMessageSend')->info('开始处理消息', [
  51. 'message_id' => $item->id,
  52. 'traceInfo' => $traceContext->getTraceInfo()
  53. ]);
  54. try {
  55. $gzhIds = explode('#', trim($item->gzh_ids, '#'));
  56. foreach ($gzhIds as $gzhId) {
  57. GZHSendKFMessage::dispatch([
  58. 'gzhId' => $gzhId,
  59. 'messageId' => $item->id,
  60. 'traceInfo' => $traceContext->getTraceInfo()
  61. ])->onConnection('queue-redis')
  62. ->onQueue('{duanju_manage}.wechatPlatform.sendKFMessage');
  63. }
  64. }catch (\Exception $exception) {
  65. myLog('KFMessageSend')->error('发送客服消息异常', [
  66. 'message_id' => $item->id,
  67. 'exceptionMsg' => $exception->getMessage(),
  68. 'traceInfo' => $traceContext->getTraceInfo()
  69. ]);
  70. }
  71. }
  72. DB::table('wechat_kf_messages')
  73. ->whereIn('id', $items->pluck('id'))
  74. ->update([
  75. 'status' => WechatPlatformConstService::KF_MESSAGE_STATUS_FINISH,
  76. 'updated_at' => $now
  77. ]);
  78. });
  79. }
  80. }