Skip to content

Commit b3d63cc

Browse files
authored
Merge branch 'main' into ddelnano/remove-unnecessary-changes
Signed-off-by: Dom Delnano <[email protected]>
2 parents 50f0cdc + 2394108 commit b3d63cc

File tree

12 files changed

+9375
-14
lines changed

12 files changed

+9375
-14
lines changed

k8s/cloud_deps/base/opensearch/operator/opensearch_operator.yaml

Lines changed: 8850 additions & 0 deletions
Large diffs are not rendered by default.

src/carnot/carnot_executable.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ std::shared_ptr<px::table_store::Table> GetTableFromCsv(const std::string& filen
178178

179179
// Construct the table.
180180
px::table_store::schema::Relation rel(types, names);
181-
auto table = px::table_store::Table::Create("csv_table", rel);
181+
auto table = px::table_store::HotColdTable::Create("csv_table", rel);
182182

183183
// Add rowbatches to the table.
184184
row_idx = 0;

src/carnot/plan/operators.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -423,6 +423,7 @@ class ClickHouseExportSinkOperator : public Operator {
423423
};
424424

425425
class OTelExportSinkOperator : public Operator {
426+
426427
public:
427428
explicit OTelExportSinkOperator(int64_t id) : Operator(id, planpb::OTEL_EXPORT_SINK_OPERATOR) {}
428429
~OTelExportSinkOperator() override = default;

src/carnot/planner/logical_planner_test.cc

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1213,6 +1213,132 @@ TEST_F(LogicalPlannerTest, ClickHouseExportWithExplicitEndpoint) {
12131213
EXPECT_TRUE(has_clickhouse_export);
12141214
}
12151215

1216+
constexpr char kClickHouseExportQuery[] = R"pxl(
1217+
import px
1218+
1219+
# Test ClickHouse export using endpoint config
1220+
df = px.DataFrame('http_events', start_time='-10m')
1221+
df = df[['time_', 'req_path', 'resp_status', 'resp_latency_ns']]
1222+
px.export(df, px.otel.ClickHouseRows(table='http_events'))
1223+
)pxl";
1224+
1225+
TEST_F(LogicalPlannerTest, ClickHouseExportWithEndpointConfig) {
1226+
auto planner = LogicalPlanner::Create(info_).ConsumeValueOrDie();
1227+
1228+
// Create a planner state with an OTel endpoint config containing ClickHouse DSN
1229+
auto state = testutils::CreateTwoPEMsOneKelvinPlannerState(testutils::kHttpEventsSchema);
1230+
1231+
// Set up the endpoint config with ClickHouse DSN in the URL field
1232+
auto* endpoint_config = state.mutable_otel_endpoint_config();
1233+
endpoint_config->set_url("clickhouse_user:[email protected]:9000/pixie_db");
1234+
endpoint_config->set_insecure(true);
1235+
endpoint_config->set_timeout(10);
1236+
1237+
auto plan_or_s = planner->Plan(MakeQueryRequest(state, kClickHouseExportQuery));
1238+
EXPECT_OK(plan_or_s);
1239+
auto plan = plan_or_s.ConsumeValueOrDie();
1240+
EXPECT_OK(plan->ToProto());
1241+
1242+
// Verify the plan contains ClickHouse export sink operators with correct config
1243+
auto plan_pb = plan->ToProto().ConsumeValueOrDie();
1244+
bool has_clickhouse_export = false;
1245+
1246+
for (const auto& [address, agent_plan] : plan_pb.qb_address_to_plan()) {
1247+
for (const auto& planFragment : agent_plan.nodes()) {
1248+
for (const auto& planNode : planFragment.nodes()) {
1249+
if (planNode.op().op_type() == planpb::OperatorType::CLICKHOUSE_EXPORT_SINK_OPERATOR) {
1250+
const auto& clickhouse_sink_op = planNode.op().clickhouse_sink_op();
1251+
1252+
// Verify table name
1253+
EXPECT_EQ(clickhouse_sink_op.table_name(), "http_events");
1254+
1255+
// Verify the DSN was parsed correctly into ClickHouseConfig
1256+
const auto& config = clickhouse_sink_op.clickhouse_config();
1257+
EXPECT_EQ(config.username(), "clickhouse_user");
1258+
EXPECT_EQ(config.password(), "clickhouse_pass");
1259+
EXPECT_EQ(config.host(), "clickhouse.example.com");
1260+
EXPECT_EQ(config.port(), 9000);
1261+
EXPECT_EQ(config.database(), "pixie_db");
1262+
1263+
// Verify column mappings were created
1264+
EXPECT_GT(clickhouse_sink_op.column_mappings_size(), 0);
1265+
1266+
has_clickhouse_export = true;
1267+
break;
1268+
}
1269+
}
1270+
if (has_clickhouse_export) break;
1271+
}
1272+
if (has_clickhouse_export) break;
1273+
}
1274+
1275+
EXPECT_TRUE(has_clickhouse_export);
1276+
}
1277+
1278+
constexpr char kClickHouseExportWithExplicitEndpointQuery[] = R"pxl(
1279+
import px
1280+
1281+
# Test ClickHouse export with explicit endpoint config
1282+
df = px.DataFrame('http_events', start_time='-10m')
1283+
df = df[['time_', 'req_path', 'resp_status']]
1284+
1285+
endpoint = px.otel.Endpoint(
1286+
url="explicit_user:explicit_pass@explicit-host:9001/explicit_db",
1287+
insecure=False,
1288+
timeout=20
1289+
)
1290+
1291+
px.export(df, px.otel.ClickHouseRows(table='custom_table', endpoint=endpoint))
1292+
)pxl";
1293+
1294+
TEST_F(LogicalPlannerTest, ClickHouseExportWithExplicitEndpoint) {
1295+
auto planner = LogicalPlanner::Create(info_).ConsumeValueOrDie();
1296+
1297+
// Create a planner state with a default endpoint config
1298+
auto state = testutils::CreateTwoPEMsOneKelvinPlannerState(testutils::kHttpEventsSchema);
1299+
1300+
// Set up a default endpoint config (should be overridden by explicit endpoint)
1301+
auto* endpoint_config = state.mutable_otel_endpoint_config();
1302+
endpoint_config->set_url("default_user:default_pass@default-host:9000/default_db");
1303+
1304+
auto plan_or_s = planner->Plan(MakeQueryRequest(state, kClickHouseExportWithExplicitEndpointQuery));
1305+
EXPECT_OK(plan_or_s);
1306+
auto plan = plan_or_s.ConsumeValueOrDie();
1307+
EXPECT_OK(plan->ToProto());
1308+
1309+
// Verify the plan uses the explicit endpoint config, not the default
1310+
auto plan_pb = plan->ToProto().ConsumeValueOrDie();
1311+
bool has_clickhouse_export = false;
1312+
1313+
for (const auto& [address, agent_plan] : plan_pb.qb_address_to_plan()) {
1314+
for (const auto& planFragment : agent_plan.nodes()) {
1315+
for (const auto& planNode : planFragment.nodes()) {
1316+
if (planNode.op().op_type() == planpb::OperatorType::CLICKHOUSE_EXPORT_SINK_OPERATOR) {
1317+
const auto& clickhouse_sink_op = planNode.op().clickhouse_sink_op();
1318+
1319+
// Verify table name
1320+
EXPECT_EQ(clickhouse_sink_op.table_name(), "custom_table");
1321+
1322+
// Verify the explicit endpoint was used, not the default
1323+
const auto& config = clickhouse_sink_op.clickhouse_config();
1324+
EXPECT_EQ(config.username(), "explicit_user");
1325+
EXPECT_EQ(config.password(), "explicit_pass");
1326+
EXPECT_EQ(config.host(), "explicit-host");
1327+
EXPECT_EQ(config.port(), 9001);
1328+
EXPECT_EQ(config.database(), "explicit_db");
1329+
1330+
has_clickhouse_export = true;
1331+
break;
1332+
}
1333+
}
1334+
if (has_clickhouse_export) break;
1335+
}
1336+
if (has_clickhouse_export) break;
1337+
}
1338+
1339+
EXPECT_TRUE(has_clickhouse_export);
1340+
}
1341+
12161342
} // namespace planner
12171343
} // namespace carnot
12181344
} // namespace px

