Skip to content

Commit 3f079b2

Browse files
Dmitry Konstantinovnetudima
authored andcommitted
Fix memory leak in BufferPoolAllocator when a capacity needs to be extended
Before the fix during a capacity extension BufferPoolAllocator returned to BufferPool a sliced ByteBuffer wrapper object instead of the originally allocated one, so the ByteBuffer was not recycled by BufferPool Adjust BufferPoolAllocatorTest to test the ByteBuf capacity extension with a real BufferPool behavior Patch by Dmitry Konstantinov; reviewed by Michael Semb Wever for CASSANDRA-20753
1 parent 61014f2 commit 3f079b2

File tree

4 files changed

+96
-45
lines changed

4 files changed

+96
-45
lines changed

CHANGES.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
4.0.20
2+
* Fix memory leak in BufferPoolAllocator when a capacity needs to be extended (CASSANDRA-20753)
23
* Leveled Compaction doesn't validate maxBytesForLevel when the table is altered/created (CASSANDRA-20570)
34
* Updated dtest-api to 0.0.18 and removed JMX-related classes that now live in the dtest-api (CASSANDRA-20884)
45

src/java/org/apache/cassandra/net/BufferPoolAllocator.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,12 @@ public long usedSizeInBytes()
9191
return bufferPool.usedSizeInBytes();
9292
}
9393

94+
@VisibleForTesting
95+
long overflowMemoryInBytes()
96+
{
97+
return bufferPool.overflowMemoryInBytes();
98+
}
99+
94100
void release()
95101
{
96102
}
@@ -117,6 +123,7 @@ public ByteBuf capacity(int newCapacity)
117123

118124
ByteBuf newBuffer = super.capacity(newCapacity);
119125
ByteBuffer nioBuffer = newBuffer.nioBuffer(0, newBuffer.capacity());
126+
nioBuffer = bufferPool.unwrapBufferPoolManagedBuffer(nioBuffer);
120127

121128
bufferPool.put(wrapped);
122129
wrapped = nioBuffer;

src/java/org/apache/cassandra/utils/memory/BufferPool.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1613,4 +1613,30 @@ int unsafeNumChunks()
16131613
+ (pool.chunks.chunk1 != null ? 1 : 0)
16141614
+ (pool.chunks.chunk2 != null ? 1 : 0);
16151615
}
1616+
1617+
/**
1618+
* @return the inner buffer if it has a BufferPool.Chunk attached
1619+
* and originalBuffer in other cases
1620+
*/
1621+
public ByteBuffer unwrapBufferPoolManagedBuffer(ByteBuffer originalBuffer)
1622+
{
1623+
int MAX_DEPTH = 32; // a protection against possible loops in attachments
1624+
int depth = 0;
1625+
ByteBuffer buffer = originalBuffer;
1626+
do
1627+
{
1628+
if (buffer == null || !isExactlyDirect(buffer))
1629+
return originalBuffer;
1630+
if (Chunk.getParentChunk(buffer) != null)
1631+
return buffer;
1632+
1633+
Object attachment = MemoryUtil.getAttachment(buffer);
1634+
if (!(attachment instanceof ByteBuffer))
1635+
return originalBuffer;
1636+
buffer = (ByteBuffer) attachment;
1637+
depth++;
1638+
}
1639+
while (depth < MAX_DEPTH);
1640+
return originalBuffer;
1641+
}
16161642
}

test/unit/org/apache/cassandra/net/BufferPoolAllocatorTest.java

Lines changed: 62 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -22,57 +22,68 @@
2222
import java.util.Arrays;
2323
import java.util.Random;
2424

25+
import org.junit.BeforeClass;
2526
import org.junit.Test;
2627

2728
import io.netty.buffer.ByteBuf;
2829
import org.apache.cassandra.config.DatabaseDescriptor;
30+
import org.assertj.core.api.Assertions;
2931

3032
import static org.junit.Assert.assertArrayEquals;
3133
import static org.junit.Assert.assertEquals;
3234

