Skip to content

Commit 274d704

Browse files
committed
Make automatic flushing of compression configurable
1 parent 06671fd commit 274d704

File tree

6 files changed

+132
-23
lines changed

6 files changed

+132
-23
lines changed

stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/CoderSpec.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ abstract class CoderSpec(codecName: String) extends AnyWordSpec with CodecSpecSu
4545
case object AllDataAllowed extends Exception with NoStackTrace
4646
protected def corruptInputCheck: Boolean = true
4747

48+
protected def autoFlush: Boolean = true
49+
4850
def extraTests(): Unit = {}
4951

5052
s"The $codecName codec" should {
@@ -145,6 +147,7 @@ abstract class CoderSpec(codecName: String) extends AnyWordSpec with CodecSpecSu
145147
}
146148

147149
"be able to decode chunk-by-chunk (depending on input chunks)" in {
150+
assume(autoFlush)
148151
val minLength = 100
149152
val maxLength = 1000
150153
val numElements = 1000
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.pekko.stream.io.compression
19+
20+
import org.apache.pekko.stream.scaladsl.{ Compression, Flow }
21+
import org.apache.pekko.util.ByteString
22+
23+
import java.util.zip.Deflater
24+
25+
class DeflateAutoFlushSpec extends DeflateSpec {
26+
override protected val encoderFlow: Flow[ByteString, ByteString, Any] =
27+
Compression.deflate(Deflater.BEST_COMPRESSION, nowrap = false, autoFlush = false)
28+
override protected val autoFlush: Boolean = false
29+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.pekko.stream.io.compression
19+
20+
import org.apache.pekko.stream.scaladsl.{ Compression, Flow }
21+
import org.apache.pekko.util.ByteString
22+
23+
import java.util.zip.Deflater
24+
25+
class GzipAutoFlushSpec extends GzipSpec {
26+
override protected val encoderFlow: Flow[ByteString, ByteString, Any] =
27+
Compression.gzip(Deflater.BEST_COMPRESSION, autoFlush = false)
28+
override protected val autoFlush: Boolean = false
29+
}

stream/src/main/scala/org/apache/pekko/stream/impl/io/compression/CompressionUtils.scala

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,15 +28,21 @@ import pekko.util.ByteString
2828
/**
2929
* Creates a flow from a compressor constructor.
3030
*/
31-
def compressorFlow(newCompressor: () => Compressor): Flow[ByteString, ByteString, NotUsed] =
31+
def compressorFlow(newCompressor: () => Compressor, autoFlush: Boolean = true)
32+
: Flow[ByteString, ByteString, NotUsed] =
3233
Flow.fromGraph {
3334
new SimpleLinearGraphStage[ByteString] {
3435
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
3536
new GraphStageLogic(shape) with InHandler with OutHandler {
3637
val compressor = newCompressor()
3738

3839
override def onPush(): Unit = {
39-
val data = compressor.compressAndFlush(grab(in))
40+
val grabbed = grab(in)
41+
val data = if (autoFlush)
42+
compressor.compressAndFlush(grabbed)
43+
else
44+
compressor.compress(grabbed)
45+
4046
if (data.nonEmpty) push(out, data)
4147
else pull(in)
4248
}

stream/src/main/scala/org/apache/pekko/stream/javadsl/Compression.scala

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,10 @@ object Compression {
4747
scaladsl.Compression.inflate(maxBytesPerChunk, nowrap).asJava
4848

4949
/**
50-
* Creates a flow that gzip-compresses a stream of ByteStrings. Note that the compressor
51-
* will SYNC_FLUSH after every [[pekko.util.ByteString]] so that it is guaranteed that every [[pekko.util.ByteString]]
52-
* coming out of the flow can be fully decompressed without waiting for additional data. This may
53-
* come at a compression performance cost for very small chunks.
50+
* Creates a flow that gzip-compresses a stream of ByteStrings. Note that the compressor will
51+
* flush after every single element in the stream so that it is guaranteed that every [[pekko.util.ByteString]]
52+
* coming out of the flow can be fully decompressed without waiting for additional data. This may come at
53+
* a compression performance cost for very small chunks.
5454
*/
5555
def gzip: Flow[ByteString, ByteString, NotUsed] =
5656
scaladsl.Compression.gzip.asJava
@@ -64,10 +64,21 @@ object Compression {
6464
scaladsl.Compression.gzip(level).asJava
6565

6666
/**
67-
* Creates a flow that deflate-compresses a stream of ByteString. Note that the compressor
68-
* will SYNC_FLUSH after every [[pekko.util.ByteString]] so that it is guaranteed that every [[pekko.util.ByteString]]
69-
* coming out of the flow can be fully decompressed without waiting for additional data. This may
70-
* come at a compression performance cost for very small chunks.
67+
* Same as [[gzip]] with a custom level and configurable flush mode.
68+
*
69+
* @param level Compression level (0-9)
70+
* @param autoFlush If true will automatically flush after every single element in the stream.
71+
*
72+
* @since 1.3.0
73+
*/
74+
def gzip(level: Int, autoFlush: Boolean): Flow[ByteString, ByteString, NotUsed] =
75+
scaladsl.Compression.gzip(level, autoFlush).asJava
76+
77+
/**
78+
* Creates a flow that deflate-compresses a stream of ByteString. Note that the compressor will
79+
* flush after every single element in the stream so that it is guaranteed that every [[pekko.util.ByteString]]
80+
* coming out of the flow can be fully decompressed without waiting for additional data. This may come at
81+
* a compression performance cost for very small chunks.
7182
*/
7283
def deflate: Flow[ByteString, ByteString, NotUsed] =
7384
scaladsl.Compression.deflate.asJava
@@ -81,4 +92,16 @@ object Compression {
8192
def deflate(level: Int, nowrap: Boolean): Flow[ByteString, ByteString, NotUsed] =
8293
scaladsl.Compression.deflate(level, nowrap).asJava
8394

95+
/**
96+
* Same as [[deflate]] with configurable level, nowrap and autoFlush.
97+
*
98+
* @param level Compression level (0-9)
99+
* @param nowrap If true then use GZIP-compatible compression
100+
* @param autoFlush If true will automatically flush after every single element in the stream.
101+
*
102+
* @since 1.3.0
103+
*/
104+
def deflate(level: Int, nowrap: Boolean, autoFlush: Boolean): Flow[ByteString, ByteString, NotUsed] =
105+
scaladsl.Compression.deflate(level, nowrap, autoFlush).asJava
106+
84107
}

stream/src/main/scala/org/apache/pekko/stream/scaladsl/Compression.scala

Lines changed: 32 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,10 @@ object Compression {
2424
final val MaxBytesPerChunkDefault = 64 * 1024
2525

2626
/**
27-
* Creates a flow that gzip-compresses a stream of ByteStrings. Note that the compressor
28-
* will SYNC_FLUSH after every [[pekko.util.ByteString]] so that it is guaranteed that every [[pekko.util.ByteString]]
29-
* coming out of the flow can be fully decompressed without waiting for additional data. This may
30-
* come at a compression performance cost for very small chunks.
31-
*
32-
* FIXME: should strategy / flush mode be configurable? See https://github.com/akka/akka/issues/21849
27+
* Creates a flow that gzip-compresses a stream of ByteStrings. Note that the compressor will
28+
* flush after every single element in the stream so that it is guaranteed that every [[pekko.util.ByteString]]
29+
* coming out of the flow can be fully decompressed without waiting for additional data. This may come at
30+
* a compression performance cost for very small chunks.
3331
*/
3432
def gzip: Flow[ByteString, ByteString, NotUsed] = gzip(Deflater.BEST_COMPRESSION)
3533

@@ -41,6 +39,17 @@ object Compression {
4139
def gzip(level: Int): Flow[ByteString, ByteString, NotUsed] =
4240
CompressionUtils.compressorFlow(() => new GzipCompressor(level))
4341

42+
/**
43+
* Same as [[gzip]] with a custom level and configurable flush mode.
44+
*
45+
* @param level Compression level (0-9)
46+
* @param autoFlush If true will automatically flush after every single element in the stream.
47+
*
48+
* @since 1.3.0
49+
*/
50+
def gzip(level: Int, autoFlush: Boolean): Flow[ByteString, ByteString, NotUsed] =
51+
CompressionUtils.compressorFlow(() => new GzipCompressor(level), autoFlush)
52+
4453
/**
4554
* Creates a Flow that decompresses a gzip-compressed stream of data.
4655
*
@@ -51,14 +60,12 @@ object Compression {
5160
Flow[ByteString].via(new GzipDecompressor(maxBytesPerChunk)).named("gzipDecompress")
5261

5362
/**
54-
* Creates a flow that deflate-compresses a stream of ByteString. Note that the compressor
55-
* will SYNC_FLUSH after every [[pekko.util.ByteString]] so that it is guaranteed that every [[pekko.util.ByteString]]
56-
* coming out of the flow can be fully decompressed without waiting for additional data. This may
57-
* come at a compression performance cost for very small chunks.
58-
*
59-
* FIXME: should strategy / flush mode be configurable? See https://github.com/akka/akka/issues/21849
63+
* Creates a flow that deflate-compresses a stream of ByteString. Note that the compressor will
64+
* flush after every single element in the stream so that it is guaranteed that every [[pekko.util.ByteString]]
65+
* coming out of the flow can be fully decompressed without waiting for additional data. This may come at
66+
* a compression performance cost for very small chunks.
6067
*/
61-
def deflate: Flow[ByteString, ByteString, NotUsed] = deflate(Deflater.BEST_COMPRESSION, false)
68+
def deflate: Flow[ByteString, ByteString, NotUsed] = deflate(Deflater.BEST_COMPRESSION, nowrap = false)
6269

6370
/**
6471
* Same as [[deflate]] with configurable level and nowrap
@@ -69,6 +76,18 @@ object Compression {
6976
def deflate(level: Int, nowrap: Boolean): Flow[ByteString, ByteString, NotUsed] =
7077
CompressionUtils.compressorFlow(() => new DeflateCompressor(level, nowrap))
7178

79+
/**
80+
* Same as [[deflate]] with configurable level, nowrap and autoFlush.
81+
*
82+
* @param level Compression level (0-9)
83+
* @param nowrap If true then use GZIP-compatible compression
84+
* @param autoFlush If true will automatically flush after every single element in the stream.
85+
*
86+
* @since 1.3.0
87+
*/
88+
def deflate(level: Int, nowrap: Boolean, autoFlush: Boolean): Flow[ByteString, ByteString, NotUsed] =
89+
CompressionUtils.compressorFlow(() => new DeflateCompressor(level, nowrap), autoFlush)
90+
7291
/**
7392
* Creates a Flow that decompresses a deflate-compressed stream of data.
7493
*

0 commit comments

Comments (0)