Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions connectors/sink.d/30-kafka.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ topic = "wparse_output"
num_partitions = 1
replication = 1
#config = ["compression.type=snappy", "acks=all"]
[connectors.params.config]
16 changes: 7 additions & 9 deletions connectors/sink.d/60-doris.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,14 @@ allow_override = [
]

[connectors.params]

# Stream Load API 配置(新版)
endpoint = "http://localhost:8040" # 使用 BE 的 HTTP 端口(推荐)
user = "root"
password = ""
database = "test_db"
table = "events_parsed"
timeout_secs = 30
max_retries = 3
batch_size = 100_0000
user = "root" # 用户名
password = "" # 密码
database = "" # 数据库
table = "" # 表名
timeout_secs = 30 # 请求超时时间
max_retries = 3 # 请求重试次数
batch_size = 1_0000 # 请求批量大小

# 可选:自定义 Stream Load 参数
[connectors.params.headers]
Expand Down
9 changes: 9 additions & 0 deletions connectors/source.d/14-http.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
[[connectors]]
id = "http_src"
type = "http"
# 允许覆写的键,兼容 syslog 的常见命名
allow_override = ["port","path"]

[connectors.params]
port = 8000
path = "/" # auto|line|len
24 changes: 18 additions & 6 deletions extensions/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,18 @@ This directory contains extension connector examples demonstrating WarpParse int

| Case | Purpose | Validated Features |
|------|---------|-------------------|
| **doris** | File Source → Doris Stream Load pipeline | Doris sink, Stream Load, batch processing |
| **clickhouse** | File Source -> ClickHouse pipeline | ClickHouse sink, batch processing, credential injection |
| **doris** | File Source -> Doris Stream Load pipeline | Doris sink, Stream Load, batch processing |
| **elasticsearch** | File Source -> Elasticsearch pipeline | Elasticsearch sink, index write, credential injection |
| **http** | File Source -> HTTP endpoint pipeline | HTTP sink, auth request, format output |
| **kafka** | Kafka Source/Sink integration | Kafka consumer/producer, topic routing |
| **pg_knowledge** | File Source → PostgreSQL knowledge enrichment | PostgreSQL knowdb provider, OML SQL enrichment, batch validation |
| **practice** | Real-world multi-source monitoring scenario | Multi-source collection, Fluent-bit, Kafka, VictoriaLogs, Grafana |
| **tcp_mysql** | TCP Source → MySQL Sink pipeline | TCP source, MySQL sink, data persistence |
| **tcp_victorialogs** | TCP Source → VictoriaLogs Sink pipeline | TCP source, VictoriaLogs sink, log storage |
| **syslog_postgres** | Syslog/TCP log -> Postgres pipeline | Postgres sink, structured storage, credential injection |
| **tcp_count** | TCP input counting and test helper | TCP source, counting, statistics |
| **tcp_mysql** | TCP Source -> MySQL Sink pipeline | TCP source, MySQL sink, data persistence |
| **tcp_postgres** | TCP Source -> Postgres Sink pipeline | TCP source, Postgres sink, data persistence |
| **tcp_victorialogs** | TCP Source -> VictoriaLogs Sink pipeline | TCP source, VictoriaLogs sink, log storage |
| **victoriametrics** | Internal metrics push to VictoriaMetrics | VictoriaMetrics sink, metrics export, monitoring |

## Common Structure
Expand Down Expand Up @@ -55,12 +61,18 @@ docker compose up -d

| 用例 | 目的 | 验证特性 |
|------|------|----------|
| **doris** | 文件源 → Doris Stream Load 管道 | Doris sink、Stream Load、批处理 |
| **clickhouse** | 文件源 -> ClickHouse 管道 | ClickHouse sink、批处理、安全变量注入 |
| **doris** | 文件源 -> Doris Stream Load 管道 | Doris sink、Stream Load、批处理 |
| **elasticsearch** | 文件源 -> Elasticsearch 管道 | Elasticsearch sink、索引写入、安全变量注入 |
| **http** | 文件源 -> HTTP 端点管道 | HTTP sink、认证请求、格式输出 |
| **kafka** | Kafka 源/汇集成 | Kafka 消费者/生产者、topic 路由 |
| **pg_knowledge** | 文件源 → PostgreSQL 知识库富化 | PostgreSQL knowdb provider、OML SQL 富化、批处理校验 |
| **practice** | 实战多源监控场景 | 多源采集、Fluent-bit、Kafka、VictoriaLogs、Grafana |
| **tcp_mysql** | TCP 源 → MySQL Sink 管道 | TCP 源、MySQL sink、数据持久化 |
| **tcp_victorialogs** | TCP 源 → VictoriaLogs Sink 管道 | TCP 源、VictoriaLogs sink、日志存储 |
| **syslog_postgres** | Syslog/TCP 日志 -> Postgres 管道 | Postgres sink、结构化入库、安全变量注入 |
| **tcp_count** | TCP 输入计数与测试辅助 | TCP source、计数、统计 |
| **tcp_mysql** | TCP 源 -> MySQL Sink 管道 | TCP 源、MySQL sink、数据持久化 |
| **tcp_postgres** | TCP 源 -> Postgres Sink 管道 | TCP 源、Postgres sink、数据持久化 |
| **tcp_victorialogs** | TCP 源 -> VictoriaLogs Sink 管道 | TCP 源、VictoriaLogs sink、日志存储 |
| **victoriametrics** | 内部指标推送到 VictoriaMetrics | VictoriaMetrics sink、指标导出、监控 |