src/carnot/planner/objects/otel.h

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,30 @@ class ClickHouseRows : public QLObject {
313313
std::string table_name_;
314314
};
315315

316+
class ClickHouseRows : public QLObject {
317+
public:
318+
static constexpr TypeDescriptor ClickHouseRowsType = {
319+
/* name */ "ClickHouseRows",
320+
/* type */ QLObjectType::kClickHouseRows,
321+
};
322+
323+
static StatusOr<std::shared_ptr<ClickHouseRows>> Create(
324+
ASTVisitor* ast_visitor, const std::string& table_name);
325+
326+
static bool IsClickHouseRows(const QLObjectPtr& obj) {
327+
return obj->type() == ClickHouseRowsType.type();
328+
}
329+
330+
const std::string& table_name() const { return table_name_; }
331+
332+
protected:
333+
ClickHouseRows(ASTVisitor* ast_visitor, std::string table_name)
334+
: QLObject(ClickHouseRowsType, ast_visitor), table_name_(std::move(table_name)) {}
335+
336+
private:
337+
std::string table_name_;
338+
};
339+
316340
} // namespace compiler
317341
} // namespace planner
318342
} // namespace carnot

src/carnot/planpb/plan.pb.go

Lines changed: 102 additions & 11 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/experimental/standalone_pem/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ pl_cc_library(
5050
"//src/vizier/funcs:cc_library",
5151
"//src/vizier/funcs/context:cc_library",
5252
"//src/vizier/services/agent/shared/base:cc_library",
53+
"//src/vizier/services/metadata/local:cc_library",
5354
"@com_github_grpc_grpc//:grpc++",
5455
],
5556
)

src/experimental/standalone_pem/standalone_pem_manager.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#include "src/shared/schema/utils.h"
2828
#include "src/table_store/table_store.h"
2929
#include "src/vizier/funcs/funcs.h"
30+
#include "src/vizier/services/metadata/local/local_metadata_service.h"
3031

3132
DEFINE_int32(
3233
table_store_data_limit, gflags::Int32FromEnv("PL_TABLE_STORE_DATA_LIMIT_MB", 1024 + 256),

0 commit comments

Comments
 (0)