where('is_show',1) ->where('is_enable',1) ->select('id', 'distribution_channel_id', 'title', 'link', 'icon', 'desc', 'time_delay', 'mode', 'content', 'is_enable') ->get(); } public static function queue($distribution_channel_id,$appid,$openid){ $list = self::getList($distribution_channel_id); if($list->isEmpty())return ; foreach ($list as $item){ try{ if ($item->mode == 1) { $inset_data = [ 'openid'=>$openid, 'title'=>$item->title, 'link'=>$item->link, 'icon'=>$item->icon, 'desc'=>$item->desc, 'mode'=>1, 'appid'=>$appid, 'time_delay'=>$item->time_delay, 'distribution_channel_id'=>$distribution_channel_id ]; }elseif ($item->mode == 2) { $inset_data = [ 'openid'=>$openid, 'mode'=>2, 'content'=>$item->content, 'appid'=>$appid, 'time_delay'=>$item->time_delay, 'distribution_channel_id'=>$distribution_channel_id ]; } $inset_data['created_at'] = date('Y-m-d H:i:s'); $inset_data['updated_at'] = date('Y-m-d H:i:s'); $inset_data['predict_send_time'] = date('Y-m-d H:i:s',time()+$item->time_delay); $inset_data['send_status'] = 'ready'; //$id = DB::table('force_subscribe_delay_msg_send_record')->insertGetId($inset_data); if ($item->time_delay > 60) { DB::connection('api_mysql')->table('force_subscribe_delay_msg_send_record')->insertGetId($inset_data); }else { $inset_data['send_status'] = 'send_success'; $id = DB::connection('api_mysql')->table('force_subscribe_delay_msg_send_record')->insertGetId($inset_data); // 加入队列 $send_data = [ 'id'=>$id, 'openid'=>$openid, 'title'=>$item->title, 'link'=>$item->link, 'icon'=>$item->icon, 'desc'=>$item->desc, 'mode'=>$item->mode, 'appid'=>$appid, 'time_delay'=>$item->time_delay, 'distribution_channel_id'=>$distribution_channel_id, 'content'=>$item->content, ]; $job = (new ForceSubscribeDelayMsgJob($send_data))->onConnection('rabbitmq')->delay($item->time_delay)->onQueue('force_subscribe_delay_msg'); dispatch($job); } if(stripos($item->link,'?') !== false){ $link = $item->link.'&fromtype=subscribe_delay&send_time='.(time()+$item->time_delay); }else{ $link = $item->link.'?fromtype=subscribe_delay&send_time='.(time()+$item->time_delay); } // 原代码 $send_content = []; $send_content[] = [ ['title'=>textDecode($item->title)], ['description'=>textDecode($item->desc)], ['url'=> $link], ['image'=>$item->icon] ]; $send_data=array( 'send_time'=>date("Y-m-d H:i:s"), 'data' => [ 'openid'=>$openid, 'appid'=>$appid, 'type'=>'one_task', 'task_id'=>3, 'news_content'=>json_encode($send_content) ] ); // $job = (new SendNews($send_data))->onConnection('rabbitmq')->delay($item->time_delay)->onQueue('send_news_list'); // dispatch($job); }catch (\Exception $e){ } } } public static function dequeue($openid){ DB::connection('api_mysql')->table('force_subscribe_delay_msg_send_record')->where('openid',$openid) ->where('send_status','ready')->update([ 'send_status'=>'unsubscribe', 'updated_at'=>date('Y-m-d H:i:s') ]); } public static function getSendList($time_delay){ $result = DB::connection('api_mysql')->table('force_subscribe_delay_msg_send_record')->where('send_status','ready') ->where('predict_send_time','>=',date('Y-m-d H:i:s',time()-$time_delay*2)) ->where('predict_send_time','<=',date('Y-m-d H:i:s',time()+$time_delay)) ->select('openid','id','desc','link','icon','appid','title','distribution_channel_id','time_delay','mode','content') ->get(); if($result){ $client = new Client(); $access_token_list = []; foreach ($result as $item){ //用户类型是否满足条件 $is_access = self::msgUserTypeIsAccess($item->distribution_channel_id,$item->time_delay,$item->openid,$msg); if(!$is_access) { DB::connection('api_mysql')->table('force_subscribe_delay_msg_send_record')->where('id',$item->id) ->update([ 'send_status'=>$msg, 'send_time'=>date('Y-m-d H:i:s'), 'updated_at'=>date('Y-m-d H:i:s'), 'send_result'=>'' ]); continue; } if(isset($access_token_list[$item->appid])){ $access_token = $access_token_list[$item->appid]; }else{ $access_token = self::getAccessToken($item->appid); if($access_token){ $access_token_list[$item->appid] = $access_token; } } if ($item->mode == 1 && $item->link) { if(stripos($item->link,'?') !== false){ $link = $item->link.'&fromtype=subscribe_delay_'.$item->id; }else{ $link = $item->link.'?fromtype=subscribe_delay_'.$item->id; } } if($access_token){ if ($item->mode == 1) { $result = self::send($client,$access_token,$item->openid,textDecode($item->title),textDecode($item->desc),$link,$item->icon); }elseif ($item->mode == 2) { $nickname = DB::connection('api_mysql')->table('users')->where(['openid'=>$item->openid, 'distribution_channel_id'=>$item->distribution_channel_id])->value('nickname'); $content = textDecode(str_replace('{user}', $nickname, $item->content)); $result = self::sendMsg($client,$access_token,$item->openid,$content); $content = $nickname = null; } $result_array = \GuzzleHttp\json_decode($result,1); if(isset($result_array['errcode']) && $result_array['errcode'] == 0){ $send_status = 'send_success'; }else{ $send_status = 'send_fail'; } DB::connection('api_mysql')->table('force_subscribe_delay_msg_send_record')->where('id',$item->id) ->update([ 'send_status'=>$send_status, 'send_time'=>date('Y-m-d H:i:s'), 'updated_at'=>date('Y-m-d H:i:s'), 'send_result'=>$result ]); } } } } public static function msgUserTypeIsAccess($distribution_channel_id,$time_delay,$openid,&$msg){ $forceSubscribeDelayMsg = ForceSubscribeDelayMsg::where('distribution_channel_id',$distribution_channel_id) ->where('time_delay',$time_delay) ->where('is_show',1) ->where('is_enable',1) ->select('user_type') ->first(); $msg = ''; if(!$forceSubscribeDelayMsg) { $msg = 'sys-error'; return false; } if(strtoupper($forceSubscribeDelayMsg->user_type) == 'ALL') return true; //已付费用户 $force_subscribe_user = DB::connection('api_mysql')->table('temp_force_subscribe_users')->where('openid',$openid)->where('is_subscribed',1)->select('uid')->first(); // if(!$force_subscribe_user) { // $msg = 'no subscribe'; // return false; // } // if(!$force_subscribe_user->uid) { // $msg = 'no subscribe uid'; // return false; // } if(strtoupper($forceSubscribeDelayMsg->user_type) == 'PAID'){ if (isset($force_subscribe_user->uid) && !empty($force_subscribe_user->uid)) { $paid_count = DB::connection('api_mysql')->table('orders')->where('uid',$force_subscribe_user->uid)->where('status','PAID')->count(); if($paid_count && $paid_count >0) return true; $msg = 'need paid'; return false; } else { return false; } } //未已付费用户 if(strtoupper($forceSubscribeDelayMsg->user_type) == 'UNPAID'){ if (isset($force_subscribe_user->uid) && !empty($force_subscribe_user->uid)) { $paid_count = DB::connection('api_mysql')->table('orders')->where('uid',$force_subscribe_user->uid)->where('status','PAID')->count(); if($paid_count && $paid_count >0) { $msg = 'need unpaid'; return false; } return true; }elseif ($force_subscribe_user) { return true; }else { return false; } } $msg = 'unknown type'; return false; } private static function getAccessToken($appid){ try{ $WechatController = new WechatController($appid); $accessToken = $WechatController->app->access_token; $token = $accessToken->getToken(); return $token; }catch(\Exception $e){ \Log::error($e); } return ''; } private static function send(Client $client,$access_token,$openid,$title,$description,$url,$picurl){ $push_url = 'https://api.weixin.qq.com/cgi-bin/message/custom/send?access_token='.$access_token; $request = new GuzzleRequest('post',$push_url,[],\GuzzleHttp\json_encode([ 'touser'=>$openid, 'msgtype'=>'news', 'news'=>[ 'articles'=>[compact('title','description','url','picurl')] ] ],JSON_UNESCAPED_UNICODE)); try{ return $client->send($request)->getBody()->getContents(); }catch (\Exception $e){} return ''; } // 发送文字消息 private static function sendMsg(Client $client,$access_token,$openid,$content){ $push_url = 'https://api.weixin.qq.com/cgi-bin/message/custom/send?access_token='.$access_token; $request = new GuzzleRequest('post',$push_url,[],\GuzzleHttp\json_encode([ 'touser'=>$openid, 'msgtype'=>'text', 'text'=>[ 'content'=>$content ] ],JSON_UNESCAPED_UNICODE)); try{ return $client->send($request)->getBody()->getContents(); }catch (\Exception $e){} return ''; } }