## 通用结构
Expand Down
37 changes: 20 additions & 17 deletions extensions/clickhouse/test.sql
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
drop table default.wp_nginx;
CREATE TABLE wp_nginx
(
wp_event_id Int64 COMMENT '事件唯一ID',
wp_src_key LowCardinality(String) COMMENT '数据来源表示',
sip IPv4 COMMENT '客户端IP',
`timestamp` String COMMENT '原始时间(毫秒精度)',
`http/request` String COMMENT 'HTTP请求行',
status UInt16 COMMENT 'HTTP状态码',
size UInt32 COMMENT '响应大小(byte)',
referer String COMMENT '来源页面',
`http/agent` String COMMENT 'User-Agent'
)
ENGINE = MergeTree
ORDER BY (wp_src_key)
SETTINGS index_granularity = 8192;
CREATE DATABASE IF NOT EXISTS test_db;

select count(*) from wp_nginx;
DROP TABLE IF EXISTS test_db.wp_nginx;
CREATE TABLE test_db.wp_nginx (
wp_event_id BIGINT COMMENT '事件唯一ID',
wp_src_key STRING COMMENT '数据来源表示',
sip STRING COMMENT '客户端IP',
`timestamp` STRING COMMENT '原始时间字符串',
`http/request` STRING COMMENT 'HTTP请求行',
status SMALLINT COMMENT 'HTTP状态码',
size INT COMMENT '响应大小(byte)',
referer STRING COMMENT '来源页面',
`http/agent` STRING COMMENT 'User-Agent'
)
ENGINE=OLAP
DUPLICATE KEY(wp_event_id)
DISTRIBUTED BY HASH(wp_event_id) BUCKETS 8
PROPERTIES (
"replication_num" = "1"
);
select count(*) from test_db.wp_nginx
4 changes: 2 additions & 2 deletions extensions/clickhouse/topology/sinks/business.d/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ name = "ck_stream_load"
connect = "clickhouse_sink"
[sink_group.sinks.params]
endpoint= "http://localhost:8123"
username = "default"
password = "default"
username = "${SEC_SINK_USERNAME}"
password = "${SEC_SINK_PASSWORD}"
database = "default"
table = "wp_nginx"
timeout_secs = 30
Expand Down
1 change: 0 additions & 1 deletion extensions/doris/test.sql
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
CREATE DATABASE IF NOT EXISTS test_db;


