Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
1bd86e6
1. Refactoring the merge process to relax the 'config-alignment' prer…
popduke Oct 11, 2025
1db6032
1. local-engine refactoring to enforce read consistency based on Rock…
popduke Oct 16, 2025
e4721b9
Update DevOnlySettingProvider to use initial values for settings
popduke Oct 16, 2025
f6c4d47
Refactor AdaptiveReceiveQuota and related classes to improve quota ma…
popduke Oct 20, 2025
fcf06ed
Add MQTT metrics for tracking resend and deduplication bytes
popduke Oct 21, 2025
d4fa499
Enabling output rocksdb internal stats via metrics and configurable v…
popduke Oct 21, 2025
815d2e1
1. reinterpret compactWALThreshold as max log bytes before triggering…
popduke Oct 21, 2025
8a1a304
Optimize read path for WAL engine
popduke Oct 21, 2025
2178231
Optimize InboxStoreCoProc to reduce read/write overhead
popduke Oct 22, 2025
ad9e404
Fixed a bug during parsing AgentMetadata out of CRDT which may throw …
popduke Oct 23, 2025
7afc967
Optimize local engine's default configs
popduke Oct 23, 2025
d0e6448
New tenant-level setting for controlling if publish will message when…
popduke Oct 23, 2025
8a867c8
Add StorageEngineConfigUtil for dynamic storage engine configuration …
popduke Oct 23, 2025
25084d9
Refactor MQTT session handlers to unify channelInactive method as doT…
popduke Oct 23, 2025
db83fb6
Reduce the computational overhead of calculating size
popduke Oct 23, 2025
4d45754
Fix a bug in BatchQueryCall which may cause task duplication
popduke Oct 25, 2025
37274ef
improve cancellation handling
popduke Oct 25, 2025
5fea1b9
Enhance inbox processing by improving hash distribution and implement…
popduke Oct 25, 2025
b4c13a9
InboxStoreConfig correction
popduke Oct 31, 2025
6574843
1. Reduce query/mutation tail latency by optimized thread usage
popduke Oct 31, 2025
229087c
Refactoring scheduler using more generalized SPI
popduke Oct 31, 2025
0bdfeef
Improved data plane tail latency by:
popduke Oct 31, 2025
4ba96bc
Refactor to make local engine pluggable
popduke Nov 3, 2025
1d2e9c1
Update batchinsert proto with compact format
popduke Nov 3, 2025
33f910b
Ignore license header for files under META-INF
popduke Nov 4, 2025
f5a4ea8
Closing checkpoint before closing DB to avoid crash
popduke Nov 4, 2025
c57178a
Prevented Checkpoint from being gc during reader opening
popduke Nov 4, 2025
df53d1a
1. remove "dbCheckpointRootDir" from WAL config
popduke Nov 4, 2025
3a35fd2
Adjust warn log output during config consolidation
popduke Nov 5, 2025
7474e3c
Disable legacy non-compact format of BatchInsertRequest
popduke Nov 5, 2025
b6bd59b
Enforce FIFO semantic for dist rpc
popduke Nov 5, 2025
8120404
Optimize cacheKey to reduce mem overhead
popduke Nov 5, 2025
077d1e1
Improve test stability in CI env
popduke Nov 5, 2025
f687a9c
Optimize cluster join process to speedup convergence
popduke Nov 5, 2025
a123caf
Enable zero-copy parsing for inter-cluster message parsing
popduke Nov 6, 2025
8ae261a
Support pluggable split hinters and factor out related interfaces
popduke Nov 6, 2025
11612a0
Decompose base-kv-local-engine for better code organization
popduke Nov 6, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
33 changes: 29 additions & 4 deletions .github/workflows/build-cov.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,43 @@ on:
branches:
- 'main'
jobs:
check-license:
name: "Check License"
prepare:
name: "Prepare"
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up JDK 17
uses: actions/setup-java@v4
with:
java-version: '17'
distribution: 'temurin'
cache: maven
- name: Install without tests
run: mvn -q -DskipTests install

