Skip to content

Commit c8f3d03

Browse files
authored
Merge pull request #14 from scalecube/fix-metrics-transmitter-race-condition
Fixed race-condition for MetricsRecorder/MetricsTransmitterAgent
2 parents 617af81 + e6739e3 commit c8f3d03

File tree

4 files changed

+23
-16
lines changed

4 files changed

+23
-16
lines changed

metrics/src/main/java/io/scalecube/metrics/CountersReaderAgent.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public enum State {
4545
private final UnsafeBuffer headerBuffer = new UnsafeBuffer();
4646
private long countersStartTimestamp = -1;
4747
private long countersPid = -1;
48-
private long countersValuesBufferLength = -1;
48+
private int countersValuesBufferLength = -1;
4949
private CountersReader countersReader;
5050
private State state = State.CLOSED;
5151

metrics/src/main/java/io/scalecube/metrics/CountersRegistry.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,7 @@ public static UnsafeBuffer createCountersMetaDataBuffer(
305305
public static UnsafeBuffer createCountersValuesBuffer(
306306
ByteBuffer buffer, DirectBuffer headerBuffer) {
307307
final var countersValuesBufferLength = countersValuesBufferLength(headerBuffer);
308-
final int offset = HEADER_LENGTH + countersMetaDataBufferLength(countersValuesBufferLength);
308+
final var offset = HEADER_LENGTH + countersMetaDataBufferLength(countersValuesBufferLength);
309309
final var length = countersValuesBufferLength;
310310
return new UnsafeBuffer(buffer, offset, length);
311311
}

metrics/src/main/java/io/scalecube/metrics/MetricsRecorder.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -277,20 +277,20 @@ private void concludeMetricsDirectory() {
277277
if (metricsDir.isDirectory()) {
278278
final var file = new File(metricsDir, METRICS_FILE);
279279
if (file.exists()) {
280-
final var buffer = mapExistingFile(file, METRICS_FILE);
280+
final var mappedByteBuffer = mapExistingFile(file, METRICS_FILE);
281281
try {
282-
if (!LayoutDescriptor.isMetricsHeaderLengthSufficient(buffer.capacity())) {
282+
if (!LayoutDescriptor.isMetricsHeaderLengthSufficient(mappedByteBuffer.capacity())) {
283283
delete(metricsDir, false);
284284
} else {
285-
final var headerBuffer = LayoutDescriptor.createHeaderBuffer(buffer);
285+
final var headerBuffer = LayoutDescriptor.createHeaderBuffer(mappedByteBuffer);
286286
final var startTimestamp = ManagementFactory.getRuntimeMXBean().getStartTime();
287287
final var pid = ManagementFactory.getRuntimeMXBean().getPid();
288288
if (!LayoutDescriptor.isMetricsActive(headerBuffer, startTimestamp, pid)) {
289289
delete(metricsDir, false);
290290
}
291291
}
292292
} finally {
293-
BufferUtil.free(buffer);
293+
BufferUtil.free(mappedByteBuffer);
294294
}
295295
}
296296
}
@@ -317,14 +317,15 @@ private void concludeMetricsBuffer() {
317317
final var totalLength = headerLength + bufferLength;
318318

319319
if (!file.exists()) {
320-
final var buffer = mapNewFile(file, totalLength);
320+
final var mappedByteBuffer = mapNewFile(file, totalLength);
321321
try {
322-
final var headerBuffer = LayoutDescriptor.createHeaderBuffer(buffer);
322+
final var headerBuffer = LayoutDescriptor.createHeaderBuffer(mappedByteBuffer);
323323
final var startTimestamp = ManagementFactory.getRuntimeMXBean().getStartTime();
324324
final var pid = ManagementFactory.getRuntimeMXBean().getPid();
325325
LayoutDescriptor.fillHeaderBuffer(headerBuffer, startTimestamp, pid, bufferLength);
326+
mappedByteBuffer.force();
326327
} finally {
327-
BufferUtil.free(buffer);
328+
BufferUtil.free(mappedByteBuffer);
328329
}
329330
}
330331

@@ -478,7 +479,7 @@ public static long pid(DirectBuffer headerBuffer) {
478479
return headerBuffer.getLong(PID_OFFSET);
479480
}
480481

481-
public static long metricsBufferLength(DirectBuffer headerBuffer) {
482+
public static int metricsBufferLength(DirectBuffer headerBuffer) {
482483
return headerBuffer.getInt(METRICS_BUFFER_LENGTH_OFFSET);
483484
}
484485

metrics/src/main/java/io/scalecube/metrics/MetricsTransmitterAgent.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ public enum State {
4343
private final UnsafeBuffer headerBuffer = new UnsafeBuffer();
4444
private long metricsStartTimestamp = -1;
4545
private long metricsPid = -1;
46+
private int metricsBufferLength = -1;
4647
private State state = State.CLOSED;
4748

4849
MetricsTransmitterAgent(
@@ -102,16 +103,20 @@ private int init() {
102103
}
103104

104105
metricsByteBuffer = mapExistingFile(metricsFile, METRICS_FILE);
105-
headerBuffer.wrap(metricsByteBuffer, 0, LayoutDescriptor.HEADER_LENGTH);
106+
final var headerLength = LayoutDescriptor.HEADER_LENGTH;
107+
headerBuffer.wrap(metricsByteBuffer, 0, headerLength);
106108
metricsStartTimestamp = LayoutDescriptor.startTimestamp(headerBuffer);
107109
metricsPid = LayoutDescriptor.pid(headerBuffer);
110+
metricsBufferLength = LayoutDescriptor.metricsBufferLength(headerBuffer);
108111

109-
final var headerLength = LayoutDescriptor.HEADER_LENGTH;
110-
final var totalLength = metricsByteBuffer.capacity();
111-
final var length = totalLength - headerLength;
112+
if (metricsBufferLength <= 0) {
113+
state(State.CLEANUP);
114+
return 0;
115+
}
112116

113117
metricsBuffer =
114-
new ManyToOneRingBuffer(new UnsafeBuffer(metricsByteBuffer, headerLength, length));
118+
new ManyToOneRingBuffer(
119+
new UnsafeBuffer(metricsByteBuffer, headerLength, metricsBufferLength));
115120

116121
state(State.RUNNING);
117122
LOGGER.info("[{}] Initialized, now running", roleName());
@@ -149,7 +154,7 @@ private boolean isActive(File metricsFile) {
149154
LOGGER.warn("[{}] {} has not sufficient length", roleName(), metricsFile);
150155
return false;
151156
}
152-
if (metricsStartTimestamp != -1
157+
if (metricsBufferLength != -1
153158
&& !LayoutDescriptor.isMetricsActive(headerBuffer, metricsStartTimestamp, metricsPid)) {
154159
LOGGER.warn("[{}] {} is not active", roleName(), metricsFile);
155160
return false;
@@ -173,6 +178,7 @@ private int cleanup() {
173178
metricsFile = null;
174179
metricsStartTimestamp = -1;
175180
metricsPid = -1;
181+
metricsBufferLength = -1;
176182

177183
State previous = state;
178184
if (previous != State.CLOSED) { // when it comes from onClose()

0 commit comments

Comments
 (0)