Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 37 additions & 1 deletion fluent-plugin-oceanbase-logs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ Every record includes **`ob_log_type`** (`slow_sql` or `top_sql`). With `include

| Gem | Fluentd | Ruby |
| --- | --- | --- |
| >= 0.1.2 | >= 1.8.0 | >= 2.4 |
| >= 0.1.5 | >= 1.8.0 | >= 2.4 |

For **Grafana Loki** output you additionally need [fluent-plugin-grafana-loki](https://github.com/grafana/fluent-plugin-grafana-loki).

Expand Down Expand Up @@ -44,6 +44,42 @@ tenant_id "#{ENV['OCEANBASE_TENANT_ID']}"

Optional: `OCEANBASE_ENDPOINT`, `OCEANBASE_FETCH_INTERVAL`, `OCEANBASE_LOOKBACK_SECONDS`, `OCEANBASE_DB_NAME`, `OCEANBASE_SEARCH_KEYWORD`, `OCEANBASE_PROJECT_ID` — see `.env.example` and the Docker table below.

### Multiple instances, tenants, and databases

Use one or more `<target>` blocks. Each block sets **`instance_id`** and **`tenant_id`** (required). Optional **`db_name`** applies the same `dbName` filter as the top-level parameter, for that scope only.

When any `<target>` is present, the plugin **only** uses those scopes; top-level `instance_id` / `tenant_id` / `db_name` are ignored (you may leave them empty). Global options such as `access_key_id`, `log_type`, `search_keyword`, `endpoint`, and `fetch_interval` still apply to every target.

Example: two clusters, one tenant each, and a second scope that filters one database on another tenant:

```conf
<source>
@type oceanbase_logs
tag oceanbase.slow_sql
log_type slow_sql
access_key_id "#{ENV['OCEANBASE_ACCESS_KEY_ID']}"
access_key_secret "#{ENV['OCEANBASE_ACCESS_KEY_SECRET']}"
endpoint "#{ENV['OCEANBASE_ACCESS_KEY_SECRET']}"
fetch_interval 60
lookback_seconds 600
deduplicate true
include_metadata true
<target>
instance_id "OCEANBASE_INSTANCE_1"
tenant_id "OCEANBASE_TENANT_A"
</target>
<target>
instance_id "OCEANBASE_INSTANCE_2"
tenant_id "OCEANBASE_TENANT_B"
</target>
<storage>
@type local
persistent true
path /var/log/fluentd/slow_sql_seen
</storage>
</source>
```

### Example: Slow SQL → JSON file

Full sample: [`example/fluentd.conf`](example/fluentd.conf).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ services:

fluentd-oceanbase-demo:
image: fluent/fluentd:v1.16-1
platform: linux/amd64
container_name: fluentd-oceanbase-demo
user: root
command:
Expand All @@ -46,7 +47,7 @@ services:
OCEANBASE_ENDPOINT: "${OCEANBASE_ENDPOINT:-api-cloud-cn.oceanbase.com}"
OCEANBASE_FETCH_INTERVAL: "${OCEANBASE_FETCH_INTERVAL:-60}"
OCEANBASE_LOOKBACK_SECONDS: "${OCEANBASE_LOOKBACK_SECONDS:-600}"
OCEANBASE_DB_NAME: "${OCEANBASE_DB_NAME:-test}"
OCEANBASE_DB_NAME: "${OCEANBASE_DB_NAME:-}"
OCEANBASE_SEARCH_KEYWORD: "${OCEANBASE_SEARCH_KEYWORD:-}"
OCEANBASE_PROJECT_ID: "${OCEANBASE_PROJECT_ID:-}"
networks:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ Gem::Specification.new do |spec|
spec.summary = "Fluentd input plugin for OceanBase Cloud Logs"
spec.description = "Fetches Slow SQL and Top SQL per-execution samples from OceanBase Cloud " \
"and emits them as Fluentd events (one record per trace, dedup by traceId)."
spec.homepage = "https://github.com/sc-source/fluent-plugin-oceanbase-logs"
spec.homepage = "https://github.com/oceanbase/ecology-plugins/tree/main/fluent-plugin-oceanbase-logs"
spec.license = "Apache-2.0"

spec.files = Dir['lib/**/*', 'README.md', 'LICENSE']
Expand Down
88 changes: 88 additions & 0 deletions fluent-plugin-oceanbase-logs/test/plugin/test_in_oceanbase_logs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,50 @@ def stub_samples_auth(samples_response, times: 2)
assert_raise(Fluent::ConfigError) { create_driver(conf) }
end

test 'multiple targets: top-level instance_id and tenant_id may be empty' do
conf = %(
tag oceanbase.logs
access_key_id test_access_key_id
access_key_secret test_access_key_secret
fetch_interval 60
lookback_seconds 300
deduplicate false
<target>
instance_id ob_a
tenant_id t_a
</target>
<target>
instance_id ob_b
tenant_id t_b
db_name analytics_db
</target>
)
d = create_driver(conf)
assert_equal 2, d.instance.instance_variable_get(:@fetch_scopes).size
scopes = d.instance.instance_variable_get(:@fetch_scopes)
assert_equal 'ob_a', scopes[0].instance_id
assert_equal 't_a', scopes[0].tenant_id
assert_nil scopes[0].db_name
assert_equal 'ob_b', scopes[1].instance_id
assert_equal 'analytics_db', scopes[1].db_name
end

test 'target section missing tenant_id raises error' do
conf = %(
tag oceanbase.logs
access_key_id test_access_key_id
access_key_secret test_access_key_secret
fetch_interval 60
lookback_seconds 300
deduplicate false
<target>
instance_id ob_a
tenant_id ""
</target>
)
assert_raise(Fluent::ConfigError) { create_driver(conf) }
end

end

# ----------------------------------------------------------- fetching
Expand Down Expand Up @@ -267,6 +311,50 @@ def stub_samples_auth(samples_response, times: 2)
d.run(timeout: 3)
assert_equal 0, d.events.length
end

test 'slow_sql with two targets calls both APIs and emits scoped metadata' do
list_one = {
'success' => true,
'data' => { 'dataList' => [{ 'sqlId' => 'SQL_MULTI_A' }], 'total' => 1 },
}
list_two = {
'success' => true,
'data' => { 'dataList' => [{ 'sqlId' => 'SQL_MULTI_B' }], 'total' => 1 },
}
stub_digest_auth(
%r{instances/ob_a/tenants/t_a/slowSql},
list_one
)
stub_digest_auth(
%r{instances/ob_b/tenants/t_b/slowSql},
list_two
)
stub_samples_auth(SAMPLES_RESPONSE, times: 2)

conf = %(
tag oceanbase.logs
access_key_id test_access_key_id
access_key_secret test_access_key_secret
fetch_interval 60
lookback_seconds 300
deduplicate false
<target>
instance_id ob_a
tenant_id t_a
</target>
<target>
instance_id ob_b
tenant_id t_b
</target>
)
d = create_driver(conf)
d.run(timeout: 3)

events = d.events
assert events.length >= 2, "Expected at least 2 events, got #{events.length}"
insts = events.map { |e| e[2]['ob_instance_id'] }.uniq.sort
assert_equal %w[ob_a ob_b], insts
end
end

# ---------------------------------------------------------- deduplication
Expand Down
Loading