Connected to queue, consume collector events

This commit is contained in:
2026-01-11 13:35:52 +01:00
parent cb2c34cf67
commit cf7545f83e
6 changed files with 123 additions and 4 deletions

View File

@@ -1,9 +1,9 @@
package dev.mednikov.social.collector; package dev.mednikov.social.collector;
import dev.mednikov.social.collector.data.DataVerticle;
import dev.mednikov.social.collector.messaging.MessageVerticle;
import io.vertx.config.ConfigRetriever; import io.vertx.config.ConfigRetriever;
import io.vertx.core.AbstractVerticle; import io.vertx.core.*;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@@ -14,7 +14,14 @@ public final class ApplicationVerticle extends AbstractVerticle {
@Override @Override
public void start(Promise<Void> startPromise) { public void start(Promise<Void> startPromise) {
ConfigRetriever configRetriever = ConfigRetriever.create(vertx); ConfigRetriever configRetriever = ConfigRetriever.create(vertx);
configRetriever.getConfig().onComplete(result -> { configRetriever.getConfig().compose(configurationResult -> {
MessageVerticle messageVerticle = new MessageVerticle();
DataVerticle dataVerticle = new DataVerticle();
return Future.all(
vertx.deployVerticle(messageVerticle, new DeploymentOptions().setConfig(configurationResult)),
vertx.deployVerticle(dataVerticle, new DeploymentOptions().setConfig(configurationResult))
);
}).onComplete(result -> {
if (result.succeeded()) { if (result.succeeded()) {
logger.info("Application verticle was started"); logger.info("Application verticle was started");
startPromise.complete(); startPromise.complete();

View File

@@ -0,0 +1,15 @@
package dev.mednikov.social.collector.data;
import dev.mednikov.social.collector.utils.EventBusConstants;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
public final class DataVerticle extends AbstractVerticle {
@Override
public void start(Promise<Void> startPromise) throws Exception {
vertx.eventBus().consumer(EventBusConstants.DATABASE_EVENTS_CREATE, new EventCreateDataHandler());
super.start(startPromise);
}
}

View File

@@ -0,0 +1,19 @@
package dev.mednikov.social.collector.data;
import io.vertx.core.Handler;
import io.vertx.core.eventbus.Message;
import io.vertx.core.json.JsonObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
final class EventCreateDataHandler implements Handler<Message<JsonObject>> {
private final static Logger logger = LoggerFactory.getLogger(EventCreateDataHandler.class);
@Override
public void handle(Message<JsonObject> message) {
JsonObject payload = message.body();
logger.info("Event saved in database: {}", payload.encode());
}
}

View File

@@ -0,0 +1,64 @@
package dev.mednikov.social.collector.messaging;
import dev.mednikov.social.collector.utils.EventBusConstants;
import dev.mednikov.social.collector.utils.QueueConstants;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.json.JsonObject;
import io.vertx.rabbitmq.RabbitMQClient;
import io.vertx.rabbitmq.RabbitMQConsumer;
import io.vertx.rabbitmq.RabbitMQOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public final class MessageVerticle extends AbstractVerticle {
private final static Logger logger = LoggerFactory.getLogger(MessageVerticle.class);
private RabbitMQClient rabbitMQClient;
@Override
public void start(Promise<Void> startPromise) {
RabbitMQOptions options = new RabbitMQOptions();
options.setHost(config().getString("APP_RABBITMQ_HOST"));
options.setPort(config().getInteger("APP_RABBITMQ_PORT"));
options.setUser(config().getString("APP_RABBITMQ_USER"));
options.setPassword(config().getString("APP_RABBITMQ_PASSWORD"));
this.rabbitMQClient = RabbitMQClient.create(vertx, options);
this.rabbitMQClient.start().onComplete(result -> {
this.rabbitMQClient.basicConsumer(QueueConstants.EVENT_QUEUE_NAME).onComplete(consumerResult -> {
if (result.succeeded()) {
RabbitMQConsumer consumer = consumerResult.result();
consumer.handler(message -> {
JsonObject payload = message.body().toJsonObject();
logger.info("Event received from RabbitMQ: {}", payload.encode());
vertx.eventBus().send(EventBusConstants.DATABASE_EVENTS_CREATE, payload);
});
} else {
logger.error("Failed to connect to queue", consumerResult.cause());
}
});
if (result.succeeded()) {
logger.info("RabbitMQ started successfully");
startPromise.complete();
} else {
logger.error("RabbitMQ start failed", result.cause());
startPromise.fail(result.cause());
}
});
}
@Override
public void stop(Promise<Void> stopPromise) {
this.rabbitMQClient.stop().onComplete(result -> {
if (result.succeeded()) {
logger.info("RabbitMQ stopped successfully");
stopPromise.complete();
} else {
logger.error("RabbitMQ stop failed", result.cause());
stopPromise.fail(result.cause());
}
});
}
}

View File

@@ -0,0 +1,7 @@
package dev.mednikov.social.collector.utils;
public final class EventBusConstants {
public static final String DATABASE_EVENTS_CREATE = "database_events_create";
}

View File

@@ -0,0 +1,7 @@
package dev.mednikov.social.collector.utils;
public final class QueueConstants {
public static final String EVENT_QUEUE_NAME = "social_events";
}