send(); Log::info('KeepContinueReadV3 end at :'.date('Y-m-d H:i:s').',flag is :'.$str); } private function send(){ $client = new Client(); $channel_id = $this->getAllSite(); if(!$channel_id) return ; $requests = $this->start($channel_id); $pool = new Pool($client, $requests, [ 'concurrency' => 5, 'fulfilled' => function ($response, $index) { }, 'rejected' => function ($reason, $index) { }, ]); $promise = $pool->promise(); $promise->wait(); } private function getAllSite(){ $info = DB::connection('api_mysql')->table('custom_msg_switchs')->where('custom_category','continue_read')->first(); if(!$info) return []; $default_status = $info->default_switch_status; $site_list = explode(',',redisEnv('INNER_SITE')); $on_distribution_channel_id = []; foreach ($site_list as $distribution_channel_id){ $switch_info = DB::connection('api_mysql')->table('custom_msg_switchs_msgs') ->where('distribution_channel_id',$distribution_channel_id) ->where('custom_category','continue_read') ->select('status') ->first(); if($switch_info){ $switch = $switch_info->status; }else{ $switch = $default_status; } if($switch != 1) continue; $on_distribution_channel_id[] = $distribution_channel_id; } return $on_distribution_channel_id; } private function start($distribution_channel_ids){ $temp = 0; //$sites = $this->channelIds(); while (true){ $user = DB::connection('api_mysql')->table('temp_force_subscribe_users') ->select('id','uid','distribution_channel_id','openid','appid','last_interactive_time') ->where('id','>',$temp) ->whereIn('distribution_channel_id',$distribution_channel_ids) ->orderBy('id') ->limit(1000) ->get(); if(!$user) break; foreach ($user as $item){ $temp = $item->id; if( (time() - strtotime($item->last_interactive_time)) > 2*86400 ) continue; //if(!in_array($item->distribution_channel_id,$sites)) continue; $read_info = $this->getReadRecord($item->uid); if(empty($read_info['first']) || !isset($read_info['time']) || empty($read_info['time'])){ continue; } $is_access = $this->isAccess($item->uid,$read_info['time'],$now_role,$is_first); if(!$is_access) continue; $access_token = $this->getAccessToken($item->appid); if(!$access_token)continue; $url = 'https://api.weixin.qq.com/cgi-bin/message/custom/send?access_token='.$access_token; $content = $this->content($item->uid,$item->distribution_channel_id,$read_info); $request = new GuzzleRequest('post',$url,[],\GuzzleHttp\json_encode([ 'touser'=>$item->openid, 'msgtype'=>'text', 'text'=>['content'=>$content] ],JSON_UNESCAPED_UNICODE)); if($is_first){ DB::connection('api_mysql')->table('custom_push_keep_continue_v2')->where('uid',$item->uid)->update([ 'times'=>$now_role, 'updated_at'=>date('Y-m-d H:i:s') ]); }else{ DB::connection('api_mysql')->table('custom_push_keep_continue_v2')->insert([ 'uid'=>$item->uid, 'times'=>$now_role, 'created_at'=>date('Y-m-d H:i:s'), 'updated_at'=>date('Y-m-d H:i:s') ]); } yield $request; } } } private function isAccess($uid,$last_read_time,&$now_role,&$is_first){ //获取用户上次发送的规则 $prev_send_info = DB::connection('api_mysql')->table('custom_push_keep_continue_v2')->where('uid',$uid)->select('updated_at','times')->orderBy('id','desc')->first(); //上次发送规则 $last_role = 0; $is_first = false; $last_send_time = 0; if($prev_send_info){ $is_first = true; $last_role = $prev_send_info->times; $last_send_time = strtotime($prev_send_info->updated_at); } $now_role = $last_role+1; if($now_role == 6) $now_role = 1; //上次更新时间,也就是上次发送时间 //第一次发送是根据用户离开阅读器的时间,即当前时间和用户上次阅读的时间比较是否查过规定的时间 if($now_role == 1){ if((time() - $last_read_time) < self::$role[$now_role] * 3600){ //小于发送时间 return false; } return true; } //是否到达检测时间,即上次发送时间加上当前规则所规定的时间,差距在半个小时以内就算到达 if(abs( time()-($last_send_time+self::$role[$now_role] * 3600)) > 1800){ //没有到达时间 return false; } //用户阅读时间是否符合条件 $time_diff = 0; foreach (self::$role as $k=>$t){ if($k <= $now_role){ $time_diff += $t; } } if( (time() - $last_read_time) >= $time_diff*3600){ //到了时间还是没阅读,就发送 return true; }else{ //时间节点遍历时若被打断,则重新开始计算 DB::connection('api_mysql')->table('custom_push_keep_continue_v2')->where('uid',$uid)->update([ 'times'=>0, 'updated_at'=>date('Y-m-d H:i:s') ]); return false; } } private function content($uid,$distribution_channel_id,$read_info){ $domain = sprintf('https://site%s.%s.com',encodeDistributionChannelId($distribution_channel_id), env('CUSTOM_HOST')); $user_info = DB::connection('api_mysql')->table('users')->where('id',$uid)->select('nickname')->first(); $nickname = '读者'; if($user_info && $user_info->nickname)$nickname = $user_info->nickname; $content_format = "@%s 为您推荐上次未看完的小说\r\n\r\n点击继续阅读❤\r\n\r\n"; $content = sprintf($content_format,$nickname,$domain.$read_info['first']['url']); if(!empty($read_info['seconds'])){ $content .= "历史阅读记录:\r\n\r\n"; foreach ($read_info['seconds'] as $record_item){ $content .= sprintf(" 🌳 %s\r\n",$domain.$record_item['url'],$record_item['book_name']); } } $content .= "\r\n为了方便下次阅读,请置顶公众号"; return $content; } private function getAccessToken($appid){ try{ $WechatController = new WechatController($appid); $accessToken = $WechatController->app->access_token; // EasyWeChat\Core\AccessToken 实例 $token = $accessToken->getToken(); // token 字符串 return $token; }catch(\Exception $e){ \Log::error($e->getMessage()); } return ''; } private function getReadRecord($uid){ $records = Redis::hgetall('book_read:' . $uid); $data = ['first'=>[],'seconds'=>[],'time'=>'']; foreach ($records as $k=>$item){ if($k == 'last_read'){ $record_arr = explode('_',$item); $bid = $record_arr[0]; //$book_name = $this->bid2BookName($bid); $bid = Hashids::encode($bid); $cid = $record_arr[1]; $time = $record_arr[2]; $data['first'] = [ 'url' => '/reader?bid='.$bid.'&cid='.$cid.'&fromtype=continue_read_v2', 'book_name'=>'', ]; $data['time'] = $time; continue; } if(!is_numeric($k)) continue; $record = explode('_', $item); $latest_read_cid = $record[0]; $book_name = self::bid2BookName($k); $latest_read_time = $record[count($record) - 1]; $data['seconds'][] =[ 'url' => '/reader?bid='.Hashids::encode($k).'&cid='.$latest_read_cid.'&fromtype=continue_read_v2', 'book_name'=>$book_name, 'time'=>$latest_read_time ]; } $temp = $data['seconds']; if($temp){ usort($temp, function ($a, $b) { if ($a['time'] >= $b['time']) return -1; return 1; }); } $temp_res = []; foreach ($temp as $k=>$it){ $temp_res[] = $it; if($k>=2) break; } $data['seconds'] = $temp_res; return $data; } private function bid2BookName($bid){ $book_name = null; if(is_null($book_name)){ $book_key = 'wap:string:book:'.$bid; $book_name = Redis::get($book_key); Redis::EXPIRE($book_key,3600); if(!$book_name){ $book_name = ''; $book_info = DB::connection('api_mysql')->table('book_configs')->where('bid',$bid)->select('book_name')->first(); //$book_info = BookConfigService::getBookById($bid); if($book_info && isset($book_info->book_name)){ $book_name = $book_info->book_name; Redis::setex($book_key,3600,$book_name); } } } return $book_name; } private function channelIds(){ $str = '5,123,14,13,8'; return explode(',',$str); } }