license:
name: "License Check"
needs: prepare
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up JDK 17
uses: actions/setup-java@v4
with:
java-version: '17'
distribution: 'temurin'
cache: maven
- name: Check License Header
uses: apache/skywalking-eyes/header@main
- name: Check Dependencies' License
uses: apache/skywalking-eyes/dependency@main
build:
env:
MAVEN_OPTS: -Xmx1g
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}

test:
name: "BifroMQ Code Coverage Test"
needs: check-license
needs: license
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
Expand Down
33 changes: 29 additions & 4 deletions .github/workflows/build-dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,43 @@ on:
- 'bugfix-**'
- 'fix-**'
jobs:
check-license:
name: "Check License"
prepare:
name: "Prepare"
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up JDK 17
uses: actions/setup-java@v4
with:
java-version: '17'
distribution: 'temurin'
cache: maven
- name: Install without tests
run: mvn -q -DskipTests install

license:
name: "License Check"
needs: prepare
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up JDK 17
uses: actions/setup-java@v4
with:
java-version: '17'
distribution: 'temurin'
cache: maven
- name: Check License Header
uses: apache/skywalking-eyes/header@main
- name: Check Dependencies' License
uses: apache/skywalking-eyes/dependency@main
build:
env:
MAVEN_OPTS: -Xmx1g
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}

test:
name: "Build BifroMQ"
needs: check-license
needs: license
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
Expand Down
3 changes: 3 additions & 0 deletions .licenserc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ header:
- '.gitignore'
- '.gitattributes'
- 'lombok.config'
- 'META-INF/**'
- '**/META-INF/**'
- '**/resources/META-INF/**'
- '**/*.crt'
- '**/*.pem'
- '**/MANIFEST.MF'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,11 @@ private void handleJoin(Join join) {
messenger.send(ClusterMessage.newBuilder()
.setJoin(Join.newBuilder().setMember(local).build())
.build(), getMemberAddress(joinMember.getEndpoint()), true);
} else if (newMember) {
// send back a join to speed up convergence
messenger.send(ClusterMessage.newBuilder()
.setJoin(Join.newBuilder().setMember(local).build())
.build(), getMemberAddress(joinMember.getEndpoint()), true);
}
} else {
clearZombie(join.getExpectedHost());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* under the License.
*/

package org.apache.bifromq.basecluster.memberlist.agent;
Expand All @@ -23,10 +23,6 @@
import static org.apache.bifromq.basecrdt.core.api.CausalCRDTType.mvreg;
import static org.apache.bifromq.basecrdt.core.api.CausalCRDTType.ormap;

import org.apache.bifromq.basecluster.agent.proto.AgentMemberAddr;
import org.apache.bifromq.basecluster.agent.proto.AgentMemberMetadata;
import org.apache.bifromq.basecrdt.core.api.IMVReg;
import org.apache.bifromq.basecrdt.core.api.IORMap;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.protobuf.InvalidProtocolBufferException;
Expand All @@ -41,6 +37,10 @@
import lombok.NoArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.bifromq.basecluster.agent.proto.AgentMemberAddr;
import org.apache.bifromq.basecluster.agent.proto.AgentMemberMetadata;
import org.apache.bifromq.basecrdt.core.api.IMVReg;
import org.apache.bifromq.basecrdt.core.api.IORMap;

@Slf4j
@NoArgsConstructor(access = AccessLevel.PRIVATE)
Expand All @@ -56,7 +56,8 @@ static Map<AgentMemberAddr, AgentMemberMetadata> toAgentMemberMap(IORMap agentCR
Iterator<IORMap.ORMapKey> orMapKeyItr = agentCRDT.keys();
while (orMapKeyItr.hasNext()) {
IORMap.ORMapKey orMapKey = orMapKeyItr.next();
agentMemberMap.put(parseAgentMemberAddr(orMapKey), parseMetadata(agentCRDT.getMVReg(orMapKey.key())).get());
Optional<AgentMemberMetadata> meta = parseMetadata(agentCRDT.getMVReg(orMapKey.key()));
meta.ifPresent(m -> agentMemberMap.put(parseAgentMemberAddr(orMapKey), m));
}
return agentMemberMap;
}
Expand All @@ -78,11 +79,11 @@ private static Optional<AgentMemberMetadata> parseMetadata(IMVReg value) {
return AgentMemberMetadata.parseFrom(data);
} catch (InvalidProtocolBufferException e) {
log.error("Unable to parse agent host node", e);
// this exception should not happen
// should not happen, skip the broken value
return null;
}
}), Objects::nonNull));
metaList.sort(Comparator.comparingLong(AgentMemberMetadata::getHlc));
metaList.sort(Comparator.comparingLong(AgentMemberMetadata::getHlc).reversed());
return Optional.ofNullable(metaList.isEmpty() ? null : metaList.get(0));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,11 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* under the License.
*/

