Skip to content

Commit d0d58dd

Browse files
committed
fix: corrected inverted order of apply of transactions with tree ridbags, as described by PR #10549
1 parent 4bf821f commit d0d58dd

2 files changed

Lines changed: 247 additions & 1 deletion

File tree

distributed/src/main/java/com/orientechnologies/orient/server/distributed/impl/ODatabaseDocumentDistributed.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -923,7 +923,13 @@ private void firstPhaseDataChecks(
923923
}
924924
}
925925
int persistentVersion = metadata.getVersion();
926-
if (changeVersion != persistentVersion) {
926+
boolean checkVersion;
927+
if (entry.getType() == ORecordOperation.UPDATED) {
928+
checkVersion = ORecordInternal.isContentChanged(entry.getRecord());
929+
} else {
930+
checkVersion = true;
931+
}
932+
if (changeVersion != persistentVersion && checkVersion) {
927933
throw new OConcurrentModificationException(
928934
entry.getRID(), persistentVersion, changeVersion, entry.getType());
929935
}
Lines changed: 240 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,240 @@
1+
package com.orientechnologies.orient.server.distributed.impl;
2+
3+
import static junit.framework.TestCase.assertTrue;
4+
import static org.junit.Assert.assertEquals;
5+
6+
import com.orientechnologies.orient.core.config.OGlobalConfiguration;
7+
import com.orientechnologies.orient.core.db.ODatabaseDocumentInternal;
8+
import com.orientechnologies.orient.core.db.ODatabaseSession;
9+
import com.orientechnologies.orient.core.db.OrientDB;
10+
import com.orientechnologies.orient.core.db.OrientDBInternal;
11+
import com.orientechnologies.orient.core.db.record.OIdentifiable;
12+
import com.orientechnologies.orient.core.db.record.ORecordOperation;
13+
import com.orientechnologies.orient.core.db.record.ridbag.ORidBag;
14+
import com.orientechnologies.orient.core.id.ORID;
15+
import com.orientechnologies.orient.core.metadata.schema.OClass;
16+
import com.orientechnologies.orient.core.metadata.schema.OType;
17+
import com.orientechnologies.orient.core.record.impl.ODocument;
18+
import com.orientechnologies.orient.core.transaction.OTransactionIdPromise;
19+
import com.orientechnologies.orient.server.OServer;
20+
import com.orientechnologies.orient.server.distributed.ODistributedDatabase;
21+
import com.orientechnologies.orient.server.distributed.ODistributedRequestId;
22+
import com.orientechnologies.orient.server.distributed.ODistributedServerManager;
23+
import com.orientechnologies.orient.server.distributed.impl.task.OTransactionPhase1Task;
24+
import com.orientechnologies.orient.server.distributed.impl.task.OTransactionPhase1TaskResult;
25+
import com.orientechnologies.orient.server.distributed.impl.task.OTransactionPhase2Task;
26+
import com.orientechnologies.orient.server.distributed.impl.task.transaction.OTxSuccess;
27+
import java.io.ByteArrayInputStream;
28+
import java.io.ByteArrayOutputStream;
29+
import java.io.DataInputStream;
30+
import java.io.DataOutputStream;
31+
import java.io.FileInputStream;
32+
import java.io.FileOutputStream;
33+
import java.io.IOException;
34+
import java.util.ArrayList;
35+
import java.util.List;
36+
import java.util.SortedSet;
37+
import java.util.TreeSet;
38+
import org.junit.After;
39+
import org.junit.Before;
40+
import org.junit.Test;
41+
42+
public class OTransactionTreeRidbagsTest {
43+
private ODatabaseSession session;
44+
private ODatabaseSession session1;
45+
private OServer server;
46+
private OIdentifiable id4;
47+
private OIdentifiable ridbagDoc;
48+
private ODatabaseSession session2;
49+
private int pre;
50+
51+
@Before
52+
public void before()
53+
throws ClassNotFoundException, InstantiationException, IOException, IllegalAccessException {
54+
pre = OGlobalConfiguration.RID_BAG_EMBEDDED_TO_SBTREEBONSAI_THRESHOLD.getValueAsInteger();
55+
OGlobalConfiguration.RID_BAG_EMBEDDED_TO_SBTREEBONSAI_THRESHOLD.setValue(-1);
56+
server = new OServer(false);
57+
server.startup(getClass().getClassLoader().getResourceAsStream("orientdb-server-config.xml"));
58+
server.activate();
59+
OrientDB orientDB = server.getContext();
60+
orientDB.execute(
61+
"create database ? plocal users(admin identified by 'adminpwd' role admin)",
62+
OTransactionTreeRidbagsTest.class.getSimpleName());
63+
session = orientDB.open(OTransactionTreeRidbagsTest.class.getSimpleName(), "admin", "adminpwd");
64+
OClass clazz = session.createClass("TestClass");
65+
clazz.createProperty("one", OType.LINKBAG);
66+
OClass clazz1 = session.createClass("ToLink");
67+
OIdentifiable id1 = session.save(new ODocument(clazz1));
68+
OIdentifiable id2 = session.save(new ODocument(clazz1));
69+
OIdentifiable id3 = session.save(new ODocument(clazz1));
70+
id4 = session.save(new ODocument(clazz1));
71+
72+
ODocument doc = new ODocument(clazz);
73+
ORidBag bag = new ORidBag();
74+
bag.add(id1);
75+
bag.add(id2);
76+
bag.add(id3);
77+
78+
doc.setProperty("one", bag);
79+
ridbagDoc = session.save(doc);
80+
81+
session.backup(new FileOutputStream("target/test_sync_backup.zip"), null, null, null, 0, 4096);
82+
83+
OrientDBInternal internalContext = OrientDBInternal.extract(orientDB);
84+
internalContext.restore(
85+
OTransactionTreeRidbagsTest.class.getSimpleName() + "_1",
86+
new FileInputStream("target/test_sync_backup.zip"),
87+
null,
88+
null,
89+
null);
90+
91+
session1 =
92+
orientDB.open(
93+
OTransactionTreeRidbagsTest.class.getSimpleName() + "_1", "admin", "adminpwd");
94+
95+
internalContext.restore(
96+
OTransactionTreeRidbagsTest.class.getSimpleName() + "_2",
97+
new FileInputStream("target/test_sync_backup.zip"),
98+
null,
99+
null,
100+
null);
101+
102+
session2 =
103+
orientDB.open(
104+
OTransactionTreeRidbagsTest.class.getSimpleName() + "_2", "admin", "adminpwd");
105+
}
106+
107+
@After
108+
public void after() {
109+
session.activateOnCurrentThread();
110+
if (session != null) session.close();
111+
server.getContext().drop(OTransactionTreeRidbagsTest.class.getSimpleName());
112+
113+
session1.activateOnCurrentThread();
114+
if (session1 != null) session1.close();
115+
server.getContext().drop(OTransactionTreeRidbagsTest.class.getSimpleName() + "_1");
116+
117+
session2.activateOnCurrentThread();
118+
if (session2 != null) session2.close();
119+
server.getContext().drop(OTransactionTreeRidbagsTest.class.getSimpleName() + "_2");
120+
121+
server.shutdown();
122+
OGlobalConfiguration.RID_BAG_EMBEDDED_TO_SBTREEBONSAI_THRESHOLD.setValue(pre);
123+
}
124+
125+
private void firstPhaseExecution(
126+
OTransactionPhase1Task task, ODistributedRequestId requestId, ODatabaseSession session)
127+
throws Exception {
128+
task = passNetwork(task);
129+
OTransactionPhase1TaskResult res =
130+
(OTransactionPhase1TaskResult)
131+
task.execute(requestId, server, null, (ODatabaseDocumentInternal) session);
132+
133+
assertTrue(res.getResultPayload().toString(), res.getResultPayload() instanceof OTxSuccess);
134+
}
135+
136+
private OTransactionPhase1Task createFirstPhase(ODocument doc, ODatabaseSession session) {
137+
List<ORecordOperation> operations = new ArrayList<>();
138+
operations.add(new ORecordOperation(doc, ORecordOperation.UPDATED));
139+
ODistributedServerManager dm = server.getDistributedManager();
140+
ODistributedDatabase dd = dm.getDatabase(session.getName());
141+
OTransactionIdPromise txId = dd.nextId().get();
142+
dd.rollback(txId);
143+
return new OTransactionPhase1Task(operations, txId, new TreeSet<>());
144+
}
145+
146+
private OTransactionPhase1Task passNetwork(OTransactionPhase1Task task) throws IOException {
147+
ByteArrayOutputStream out = new ByteArrayOutputStream();
148+
DataOutputStream dataOut = new DataOutputStream(out);
149+
task.toStream(dataOut);
150+
OTransactionPhase1Task tx = new OTransactionPhase1Task();
151+
tx.fromStream(new DataInputStream(new ByteArrayInputStream(out.toByteArray())), null);
152+
return tx;
153+
}
154+
155+
@Test
156+
public void testExecution() throws Exception {
157+
158+
session.activateOnCurrentThread();
159+
160+
ODocument doc = session.load(ridbagDoc.getIdentity());
161+
((ORidBag) doc.getProperty("one")).add(id4);
162+
163+
OTransactionPhase1Task task = createFirstPhase(doc, session);
164+
session.getLocalCache().clear();
165+
// Start the first transaction that do not update the version and reach the quorum of two nodes.
166+
ODistributedRequestId requestIdTx = new ODistributedRequestId(10, 20);
167+
firstPhaseExecution(task, requestIdTx, session);
168+
169+
session2.activateOnCurrentThread();
170+
firstPhaseExecution(task, requestIdTx, session2);
171+
172+
SortedSet<ORID> ids = new TreeSet<>();
173+
ids.add(ridbagDoc.getIdentity());
174+
OTransactionPhase2Task secondPhase =
175+
new OTransactionPhase2Task(requestIdTx, true, ids, new TreeSet<>(), task.getPromise());
176+
177+
session.activateOnCurrentThread();
178+
secondPhase.execute(
179+
new ODistributedRequestId(10, 21), server, null, (ODatabaseDocumentInternal) session);
180+
session2.activateOnCurrentThread();
181+
secondPhase.execute(
182+
new ODistributedRequestId(10, 21), server, null, (ODatabaseDocumentInternal) session2);
183+
184+
// Applied the first transaction to two nodes
185+
186+
// Start the second transaction that update the version and is applied to all nodesnodes.
187+
188+
session2.getLocalCache().clear();
189+
ODocument doc1 = session2.load(ridbagDoc.getIdentity());
190+
doc1.setProperty("two", "value");
191+
192+
OTransactionPhase1Task task1 = createFirstPhase(doc1, session2);
193+
194+
ODistributedRequestId requestIdTx1 = new ODistributedRequestId(11, 20);
195+
196+
firstPhaseExecution(task1, requestIdTx1, session2);
197+
198+
session.activateOnCurrentThread();
199+
session.getLocalCache().clear();
200+
firstPhaseExecution(task1, requestIdTx1, session);
201+
202+
session1.activateOnCurrentThread();
203+
session1.getLocalCache().clear();
204+
firstPhaseExecution(task1, requestIdTx1, session1);
205+
206+
OTransactionPhase2Task secondPhase1 =
207+
new OTransactionPhase2Task(requestIdTx1, true, ids, new TreeSet<>(), task1.getPromise());
208+
secondPhase1.execute(
209+
new ODistributedRequestId(11, 21), server, null, (ODatabaseDocumentInternal) session1);
210+
211+
session.activateOnCurrentThread();
212+
secondPhase1.execute(
213+
new ODistributedRequestId(11, 21), server, null, (ODatabaseDocumentInternal) session);
214+
215+
session2.activateOnCurrentThread();
216+
secondPhase1.execute(
217+
new ODistributedRequestId(11, 21), server, null, (ODatabaseDocumentInternal) session2);
218+
219+
// Completed the second transaction to all nodes.
220+
221+
// Execute the two phases of the first transaction on the missing node
222+
session1.activateOnCurrentThread();
223+
224+
firstPhaseExecution(task, requestIdTx, session1);
225+
secondPhase.execute(
226+
new ODistributedRequestId(10, 21), server, null, (ODatabaseDocumentInternal) session1);
227+
228+
assertContent(session);
229+
assertContent(session1);
230+
assertContent(session2);
231+
}
232+
233+
private void assertContent(ODatabaseSession session) {
234+
session.activateOnCurrentThread();
235+
session.getLocalCache().clear();
236+
ODocument docRead = session.load(ridbagDoc.getIdentity());
237+
assertTrue(((ORidBag) docRead.getProperty("one")).contains(id4));
238+
assertEquals((String) docRead.getProperty("two"), "value");
239+
}
240+
}

0 commit comments

Comments
 (0)