Skip to content

Commit f79f1b9

Browse files
authored
Merge pull request #1543 from prince-cs/tink-issue
[PLUGIN-1717]: Added implementation for openFile and openFileWithOptions
2 parents 6cdd47c + 4127a9d commit f79f1b9

File tree

2 files changed

+160
-77
lines changed

2 files changed

+160
-77
lines changed

pom.xml

Lines changed: 77 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -379,83 +379,6 @@
379379
</exclusion>
380380
</exclusions>
381381
</dependency>
382-
<dependency>
383-
<!--
384-
Required by bigtable-hbase-1.x-mapreduce instead of excluded non-shaded version.
385-
Shaded library is used to avoid dependency conflicts with Datastore module on protobuf-java dependency.
386-
Bigtable requires version 2.x and Datastore module requires 3.x protocol.
387-
-->
388-
<groupId>org.apache.hbase</groupId>
389-
<artifactId>hbase-shaded-client</artifactId>
390-
<version>${hbase-shaded-client.version}</version>
391-
<exclusions>
392-
<exclusion>
393-
<groupId>org.slf4j</groupId>
394-
<artifactId>slf4j-log4j12</artifactId>
395-
</exclusion>
396-
<exclusion>
397-
<groupId>log4j</groupId>
398-
<artifactId>log4j</artifactId>
399-
</exclusion>
400-
<exclusion>
401-
<groupId>org.apache.htrace</groupId>
402-
<artifactId>htrace-core</artifactId>
403-
</exclusion>
404-
</exclusions>
405-
</dependency>
406-
<dependency>
407-
<!--
408-
Required by bigtable-hbase-1.x-mapreduce instead of excluded non-shaded version.
409-
Shaded library is used to avoid dependency conflicts with Datastore module on protobuf-java dependency.
410-
Bigtable requires version 2.x and Datastore module requires 3.x protocol.
411-
-->
412-
<groupId>org.apache.hbase</groupId>
413-
<artifactId>hbase-shaded-server</artifactId>
414-
<version>${hbase-shaded-server.version}</version>
415-
<exclusions>
416-
<exclusion>
417-
<groupId>org.slf4j</groupId>
418-
<artifactId>slf4j-log4j12</artifactId>
419-
</exclusion>
420-
<exclusion>
421-
<groupId>log4j</groupId>
422-
<artifactId>log4j</artifactId>
423-
</exclusion>
424-
<exclusion>
425-
<groupId>org.apache.htrace</groupId>
426-
<artifactId>htrace-core</artifactId>
427-
</exclusion>
428-
<exclusion>
429-
<groupId>org.apache.hadoop</groupId>
430-
<artifactId>hadoop-common</artifactId>
431-
</exclusion>
432-
</exclusions>
433-
</dependency>
434-
<dependency>
435-
<groupId>io.dropwizard.metrics</groupId>
436-
<artifactId>metrics-core</artifactId>
437-
<version>${dropwizard.metrics-core.version}</version>
438-
</dependency>
439-
<dependency>
440-
<groupId>com.google.cloud</groupId>
441-
<artifactId>google-cloud-bigquery</artifactId>
442-
<version>${google.cloud.bigquery.version}</version>
443-
</dependency>
444-
<dependency>
445-
<groupId>com.google.crypto.tink</groupId>
446-
<artifactId>tink</artifactId>
447-
<version>${google.tink.version}</version>
448-
</dependency>
449-
<dependency>
450-
<groupId>com.google.crypto.tink</groupId>
451-
<artifactId>tink-gcpkms</artifactId>
452-
<version>${google.tink.version}</version>
453-
</dependency>
454-
<dependency>
455-
<groupId>com.google.cloud</groupId>
456-
<artifactId>google-cloud-spanner</artifactId>
457-
<version>${google.cloud.spanner.version}</version>
458-
</dependency>
459382
<dependency>
460383
<groupId>org.apache.hadoop</groupId>
461384
<artifactId>hadoop-common</artifactId>
@@ -536,6 +459,83 @@
536459
</exclusion>
537460
</exclusions>
538461
</dependency>
462+
<dependency>
463+
<!--
464+
Required by bigtable-hbase-1.x-mapreduce instead of excluded non-shaded version.
465+
Shaded library is used to avoid dependency conflicts with Datastore module on protobuf-java dependency.
466+
Bigtable requires version 2.x and Datastore module requires 3.x protocol.
467+
-->
468+
<groupId>org.apache.hbase</groupId>
469+
<artifactId>hbase-shaded-client</artifactId>
470+
<version>${hbase-shaded-client.version}</version>
471+
<exclusions>
472+
<exclusion>
473+
<groupId>org.slf4j</groupId>
474+
<artifactId>slf4j-log4j12</artifactId>
475+
</exclusion>
476+
<exclusion>
477+
<groupId>log4j</groupId>
478+
<artifactId>log4j</artifactId>
479+
</exclusion>
480+
<exclusion>
481+
<groupId>org.apache.htrace</groupId>
482+
<artifactId>htrace-core</artifactId>
483+
</exclusion>
484+
</exclusions>
485+
</dependency>
486+
<dependency>
487+
<!--
488+
Required by bigtable-hbase-1.x-mapreduce instead of excluded non-shaded version.
489+
Shaded library is used to avoid dependency conflicts with Datastore module on protobuf-java dependency.
490+
Bigtable requires version 2.x and Datastore module requires 3.x protocol.
491+
-->
492+
<groupId>org.apache.hbase</groupId>
493+
<artifactId>hbase-shaded-server</artifactId>
494+
<version>${hbase-shaded-server.version}</version>
495+
<exclusions>
496+
<exclusion>
497+
<groupId>org.slf4j</groupId>
498+
<artifactId>slf4j-log4j12</artifactId>
499+
</exclusion>
500+
<exclusion>
501+
<groupId>log4j</groupId>
502+
<artifactId>log4j</artifactId>
503+
</exclusion>
504+
<exclusion>
505+
<groupId>org.apache.htrace</groupId>
506+
<artifactId>htrace-core</artifactId>
507+
</exclusion>
508+
<exclusion>
509+
<groupId>org.apache.hadoop</groupId>
510+
<artifactId>hadoop-common</artifactId>
511+
</exclusion>
512+
</exclusions>
513+
</dependency>
514+
<dependency>
515+
<groupId>io.dropwizard.metrics</groupId>
516+
<artifactId>metrics-core</artifactId>
517+
<version>${dropwizard.metrics-core.version}</version>
518+
</dependency>
519+
<dependency>
520+
<groupId>com.google.cloud</groupId>
521+
<artifactId>google-cloud-bigquery</artifactId>
522+
<version>${google.cloud.bigquery.version}</version>
523+
</dependency>
524+
<dependency>
525+
<groupId>com.google.crypto.tink</groupId>
526+
<artifactId>tink</artifactId>
527+
<version>${google.tink.version}</version>
528+
</dependency>
529+
<dependency>
530+
<groupId>com.google.crypto.tink</groupId>
531+
<artifactId>tink-gcpkms</artifactId>
532+
<version>${google.tink.version}</version>
533+
</dependency>
534+
<dependency>
535+
<groupId>com.google.cloud</groupId>
536+
<artifactId>google-cloud-spanner</artifactId>
537+
<version>${google.cloud.spanner.version}</version>
538+
</dependency>
539539
<dependency>
540540
<groupId>com.google.cloud</groupId>
541541
<artifactId>google-cloud-datastore</artifactId>

