123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295 |
- #include <yql/essentials/parser/pg_wrapper/interface/raw_parser.h>
- #include "arena_ctx.h"
- #include <util/generic/scope.h>
- #include <fcntl.h>
- #include <stdint.h>
- #ifndef WIN32
- #include <pthread.h>
- #endif
- #include <signal.h>
- #define TypeName PG_TypeName
- #define SortBy PG_SortBy
- #define Sort PG_Sort
- #define Unique PG_Unique
- #undef SIZEOF_SIZE_T
- extern "C" {
- #include "postgres.h"
- #include "access/session.h"
- #include "access/xact.h"
- #include "catalog/namespace.h"
- #include "mb/pg_wchar.h"
- #include "nodes/pg_list.h"
- #include "nodes/parsenodes.h"
- #include "nodes/value.h"
- #include "parser/parser.h"
- #include "utils/guc.h"
- #include "utils/palloc.h"
- #include "utils/memutils.h"
- #include "utils/memdebug.h"
- #include "utils/resowner.h"
- #include "utils/timestamp.h"
- #include "utils/guc_hooks.h"
- #include "port/pg_bitutils.h"
- #include "port/pg_crc32c.h"
- #include "postmaster/postmaster.h"
- #include "storage/latch.h"
- #include "storage/proc.h"
- #include "miscadmin.h"
- #include "tcop/tcopprot.h"
- #include "tcop/utility.h"
- #include "thread_inits.h"
- #undef Abs
- #undef Min
- #undef Max
- #undef TypeName
- #undef SortBy
- #undef LOG
- #undef INFO
- #undef NOTICE
- #undef WARNING
- #undef ERROR
- #undef FATAL
- #undef PANIC
- #undef open
- #undef fopen
- #undef bind
- #undef locale_t
- }
- #include "utils.h"
- extern "C" {
- extern __thread Latch LocalLatchData;
- extern void destroy_timezone_hashtable();
- extern void destroy_typecache_hashtable();
- extern void free_current_locale_conv();
- extern void RE_cleanup_cache();
- const char *progname;
- #define STDERR_BUFFER_LEN 4096
- #define DEBUG
- typedef struct {
- char* message; // exception message
- char* funcname; // source function of exception (e.g. SearchSysCache)
- char* filename; // source of exception (e.g. parse.l)
- int lineno; // source of exception (e.g. 104)
- int cursorpos; // char in query at which exception occurred
- char* context; // additional context (optional, can be NULL)
- } PgQueryError;
- typedef struct {
- List *tree;
- char* stderr_buffer;
- PgQueryError* error;
- } PgQueryInternalParsetreeAndError;
- PgQueryInternalParsetreeAndError pg_query_raw_parse(const char* input) {
- PgQueryInternalParsetreeAndError result = { 0 };
- char stderr_buffer[STDERR_BUFFER_LEN + 1] = { 0 };
- #ifndef DEBUG
- int stderr_global;
- int stderr_pipe[2];
- #endif
- #ifndef DEBUG
- // Setup pipe for stderr redirection
- if (pipe(stderr_pipe) != 0) {
- PgQueryError* error = (PgQueryError*)malloc(sizeof(PgQueryError));
- error->message = strdup("Failed to open pipe, too many open file descriptors");
- result.error = error;
- return result;
- }
- fcntl(stderr_pipe[0], F_SETFL, fcntl(stderr_pipe[0], F_GETFL) | O_NONBLOCK);
- // Redirect stderr to the pipe
- stderr_global = dup(STDERR_FILENO);
- dup2(stderr_pipe[1], STDERR_FILENO);
- close(stderr_pipe[1]);
- #endif
- PG_TRY();
- {
- result.tree = raw_parser(input, RAW_PARSE_DEFAULT);
- #ifndef DEBUG
- // Save stderr for result
- read(stderr_pipe[0], stderr_buffer, STDERR_BUFFER_LEN);
- #endif
- result.stderr_buffer = strdup(stderr_buffer);
- }
- PG_CATCH();
- {
- ErrorData* error_data;
- PgQueryError* error;
- error_data = CopyErrorData();
- // Note: This is intentionally malloc so exiting the memory context doesn't free this
- error = (PgQueryError*)malloc(sizeof(PgQueryError));
- error->message = strdup(error_data->message);
- error->filename = strdup(error_data->filename);
- error->funcname = strdup(error_data->funcname);
- error->context = NULL;
- error->lineno = error_data->lineno;
- error->cursorpos = error_data->cursorpos;
- result.error = error;
- FlushErrorState();
- }
- PG_END_TRY();
- #ifndef DEBUG
- // Restore stderr, close pipe
- dup2(stderr_global, STDERR_FILENO);
- close(stderr_pipe[0]);
- close(stderr_global);
- #endif
- return result;
- }
- void pg_query_free_error(PgQueryError *error) {
- free(error->message);
- free(error->funcname);
- free(error->filename);
- if (error->context) {
- free(error->context);
- }
- free(error);
- }
- }
- namespace NYql {
- static struct TGlobalInit {
- TGlobalInit() {
- pg_crc32c crc = 0;
- pg_popcount32(0);
- pg_popcount64(0);
- COMP_CRC32C(crc,"",0);
- }
- } GlobalInit;
- void PGParse(const TString& input, IPGParseEvents& events) {
- pg_thread_init();
- PgQueryInternalParsetreeAndError parsetree_and_error;
- TArenaMemoryContext arena;
- auto prevErrorContext = ErrorContext;
- ErrorContext = CurrentMemoryContext;
- Y_DEFER {
- ErrorContext = prevErrorContext;
- };
- parsetree_and_error = pg_query_raw_parse(input.c_str());
- Y_DEFER {
- if (parsetree_and_error.error) {
- pg_query_free_error(parsetree_and_error.error);
- }
- free(parsetree_and_error.stderr_buffer);
- };
- if (parsetree_and_error.error) {
- TPosition position(0, 1);
- // cursorpos is about codepoints, not bytes
- TTextWalker walker(position, true);
- auto cursorpos = parsetree_and_error.error->cursorpos;
- size_t codepoints = 0;
- if (cursorpos >= 0) {
- for (size_t i = 0; i < input.size(); ++i) {
- if (codepoints == cursorpos) {
- break;
- }
- if (!TTextWalker::IsUtf8Intermediate(input[i])) {
- ++codepoints;
- }
- walker.Advance(input[i]);
- }
- }
- events.OnError(TIssue(position, "ERROR: " + TString(parsetree_and_error.error->message) + "\n"));
- } else {
- events.OnResult(parsetree_and_error.tree);
- }
- }
- TString PrintPGTree(const List* raw) {
- auto str = nodeToString(raw);
- Y_DEFER {
- pfree(str);
- };
- return TString(str);
- }
- TString GetCommandName(Node* node) {
- return CreateCommandName(node);
- }
- }
- extern "C" void setup_pg_thread_cleanup() {
- struct TThreadCleanup {
- ~TThreadCleanup() {
- NYql::TExtensionsRegistry::Instance().CleanupThread();
- destroy_timezone_hashtable();
- destroy_typecache_hashtable();
- RE_cleanup_cache();
- free_current_locale_conv();
- ResourceOwnerDelete(CurrentResourceOwner);
- MemoryContextDelete(TopMemoryContext);
- free(MyProc);
- }
- };
- static thread_local TThreadCleanup ThreadCleanup;
- Log_error_verbosity = PGERROR_DEFAULT;
- SetDatabaseEncoding(PG_UTF8);
- SetClientEncoding(PG_UTF8);
- InitializeClientEncoding();
- MemoryContextInit();
- auto owner = ResourceOwnerCreate(NULL, "TopTransaction");
- TopTransactionResourceOwner = owner;
- CurTransactionResourceOwner = owner;
- CurrentResourceOwner = owner;
- MyProcPid = getpid();
- MyStartTimestamp = GetCurrentTimestamp();
- MyStartTime = timestamptz_to_time_t(MyStartTimestamp);
- InitializeLatchSupport();
- MyLatch = &LocalLatchData;
- InitLatch(MyLatch);
- InitializeLatchWaitSet();
- MyProc = (PGPROC*)malloc(sizeof(PGPROC));
- Zero(*MyProc);
- StartTransactionCommand();
- InitializeSession();
- work_mem = MAX_KILOBYTES; // a way to postpone spilling for tuple stores
- assign_max_stack_depth(1024, nullptr);
- MyDatabaseId = 3; // from catalog.pg_database
- namespace_search_path = pstrdup("public");
- InitializeSessionUserId(nullptr, 1);
- };
|