option('pk'); $traceContext = new TraceContext(); $now = date('Y-m-d H:i:s'); DB::table('wechat_kf_messages') ->where([ ['status', '=', WechatPlatformConstService::KF_MESSAGE_STATUS_PRE_SEND], ['is_enabled', '=', 1], ['gzh_ids', '<>', ''] ])->whereNotNull('send_at') ->when($pk, function ($query, $pk){ return $query->where('id', $pk); }, function ($query) use ($now) { return $query->where('send_at', '<=', $now) ->where('send_at', '>', date('Y-m-d H:i:s', strtotime('-15 minute'))); })->orderBy('id') ->chunk(100, function ($items) use ($traceContext, $now){ DB::table('wechat_kf_messages') ->whereIn('id', $items->pluck('id')) ->update([ 'status' => WechatPlatformConstService::KF_MESSAGE_STATUS_SENDING, 'updated_at' => $now ]); foreach ($items as $item) { myLog('KFMessageSend')->info('开始处理消息', [ 'message_id' => $item->id, 'traceInfo' => $traceContext->getTraceInfo() ]); try { $gzhIds = explode('#', trim($item->gzh_ids, '#')); foreach ($gzhIds as $gzhId) { GZHSendKFMessage::dispatch([ 'gzhId' => $gzhId, 'messageId' => $item->id, 'traceInfo' => $traceContext->getTraceInfo() ])->onConnection('queue-redis') ->onQueue('{duanju_manage}.wechatPlatform.sendKFMessage'); } }catch (\Exception $exception) { myLog('KFMessageSend')->error('发送客服消息异常', [ 'message_id' => $item->id, 'exceptionMsg' => $exception->getMessage(), 'traceInfo' => $traceContext->getTraceInfo() ]); } } DB::table('wechat_kf_messages') ->whereIn('id', $items->pluck('id')) ->update([ 'status' => WechatPlatformConstService::KF_MESSAGE_STATUS_FINISH, 'updated_at' => $now ]); }); } }