88import java .util .concurrent .CompletionStage ;
99import java .util .concurrent .CountDownLatch ;
1010import java .util .concurrent .TimeUnit ;
11+ import java .util .function .BiFunction ;
1112
1213import org .hibernate .SessionFactory ;
1314import org .hibernate .boot .registry .StandardServiceRegistry ;
1819import org .hibernate .reactive .util .impl .CompletionStages ;
1920import org .hibernate .reactive .vertx .VertxInstance ;
2021
21- import org .junit .jupiter .api .AfterAll ;
22- import org .junit .jupiter .api .BeforeAll ;
22+ import org .junit .jupiter .api .AfterEach ;
23+ import org .junit .jupiter .api .BeforeEach ;
2324import org .junit .jupiter .api .Test ;
2425import org .junit .jupiter .api .TestInstance ;
2526import org .junit .jupiter .api .extension .ExtendWith ;
@@ -101,8 +102,8 @@ public class MultithreadedInsertionWithLazyConnectionTest {
101102 private static Vertx vertx ;
102103 private static SessionFactory sessionFactory ;
103104
104- @ BeforeAll
105- public static void setupSessionFactory () {
105+ @ BeforeEach
106+ public void setupSessionFactory () {
106107 vertx = Vertx .vertx ( getVertxOptions () );
107108 Configuration configuration = new Configuration ();
108109 setDefaultProperties ( configuration );
@@ -130,8 +131,8 @@ private static VertxOptions getVertxOptions() {
130131 return vertxOptions ;
131132 }
132133
133- @ AfterAll
134- public static void closeSessionFactory () {
134+ @ AfterEach
135+ public void closeSessionFactory () {
135136 stageSessionFactory .close ();
136137 }
137138
@@ -140,8 +141,32 @@ public void testIdentityGenerator(VertxTestContext context) {
140141 final DeploymentOptions deploymentOptions = new DeploymentOptions ();
141142 deploymentOptions .setInstances ( N_THREADS );
142143
144+ // We are not using transactions on purpose here, because this approach will cause a context switch
145+ // and an assertion error if things aren't handled correctly.
146+ // See Hibernate Reactive issue #2768: https://github.com/hibernate/hibernate-reactive/issues/2768
143147 vertx
144- .deployVerticle ( InsertEntitiesVerticle ::new , deploymentOptions )
148+ .deployVerticle ( () -> new InsertEntitiesVerticle ( (s , entity ) -> s
149+ .persist ( entity )
150+ .thenCompose ( v -> s .flush () )
151+ .thenAccept ( v -> s .clear () ) ), deploymentOptions
152+ )
153+ .onSuccess ( res -> {
154+ endLatch .waitForEveryone ();
155+ context .completeNow ();
156+ } )
157+ .onFailure ( context ::failNow )
158+ .eventually ( () -> vertx .close () );
159+ }
160+
161+ @ Test
162+ public void testIdentityGeneratorWithTransaction (VertxTestContext context ) {
163+ final DeploymentOptions deploymentOptions = new DeploymentOptions ();
164+ deploymentOptions .setInstances ( N_THREADS );
165+ vertx
166+ .deployVerticle (
167+ () -> new InsertEntitiesVerticle ( (s , entity ) -> s
168+ .withTransaction ( t -> s .persist ( entity ) ) ), deploymentOptions
169+ )
145170 .onSuccess ( res -> {
146171 endLatch .waitForEveryone ();
147172 context .completeNow ();
@@ -152,9 +177,12 @@ public void testIdentityGenerator(VertxTestContext context) {
152177
153178 private static class InsertEntitiesVerticle extends AbstractVerticle {
154179
180+ final BiFunction <Stage .Session , EntityWithGeneratedId , CompletionStage <Void >> insertFun ;
181+
155182 int sequentialOperation = 0 ;
156183
157- public InsertEntitiesVerticle () {
184+ public InsertEntitiesVerticle (BiFunction <Stage .Session , EntityWithGeneratedId , CompletionStage <Void >> insertFun ) {
185+ this .insertFun = insertFun ;
158186 }
159187
160188 @ Override
@@ -196,9 +224,8 @@ private CompletionStage<Void> storeEntity(Stage.Session s) {
196224 final int localVerticleOperationSequence = sequentialOperation ++;
197225 final EntityWithGeneratedId entity = new EntityWithGeneratedId ();
198226 entity .name = beforeOperationThread + "__" + localVerticleOperationSequence ;
199-
200- return s
201- .withTransaction ( t -> s .persist ( entity ) )
227+ return insertFun
228+ .apply ( s , entity )
202229 .thenCompose ( v -> beforeOperationThread != Thread .currentThread ()
203230 ? failedFuture ( new IllegalStateException ( "Detected an unexpected switch of carrier threads!" ) )
204231 : voidFuture () );
0 commit comments