<?php

namespace App\Console\Commands\WechatPlatform;

use App\Jobs\WechatPlatform\GZHSendKFMessage;
use App\Service\Util\Support\Trace\TraceContext;
use App\Service\WechatPlatform\WechatPlatformConstService;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\DB;

/**
 * Dispatches queued jobs for pending rows of the `wechat_kf_messages` table.
 *
 * Rows are selected when they are in the KF_MESSAGE_STATUS_PRE_SEND status,
 * have `is_enabled = 1`, a non-empty `gzh_ids` value and a non-null `send_at`.
 * Each selected row is moved to KF_MESSAGE_STATUS_SENDING, one
 * GZHSendKFMessage job is dispatched per id found in its `#`-separated
 * `gzh_ids` column, and the row is finally moved to KF_MESSAGE_STATUS_FINISH.
 */
class KFMessageSend extends Command
{
    /**
     * The name and signature of the console command.
     *
     * The optional --pk option restricts the run to a single
     * wechat_kf_messages.id and bypasses the send_at time window.
     *
     * @var string
     */
    protected $signature = 'WechatPlatform:KFMessageSend {--pk= : wechat_kf_messages.id}';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Command description';

    /**
     * Execute the console command.
     *
     * Without --pk, only rows whose send_at lies in the window
     * (now - 15 minutes, now] are processed. With --pk, only the row with
     * that id is considered (the status / is_enabled / gzh_ids / send_at
     * filters still apply).
     *
     * @return void
     */
    public function handle()
    {
        $pk = $this->option('pk');
        $traceContext = new TraceContext();

        // Derive both ends of the time window from a single clock reading so
        // they cannot drift apart.
        $timestamp = time();
        $now = date('Y-m-d H:i:s', $timestamp);
        $windowStart = date('Y-m-d H:i:s', strtotime('-15 minute', $timestamp));

        DB::table('wechat_kf_messages')
            ->where([
                ['status', '=', WechatPlatformConstService::KF_MESSAGE_STATUS_PRE_SEND],
                ['is_enabled', '=', 1],
                ['gzh_ids', '<>', ''],
            ])
            ->whereNotNull('send_at')
            ->when($pk, function ($query, $pk) {
                return $query->where('id', $pk);
            }, function ($query) use ($now, $windowStart) {
                return $query->where('send_at', '<=', $now)
                    ->where('send_at', '>', $windowStart);
            })
            // chunkById() pages with "id > last seen id" instead of OFFSET.
            // The callback changes `status`, which is part of the WHERE
            // clause, so OFFSET-based chunk() would skip rows on every page
            // after the first.
            ->chunkById(100, function ($items) use ($traceContext, $now) {
                $ids = $items->pluck('id');

                // Mark the whole page as "sending" before dispatching anything.
                DB::table('wechat_kf_messages')
                    ->whereIn('id', $ids)
                    ->update([
                        'status' => WechatPlatformConstService::KF_MESSAGE_STATUS_SENDING,
                        'updated_at' => $now,
                    ]);

                foreach ($items as $item) {
                    myLog('KFMessageSend')->info('开始处理消息', [
                        'message_id' => $item->id,
                        'traceInfo' => $traceContext->getTraceInfo(),
                    ]);

                    try {
                        // gzh_ids is a '#'-separated list; drop empty segments
                        // (e.g. from "#" or "1##2") so no job is dispatched
                        // with an empty gzhId.
                        $gzhIds = array_filter(
                            explode('#', trim($item->gzh_ids, '#')),
                            function ($gzhId) {
                                return $gzhId !== '';
                            }
                        );

                        foreach ($gzhIds as $gzhId) {
                            GZHSendKFMessage::dispatch([
                                'gzhId' => $gzhId,
                                'messageId' => $item->id,
                                'traceInfo' => $traceContext->getTraceInfo(),
                            ])->onConnection('queue-redis')
                                ->onQueue('{duanju_manage}.wechatPlatform.sendKFMessage');
                        }
                    } catch (\Exception $exception) {
                        // Best effort: a failure for one message is logged and
                        // must not stop the remaining messages of the page.
                        myLog('KFMessageSend')->error('发送客服消息异常', [
                            'message_id' => $item->id,
                            'exceptionMsg' => $exception->getMessage(),
                            'traceInfo' => $traceContext->getTraceInfo(),
                        ]);
                    }
                }

                // NOTE(review): every row of the page is marked finished, even
                // those whose dispatch threw above — confirm this is intended.
                DB::table('wechat_kf_messages')
                    ->whereIn('id', $ids)
                    ->update([
                        'status' => WechatPlatformConstService::KF_MESSAGE_STATUS_FINISH,
                        'updated_at' => $now,
                    ]);
            }, 'id');
    }
}