123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388 |
- <?php
- namespace YdbPlatform\Ydb\Traits;
- use Ydb\StatusIds\StatusCode;
- use YdbPlatform\Ydb\Issue;
- use YdbPlatform\Ydb\Exception;
- use YdbPlatform\Ydb\QueryResult;
- use YdbPlatform\Ydb\Ydb;
- trait RequestTrait
- {
/**
 * Service name of the most recent request, recorded by saveLastRequest()
 * and set back to null by resetLastRequest().
 *
 * @var string|null
 */
protected $last_request_service;

/**
 * Method name of the most recent request, recorded by saveLastRequest()
 * and set back to null by resetLastRequest().
 *
 * @var string|null
 */
protected $last_request_method;

/**
 * Payload of the most recent request, recorded by saveLastRequest()
 * and set back to null by resetLastRequest().
 *
 * @var array|null
 */
protected $last_request_data;

/**
 * Number of times saveLastRequest() has run since the last
 * resetLastRequest(); retryLastRequest() uses it as the delay multiplier.
 *
 * @var int
 */
protected $last_request_try_count = 0;

/**
 * YDB instance queried for discovery state, endpoint, cluster and IAM
 * credentials. It is not assigned anywhere in this trait, so the using
 * class is expected to set it.
 *
 * @var Ydb
 */
protected $ydb;

/**
 * time() value captured by checkDiscovery() when it last started a
 * discovery attempt; 0 until the first attempt.
 *
 * @var int
 */
protected $lastDiscovery = 0;
/**
 * Send a unary request to a YDB service and return the processed result.
 *
 * The request message class is "\Ydb\<service>\<method>Request". The result
 * message class normally is "\Ydb\<service>\<method>Result"; a few methods
 * have no result message or use a differently named one (see the table in
 * the body).
 *
 * Unless $data carries a truthy "skip_get_token" entry (which is stripped
 * before the request is built), a fresh token from $this->credentials is
 * placed into the "x-ydb-auth-ticket" metadata entry.
 *
 * Relies on $this->meta, $this->credentials and $this->client, which are not
 * declared in this trait and must be provided by the using class.
 *
 * @param string $service
 * @param string $method
 * @param array $data
 * @return bool|mixed|null Value returned by processResponse(), or null when
 *                         the call object exposes no wait() method.
 * @throws \Exception When handleGrpcStatus() or processResponse() reports a failure.
 */
protected function doRequest($service, $method, array $data = [])
{
    $this->checkDiscovery();

    if (empty($data["skip_get_token"])) {
        $this->meta['x-ydb-auth-ticket'] = [$this->credentials->token()];
    } else {
        unset($data["skip_get_token"]);
    }

    $this->saveLastRequest($service, $method, $data);

    $namespace = '\\Ydb\\' . $service . '\\';
    $requestClass = $namespace . $method . 'Request';

    // Methods whose result message is absent (null) or not named "<Method>Result".
    $irregularResults = [
        'BulkUpsert' => null,
        'CommitTransaction' => null,
        'RollbackTransaction' => null,
        'PrepareDataQuery' => 'PrepareQueryResult',
        'ExecuteDataQuery' => 'ExecuteQueryResult',
        'ExplainDataQuery' => 'ExplainQueryResult',
    ];

    if (array_key_exists($method, $irregularResults)) {
        $resultName = $irregularResults[$method];
    } else {
        $resultName = $method . 'Result';
    }
    $resultClass = $resultName === null ? null : $namespace . $resultName;

    $request = new $requestClass($data);

    $this->logger()->debug(
        'YDB: Sending API request [' . $requestClass . '].',
        json_decode($request->serializeToJsonString(), true)
    );

    $call = $this->client->$method($request, $this->meta);

    if (!method_exists($call, 'wait')) {
        return null;
    }

    list($response, $status) = $call->wait();
    $this->handleGrpcStatus($service, $method, $status);

    return $this->processResponse($service, $method, $response, $resultClass);
}
/**
 * Send a server-streaming request to a YDB service and lazily yield each
 * processed response.
 *
 * This is a generator: nothing (including discovery, token refresh and the
 * optional take() hook) runs until the caller starts iterating.
 *
 * If the using class defines take()/release(), take() is called before the
 * request is built and release() is guaranteed to run afterwards - also when
 * processing a response throws or when the consumer stops iterating early
 * and the generator is destroyed.
 *
 * Relies on $this->meta, $this->credentials and $this->client, which are not
 * declared in this trait and must be provided by the using class.
 *
 * @param string $service
 * @param string $method
 * @param array $data A truthy "skip_get_token" entry suppresses the token
 *                    refresh and is stripped before the request is built.
 * @return \Generator Yields a QueryResult for every truthy processed
 *                    response and true otherwise.
 * @throws \Exception When processResponse() reports a failed operation.
 */
protected function doStreamRequest($service, $method, $data = [])
{
    $this->checkDiscovery();

    if (!empty($data["skip_get_token"])) {
        unset($data["skip_get_token"]);
    } else {
        $this->meta['x-ydb-auth-ticket'] = [$this->credentials->token()];
    }

    if (method_exists($this, 'take')) {
        $this->take();
    }

    try {
        $requestClass = '\\Ydb\\' . $service . '\\' . $method . 'Request';

        // Streaming methods carry a "Stream" prefix that the message classes do not.
        switch ($method) {
            case 'StreamReadTable':
                $requestClass = '\\Ydb\\' . $service . '\\ReadTableRequest';
                $resultClass = '\\Ydb\\' . $service . '\\ReadTableResult';
                break;

            case 'StreamExecuteScanQuery':
                $requestClass = '\\Ydb\\' . $service . '\\ExecuteScanQueryRequest';
                $resultClass = '\\Ydb\\' . $service . '\\ExecuteScanQueryPartialResult';
                break;

            default:
                $resultClass = '\\Ydb\\' . $service . '\\' . $method . 'Result';
        }

        $request = new $requestClass($data);
        $call = $this->client->$method($request, $this->meta);

        if (method_exists($call, 'responses')) {
            // NOTE(review): the final gRPC status of the stream is not inspected
            // here (the call's status is never passed to handleGrpcStatus()), so a
            // stream that ends with a transport error simply stops yielding.
            foreach ($call->responses() as $response) {
                $result = $this->processResponse($service, $method, $response, $resultClass);

                yield $result ? new QueryResult($result) : true;
            }
        }
    } finally {
        // Runs on normal completion, on exceptions, and when an unfinished
        // generator is destroyed, so take() is always paired with release().
        if (method_exists($this, 'release')) {
            $this->release();
        }
    }
}
/**
 * Turn a non-OK gRPC status into an exception.
 *
 * Does nothing when $status has no code or the code is 0. Otherwise it logs
 * the failure, re-runs discovery when the Ydb instance asks for it (errors
 * from that attempt are ignored), replaces $this->client with a new instance
 * of the same client class - pointed at a randomly chosen cluster node when
 * discovery is in use and nodes are known, else at the configured endpoint -
 * and finally throws.
 *
 * @param string $service
 * @param string $method
 * @param object $status gRPC status object; its "code" and optional
 *                       "details" properties are read.
 * @return void
 * @throws \Exception The class mapped in self::$grpcExceptions for the
 *                    status code, or a plain \Exception for unmapped codes.
 */
protected function handleGrpcStatus($service, $method, $status)
{
    if (!isset($status->code) || $status->code === 0) {
        return;
    }

    $code = $status->code;

    // Look the name up in the table it is actually read from; fall back to the raw code.
    $codeName = self::$grpcNames[$code] ?? $code;

    $message = 'YDB ' . $service . ' ' . $method . ' (status code GRPC_'
        . $codeName
        . ' ' . $code . '): ' . ($status->details ?? 'no details');

    // Use the logger() accessor like every other method of this trait.
    $this->logger()->error($message);

    if ($this->ydb->needDiscovery()) {
        try {
            $this->ydb->discover();
        } catch (\Exception $e) {
            // Best effort: the original gRPC failure is reported below regardless.
        }
    }

    $endpoint = $this->ydb->endpoint();
    if ($this->ydb->needDiscovery()) {
        $nodes = $this->ydb->cluster()->all();
        if (count($nodes) > 0) {
            $endpoint = $nodes[array_rand($nodes)]->endpoint();
        }
    }

    // "new $object(...)" instantiates the class of the current client object.
    $this->client = new $this->client($endpoint, [
        'credentials' => $this->ydb->iam()->getCredentials()
    ]);

    if (isset(self::$grpcExceptions[$code])) {
        $exceptionClass = self::$grpcExceptions[$code];
        throw new $exceptionClass($message);
    }

    throw new \Exception($message);
}
/**
 * Unwrap a service response and return its result, or throw on failure.
 *
 * Steps:
 *  - if the response has getOperation(), continue with the operation object;
 *  - if the (unwrapped) response lacks getStatus() or getResult(), return it as is;
 *  - on StatusCode::SUCCESS return true for a null result, otherwise the
 *    result - re-parsed into $resultClass through its JSON form when the
 *    result is an object and $resultClass names an existing class;
 *  - on any other status, log the issues and throw.
 *
 * Every successful outcome clears the saved last-request state, including
 * the null-result case, so the try counter does not leak into later retries.
 *
 * @param string $service
 * @param string $method
 * @param object $response
 * @param string|null $resultClass
 * @return bool|mixed
 * @throws \Exception The class mapped in self::$ydbExceptions for the
 *                    status code, or a plain \Exception for unmapped codes.
 */
protected function processResponse($service, $method, $response, $resultClass)
{
    if (method_exists($response, 'getOperation')) {
        $response = $response->getOperation();
    }

    if (!method_exists($response, 'getStatus') || !method_exists($response, 'getResult')) {
        return $response;
    }

    $statusCode = $response->getStatus();

    if ($statusCode == StatusCode::SUCCESS) {
        $result = $response->getResult();

        if ($result === null) {
            // A success without a payload is still a success: forget the saved request.
            $this->resetLastRequest();

            return true;
        }

        if (is_object($result) && $resultClass && class_exists($resultClass)) {
            $jsonResult = $result->serializeToJsonString();
            $this->logger()->debug('YDB: Received API response [' . $resultClass . '].', json_decode($jsonResult, true));

            $result = new $resultClass;
            $result->mergeFromJsonString($jsonResult);
        }

        $this->resetLastRequest();

        return $result;
    }

    $statusName = StatusCode::name($statusCode);

    $issues = [];
    foreach ($response->getIssues() as $issue) {
        $issues[] = (new Issue($issue))->toString();
    }
    $message = implode("\n", $issues);

    $this->logger()->error(
        'YDB: Service [' . $service . '] method [' . $method . '] Failed to receive a valid response.',
        [
            'status' => $statusCode . ' (' . $statusName . ')',
            'message' => $message,
        ]
    );

    $msg = 'YDB ' . $service . ' ' . $method . ' (YDB_' . $statusCode . ' ' . $statusName . '): ' . $message;

    if (isset(self::$ydbExceptions[$statusCode])) {
        $exceptionClass = self::$ydbExceptions[$statusCode];
        throw new $exceptionClass($msg);
    }

    throw new \Exception($msg);
}
/**
 * Re-send the request remembered by saveLastRequest(), after a delay.
 *
 * The delay is $sleep milliseconds multiplied by the current try count
 * (at least 1), so each further attempt waits $sleep ms longer.
 * Nothing happens when no service/method is currently remembered.
 *
 * @param int $sleep Base delay in milliseconds.
 * @return bool|mixed|null Result of doRequest(), or null when there is
 *                         nothing to retry.
 * @throws \Exception Whatever doRequest() throws.
 */
protected function retryLastRequest($sleep = 100)
{
    if (!$this->last_request_service || !$this->last_request_method) {
        return;
    }

    $this->logger()->info('Going to retry the last request!');

    $multiplier = max($this->last_request_try_count, 1);
    usleep($multiplier * $sleep * 1000); // usleep() takes microseconds

    return $this->doRequest(
        $this->last_request_service,
        $this->last_request_method,
        $this->last_request_data
    );
}
/**
 * Remember a request so that retryLastRequest() can repeat it, and count
 * the attempt.
 *
 * @param string $service
 * @param string $method
 * @param array $data
 * @return void
 */
protected function saveLastRequest($service, $method, array $data = [])
{
    ++$this->last_request_try_count;

    $this->last_request_data = $data;
    $this->last_request_method = $method;
    $this->last_request_service = $service;
}
/**
 * Forget the remembered request and zero the try counter.
 *
 * @return void
 */
protected function resetLastRequest()
{
    $this->last_request_try_count = 0;

    $this->last_request_service
        = $this->last_request_method
        = $this->last_request_data
        = null;
}
/**
 * Re-run endpoint discovery when the Ydb instance asks for it and more than
 * discoveryInterval() seconds have passed since the previous attempt.
 *
 * The attempt time is stored before discover() is called, so a failing
 * discovery is not retried until the interval elapses again. Exceptions
 * thrown by discover() are deliberately ignored.
 *
 * @return void
 */
protected function checkDiscovery()
{
    if (!$this->ydb->needDiscovery()) {
        return;
    }

    $intervalElapsed = time() - $this->lastDiscovery > $this->ydb->discoveryInterval();
    if (!$intervalElapsed) {
        return;
    }

    try {
        $this->lastDiscovery = time();
        $this->ydb->discover();
    } catch (\Exception $ignored) {
        // Best effort: requests proceed with the endpoints already known.
    }
}
/**
 * Exception class thrown by processResponse() for each non-success YDB
 * operation status code; codes missing here fall back to \Exception.
 *
 * @var array<int, string>
 */
public static $ydbExceptions = [
    StatusCode::STATUS_CODE_UNSPECIFIED => \YdbPlatform\Ydb\Exceptions\Ydb\StatusCodeUnspecified::class,
    StatusCode::BAD_REQUEST             => \YdbPlatform\Ydb\Exceptions\Ydb\BadRequestException::class,
    StatusCode::UNAUTHORIZED            => \YdbPlatform\Ydb\Exceptions\Ydb\UnauthorizedException::class,
    StatusCode::INTERNAL_ERROR          => \YdbPlatform\Ydb\Exceptions\Ydb\InternalErrorException::class,
    StatusCode::ABORTED                 => \YdbPlatform\Ydb\Exceptions\Ydb\AbortedException::class,
    StatusCode::UNAVAILABLE             => \YdbPlatform\Ydb\Exceptions\Ydb\UnavailableException::class,
    StatusCode::OVERLOADED              => \YdbPlatform\Ydb\Exceptions\Ydb\OverloadedException::class,
    StatusCode::SCHEME_ERROR            => \YdbPlatform\Ydb\Exceptions\Ydb\SchemeErrorException::class,
    StatusCode::GENERIC_ERROR           => \YdbPlatform\Ydb\Exceptions\Ydb\GenericErrorException::class,
    StatusCode::TIMEOUT                 => \YdbPlatform\Ydb\Exceptions\Ydb\TimeoutException::class,
    StatusCode::BAD_SESSION             => \YdbPlatform\Ydb\Exceptions\Ydb\BadSessionException::class,
    StatusCode::PRECONDITION_FAILED     => \YdbPlatform\Ydb\Exceptions\Ydb\PreconditionFailedException::class,
    StatusCode::ALREADY_EXISTS          => \YdbPlatform\Ydb\Exceptions\Ydb\AlreadyExistsException::class,
    StatusCode::NOT_FOUND               => \YdbPlatform\Ydb\Exceptions\Ydb\NotFoundException::class,
    StatusCode::SESSION_EXPIRED         => \YdbPlatform\Ydb\Exceptions\Ydb\SessionExpiredException::class,
    StatusCode::CANCELLED               => \YdbPlatform\Ydb\Exceptions\Ydb\CancelledException::class,
    StatusCode::UNDETERMINED            => \YdbPlatform\Ydb\Exceptions\Ydb\UndeterminedException::class,
    StatusCode::UNSUPPORTED             => \YdbPlatform\Ydb\Exceptions\Ydb\UnsupportedException::class,
    StatusCode::SESSION_BUSY            => \YdbPlatform\Ydb\Exceptions\Ydb\SessionBusyException::class,
];
/**
 * Exception class thrown by handleGrpcStatus() for each non-zero gRPC
 * status code; codes missing here fall back to \Exception.
 *
 * @var array<int, string>
 */
public static $grpcExceptions = [
    1  => \YdbPlatform\Ydb\Exceptions\Grpc\CanceledException::class,
    2  => \YdbPlatform\Ydb\Exceptions\Grpc\UnknownException::class,
    3  => \YdbPlatform\Ydb\Exceptions\Grpc\InvalidArgumentException::class,
    4  => \YdbPlatform\Ydb\Exceptions\Grpc\DeadlineExceededException::class,
    5  => \YdbPlatform\Ydb\Exceptions\Grpc\NotFoundException::class,
    6  => \YdbPlatform\Ydb\Exceptions\Grpc\AlreadyExistsException::class,
    7  => \YdbPlatform\Ydb\Exceptions\Grpc\PermissionDeniedException::class,
    8  => \YdbPlatform\Ydb\Exceptions\Grpc\ResourceExhaustedException::class,
    9  => \YdbPlatform\Ydb\Exceptions\Grpc\FailedPreconditionException::class,
    10 => \YdbPlatform\Ydb\Exceptions\Grpc\AbortedException::class,
    11 => \YdbPlatform\Ydb\Exceptions\Grpc\OutOfRangeException::class,
    12 => \YdbPlatform\Ydb\Exceptions\Grpc\UnimplementedException::class,
    13 => \YdbPlatform\Ydb\Exceptions\Grpc\InternalException::class,
    14 => \YdbPlatform\Ydb\Exceptions\Grpc\UnavailableException::class,
    15 => \YdbPlatform\Ydb\Exceptions\Grpc\DataLossException::class,
    16 => \YdbPlatform\Ydb\Exceptions\Grpc\UnauthenticatedException::class,
];
/**
 * Symbolic name for each non-zero gRPC status code, used by
 * handleGrpcStatus() when composing the error message.
 *
 * @var array<int, string>
 */
public static $grpcNames = [
    1  => 'CANCELLED',
    2  => 'UNKNOWN',
    3  => 'INVALID_ARGUMENT',
    4  => 'DEADLINE_EXCEEDED',
    5  => 'NOT_FOUND',
    6  => 'ALREADY_EXISTS',
    7  => 'PERMISSION_DENIED',
    8  => 'RESOURCE_EXHAUSTED',
    9  => 'FAILED_PRECONDITION',
    10 => 'ABORTED',
    11 => 'OUT_OF_RANGE',
    12 => 'UNIMPLEMENTED',
    13 => 'INTERNAL',
    14 => 'UNAVAILABLE',
    15 => 'DATA_LOSS',
    16 => 'UNAUTHENTICATED',
];
- }
|