diff --git a/CMakeLists.txt b/CMakeLists.txt index 259bd92..dbbbe00 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -60,7 +60,7 @@ set(BUILD_BENCHMARK OFF CACHE BOOL "Disable clickhouse-cpp benchmarks" FORCE) add_subdirectory(third_party/clickhouse-cpp EXCLUDE_FROM_ALL) # Collect source files -file(GLOB_RECURSE SOURCES src/*.cc) +file(GLOB_RECURSE SOURCES src/*.cc src/*.c) # Build shared library add_library(pg_stat_ch SHARED ${SOURCES}) diff --git a/docker/init/00-schema.sql b/docker/init/00-schema.sql index 4b15ef2..c353915 100644 --- a/docker/init/00-schema.sql +++ b/docker/init/00-schema.sql @@ -53,6 +53,8 @@ CREATE TABLE pg_stat_ch.events_raw query_id Int64 COMMENT '64-bit hash identifying normalized queries. Queries differing only in constants share the same query_id. Use for aggregating statistics across similar queries.', + parent_query_id UInt64 COMMENT 'query_id of the calling query (e.g. the plpgsql function that issued this SPI statement). 0 for top-level queries. Use WHERE parent_query_id = 0 to restrict aggregations to top-level queries and avoid double-counting CPU and duration.', + cmd_type LowCardinality(String) COMMENT 'Command type: SELECT, INSERT, UPDATE, DELETE, MERGE, UTILITY, or UNKNOWN. Use for workload characterization (read-heavy vs write-heavy).', rows UInt64 COMMENT 'Rows returned (SELECT) or affected (INSERT/UPDATE/DELETE). HIGH: large result sets or bulk operations. LOW: point queries. Watch for unexpected HIGH values indicating missing WHERE clauses.', diff --git a/migrations/001_add_parent_query_id.sql b/migrations/001_add_parent_query_id.sql new file mode 100644 index 0000000..3b49bf7 --- /dev/null +++ b/migrations/001_add_parent_query_id.sql @@ -0,0 +1,12 @@ +-- Migration: add parent_query_id column +-- +-- Introduced in pg_stat_ch 0.4.x. Each event now carries the query_id of its +-- calling query (e.g. the plpgsql function that issued an SPI statement). +-- Top-level queries emit 0. 
Use WHERE parent_query_id = 0 in aggregations to +-- avoid double-counting CPU and duration across nested calls. +-- +-- Run against your ClickHouse instance before upgrading the extension: +-- clickhouse-client < migrations/001_add_parent_query_id.sql + +ALTER TABLE pg_stat_ch.events_raw + ADD COLUMN IF NOT EXISTS parent_query_id UInt64 DEFAULT 0; diff --git a/src/config/guc.cc b/src/config/guc.c similarity index 83% rename from src/config/guc.cc rename to src/config/guc.c index d1cd27e..e4390cd 100644 --- a/src/config/guc.cc +++ b/src/config/guc.c @@ -1,27 +1,23 @@ // pg_stat_ch GUC (Grand Unified Configuration) implementation -extern "C" { #include "postgres.h" #include "utils/guc.h" -} - -#include #include "config/guc.h" // GUC variable storage bool psch_enabled = true; bool psch_use_otel = false; -char* psch_clickhouse_host = nullptr; +char* psch_clickhouse_host = NULL; int psch_clickhouse_port = 9000; -char* psch_clickhouse_user = nullptr; -char* psch_clickhouse_password = nullptr; -char* psch_clickhouse_database = nullptr; +char* psch_clickhouse_user = NULL; +char* psch_clickhouse_password = NULL; +char* psch_clickhouse_database = NULL; bool psch_clickhouse_use_tls = false; bool psch_clickhouse_skip_tls_verify = false; -char* psch_otel_endpoint = nullptr; -char* psch_hostname = nullptr; +char* psch_otel_endpoint = NULL; +char* psch_hostname = NULL; int psch_queue_capacity = 131072; int psch_flush_interval_ms = 200; int psch_batch_max = 200000; @@ -35,7 +31,7 @@ bool psch_debug_force_locked_overflow = false; // Log level options (matches PostgreSQL's server_message_level_options pattern) // clang-format off -static const std::array log_elevel_options = {{ +static const struct config_enum_entry log_elevel_options[] = { {"debug5", DEBUG5, false}, {"debug4", DEBUG4, false}, {"debug3", DEBUG3, false}, @@ -48,16 +44,15 @@ static const std::array log_elevel_options = {{ {"error", ERROR, false}, {"fatal", FATAL, false}, {"panic", PANIC, false}, - {nullptr, 0, 
false}, -}}; + {NULL, 0, false}, +}; // clang-format on -extern "C" { - // Check hook to ensure queue_capacity is a power of 2. // Parameters follow PostgreSQL GUC check hook signature. -static bool check_psch_queue_capacity(int* newval, void** extra [[maybe_unused]], - GucSource source [[maybe_unused]]) { +static bool check_psch_queue_capacity(int* newval, void** extra, GucSource source) { + (void)extra; + (void)source; // Check if value is positive and a power of 2 if (*newval <= 0) { GUC_check_errdetail("pg_stat_ch.queue_capacity must be positive."); @@ -81,12 +76,12 @@ void PschInitGuc(void) { DefineCustomBoolVariable( "pg_stat_ch.enabled", // name "Enable or disable pg_stat_ch query telemetry collection.", // short_desc - nullptr, // long_desc + NULL, // long_desc &psch_enabled, // valueAddr true, // bootValue PGC_SIGHUP, // context 0, // flags - nullptr, nullptr, nullptr); // hooks + NULL, NULL, NULL); // hooks DefineCustomBoolVariable( "pg_stat_ch.use_otel", @@ -96,131 +91,131 @@ void PschInitGuc(void) { false, PGC_POSTMASTER, 0, - nullptr, nullptr, nullptr); + NULL, NULL, NULL); DefineCustomStringVariable( "pg_stat_ch.clickhouse_host", "ClickHouse server hostname.", - nullptr, + NULL, &psch_clickhouse_host, "localhost", PGC_POSTMASTER, 0, - nullptr, nullptr, nullptr); + NULL, NULL, NULL); DefineCustomIntVariable( "pg_stat_ch.clickhouse_port", "ClickHouse server native protocol port.", - nullptr, + NULL, &psch_clickhouse_port, 9000, // bootValue 1, 65535, // min, max PGC_POSTMASTER, 0, - nullptr, nullptr, nullptr); + NULL, NULL, NULL); DefineCustomStringVariable( "pg_stat_ch.clickhouse_user", "ClickHouse user name.", - nullptr, + NULL, &psch_clickhouse_user, "default", PGC_POSTMASTER, 0, - nullptr, nullptr, nullptr); + NULL, NULL, NULL); DefineCustomStringVariable( "pg_stat_ch.clickhouse_password", "ClickHouse user password.", - nullptr, + NULL, &psch_clickhouse_password, "", PGC_POSTMASTER, GUC_SUPERUSER_ONLY, - nullptr, nullptr, nullptr); + NULL, NULL, 
NULL); DefineCustomStringVariable( "pg_stat_ch.clickhouse_database", "ClickHouse database name for telemetry storage.", - nullptr, + NULL, &psch_clickhouse_database, "pg_stat_ch", PGC_POSTMASTER, 0, - nullptr, nullptr, nullptr); + NULL, NULL, NULL); DefineCustomBoolVariable( "pg_stat_ch.clickhouse_use_tls", "Enable TLS for ClickHouse connections.", - nullptr, + NULL, &psch_clickhouse_use_tls, false, PGC_POSTMASTER, 0, - nullptr, nullptr, nullptr); + NULL, NULL, NULL); DefineCustomBoolVariable( "pg_stat_ch.clickhouse_skip_tls_verify", "Skip TLS certificate verification (insecure, for testing only).", - nullptr, + NULL, &psch_clickhouse_skip_tls_verify, false, PGC_POSTMASTER, 0, - nullptr, nullptr, nullptr); + NULL, NULL, NULL); DefineCustomStringVariable( "pg_stat_ch.otel_endpoint", "OpenTelemetry gRPC endpoint (host:port).", - nullptr, + NULL, &psch_otel_endpoint, "localhost:4317", PGC_POSTMASTER, 0, - nullptr, nullptr, nullptr); + NULL, NULL, NULL); DefineCustomStringVariable( "pg_stat_ch.hostname", "Override the hostname of the current machine.", - nullptr, + NULL, &psch_hostname, "", PGC_POSTMASTER, 0, - nullptr, nullptr, nullptr); + NULL, NULL, NULL); DefineCustomIntVariable( "pg_stat_ch.queue_capacity", "Maximum number of events in the shared memory queue (must be a power of 2).", - nullptr, + NULL, &psch_queue_capacity, 131072, // bootValue 1024, 4194304, // min, max PGC_POSTMASTER, 0, - check_psch_queue_capacity, nullptr, nullptr); + check_psch_queue_capacity, NULL, NULL); DefineCustomIntVariable( "pg_stat_ch.flush_interval_ms", "Interval in milliseconds between ClickHouse export batches.", - nullptr, + NULL, &psch_flush_interval_ms, 200, // bootValue 100, 60000, // min, max PGC_SIGHUP, GUC_UNIT_MS, - nullptr, nullptr, nullptr); + NULL, NULL, NULL); DefineCustomIntVariable( "pg_stat_ch.batch_max", "Maximum number of events per ClickHouse insert batch.", - nullptr, + NULL, &psch_batch_max, 200000, // bootValue 1, 1000000, // min, max PGC_SIGHUP, 0, - nullptr, 
nullptr, nullptr); + NULL, NULL, NULL); DefineCustomIntVariable( "pg_stat_ch.otel_log_queue_size", @@ -232,7 +227,7 @@ void PschInitGuc(void) { 512, 1048576, // min, max PGC_POSTMASTER, 0, - nullptr, nullptr, nullptr); + NULL, NULL, NULL); DefineCustomIntVariable( "pg_stat_ch.otel_log_batch_size", @@ -244,7 +239,7 @@ void PschInitGuc(void) { 1, 131072, // min, max PGC_POSTMASTER, 0, - nullptr, nullptr, nullptr); + NULL, NULL, NULL); DefineCustomIntVariable( "pg_stat_ch.otel_log_max_bytes", @@ -257,7 +252,7 @@ void PschInitGuc(void) { 65536, 64 * 1024 * 1024, // min: 64 KiB, max: 64 MiB PGC_POSTMASTER, GUC_UNIT_BYTE, - nullptr, nullptr, nullptr); + NULL, NULL, NULL); DefineCustomIntVariable( "pg_stat_ch.otel_log_delay_ms", @@ -269,7 +264,7 @@ void PschInitGuc(void) { 10, 60000, // min, max PGC_POSTMASTER, GUC_UNIT_MS, - nullptr, nullptr, nullptr); + NULL, NULL, NULL); DefineCustomIntVariable( "pg_stat_ch.otel_metric_interval_ms", @@ -281,7 +276,7 @@ void PschInitGuc(void) { 100, 300000, // min, max (100ms to 5min) PGC_POSTMASTER, GUC_UNIT_MS, - nullptr, nullptr, nullptr); + NULL, NULL, NULL); DefineCustomEnumVariable( "pg_stat_ch.log_min_elevel", @@ -290,10 +285,10 @@ void PschInitGuc(void) { "'error' for errors only, or 'debug5' for all messages.", &psch_log_min_elevel, WARNING, - log_elevel_options.data(), + log_elevel_options, PGC_SUSET, 0, - nullptr, nullptr, nullptr); + NULL, NULL, NULL); DefineCustomBoolVariable( "pg_stat_ch.debug_force_locked_overflow", "Force HandleOverflow in locked path (debug/test only).", @@ -303,10 +298,8 @@ void PschInitGuc(void) { false, PGC_SUSET, 0, - nullptr, nullptr, nullptr); + NULL, NULL, NULL); // clang-format on EmitWarningsOnPlaceholders("pg_stat_ch"); } - -} // extern "C" diff --git a/src/export/stats_exporter.cc b/src/export/stats_exporter.cc index e11af99..b961fc7 100644 --- a/src/export/stats_exporter.cc +++ b/src/export/stats_exporter.cc @@ -93,6 +93,7 @@ void ExportEventStats(const std::vector& events, StatsExporter* 
expor auto col_username = exporter->DbUserColumn(); auto col_pid = exporter->RecordInt32("pid"); auto col_query_id = exporter->RecordInt64("query_id"); + auto col_parent_query_id = exporter->RecordUInt64("parent_query_id"); auto col_cmd_type = exporter->DbOperationColumn(); auto col_rows = exporter->MetricUInt64("rows"); auto col_query = exporter->DbQueryTextColumn(); @@ -148,6 +149,7 @@ void ExportEventStats(const std::vector& events, StatsExporter* expor col_username->Append(std::string(ev.username, ev.username_len)); col_pid->Append(ev.pid); col_query_id->Append(static_cast(ev.queryid)); + col_parent_query_id->Append(static_cast(ev.parent_query_id)); col_cmd_type->Append(CmdTypeToString(ev.cmd_type)); col_rows->Append(ev.rows); diff --git a/src/hooks/hooks.cc b/src/hooks/hooks.c similarity index 68% rename from src/hooks/hooks.cc rename to src/hooks/hooks.c index c76bfc8..8a4c412 100644 --- a/src/hooks/hooks.cc +++ b/src/hooks/hooks.c @@ -1,9 +1,7 @@ // pg_stat_ch executor hooks implementation -#include #include -extern "C" { #include "postgres.h" #include "access/parallel.h" @@ -30,7 +28,6 @@ extern "C" { #if PG_VERSION_NUM >= 150000 #include "jit/jit.h" #endif -} #include "config/guc.h" #include "hooks/hooks.h" @@ -40,29 +37,39 @@ extern "C" { #include "queue/shmem.h" // Previous hook values for chaining -static post_parse_analyze_hook_type prev_post_parse_analyze = nullptr; -static ExecutorStart_hook_type prev_executor_start = nullptr; -static ExecutorRun_hook_type prev_executor_run = nullptr; -static ExecutorFinish_hook_type prev_executor_finish = nullptr; -static ExecutorEnd_hook_type prev_executor_end = nullptr; -static ProcessUtility_hook_type prev_process_utility = nullptr; -static emit_log_hook_type prev_emit_log_hook = nullptr; - -// Track nesting level to identify top-level queries -static int nesting_level = 0; - -// CPU time tracking via getrusage -static struct rusage rusage_start; +static post_parse_analyze_hook_type prev_post_parse_analyze = 
NULL;
+static ExecutorStart_hook_type prev_executor_start = NULL;
+static ExecutorRun_hook_type prev_executor_run = NULL;
+static ExecutorFinish_hook_type prev_executor_finish = NULL;
+static ExecutorEnd_hook_type prev_executor_end = NULL;
+static ProcessUtility_hook_type prev_process_utility = NULL;
+static emit_log_hook_type prev_emit_log_hook = NULL;
+
+// Per-query frame pushed at ExecutorStart / PschProcessUtility and popped at
+// ExecutorEnd / end of PschProcessUtility. The stack naturally tracks nesting
+// depth and provides each query with its own rusage baseline and start time,
+// fixing the prior single-static design that produced overlapping CPU deltas
+// for nested SPI calls.
+//
+// Fixed-size to avoid heap allocation: malloc can fail in OOM conditions
+// with effects that are incompatible with PostgreSQL's longjmp-based error
+// handling. Real-world nesting depth is small; queries nested deeper than
+// PSCH_MAX_QUERY_NESTING_DEPTH degrade gracefully (depth tracked, not timed).
+typedef struct PschQueryFrame {
+    struct rusage rusage_start;
+    TimestampTz query_start_ts;
+    uint64 queryid;      // used as parent_query_id by the next inner query
+    int64 cpu_user_us;   // set at pop time
+    int64 cpu_sys_us;    // set at pop time
+} PschQueryFrame;
+
+#define PSCH_MAX_QUERY_NESTING_DEPTH 8
+static PschQueryFrame query_stack[PSCH_MAX_QUERY_NESTING_DEPTH];
+static int query_stack_depth = 0;
 
 // Deadlock prevention for emit_log_hook
 static bool disable_error_capture = false;
 
-// Track whether the current query started at top level
-static bool current_query_is_top_level = false;
-
-// Track query start time for duration calculation
-static TimestampTz query_start_ts = 0;
-
 // System initialization flag - set after hooks are installed and shmem is ready
 static bool system_init = false;
@@ -71,7 +78,7 @@ static void ResolveNames(PschEvent* event);
 
 static uint8 CopyName(char* dst, size_t dst_size, const char* src) {
     size_t len = strlcpy(dst, src, dst_size);
-    return static_cast<uint8>(Min(len, dst_size - 1));
+ return (uint8)Min(len, dst_size - 1); } // Cache for session-stable values to avoid repeated catalog lookups on every query. @@ -80,7 +87,7 @@ static uint8 CopyName(char* dst, size_t dst_size, const char* src) { // re-resolved when userid changes (handles SET ROLE). This also carries the // session-local registry of normalized statements waiting to be consumed by // ExecutorEnd, ProcessUtility, or emit_log_hook. -struct PschBackendState { +typedef struct PschBackendState { bool initialized; char datname[NAMEDATALEN]; uint8 datname_len; @@ -90,8 +97,9 @@ struct PschBackendState { char client_addr[46]; // INET6_ADDRSTRLEN uint8 client_addr_len; PschNormalizedQueryState normalized_queries; -}; -static PschBackendState backend_state = {}; +} PschBackendState; + +static PschBackendState backend_state = {0}; // Resolve and cache the current username. On initial resolve, falls back to // "" if resolution fails. On SET ROLE re-resolve, keeps the existing @@ -99,7 +107,7 @@ static PschBackendState backend_state = {}; // leaves cached_userid unchanged so future calls can retry resolution. static void CacheUsername(Oid userid, bool fallback_on_null) { const char* username = GetUserNameFromId(userid, true); - if (username != nullptr) { + if (username != NULL) { backend_state.username_len = CopyName(backend_state.username, sizeof(backend_state.username), username); backend_state.cached_userid = userid; @@ -125,10 +133,10 @@ static void EnsureBackendCache(void) { // Database name (session-stable) const char* datname = get_database_name(MyDatabaseId); backend_state.datname_len = CopyName(backend_state.datname, sizeof(backend_state.datname), - datname != nullptr ? datname : ""); + datname != NULL ? 
datname : ""); // Client address (session-stable) - backend_state.client_addr_len = static_cast( + backend_state.client_addr_len = (uint8)( GetClientAddress(backend_state.client_addr, sizeof(backend_state.client_addr))); // Username (may change via SET ROLE) @@ -170,8 +178,8 @@ static PschCmdType ConvertCmdType(CmdType cmd) { } static int64 TimeDiffMicrosec(struct timeval end, struct timeval start) { - return (static_cast(end.tv_sec - start.tv_sec) * 1000000LL) + - static_cast(end.tv_usec - start.tv_usec); + return ((int64)(end.tv_sec - start.tv_sec) * 1000000LL) + + (int64)(end.tv_usec - start.tv_usec); } // Unpack SQLSTATE code from PostgreSQL's packed format to string @@ -184,7 +192,7 @@ static void UnpackSqlState(int sql_state, char* buf) { } static size_t TrimTrailing(char* str, size_t len) { - while (len > 0 && isspace(static_cast(str[len - 1]))) { + while (len > 0 && isspace((unsigned char)str[len - 1])) { len--; } str[len] = '\0'; @@ -195,15 +203,18 @@ static size_t TrimTrailing(char* str, size_t len) { // This avoids memmove by skipping leading whitespace BEFORE the copy. // Returns the final length of the trimmed string in dst. 
static size_t CopyTrimmed(char* dst, size_t dst_size, const char* src) { - if (src == nullptr || dst_size == 0) { + size_t src_len; + size_t len; + + if (src == NULL || dst_size == 0) { if (dst_size > 0) dst[0] = '\0'; return 0; } - while (*src != '\0' && isspace(static_cast(*src))) + while (*src != '\0' && isspace((unsigned char)*src)) src++; - size_t src_len = strlcpy(dst, src, dst_size); - size_t len = Min(src_len, dst_size - 1); + src_len = strlcpy(dst, src, dst_size); + len = Min(src_len, dst_size - 1); return TrimTrailing(dst, len); } @@ -217,7 +228,7 @@ static PgBackendStatus* GetBackendStatus(void) { int num_backends = pgstat_fetch_stat_numbackends(); for (int i = 1; i <= num_backends; i++) { local_beentry = pgstat_get_local_beentry_by_index(i); - if (local_beentry == nullptr) { + if (local_beentry == NULL) { continue; } PgBackendStatus* beentry = &local_beentry->backendStatus; @@ -225,19 +236,19 @@ static PgBackendStatus* GetBackendStatus(void) { return beentry; } } - return nullptr; + return NULL; #endif } static int GetApplicationName(char* buf, int buf_size) { // Try application_name GUC first (always up-to-date) - if (application_name != nullptr && application_name[0] != '\0') { - return static_cast(CopyTrimmed(buf, buf_size, application_name)); + if (application_name != NULL && application_name[0] != '\0') { + return (int)CopyTrimmed(buf, buf_size, application_name); } PgBackendStatus* beentry = GetBackendStatus(); - if (beentry != nullptr && beentry->st_appname != nullptr) { - return static_cast(CopyTrimmed(buf, buf_size, beentry->st_appname)); + if (beentry != NULL && beentry->st_appname != NULL) { + return (int)CopyTrimmed(buf, buf_size, beentry->st_appname); } buf[0] = '\0'; @@ -245,31 +256,35 @@ static int GetApplicationName(char* buf, int buf_size) { } static int GetClientAddress(char* buf, int buf_size) { + char remote_host[NI_MAXHOST]; + int ret; + buf[0] = '\0'; PgBackendStatus* beentry = GetBackendStatus(); - if (beentry == nullptr) { + if 
(beentry == NULL) { return 0; } - std::array remote_host{}; - - int ret = pg_getnameinfo_all(&beentry->st_clientaddr.addr, beentry->st_clientaddr.salen, - remote_host.data(), remote_host.size(), nullptr, 0, - NI_NUMERICHOST | NI_NUMERICSERV); + remote_host[0] = '\0'; + ret = pg_getnameinfo_all(&beentry->st_clientaddr.addr, beentry->st_clientaddr.salen, + remote_host, NI_MAXHOST, NULL, 0, + NI_NUMERICHOST | NI_NUMERICSERV); if (ret != 0 || remote_host[0] == '\0') { return 0; } // Handle local connections - if (strcmp(remote_host.data(), "[local]") == 0) { + if (strcmp(remote_host, "[local]") == 0) { size_t src_len = strlcpy(buf, "127.0.0.1", buf_size); - return static_cast(Min(src_len, static_cast(buf_size - 1))); + return (int)Min(src_len, (size_t)(buf_size - 1)); } - size_t src_len = strlcpy(buf, remote_host.data(), buf_size); - return static_cast(Min(src_len, static_cast(buf_size - 1))); + { + size_t src_len = strlcpy(buf, remote_host, buf_size); + return (int)Min(src_len, (size_t)(buf_size - 1)); + } } static void CopyBufferUsage(PschEvent* event, const BufferUsage* buf) { @@ -322,24 +337,24 @@ static void InitEventPartial(PschEvent* event) { // Zero mid section: application_name through query_len (~114 bytes) const size_t mid_offset = offsetof(PschEvent, application_name); const size_t mid_size = offsetof(PschEvent, query) - mid_offset; - memset(reinterpret_cast(event) + mid_offset, 0, mid_size); + memset((char*)event + mid_offset, 0, mid_size); event->query[0] = '\0'; } -static void InitBaseEvent(PschEvent* event, TimestampTz ts_start, bool top_level, +static void InitBaseEvent(PschEvent* event, TimestampTz ts_start, uint64 parent_query_id, PschCmdType cmd_type) { InitEventPartial(event); event->ts_start = ts_start; event->dbid = MyDatabaseId; event->userid = GetUserId(); event->pid = MyProcPid; - event->top_level = top_level; + event->parent_query_id = parent_query_id; event->cmd_type = cmd_type; ResolveNames(event); } static void CopyClientContext(PschEvent* 
event) { - event->application_name_len = static_cast( + event->application_name_len = (uint8)( GetApplicationName(event->application_name, sizeof(event->application_name))); EnsureBackendCache(); @@ -348,7 +363,7 @@ static void CopyClientContext(PschEvent* event) { event->client_addr_len = backend_state.client_addr_len; } else { event->client_addr_len = - static_cast(GetClientAddress(event->client_addr, sizeof(event->client_addr))); + (uint8)(GetClientAddress(event->client_addr, sizeof(event->client_addr))); } } @@ -358,19 +373,21 @@ static void CopyClientContext(PschEvent* event) { // stmt_len. CleanQuerytext trims that down to just the current statement, but // it does not parameterize literals. This helper is the raw-text fallback when // we have no normalized entry for the statement. -static void CopyRawStatementText(PschEvent* event, const PschStatementKey& statement_key) { - if (statement_key.source_text == nullptr) { +static void CopyRawStatementText(PschEvent* event, PschStatementKey statement_key) { + const char* query_text; + + if (statement_key.source_text == NULL) { return; } - const char* query_text = statement_key.source_text; + query_text = statement_key.source_text; if (statement_key.stmt_location >= 0) { int query_loc = statement_key.stmt_location; int query_len = statement_key.stmt_len; query_text = CleanQuerytext(query_text, &query_loc, &query_len); } - event->query_len = static_cast(CopyTrimmed(event->query, PSCH_MAX_QUERY_LEN, query_text)); + event->query_len = (uint16)CopyTrimmed(event->query, PSCH_MAX_QUERY_LEN, query_text); } // Copy query text into the event buffer, preferring a previously normalized @@ -381,7 +398,7 @@ static void CopyRawStatementText(PschEvent* event, const PschStatementKey& state // normalized entry stashed at parse time. If no match exists, it falls back to // CopyRawStatementText, which preserves the literal SQL text for the current // statement only. 
-static void CopyQueryText(PschEvent* event, const PschStatementKey& statement_key) { +static void CopyQueryText(PschEvent* event, PschStatementKey statement_key) { if (PschCopyNormalizedQueryForStatement(&backend_state.normalized_queries, event->query, sizeof(event->query), &event->query_len, statement_key, false)) { @@ -406,92 +423,128 @@ static void ResolveNames(PschEvent* event) { } // Fallback: resolve fresh (cache not yet initialized, e.g. emit_log_hook early) - const char* datname = nullptr; - const char* username = nullptr; + const char* datname = NULL; + const char* username = NULL; if (IsTransactionState()) { datname = get_database_name(event->dbid); username = GetUserNameFromId(event->userid, true); } event->datname_len = - CopyName(event->datname, sizeof(event->datname), datname != nullptr ? datname : ""); + CopyName(event->datname, sizeof(event->datname), datname != NULL ? datname : ""); event->username_len = CopyName(event->username, sizeof(event->username), - username != nullptr ? username : ""); + username != NULL ? 
username : ""); } // Copy JIT instrumentation to event (PG15+) -static void CopyJitInstrumentation([[maybe_unused]] PschEvent* event, - [[maybe_unused]] QueryDesc* query_desc) { +static void CopyJitInstrumentation(PschEvent* event, QueryDesc* query_desc) { #if PG_VERSION_NUM >= 150000 - if (query_desc->estate->es_jit != nullptr) { + if (query_desc->estate->es_jit != NULL) { JitInstrumentation* jit = &query_desc->estate->es_jit->instr; - event->jit_functions = static_cast(jit->created_functions); + event->jit_functions = (int32)jit->created_functions; event->jit_generation_time_us = - static_cast(INSTR_TIME_GET_MICROSEC(jit->generation_counter)); + (int32)INSTR_TIME_GET_MICROSEC(jit->generation_counter); event->jit_inlining_time_us = - static_cast(INSTR_TIME_GET_MICROSEC(jit->inlining_counter)); + (int32)INSTR_TIME_GET_MICROSEC(jit->inlining_counter); event->jit_optimization_time_us = - static_cast(INSTR_TIME_GET_MICROSEC(jit->optimization_counter)); + (int32)INSTR_TIME_GET_MICROSEC(jit->optimization_counter); event->jit_emission_time_us = - static_cast(INSTR_TIME_GET_MICROSEC(jit->emission_counter)); + (int32)INSTR_TIME_GET_MICROSEC(jit->emission_counter); #if PG_VERSION_NUM >= 170000 - event->jit_deform_time_us = static_cast(INSTR_TIME_GET_MICROSEC(jit->deform_counter)); + event->jit_deform_time_us = (int32)INSTR_TIME_GET_MICROSEC(jit->deform_counter); #endif } +#else + (void)event; + (void)query_desc; #endif } // Copy parallel worker info to event (PG18+) -static void CopyParallelWorkerInfo([[maybe_unused]] PschEvent* event, - [[maybe_unused]] QueryDesc* query_desc) { +static void CopyParallelWorkerInfo(PschEvent* event, QueryDesc* query_desc) { #if PG_VERSION_NUM >= 180000 - if (query_desc->estate != nullptr) { + if (query_desc->estate != NULL) { event->parallel_workers_planned = - static_cast(query_desc->estate->es_parallel_workers_to_launch); + (int16)query_desc->estate->es_parallel_workers_to_launch; event->parallel_workers_launched = - 
static_cast<int16>(query_desc->estate->es_parallel_workers_launched);
+            (int16)query_desc->estate->es_parallel_workers_launched;
     }
+#else
+    (void)event;
+    (void)query_desc;
 #endif
 }
 
-static void BuildEventFromQueryDesc(QueryDesc* query_desc, PschEvent* event, int64 cpu_user_us,
-                                    int64 cpu_sys_us) {
-    InitBaseEvent(event, query_start_ts, current_query_is_top_level,
-                  ConvertCmdType(query_desc->operation));
+// Push a frame onto the query stack, capturing timestamp and rusage baseline.
+// If depth exceeds the cap the frame is not written, but depth is still incremented
+// so the corresponding pop stays balanced.
+static void QueryStackPush(uint64 queryid) {
+    if (query_stack_depth < PSCH_MAX_QUERY_NESTING_DEPTH) {
+        query_stack[query_stack_depth].queryid = queryid;
+        query_stack[query_stack_depth].query_start_ts = GetCurrentTimestamp();
+        getrusage(RUSAGE_SELF, &query_stack[query_stack_depth].rusage_start);
+    }
+    query_stack_depth++;
+}
+
+// Pop a frame from the query stack and compute its CPU delta in place.
+// Returns a pointer into the array, or NULL if the stack was empty or depth
+// had exceeded the cap (no frame was written there). On getrusage failure
+// the returned frame's cpu_user_us/cpu_sys_us remain zero.
+static PschQueryFrame* QueryStackPop(void) {
+    if (query_stack_depth == 0)
+        return NULL;
+    query_stack_depth--;
+    if (query_stack_depth >= PSCH_MAX_QUERY_NESTING_DEPTH)
+        return NULL;
+    PschQueryFrame* frame = &query_stack[query_stack_depth];
+    frame->cpu_user_us = 0;
+    frame->cpu_sys_us = 0;
+    struct rusage rusage_end;
+    if (getrusage(RUSAGE_SELF, &rusage_end) == 0) {
+        frame->cpu_user_us = TimeDiffMicrosec(rusage_end.ru_utime, frame->rusage_start.ru_utime);
+        frame->cpu_sys_us = TimeDiffMicrosec(rusage_end.ru_stime, frame->rusage_start.ru_stime);
+    }
+    return frame;
+}
+
+static void BuildEventFromQueryDesc(QueryDesc* query_desc, PschEvent* event,
+                                    const PschQueryFrame* frame, uint64 parent_query_id) {
+    PschStatementKey statement_key;
+    TimestampTz query_start_ts = frame ?
frame->query_start_ts : 0; + InitBaseEvent(event, query_start_ts, parent_query_id, ConvertCmdType(query_desc->operation)); event->queryid = query_desc->plannedstmt->queryId; event->rows = query_desc->estate->es_processed; - event->cpu_user_time_us = cpu_user_us; - event->cpu_sys_time_us = cpu_sys_us; + event->cpu_user_time_us = frame ? frame->cpu_user_us : 0; + event->cpu_sys_time_us = frame ? frame->cpu_sys_us : 0; // Instrumentation data (duration, buffer, WAL) - if (query_desc->totaltime != nullptr) { + if (query_desc->totaltime != NULL) { #if PG_VERSION_NUM >= 190000 - event->duration_us = static_cast(INSTR_TIME_GET_MICROSEC(query_desc->totaltime->total)); + event->duration_us = (uint64)INSTR_TIME_GET_MICROSEC(query_desc->totaltime->total); #else - event->duration_us = static_cast(query_desc->totaltime->total * 1000000.0); + event->duration_us = (uint64)(query_desc->totaltime->total * 1000000.0); #endif CopyBufferUsage(event, &query_desc->totaltime->bufusage); CopyIoTiming(event, &query_desc->totaltime->bufusage); CopyWalUsage(event, &query_desc->totaltime->walusage); } else { - event->duration_us = static_cast(GetCurrentTimestamp() - query_start_ts); + event->duration_us = (uint64)(GetCurrentTimestamp() - query_start_ts); } CopyJitInstrumentation(event, query_desc); CopyParallelWorkerInfo(event, query_desc); CopyClientContext(event); - const PschStatementKey statement_key = + statement_key = PschMakeStatementKey(query_desc->sourceText, query_desc->plannedstmt->stmt_location, query_desc->plannedstmt->stmt_len); CopyQueryText(event, statement_key); } -extern "C" { - // Remove a pending normalized entry for one statement when execution exits // without building a normal executor/utility event from it. 
static void ForgetNormalizedStatement(const char* source_text, int stmt_location, int stmt_len) { - const PschStatementKey statement_key = PschMakeStatementKey(source_text, stmt_location, stmt_len); + PschStatementKey statement_key = PschMakeStatementKey(source_text, stmt_location, stmt_len); PschForgetNormalizedQueryForStatement(&backend_state.normalized_queries, statement_key); } @@ -499,12 +552,12 @@ static void ForgetNormalizedStatement(const char* source_text, int stmt_location // The JumbleState (with constant locations) is only available here, so we // must generate the normalized text now and stash it for ExecutorEnd. static void PschPostParseAnalyze(ParseState* pstate, Query* query, JumbleState* jstate) { - if (prev_post_parse_analyze != nullptr) { + if (prev_post_parse_analyze != NULL) { prev_post_parse_analyze(pstate, query, jstate); } // Only normalize if enabled and the query has constants to replace. - if (!psch_enabled || IsParallelWorker() || jstate == nullptr || jstate->clocations_count <= 0) { + if (!psch_enabled || IsParallelWorker() || jstate == NULL || jstate->clocations_count <= 0) { return; } @@ -523,8 +576,8 @@ static void PschPostParseAnalyze(ParseState* pstate, Query* query, JumbleState* // ExecutorEnd or ProcessUtility copies it into the exported event. 
MemoryContext oldcxt = MemoryContextSwitchTo(TopMemoryContext); char* normalized_query = PschNormalizeQuery(query_text, query_loc, &query_len, jstate); - if (normalized_query != nullptr) { - const PschStatementKey statement_key = + if (normalized_query != NULL) { + PschStatementKey statement_key = PschMakeStatementKey(source_text, stmt_location, stmt_len); PschRememberNormalizedQuery(&backend_state.normalized_queries, statement_key, normalized_query, query_len); @@ -534,7 +587,7 @@ static void PschPostParseAnalyze(ParseState* pstate, Query* query, JumbleState* static void PschExecutorStart(QueryDesc* query_desc, int eflags) { if (IsParallelWorker()) { - if (prev_executor_start != nullptr) { + if (prev_executor_start != NULL) { prev_executor_start(query_desc, eflags); } else { standard_ExecutorStart(query_desc, eflags); @@ -542,26 +595,20 @@ static void PschExecutorStart(QueryDesc* query_desc, int eflags) { return; } - // Record if this is a top-level query (before nesting_level changes in Run) - if (nesting_level == 0) { - current_query_is_top_level = true; - query_start_ts = GetCurrentTimestamp(); - // Capture CPU time baseline for top-level queries - if (psch_enabled) { - getrusage(RUSAGE_SELF, &rusage_start); - } - } else { - current_query_is_top_level = false; - } + // Push a per-query frame. Each frame captures its own rusage baseline so + // that nested SPI calls each get an accurate, non-overlapping CPU delta. + // The stack depth implicitly tracks nesting; the frame below us on the stack + // supplies the parent_query_id for this query's event. 
+ QueryStackPush(query_desc->plannedstmt->queryId); - if (prev_executor_start != nullptr) { + if (prev_executor_start != NULL) { prev_executor_start(query_desc, eflags); } else { standard_ExecutorStart(query_desc, eflags); } if (psch_enabled && query_desc->plannedstmt->queryId != UINT64CONST(0)) { - if (query_desc->totaltime == nullptr) { + if (query_desc->totaltime == NULL) { MemoryContext oldcxt = MemoryContextSwitchTo(query_desc->estate->es_query_cxt); #if PG_VERSION_NUM < 140000 query_desc->totaltime = InstrAlloc(1, INSTRUMENT_ALL); @@ -575,78 +622,52 @@ static void PschExecutorStart(QueryDesc* query_desc, int eflags) { #if PG_VERSION_NUM >= 180000 static void PschExecutorRun(QueryDesc* query_desc, ScanDirection direction, uint64 count) { + if (prev_executor_run != NULL) { + prev_executor_run(query_desc, direction, count); + } else { + standard_ExecutorRun(query_desc, direction, count); + } +} #else static void PschExecutorRun(QueryDesc* query_desc, ScanDirection direction, uint64 count, bool execute_once) { -#endif - if (IsParallelWorker()) { -#if PG_VERSION_NUM >= 180000 - if (prev_executor_run != nullptr) { - prev_executor_run(query_desc, direction, count); - } else { - standard_ExecutorRun(query_desc, direction, count); - } -#else - if (prev_executor_run != nullptr) { - prev_executor_run(query_desc, direction, count, execute_once); - } else { - standard_ExecutorRun(query_desc, direction, count, execute_once); - } -#endif - return; + if (prev_executor_run != NULL) { + prev_executor_run(query_desc, direction, count, execute_once); + } else { + standard_ExecutorRun(query_desc, direction, count, execute_once); } - - nesting_level++; - PG_TRY(); - { -#if PG_VERSION_NUM >= 180000 - if (prev_executor_run != nullptr) { - prev_executor_run(query_desc, direction, count); - } else { - standard_ExecutorRun(query_desc, direction, count); - } -#else - if (prev_executor_run != nullptr) { - prev_executor_run(query_desc, direction, count, execute_once); - } else { - 
standard_ExecutorRun(query_desc, direction, count, execute_once); - } +} #endif + +static void PschExecutorFinish(QueryDesc* query_desc) { + if (prev_executor_finish != NULL) { + prev_executor_finish(query_desc); + } else { + standard_ExecutorFinish(query_desc); } - PG_FINALLY(); - { nesting_level--; } - PG_END_TRY(); } -static void PschExecutorFinish(QueryDesc* query_desc) { +static void PschExecutorEnd(QueryDesc* query_desc) { + // Parallel workers never push a frame — handle them separately. if (IsParallelWorker()) { - if (prev_executor_finish != nullptr) { - prev_executor_finish(query_desc); + ForgetNormalizedStatement(query_desc->sourceText, query_desc->plannedstmt->stmt_location, + query_desc->plannedstmt->stmt_len); + if (prev_executor_end != NULL) { + prev_executor_end(query_desc); } else { - standard_ExecutorFinish(query_desc); + standard_ExecutorEnd(query_desc); } return; } - nesting_level++; - PG_TRY(); - { - if (prev_executor_finish != nullptr) { - prev_executor_finish(query_desc); - } else { - standard_ExecutorFinish(query_desc); - } - } - PG_FINALLY(); - { nesting_level--; } - PG_END_TRY(); -} + // Pop the frame pushed in ExecutorStart. Null if depth exceeded the cap — + // the event is still emitted but with zero CPU/timestamp. 
+ const PschQueryFrame* frame = QueryStackPop(); -static void PschExecutorEnd(QueryDesc* query_desc) { - if (!psch_enabled || IsParallelWorker() || query_desc->plannedstmt->queryId == UINT64CONST(0)) { + if (!psch_enabled || query_desc->plannedstmt->queryId == UINT64CONST(0)) { ForgetNormalizedStatement(query_desc->sourceText, query_desc->plannedstmt->stmt_location, query_desc->plannedstmt->stmt_len); - if (prev_executor_end != nullptr) { + if (prev_executor_end != NULL) { prev_executor_end(query_desc); } else { standard_ExecutorEnd(query_desc); @@ -654,24 +675,22 @@ static void PschExecutorEnd(QueryDesc* query_desc) { return; } - if (query_desc->totaltime != nullptr) { + if (query_desc->totaltime != NULL) { InstrEndLoop(query_desc->totaltime); } - // Compute CPU time delta from getrusage - int64 cpu_user_us = 0; - int64 cpu_sys_us = 0; - struct rusage rusage_end; - if (getrusage(RUSAGE_SELF, &rusage_end) == 0) { - cpu_user_us = TimeDiffMicrosec(rusage_end.ru_utime, rusage_start.ru_utime); - cpu_sys_us = TimeDiffMicrosec(rusage_end.ru_stime, rusage_start.ru_stime); - } + // The frame below us on the stack is our caller; its queryid is our parent. + int parent_idx = query_stack_depth - 1; + uint64 parent_query_id = + (parent_idx >= 0 && parent_idx < PSCH_MAX_QUERY_NESTING_DEPTH) + ? 
query_stack[parent_idx].queryid + : 0; PschEvent event; - BuildEventFromQueryDesc(query_desc, &event, cpu_user_us, cpu_sys_us); + BuildEventFromQueryDesc(query_desc, &event, frame, parent_query_id); PschEnqueueEvent(&event); - if (prev_executor_end != nullptr) { + if (prev_executor_end != NULL) { prev_executor_end(query_desc); } else { standard_ExecutorEnd(query_desc); @@ -679,21 +698,22 @@ static void PschExecutorEnd(QueryDesc* query_desc) { } // Build a PschEvent for utility statements (no QueryDesc available) -static void BuildEventForUtility(PschEvent* event, const char* queryString, TimestampTz start_ts, - int stmt_location, int stmt_len, uint64 duration_us, - bool is_top_level, uint64 rows, BufferUsage* bufusage, - WalUsage* walusage, int64 cpu_user_us, int64 cpu_sys_us) { - InitBaseEvent(event, start_ts, is_top_level, PSCH_CMD_UTILITY); +static void BuildEventForUtility(PschEvent* event, const char* queryString, + const PschQueryFrame* frame, int stmt_location, int stmt_len, + uint64 duration_us, uint64 parent_query_id, uint64 rows, + BufferUsage* bufusage, WalUsage* walusage) { + PschStatementKey statement_key; + InitBaseEvent(event, frame ? frame->query_start_ts : 0, parent_query_id, PSCH_CMD_UTILITY); event->duration_us = duration_us; event->rows = rows; - event->cpu_user_time_us = cpu_user_us; - event->cpu_sys_time_us = cpu_sys_us; + event->cpu_user_time_us = frame ? frame->cpu_user_us : 0; + event->cpu_sys_time_us = frame ? 
frame->cpu_sys_us : 0; CopyBufferUsage(event, bufusage); CopyIoTiming(event, bufusage); CopyWalUsage(event, walusage); CopyClientContext(event); - const PschStatementKey statement_key = PschMakeStatementKey(queryString, stmt_location, stmt_len); + statement_key = PschMakeStatementKey(queryString, stmt_location, stmt_len); CopyQueryText(event, statement_key); } @@ -732,7 +752,7 @@ static bool ShouldTrackUtility(Node* parsetree) { } static uint64 GetUtilityRowCount(QueryCompletion* qc) { - if (qc == nullptr) { + if (qc == NULL) { return 0; } switch (qc->commandTag) { @@ -746,21 +766,6 @@ static uint64 GetUtilityRowCount(QueryCompletion* qc) { } } -static void ExecuteUtilityWithNesting(PlannedStmt* pstmt, const char* queryString, -#if PG_VERSION_NUM >= 140000 - bool readOnlyTree, -#endif - ProcessUtilityContext context, ParamListInfo params, - QueryEnvironment* queryEnv, DestReceiver* dest, - QueryCompletion* qc) { - nesting_level++; - PG_TRY(); - { CALL_PROCESS_UTILITY(); } - PG_FINALLY(); - { nesting_level--; } - PG_END_TRY(); -} - // ProcessUtility hook - captures DDL and utility statements #if PG_VERSION_NUM >= 140000 static void PschProcessUtility(PlannedStmt* pstmt, const char* queryString, bool readOnlyTree, @@ -781,28 +786,33 @@ static void PschProcessUtility(PlannedStmt* pstmt, const char* queryString, return; } - // Capture state before execution - bool is_top_level = (nesting_level == 0); - TimestampTz start_ts = GetCurrentTimestamp(); + // Capture parent before pushing our own frame, then push so that any + // executor hooks fired from within this utility (e.g. CREATE TABLE AS SELECT) + // see us on the stack and correctly link themselves as our children. + int parent_idx = query_stack_depth - 1; + uint64 parent_query_id = + (parent_idx >= 0 && parent_idx < PSCH_MAX_QUERY_NESTING_DEPTH) + ? 
query_stack[parent_idx].queryid + : 0; + + QueryStackPush(0); // utility statements have no queryId + int stmt_location = pstmt->stmt_location; int stmt_len = pstmt->stmt_len; BufferUsage bufusage_start = pgBufferUsage; WalUsage walusage_start = pgWalUsage; - struct rusage rusage_util_start; - getrusage(RUSAGE_SELF, &rusage_util_start); instr_time start_time; INSTR_TIME_SET_CURRENT(start_time); -#if PG_VERSION_NUM >= 140000 - ExecuteUtilityWithNesting(pstmt, queryString, readOnlyTree, context, params, queryEnv, dest, qc); -#else - ExecuteUtilityWithNesting(pstmt, queryString, context, params, queryEnv, dest, qc); -#endif + CALL_PROCESS_UTILITY(); instr_time duration; INSTR_TIME_SET_CURRENT(duration); INSTR_TIME_SUBTRACT(duration, start_time); + // Pop our frame. Null if depth exceeded the cap. + const PschQueryFrame* popped = QueryStackPop(); + BufferUsage bufusage_delta; WalUsage walusage_delta; MemSet(&bufusage_delta, 0, sizeof(BufferUsage)); @@ -810,18 +820,10 @@ static void PschProcessUtility(PlannedStmt* pstmt, const char* queryString, BufferUsageAccumDiff(&bufusage_delta, &pgBufferUsage, &bufusage_start); WalUsageAccumDiff(&walusage_delta, &pgWalUsage, &walusage_start); - int64 cpu_user_us = 0; - int64 cpu_sys_us = 0; - struct rusage rusage_util_end; - if (getrusage(RUSAGE_SELF, &rusage_util_end) == 0) { - cpu_user_us = TimeDiffMicrosec(rusage_util_end.ru_utime, rusage_util_start.ru_utime); - cpu_sys_us = TimeDiffMicrosec(rusage_util_end.ru_stime, rusage_util_start.ru_stime); - } - PschEvent event; - BuildEventForUtility(&event, queryString, start_ts, stmt_location, stmt_len, - INSTR_TIME_GET_MICROSEC(duration), is_top_level, GetUtilityRowCount(qc), - &bufusage_delta, &walusage_delta, cpu_user_us, cpu_sys_us); + BuildEventForUtility(&event, queryString, popped, stmt_location, stmt_len, + INSTR_TIME_GET_MICROSEC(duration), parent_query_id, GetUtilityRowCount(qc), + &bufusage_delta, &walusage_delta); PschEnqueueEvent(&event); } @@ -831,7 +833,7 @@ static void 
PschProcessUtility(PlannedStmt* pstmt, const char* queryString, // Returns false during early initialization, in background workers, or when disabled. static bool ShouldCaptureLog(ErrorData* edata) { // Basic preconditions - if (edata == nullptr || !system_init || !psch_enabled || disable_error_capture) { + if (edata == NULL || !system_init || !psch_enabled || disable_error_capture) { return false; } @@ -841,7 +843,7 @@ static bool ShouldCaptureLog(ErrorData* edata) { } // PostgreSQL bootstrapping checks - MyProc indicates PGPROC allocation complete - if (MyProc == nullptr || IsParallelWorker()) { + if (MyProc == NULL || IsParallelWorker()) { return false; } @@ -850,8 +852,8 @@ static bool ShouldCaptureLog(ErrorData* edata) { // - IsUnderPostmaster: not single-user mode or bootstrap // - psch_shared_state: shared memory must be ready // - MyBgworkerEntry: skip background workers (not user queries) - if (MyDatabaseId == InvalidOid || !IsUnderPostmaster || psch_shared_state == nullptr || - MyBgworkerEntry != nullptr) { + if (MyDatabaseId == InvalidOid || !IsUnderPostmaster || psch_shared_state == NULL || + MyBgworkerEntry != NULL) { return false; } @@ -867,14 +869,16 @@ static bool ShouldCaptureLog(ErrorData* edata) { // message, SQLSTATE, and client/session metadata. static void CaptureLogEvent(ErrorData* edata) { PschEvent event; - InitBaseEvent(&event, GetCurrentTimestamp(), (nesting_level == 0), PSCH_CMD_UNKNOWN); + uint64 parent_query_id = + query_stack_depth > 0 ? 
query_stack[query_stack_depth - 1].queryid : 0; + InitBaseEvent(&event, GetCurrentTimestamp(), parent_query_id, PSCH_CMD_UNKNOWN); UnpackSqlState(edata->sqlerrcode, event.err_sqlstate); - event.err_elevel = static_cast(edata->elevel); + event.err_elevel = (uint8)edata->elevel; - if (edata->message != nullptr) { + if (edata->message != NULL) { event.err_message_len = - static_cast(CopyTrimmed(event.err_message, PSCH_MAX_ERR_MSG_LEN, edata->message)); + (uint16)CopyTrimmed(event.err_message, PSCH_MAX_ERR_MSG_LEN, edata->message); } CopyClientContext(&event); @@ -892,7 +896,7 @@ static void CaptureLogEvent(ErrorData* edata) { // This ensures we capture the final, potentially modified log message. static void PschEmitLogHook(ErrorData* edata) { // Chain to previous hook first (allows log transformation by other extensions) - if (prev_emit_log_hook != nullptr) { + if (prev_emit_log_hook != NULL) { prev_emit_log_hook(edata); } @@ -902,7 +906,7 @@ static void PschEmitLogHook(ErrorData* edata) { } // Reset recursion guard on ERROR+ (transaction will abort) - if (edata != nullptr && edata->elevel >= ERROR) { + if (edata != NULL && edata->elevel >= ERROR) { disable_error_capture = false; } } @@ -940,5 +944,3 @@ void PschInstallHooks(void) { // Mark system as initialized - emit_log_hook will now capture messages system_init = true; } - -} // extern "C" diff --git a/src/hooks/query_normalize.cc b/src/hooks/query_normalize.c similarity index 90% rename from src/hooks/query_normalize.cc rename to src/hooks/query_normalize.c index a285aa9..75b2891 100644 --- a/src/hooks/query_normalize.cc +++ b/src/hooks/query_normalize.c @@ -4,20 +4,18 @@ // are static in contrib/pg_stat_statements/pg_stat_statements.c. We reproduce // them here so pg_stat_ch can normalize independently of pg_stat_statements. 
-extern "C" { #include "postgres.h" #include "lib/stringinfo.h" #include "nodes/queryjumble.h" #include "parser/scanner.h" -} #include "hooks/query_normalize.h" // Comparator for qsorting LocationLen structs by location. static int CompLocation(const void* a, const void* b) { - int l = (static_cast(a))->location; - int r = (static_cast(b))->location; + int l = ((const LocationLen*)a)->location; + int r = ((const LocationLen*)b)->location; if (l < r) { return -1; } @@ -33,11 +31,14 @@ static bool ShouldPreserveExternalParam(const JumbleState* jstate, int index) { return jstate->clocations[index].extern_param && !jstate->has_squashed_lists; } #else -static bool IsSquashedConstant(const LocationLen* /* loc */) { +static bool IsSquashedConstant(const LocationLen* loc) { + (void)loc; return false; } -static bool ShouldPreserveExternalParam(const JumbleState* /* jstate */, int /* index */) { +static bool ShouldPreserveExternalParam(const JumbleState* jstate, int index) { + (void)jstate; + (void)index; return false; } #endif @@ -95,7 +96,7 @@ static void FillInConstantLengths(JumbleState* jstate, const char* query, int qu break; } } - locs[i].length = static_cast(strlen(yyextra.scanbuf + loc)); + locs[i].length = (int)strlen(yyextra.scanbuf + loc); break; } } @@ -109,8 +110,8 @@ static void FillInConstantLengths(JumbleState* jstate, const char* query, int qu } char* PschNormalizeQuery(const char* query, int query_loc, int* query_len_p, JumbleState* jstate) { - if (jstate == nullptr || jstate->clocations_count <= 0) { - return nullptr; + if (jstate == NULL || jstate->clocations_count <= 0) { + return NULL; } int query_len = *query_len_p; diff --git a/src/hooks/query_normalize_state.cc b/src/hooks/query_normalize_state.c similarity index 59% rename from src/hooks/query_normalize_state.cc rename to src/hooks/query_normalize_state.c index 914f51e..a97fbd7 100644 --- a/src/hooks/query_normalize_state.cc +++ b/src/hooks/query_normalize_state.c @@ -1,26 +1,21 @@ // 
Statement-scoped normalized query registry. -#include -#include -#include - -#include "absl/hash/hash.h" - -extern "C" { #include "postgres.h" +#include "common/hashfn.h" #include "utils/memutils.h" -} #include "hooks/query_normalize_state.h" -struct PschStatementKeyHash { - size_t operator()(const PschStatementKey& key) const { - return absl::HashOf( - std::string_view(key.source_text != nullptr ? key.source_text : "", key.source_text_len), - key.stmt_location, key.stmt_len); - } -}; +// Compute a hash for a statement key combining text, location, and length. +// Used as a fast prefilter before bytewise comparison. +static size_t ComputeStatementHash(const char* source_text, size_t source_text_len, + int stmt_location, int stmt_len) { + uint32 h = hash_bytes((const unsigned char*)source_text, (int)source_text_len); + h = h * 31 + (uint32)stmt_location; + h = h * 31 + (uint32)stmt_len; + return (size_t)h; +} struct PschNormalizedQueryEntry { PschStatementKey key; @@ -31,15 +26,20 @@ struct PschNormalizedQueryEntry { }; PschStatementKey PschMakeStatementKey(const char* source_text, int stmt_location, int stmt_len) { - PschStatementKey key = {source_text, source_text != nullptr ? strlen(source_text) : 0, 0, - stmt_location, stmt_len}; - key.statement_hash = (source_text != nullptr) ? PschStatementKeyHash{}(key) : 0; + PschStatementKey key; + key.source_text = source_text; + key.source_text_len = source_text != NULL ? strlen(source_text) : 0; + key.stmt_location = stmt_location; + key.stmt_len = stmt_len; + key.statement_hash = + (source_text != NULL) + ? 
ComputeStatementHash(source_text, key.source_text_len, stmt_location, stmt_len) + : 0; return key; } -static bool ExactStatementMatch(const PschNormalizedQueryEntry* entry, - const PschStatementKey& key) { - if (entry == nullptr || key.source_text == nullptr) { +static bool ExactStatementMatch(const PschNormalizedQueryEntry* entry, PschStatementKey key) { + if (entry == NULL || key.source_text == NULL) { return false; } @@ -58,7 +58,7 @@ static bool ExactStatementMatch(const PschNormalizedQueryEntry* entry, static void RemoveEntry(PschNormalizedQueryState* state, PschNormalizedQueryEntry* prev, PschNormalizedQueryEntry* entry) { - if (prev != nullptr) { + if (prev != NULL) { prev->next = entry->next; } else { state->head = entry->next; @@ -71,24 +71,28 @@ static void RemoveEntry(PschNormalizedQueryState* state, PschNormalizedQueryEntr static bool CopyEntryText(const PschNormalizedQueryEntry* entry, char* dst, size_t dst_size, uint16* out_len) { - if (entry == nullptr || dst == nullptr || dst_size == 0 || out_len == nullptr) { + size_t len; + + if (entry == NULL || dst == NULL || dst_size == 0 || out_len == NULL) { return false; } - size_t len = Min(static_cast(entry->normalized_len), dst_size - 1); + len = Min((size_t)entry->normalized_len, dst_size - 1); memcpy(dst, entry->normalized_query, len); dst[len] = '\0'; - *out_len = static_cast(len); + *out_len = (uint16)len; return true; } -void PschRememberNormalizedQuery(PschNormalizedQueryState* state, const PschStatementKey& key, +void PschRememberNormalizedQuery(PschNormalizedQueryState* state, PschStatementKey key, char* normalized_query, int normalized_len) { - if (state == nullptr || key.source_text == nullptr || normalized_query == nullptr) { + PschNormalizedQueryEntry* entry; + + if (state == NULL || key.source_text == NULL || normalized_query == NULL) { return; } - for (PschNormalizedQueryEntry* entry = state->head; entry != nullptr; entry = entry->next) { + for (entry = state->head; entry != NULL; entry = 
entry->next) { if (!ExactStatementMatch(entry, key)) { continue; } @@ -100,8 +104,7 @@ void PschRememberNormalizedQuery(PschNormalizedQueryState* state, const PschStat return; } - PschNormalizedQueryEntry* entry = - static_cast(MemoryContextAlloc(TopMemoryContext, sizeof(*entry))); + entry = (PschNormalizedQueryEntry*)MemoryContextAlloc(TopMemoryContext, sizeof(*entry)); entry->key = key; entry->source_text_copy = MemoryContextStrdup(TopMemoryContext, key.source_text); entry->normalized_query = normalized_query; @@ -112,14 +115,15 @@ void PschRememberNormalizedQuery(PschNormalizedQueryState* state, const PschStat bool PschCopyNormalizedQueryForStatement(PschNormalizedQueryState* state, char* dst, size_t dst_size, uint16* out_len, - const PschStatementKey& key, bool consume) { - if (state == nullptr || key.source_text == nullptr) { + PschStatementKey key, bool consume) { + PschNormalizedQueryEntry* prev = NULL; + PschNormalizedQueryEntry* entry; + + if (state == NULL || key.source_text == NULL) { return false; } - PschNormalizedQueryEntry* prev = nullptr; - for (PschNormalizedQueryEntry* entry = state->head; entry != nullptr; - prev = entry, entry = entry->next) { + for (entry = state->head; entry != NULL; prev = entry, entry = entry->next) { if (!ExactStatementMatch(entry, key)) { continue; } @@ -134,15 +138,16 @@ bool PschCopyNormalizedQueryForStatement(PschNormalizedQueryState* state, char* return false; } -void PschForgetNormalizedQueryForStatement(PschNormalizedQueryState* state, - const PschStatementKey& key) { - if (state == nullptr || key.source_text == nullptr) { +void PschForgetNormalizedQueryForStatement(PschNormalizedQueryState* state, PschStatementKey key) { + PschNormalizedQueryEntry* prev = NULL; + PschNormalizedQueryEntry* entry; + + if (state == NULL || key.source_text == NULL) { return; } - PschNormalizedQueryEntry* prev = nullptr; - PschNormalizedQueryEntry* entry = state->head; - while (entry != nullptr) { + entry = state->head; + while (entry != 
NULL) { PschNormalizedQueryEntry* next = entry->next; if (ExactStatementMatch(entry, key)) { RemoveEntry(state, prev, entry); diff --git a/src/hooks/query_normalize_state.h b/src/hooks/query_normalize_state.h index 77e6234..0c199a6 100644 --- a/src/hooks/query_normalize_state.h +++ b/src/hooks/query_normalize_state.h @@ -28,75 +28,40 @@ #ifndef PG_STAT_CH_SRC_HOOKS_QUERY_NORMALIZE_STATE_H_ #define PG_STAT_CH_SRC_HOOKS_QUERY_NORMALIZE_STATE_H_ -#include - -extern "C" { #include "postgres.h" -} // Exact statement identity for the normalization registry. // // The hash is a fast prefilter for executor/utility lookups. We still fall // back to bytewise source-text comparison after the hash/offset checks, so // collisions cannot attach the wrong normalized query text. -struct PschStatementKey { +typedef struct PschStatementKey { const char* source_text; size_t source_text_len; size_t statement_hash; int stmt_location; int stmt_len; -}; +} PschStatementKey; // Build a statement key from PostgreSQL's source string plus statement slice. PschStatementKey PschMakeStatementKey(const char* source_text, int stmt_location, int stmt_len); -struct PschNormalizedQueryEntry; +typedef struct PschNormalizedQueryEntry PschNormalizedQueryEntry; -struct PschNormalizedQueryState { +typedef struct PschNormalizedQueryState { PschNormalizedQueryEntry* head; -}; +} PschNormalizedQueryState; // Stash normalized SQL text for one parsed statement. -// -// Called from post_parse_analyze_hook immediately after PschNormalizeQuery() -// succeeds. We have to save the normalized text here because JumbleState only -// exists at parse time, but the actual event is built later in ExecutorEnd or -// ProcessUtility. 
-// -// The key is "statement identity", not just query text: -// - source_text identifies which SQL string we parsed -// - stmt_location / stmt_len identify which statement inside that string -// -// Example: -// source_text = "SELECT 1; SELECT 2" -// stmt_location/stm_len distinguish the first SELECT from the second. -void PschRememberNormalizedQuery(PschNormalizedQueryState* state, const PschStatementKey& key, +void PschRememberNormalizedQuery(PschNormalizedQueryState* state, PschStatementKey key, char* normalized_query, int normalized_len); // Copy normalized text for one exact statement match. -// -// Called from ExecutorEnd and ProcessUtility when building the final event. -// These paths know the original source_text plus stmt_location/stm_len, so they -// can do an exact lookup. -// -// When consume is true, remove the matched entry after copying it. That mode is -// useful for one-shot statements where a later reuse would indicate stale state. -// When consume is false, keep the entry for cached-plan or SPI cases where the -// same statement identity may execute again. -// -// Example: -// A plpgsql function caches "SELECT child(depth - 1) + 42 WHERE 7 = 7". -// The first and third executions should both see the same normalized form. bool PschCopyNormalizedQueryForStatement(PschNormalizedQueryState* state, char* dst, size_t dst_size, uint16* out_len, - const PschStatementKey& key, bool consume); + PschStatementKey key, bool consume); // Forget one pending normalized entry by exact statement identity. -// -// Called on executor / utility early-return paths that skipped normal event -// construction. This is the "we are done with this statement, do not reuse its -// normalized text by accident" cleanup path. 
-void PschForgetNormalizedQueryForStatement(PschNormalizedQueryState* state, - const PschStatementKey& key); +void PschForgetNormalizedQueryForStatement(PschNormalizedQueryState* state, PschStatementKey key); #endif // PG_STAT_CH_SRC_HOOKS_QUERY_NORMALIZE_STATE_H_ diff --git a/src/pg_stat_ch.cc b/src/pg_stat_ch.c similarity index 97% rename from src/pg_stat_ch.cc rename to src/pg_stat_ch.c index 8afc4ed..9c530a0 100644 --- a/src/pg_stat_ch.cc +++ b/src/pg_stat_ch.c @@ -2,7 +2,6 @@ // // This is the main entry point for the pg_stat_ch extension. -extern "C" { #include "postgres.h" #include "access/htup_details.h" @@ -11,7 +10,6 @@ extern "C" { #include "miscadmin.h" #include "utils/builtins.h" #include "utils/timestamp.h" -} #include "pg_stat_ch/pg_stat_ch.h" @@ -20,8 +18,6 @@ extern "C" { #include "queue/shmem.h" #include "worker/bgworker.h" -extern "C" { - PG_MODULE_MAGIC; // Extension initialization - called when shared library is loaded @@ -74,7 +70,7 @@ Datum pg_stat_ch_stats(PG_FUNCTION_ARGS) { HeapTuple tuple; // Build tuple descriptor - if (get_call_result_type(fcinfo, nullptr, &tupdesc) != TYPEFUNC_COMPOSITE) { + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("function returning record called in context that cannot " "accept type record"))); @@ -158,5 +154,3 @@ Datum pg_stat_ch_flush(PG_FUNCTION_ARGS) { PschSignalFlush(); PG_RETURN_VOID(); } - -} // extern "C" diff --git a/src/queue/event.h b/src/queue/event.h index 7289836..92842e4 100644 --- a/src/queue/event.h +++ b/src/queue/event.h @@ -52,7 +52,7 @@ extern "C" { #define PSCH_MAX_CLIENT_ADDR_LEN (INET6_ADDRSTRLEN - 1) // Command type values (matching PostgreSQL's CmdType enum) -enum PschCmdType { +typedef enum PschCmdType { PSCH_CMD_UNKNOWN = 0, PSCH_CMD_SELECT = 1, PSCH_CMD_UPDATE = 2, @@ -61,9 +61,11 @@ enum PschCmdType { PSCH_CMD_MERGE = 5, PSCH_CMD_UTILITY = 6, PSCH_CMD_NOTHING = 7 -}; +} PschCmdType; // Event 
structure stored in shared memory queue (~2KB fixed size) +// Forward-declare typedef so C code can use PschEvent without the struct tag +typedef struct PschEvent PschEvent; // // VERSION-SPECIFIC FIELDS: All fields are unconditionally present regardless of // PostgreSQL version. This keeps the struct size fixed for ring buffer simplicity. @@ -75,16 +77,16 @@ struct PschEvent { uint64 duration_us; // Execution duration in microseconds // Identity - Oid dbid; // Database OID - Oid userid; // User OID - char datname[64]; // Database name (NAMEDATALEN=64, resolved at capture) - uint8 datname_len; // Actual length - char username[64]; // User name (NAMEDATALEN=64, resolved at capture) - uint8 username_len; // Actual length - int32 pid; // Backend process ID - uint64 queryid; // Query ID (from pg_stat_statements) - bool top_level; // True if this is a top-level query - PschCmdType cmd_type; // Command type (SELECT, UPDATE, etc.) + Oid dbid; // Database OID + Oid userid; // User OID + char datname[64]; // Database name (NAMEDATALEN=64, resolved at capture) + uint8 datname_len; // Actual length + char username[64]; // User name (NAMEDATALEN=64, resolved at capture) + uint8 username_len; // Actual length + int32 pid; // Backend process ID + uint64 queryid; // Query ID (from pg_stat_statements) + uint64 parent_query_id; // Query ID of the calling query (0 if top-level) + PschCmdType cmd_type; // Command type (SELECT, UPDATE, etc.) // Results uint64 rows; // Number of rows affected/returned diff --git a/src/queue/local_batch.cc b/src/queue/local_batch.c similarity index 83% rename from src/queue/local_batch.cc rename to src/queue/local_batch.c index b65b4fa..df06edf 100644 --- a/src/queue/local_batch.cc +++ b/src/queue/local_batch.c @@ -9,21 +9,17 @@ // PschLocalBatchShutdown is registered later (on first Add), so it runs FIRST — // flushing buffered events before stats are logged. 
-extern "C" { #include "postgres.h" #include "access/xact.h" #include "storage/ipc.h" -} - -#include #include "queue/local_batch.h" #include "queue/shmem.h" #define PSCH_LOCAL_BATCH_CAPACITY 8 -static std::array local_batch; +static PschEvent local_batch[PSCH_LOCAL_BATCH_CAPACITY]; static int local_batch_count = 0; static bool local_batch_initialized = false; @@ -33,13 +29,11 @@ static void PschLocalBatchShutdown(int code, Datum arg); // Register callbacks lazily on first use. static void PschLocalBatchInit(void) { - RegisterXactCallback(PschXactCallback, nullptr); + RegisterXactCallback(PschXactCallback, NULL); on_shmem_exit(PschLocalBatchShutdown, 0); local_batch_initialized = true; } -extern "C" { - int PschLocalBatchFlush(void) { if (local_batch_count == 0) { return 0; @@ -47,7 +41,7 @@ int PschLocalBatchFlush(void) { int count = local_batch_count; local_batch_count = 0; - return PschEnqueueBatch(local_batch.data(), count); + return PschEnqueueBatch(local_batch, count); } void PschLocalBatchAdd(const PschEvent* event) { @@ -64,10 +58,9 @@ void PschLocalBatchAdd(const PschEvent* event) { local_batch_count++; } -} // extern "C" - // Flush on transaction end (COMMIT, ABORT, PREPARE) -static void PschXactCallback(XactEvent event, [[maybe_unused]] void* arg) { +static void PschXactCallback(XactEvent event, void* arg) { + (void)arg; if (local_batch_count == 0) { return; } @@ -86,6 +79,8 @@ static void PschXactCallback(XactEvent event, [[maybe_unused]] void* arg) { } // Flush on backend shutdown to avoid losing buffered events -static void PschLocalBatchShutdown([[maybe_unused]] int code, [[maybe_unused]] Datum arg) { +static void PschLocalBatchShutdown(int code, Datum arg) { + (void)code; + (void)arg; PschLocalBatchFlush(); } diff --git a/src/queue/shmem.cc b/src/queue/shmem.c similarity index 92% rename from src/queue/shmem.cc rename to src/queue/shmem.c index 81e5ffd..90b83c2 100644 --- a/src/queue/shmem.cc +++ b/src/queue/shmem.c @@ -29,7 +29,6 @@ // When queue is 
full, we log a warning once (overflow_logged flag) and drop // events. This prevents log spam while alerting operators to capacity issues. -extern "C" { #include "postgres.h" #include "miscadmin.h" @@ -38,24 +37,22 @@ extern "C" { #include "storage/shmem.h" #include "utils/memutils.h" #include "utils/timestamp.h" -} #include "config/guc.h" #include "hooks/hooks.h" #include "queue/local_batch.h" #include "queue/shmem.h" -PschSharedState* psch_shared_state = nullptr; +PschSharedState* psch_shared_state = NULL; // Previous hook values for chaining -static shmem_startup_hook_type prev_shmem_startup_hook = nullptr; +static shmem_startup_hook_type prev_shmem_startup_hook = NULL; #if PG_VERSION_NUM >= 150000 -static shmem_request_hook_type prev_shmem_request_hook = nullptr; +static shmem_request_hook_type prev_shmem_request_hook = NULL; #endif static inline PschEvent* GetRingBuffer(void) { - return reinterpret_cast(reinterpret_cast(psch_shared_state) + - sizeof(PschSharedState)); + return (PschEvent*)((char*)psch_shared_state + sizeof(PschSharedState)); } // Handle queue overflow: increment dropped counter and log warning once @@ -73,7 +70,7 @@ static inline PschEvent* GetRingBuffer(void) { // DETECTING OVERFLOW: Check pg_stat_ch_stats().dropped > 0 or look for the WARNING // in PostgreSQL logs. The overflow_logged flag prevents log spam by warning only once // until stats are reset. -static void HandleOverflow() { +static void HandleOverflow(void) { pg_atomic_fetch_add_u64(&psch_shared_state->dropped, 1); // Log overflow warning once to avoid log spam (pg_stat_monitor pattern) @@ -121,8 +118,7 @@ static bool TryEnqueueLocked(const PschEvent* event, uint32 capacity) { // 3. 
Copy mid section: application_name, client_addr, query_len const size_t mid_offset = offsetof(PschEvent, application_name); const size_t mid_size = offsetof(PschEvent, query) - mid_offset; - memcpy(reinterpret_cast(slot) + mid_offset, - reinterpret_cast(event) + mid_offset, mid_size); + memcpy((char*)slot + mid_offset, (const char*)event + mid_offset, mid_size); // 4. Copy actual query content (typically 20-200 bytes vs 2KB buffer) if (event->query_len > 0) { @@ -143,8 +139,10 @@ static bool TryEnqueueLocked(const PschEvent* event, uint32 capacity) { return true; } -static void PschShmemShutdown([[maybe_unused]] int code, [[maybe_unused]] Datum arg) { - if (psch_shared_state != nullptr) { +static void PschShmemShutdown(int code, Datum arg) { + (void)code; + (void)arg; + if (psch_shared_state != NULL) { elog(LOG, "pg_stat_ch: shutdown (enqueued=%lu, dropped=%lu, exported=%lu)", pg_atomic_read_u64(&psch_shared_state->enqueued), pg_atomic_read_u64(&psch_shared_state->dropped), @@ -152,8 +150,6 @@ static void PschShmemShutdown([[maybe_unused]] int code, [[maybe_unused]] Datum } } -extern "C" { - Size PschShmemSize(void) { Size size = sizeof(PschSharedState); size = add_size(size, mul_size(psch_queue_capacity, sizeof(PschEvent))); @@ -167,7 +163,7 @@ static void RequestSharedResources(void) { #if PG_VERSION_NUM >= 150000 static void PschShmemRequestHook(void) { - if (prev_shmem_request_hook != nullptr) { + if (prev_shmem_request_hook != NULL) { prev_shmem_request_hook(); } RequestSharedResources(); @@ -201,16 +197,15 @@ static void InitializeSharedState(void) { static void PschShmemStartupHook(void) { bool found; - if (prev_shmem_startup_hook != nullptr) { + if (prev_shmem_startup_hook != NULL) { prev_shmem_startup_hook(); } LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); - psch_shared_state = - static_cast(ShmemInitStruct("pg_stat_ch", PschShmemSize(), &found)); + psch_shared_state = (PschSharedState*)ShmemInitStruct("pg_stat_ch", PschShmemSize(), &found); - if 
(psch_shared_state == nullptr) { + if (psch_shared_state == NULL) { LWLockRelease(AddinShmemInitLock); ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("pg_stat_ch: could not create shared memory segment"))); @@ -268,7 +263,7 @@ void PschInstallShmemHooks(void) { // drop until stats are reset. This is better than blocking producers or growing // the queue unboundedly. bool PschEnqueueEvent(const PschEvent* event) { - if (psch_shared_state == nullptr || !psch_enabled) { + if (psch_shared_state == NULL || !psch_enabled) { return false; } @@ -312,7 +307,7 @@ bool PschEnqueueEvent(const PschEvent* event) { // Enqueue a batch of events under a single lock acquisition. // Called by PschLocalBatchFlush to amortize lock overhead across multiple events. int PschEnqueueBatch(const PschEvent* events, int count) { - if (psch_shared_state == nullptr || !psch_enabled || count == 0) + if (psch_shared_state == NULL || !psch_enabled || count == 0) return 0; PschSuppressErrorCapture(true); @@ -361,7 +356,7 @@ int PschEnqueueBatch(const PschEvent* events, int count) { // at 1M events/sec. Even after wraparound, (head - tail) arithmetic works correctly // because both wrap at the same point (2^64). 
bool PschDequeueEvent(PschEvent* event) { - if (psch_shared_state == nullptr) { + if (psch_shared_state == NULL) { return false; } @@ -403,7 +398,7 @@ bool PschDequeueEvent(PschEvent* event) { void PschGetStats(uint64* enqueued, uint64* dropped, uint64* exported, uint32* queue_size, uint32* queue_capacity, uint64* send_failures, TimestampTz* last_success_ts, char* last_error_buf, size_t last_error_buf_size, TimestampTz* last_error_ts) { - if (psch_shared_state == nullptr) { + if (psch_shared_state == NULL) { *enqueued = 0; *dropped = 0; *exported = 0; @@ -427,7 +422,7 @@ void PschGetStats(uint64* enqueued, uint64* dropped, uint64* exported, uint32* q uint64 head = pg_atomic_read_u64(&psch_shared_state->head); uint64 tail = pg_atomic_read_u64(&psch_shared_state->tail); - *queue_size = static_cast(head - tail); + *queue_size = (uint32)(head - tail); *queue_capacity = psch_shared_state->capacity; // Copy exporter timestamps and error text under lock to prevent torn reads. @@ -449,7 +444,7 @@ void PschGetStats(uint64* enqueued, uint64* dropped, uint64* exported, uint32* q // OVERFLOW FLAG RESET: Clearing overflow_logged allows the warning to be logged // again on the next overflow, which is useful after capacity has been increased. void PschResetStats(void) { - if (psch_shared_state == nullptr) { + if (psch_shared_state == NULL) { return; } @@ -468,7 +463,7 @@ void PschResetStats(void) { // Record a successful export (updates timestamp) // Lock protects last_success_ts from torn reads by PschGetStats/PschResetStats. void PschRecordExportSuccess(void) { - if (psch_shared_state == nullptr) { + if (psch_shared_state == NULL) { return; } LWLockAcquire(psch_shared_state->lock, LW_EXCLUSIVE); @@ -480,14 +475,14 @@ void PschRecordExportSuccess(void) { // Lock protects last_error_ts and last_error_text from torn reads by // PschGetStats/PschResetStats. send_failures is atomic so it's safe outside the lock. 
void PschRecordExportFailure(const char* error_msg) { - if (psch_shared_state == nullptr) { + if (psch_shared_state == NULL) { return; } pg_atomic_fetch_add_u64(&psch_shared_state->send_failures, 1); LWLockAcquire(psch_shared_state->lock, LW_EXCLUSIVE); psch_shared_state->last_error_ts = GetCurrentTimestamp(); - if (error_msg != nullptr) { + if (error_msg != NULL) { strlcpy(psch_shared_state->last_error_text, error_msg, sizeof(psch_shared_state->last_error_text)); } @@ -495,17 +490,15 @@ void PschRecordExportFailure(const char* error_msg) { } int PschGetBgworkerPid(void) { - if (psch_shared_state == nullptr) { + if (psch_shared_state == NULL) { return 0; } - return static_cast(pg_atomic_read_u32(&psch_shared_state->bgworker_pid)); + return (int)pg_atomic_read_u32(&psch_shared_state->bgworker_pid); } void PschSetBgworkerPid(int pid) { - if (psch_shared_state == nullptr) { + if (psch_shared_state == NULL) { return; } - pg_atomic_write_u32(&psch_shared_state->bgworker_pid, static_cast(pid)); + pg_atomic_write_u32(&psch_shared_state->bgworker_pid, (uint32)pid); } - -} // extern "C" diff --git a/src/queue/shmem.h b/src/queue/shmem.h index 6cb2973..a26805d 100644 --- a/src/queue/shmem.h +++ b/src/queue/shmem.h @@ -32,6 +32,7 @@ extern "C" { // [ring buffer] // rarely changed producer cache line consumer cache // line +typedef struct PschSharedState PschSharedState; struct PschSharedState { // === Rarely-changed fields (initialization only) === LWLock* lock; // Protects writes to the ring buffer (multi-producer)