From cf7545f83e022abf7a3f51dfc84da07a4ebd12b5 Mon Sep 17 00:00:00 2001 From: Iurii Mednikov Date: Sun, 11 Jan 2026 13:35:52 +0100 Subject: [PATCH] Connected to queue, consume collector events --- .../social/collector/ApplicationVerticle.java | 15 +++-- .../social/collector/data/DataVerticle.java | 15 +++++ .../data/EventCreateDataHandler.java | 19 ++++++ .../collector/messaging/MessageVerticle.java | 64 +++++++++++++++++++ .../collector/utils/EventBusConstants.java | 7 ++ .../collector/utils/QueueConstants.java | 7 ++ 6 files changed, 123 insertions(+), 4 deletions(-) create mode 100644 src/main/java/dev/mednikov/social/collector/data/DataVerticle.java create mode 100644 src/main/java/dev/mednikov/social/collector/data/EventCreateDataHandler.java create mode 100644 src/main/java/dev/mednikov/social/collector/messaging/MessageVerticle.java create mode 100644 src/main/java/dev/mednikov/social/collector/utils/EventBusConstants.java create mode 100644 src/main/java/dev/mednikov/social/collector/utils/QueueConstants.java diff --git a/src/main/java/dev/mednikov/social/collector/ApplicationVerticle.java b/src/main/java/dev/mednikov/social/collector/ApplicationVerticle.java index 118fe2b..21a97fe 100644 --- a/src/main/java/dev/mednikov/social/collector/ApplicationVerticle.java +++ b/src/main/java/dev/mednikov/social/collector/ApplicationVerticle.java @@ -1,9 +1,9 @@ package dev.mednikov.social.collector; +import dev.mednikov.social.collector.data.DataVerticle; +import dev.mednikov.social.collector.messaging.MessageVerticle; import io.vertx.config.ConfigRetriever; -import io.vertx.core.AbstractVerticle; -import io.vertx.core.Promise; -import io.vertx.core.Vertx; +import io.vertx.core.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -14,7 +14,14 @@ public final class ApplicationVerticle extends AbstractVerticle { @Override public void start(Promise startPromise) { ConfigRetriever configRetriever = ConfigRetriever.create(vertx); - configRetriever.getConfig().onComplete(result -> { + configRetriever.getConfig().compose(configurationResult -> { + MessageVerticle messageVerticle = new MessageVerticle(); + DataVerticle dataVerticle = new DataVerticle(); + return Future.all( + vertx.deployVerticle(messageVerticle, new DeploymentOptions().setConfig(configurationResult)), + vertx.deployVerticle(dataVerticle, new DeploymentOptions().setConfig(configurationResult)) + ); + }).onComplete(result -> { if (result.succeeded()) { logger.info("Application verticle was started"); startPromise.complete(); diff --git a/src/main/java/dev/mednikov/social/collector/data/DataVerticle.java b/src/main/java/dev/mednikov/social/collector/data/DataVerticle.java new file mode 100644 index 0000000..7e0557f --- /dev/null +++ b/src/main/java/dev/mednikov/social/collector/data/DataVerticle.java @@ -0,0 +1,15 @@ +package dev.mednikov.social.collector.data; + +import dev.mednikov.social.collector.utils.EventBusConstants; +import io.vertx.core.AbstractVerticle; +import io.vertx.core.Promise; + +public final class DataVerticle extends AbstractVerticle { + + @Override + public void start(Promise startPromise) throws Exception { + vertx.eventBus().consumer(EventBusConstants.DATABASE_EVENTS_CREATE, new EventCreateDataHandler()); + super.start(startPromise); + } + +} diff --git a/src/main/java/dev/mednikov/social/collector/data/EventCreateDataHandler.java b/src/main/java/dev/mednikov/social/collector/data/EventCreateDataHandler.java new file mode 100644 index 0000000..274753f --- /dev/null +++ b/src/main/java/dev/mednikov/social/collector/data/EventCreateDataHandler.java @@ -0,0 +1,19 @@ +package dev.mednikov.social.collector.data; + +import io.vertx.core.Handler; +import io.vertx.core.eventbus.Message; +import io.vertx.core.json.JsonObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class EventCreateDataHandler implements Handler> { + + private final static Logger logger = LoggerFactory.getLogger(EventCreateDataHandler.class); + + @Override + public void handle(Message message) { + JsonObject payload = message.body(); + logger.info("Event saved in database: {}", payload.encode()); + } + +} diff --git a/src/main/java/dev/mednikov/social/collector/messaging/MessageVerticle.java b/src/main/java/dev/mednikov/social/collector/messaging/MessageVerticle.java new file mode 100644 index 0000000..f79aa71 --- /dev/null +++ b/src/main/java/dev/mednikov/social/collector/messaging/MessageVerticle.java @@ -0,0 +1,64 @@ +package dev.mednikov.social.collector.messaging; + +import dev.mednikov.social.collector.utils.EventBusConstants; +import dev.mednikov.social.collector.utils.QueueConstants; +import io.vertx.core.AbstractVerticle; +import io.vertx.core.Promise; +import io.vertx.core.json.JsonObject; +import io.vertx.rabbitmq.RabbitMQClient; +import io.vertx.rabbitmq.RabbitMQConsumer; +import io.vertx.rabbitmq.RabbitMQOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class MessageVerticle extends AbstractVerticle { + + private final static Logger logger = LoggerFactory.getLogger(MessageVerticle.class); + + private RabbitMQClient rabbitMQClient; + + @Override + public void start(Promise startPromise) { + RabbitMQOptions options = new RabbitMQOptions(); + options.setHost(config().getString("APP_RABBITMQ_HOST")); + options.setPort(config().getInteger("APP_RABBITMQ_PORT")); + options.setUser(config().getString("APP_RABBITMQ_USER")); + options.setPassword(config().getString("APP_RABBITMQ_PASSWORD")); + this.rabbitMQClient = RabbitMQClient.create(vertx, options); + this.rabbitMQClient.start().onComplete(result -> { + this.rabbitMQClient.basicConsumer(QueueConstants.EVENT_QUEUE_NAME).onComplete(consumerResult -> { + if (result.succeeded()) { + RabbitMQConsumer consumer = consumerResult.result(); + consumer.handler(message -> { + JsonObject payload = message.body().toJsonObject(); + logger.info("Event received from RabbitMQ: {}", payload.encode()); + vertx.eventBus().send(EventBusConstants.DATABASE_EVENTS_CREATE, payload); + }); + } else { + logger.error("Failed to connect to queue", consumerResult.cause()); + } + }); + if (result.succeeded()) { + logger.info("RabbitMQ started successfully"); + startPromise.complete(); + } else { + logger.error("RabbitMQ start failed", result.cause()); + startPromise.fail(result.cause()); + } + }); + } + + @Override + public void stop(Promise stopPromise) { + this.rabbitMQClient.stop().onComplete(result -> { + if (result.succeeded()) { + logger.info("RabbitMQ stopped successfully"); + stopPromise.complete(); + } else { + logger.error("RabbitMQ stop failed", result.cause()); + stopPromise.fail(result.cause()); + } + }); + } + +} diff --git a/src/main/java/dev/mednikov/social/collector/utils/EventBusConstants.java b/src/main/java/dev/mednikov/social/collector/utils/EventBusConstants.java new file mode 100644 index 0000000..a56799d --- /dev/null +++ b/src/main/java/dev/mednikov/social/collector/utils/EventBusConstants.java @@ -0,0 +1,7 @@ +package dev.mednikov.social.collector.utils; + +public final class EventBusConstants { + + public static final String DATABASE_EVENTS_CREATE = "database_events_create"; + +} diff --git a/src/main/java/dev/mednikov/social/collector/utils/QueueConstants.java b/src/main/java/dev/mednikov/social/collector/utils/QueueConstants.java new file mode 100644 index 0000000..6a0e287 --- /dev/null +++ b/src/main/java/dev/mednikov/social/collector/utils/QueueConstants.java @@ -0,0 +1,7 @@ +package dev.mednikov.social.collector.utils; + +public final class QueueConstants { + + public static final String EVENT_QUEUE_NAME = "social_events"; + +}