SyncSubscribe.php 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. <?php
  2. namespace Modules\ContentManage\Services\CpManage;
  3. use GuzzleHttp\Client;
  4. use Modules\Common\Support\Trace\CustomizeLogger;
  5. use Modules\Common\Support\Trace\TraceContext;
  6. use Modules\ContentManage\Exceptions\ChannelBusinessException;
  7. use Modules\ContentManage\Exceptions\Errors;
  8. use Modules\ContentManage\Models\Cp\CpSubscribeStatisticDataModel;
  9. class SyncSubscribe
  10. {
  11. private $httpClient = null;
  12. public $syncUrl;
  13. public $syncStartDate;
  14. public $syncEndDate;
  15. private $logger;
  16. private $perPage = 200;
  17. public function __construct()
  18. {
  19. $this->httpClient = new Client(['timeout' => 10]);
  20. $this->logger = CustomizeLogger::getSubscribeLogger();
  21. }
  22. private function getSyncUrl() {
  23. return $this->syncUrl ?:
  24. join('/', [
  25. trim(config('contentManage.zhushuyunpublicapi.public_domain'), '/'),
  26. 'outapi/zhiyu/getSubsByZhiyu'
  27. ]);
  28. }
  29. private function buildQuery($page = 0)
  30. {
  31. $query = [];
  32. if ($this->syncStartDate) {
  33. $query['start_time'] = $this->syncStartDate;
  34. }
  35. if ($this->syncEndDate) {
  36. $query['end_time'] = $this->syncEndDate;
  37. }
  38. $query['is_all'] = 0;
  39. $query['page'] = $page;
  40. $query['per_page'] = $this->perPage;
  41. $query['timestamp'] = time();
  42. $privateKey = config('contentManage.zhushuyunpublicapi.external_private_key');
  43. $query['sign'] = md5(md5($query['timestamp'] . $privateKey) . $privateKey);
  44. return $query;
  45. }
  46. public function sync()
  47. {
  48. try {
  49. $this->doSync();
  50. } catch (\Exception $exception) {
  51. $this->logger->error('同步失败,再次重试', [
  52. 'traceInfo' => app(TraceContext::class)->getTraceInfo(),
  53. 'errCode' => $exception->getCode(), 'errMsg' => $exception->getMessage()
  54. ]);
  55. usleep(500000);
  56. $this->doSync();
  57. }
  58. }
  59. private function doSync()
  60. {
  61. $page = 0;
  62. $dataCollect = collect();
  63. while (true) {
  64. try {
  65. $result = $this->httpClient->get($this->getSyncUrl(), [
  66. 'query' => $this->buildQuery($page)
  67. ]);
  68. } catch (\Exception $exception) {
  69. $this->logger->error('请求上游接口报错,准备重试', [
  70. 'traceInfo' => app(TraceContext::class)->getTraceInfo(),
  71. 'errCode' => $exception->getCode(), 'errMsg' => $exception->getMessage()
  72. ]);
  73. usleep(500000);
  74. $result = $this->httpClient->get($this->getSyncUrl(), [
  75. 'query' => $this->buildQuery($page)
  76. ]);
  77. }
  78. $parsedResult = $this->parseResult($result);
  79. if (0 == count($parsedResult['data'])) {
  80. break;
  81. }
  82. foreach ($parsedResult['data'] as $tempData) {
  83. $tempData['bid'] = 0;
  84. if($dataCollect->has($tempData['zhiyu_book_id'])) {
  85. $t = $dataCollect->get($tempData['zhiyu_book_id']);
  86. $t['yestoday_available_amount'] += $tempData['yestoday_available_amount'];
  87. $t['yestoday_last_amount'] += $tempData['yestoday_last_amount'];
  88. $t['yestoday_total_amount'] += $tempData['yestoday_total_amount'];
  89. $dataCollect->put($tempData['zhiyu_book_id'], $t);
  90. } else {
  91. $dataCollect->put($tempData['zhiyu_book_id'], $tempData);
  92. }
  93. }
  94. $page = $parsedResult['current_page'] + 1;
  95. if ($page > $parsedResult['last_page']) {
  96. break;
  97. }
  98. }
  99. $this->saveResult($dataCollect);
  100. }
  101. /**
  102. * note1:
  103. * 接口中的书籍id(zhiyu_book_id)去植宇书库zy_books去查,
  104. * @param $datas
  105. */
  106. private function saveResult($datas)
  107. {
  108. foreach ($datas as $data) {
  109. $bookSubscribe = new BookSubscribe($data);
  110. $insertData = $bookSubscribe->dealSubscribeInfo();
  111. if(is_null($insertData)) {
  112. continue;
  113. }
  114. CpSubscribeStatisticDataModel::updateOrCreate([
  115. 'cp_name' => $insertData['cp_name'], 'bid' => $insertData['zy_bid'],
  116. 'calculate_date' => $insertData['calculate_date']
  117. ], [
  118. 'month' => $insertData['month'],
  119. 'settlement_date' => date_sub(date_create($insertData['calculate_date']),
  120. date_interval_create_from_date_string('1 day'))->format('Y-m-d'),
  121. 'yesterday_available_amount' => $insertData['yesterday_available_amount'],
  122. 'yesterday_final_amount' => $insertData['yesterday_final_amount'],
  123. 'yesterday_total_coins' => $insertData['yesterday_total_coins'],
  124. 'book_settlement_type' => $insertData['book_settlement_type'],
  125. 'data_source_id' => $data['id'] ?? 0,
  126. 'data_source_from' => 'zhuishuyun',
  127. 'data_source_bid' => $data['bid'] ?? 0,
  128. ]);
  129. }
  130. }
  131. private function parseResult($rawResult)
  132. {
  133. $httpStatus = $rawResult->getStatusCode();
  134. if (200 != $httpStatus) {
  135. ChannelBusinessException::throwError(Errors::REQUEST_HTTP_STATUS_ERROR);
  136. }
  137. $rawContent = $rawResult->getBody()->getContents();
  138. $parsedContent = \json_decode($rawContent, true);
  139. $this->logger->debug('请求接口结果', [
  140. 'traceInfo' => app(TraceContext::class)->getTraceInfo(),
  141. 'url' => $this->syncUrl,
  142. 'parsedContent' => $parsedContent,
  143. ]);
  144. $code = $parsedContent['code'] ?? -1;
  145. if (0 != $code) {
  146. ChannelBusinessException::throwError(Errors::REQUEST_CODE_STATUS_ERROR);
  147. }
  148. return [
  149. 'current_page' => $parsedContent['data']['current_page'],
  150. 'last_page' => $parsedContent['data']['last_page'],
  151. 'data' => $parsedContent['data']['data']
  152. ];
  153. }
  154. }