Session.php 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648
  1. <?php
  2. namespace YdbPlatform\Ydb;
  3. use Closure;
  4. use Exception;
  5. use Google\Protobuf\Duration;
  6. use Ydb\Operations\OperationParams;
  7. use Ydb\Table\Query;
  8. use Ydb\Table\QueryCachePolicy;
  9. // use Ydb\Table\StaleModeSettings;
  10. // use Ydb\Table\OnlineModeSettings;
  11. use Ydb\Table\TransactionControl;
  12. use Ydb\Table\TransactionSettings;
  13. use Ydb\Table\SerializableModeSettings;
  14. class Session
  15. {
  16. use Traits\RequestTrait;
  17. use Traits\ParseResultTrait;
  18. use Traits\TypeHelpersTrait;
  19. use Traits\TableHelpersTrait;
  20. use Traits\LoggerTrait;
  21. /**
  22. * @var Table
  23. */
  24. protected $table;
  25. /**
  26. * @var \Ydb\Table\V1\TableServiceClient
  27. */
  28. protected $client;
  29. /**
  30. * @var array
  31. */
  32. protected $meta;
  33. /**
  34. * @var string
  35. */
  36. protected $path;
  37. /**
  38. * @var string
  39. */
  40. protected $session_id;
  41. /**
  42. * @var string
  43. */
  44. protected $tx_id;
  45. /**
  46. * @var \Psr\Log\LoggerInterface|null
  47. */
  48. protected $logger;
  49. /**
  50. * @var bool
  51. */
  52. protected $is_busy = false;
  53. /**
  54. * @var bool
  55. */
  56. protected $is_alive = false;
  57. /**
  58. * @var bool
  59. */
  60. protected $keep_query_in_cache = null;
  61. /**
  62. * @param Table $table
  63. * @param string $session_id
  64. */
  65. public function __construct(Table $table, $session_id)
  66. {
  67. $this->ydb = $table->ydb();
  68. $this->table = $table;
  69. $this->session_id = $session_id;
  70. $this->client = $table->client();
  71. $this->meta = $table->meta();
  72. $this->credentials = $table->credentials();
  73. $this->path = $table->path();
  74. $this->logger = $table->getLogger();
  75. $this->is_alive = true;
  76. }
  77. /**
  78. * @return string
  79. */
  80. public function id()
  81. {
  82. return $this->session_id;
  83. }
  84. /**
  85. * @return bool
  86. */
  87. public function isAlive()
  88. {
  89. return $this->is_alive;
  90. }
  91. /**
  92. * @return bool
  93. */
  94. public function isBusy()
  95. {
  96. return $this->is_busy;
  97. }
  98. /**
  99. * @return bool
  100. */
  101. public function isIdle()
  102. {
  103. return !$this->is_busy;
  104. }
  105. /**
  106. * @return $this
  107. */
  108. public function take()
  109. {
  110. $this->is_busy = true;
  111. $this->table->sessionTaken($this);
  112. return $this;
  113. }
  114. /**
  115. * @return $this
  116. */
  117. public function release()
  118. {
  119. $this->is_busy = false;
  120. $this->table->sessionReleased($this);
  121. return $this;
  122. }
  123. /**
  124. * @return void
  125. * @throws Exception
  126. */
  127. public function delete()
  128. {
  129. if ($this->is_alive)
  130. {
  131. $this->request('DeleteSession', ['session_id' => $this->session_id]);
  132. $this->is_alive = false;
  133. $this->table->dropSession($this->session_id);
  134. }
  135. }
  136. /**
  137. * @return $this
  138. * @throws Exception
  139. */
  140. protected function refresh()
  141. {
  142. $old_session_id = $this->session_id;
  143. $result = $this->request('CreateSession');
  144. $session_id = $result->getSessionId();
  145. $this->logger()->info('YDB: New session created [...' . substr($session_id, -6) . '].');
  146. $this->session_id = $session_id;
  147. $this->table->syncSession($old_session_id);
  148. return $this;
  149. }
  150. /**
  151. * @return array|mixed|null
  152. * @throws Exception
  153. */
  154. public function keepAlive()
  155. {
  156. $result = $this->request('KeepAlive', ['session_id' => $this->session_id]);
  157. return $this->parseResult($result, 'sessionStatus');
  158. }
  159. /**
  160. * @param Closure $closure
  161. * @return mixed
  162. * @throws Exception
  163. */
  164. public function transaction(Closure $closure)
  165. {
  166. $this->beginTransaction();
  167. try
  168. {
  169. $result = $closure($this);
  170. $this->commitTransaction();
  171. return $result;
  172. }
  173. catch (Exception $e)
  174. {
  175. try {
  176. $this->rollbackTransaction();
  177. } catch (Exception $e2) {
  178. }
  179. throw $e;
  180. }
  181. }
  182. /**
  183. * @return mixed
  184. * @throws Exception
  185. */
  186. public function beginTransaction()
  187. {
  188. $serializable_read_write = new SerializableModeSettings;
  189. // $online_read_only = new OnlineModeSettings;
  190. // $stale_read_only = new StaleModeSettings;
  191. $transaction_settings = new TransactionSettings([
  192. 'serializable_read_write' => $serializable_read_write,
  193. ]);
  194. $result = $this->request('BeginTransaction', [
  195. 'session_id' => $this->session_id,
  196. 'tx_settings' => $transaction_settings,
  197. ]);
  198. if ($result && method_exists($result, 'getTxMeta'))
  199. {
  200. $this->tx_id = $result->getTxMeta()->getId();
  201. return $this->tx_id;
  202. }
  203. else
  204. {
  205. throw new Exception('YDB failed to begin transaction');
  206. }
  207. }
  208. /**
  209. * @return bool
  210. * @throws Exception
  211. */
  212. public function commitTransaction()
  213. {
  214. if ($tx_id = $this->tx_id)
  215. {
  216. $this->request('CommitTransaction', [
  217. 'session_id' => $this->session_id,
  218. 'tx_id' => $tx_id,
  219. ]);
  220. }
  221. $this->tx_id = null;
  222. return true;
  223. }
  224. /**
  225. * An alias to commitTransaction.
  226. *
  227. * @return bool
  228. * @throws Exception
  229. */
  230. public function commit()
  231. {
  232. return $this->commitTransaction();
  233. }
  234. /**
  235. * @return bool
  236. * @throws Exception
  237. */
  238. public function rollbackTransaction()
  239. {
  240. if ($tx_id = $this->tx_id)
  241. {
  242. $this->request('RollbackTransaction', [
  243. 'session_id' => $this->session_id,
  244. 'tx_id' => $tx_id,
  245. ]);
  246. }
  247. $this->tx_id = null;
  248. return true;
  249. }
  250. /**
  251. * An alias to rollbackTransaction.
  252. *
  253. * @return bool
  254. * @throws Exception
  255. */
  256. public function rollBack()
  257. {
  258. return $this->rollbackTransaction();
  259. }
  260. /**
  261. * Set whether to keep query in cache.
  262. *
  263. * @return $this
  264. */
  265. public function keepInCache($value = true)
  266. {
  267. $this->keep_query_in_cache = (bool)$value;
  268. return $this;
  269. }
  270. /**
  271. * @param string|\Ydb\Table\Query $yql
  272. * @return YdbQuery
  273. * @throws \YdbPlatform\Ydb\Exception
  274. */
  275. public function newQuery($yql)
  276. {
  277. return new YdbQuery($this, $yql);
  278. }
  279. /**
  280. * @param YdbQuery $query
  281. * @return bool|QueryResult
  282. * @throws \YdbPlatform\Ydb\Exception
  283. */
  284. public function executeQuery(YdbQuery $query)
  285. {
  286. $data = $query->getRequestData();
  287. $data['session_id'] = $this->session_id;
  288. $result = $this->request('ExecuteDataQuery', $data);
  289. return $result ? new QueryResult($result) : true;
  290. }
  291. /**
  292. * @param string|\Ydb\Table\Query $yql
  293. * @param array|null $parameters
  294. * @return bool|QueryResult
  295. * @throws \YdbPlatform\Ydb\Exception
  296. */
  297. public function query($yql, array $parameters = null, array $options = [])
  298. {
  299. $tx_id = $this->tx_id;
  300. if (!$tx_id)
  301. {
  302. $tx_id = $this->beginTransaction();
  303. }
  304. $tx_control = new TransactionControl([
  305. 'tx_id' => $tx_id,
  306. ]);
  307. $query = $this->newQuery($yql)
  308. ->parameters($parameters)
  309. ->txControl($tx_control)
  310. ->keepInCache($this->keep_query_in_cache ?? ($parameters&&count($parameters)>0));
  311. if(isset($options['collectStats'])){
  312. $query->collectStats($options['collectStats']);
  313. }
  314. $operationParams = new OperationParams();
  315. if(isset($options['operation_timeout_ms'])){
  316. $seconds = intdiv( $options['operation_timeout_ms'], 1000); // get seconds
  317. $nanos = fmod($options['operation_timeout_ms'], 1000) * 1000000; // get ns
  318. $operationParams->setOperationTimeout(new Duration([
  319. 'seconds' => $seconds,
  320. 'nanos' => $nanos
  321. ]));
  322. }
  323. if(isset($options['cancel_after_ms'])){
  324. $seconds = intdiv( $options['cancel_after_ms'], 1000); // get seconds
  325. $nanos = fmod($options['operation_timeout_ms'], 1000) * 1000000; // get ns
  326. $operationParams->setCancelAfter(new Duration([
  327. 'seconds' => $seconds,
  328. 'nanos' => $nanos
  329. ]));
  330. }
  331. $query->operationParams($operationParams);
  332. return $this->executeQuery($query);
  333. }
  334. /**
  335. * An alias to query with no result.
  336. *
  337. * @param string|\Ydb\Table\Query $yql
  338. * @param array|null $parameters
  339. * @return bool
  340. * @throws \YdbPlatform\Ydb\Exception
  341. */
  342. public function exec($yql, array $parameters = null)
  343. {
  344. $this->query($yql, $parameters);
  345. return true;
  346. }
  347. /**
  348. * @param string $yql
  349. * @return bool|mixed|void|null
  350. * @throws Exception
  351. */
  352. public function schemeQuery($yql)
  353. {
  354. return $this->request('ExecuteSchemeQuery', [
  355. 'session_id' => $this->session_id,
  356. 'yql_text' => $yql,
  357. ]);
  358. }
  359. /**
  360. * @param string $yql
  361. * @return bool|mixed|void|null
  362. * @throws Exception
  363. */
  364. public function explainQuery($yql)
  365. {
  366. $result = $this->request('ExplainDataQuery', [
  367. 'session_id' => $this->session_id,
  368. 'yql_text' => $yql,
  369. ]);
  370. return $result;
  371. }
  372. /**
  373. * @param string $yql
  374. * @return Statement
  375. * @throws Exception
  376. */
  377. public function prepare($yql)
  378. {
  379. $statement = new Statement($this, $yql);
  380. if ($statement->isCached())
  381. {
  382. return $statement;
  383. }
  384. $result = $this->request('PrepareDataQuery', [
  385. 'session_id' => $this->session_id,
  386. 'yql_text' => $yql,
  387. ]);
  388. $statement->saveInCache();
  389. return $statement;
  390. }
  391. /**
  392. * @param string $path
  393. * @param array $columns
  394. * @param array $options
  395. * @return \Generator
  396. */
  397. public function readTable($path, $columns = [], $options = [])
  398. {
  399. $params = [
  400. 'session_id' => $this->session_id,
  401. 'path' => $this->pathPrefix($path),
  402. 'columns' => $columns,
  403. ];
  404. if (isset($options['row_limit']))
  405. {
  406. $params['row_limit'] = (int)$options['row_limit'];
  407. }
  408. if (isset($options['ordered']))
  409. {
  410. $params['ordered'] = (bool)$options['ordered'];
  411. }
  412. if (isset($options['key_range']))
  413. {
  414. if ($key_range = $this->convertKeyRange($options['key_range']))
  415. {
  416. $params['key_range'] = $key_range;
  417. }
  418. }
  419. return $this->streamRequest('StreamReadTable', $params);
  420. }
  421. /**
  422. * @param string $table
  423. * @param mixed $columns
  424. * @param string|array $primary_key
  425. * @param array $indexes
  426. * @return bool|mixed|void|null
  427. * @throws Exception
  428. */
  429. public function createTable($table, $columns, $primary_key = 'id', $indexes = [])
  430. {
  431. $data = [
  432. 'path' => $this->pathPrefix($table),
  433. 'session_id' => $this->session_id,
  434. ];
  435. if (is_a($columns, YdbTable::class))
  436. {
  437. $data['columns'] = $columns->getColumns();
  438. $data['primary_key'] = $columns->getPrimaryKey();
  439. $data['indexes'] = $columns->getIndexes();
  440. $data['storage_settings'] = $columns->getStorageSettings();
  441. $data['column_families'] = $columns->getColumnFamilies();
  442. $data['attributes'] = $columns->getAttributes();
  443. $data['compaction_policy'] = $columns->getCompactionPolicy();
  444. $data['partitioning_settings'] = $columns->getPartitionSettings();
  445. $data['uniform_partitions'] = $columns->getUniformPartitions();
  446. $data['key_bloom_filter'] = $columns->getKeyBloomFilter();
  447. $data['read_replicas_settings'] = $columns->getReadReplicasSettings();
  448. $data = array_filter($data);
  449. }
  450. else
  451. {
  452. $data['columns'] = $this->convertColumns($columns);
  453. $data['primary_key'] = (array)$primary_key;
  454. $data['indexes'] = $this->convertIndexes($indexes);
  455. }
  456. return $this->request('CreateTable', $data);
  457. }
  458. /**
  459. * @param string $source_table
  460. * @param string $destination_table
  461. * @return bool|mixed|void|null
  462. * @throws Exception
  463. */
  464. public function copyTable($source_table, $destination_table)
  465. {
  466. return $this->request('CopyTable', [
  467. 'source_path' => $this->pathPrefix($source_table),
  468. 'destination_path' => $this->pathPrefix($destination_table),
  469. 'session_id' => $this->session_id,
  470. ]);
  471. }
  472. /**
  473. * @param array $tables
  474. * @return bool|mixed|void|null
  475. * @throws Exception
  476. */
  477. public function copyTables($tables)
  478. {
  479. return $this->request('CopyTables', [
  480. 'tables' => $this->convertTableItems($tables),
  481. 'session_id' => $this->session_id,
  482. ]);
  483. }
  484. /**
  485. * @param string $table
  486. * @return bool|mixed|void|null
  487. * @throws Exception
  488. */
  489. public function dropTable($table)
  490. {
  491. return $this->request('DropTable', [
  492. 'path' => $this->pathPrefix($table),
  493. 'session_id' => $this->session_id,
  494. ]);
  495. }
  496. /**
  497. * @param string $table
  498. * @param array $columns
  499. * @param array $indexes
  500. * @return bool|mixed|void|null
  501. * @throws Exception
  502. */
  503. public function alterTable($table, $columns = [], $indexes = [])
  504. {
  505. return $this->request('AlterTable', [
  506. 'path' => $this->pathPrefix($table),
  507. 'add_columns' => $this->convertColumns($columns['add'] ?? []),
  508. 'drop_columns' => $columns['drop'] ?? [],
  509. 'alter_columns' => $this->convertColumns($columns['alter'] ?? []),
  510. 'add_indexes' => $this->convertIndexes($indexes['add'] ?? []),
  511. 'drop_indexes' => $indexes['drop'] ?? [],
  512. 'session_id' => $this->session_id,
  513. ]);
  514. }
  515. /**
  516. * @param string $table
  517. * @return array|mixed|null
  518. * @throws Exception
  519. */
  520. public function describeTable($table)
  521. {
  522. $result = $this->request('DescribeTable', [
  523. 'session_id' => $this->session_id,
  524. 'path' => $this->pathPrefix($table),
  525. 'include_table_stats' => true,
  526. ]);
  527. return $this->parseResult($result);
  528. }
  529. /**
  530. * @param string $method
  531. * @param array $data
  532. * @return bool|mixed|void|null
  533. * @throws \YdbPlatform\Ydb\Exception
  534. */
  535. protected function request($method, array $data = [])
  536. {
  537. $this->take();
  538. try
  539. {
  540. $result = $this->doRequest('Table', $method, $data);
  541. }
  542. catch (Exception $e)
  543. {
  544. $this->release();
  545. throw $e;
  546. }
  547. $this->release();
  548. return $result;
  549. }
  550. /**
  551. * @param string $method
  552. * @param array $data
  553. * @return \Generator
  554. * @throws \YdbPlatform\Ydb\Exception
  555. */
  556. protected function streamRequest($method, array $data = [])
  557. {
  558. return $this->doStreamRequest('Table', $method, $data);
  559. }
  560. }