Skip to content

Commit 0bb5055

Browse files
mattrpavjeanouii
authored andcommitted
[AMQ-9366] Add DataFileFactroy to KahaDBPersistenceAdapter and update test to remove SecurityManager
Co-authored-by: Gurpartap Singh <gurpartap0306>
1 parent 1d0e315 commit 0bb5055

File tree

7 files changed

+134
-17
lines changed

7 files changed

+134
-17
lines changed

activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId;
4545
import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
4646
import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
47+
import org.apache.activemq.store.kahadb.disk.journal.DataFileFactory;
4748
import org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy;
4849
import org.apache.activemq.usage.SystemUsage;
4950
import org.apache.activemq.util.ServiceStopper;
@@ -840,4 +841,8 @@ public void setCleanupOnStop(boolean cleanupOnStop) {
840841
public boolean getCleanupOnStop() {
841842
return this.letter.getCleanupOnStop();
842843
}
844+
845+
public void setDataFileFactory(DataFileFactory dataFileFactory) {
846+
this.letter.setDataFileFactory(dataFileFactory);
847+
}
843848
}

activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -92,11 +92,8 @@
9292
import org.apache.activemq.store.kahadb.disk.index.BTreeIndex;
9393
import org.apache.activemq.store.kahadb.disk.index.BTreeVisitor;
9494
import org.apache.activemq.store.kahadb.disk.index.ListIndex;
95-
import org.apache.activemq.store.kahadb.disk.journal.DataFile;
96-
import org.apache.activemq.store.kahadb.disk.journal.Journal;
95+
import org.apache.activemq.store.kahadb.disk.journal.*;
9796
import org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy;
98-
import org.apache.activemq.store.kahadb.disk.journal.Location;
99-
import org.apache.activemq.store.kahadb.disk.journal.TargetedDataFileAppender;
10097
import org.apache.activemq.store.kahadb.disk.page.Page;
10198
import org.apache.activemq.store.kahadb.disk.page.PageFile;
10299
import org.apache.activemq.store.kahadb.disk.page.Transaction;
@@ -266,6 +263,7 @@ public enum PurgeRecoveredXATransactionStrategy {
266263

267264
protected JournalDiskSyncStrategy journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS;
268265
protected boolean archiveDataLogs;
266+
protected DataFileFactory dataFileFactory;
269267
protected File directoryArchive;
270268
protected AtomicLong journalSize = new AtomicLong(0);
271269
long journalDiskSyncInterval = 1000;
@@ -3421,6 +3419,9 @@ protected Journal createJournal() throws IOException {
34213419
IOHelper.mkdirs(getDirectoryArchive());
34223420
manager.setDirectoryArchive(getDirectoryArchive());
34233421
}
3422+
if (getDataFileFactory() != null) {
3423+
manager.setDataFileFactory(getDataFileFactory());
3424+
}
34243425
return manager;
34253426
}
34263427