package org.apache.bifromq.basecluster.messenger;

import org.apache.bifromq.basecluster.messenger.proto.DirectMessage;
import org.apache.bifromq.basecluster.messenger.proto.GossipMessage;
import org.apache.bifromq.basecluster.messenger.proto.MessengerMessage;
import org.apache.bifromq.basecluster.proto.ClusterMessage;
import org.apache.bifromq.basecluster.transport.ITransport;
import org.apache.bifromq.basecluster.util.RandomUtils;
import com.google.common.collect.Maps;
import com.google.protobuf.InvalidProtocolBufferException;
import io.micrometer.core.instrument.Counter;
Expand All @@ -46,26 +40,27 @@
import java.util.stream.Collectors;
import lombok.Builder;
import lombok.extern.slf4j.Slf4j;
import org.apache.bifromq.basecluster.messenger.proto.DirectMessage;
import org.apache.bifromq.basecluster.messenger.proto.GossipMessage;
import org.apache.bifromq.basecluster.messenger.proto.MessengerMessage;
import org.apache.bifromq.basecluster.proto.ClusterMessage;
import org.apache.bifromq.basecluster.transport.ITransport;
import org.apache.bifromq.basecluster.util.RandomUtils;
import org.apache.bifromq.baseenv.ZeroCopyParser;