DROP TABLE IF EXISTS test_db.wp_nginx;
CREATE TABLE test_db.wp_nginx (
wp_event_id BIGINT COMMENT '事件唯一ID',
Expand Down
5 changes: 2 additions & 3 deletions extensions/doris/topology/sinks/business.d/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@ parallel = 4
name = "doris_stream_load"
connect = "doris_sink"
[sink_group.sinks.params]
# 新版 Doris Sink 使用 HTTP Stream Load API
endpoint = "http://localhost:8040" # BE 的 HTTP 端口(推荐使用 8040 而非 FE 的 8030)
database = "test_db"
table = "wp_nginx"
user = "root"
password = ""
user = "${SEC_SINK_USERNAME}"
password = "${SEC_SINK_PASSWORD}"
timeout_secs = 30
max_retries = 1
batch_size = 10_0000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ connect = "elasticsearch_sink"
protocol = "http"
host = "localhost"
port = "9200"
username = "elastic"
password = "zgVClXP2"
username = "${SEC_SINK_USERNAME}"
password = "${SEC_SINK_PASSWORD}"
index = "wp_nginx"
timeout_secs = 30
max_retries = 3
Expand Down
4 changes: 2 additions & 2 deletions extensions/http/topology/sinks/business.d/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ name = "http_sink_demo"
connect = "http_sink"
[sink_group.sinks.params]
endpoint= "http://localhost:8080/auth/ingest/ndjson"
username = "root"
password = "root"
username = "${SEC_SINK_USERNAME}"
password = "${SEC_SINK_PASSWORD}"
fmt = "ndjson" # ndjson 格式,每行一个 JSON 对象
compression = "none"
timeout_secs = 30
Expand Down
14 changes: 14 additions & 0 deletions extensions/http_source/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
### 支持的参数
1. port:监听的端口。
2. path:监听的路径。
> 不同的source之间可以共用一个端口,只要保证path不同即可。

### 支持的功能
#### 格式选择
支持json格式和ndjson格式,通过请求参数 `fmt` 或者请求头`Content-Type`指定输入格式,`fmt` 参数优先级高于 Content-Type,且两者都不指定时默认使用 `json`。
Content-Type 映射规则:
- `application/json` => `json`
- `application/x-ndjson` => `ndjson`
- `application/ndjson` => `ndjson`
#### 压缩选择
支持请求以gzip格式压缩或none(不压缩),通过请求参数 `compression` 或者请求头`Content-Encoding`指定压缩方式。
40 changes: 40 additions & 0 deletions extensions/http_source/conf/wparse.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
version = "1.0"

[models]
wpl = "./models/wpl"
oml = "./models/oml"

[topology]
sources = "./topology/sources"
sinks = "./topology/sinks"

[performance]
parse_workers = 8
rate_limit_rps = 0

[rescue]
path = "./data/rescue"


[log_conf]
level = "warn,ctrl=warn,metrics=info,data=info"
output = "File"
[log_conf.file]
path = "./data/logs/"

[stat]
# window_sec = 60

[[stat.pick]]
key = "pick_stat"
target = "*"

[[stat.parse]]
key = "parse_stat"
target = "*"

[[stat.sink]]
key = "sink_stat"
target = "*"


22 changes: 22 additions & 0 deletions extensions/http_source/conf/wpgen.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
version = ""

[generator]
mode = "sample"
count = 100
speed = 20
parallel = 1


[output]
# 仅当存在 connect 时,其他 output 字段和旧式 [output.*] 子块将被忽略
name = "gen_out"
connect = "http_sink"
params = { endpoint= "http://localhost:8000/insert", fmt = "ndjson", compression = "none", timeout_secs = 30, max_retries = 3, batch_size = 10 }

[logging]
level = ""
module_levels = []
output = ""
file_path = "./data/logs"

[presets]
15 changes: 15 additions & 0 deletions extensions/http_source/models/oml/nginx.oml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
name : /oml/normal/nginx
rule : Nginx/nginx
enable : true
---
size : digit = take(size);
status : digit = take(status);
str_status = match read(option:[status]) {
digit(500) => chars(Internal Server Error);
digit(404) => chars(Not Found);
};
match_chars = match read(option:[wp_src_ip]) {
ip(127.0.0.1) => chars(localhost);
!ip(127.0.0.1) => chars(attack_ip);
};
* : auto = read();
5 changes: 5 additions & 0 deletions extensions/http_source/models/wpl/nginx/parse.wpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package Nginx {
rule nginx {
(json)
}
}
1 change: 1 addition & 0 deletions extensions/http_source/models/wpl/nginx/sample.dat
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"http/request":"GET /nginx-logo.png HTTP/1.1","http/status":500}
15 changes: 15 additions & 0 deletions extensions/http_source/topology/sinks/business.d/example.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
version = "2.0"

[sink_group]
name = "http"
rule = ["*"]
batch_timeout_ms=100
parallel = 1

[[sink_group.sinks]]
connect = "file_json_sink"
params = { file = "data.dat" }

[sink_group.sinks.expect]
ratio = 0.50
tol = 0.02
7 changes: 7 additions & 0 deletions extensions/http_source/topology/sinks/defaults.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
version = "2.0"

[defaults]
[defaults.expect]
basis = "total_input"
min_samples = 100
mode = "error"
16 changes: 16 additions & 0 deletions extensions/http_source/topology/sinks/infra.d/default.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
version = "2.0"

[sink_group]
name = "default"
oml = []

[[sink_group.sinks]]
name = "default_output"
connect = "blackhole_sink"

#connect = "file_raw_sink"
#params = { file = "default.dat" }

[sink_group.sinks.expect]
ratio = 1.00
tol = 0.02
15 changes: 15 additions & 0 deletions extensions/http_source/topology/sinks/infra.d/error.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
version = "2.0"

[sink_group]
name = "error"
oml = []

[[sink_group.sinks]]
name = "error_output"
connect = "file_raw_sink"
params = { file = "error.dat" }
tags = ["sink:error", "format:raw"]

[sink_group.sinks.expect]
ratio = 0.01
tol = 0.02
14 changes: 14 additions & 0 deletions extensions/http_source/topology/sinks/infra.d/miss.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
version = "2.0"

[sink_group]
name = "miss"
oml = []

[[sink_group.sinks]]
name = "miss_output"
connect = "file_raw_sink"
params = { file = "miss.dat" }
tags = ["sink:miss", "format:raw"]

[sink_group.sinks.expect]
max = 2.0
9 changes: 9 additions & 0 deletions extensions/http_source/topology/sinks/infra.d/monitor.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
version = "2.0"

[sink_group]
name = "monitor"

[[sink_group.sinks]]
name = "monitor_output"
connect = "file_proto_sink"
params = { file = "monitor.dat" }
9 changes: 9 additions & 0 deletions extensions/http_source/topology/sinks/infra.d/residue.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
version = "2.0"

[sink_group]
name = "residue"

[[sink_group.sinks]]
name = "residue_output"
connect = "file_raw_sink"
params = { file = "residue.dat" }
Loading
Loading