src/main/java/io/cdap/plugin/gcp/crypto/EncryptedFileSystem.java

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,15 @@
1818

1919
import org.apache.hadoop.conf.Configurable;
2020
import org.apache.hadoop.conf.Configuration;
21+
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
2122
import org.apache.hadoop.fs.FSDataInputStream;
2223
import org.apache.hadoop.fs.FSInputStream;
2324
import org.apache.hadoop.fs.FileSystem;
2425
import org.apache.hadoop.fs.FilterFileSystem;
26+
import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
2527
import org.apache.hadoop.fs.Path;
28+
import org.apache.hadoop.fs.impl.OpenFileParameters;
29+
import org.jetbrains.annotations.NotNull;
2630
import org.slf4j.Logger;
2731
import org.slf4j.LoggerFactory;
2832

@@ -32,6 +36,8 @@
3236
import java.nio.channels.Channels;
3337
import java.nio.channels.SeekableByteChannel;
3438
import java.util.Map;
39+
import java.util.concurrent.CompletableFuture;
40+
import java.util.concurrent.CompletionException;
3541

3642
/**
3743
* A hadoop {@link FileSystem} that support files decryption (encryption is currently not supported).
@@ -103,6 +109,83 @@ public FSDataInputStream open(Path path, int bufferSize) throws IOException {
103109
return new FSDataInputStream(new SeekableByteChannelFSInputStream(decryptor.open(fs, path, bufferSize)));
104110
}
105111

112+
/**
113+
* Opens a file asynchronously and returns a {@link FutureDataInputStreamBuilder}
114+
* to build a {@link FSDataInputStream} for the specified {@link Path}.
115+
*
116+
* <p>This implementation returns a builder that constructs an input stream by using a decryptor
117+
* to open the file through a {@link SeekableByteChannelFSInputStream}. The file is read
118+
* with a buffer size of 4096 bytes.</p>
119+
*
120+
* @param path the {@link Path} of the file to open
121+
* @return a {@link FutureDataInputStreamBuilder} that asynchronously builds a {@link FSDataInputStream}
122+
* @throws UnsupportedOperationException if the operation is not supported
123+
*/
124+
@Override
125+
public FutureDataInputStreamBuilder openFile(Path path) throws UnsupportedOperationException {
126+
return new FutureDataInputStreamBuilder() {
127+
@Override
128+
public CompletableFuture<FSDataInputStream> build()
129+
throws IllegalArgumentException, UnsupportedOperationException {
130+
return CompletableFuture.supplyAsync(() -> {
131+
try {
132+
return new FSDataInputStream(new SeekableByteChannelFSInputStream(
133+
decryptor.open(fs, path, CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT)));
134+
} catch (Exception e) {
135+
throw new CompletionException(e);
136+
}
137+
});
138+
}
139+
140+
@Override
141+
public FutureDataInputStreamBuilder opt(@NotNull String s, @NotNull String s1) {
142+
return this;
143+
}
144+
145+
@Override
146+
public FutureDataInputStreamBuilder opt(@NotNull String s, @NotNull String... strings) {
147+
return this;
148+
}
149+
150+
@Override
151+
public FutureDataInputStreamBuilder must(@NotNull String s, @NotNull String s1) {
152+
return this;
153+
}
154+
155+
@Override
156+
public FutureDataInputStreamBuilder must(@NotNull String s, @NotNull String... strings) {
157+
return this;
158+
}
159+
};
160+
}
161+
162+
/**
163+
* Opens a file asynchronously using the provided {@link Path}, and returns
164+
* a {@link CompletableFuture} that supplies a {@link FSDataInputStream}.
165+
*
166+
* <p>This method uses a decryptor to open the file and wraps it in a {@link SeekableByteChannelFSInputStream}.
167+
* It uses the buffer size specified in the {@code parameters}; if the buffer size is not greater than zero,
168+
* a default of 4096 bytes is used.</p>
169+
*
170+
* @param path the {@link Path} to the file to open
171+
* @param parameters the {@link OpenFileParameters} containing optional configuration, such as buffer size
172+
* @return a {@link CompletableFuture} that will complete with the {@link FSDataInputStream}
173+
* @throws CompletionException if an exception occurs during file opening
174+
*/
175+
@Override
176+
protected CompletableFuture<FSDataInputStream> openFileWithOptions(Path path, OpenFileParameters parameters) {
177+
return CompletableFuture.supplyAsync(() -> {
178+
try {
179+
int bufferSize = parameters.getBufferSize() > 0 ? parameters.getBufferSize()
180+
: CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
181+
return new FSDataInputStream(
182+
new SeekableByteChannelFSInputStream(decryptor.open(fs, path, bufferSize)));
183+
} catch (Exception e) {
184+
throw new CompletionException(e);
185+
}
186+
});
187+
}
188+
106189
/**
107190
* A {@link FSInputStream} implementation backed by a {@link SeekableByteChannel}.
108191
*/

0 commit comments

Comments
 (0)