3335
public class BufferPoolAllocatorTest
3436
{
35-
@Test
36-
public void testAdoptedBufferContentAfterResize() {
37+
38+
@BeforeClass
39+
public static void beforeClass()
40+
{
3741
DatabaseDescriptor.clientInitialization();
38-
ByteBuf buffer = GlobalBufferPoolAllocator.instance.buffer(200, 500);
39-
assertEquals(200, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
42+
// cache size hould be more than a macro chunk size for proper pool testing
43+
// if it is 0 or less than a macro chunk size we actually do not pool
44+
DatabaseDescriptor.getRawConfig().networking_cache_size_in_mb = 128;
45+
}
4046

47+
@Test
48+
public void testAdoptedBufferContentAfterResize() {
49+
ByteBuf buffer = allocateByteBuf(200, 500);
50+
int originalCapacity = buffer.capacity();
4151
byte[] content = new byte[300];
4252

4353
Random rand = new Random();
4454
rand.nextBytes(content);
4555

4656
buffer.writeBytes(Arrays.copyOfRange(content, 0, 200));
47-
assertEquals(200, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
57+
assertEquals(originalCapacity, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
4858

4959
buffer.writeBytes(Arrays.copyOfRange(content, 200, 300));
60+
int increasedCapacity = buffer.capacity();
61+
assertEquals(increasedCapacity, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
5062

5163
byte[] bufferContent = new byte[300];
5264

5365
BufferPoolAllocator.Wrapped wrapped = (BufferPoolAllocator.Wrapped) buffer;
5466
ByteBuffer adopted = wrapped.adopt();
5567
adopted.get(bufferContent);
5668
assertArrayEquals(content, bufferContent);
57-
assertEquals(500, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
69+
assertEquals(increasedCapacity, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
5870

5971
GlobalBufferPoolAllocator.instance.put(adopted);
60-
assertEquals(0, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
72+
ensureThatAllMemoryIsReturnedBackToBufferPool();
6173
}
6274

6375
@Test
6476
public void testAdoptedBufferContentBeforeResize() {
65-
DatabaseDescriptor.clientInitialization();
66-
ByteBuf buffer = GlobalBufferPoolAllocator.instance.buffer(200, 300);
67-
assertEquals(200, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
77+
ByteBuf buffer = allocateByteBuf(200, 300);
78+
int originalCapacity = buffer.capacity();
6879

6980
byte[] content = new byte[200];
7081

7182
Random rand = new Random();
7283
rand.nextBytes(content);
7384

7485
buffer.writeBytes(content);
75-
assertEquals(200, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
86+
assertEquals(originalCapacity, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
7687

7788
byte[] bufferContent = new byte[200];
7889

@@ -82,115 +93,121 @@ public void testAdoptedBufferContentBeforeResize() {
8293
assertArrayEquals(content, bufferContent);
8394

8495
GlobalBufferPoolAllocator.instance.put(adopted);
85-
assertEquals(0, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
96+
ensureThatAllMemoryIsReturnedBackToBufferPool();
8697
}
8798

8899
@Test
89100
public void testPutPooledBufferBackIntoPool() {
90-
DatabaseDescriptor.clientInitialization();
91-
ByteBuf buffer = GlobalBufferPoolAllocator.instance.buffer(200, 500);
92-
assertEquals(200, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
101+
ByteBuf buffer = allocateByteBuf(200, 500);
93102
buffer.writeBytes(new byte[200]);
94103

95104
buffer.release();
96-
assertEquals(0, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
105+
ensureThatAllMemoryIsReturnedBackToBufferPool();
97106
}
98107

99108
@Test
100109
public void testPutResizedBufferBackIntoPool() {
101-
DatabaseDescriptor.clientInitialization();
102-
ByteBuf buffer = GlobalBufferPoolAllocator.instance.buffer(200, 500);
103-
assertEquals(200, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
110+
ByteBuf buffer = allocateByteBuf(200, 500);
104111
buffer.writeBytes(new byte[500]);
105112

106113
buffer.release();
107-
assertEquals(0, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
114+
ensureThatAllMemoryIsReturnedBackToBufferPool();
108115
}
109116

110117
@Test
111118
public void testBufferDefaultMaxCapacity()
112119
{
113-
DatabaseDescriptor.clientInitialization();
114120
ByteBuf noMaxCapacity = GlobalBufferPoolAllocator.instance.buffer(100);
115121
noMaxCapacity.writeBytes(new byte[100]);
116122
assertEquals(100, noMaxCapacity.readableBytes());
117123
noMaxCapacity.release();
118-
assertEquals(0, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
124+
ensureThatAllMemoryIsReturnedBackToBufferPool();
119125
}
120126

121127
@Test
122128
public void testBufferWithMaxCapacity()
123129
{
124-
DatabaseDescriptor.clientInitialization();
125-
ByteBuf buffer = GlobalBufferPoolAllocator.instance.buffer(100, 500);
130+
ByteBuf buffer = allocateByteBuf(100, 500);
126131
buffer.writeBytes(new byte[500]);
127132
assertEquals(500, buffer.readableBytes());
128-
assertEquals(500, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
133+
assertEquals(buffer.capacity(), GlobalBufferPoolAllocator.instance.usedSizeInBytes());
129134
buffer.release();
130-
assertEquals(0, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
135+
ensureThatAllMemoryIsReturnedBackToBufferPool();
131136
}
132137

133138
@Test
134139
public void testBufferContentAfterResize()
135140
{
136-
DatabaseDescriptor.clientInitialization();
137-
ByteBuf buffer = GlobalBufferPoolAllocator.instance.buffer(200, 300);
138-
assertEquals(200, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
141+
ByteBuf buffer = allocateByteBuf(200, 300);
142+
int originalCapacity = buffer.capacity();
139143

140144
byte[] content = new byte[300];
141-
142145
Random rand = new Random();
143146
rand.nextBytes(content);
144147

145148
buffer.writeBytes(Arrays.copyOfRange(content, 0, 200));
146-
assertEquals(200, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
149+
assertEquals(originalCapacity, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
147150

148151
buffer.writeBytes(Arrays.copyOfRange(content, 200, 300));
149152

150153
byte[] bufferContent = new byte[300];
151154
buffer.readBytes(bufferContent);
152155
assertArrayEquals(content, bufferContent);
153-
assertEquals(300, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
156+
Assertions.assertThat(buffer.capacity()).isGreaterThanOrEqualTo(300);
157+
assertEquals(buffer.capacity(), GlobalBufferPoolAllocator.instance.usedSizeInBytes());
158+
154159
buffer.release();
155-
assertEquals(0, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
160+
ensureThatAllMemoryIsReturnedBackToBufferPool();
161+
156162
}
157163

158164
@Test(expected = IndexOutOfBoundsException.class)
159165
public void testBufferExceedMaxCapacity()
160166
{
161-
DatabaseDescriptor.clientInitialization();
162-
ByteBuf maxCapacity = GlobalBufferPoolAllocator.instance.buffer(100, 200);
167+
ByteBuf maxCapacity = allocateByteBuf(100, 200);
163168
try
164169
{
165170
maxCapacity.writeBytes(new byte[300]);
166171
} finally {
167172
maxCapacity.release();
168-
assertEquals(0, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
173+
ensureThatAllMemoryIsReturnedBackToBufferPool();
169174
}
170175
}
171176

172177
@Test
173178
public void testResizeBufferMultipleTimes()
174179
{
175-
DatabaseDescriptor.clientInitialization();
176-
ByteBuf buffer = GlobalBufferPoolAllocator.instance.buffer(100, 2000);
180+
ByteBuf buffer = allocateByteBuf(100, 2000);
177181
buffer.writeBytes(new byte[200]);
178182
assertEquals(200, buffer.readableBytes());
179-
assertEquals(256, buffer.capacity());
180-
assertEquals(256, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
183+
assertEquals(buffer.capacity(), GlobalBufferPoolAllocator.instance.usedSizeInBytes());
181184

182185
buffer.writeBytes(new byte[100]);
183186
assertEquals(300, buffer.readableBytes());
184-
assertEquals(512, buffer.capacity());
185-
assertEquals(512, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
187+
assertEquals(buffer.capacity(), GlobalBufferPoolAllocator.instance.usedSizeInBytes());
186188

187189
buffer.writeBytes(new byte[300]);
188190
assertEquals(600, buffer.readableBytes());
189-
assertEquals(1024, buffer.capacity());
190-
assertEquals(1024, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
191+
assertEquals(buffer.capacity(), GlobalBufferPoolAllocator.instance.usedSizeInBytes());
191192

192193
buffer.release();
193-
assertEquals(0, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
194+
ensureThatAllMemoryIsReturnedBackToBufferPool();
195+
}
196+
197+
private static ByteBuf allocateByteBuf(int initialCapacity, int maxCapacity)
198+
{
199+
ByteBuf buffer = GlobalBufferPoolAllocator.instance.buffer(initialCapacity, maxCapacity);
200+
int originalCapacity = buffer.capacity();
201+
202+
// BufferPool can allocate more capacity than requested to avoid fragmentation
203+
Assertions.assertThat(originalCapacity).isGreaterThanOrEqualTo(initialCapacity);
204+
assertEquals(originalCapacity, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
205+
return buffer;
194206
}
195207

208+
private static void ensureThatAllMemoryIsReturnedBackToBufferPool()
209+
{
210+
assertEquals(0, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
211+
assertEquals(0, GlobalBufferPoolAllocator.instance.overflowMemoryInBytes());
212+
}
196213
}

0 commit comments

Comments
 (0)