@Slf4j
public class Messenger implements IMessenger {
private enum State {
INIT, START, STOP
}

private State state = State.INIT;
// threshold for determining which transport to use
private final MessengerTransport transport;
private final CompositeDisposable disposables = new CompositeDisposable();
private final InetSocketAddress localAddress;
private final Gossiper gossiper;
private final Subject<Timed<MessageEnvelope>> publisher =
PublishSubject.<Timed<MessageEnvelope>>create().toSerialized();

private final Scheduler scheduler;
private final MessengerOptions opts;
private final MetricManager metricManager;

private State state = State.INIT;
@Builder
private Messenger(ITransport transport, Scheduler scheduler, MessengerOptions opts) {
this.transport = new MessengerTransport(transport);
Expand Down Expand Up @@ -202,8 +197,8 @@ private void onMessengerMessage(Timed<MessengerMessageEnvelope> timedMessageEnve
switch (messengerMessageEnvelope.message.getMessengerMessageTypeCase()) {
case DIRECT:
try {
ClusterMessage clusterMessage =
ClusterMessage.parseFrom(messengerMessageEnvelope.message.getDirect().getPayload());
ClusterMessage clusterMessage = ZeroCopyParser.parse(
messengerMessageEnvelope.message.getDirect().getPayload(), ClusterMessage.parser());
log.trace("Received message: sender={}, message={}",
messengerMessageEnvelope.sender, clusterMessage);
metricManager.msgRecvCounters.get(clusterMessage.getClusterMessageTypeCase()).increment();
Expand Down Expand Up @@ -244,6 +239,10 @@ private void onError(Throwable throwable) {
log.error("Received unexpected error:", throwable);
}

private enum State {
INIT, START, STOP
}

private static class MetricManager {
final Map<ClusterMessage.ClusterMessageTypeCase, Counter> msgSendCounters = Maps.newHashMap();
final Map<ClusterMessage.ClusterMessageTypeCase, Counter> msgRecvCounters = Maps.newHashMap();
Expand All @@ -253,17 +252,19 @@ private static class MetricManager {

MetricManager(InetSocketAddress localAddress) {
for (ClusterMessage.ClusterMessageTypeCase typeCase : ClusterMessage.ClusterMessageTypeCase.values()) {
Tags tags = Tags
.of("local", localAddress.getAddress().getHostAddress() + ":" + localAddress.getPort())
.and("type", typeCase.name());
msgSendCounters.put(typeCase,
Metrics.counter("basecluster.send.count", tags));
msgRecvCounters.put(typeCase,
Metrics.counter("basecluster.recv.count", tags));
gossipGenCounters.put(typeCase,
Metrics.counter("basecluster.gossip.gen.count", tags));
gossipHeardCounters.put(typeCase,
Metrics.counter("basecluster.gossip.heard.count", tags));
if (typeCase != ClusterMessage.ClusterMessageTypeCase.CLUSTERMESSAGETYPE_NOT_SET) {
Tags tags = Tags
.of("local", localAddress.getAddress().getHostAddress() + ":" + localAddress.getPort())
.and("type", typeCase.name());
msgSendCounters.put(typeCase,
Metrics.counter("basecluster.send.count", tags));
msgRecvCounters.put(typeCase,
Metrics.counter("basecluster.recv.count", tags));
gossipGenCounters.put(typeCase,
Metrics.counter("basecluster.gossip.gen.count", tags));
gossipHeardCounters.put(typeCase,
Metrics.counter("basecluster.gossip.heard.count", tags));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.bifromq.basecluster.messenger.proto.MessengerMessage;
import org.apache.bifromq.basecluster.transport.ITransport;
import org.apache.bifromq.basecluster.transport.PacketEnvelope;
import org.apache.bifromq.baseenv.ZeroCopyParser;

@Slf4j
final class MessengerTransport {
Expand Down Expand Up @@ -62,9 +63,11 @@ Observable<Timed<MessengerMessageEnvelope>> receive() {
private Observable<Timed<MessengerMessageEnvelope>> convert(PacketEnvelope packetEnvelope) {
return Observable.fromIterable(packetEnvelope.data.stream().map(b -> {
try {
// Parse with aliasing directly from ByteString to reduce copies
MessengerMessage msg = ZeroCopyParser.parse(b, MessengerMessage.parser());
MessengerMessageEnvelope mmEnvelop = MessengerMessageEnvelope.builder()
.recipient(packetEnvelope.recipient)
.message(MessengerMessage.parseFrom(b))
.message(msg)
.sender(packetEnvelope.sender)
.build();
return new Timed<>(mmEnvelop, System.currentTimeMillis(), TimeUnit.MILLISECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,11 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* under the License.
*/

package org.apache.bifromq.basecluster.transport;

import org.apache.bifromq.basecluster.transport.proto.Packet;
import org.apache.bifromq.baseenv.NettyEnv;
import org.apache.bifromq.basehlc.HLC;
import com.google.protobuf.InvalidProtocolBufferException;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
Expand Down Expand Up @@ -49,6 +46,10 @@
import lombok.Builder;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.bifromq.basecluster.transport.proto.Packet;
import org.apache.bifromq.baseenv.NettyEnv;
import org.apache.bifromq.baseenv.ZeroCopyParser;
import org.apache.bifromq.basehlc.HLC;

@Slf4j
public final class UDPTransport extends AbstractTransport {
Expand Down Expand Up @@ -151,7 +152,8 @@ protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket dp) {
try {
byte[] data = new byte[dp.content().readableBytes()];
dp.content().readBytes(data);
Packet packet = Packet.parseFrom(data);
// Parse with aliasing to avoid extra copies of bytes fields
Packet packet = ZeroCopyParser.parse(data, Packet.parser());
transportLatency.record(HLC.INST.getPhysical(packet.getHlc() - HLC.INST.get()));
doReceive(packet, dp.sender(), dp.recipient());
} catch (InvalidProtocolBufferException e) {
Expand Down
Loading
Loading