77import java .util .HashMap ;
88import java .util .Map ;
99import java .util .concurrent .CountDownLatch ;
10+ import java .util .concurrent .atomic .AtomicBoolean ;
1011
1112public class MessageConsumerTest extends VertxTestBase {
1213
1314
1415 @ Test
15- public void testMessageConsumptionStayOnWorkerThreadAfterResume () throws Exception {
16- TestVerticle verticle = new TestVerticle (2 );
16+ public void testMessageConsumptionStayOnWorkerThreadAfterResumeAndOnlyDispatchOneMessageAtOneMoment () throws Exception {
17+ int numberOfExpectedMessages = 10 ;
18+ TestVerticle verticle = new TestVerticle (numberOfExpectedMessages );
19+ EchoVerticle echoVerticle = new EchoVerticle ();
1720 Future <String > deployVerticle = vertx .deployVerticle (verticle , new DeploymentOptions ().setThreadingModel (ThreadingModel .WORKER ));
21+ Future <String > deployEchoVerticle = vertx .deployVerticle (echoVerticle , new DeploymentOptions ().setThreadingModel (ThreadingModel .WORKER ));
1822
1923 CountDownLatch startLatch = new CountDownLatch (1 );
20- deployVerticle .onComplete (onSuccess (cf -> startLatch .countDown ()));
24+ Future .all (deployVerticle , deployEchoVerticle )
25+ .onComplete (onSuccess (cf -> startLatch .countDown ()));
2126 awaitLatch (startLatch );
2227
23- vertx .eventBus ().send ("testAddress" , "message1" );
24- vertx .eventBus ().send ("testAddress" , "message2" );
28+ for (int i = 1 ; i <= numberOfExpectedMessages ; i ++) {
29+ vertx .eventBus ().send ("testAddress" , "message" + i );
30+ }
2531
2632 awaitLatch (verticle .msgLatch );
2733
28- assertEquals (2 , verticle .messageArrivedOnWorkerThread .size ());
29- assertTrue ("message1 should be processed on worker thread" , verticle .messageArrivedOnWorkerThread .get ("message1" ));
30- assertTrue ("message2 should be processed on worker thread" , verticle .messageArrivedOnWorkerThread .get ("message2" ));
34+ assertEquals (numberOfExpectedMessages , verticle .messageArrivedOnWorkerThread .size ());
35+ for (int i = 1 ; i <= numberOfExpectedMessages ; i ++) {
36+ assertTrue ("message" + i + " should be processed on worker thread" , verticle .messageArrivedOnWorkerThread .get ("message" + i ));
37+ }
3138 }
3239
3340
3441 private static class TestVerticle extends AbstractVerticle {
3542
3643 private final CountDownLatch msgLatch ;
44+ private final AtomicBoolean messageProcessingOngoing = new AtomicBoolean ();
3745
3846 private final Map <String , Boolean > messageArrivedOnWorkerThread = new HashMap <>();
3947
@@ -51,11 +59,34 @@ private void handleMessages(MessageConsumer<String> consumer) {
5159 consumer .handler (msg -> {
5260 consumer .pause ();
5361 messageArrivedOnWorkerThread .putIfAbsent (msg .body (), Context .isOnWorkerThread ());
54- msgLatch .countDown ();
55- vertx .setTimer (20 , id -> {
56- consumer .resume ();
62+ if (messageProcessingOngoing .compareAndSet (false , true )) {
63+ msgLatch .countDown ();
64+ } else {
65+ System .err .println ("Received message while processing another message" );
66+ }
67+ vertx .eventBus ().request ("echoAddress" , 20 )
68+ .onComplete (ar -> {
69+ messageProcessingOngoing .set (false );
70+ consumer .resume ();
71+ });
72+ });
73+ }
74+ }
75+
76+ private static class EchoVerticle extends AbstractVerticle {
77+ @ Override
78+ public void start () {
79+ MessageConsumer <Integer > consumer = vertx .eventBus ().localConsumer ("echoAddress" );
80+ handleMessages (consumer );
81+ }
82+
83+ private void handleMessages (MessageConsumer <Integer > consumer ) {
84+ consumer .handler (msg -> {
85+ vertx .setTimer (msg .body (), id -> {
86+ msg .reply (msg .body ());
5787 });
5888 });
5989 }
6090 }
91+
6192}
0 commit comments