Skip to content

Commit 2c6489a

Browse files
committed
move AliyunOSSTDFSLinke from tis-oss-plugin to tis-aliyun-fs-plugi
1 parent cf5c04c commit 2c6489a

File tree

18 files changed

+405
-36
lines changed

18 files changed

+405
-36
lines changed

pom.xml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -239,10 +239,11 @@
239239
<artifactId>mariadb-java-client</artifactId>
240240
<version>${mariadb-java-client.version}</version>
241241
</dependency>
242+
<!--https://github.com/aliyun/aliyun-oss-java-sdk-->
242243
<dependency>
243244
<groupId>com.aliyun.oss</groupId>
244245
<artifactId>aliyun-sdk-oss</artifactId>
245-
<version>3.12.0</version>
246+
<version>3.18.1</version>
246247
</dependency>
247248

248249
<dependency>

tis-aliyun-fs-plugin/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929

3030
<groupId>com.qlangtech.tis.plugins</groupId>
3131
<artifactId>tis-aliyun-fs-plugin</artifactId>
32-
<!-- <packaging>tpi</packaging>-->
32+
<packaging>tpi</packaging>
3333

3434

3535
<name>tis-aliyun-fs-plugin</name>
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
* <p>
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
* <p>
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.qlangtech.tis.plugin.datax.aliyunoss;
20+
21+
import com.alibaba.citrus.turbine.Context;
22+
import com.aliyun.oss.OSS;
23+
import com.aliyun.oss.OSSClientBuilder;
24+
import com.aliyun.oss.OSSException;
25+
import com.aliyun.oss.model.BucketInfo;
26+
import com.qlangtech.tis.config.ParamsConfig;
27+
import com.qlangtech.tis.config.aliyun.IAliyunAccessKey;
28+
import com.qlangtech.tis.config.aliyun.IAliyunEndpoint;
29+
import com.qlangtech.tis.config.aliyun.IHttpToken;
30+
import com.qlangtech.tis.extension.TISExtension;
31+
import com.qlangtech.tis.plugin.IdentityName;
32+
import com.qlangtech.tis.plugin.annotation.FormField;
33+
import com.qlangtech.tis.plugin.annotation.FormFieldType;
34+
import com.qlangtech.tis.plugin.annotation.Validator;
35+
import com.qlangtech.tis.plugin.tdfs.ITDFSSession;
36+
import com.qlangtech.tis.plugin.tdfs.TDFSLinker;
37+
import com.qlangtech.tis.plugin.tdfs.TDFSSessionVisitor;
38+
import com.qlangtech.tis.runtime.module.misc.IControlMsgHandler;
39+
import com.qlangtech.tis.runtime.module.misc.IFieldErrorHandler;
40+
import org.apache.commons.lang.StringUtils;
41+
import org.slf4j.Logger;
42+
import org.slf4j.LoggerFactory;
43+
44+
import java.util.List;
45+
46+
/**
47+
* @author: 百岁(baisui@qlangtech.com)
48+
* @create: 2023-08-04 13:30
49+
**/
50+
public class AliyunOSSTDFSLinker extends TDFSLinker {
51+
public static final String DATAX_NAME = "AlyiunOSS";
52+
public static final String FIELD_BUCKET = "bucket";
53+
private static final Logger logger = LoggerFactory.getLogger(AliyunOSSTDFSLinker.class);
54+
@FormField(ordinal = 2, type = FormFieldType.INPUTTEXT, validate = {Validator.require})
55+
public String bucket;
56+
// @FormField(ordinal = 7, type = FormFieldType.INPUTTEXT, validate = {Validator.require})
57+
// public String object;
58+
59+
60+
@Override
61+
public String getRootPath() {
62+
return StringUtils.removeStart(this.path, "/");
63+
}
64+
65+
protected IAliyunEndpoint getOSSConfig() {
66+
return IHttpToken.getAliyunEndpoint(this.linker);
67+
}
68+
69+
public final OSS createOSSClient() {
70+
final IAliyunEndpoint end = getOSSConfig();
71+
IAliyunAccessKey accessKey = end.getAccessKey();
72+
return new OSSClientBuilder().build(end.getEndpoint(), accessKey.getAccessKeyId(), accessKey.getAccessKeySecret());
73+
}
74+
75+
@Override
76+
public ITDFSSession createTdfsSession(Integer timeout) {
77+
return this.createTdfsSession();
78+
}
79+
80+
@Override
81+
public ITDFSSession createTdfsSession() {
82+
return new OSSSession(this);
83+
}
84+
85+
@Override
86+
public <T> T useTdfsSession(TDFSSessionVisitor<T> tdfsSession) {
87+
try {
88+
return tdfsSession.accept(createTdfsSession());
89+
} catch (Exception e) {
90+
throw new RuntimeException(e);
91+
}
92+
}
93+
94+
@TISExtension
95+
public static class DftDescriptor extends BasicDescriptor {
96+
@Override
97+
public String getDisplayName() {
98+
return DATAX_NAME;
99+
}
100+
101+
@Override
102+
protected List<? extends IdentityName> createRefLinkers() {
103+
return ParamsConfig.getItems(IHttpToken.KEY_FIELD_ALIYUN_TOKEN);
104+
}
105+
106+
public boolean validateLinker(IFieldErrorHandler msgHandler, Context context, String fieldName, String endpoint) {
107+
return true;
108+
}
109+
110+
@Override
111+
protected boolean validateAll(IControlMsgHandler msgHandler, Context context, PostFormVals postFormVals) {
112+
AliyunOSSTDFSLinker osstdfsLinker = (AliyunOSSTDFSLinker) postFormVals.newInstance();
113+
return verifyFormOSSRelative(msgHandler, context, osstdfsLinker);
114+
}
115+
116+
private static boolean verifyFormOSSRelative(IControlMsgHandler msgHandler, Context context, AliyunOSSTDFSLinker osstdfsLinker) {
117+
String bucket = osstdfsLinker.bucket;
118+
try {
119+
OSS ossClient = osstdfsLinker.createOSSClient();
120+
121+
BucketInfo bucketInfo = null;
122+
try {
123+
bucketInfo = ossClient.getBucketInfo(bucket);
124+
} catch (OSSException e) {
125+
logger.error("request bucket info:" + bucket, e);
126+
msgHandler.addFieldError(context, FIELD_BUCKET, e.getMessage());
127+
return false;
128+
}
129+
130+
} catch (Exception e) {
131+
throw new RuntimeException(e);
132+
}
133+
return true;
134+
}
135+
}
136+
137+
}
Lines changed: 230 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,230 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
* <p>
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
* <p>
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.qlangtech.tis.plugin.datax.aliyunoss;
20+
21+
import com.aliyun.oss.OSS;
22+
import com.aliyun.oss.model.GenericResult;
23+
import com.aliyun.oss.model.ListObjectsRequest;
24+
import com.aliyun.oss.model.OSSObject;
25+
import com.aliyun.oss.model.OSSObjectSummary;
26+
import com.aliyun.oss.model.ObjectListing;
27+
import com.aliyun.oss.model.PutObjectRequest;
28+
import com.google.common.collect.Lists;
29+
import com.google.common.collect.Sets;
30+
import com.qlangtech.tis.fs.IPath;
31+
import com.qlangtech.tis.plugin.tdfs.ITDFSSession;
32+
import org.apache.commons.collections.CollectionUtils;
33+
import org.apache.commons.lang.StringUtils;
34+
import org.slf4j.Logger;
35+
import org.slf4j.LoggerFactory;
36+
37+
import java.io.File;
38+
import java.io.FilterOutputStream;
39+
import java.io.IOException;
40+
import java.io.InputStream;
41+
import java.io.OutputStream;
42+
import java.io.PipedInputStream;
43+
import java.io.PipedOutputStream;
44+
import java.util.Collections;
45+
import java.util.HashSet;
46+
import java.util.List;
47+
import java.util.Set;
48+
import java.util.concurrent.CountDownLatch;
49+
import java.util.concurrent.ExecutorService;
50+
import java.util.concurrent.Executors;
51+
import java.util.concurrent.TimeUnit;
52+
53+
/**
54+
* @author: 百岁(baisui@qlangtech.com)
55+
* @create: 2023-08-04 13:43
56+
**/
57+
public class OSSSession implements ITDFSSession {
58+
private static final Logger logger = LoggerFactory.getLogger(OSSSession.class);
59+
private final AliyunOSSTDFSLinker ossLinker;
60+
private OSS oss;
61+
private static final ExecutorService ossPutExecutor = Executors.newCachedThreadPool();
62+
63+
public OSSSession(AliyunOSSTDFSLinker ossLinker) {
64+
this.ossLinker = ossLinker;
65+
}
66+
67+
private OSS getOSS() {
68+
if (oss == null) {
69+
oss = ossLinker.createOSSClient();
70+
}
71+
return oss;
72+
}
73+
74+
@Override
75+
public String getRootPath() {
76+
return this.ossLinker.getRootPath();
77+
}
78+
79+
@Override
80+
public boolean isDirExist(String dir) {
81+
return true;
82+
}
83+
84+
@Override
85+
public void mkDirRecursive(String directoryPath) {
86+
87+
}
88+
89+
@Override
90+
public Set<String> getAllFilesInDir(String path, String prefixFileName) {
91+
ObjectListing objectList = this.getOSS().listObjects(ossLinker.bucket, path + prefixFileName);
92+
List<OSSObjectSummary> objectSummaries = objectList.getObjectSummaries();
93+
Set<String> result = Sets.newHashSet();
94+
for (OSSObjectSummary obj : objectSummaries) {
95+
result.add(obj.getKey());
96+
}
97+
return result;
98+
}
99+
100+
@Override
101+
public void deleteFiles(Set<String> filesToDelete) {
102+
if (CollectionUtils.isEmpty(filesToDelete)) {
103+
throw new IllegalArgumentException("param filesToDelete can not be empty");
104+
}
105+
for (String df : filesToDelete) {
106+
this.getOSS().deleteObject(ossLinker.bucket, df);
107+
}
108+
}
109+
110+
@Override
111+
public HashSet<Res> getAllFiles(List<String> srcPaths, int parentLevel, int maxTraversalLevel) {
112+
HashSet<Res> allFiles = Sets.newHashSet();
113+
for (String path : srcPaths) {
114+
getListFiles(allFiles, path, path, Collections.emptyList(), parentLevel, maxTraversalLevel);
115+
}
116+
return allFiles;
117+
}
118+
119+
@Override
120+
public HashSet<Res> getListFiles(String directoryPath, int parentLevel, int maxTraversalLevel) {
121+
HashSet<Res> allFiles = Sets.newHashSet();
122+
getListFiles(allFiles, directoryPath, directoryPath, Collections.emptyList(), parentLevel, maxTraversalLevel);
123+
return allFiles;
124+
}
125+
126+
private void getListFiles(HashSet<Res> listFiles, final String rootDir, String directoryPath, List<String> relevantPath, final int parentLevel, final int maxTraversalLevel) {
127+
if (StringUtils.startsWith(directoryPath, File.separator)) {
128+
throw new IllegalArgumentException("path:" + directoryPath + " can not start with '" + File.separator + "'");
129+
}
130+
if (parentLevel >= maxTraversalLevel) {
131+
//超出最大递归层数
132+
String message = String.format("获取path:[%s] 下文件列表时超出最大层数,请确认路径[%s]下不存在软连接文件", directoryPath, directoryPath);
133+
throw new IllegalStateException(message);
134+
} else {
135+
136+
ListObjectsRequest listReq = new ListObjectsRequest();
137+
listReq.setDelimiter("/");
138+
listReq.setBucketName(ossLinker.bucket);
139+
// IPath.pathConcat() 处理很重要,因为OSS取子文件一定要以 '/'结尾 ,才能取到子文件列表
140+
if (StringUtils.isNotEmpty(directoryPath)) {
141+
listReq.setPrefix(IPath.pathConcat(directoryPath, StringUtils.EMPTY));
142+
} else {
143+
listReq.setPrefix(StringUtils.EMPTY);
144+
}
145+
146+
147+
ObjectListing children = this.getOSS().listObjects(listReq);
148+
// 文件列表
149+
String fileKey;
150+
for (OSSObjectSummary obj : children.getObjectSummaries()) {
151+
fileKey = obj.getKey();
152+
if (StringUtils.endsWith(fileKey, File.separator)) {
153+
continue;
154+
}
155+
listFiles.add(new Res(fileKey, Res.buildRelevantPath(relevantPath, StringUtils.substringAfter(fileKey, directoryPath))));
156+
}
157+
if (parentLevel + 1 >= maxTraversalLevel) {
158+
return;
159+
}
160+
161+
// 文件夹列表
162+
for (String commonPrefix : children.getCommonPrefixes()) {
163+
getListFiles(listFiles, rootDir, commonPrefix, Lists.newArrayList(StringUtils.split(StringUtils.removeStart(commonPrefix, rootDir), File.separatorChar)), parentLevel + 1, maxTraversalLevel);
164+
}
165+
166+
// getListFiles(listFiles, null, parentLevel + 1, maxTraversalLevel);
167+
}
168+
}
169+
170+
@Override
171+
public OutputStream getOutputStream(String filePath, boolean append) {
172+
PipedOutputStream output = new PipedOutputStream();
173+
174+
CountDownLatch countDown = new CountDownLatch(1);
175+
176+
final CountDownLatch over = new CountDownLatch(1);
177+
178+
ossPutExecutor.execute(() -> {
179+
try {
180+
OSS oss = this.getOSS();
181+
182+
try (PipedInputStream input = new PipedInputStream(output)) {
183+
countDown.countDown();
184+
185+
GenericResult result = null;
186+
if (append) {
187+
throw new UnsupportedOperationException("not support with append mode");
188+
// result = oss.appendObject(new AppendObjectRequest(ossLinker.bucket, filePath, input));
189+
} else {
190+
result = oss.putObject(new PutObjectRequest(ossLinker.bucket, filePath, input));
191+
}
192+
193+
}
194+
over.countDown();
195+
} catch (Exception e) {
196+
logger.error("wirte path faild,path:" + filePath, e);
197+
Thread.currentThread().interrupt();
198+
}
199+
});// execute over
200+
try {
201+
countDown.await(20, TimeUnit.SECONDS);
202+
} catch (InterruptedException e) {
203+
throw new RuntimeException(e);
204+
}
205+
return new FilterOutputStream(output) {
206+
@Override
207+
public void close() throws IOException {
208+
super.close();
209+
try {
210+
over.await(2, TimeUnit.HOURS);
211+
} catch (InterruptedException e) {
212+
throw new IOException(e);
213+
}
214+
}
215+
};
216+
217+
218+
}
219+
220+
@Override
221+
public InputStream getInputStream(String filePath) {
222+
OSSObject ossObject = this.getOSS().getObject(ossLinker.bucket, filePath);
223+
return ossObject.getObjectContent();
224+
}
225+
226+
@Override
227+
public void close() throws Exception {
228+
getOSS().shutdown();
229+
}
230+
}

0 commit comments

Comments
 (0)