diff --git a/src/config/guc.cc b/src/config/guc.c similarity index 86% rename from src/config/guc.cc rename to src/config/guc.c index 7016fdf..53a9d8f 100644 --- a/src/config/guc.cc +++ b/src/config/guc.c @@ -1,28 +1,25 @@ // pg_stat_ch GUC (Grand Unified Configuration) implementation -extern "C" { #include "postgres.h" #include "utils/guc.h" -} -#include -#include +#include #include "config/guc.h" // GUC variable storage bool psch_enabled = true; bool psch_use_otel = false; -char* psch_clickhouse_host = nullptr; +char* psch_clickhouse_host = NULL; int psch_clickhouse_port = 9000; -char* psch_clickhouse_user = nullptr; -char* psch_clickhouse_password = nullptr; -char* psch_clickhouse_database = nullptr; +char* psch_clickhouse_user = NULL; +char* psch_clickhouse_password = NULL; +char* psch_clickhouse_database = NULL; bool psch_clickhouse_use_tls = false; bool psch_clickhouse_skip_tls_verify = false; -char* psch_otel_endpoint = nullptr; -char* psch_hostname = nullptr; +char* psch_otel_endpoint = NULL; +char* psch_hostname = NULL; int psch_queue_capacity = 131072; int psch_string_area_size = 64; // MB, for DSA string storage int psch_flush_interval_ms = 500; @@ -39,12 +36,12 @@ int psch_normalize_cache_max = 32768; double psch_sample_rate = 1.0; bool psch_otel_arrow_passthrough = false; int psch_otel_max_block_bytes = 3 * 1024 * 1024; // 3 MiB (max: 16 MiB) -char* psch_extra_attributes = nullptr; -char* psch_debug_arrow_dump_dir = nullptr; +char* psch_extra_attributes = NULL; +char* psch_debug_arrow_dump_dir = NULL; // Log level options (matches PostgreSQL's server_message_level_options pattern) // clang-format off -static const std::array log_elevel_options = {{ +static const struct config_enum_entry log_elevel_options[] = { {"debug5", DEBUG5, false}, {"debug4", DEBUG4, false}, {"debug3", DEBUG3, false}, @@ -57,16 +54,14 @@ static const std::array log_elevel_options = {{ {"error", ERROR, false}, {"fatal", FATAL, false}, {"panic", PANIC, false}, - {nullptr, 0, false}, -}}; + {NULL, 0, false}, +}; // clang-format on -extern "C" { - // Check hook to ensure queue_capacity is a power of 2. // Parameters follow PostgreSQL GUC check hook signature. -static bool check_psch_queue_capacity(int* newval, void** extra [[maybe_unused]], - GucSource source [[maybe_unused]]) { +static bool check_psch_queue_capacity(int* newval, void** extra pg_attribute_unused(), + GucSource source pg_attribute_unused()) { // Check if value is positive and a power of 2 if (*newval <= 0) { GUC_check_errdetail("pg_stat_ch.queue_capacity must be positive."); @@ -90,12 +85,12 @@ void PschInitGuc(void) { DefineCustomBoolVariable( "pg_stat_ch.enabled", // name "Enable or disable pg_stat_ch query telemetry collection.", // short_desc - nullptr, // long_desc + NULL, // long_desc &psch_enabled, // valueAddr true, // bootValue PGC_SIGHUP, // context 0, // flags - nullptr, nullptr, nullptr); // hooks + NULL, NULL, NULL); // hooks DefineCustomBoolVariable( "pg_stat_ch.use_otel", @@ -105,109 +100,109 @@ void PschInitGuc(void) { false, PGC_POSTMASTER, 0, - nullptr, nullptr, nullptr); + NULL, NULL, NULL); DefineCustomStringVariable( "pg_stat_ch.clickhouse_host", "ClickHouse server hostname.", - nullptr, + NULL, &psch_clickhouse_host, "localhost", PGC_POSTMASTER, 0, - nullptr, nullptr, nullptr); + NULL, NULL, NULL); DefineCustomIntVariable( "pg_stat_ch.clickhouse_port", "ClickHouse server native protocol port.", - nullptr, + NULL, &psch_clickhouse_port, 9000, // bootValue 1, 65535, // min, max PGC_POSTMASTER, 0, - nullptr, nullptr, nullptr); + NULL, NULL, NULL); DefineCustomStringVariable( "pg_stat_ch.clickhouse_user", "ClickHouse user name.", - nullptr, + NULL, &psch_clickhouse_user, "default", PGC_POSTMASTER, 0, - nullptr, nullptr, nullptr); + NULL, NULL, NULL); DefineCustomStringVariable( "pg_stat_ch.clickhouse_password", "ClickHouse user password.", - nullptr, + NULL, &psch_clickhouse_password, "", PGC_POSTMASTER, GUC_SUPERUSER_ONLY, - nullptr, nullptr, nullptr); + NULL, NULL, NULL); DefineCustomStringVariable( "pg_stat_ch.clickhouse_database", "ClickHouse database name for telemetry storage.", - nullptr, + NULL, &psch_clickhouse_database, "pg_stat_ch", PGC_POSTMASTER, 0, - nullptr, nullptr, nullptr); + NULL, NULL, NULL); DefineCustomBoolVariable( "pg_stat_ch.clickhouse_use_tls", "Enable TLS for ClickHouse connections.", - nullptr, + NULL, &psch_clickhouse_use_tls, false, PGC_POSTMASTER, 0, - nullptr, nullptr, nullptr); + NULL, NULL, NULL); DefineCustomBoolVariable( "pg_stat_ch.clickhouse_skip_tls_verify", "Skip TLS certificate verification (insecure, for testing only).", - nullptr, + NULL, &psch_clickhouse_skip_tls_verify, false, PGC_POSTMASTER, 0, - nullptr, nullptr, nullptr); + NULL, NULL, NULL); DefineCustomStringVariable( "pg_stat_ch.otel_endpoint", "OpenTelemetry gRPC endpoint (host:port).", - nullptr, + NULL, &psch_otel_endpoint, "localhost:4317", PGC_POSTMASTER, 0, - nullptr, nullptr, nullptr); + NULL, NULL, NULL); DefineCustomStringVariable( "pg_stat_ch.hostname", "Override the hostname of the current machine.", - nullptr, + NULL, &psch_hostname, "", PGC_POSTMASTER, 0, - nullptr, nullptr, nullptr); + NULL, NULL, NULL); DefineCustomIntVariable( "pg_stat_ch.queue_capacity", "Maximum number of events in the shared memory queue (must be a power of 2).", - nullptr, + NULL, &psch_queue_capacity, 131072, // bootValue 1024, 4194304, // min, max PGC_POSTMASTER, 0, - check_psch_queue_capacity, nullptr, nullptr); + check_psch_queue_capacity, NULL, NULL); DefineCustomIntVariable( "pg_stat_ch.string_area_size", @@ -219,29 +214,29 @@ void PschInitGuc(void) { 8, 1024, // min: 8 MB, max: 1024 MB PGC_POSTMASTER, GUC_UNIT_MB, - nullptr, nullptr, nullptr); + NULL, NULL, NULL); DefineCustomIntVariable( "pg_stat_ch.flush_interval_ms", "Interval in milliseconds between ClickHouse export batches.", - nullptr, + NULL, &psch_flush_interval_ms, 500, // bootValue 100, 60000, // min, max PGC_SIGHUP, GUC_UNIT_MS, - nullptr, nullptr, nullptr); + NULL, NULL, NULL); DefineCustomIntVariable( "pg_stat_ch.batch_max", "Maximum number of events per ClickHouse insert batch.", - nullptr, + NULL, &psch_batch_max, 200000, // bootValue 1, 1000000, // min, max PGC_SIGHUP, 0, - nullptr, nullptr, nullptr); + NULL, NULL, NULL); DefineCustomIntVariable( "pg_stat_ch.otel_log_queue_size", @@ -253,7 +248,7 @@ void PschInitGuc(void) { 512, 1048576, // min, max PGC_POSTMASTER, 0, - nullptr, nullptr, nullptr); + NULL, NULL, NULL); DefineCustomIntVariable( "pg_stat_ch.otel_log_batch_size", @@ -265,7 +260,7 @@ void PschInitGuc(void) { 1, 131072, // min, max PGC_POSTMASTER, 0, - nullptr, nullptr, nullptr); + NULL, NULL, NULL); DefineCustomIntVariable( "pg_stat_ch.otel_log_max_bytes", @@ -277,7 +272,7 @@ void PschInitGuc(void) { 65536, 64 * 1024 * 1024, // min: 64 KiB, max: 64 MiB PGC_POSTMASTER, GUC_UNIT_BYTE, - nullptr, nullptr, nullptr); + NULL, NULL, NULL); DefineCustomIntVariable( "pg_stat_ch.otel_log_delay_ms", @@ -289,7 +284,7 @@ void PschInitGuc(void) { 10, 60000, // min, max PGC_POSTMASTER, GUC_UNIT_MS, - nullptr, nullptr, nullptr); + NULL, NULL, NULL); DefineCustomIntVariable( "pg_stat_ch.otel_metric_interval_ms", @@ -301,7 +296,7 @@ void PschInitGuc(void) { 100, 300000, // min, max (100ms to 5min) PGC_POSTMASTER, GUC_UNIT_MS, - nullptr, nullptr, nullptr); + NULL, NULL, NULL); DefineCustomEnumVariable( "pg_stat_ch.log_min_elevel", @@ -310,10 +305,10 @@ void PschInitGuc(void) { "'error' for errors only, or 'debug5' for all messages.", &psch_log_min_elevel, WARNING, - log_elevel_options.data(), + log_elevel_options, PGC_SUSET, 0, - nullptr, nullptr, nullptr); + NULL, NULL, NULL); DefineCustomIntVariable( "pg_stat_ch.min_duration_us", "Minimum query duration in microseconds to always capture.", @@ -324,7 +319,7 @@ void PschInitGuc(void) { 0, INT_MAX, // min, max PGC_SUSET, 0, - nullptr, nullptr, nullptr); + NULL, NULL, NULL); DefineCustomIntVariable( "pg_stat_ch.normalize_cache_max", @@ -336,7 +331,7 @@ void PschInitGuc(void) { 64, 65536, // min, max PGC_SUSET, 0, - nullptr, nullptr, nullptr); + NULL, NULL, NULL); DefineCustomRealVariable( "pg_stat_ch.sample_rate", @@ -348,7 +343,7 @@ void PschInitGuc(void) { 0.0, 1.0, // min, max PGC_SUSET, 0, - nullptr, nullptr, nullptr); + NULL, NULL, NULL); DefineCustomBoolVariable( "pg_stat_ch.otel_arrow_passthrough", @@ -359,7 +354,7 @@ void PschInitGuc(void) { false, PGC_SIGHUP, 0, - nullptr, nullptr, nullptr); + NULL, NULL, NULL); DefineCustomIntVariable( "pg_stat_ch.otel_max_block_bytes", @@ -372,7 +367,7 @@ void PschInitGuc(void) { 65536, 16 * 1024 * 1024, // min: 64 KiB, max: 16 MiB PGC_SIGHUP, GUC_UNIT_BYTE, - nullptr, nullptr, nullptr); + NULL, NULL, NULL); DefineCustomStringVariable( "pg_stat_ch.extra_attributes", @@ -383,7 +378,7 @@ void PschInitGuc(void) { "", PGC_SIGHUP, 0, - nullptr, nullptr, nullptr); + NULL, NULL, NULL); DefineCustomStringVariable( "pg_stat_ch.debug_arrow_dump_dir", @@ -394,7 +389,7 @@ void PschInitGuc(void) { "", PGC_SIGHUP, 0, - nullptr, nullptr, nullptr); + NULL, NULL, NULL); DefineCustomBoolVariable( "pg_stat_ch.debug_force_locked_overflow", @@ -405,10 +400,8 @@ void PschInitGuc(void) { false, PGC_SUSET, 0, - nullptr, nullptr, nullptr); + NULL, NULL, NULL); // clang-format on EmitWarningsOnPlaceholders("pg_stat_ch"); } - -} // extern "C" diff --git a/src/hooks/hooks.cc b/src/hooks/hooks.c similarity index 84% rename from src/hooks/hooks.cc rename to src/hooks/hooks.c index c2e771c..6d99d3a 100644 --- a/src/hooks/hooks.cc +++ b/src/hooks/hooks.c @@ -1,9 +1,7 @@ // pg_stat_ch executor hooks implementation -#include #include -extern "C" { #include "postgres.h" #include "access/parallel.h" @@ -31,11 +29,8 @@ extern "C" { #if PG_VERSION_NUM >= 150000 #include "jit/jit.h" #endif -} -extern "C" { #include "hooks/query_normalize_state.h" -} #include "config/guc.h" #include "hooks/hooks.h" @@ -45,13 +40,13 @@ extern "C" { #include "queue/shmem.h" // Previous hook values for chaining -static post_parse_analyze_hook_type prev_post_parse_analyze = nullptr; -static ExecutorStart_hook_type prev_executor_start = nullptr; -static ExecutorRun_hook_type prev_executor_run = nullptr; -static ExecutorFinish_hook_type prev_executor_finish = nullptr; -static ExecutorEnd_hook_type prev_executor_end = nullptr; -static ProcessUtility_hook_type prev_process_utility = nullptr; -static emit_log_hook_type prev_emit_log_hook = nullptr; +static post_parse_analyze_hook_type prev_post_parse_analyze = NULL; +static ExecutorStart_hook_type prev_executor_start = NULL; +static ExecutorRun_hook_type prev_executor_run = NULL; +static ExecutorFinish_hook_type prev_executor_finish = NULL; +static ExecutorEnd_hook_type prev_executor_end = NULL; +static ProcessUtility_hook_type prev_process_utility = NULL; +static emit_log_hook_type prev_emit_log_hook = NULL; // Track nesting level to identify top-level queries static int nesting_level = 0; @@ -80,7 +75,7 @@ static void ResolveNames(PschEvent* event); // re-resolved when userid changes (handles SET ROLE). This also carries the // session-local registry of parse-time query text looked up later by // ExecutorEnd or ProcessUtility. -struct PschBackendState { +typedef struct PschBackendState { bool initialized; char datname[NAMEDATALEN]; uint8 datname_len; @@ -90,8 +85,8 @@ struct PschBackendState { char client_addr[46]; // INET6_ADDRSTRLEN uint8 client_addr_len; PschNormalizedQueryCache normalize_cache; -}; -static PschBackendState backend_state = {}; +} PschBackendState; +static PschBackendState backend_state = {0}; // Resolve and cache the current username. On initial resolve, falls back to // "" if resolution fails. On SET ROLE re-resolve, keeps the existing @@ -99,7 +94,7 @@ static PschBackendState backend_state = {}; // leaves cached_userid unchanged so future calls can retry resolution. static void CacheUsername(Oid userid, bool fallback_on_null) { const char* username = GetUserNameFromId(userid, true); - if (username != nullptr) { + if (username != NULL) { backend_state.username_len = PschCopyName(backend_state.username, sizeof(backend_state.username), username); backend_state.cached_userid = userid; @@ -125,10 +120,10 @@ static void EnsureBackendCache(void) { // Database name (session-stable) const char* datname = get_database_name(MyDatabaseId); backend_state.datname_len = PschCopyName(backend_state.datname, sizeof(backend_state.datname), - datname != nullptr ? datname : ""); + datname != NULL ? datname : ""); // Client address (session-stable) - backend_state.client_addr_len = static_cast( + backend_state.client_addr_len = (uint8)( GetClientAddress(backend_state.client_addr, sizeof(backend_state.client_addr))); // Username (may change via SET ROLE) @@ -170,8 +165,8 @@ static PschCmdType ConvertCmdType(CmdType cmd) { } static int64 TimeDiffMicrosec(struct timeval end, struct timeval start) { - return (static_cast(end.tv_sec - start.tv_sec) * 1000000LL) + - static_cast(end.tv_usec - start.tv_usec); + return ((int64)(end.tv_sec - start.tv_sec) * 1000000LL) + + (int64)(end.tv_usec - start.tv_usec); } // Unpack SQLSTATE code from PostgreSQL's packed format to string @@ -193,7 +188,7 @@ static PgBackendStatus* GetBackendStatus(void) { int num_backends = pgstat_fetch_stat_numbackends(); for (int i = 1; i <= num_backends; i++) { local_beentry = pgstat_get_local_beentry_by_index(i); - if (local_beentry == nullptr) { + if (local_beentry == NULL) { continue; } PgBackendStatus* beentry = &local_beentry->backendStatus; @@ -201,19 +196,19 @@ static PgBackendStatus* GetBackendStatus(void) { return beentry; } } - return nullptr; + return NULL; #endif } static int GetApplicationName(char* buf, int buf_size) { // Try application_name GUC first (always up-to-date) - if (application_name != nullptr && application_name[0] != '\0') { - return static_cast(PschCopyTrimmed(buf, buf_size, application_name)); + if (application_name != NULL && application_name[0] != '\0') { + return (int)(PschCopyTrimmed(buf, buf_size, application_name)); } PgBackendStatus* beentry = GetBackendStatus(); - if (beentry != nullptr && beentry->st_appname != nullptr) { - return static_cast(PschCopyTrimmed(buf, buf_size, beentry->st_appname)); + if (beentry != NULL && beentry->st_appname != NULL) { + return (int)(PschCopyTrimmed(buf, buf_size, beentry->st_appname)); } buf[0] = '\0'; @@ -224,14 +219,14 @@ static int GetClientAddress(char* buf, int buf_size) { buf[0] = '\0'; PgBackendStatus* beentry = GetBackendStatus(); - if (beentry == nullptr) { + if (beentry == NULL) { return 0; } - std::array remote_host{}; + char remote_host[NI_MAXHOST] = {0}; int ret = pg_getnameinfo_all(&beentry->st_clientaddr.addr, beentry->st_clientaddr.salen, - remote_host.data(), remote_host.size(), nullptr, 0, + remote_host, sizeof(remote_host), NULL, 0, NI_NUMERICHOST | NI_NUMERICSERV); if (ret != 0 || remote_host[0] == '\0') { @@ -239,13 +234,13 @@ static int GetClientAddress(char* buf, int buf_size) { } // Handle local connections - if (strcmp(remote_host.data(), "[local]") == 0) { + if (strcmp(remote_host, "[local]") == 0) { size_t src_len = strlcpy(buf, "127.0.0.1", buf_size); - return static_cast(Min(src_len, static_cast(buf_size - 1))); + return (int)(Min(src_len, (size_t)(buf_size - 1))); } - size_t src_len = strlcpy(buf, remote_host.data(), buf_size); - return static_cast(Min(src_len, static_cast(buf_size - 1))); + size_t src_len = strlcpy(buf, remote_host, buf_size); + return (int)(Min(src_len, (size_t)(buf_size - 1))); } // Check whether an event should be captured based on duration thresholds @@ -255,7 +250,7 @@ static bool ShouldSampleEvent(uint64 duration_us) { if (psch_min_duration_us == 0 && psch_sample_rate >= 1.0) { return true; } - if (duration_us >= static_cast(psch_min_duration_us)) { + if (duration_us >= (uint64)(psch_min_duration_us)) { return true; } if (psch_sample_rate <= 0.0) { @@ -329,7 +324,7 @@ static void InitBaseEvent(PschEvent* event, TimestampTz ts_start, bool top_level } static void CopyClientContext(PschEvent* event) { - event->application_name_len = static_cast( + event->application_name_len = (uint8)( GetApplicationName(event->application_name, sizeof(event->application_name))); EnsureBackendCache(); @@ -338,7 +333,7 @@ static void CopyClientContext(PschEvent* event) { event->client_addr_len = backend_state.client_addr_len; } else { event->client_addr_len = - static_cast(GetClientAddress(event->client_addr, sizeof(event->client_addr))); + (uint8)(GetClientAddress(event->client_addr, sizeof(event->client_addr))); } } @@ -369,50 +364,50 @@ static void ResolveNames(PschEvent* event) { } // Fallback: resolve fresh (cache not yet initialized, e.g. emit_log_hook early) - const char* datname = nullptr; - const char* username = nullptr; + const char* datname = NULL; + const char* username = NULL; if (IsTransactionState()) { datname = get_database_name(event->dbid); username = GetUserNameFromId(event->userid, true); } event->datname_len = PschCopyName(event->datname, sizeof(event->datname), - datname != nullptr ? datname : ""); + datname != NULL ? datname : ""); event->username_len = PschCopyName(event->username, sizeof(event->username), - username != nullptr ? username : ""); + username != NULL ? username : ""); } // Copy JIT instrumentation to event (PG15+) -static void CopyJitInstrumentation([[maybe_unused]] PschEvent* event, - [[maybe_unused]] QueryDesc* query_desc) { +static void CopyJitInstrumentation(PschEvent* event pg_attribute_unused(), + QueryDesc* query_desc pg_attribute_unused()) { #if PG_VERSION_NUM >= 150000 - if (query_desc->estate->es_jit != nullptr) { + if (query_desc->estate->es_jit != NULL) { JitInstrumentation* jit = &query_desc->estate->es_jit->instr; - event->jit_functions = static_cast(jit->created_functions); + event->jit_functions = (int32)(jit->created_functions); event->jit_generation_time_us = - static_cast(INSTR_TIME_GET_MICROSEC(jit->generation_counter)); + (int32)(INSTR_TIME_GET_MICROSEC(jit->generation_counter)); event->jit_inlining_time_us = - static_cast(INSTR_TIME_GET_MICROSEC(jit->inlining_counter)); + (int32)(INSTR_TIME_GET_MICROSEC(jit->inlining_counter)); event->jit_optimization_time_us = - static_cast(INSTR_TIME_GET_MICROSEC(jit->optimization_counter)); + (int32)(INSTR_TIME_GET_MICROSEC(jit->optimization_counter)); event->jit_emission_time_us = - static_cast(INSTR_TIME_GET_MICROSEC(jit->emission_counter)); + (int32)(INSTR_TIME_GET_MICROSEC(jit->emission_counter)); #if PG_VERSION_NUM >= 170000 - event->jit_deform_time_us = static_cast(INSTR_TIME_GET_MICROSEC(jit->deform_counter)); + event->jit_deform_time_us = (int32)(INSTR_TIME_GET_MICROSEC(jit->deform_counter)); #endif } #endif } // Copy parallel worker info to event (PG18+) -static void CopyParallelWorkerInfo([[maybe_unused]] PschEvent* event, - [[maybe_unused]] QueryDesc* query_desc) { +static void CopyParallelWorkerInfo(PschEvent* event pg_attribute_unused(), + QueryDesc* query_desc pg_attribute_unused()) { #if PG_VERSION_NUM >= 180000 - if (query_desc->estate != nullptr) { + if (query_desc->estate != NULL) { event->parallel_workers_planned = - static_cast(query_desc->estate->es_parallel_workers_to_launch); + (int16)(query_desc->estate->es_parallel_workers_to_launch); event->parallel_workers_launched = - static_cast(query_desc->estate->es_parallel_workers_launched); + (int16)(query_desc->estate->es_parallel_workers_launched); } #endif } @@ -427,17 +422,17 @@ static void BuildEventFromQueryDesc(QueryDesc* query_desc, PschEvent* event, int event->cpu_sys_time_us = cpu_sys_us; // Instrumentation data (duration, buffer, WAL) - if (query_desc->totaltime != nullptr) { + if (query_desc->totaltime != NULL) { #if PG_VERSION_NUM >= 190000 - event->duration_us = static_cast(INSTR_TIME_GET_MICROSEC(query_desc->totaltime->total)); + event->duration_us = (uint64)(INSTR_TIME_GET_MICROSEC(query_desc->totaltime->total)); #else - event->duration_us = static_cast(query_desc->totaltime->total * 1000000.0); + event->duration_us = (uint64)(query_desc->totaltime->total * 1000000.0); #endif CopyBufferUsage(event, &query_desc->totaltime->bufusage); CopyIoTiming(event, &query_desc->totaltime->bufusage); CopyWalUsage(event, &query_desc->totaltime->walusage); } else { - event->duration_us = static_cast(GetCurrentTimestamp() - query_start_ts); + event->duration_us = (uint64)(GetCurrentTimestamp() - query_start_ts); } CopyJitInstrumentation(event, query_desc); @@ -446,14 +441,12 @@ static void BuildEventFromQueryDesc(QueryDesc* query_desc, PschEvent* event, int CopyQueryText(event, query_desc->plannedstmt->queryId); } -extern "C" { - // post_parse_analyze_hook — decide query text at parse time. // The JumbleState (with constant locations) is only available here, so we // must generate any normalized form now and stash the final exported text for // ExecutorEnd. static void PschPostParseAnalyze(ParseState* pstate, Query* query, JumbleState* jstate) { - if (prev_post_parse_analyze != nullptr) { + if (prev_post_parse_analyze != NULL) { prev_post_parse_analyze(pstate, query, jstate); } @@ -478,17 +471,17 @@ static void PschPostParseAnalyze(ParseState* pstate, Query* query, JumbleState* // Allocate the normalized/trimmed text in CurrentMemoryContext (typically the // query context). PschRememberNormalizedQuery copies it into the cache's own // long-lived context, so this allocation can be short-lived. - char* exported_query = nullptr; - if (jstate != nullptr && jstate->clocations_count > 0) { + char* exported_query = NULL; + if (jstate != NULL && jstate->clocations_count > 0) { exported_query = PschNormalizeQuery(query_text, query_loc, &query_len, jstate); } else { exported_query = PschCopyTrimmedStatement(query_text, query_len); - if (exported_query != nullptr) { - query_len = static_cast(strlen(exported_query)); + if (exported_query != NULL) { + query_len = (int)(strlen(exported_query)); } } - if (exported_query != nullptr) { + if (exported_query != NULL) { PschRememberNormalizedQuery(&backend_state.normalize_cache, query->queryId, exported_query, query_len); pfree(exported_query); @@ -497,7 +490,7 @@ static void PschPostParseAnalyze(ParseState* pstate, Query* query, JumbleState* static void PschExecutorStart(QueryDesc* query_desc, int eflags) { if (IsParallelWorker()) { - if (prev_executor_start != nullptr) { + if (prev_executor_start != NULL) { prev_executor_start(query_desc, eflags); } else { standard_ExecutorStart(query_desc, eflags); @@ -517,14 +510,14 @@ static void PschExecutorStart(QueryDesc* query_desc, int eflags) { current_query_is_top_level = false; } - if (prev_executor_start != nullptr) { + if (prev_executor_start != NULL) { prev_executor_start(query_desc, eflags); } else { standard_ExecutorStart(query_desc, eflags); } if (psch_enabled && query_desc->plannedstmt->queryId != UINT64CONST(0)) { - if (query_desc->totaltime == nullptr) { + if (query_desc->totaltime == NULL) { MemoryContext oldcxt = MemoryContextSwitchTo(query_desc->estate->es_query_cxt); #if PG_VERSION_NUM < 140000 query_desc->totaltime = InstrAlloc(1, INSTRUMENT_ALL); @@ -544,13 +537,13 @@ static void PschExecutorRun(QueryDesc* query_desc, ScanDirection direction, uint #endif if (IsParallelWorker()) { #if PG_VERSION_NUM >= 180000 - if (prev_executor_run != nullptr) { + if (prev_executor_run != NULL) { prev_executor_run(query_desc, direction, count); } else { standard_ExecutorRun(query_desc, direction, count); } #else - if (prev_executor_run != nullptr) { + if (prev_executor_run != NULL) { prev_executor_run(query_desc, direction, count, execute_once); } else { standard_ExecutorRun(query_desc, direction, count, execute_once); @@ -563,13 +556,13 @@ static void PschExecutorRun(QueryDesc* query_desc, ScanDirection direction, uint PG_TRY(); { #if PG_VERSION_NUM >= 180000 - if (prev_executor_run != nullptr) { + if (prev_executor_run != NULL) { prev_executor_run(query_desc, direction, count); } else { standard_ExecutorRun(query_desc, direction, count); } #else - if (prev_executor_run != nullptr) { + if (prev_executor_run != NULL) { prev_executor_run(query_desc, direction, count, execute_once); } else { standard_ExecutorRun(query_desc, direction, count, execute_once); @@ -583,7 +576,7 @@ static void PschExecutorRun(QueryDesc* query_desc, ScanDirection direction, uint static void PschExecutorFinish(QueryDesc* query_desc) { if (IsParallelWorker()) { - if (prev_executor_finish != nullptr) { + if (prev_executor_finish != NULL) { prev_executor_finish(query_desc); } else { standard_ExecutorFinish(query_desc); @@ -594,7 +587,7 @@ static void PschExecutorFinish(QueryDesc* query_desc) { nesting_level++; PG_TRY(); { - if (prev_executor_finish != nullptr) { + if (prev_executor_finish != NULL) { prev_executor_finish(query_desc); } else { standard_ExecutorFinish(query_desc); @@ -607,7 +600,7 @@ static void PschExecutorFinish(QueryDesc* query_desc) { static void PschExecutorEnd(QueryDesc* query_desc) { if (!psch_enabled || IsParallelWorker() || query_desc->plannedstmt->queryId == UINT64CONST(0)) { - if (prev_executor_end != nullptr) { + if (prev_executor_end != NULL) { prev_executor_end(query_desc); } else { standard_ExecutorEnd(query_desc); @@ -615,24 +608,24 @@ static void PschExecutorEnd(QueryDesc* query_desc) { return; } - if (query_desc->totaltime != nullptr) { + if (query_desc->totaltime != NULL) { InstrEndLoop(query_desc->totaltime); } // Compute duration early for sampling filter uint64 duration_us; - if (query_desc->totaltime != nullptr) { + if (query_desc->totaltime != NULL) { #if PG_VERSION_NUM >= 190000 - duration_us = static_cast(INSTR_TIME_GET_MICROSEC(query_desc->totaltime->total)); + duration_us = (uint64)(INSTR_TIME_GET_MICROSEC(query_desc->totaltime->total)); #else - duration_us = static_cast(query_desc->totaltime->total * 1000000.0); + duration_us = (uint64)(query_desc->totaltime->total * 1000000.0); #endif } else { - duration_us = static_cast(GetCurrentTimestamp() - query_start_ts); + duration_us = (uint64)(GetCurrentTimestamp() - query_start_ts); } if (!ShouldSampleEvent(duration_us)) { - if (prev_executor_end != nullptr) { + if (prev_executor_end != NULL) { prev_executor_end(query_desc); } else { standard_ExecutorEnd(query_desc); @@ -653,7 +646,7 @@ static void PschExecutorEnd(QueryDesc* query_desc) { BuildEventFromQueryDesc(query_desc, &event, cpu_user_us, cpu_sys_us); PschEnqueueEvent(&event); - if (prev_executor_end != nullptr) { + if (prev_executor_end != NULL) { prev_executor_end(query_desc); } else { standard_ExecutorEnd(query_desc); @@ -721,7 +714,7 @@ static bool ShouldTrackUtility(Node* parsetree) { } static uint64 GetUtilityRowCount(QueryCompletion* qc) { - if (qc == nullptr) { + if (qc == NULL) { return 0; } switch (qc->commandTag) { @@ -821,7 +814,7 @@ static void PschProcessUtility(PlannedStmt* pstmt, const char* queryString, // Returns false during early initialization, in background workers, or when disabled. static bool ShouldCaptureLog(ErrorData* edata) { // Basic preconditions - if (edata == nullptr || !system_init || !psch_enabled || disable_error_capture) { + if (edata == NULL || !system_init || !psch_enabled || disable_error_capture) { return false; } @@ -831,7 +824,7 @@ static bool ShouldCaptureLog(ErrorData* edata) { } // PostgreSQL bootstrapping checks - MyProc indicates PGPROC allocation complete - if (MyProc == nullptr || IsParallelWorker()) { + if (MyProc == NULL || IsParallelWorker()) { return false; } @@ -840,8 +833,8 @@ static bool ShouldCaptureLog(ErrorData* edata) { // - IsUnderPostmaster: not single-user mode or bootstrap // - psch_shared_state: shared memory must be ready // - MyBgworkerEntry: skip background workers (not user queries) - if (MyDatabaseId == InvalidOid || !IsUnderPostmaster || psch_shared_state == nullptr || - MyBgworkerEntry != nullptr) { + if (MyDatabaseId == InvalidOid || !IsUnderPostmaster || psch_shared_state == NULL || + MyBgworkerEntry != NULL) { return false; } @@ -860,10 +853,10 @@ static void CaptureLogEvent(ErrorData* edata) { InitBaseEvent(&event, GetCurrentTimestamp(), (nesting_level == 0), PSCH_CMD_UNKNOWN); UnpackSqlState(edata->sqlerrcode, event.err_sqlstate); - event.err_elevel = static_cast(edata->elevel); + event.err_elevel = (uint8)(edata->elevel); - if (edata->message != nullptr) { - event.err_message_len = static_cast( + if (edata->message != NULL) { + event.err_message_len = (uint16)( PschCopyTrimmed(event.err_message, PSCH_MAX_ERR_MSG_LEN, edata->message)); } @@ -882,7 +875,7 @@ static void CaptureLogEvent(ErrorData* edata) { // This ensures we capture the final, potentially modified log message. static void PschEmitLogHook(ErrorData* edata) { // Chain to previous hook first (allows log transformation by other extensions) - if (prev_emit_log_hook != nullptr) { + if (prev_emit_log_hook != NULL) { prev_emit_log_hook(edata); } @@ -892,7 +885,7 @@ static void PschEmitLogHook(ErrorData* edata) { } // Reset recursion guard on ERROR+ (transaction will abort) - if (edata != nullptr && edata->elevel >= ERROR) { + if (edata != NULL && edata->elevel >= ERROR) { disable_error_capture = false; } } @@ -930,5 +923,3 @@ void PschInstallHooks(void) { // Mark system as initialized - emit_log_hook will now capture messages system_init = true; } - -} // extern "C" diff --git a/src/hooks/query_normalize.cc b/src/hooks/query_normalize.c similarity index 91% rename from src/hooks/query_normalize.cc rename to src/hooks/query_normalize.c index 1c28e31..577cb24 100644 --- a/src/hooks/query_normalize.cc +++ b/src/hooks/query_normalize.c @@ -4,21 +4,19 @@ // are static in contrib/pg_stat_statements/pg_stat_statements.c. We reproduce // them here so pg_stat_ch can normalize independently of pg_stat_statements. -extern "C" { #include "postgres.h" #include "lib/stringinfo.h" #include "nodes/queryjumble.h" #include "parser/scanner.h" #include "utils/memutils.h" -} #include "hooks/query_normalize.h" // Comparator for qsorting LocationLen structs by location. static int CompLocation(const void* a, const void* b) { - int l = (static_cast(a))->location; - int r = (static_cast(b))->location; + int l = ((const LocationLen*)(a))->location; + int r = ((const LocationLen*)(b))->location; if (l < r) { return -1; } @@ -34,11 +32,12 @@ static bool ShouldPreserveExternalParam(const JumbleState* jstate, int index) { return jstate->clocations[index].extern_param && !jstate->has_squashed_lists; } #else -static bool IsSquashedConstant(const LocationLen* /* loc */) { +static bool IsSquashedConstant(const LocationLen* loc pg_attribute_unused()) { return false; } -static bool ShouldPreserveExternalParam(const JumbleState* /* jstate */, int /* index */) { +static bool ShouldPreserveExternalParam(const JumbleState* jstate pg_attribute_unused(), + int index pg_attribute_unused()) { return false; } #endif @@ -96,7 +95,7 @@ static void FillInConstantLengths(JumbleState* jstate, const char* query, int qu break; } } - locs[i].length = static_cast(strlen(yyextra.scanbuf + loc)); + locs[i].length = (int)(strlen(yyextra.scanbuf + loc)); break; } } @@ -110,8 +109,8 @@ static void FillInConstantLengths(JumbleState* jstate, const char* query, int qu } char* PschNormalizeQuery(const char* query, int query_loc, int* query_len_p, JumbleState* jstate) { - if (jstate == nullptr || jstate->clocations_count <= 0) { - return nullptr; + if (jstate == NULL || jstate->clocations_count <= 0) { + return NULL; } int query_len = *query_len_p; diff --git a/src/hooks/string_utils.h b/src/hooks/string_utils.h index b9061a7..05f08bf 100644 --- a/src/hooks/string_utils.h +++ b/src/hooks/string_utils.h @@ -6,23 +6,21 @@ #ifndef PG_STAT_CH_SRC_HOOKS_STRING_UTILS_H_ #define PG_STAT_CH_SRC_HOOKS_STRING_UTILS_H_ -#include -#include +#include +#include -extern "C" { #include "postgres.h" -} // Copy src into a fixed-size name buffer. Returns the number of bytes // written (excluding the NUL), clamped to dst_size - 1. -inline uint8 PschCopyName(char* dst, size_t dst_size, const char* src) { +static inline uint8 PschCopyName(char* dst, size_t dst_size, const char* src) { size_t len = strlcpy(dst, src, dst_size); - return static_cast(Min(len, dst_size - 1)); + return (uint8)(Min(len, dst_size - 1)); } // Trim trailing whitespace in-place. Returns the new length. -inline size_t PschTrimTrailing(char* str, size_t len) { - while (len > 0 && isspace(static_cast(str[len - 1]))) { +static inline size_t PschTrimTrailing(char* str, size_t len) { + while (len > 0 && isspace((unsigned char)(str[len - 1]))) { len--; } str[len] = '\0'; @@ -31,14 +29,14 @@ inline size_t PschTrimTrailing(char* str, size_t len) { // Copy src to dst, skipping leading whitespace and trimming trailing. // Returns the final length of the trimmed string in dst. -inline size_t PschCopyTrimmed(char* dst, size_t dst_size, const char* src) { - if (src == nullptr || dst_size == 0) { +static inline size_t PschCopyTrimmed(char* dst, size_t dst_size, const char* src) { + if (src == NULL || dst_size == 0) { if (dst_size > 0) { dst[0] = '\0'; } return 0; } - while (*src != '\0' && isspace(static_cast(*src))) { + while (*src != '\0' && isspace((unsigned char)(*src))) { src++; } size_t src_len = strlcpy(dst, src, dst_size); @@ -47,22 +45,22 @@ inline size_t PschCopyTrimmed(char* dst, size_t dst_size, const char* src) { } // Allocate a trimmed copy of a statement slice (leading + trailing whitespace -// stripped). Returns a palloc'd string, or nullptr if the input is empty. -inline char* PschCopyTrimmedStatement(const char* query_text, int query_len) { - if (query_text == nullptr || query_len <= 0) { - return nullptr; +// stripped). Returns a palloc'd string, or NULL if the input is empty. +static inline char* PschCopyTrimmedStatement(const char* query_text, int query_len) { + if (query_text == NULL || query_len <= 0) { + return NULL; } - while (query_len > 0 && isspace(static_cast(*query_text))) { + while (query_len > 0 && isspace((unsigned char)(*query_text))) { query_text++; query_len--; } - while (query_len > 0 && isspace(static_cast(query_text[query_len - 1]))) { + while (query_len > 0 && isspace((unsigned char)(query_text[query_len - 1]))) { query_len--; } - char* dst = static_cast(palloc(query_len + 1)); + char* dst = (char*)(palloc(query_len + 1)); if (query_len > 0) { memcpy(dst, query_text, query_len); } diff --git a/src/pg_stat_ch.cc b/src/pg_stat_ch.c similarity index 97% rename from src/pg_stat_ch.cc rename to src/pg_stat_ch.c index 81c04ca..196b9ec 100644 --- a/src/pg_stat_ch.cc +++ b/src/pg_stat_ch.c @@ -2,7 +2,6 @@ // // This is the main entry point for the pg_stat_ch extension. -extern "C" { #include "postgres.h" #include "access/htup_details.h" @@ -11,7 +10,6 @@ extern "C" { #include "miscadmin.h" #include "utils/builtins.h" #include "utils/timestamp.h" -} #include "pg_stat_ch/pg_stat_ch.h" @@ -20,8 +18,6 @@ extern "C" { #include "queue/shmem.h" #include "worker/bgworker.h" -extern "C" { - PG_MODULE_MAGIC; // Extension initialization - called when shared library is loaded @@ -75,7 +71,7 @@ Datum pg_stat_ch_stats(PG_FUNCTION_ARGS) { HeapTuple tuple; // Build tuple descriptor - if (get_call_result_type(fcinfo, nullptr, &tupdesc) != TYPEFUNC_COMPOSITE) { + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("function returning record called in context that cannot " "accept type record"))); @@ -162,5 +158,3 @@ Datum pg_stat_ch_flush(PG_FUNCTION_ARGS) { PschSignalFlush(); PG_RETURN_VOID(); } - -} // extern "C" diff --git a/src/queue/event.h b/src/queue/event.h index 0418c8d..a617915 100644 --- a/src/queue/event.h +++ b/src/queue/event.h @@ -52,7 +52,7 @@ extern "C" { #define PSCH_MAX_CLIENT_ADDR_LEN (INET6_ADDRSTRLEN - 1) // Command type values (matching PostgreSQL's CmdType enum) -enum PschCmdType { +typedef enum PschCmdType { PSCH_CMD_UNKNOWN = 0, PSCH_CMD_SELECT = 1, PSCH_CMD_UPDATE = 2, @@ -61,9 +61,9 @@ enum PschCmdType { PSCH_CMD_MERGE = 5, PSCH_CMD_UTILITY = 6, PSCH_CMD_NOTHING = 7 -}; +} PschCmdType; -static inline const char* PschCmdTypeToString(enum PschCmdType cmd) { +static inline const char* PschCmdTypeToString(PschCmdType cmd) { switch (cmd) { case PSCH_CMD_SELECT: return "SELECT"; @@ -90,7 +90,7 @@ static inline const char* PschCmdTypeToString(enum PschCmdType cmd) { // PostgreSQL version. This keeps the struct size fixed for ring buffer simplicity. // Fields marked with "PGxx+" are zero when running on older versions. The exporter // sends all fields; ClickHouse handles NULL/zero appropriately in aggregations. -struct PschEvent { +typedef struct PschEvent { // Timing information TimestampTz ts_start; // Query start timestamp (microseconds since epoch) uint64 duration_us; // Execution duration in microseconds @@ -168,7 +168,7 @@ struct PschEvent { // block-copied between PschEvent and PschRingEntry with a single memcpy. char err_message[PSCH_MAX_ERR_MSG_LEN]; // Error message text (truncated if necessary) char query[PSCH_MAX_QUERY_LEN]; // Query text (null-terminated) -}; +} PschEvent; // Ensure the struct has expected size characteristics #define PSCH_EVENT_SIZE sizeof(PschEvent) diff --git a/src/queue/local_batch.cc b/src/queue/local_batch.c similarity index 83% rename from src/queue/local_batch.cc rename to src/queue/local_batch.c index b65b4fa..87a549c 100644 --- a/src/queue/local_batch.cc +++ b/src/queue/local_batch.c @@ -9,21 +9,17 @@ // PschLocalBatchShutdown is registered later (on first Add), so it runs FIRST — // flushing buffered events before stats are logged. -extern "C" { #include "postgres.h" #include "access/xact.h" #include "storage/ipc.h" -} - -#include #include "queue/local_batch.h" #include "queue/shmem.h" #define PSCH_LOCAL_BATCH_CAPACITY 8 -static std::array local_batch; +static PschEvent local_batch[PSCH_LOCAL_BATCH_CAPACITY]; static int local_batch_count = 0; static bool local_batch_initialized = false; @@ -33,13 +29,11 @@ static void PschLocalBatchShutdown(int code, Datum arg); // Register callbacks lazily on first use. static void PschLocalBatchInit(void) { - RegisterXactCallback(PschXactCallback, nullptr); + RegisterXactCallback(PschXactCallback, NULL); on_shmem_exit(PschLocalBatchShutdown, 0); local_batch_initialized = true; } -extern "C" { - int PschLocalBatchFlush(void) { if (local_batch_count == 0) { return 0; @@ -47,7 +41,7 @@ int PschLocalBatchFlush(void) { int count = local_batch_count; local_batch_count = 0; - return PschEnqueueBatch(local_batch.data(), count); + return PschEnqueueBatch(local_batch, count); } void PschLocalBatchAdd(const PschEvent* event) { @@ -64,10 +58,8 @@ void PschLocalBatchAdd(const PschEvent* event) { local_batch_count++; } -} // extern "C" - // Flush on transaction end (COMMIT, ABORT, PREPARE) -static void PschXactCallback(XactEvent event, [[maybe_unused]] void* arg) { +static void PschXactCallback(XactEvent event, void* arg pg_attribute_unused()) { if (local_batch_count == 0) { return; } @@ -86,6 +78,7 @@ static void PschXactCallback(XactEvent event, [[maybe_unused]] void* arg) { } // Flush on backend shutdown to avoid losing buffered events -static void PschLocalBatchShutdown([[maybe_unused]] int code, [[maybe_unused]] Datum arg) { +static void PschLocalBatchShutdown(int code pg_attribute_unused(), + Datum arg pg_attribute_unused()) { PschLocalBatchFlush(); } diff --git a/src/queue/psch_dsa.cc b/src/queue/psch_dsa.c similarity index 80% rename from src/queue/psch_dsa.cc rename to src/queue/psch_dsa.c index e7b4f35..dc45031 100644 --- a/src/queue/psch_dsa.cc +++ b/src/queue/psch_dsa.c @@ -2,14 +2,12 @@ // // See psch_dsa.h for the shared memory layout diagram and lifecycle overview. -extern "C" { #include "postgres.h" #include "storage/ipc.h" #include "storage/lwlock.h" #include "utils/dsa.h" #include "utils/memutils.h" -} #include "config/guc.h" #include "queue/psch_dsa.h" @@ -18,28 +16,27 @@ extern "C" { // Process-local DSA handle. Each backend/bgworker gets its own via // dsa_attach_in_place(). This is NOT shared state — it contains // process-local page tables, free-list caches, etc. -static dsa_area* psch_dsa = nullptr; +static dsa_area* psch_dsa = NULL; static bool psch_dsa_exit_hook_registered = false; // dsa_attach_in_place() increments the shared refcount for this in-place area, // but because there is no containing DSM segment, PostgreSQL won't release it // automatically. We must detach the backend-local handle and drop the shared // reference explicitly on backend/bgworker exit (see pgstat_detach_shmem()). -static void PschDsaDetachOnExit([[maybe_unused]] int code, [[maybe_unused]] Datum arg) { - if (psch_dsa != nullptr) { +static void PschDsaDetachOnExit(int code pg_attribute_unused(), + Datum arg pg_attribute_unused()) { + if (psch_dsa != NULL) { dsa_detach(psch_dsa); - psch_dsa = nullptr; + psch_dsa = NULL; } - if (psch_shared_state != nullptr && psch_shared_state->raw_dsa_area != nullptr) { + if (psch_shared_state != NULL && psch_shared_state->raw_dsa_area != NULL) { dsa_release_in_place(psch_shared_state->raw_dsa_area); } psch_dsa_exit_hook_registered = false; } -extern "C" { - Size PschDsaShmemSize(void) { return (Size)psch_string_area_size * 1024 * 1024; } @@ -58,10 +55,10 @@ void PschDsaInit(PschSharedState* state, void* dsa_place) { } void PschDsaAttach(void) { - if (psch_dsa != nullptr) { + if (psch_dsa != NULL) { return; } - if (psch_shared_state == nullptr || psch_shared_state->raw_dsa_area == nullptr) { + if (psch_shared_state == NULL || psch_shared_state->raw_dsa_area == NULL) { return; } // Attach in TopMemoryContext so the dsa_area handle survives transaction @@ -80,7 +77,7 @@ void PschDsaAttach(void) { } dsa_area* PschDsaGetArea(void) { - if (psch_dsa == nullptr) { + if (psch_dsa == NULL) { PschDsaAttach(); } return psch_dsa; @@ -91,13 +88,13 @@ dsa_pointer PschDsaAllocString(const char* src, uint16 len, uint16 max_len) { return InvalidDsaPointer; } dsa_area* dsa = PschDsaGetArea(); - if (dsa == nullptr) { + if (dsa == NULL) { return InvalidDsaPointer; } - uint16 clamped = Min(len, static_cast(max_len - 1)); + uint16 clamped = Min(len, (uint16)(max_len - 1)); dsa_pointer dp = dsa_allocate_extended(dsa, clamped + 1, DSA_ALLOC_NO_OOM); if (DsaPointerIsValid(dp)) { - char* dst = static_cast(dsa_get_address(dsa, dp)); + char* dst = (char*)dsa_get_address(dsa, dp); memcpy(dst, src, clamped); dst[clamped] = '\0'; } else { @@ -110,13 +107,13 @@ void PschDsaResolveString(dsa_pointer dp, uint16 src_len, char* dst_buf, uint16 uint16* out_len) { if (DsaPointerIsValid(dp)) { dsa_area* dsa = PschDsaGetArea(); - if (dsa == nullptr) { + if (dsa == NULL) { dst_buf[0] = '\0'; *out_len = 0; return; } - char* src = static_cast(dsa_get_address(dsa, dp)); - uint16 len = Min(src_len, static_cast(max_len - 1)); + char* src = (char*)dsa_get_address(dsa, dp); + uint16 len = Min(src_len, (uint16)(max_len - 1)); memcpy(dst_buf, src, len); dst_buf[len] = '\0'; *out_len = len; @@ -126,5 +123,3 @@ void PschDsaResolveString(dsa_pointer dp, uint16 src_len, char* dst_buf, uint16 *out_len = 0; } } - -} // extern "C" diff --git a/src/queue/psch_dsa.h b/src/queue/psch_dsa.h index 59f89c4..77db84f 100644 --- a/src/queue/psch_dsa.h +++ b/src/queue/psch_dsa.h @@ -59,7 +59,7 @@ extern "C" { // // Ring buffer slots (PschRingEntry[capacity]) follow immediately after this // struct in the shared memory segment. -struct PschSharedState { +typedef struct PschSharedState { // --- Init-time fields (written once by postmaster) ----------------------- LWLock* lock; void* raw_dsa_area; @@ -89,7 +89,7 @@ struct PschSharedState { TimestampTz last_error_ts; char last_error_text[256]; pg_atomic_uint32 bgworker_pid; -}; +} PschSharedState; // Global pointer to shared state (set in shmem startup) extern PschSharedState* psch_shared_state; @@ -101,7 +101,7 @@ Size PschDsaShmemSize(void); // Called once by the postmaster during InitializeSharedState(). // `dsa_place` must point to a MAXALIGN'd address with at least // PschDsaShmemSize() bytes available. -void PschDsaInit(struct PschSharedState* state, void* dsa_place); +void PschDsaInit(PschSharedState* state, void* dsa_place); // Attach to the DSA area (lazy, idempotent). // Must be called before PschDsaAllocString / PschDsaResolveString. diff --git a/src/queue/ring_entry.h b/src/queue/ring_entry.h index 0dd6656..faf1c4c 100644 --- a/src/queue/ring_entry.h +++ b/src/queue/ring_entry.h @@ -25,7 +25,7 @@ extern "C" { // Compact ring buffer slot. All numeric fields through _padding3 have the // same types and order as PschEvent so the prefix can be block-copied. The // two 2KB char arrays are replaced by 8-byte dsa_pointer values. -struct PschRingEntry { +typedef struct PschRingEntry { // === Timing === TimestampTz ts_start; uint64 duration_us; @@ -102,38 +102,39 @@ struct PschRingEntry { // --- DSA pointers replace the two trailing char arrays in PschEvent --- dsa_pointer err_message_dsa; // was: char err_message[PSCH_MAX_ERR_MSG_LEN] dsa_pointer query_dsa; // was: char query[PSCH_MAX_QUERY_LEN] -}; +} PschRingEntry; -#ifdef __cplusplus -} - -// Verify that the fixed prefix of PschRingEntry has the same binary layout as -// PschEvent up to (and including) query_len — the last field before the two -// variable-length char arrays / dsa_pointers diverge. If a field is added to +// Verify that fixed prefix of PschRingEntry has same binary layout as +// PschEvent up to (and including) query_len, last field before two +// variable-length char arrays / dsa_pointers diverge. If field is added to // PschEvent without updating PschRingEntry, these will fire at compile time. -static_assert(offsetof(PschEvent, ts_start) == offsetof(PschRingEntry, ts_start), - "ts_start offset mismatch"); -static_assert(offsetof(PschEvent, queryid) == offsetof(PschRingEntry, queryid), - "queryid offset mismatch"); -static_assert(offsetof(PschEvent, rows) == offsetof(PschRingEntry, rows), "rows offset mismatch"); -static_assert(offsetof(PschEvent, shared_blks_hit) == offsetof(PschRingEntry, shared_blks_hit), - "shared_blks_hit offset mismatch"); -static_assert(offsetof(PschEvent, wal_records) == offsetof(PschRingEntry, wal_records), - "wal_records offset mismatch"); -static_assert(offsetof(PschEvent, err_sqlstate) == offsetof(PschRingEntry, err_sqlstate), - "err_sqlstate offset mismatch"); -static_assert(offsetof(PschEvent, err_message_len) == offsetof(PschRingEntry, err_message_len), - "err_message_len offset mismatch"); -static_assert(offsetof(PschEvent, application_name) == offsetof(PschRingEntry, application_name), - "application_name offset mismatch"); -static_assert(offsetof(PschEvent, query_len) == offsetof(PschRingEntry, query_len), - "query_len offset mismatch"); -// The fields after query_len diverge: PschEvent has char arrays, PschRingEntry -// has dsa_pointers. This is intentional — the fixed prefix up to query_len is -// block-copied with a single memcpy. -static_assert(offsetof(PschEvent, err_message) == offsetof(PschRingEntry, err_message_dsa), - "err_message/err_message_dsa start offset mismatch"); +// Use StaticAssertDecl so checks fire in both C and C++ translation units. +StaticAssertDecl(offsetof(PschEvent, ts_start) == offsetof(PschRingEntry, ts_start), + "ts_start offset mismatch"); +StaticAssertDecl(offsetof(PschEvent, queryid) == offsetof(PschRingEntry, queryid), + "queryid offset mismatch"); +StaticAssertDecl(offsetof(PschEvent, rows) == offsetof(PschRingEntry, rows), + "rows offset mismatch"); +StaticAssertDecl(offsetof(PschEvent, shared_blks_hit) == offsetof(PschRingEntry, shared_blks_hit), + "shared_blks_hit offset mismatch"); +StaticAssertDecl(offsetof(PschEvent, wal_records) == offsetof(PschRingEntry, wal_records), + "wal_records offset mismatch"); +StaticAssertDecl(offsetof(PschEvent, err_sqlstate) == offsetof(PschRingEntry, err_sqlstate), + "err_sqlstate offset mismatch"); +StaticAssertDecl(offsetof(PschEvent, err_message_len) == offsetof(PschRingEntry, err_message_len), + "err_message_len offset mismatch"); +StaticAssertDecl(offsetof(PschEvent, application_name) == offsetof(PschRingEntry, application_name), + "application_name offset mismatch"); +StaticAssertDecl(offsetof(PschEvent, query_len) == offsetof(PschRingEntry, query_len), + "query_len offset mismatch"); +// Fields after query_len diverge: PschEvent has char arrays, PschRingEntry +// has dsa_pointers. Intentional, fixed prefix up to query_len is +// block-copied with single memcpy. +StaticAssertDecl(offsetof(PschEvent, err_message) == offsetof(PschRingEntry, err_message_dsa), + "err_message/err_message_dsa start offset mismatch"); +#ifdef __cplusplus +} #endif #endif // PG_STAT_CH_SRC_QUEUE_RING_ENTRY_H_ diff --git a/src/queue/shmem.cc b/src/queue/shmem.c similarity index 92% rename from src/queue/shmem.cc rename to src/queue/shmem.c index 2b08c74..0c650a8 100644 --- a/src/queue/shmem.cc +++ b/src/queue/shmem.c @@ -29,7 +29,6 @@ // When queue is full, we log a warning once (overflow_logged flag) and drop // events. This prevents log spam while alerting operators to capacity issues. -extern "C" { #include "postgres.h" #include "miscadmin.h" @@ -38,7 +37,6 @@ extern "C" { #include "storage/shmem.h" #include "utils/memutils.h" #include "utils/timestamp.h" -} #include "config/guc.h" #include "hooks/hooks.h" @@ -47,17 +45,16 @@ extern "C" { #include "queue/ring_entry.h" #include "queue/shmem.h" -PschSharedState* psch_shared_state = nullptr; +PschSharedState* psch_shared_state = NULL; // Previous hook values for chaining -static shmem_startup_hook_type prev_shmem_startup_hook = nullptr; +static shmem_startup_hook_type prev_shmem_startup_hook = NULL; #if PG_VERSION_NUM >= 150000 -static shmem_request_hook_type prev_shmem_request_hook = nullptr; +static shmem_request_hook_type prev_shmem_request_hook = NULL; #endif static inline PschRingEntry* GetRingBuffer(void) { - return reinterpret_cast(reinterpret_cast(psch_shared_state) + - sizeof(PschSharedState)); + return (PschRingEntry*)((char*)psch_shared_state + sizeof(PschSharedState)); } // Handle queue overflow: increment dropped counter and log warning once @@ -75,7 +72,7 @@ static inline PschRingEntry* GetRingBuffer(void) { // DETECTING OVERFLOW: Check pg_stat_ch_stats().dropped > 0 or look for the WARNING // in PostgreSQL logs. The overflow_logged flag prevents log spam by warning only once // until stats are reset. -static void HandleOverflow() { +static void HandleOverflow(void) { pg_atomic_fetch_add_u64(&psch_shared_state->dropped, 1); // Log overflow warning once to avoid log spam (pg_stat_monitor pattern) @@ -89,7 +86,7 @@ static void HandleOverflow() { // Size of the fixed-field prefix shared between PschEvent and PschRingEntry. // Both structs have identical layout from offset 0 through query_len — the // last field before the variable-length data diverges (char arrays vs -// dsa_pointers). Verified by static_assert in ring_entry.h. +// dsa_pointers). Verified by static assert in ring_entry.h. static const size_t kFixedPrefixSize = offsetof(PschRingEntry, err_message_dsa); // Check queue fullness and enqueue event if space available. @@ -138,17 +135,18 @@ static bool TryEnqueueLocked(const PschEvent* event, uint32 capacity) { return true; } -static void PschShmemShutdown([[maybe_unused]] int code, [[maybe_unused]] Datum arg) { - if (psch_shared_state != nullptr) { - elog(LOG, "pg_stat_ch: shutdown (enqueued=%lu, dropped=%lu, exported=%lu)", +static void PschShmemShutdown(int code pg_attribute_unused(), + Datum arg pg_attribute_unused()) { + if (psch_shared_state != NULL) { + elog(LOG, + "pg_stat_ch: shutdown (enqueued=" UINT64_FORMAT ", dropped=" UINT64_FORMAT + ", exported=" UINT64_FORMAT ")", pg_atomic_read_u64(&psch_shared_state->enqueued), pg_atomic_read_u64(&psch_shared_state->dropped), pg_atomic_read_u64(&psch_shared_state->exported)); } } -extern "C" { - Size PschShmemSize(void) { // Layout: [PschSharedState] [PschRingEntry × capacity] [DSA area] // See psch_dsa.h for diagram. DSA start is MAXALIGN'd for internal alignment. @@ -166,7 +164,7 @@ static void RequestSharedResources(void) { #if PG_VERSION_NUM >= 150000 static void PschShmemRequestHook(void) { - if (prev_shmem_request_hook != nullptr) { + if (prev_shmem_request_hook != NULL) { prev_shmem_request_hook(); } RequestSharedResources(); @@ -197,7 +195,7 @@ static void InitializeSharedState(void) { // Create DSA area for variable-length string storage. // See psch_dsa.h for the shared memory layout diagram. - char* dsa_place = reinterpret_cast(psch_shared_state) + + char* dsa_place = (char*)psch_shared_state + MAXALIGN(sizeof(PschSharedState) + psch_queue_capacity * sizeof(PschRingEntry)); PschDsaInit(psch_shared_state, dsa_place); @@ -209,16 +207,15 @@ static void InitializeSharedState(void) { static void PschShmemStartupHook(void) { bool found; - if (prev_shmem_startup_hook != nullptr) { + if (prev_shmem_startup_hook != NULL) { prev_shmem_startup_hook(); } LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); - psch_shared_state = - static_cast(ShmemInitStruct("pg_stat_ch", PschShmemSize(), &found)); + psch_shared_state = (PschSharedState*)ShmemInitStruct("pg_stat_ch", PschShmemSize(), &found); - if (psch_shared_state == nullptr) { + if (psch_shared_state == NULL) { LWLockRelease(AddinShmemInitLock); ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("pg_stat_ch: could not create shared memory segment"))); @@ -276,7 +273,7 @@ void PschInstallShmemHooks(void) { // drop until stats are reset. This is better than blocking producers or growing // the queue unboundedly. bool PschEnqueueEvent(const PschEvent* event) { - if (psch_shared_state == nullptr || !psch_enabled) { + if (psch_shared_state == NULL || !psch_enabled) { return false; } @@ -320,7 +317,7 @@ bool PschEnqueueEvent(const PschEvent* event) { // Enqueue a batch of events under a single lock acquisition. // Called by PschLocalBatchFlush to amortize lock overhead across multiple events. int PschEnqueueBatch(const PschEvent* events, int count) { - if (psch_shared_state == nullptr || !psch_enabled || count == 0) + if (psch_shared_state == NULL || !psch_enabled || count == 0) return 0; PschSuppressErrorCapture(true); @@ -374,7 +371,7 @@ int PschEnqueueBatch(const PschEvent* events, int count) { // at 1M events/sec. Even after wraparound, (head - tail) arithmetic works correctly // because both wrap at the same point (2^64). bool PschDequeueEvent(PschEvent* event) { - if (psch_shared_state == nullptr) { + if (psch_shared_state == NULL) { return false; } @@ -425,7 +422,7 @@ void PschGetStats(uint64* enqueued, uint64* dropped, uint64* exported, uint32* q uint32* queue_capacity, uint64* send_failures, TimestampTz* last_success_ts, char* last_error_buf, size_t last_error_buf_size, TimestampTz* last_error_ts, uint64* dsa_oom_count) { - if (psch_shared_state == nullptr) { + if (psch_shared_state == NULL) { *enqueued = 0; *dropped = 0; *exported = 0; @@ -451,7 +448,7 @@ void PschGetStats(uint64* enqueued, uint64* dropped, uint64* exported, uint32* q uint64 head = pg_atomic_read_u64(&psch_shared_state->head); uint64 tail = pg_atomic_read_u64(&psch_shared_state->tail); - *queue_size = static_cast(head - tail); + *queue_size = (uint32)(head - tail); *queue_capacity = psch_shared_state->capacity; // Copy exporter timestamps and error text under lock to prevent torn reads. @@ -473,7 +470,7 @@ void PschGetStats(uint64* enqueued, uint64* dropped, uint64* exported, uint32* q // OVERFLOW FLAG RESET: Clearing overflow_logged allows the warning to be logged // again on the next overflow, which is useful after capacity has been increased. void PschResetStats(void) { - if (psch_shared_state == nullptr) { + if (psch_shared_state == NULL) { return; } @@ -493,7 +490,7 @@ void PschResetStats(void) { // Record a successful export (updates timestamp) // Lock protects last_success_ts from torn reads by PschGetStats/PschResetStats. void PschRecordExportSuccess(void) { - if (psch_shared_state == nullptr) { + if (psch_shared_state == NULL) { return; } LWLockAcquire(psch_shared_state->lock, LW_EXCLUSIVE); @@ -505,14 +502,14 @@ void PschRecordExportSuccess(void) { // Lock protects last_error_ts and last_error_text from torn reads by // PschGetStats/PschResetStats. send_failures is atomic so it's safe outside the lock. void PschRecordExportFailure(const char* error_msg) { - if (psch_shared_state == nullptr) { + if (psch_shared_state == NULL) { return; } pg_atomic_fetch_add_u64(&psch_shared_state->send_failures, 1); LWLockAcquire(psch_shared_state->lock, LW_EXCLUSIVE); psch_shared_state->last_error_ts = GetCurrentTimestamp(); - if (error_msg != nullptr) { + if (error_msg != NULL) { strlcpy(psch_shared_state->last_error_text, error_msg, sizeof(psch_shared_state->last_error_text)); } @@ -520,17 +517,15 @@ void PschRecordExportFailure(const char* error_msg) { } int PschGetBgworkerPid(void) { - if (psch_shared_state == nullptr) { + if (psch_shared_state == NULL) { return 0; } - return static_cast(pg_atomic_read_u32(&psch_shared_state->bgworker_pid)); + return (int)pg_atomic_read_u32(&psch_shared_state->bgworker_pid); } void PschSetBgworkerPid(int pid) { - if (psch_shared_state == nullptr) { + if (psch_shared_state == NULL) { return; } - pg_atomic_write_u32(&psch_shared_state->bgworker_pid, static_cast(pid)); + pg_atomic_write_u32(&psch_shared_state->bgworker_pid, (uint32)pid); } - -} // extern "C" diff --git a/src/worker/bgworker.cc b/src/worker/bgworker.c similarity index 92% rename from src/worker/bgworker.cc rename to src/worker/bgworker.c index 85a1358..68bfa73 100644 --- a/src/worker/bgworker.cc +++ b/src/worker/bgworker.c @@ -21,7 +21,6 @@ // SIGUSR2 -> HandleFlushSignal (extension-specific - immediate flush) // SIGPIPE -> SIG_IGN (ignore broken pipe from network) -extern "C" { #include "postgres.h" #include "miscadmin.h" @@ -34,12 +33,11 @@ extern "C" { #include "tcop/tcopprot.h" #include "utils/guc.h" #include "utils/wait_event.h" -} #include "queue/psch_dsa.h" #include "queue/shmem.h" -#include +#include #include "config/guc.h" #include "export/stats_exporter.h" @@ -69,7 +67,7 @@ static void HandleFlushSignal(SIGNAL_ARGS) { // // If we don't use this handler, operations that require barrier acknowledgment // (like DROP DATABASE) will hang indefinitely waiting for this worker. -static void SetupSignalHandlers() { +static void SetupSignalHandlers(void) { pqsignal(SIGHUP, SignalHandlerForConfigReload); pqsignal(SIGTERM, die); pqsignal(SIGUSR1, procsignal_sigusr1_handler); // REQUIRED for barriers @@ -78,7 +76,7 @@ static void SetupSignalHandlers() { } // Handle SIGHUP config reload -static void HandleConfigReload() { +static void HandleConfigReload(void) { if (ConfigReloadPending != 0) { ConfigReloadPending = 0; ProcessConfigFile(PGC_SIGHUP); @@ -87,13 +85,14 @@ static void HandleConfigReload() { } // Callback for bgworker process exit (registered via on_proc_exit) -static void PschBgworkerShutdown([[maybe_unused]] int code, [[maybe_unused]] Datum arg) { +static void PschBgworkerShutdown(int code pg_attribute_unused(), + Datum arg pg_attribute_unused()) { PschExporterShutdown(); } // Process pending signals: barriers, interrupts (SIGTERM/SIGINT), config reload. // Called after WaitLatch wakes and between batches in the drain loop. -static void ProcessPendingSignals() { +static void ProcessPendingSignals(void) { // Barriers first: ProcSignalBarrierPending is set when operations like // DROP DATABASE need all backends to acknowledge. Failing to process // causes those operations to hang. @@ -108,7 +107,7 @@ static void ProcessPendingSignals() { // indicates the queue is nearly empty. Each batch gets its own PG_TRY/PG_CATCH // so an error on batch N+1 doesn't lose batches 1..N. Signals are processed // between batches to stay responsive to SIGTERM, barriers, and config reload. -static void ExportBatchWithRecovery() { +static void ExportBatchWithRecovery(void) { pgstat_report_activity(STATE_RUNNING, "exporting to ClickHouse"); for (;;) { @@ -137,11 +136,11 @@ static void ExportBatchWithRecovery() { } } - pgstat_report_activity(STATE_IDLE, nullptr); + pgstat_report_activity(STATE_IDLE, NULL); } // Initialize wait event for pg_stat_activity visibility -static uint32 InitializeWaitEvent() { +static uint32 InitializeWaitEvent(void) { #if PG_VERSION_NUM >= 170000 return WaitEventExtensionNew("PgStatChExporter"); #else @@ -150,7 +149,7 @@ static uint32 InitializeWaitEvent() { } // Calculate sleep time with exponential backoff on failures -static int CalculateSleepMs() { +static int CalculateSleepMs(void) { int sleep_ms = psch_flush_interval_ms; int failures = PschGetConsecutiveFailures(); if (failures > 0) { @@ -178,12 +177,10 @@ static void RunExportCycle(uint32 wait_event) { } } -extern "C" { - -void PschBgworkerMain([[maybe_unused]] Datum main_arg) { +void PschBgworkerMain(Datum main_arg pg_attribute_unused()) { SetupSignalHandlers(); BackgroundWorkerUnblockSignals(); - BackgroundWorkerInitializeConnection("postgres", nullptr, 0); + BackgroundWorkerInitializeConnection("postgres", NULL, 0); // Register cleanup callback for graceful shutdown on_proc_exit(PschBgworkerShutdown, 0); @@ -209,7 +206,7 @@ void PschBgworkerMain([[maybe_unused]] Datum main_arg) { elog(WARNING, "pg_stat_ch: failed to connect to ClickHouse on startup, will retry on first export"); } - pgstat_report_activity(STATE_IDLE, nullptr); + pgstat_report_activity(STATE_IDLE, NULL); // Main loop (pattern from worker_spi.c:206-290) for (;;) { @@ -243,10 +240,8 @@ void PschRegisterBgworker(void) { worker.bgw_restart_time = 10; // Restart after 10 seconds on crash strlcpy(worker.bgw_library_name, "pg_stat_ch", BGW_MAXLEN); strlcpy(worker.bgw_function_name, "PschBgworkerMain", BGW_MAXLEN); - worker.bgw_main_arg = static_cast(0); + worker.bgw_main_arg = (Datum)0; worker.bgw_notify_pid = 0; RegisterBackgroundWorker(&worker); } - -} // extern "C"