Гарантированная отправка событий в Kafka
В примере ниже, мы создаем объект KafkaProducer с заданными настройками, создаем ProducerRecord с указанием топика и сообщения для отправки, а затем вызываем метод send для отправки записи. Метод send возвращает Future<RecordMetadata>, и мы можем использовать метод get() для синхронного ожидания подтверждения отправки сообщения. В ином случае, без использования метода get(), программа пойдет дальше и мы не сможем отловить ошибку. Затем мы получаем метаданные записи, такие как partition и offset, и выводим их в консоль.
Пример
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;public class KafkaProducerExample {public static void main(String[] args) {
// Конфигурация Kafka Producer
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// Создание Kafka Producer
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);// Тема и сообщение для отправки
String topic = "my-topic";
String message = "Hello, Kafka!";// Создание записи
ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);try {
// Отправка сообщения с синхронным ожиданием подтверждения
RecordMetadata metadata = producer.send(record).get();
System.out.println("Message sent to partition " + metadata.partition()
+ " with offset " + metadata.offset());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
} finally {
// Закрытие Kafka Producer
producer.close();
}
}
}
Пример использования гарантированной отправки событий в kafka вместе с транзакцией
public void sendCityCount() {
Session session = sessionFactory.openSession();
Transaction transaction = session.getTransaction();
try {
transaction.begin();
Long count = session.createQuery("select count(*) from City", Long.class).uniqueResult();
Boolean isSuccess = send("count-topic", Long.toString(count))
if (!isSuccess) {
throw new CustomKafkaException("Error send event to count-topic")
}
transaction.commit();
}
catch (Exception e) {
transaction.rollback();
}
finally {
session.close();
sessionFactory.close();
}
}
private boolean send(String topic, String message) {
if (message == null || kafkaTemplate == null) {
throw new IllegalArgumentException("Message or kafka template is null");
}
try {
return Objects.requireNonNull(kafkaTemplate.execute(context -> kafkaTemplate.send(topic, message)))
.completable()
.thenApply(v -> true)
.exceptionally(e -> {
log.error(e.getMessage(), e);
return false;
})
.join();
} catch (Exception e) {
log.error(e.getMessage(), e);
return false
}
}
