send();
Log::info('KeepContinueReadV3 end at :'.date('Y-m-d H:i:s').',flag is :'.$str);
}
private function send(){
$client = new Client();
$channel_id = $this->getAllSite();
if(!$channel_id) return ;
$requests = $this->start($channel_id);
$pool = new Pool($client, $requests, [
'concurrency' => 5,
'fulfilled' => function ($response, $index) {
},
'rejected' => function ($reason, $index) {
},
]);
$promise = $pool->promise();
$promise->wait();
}
private function getAllSite(){
$info = DB::connection('api_mysql')->table('custom_msg_switchs')->where('custom_category','continue_read')->first();
if(!$info) return [];
$default_status = $info->default_switch_status;
$site_list = explode(',',redisEnv('INNER_SITE'));
$on_distribution_channel_id = [];
foreach ($site_list as $distribution_channel_id){
$switch_info = DB::connection('api_mysql')->table('custom_msg_switchs_msgs')
->where('distribution_channel_id',$distribution_channel_id)
->where('custom_category','continue_read')
->select('status')
->first();
if($switch_info){
$switch = $switch_info->status;
}else{
$switch = $default_status;
}
if($switch != 1) continue;
$on_distribution_channel_id[] = $distribution_channel_id;
}
return $on_distribution_channel_id;
}
private function start($distribution_channel_ids){
$temp = 0;
//$sites = $this->channelIds();
while (true){
$user = DB::connection('api_mysql')->table('temp_force_subscribe_users')
->select('id','uid','distribution_channel_id','openid','appid','last_interactive_time')
->where('id','>',$temp)
->whereIn('distribution_channel_id',$distribution_channel_ids)
->orderBy('id')
->limit(1000)
->get();
if(!$user) break;
foreach ($user as $item){
$temp = $item->id;
if( (time() - strtotime($item->last_interactive_time)) > 2*86400 ) continue;
//if(!in_array($item->distribution_channel_id,$sites)) continue;
$read_info = $this->getReadRecord($item->uid);
if(empty($read_info['first']) || !isset($read_info['time']) || empty($read_info['time'])){
continue;
}
$is_access = $this->isAccess($item->uid,$read_info['time'],$now_role,$is_first);
if(!$is_access) continue;
$access_token = $this->getAccessToken($item->appid);
if(!$access_token)continue;
$url = 'https://api.weixin.qq.com/cgi-bin/message/custom/send?access_token='.$access_token;
$content = $this->content($item->uid,$item->distribution_channel_id,$read_info);
$request = new GuzzleRequest('post',$url,[],\GuzzleHttp\json_encode([
'touser'=>$item->openid,
'msgtype'=>'text',
'text'=>['content'=>$content]
],JSON_UNESCAPED_UNICODE));
if($is_first){
DB::connection('api_mysql')->table('custom_push_keep_continue_v2')->where('uid',$item->uid)->update([
'times'=>$now_role,
'updated_at'=>date('Y-m-d H:i:s')
]);
}else{
DB::connection('api_mysql')->table('custom_push_keep_continue_v2')->insert([
'uid'=>$item->uid,
'times'=>$now_role,
'created_at'=>date('Y-m-d H:i:s'),
'updated_at'=>date('Y-m-d H:i:s')
]);
}
yield $request;
}
}
}
private function isAccess($uid,$last_read_time,&$now_role,&$is_first){
//获取用户上次发送的规则
$prev_send_info = DB::connection('api_mysql')->table('custom_push_keep_continue_v2')->where('uid',$uid)->select('updated_at','times')->orderBy('id','desc')->first();
//上次发送规则
$last_role = 0;
$is_first = false;
$last_send_time = 0;
if($prev_send_info){
$is_first = true;
$last_role = $prev_send_info->times;
$last_send_time = strtotime($prev_send_info->updated_at);
}
$now_role = $last_role+1;
if($now_role == 6) $now_role = 1;
//上次更新时间,也就是上次发送时间
//第一次发送是根据用户离开阅读器的时间,即当前时间和用户上次阅读的时间比较是否查过规定的时间
if($now_role == 1){
if((time() - $last_read_time) < self::$role[$now_role] * 3600){
//小于发送时间
return false;
}
return true;
}
//是否到达检测时间,即上次发送时间加上当前规则所规定的时间,差距在半个小时以内就算到达
if(abs( time()-($last_send_time+self::$role[$now_role] * 3600)) > 1800){
//没有到达时间
return false;
}
//用户阅读时间是否符合条件
$time_diff = 0;
foreach (self::$role as $k=>$t){
if($k <= $now_role){
$time_diff += $t;
}
}
if( (time() - $last_read_time) >= $time_diff*3600){
//到了时间还是没阅读,就发送
return true;
}else{
//时间节点遍历时若被打断,则重新开始计算
DB::connection('api_mysql')->table('custom_push_keep_continue_v2')->where('uid',$uid)->update([
'times'=>0,
'updated_at'=>date('Y-m-d H:i:s')
]);
return false;
}
}
private function content($uid,$distribution_channel_id,$read_info){
$domain = sprintf('https://site%s.%s.com',encodeDistributionChannelId($distribution_channel_id),
env('CUSTOM_HOST'));
$user_info = DB::connection('api_mysql')->table('users')->where('id',$uid)->select('nickname')->first();
$nickname = '读者';
if($user_info && $user_info->nickname)$nickname = $user_info->nickname;
$content_format = "@%s 为您推荐上次未看完的小说\r\n\r\n点击继续阅读❤\r\n\r\n";
$content = sprintf($content_format,$nickname,$domain.$read_info['first']['url']);
if(!empty($read_info['seconds'])){
$content .= "历史阅读记录:\r\n\r\n";
foreach ($read_info['seconds'] as $record_item){
$content .= sprintf(" 🌳 %s\r\n",$domain.$record_item['url'],$record_item['book_name']);
}
}
$content .= "\r\n为了方便下次阅读,请置顶公众号";
return $content;
}
private function getAccessToken($appid){
try{
$WechatController = new WechatController($appid);
$accessToken = $WechatController->app->access_token; // EasyWeChat\Core\AccessToken 实例
$token = $accessToken->getToken(); // token 字符串
return $token;
}catch(\Exception $e){
\Log::error($e->getMessage());
}
return '';
}
private function getReadRecord($uid){
$records = Redis::hgetall('book_read:' . $uid);
$data = ['first'=>[],'seconds'=>[],'time'=>''];
foreach ($records as $k=>$item){
if($k == 'last_read'){
$record_arr = explode('_',$item);
$bid = $record_arr[0];
//$book_name = $this->bid2BookName($bid);
$bid = Hashids::encode($bid);
$cid = $record_arr[1];
$time = $record_arr[2];
$data['first'] = [
'url' => '/reader?bid='.$bid.'&cid='.$cid.'&fromtype=continue_read_v2',
'book_name'=>'',
];
$data['time'] = $time;
continue;
}
if(!is_numeric($k)) continue;
$record = explode('_', $item);
$latest_read_cid = $record[0];
$book_name = self::bid2BookName($k);
$latest_read_time = $record[count($record) - 1];
$data['seconds'][] =[
'url' => '/reader?bid='.Hashids::encode($k).'&cid='.$latest_read_cid.'&fromtype=continue_read_v2',
'book_name'=>$book_name,
'time'=>$latest_read_time
];
}
$temp = $data['seconds'];
if($temp){
usort($temp, function ($a, $b) {
if ($a['time'] >= $b['time']) return -1;
return 1;
});
}
$temp_res = [];
foreach ($temp as $k=>$it){
$temp_res[] = $it;
if($k>=2) break;
}
$data['seconds'] = $temp_res;
return $data;
}
private function bid2BookName($bid){
$book_name = null;
if(is_null($book_name)){
$book_key = 'wap:string:book:'.$bid;
$book_name = Redis::get($book_key);
Redis::EXPIRE($book_key,3600);
if(!$book_name){
$book_name = '';
$book_info = DB::connection('api_mysql')->table('book_configs')->where('bid',$bid)->select('book_name')->first();
//$book_info = BookConfigService::getBookById($bid);
if($book_info && isset($book_info->book_name)){
$book_name = $book_info->book_name;
Redis::setex($book_key,3600,$book_name);
}
}
}
return $book_name;
}
private function channelIds(){
$str = '5,123,14,13,8';
return explode(',',$str);
}
}