@@ -4297,4 +4298,12 @@ private void handleIOException(String taskName, IOException ioe) {
42974298
LOG.debug(e.getMessage(), e);
42984299
}
42994300
}
4301+
4302+
public DataFileFactory getDataFileFactory() {
4303+
return this.dataFileFactory;
4304+
}
4305+
4306+
public void setDataFileFactory(DataFileFactory dataFileFactory) {
4307+
this.dataFileFactory = dataFileFactory;
4308+
}
43004309
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.activemq.store.kahadb.disk.journal;
18+
19+
import java.io.File;
20+
21+
public interface DataFileFactory {
22+
DataFile create(File file, int number);
23+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.activemq.store.kahadb.disk.journal;
18+
19+
import java.io.File;
20+
21+
public class DefaultDataFileFactory implements DataFileFactory {
22+
23+
@Override
24+
public DataFile create(File file, int number) {
25+
return new DataFile(file, number);
26+
}
27+
}

activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,7 @@ private static byte[] createEofBatchAndLocationRecord() {
241241
private long cleanupInterval = DEFAULT_CLEANUP_INTERVAL;
242242

243243
protected JournalDiskSyncStrategy journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS;
244+
protected DataFileFactory dataFileFactory = new DefaultDataFileFactory();
244245

245246
public interface DataFileRemovedListener {
246247
void fileRemoved(DataFile datafile);
@@ -272,7 +273,7 @@ public boolean accept(File dir, String n) {
272273
String n = file.getName();
273274
String numStr = n.substring(filePrefix.length(), n.length()-fileSuffix.length());
274275
int num = Integer.parseInt(numStr);
275-
DataFile dataFile = new DataFile(file, num);
276+
DataFile dataFile = dataFileFactory.create(file, num);
276277
fileMap.put(dataFile.getDataFileId(), dataFile);
277278
totalLength.addAndGet(dataFile.getLength());
278279
} catch (NumberFormatException e) {
@@ -687,7 +688,7 @@ public void run() {
687688
private DataFile newDataFile() throws IOException {
688689
int nextNum = nextDataFileId++;
689690
File file = getFile(nextNum);
690-
DataFile nextWriteFile = new DataFile(file, nextNum);
691+
DataFile nextWriteFile = dataFileFactory.create(file, nextNum);
691692
preallocateEntireJournalDataFile(nextWriteFile.appendRandomAccessFile());
692693
return nextWriteFile;
693694
}
@@ -697,7 +698,7 @@ public DataFile reserveDataFile() {
697698
synchronized (dataFileIdLock) {
698699
int nextNum = nextDataFileId++;
699700
File file = getFile(nextNum);
700-
DataFile reservedDataFile = new DataFile(file, nextNum);
701+
DataFile reservedDataFile = dataFileFactory.create(file, nextNum);
701702
synchronized (currentDataFile) {
702703
fileMap.put(reservedDataFile.getDataFileId(), reservedDataFile);
703704
fileByFileMap.put(file, reservedDataFile);
@@ -1164,6 +1165,14 @@ public void setDataFileRemovedListener(DataFileRemovedListener dataFileRemovedLi
11641165
this.dataFileRemovedListener = dataFileRemovedListener;
11651166
}
11661167

1168+
public void setDataFileFactory(DataFileFactory dataFileFactory) {
1169+
this.dataFileFactory = dataFileFactory;
1170+
}
1171+
1172+
public DataFileFactory getDataFileFactory() {
1173+
return this.dataFileFactory;
1174+
}
1175+
11671176
public static class WriteCommand extends LinkedNode<WriteCommand> {
11681177
public final Location location;
11691178
public final ByteSequence data;

activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalArchiveTest.java

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@
2222
import org.apache.activemq.broker.region.policy.PolicyMap;
2323
import org.apache.activemq.command.ActiveMQQueue;
2424
import org.apache.activemq.store.kahadb.disk.journal.DataFile;
25+
import org.apache.activemq.store.kahadb.disk.journal.DataFileFactory;
26+
import org.apache.activemq.store.kahadb.disk.journal.DefaultDataFileFactory;
27+
import org.apache.activemq.store.kahadb.disk.journal.ErrorDataFileFactory;
2528
import org.junit.After;
2629
import org.junit.Test;
2730
import org.slf4j.Logger;
@@ -54,6 +57,7 @@ public class JournalArchiveTest {
5457
private BrokerService broker = null;
5558
private final Destination destination = new ActiveMQQueue("Test");
5659
private KahaDBPersistenceAdapter adapter;
60+
private DataFileFactory dataFileFactory;
5761

5862
protected void startBroker() throws Exception {
5963
doStartBroker(true);
@@ -104,6 +108,7 @@ protected void configurePersistence(BrokerService brokerService) throws Exceptio
104108
adapter.setCheckForCorruptJournalFiles(true);
105109

106110
adapter.setArchiveDataLogs(true);
111+
adapter.setDataFileFactory(dataFileFactory);
107112
}
108113

109114
@After
@@ -119,16 +124,8 @@ public void tearDown() throws Exception {
119124
public void testRecoveryOnArchiveFailure() throws Exception {
120125
final AtomicInteger atomicInteger = new AtomicInteger();
121126

122-
System.setSecurityManager(new SecurityManager() {
123-
public void checkPermission(Permission perm) {}
124-
public void checkPermission(Permission perm, Object context) {}
127+
this.dataFileFactory = new ErrorDataFileFactory();
125128

126-
public void checkWrite(String file) {
127-
if (file.contains(DEFAULT_ARCHIVE_DIRECTORY) && atomicInteger.incrementAndGet() > 4) {
128-
throw new SecurityException("No Perms to write to archive times:" + atomicInteger.get());
129-
}
130-
}
131-
});
132129
startBroker();
133130

134131
int sent = produceMessagesToConsumeMultipleDataFiles(50);
@@ -151,7 +148,7 @@ public void run() {
151148
assertTrue("broker got shutdown on page in error", gotShutdown.await(10, TimeUnit.SECONDS));
152149

153150
// no restrictions
154-
System.setSecurityManager(null);
151+
this.dataFileFactory = new DefaultDataFileFactory();
155152

156153
int numFilesAfterRestart = 0;
157154
try {
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.activemq.store.kahadb.disk.journal;
18+
19+
import org.apache.activemq.util.IOHelper;
20+
21+
import java.io.File;
22+
import java.io.IOException;
23+
24+
import static org.apache.activemq.store.kahadb.disk.journal.Journal.DEFAULT_ARCHIVE_DIRECTORY;
25+
26+
public class ErrorDataFileFactory implements DataFileFactory {
27+
28+
@Override
29+
public DataFile create(File file, int number) {
30+
return new ErrorDataFile(file, number);
31+
}
32+
33+
public static class ErrorDataFile extends DataFile {
34+
35+
ErrorDataFile(File file, int number) {
36+
super(file, number);
37+
}
38+
39+
@Override
40+
public synchronized void move(File targetDirectory) throws IOException {
41+
if (targetDirectory.getName().contains(DEFAULT_ARCHIVE_DIRECTORY) && this.dataFileId > 4) {
42+
throw new SecurityException("No Perms to write to archive times:" + this.dataFileId);
43+
}
44+
IOHelper.moveFile(file, targetDirectory);
45+
}
46+
}
47+
}

0 commit comments

Comments
 (0)