Örnek 3 microservici spring boot java 11 ve adaptor ve portla apache kafka ile kodlayıp outbox patterni örnek bir senaryo ile kodlar mısın? Saga ekleyip rollback yapmayı kodlayabilir misin?

Örnek 3 microservici spring boot java 11 ve adaptor ve portla apache kafka ile kodlayıp outbox patterni örnek bir senaryo ile kodlar mısın?





Öncelikle, bu soruda bahsedilen kavramları açıklamakla başlayalım:


Spring Boot: Java tabanlı bir uygulama geliştirme çerçevesidir.

Apache Kafka: Büyük veri işlemleri için dağıtık bir akış işleme platformudur.

Outbox Pattern: Uygulamalar arası etkileşimi sağlamak için kullanılan bir desendir. Outbox patterninde, bir servis bir mesajı (örneğin, bir etkinlik veya komut) göndermek istediğinde, bu mesajı öncelikle kendi veritabanının "outbox" tablosuna yazar. Ardından, bu mesajlar bir arka plan görevi veya başka bir mekanizma tarafından bu outbox'tan okunup istenen mesaj kuyruğuna (örneğin, Kafka topic) gönderilir.

Aşağıda, bu kavramları kullanarak üç örnek microservis ve adaptor ile Kafka'ya entegrasyonunu nasıl gerçekleştirebileceğinizi gösteren temel bir örnek oluşturacağım.


Örnek olarak basit bir kullanıcı kayıt sistemi oluşturalım:


user-service: Kullanıcılarla ilgili operasyonları yapar.

email-service: E-posta bildirimleri gönderir.

audit-service: Kullanıcı işlemlerini kaydeder.

Bu hizmetler arasında etkileşimde bulunmak için Kafka'yı kullanacağız. Özellikle, bir kullanıcı kaydedildiğinde user-service bir mesajı kendi outbox tablosuna yazar ve bu mesaj sonrasında Kafka'ya gönderilir. email-service ve audit-service bu mesajı dinleyip ilgili işlemleri yapar.


Öncelikle Kafka entegrasyonu için gerekli bağımlılıkları ekleyin:

Maven için pom.xml dosyasına:


xml

Copy code

<dependency>

    <groupId>org.springframework.kafka</groupId>

    <artifactId>spring-kafka</artifactId>

</dependency>

Spring Boot uygulamasında Kafka yapılandırmasını oluşturun:

java

Copy code

@Configuration

public class KafkaConfig {


    @Value("${kafka.bootstrap.servers}")

    private String bootstrapServers;


    @Bean

    public ProducerFactory<String, String> producerFactory() {

        Map<String, Object> configProps = new HashMap<>();

        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        return new DefaultKafkaProducerFactory<>(configProps);

    }


    @Bean

    public KafkaTemplate<String, String> kafkaTemplate() {

        return new KafkaTemplate<>(producerFactory());

    }

}

Kullanıcı kaydedildiğinde bir mesajın outbox tablosuna eklenmesi:

java

Copy code

@Service

public class UserService {

    // ... diğer bileşenler ve bağımlılıklar ...


    public void registerUser(User user) {

        userRepository.save(user);

        OutboxMessage message = new OutboxMessage("user-registered", user.getId(), user.toString());

        outboxRepository.save(message);

    }

}

Bu aşamadan sonra, Outbox tablosundan mesajlar okunup Kafka'ya gönderilebilir. Ancak bu süreci tam olarak kodlamak için daha detaylı bir yapılandırmaya ihtiyacınız olacak. Bunun için Kafka'nın konusunda uzmanlaşmış kütüphanelerini veya Debezium gibi araçları kullanmayı düşünebilirsiniz.


Outbox'dan mesajların okunup Kafka'ya gönderilmesi:

Bu işlem için bir planlayıcı (scheduler) veya bir olay dinleyici kullanabilirsiniz. Mesajlar outbox'tan okunduktan sonra Kafka'ya gönderilir ve başarılı bir şekilde gönderildiğinde outbox'tan silinir.


Outbox'dan Mesajların Okunması ve Kafka'ya Gönderilmesi

Bu adımda, outbox'tan mesajların periyodik olarak okunup Kafka'ya gönderilmesini sağlayacağız. Bu işlemi gerçekleştirmek için Spring Boot'un @Scheduled anotasyonunu kullanabiliriz.


java

Copy code

@Service

public class OutboxService {


    @Autowired

    private OutboxRepository outboxRepository;


    @Autowired

    private KafkaTemplate<String, String> kafkaTemplate;


    private static final String TOPIC_NAME = "user-operations";


    @Scheduled(fixedRate = 10000) // 10 saniyede bir çalışacak

    public void processOutboxMessages() {

        List<OutboxMessage> messages = outboxRepository.findAll();


        for (OutboxMessage message : messages) {

            kafkaTemplate.send(TOPIC_NAME, message.getData()); // Mesajı Kafka'ya gönder

            outboxRepository.delete(message); // Başarılı bir şekilde gönderildiyse outbox'tan sil

        }

    }

}

Kafka Consumer'ın Oluşturulması

Bu adımda, email-service ve audit-service'te mesajları tüketmek için Kafka consumer'lar oluşturacağız.


email-service için:


java

Copy code

@Service

public class EmailService {


    @Autowired

    private EmailSender emailSender; // Bu servis e-posta göndermek için önceden tanımlanmış olmalı


    @KafkaListener(topics = "user-operations", groupId = "email-service-group")

    public void listenForUserRegistrations(String message) {

        // Basitçe bu örnekte doğrudan mesajı e-posta olarak gönderebiliriz.

        emailSender.send("notification@example.com", "New User Registered", message);

    }

}

audit-service için:


java

Copy code

@Service

public class AuditService {


    @Autowired

    private AuditRepository auditRepository; // Bu repo etkinlikleri kaydetmek için önceden tanımlanmış olmalı


    @KafkaListener(topics = "user-operations", groupId = "audit-service-group")

    public void auditUserOperations(String message) {

        AuditRecord record = new AuditRecord("user-registration", message);

        auditRepository.save(record); // Etkinliği veritabanına kaydedin

    }

}

Bu aşamada, temel entegrasyon işlemlerini tamamlamış olduk. Ancak unutulmamalıdır ki gerçek dünyada bu tür bir entegrasyon çok daha karmaşıktır. Mesajların başarılı bir şekilde gönderilip gönderilmediğini kontrol etmek, hata durumunda yeniden deneme mekanizmaları eklemek, güvenlik ve izleme gibi konularda ek yapılandırmalar ve optimizasyonlar gerekebilir.


email service de veya audit service de user-operations topicinde outbox kaydını okuduktan sonra hata alsaydık ve işlemi gerçekleştiremeseydik. Bu senaryoyu da kodlar mısın?


Tabii, bu durumu ele almak oldukça önemlidir. Eğer email-service veya audit-service Kafka'dan bir mesajı okuduktan sonra hata alırsa ve işlemi tamamlayamazsa, bu mesajı daha sonra tekrar işlemek için bir strateji belirlememiz gerekir. Kafka, bu tür hataları yönetmek için bazı özellikler sunar.


@KafkaListener'ın Yeniden Deneme Mekanizması: Spring Kafka, hatalı mesajları belirli bir süre boyunca yeniden işlemeye çalışmak için bir yeniden deneme mekanizması sunar. ConcurrentKafkaListenerContainerFactory'i özelleştirerek bu mekanizmayı etkinleştirebiliriz.

Öncelikle, yapılandırma sınıfınıza aşağıdaki ayarları ekleyin:


java

Copy code

import org.apache.kafka.clients.consumer.Consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;

import org.apache.kafka.common.TopicPartition;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;

import org.springframework.kafka.core.ConsumerFactory;

import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import org.springframework.kafka.listener.ContainerProperties;

import org.springframework.kafka.listener.SeekToCurrentErrorHandler;

import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2;

import org.springframework.util.backoff.FixedBackOff;


@Configuration

public class KafkaConsumerConfig {


    @Autowired

    private ConsumerFactory<String, String> consumerFactory;


    @Bean

    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory);

        factory.setErrorHandler(new SeekToCurrentErrorHandler(new FixedBackOff(1000L, 3))); // 3 kez yeniden deneme, 1 saniye arayla

        return factory;

    }

}

Bu yapılandırma ile, bir hata meydana geldiğinde mesajı 3 kez yeniden işlemeye çalışacaktır. Her deneme arasında 1 saniye bekleyecektir.


Ölü Mektup Kuyruğu (Dead Letter Queue) Kullanımı: Yeniden deneme mekanizmasının belirli bir sayıdan sonra başarısız olması durumunda, bu mesajları bir "ölü mektup kuyruğuna" (dead-letter queue, DLQ) yönlendirebiliriz. Bu kuyruğa düşen mesajlar daha sonra analiz edilip manuel müdahaleyle veya otomatik süreçlerle işlenebilir.

Kafka'da bu işlemi gerçekleştirmek için DeadLetterPublishingRecoverer'ı kullanabiliriz.


Öncelikle, ölü mektup kuyruğunu tanımlamalısınız:


java

Copy code

@Bean

public NewTopic deadLetterTopic() {

    return new NewTopic("deadLetterTopic", 1, (short) 1);

}

Ardından, SeekToCurrentErrorHandler'ı bu kuyruğu kullanacak şekilde özelleştirin:


java

Copy code

import org.springframework.kafka.support.serializer.DeserializationException;

import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2;

import org.springframework.util.backoff.FixedBackOff;


// ...


@Bean

public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();

    factory.setConsumerFactory(consumerFactory);

    

    DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate);

    SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(1000L, 3));

    factory.setErrorHandler(errorHandler);


    return factory;

}

Bu yapılandırma ile, mesaj 3 kez işlenemediğinde otomatik olarak deadLetterTopic adında bir ölü mektup kuyruğuna yönlendirilecektir. Bu mesajları daha sonra analiz edip gerekli müdahalelerde bulunabilirsiniz.


emailSender.send() metdunda hata alsaydık?


Eğer emailSender.send() metodunda bir hata alırsak ve bu hatayı yakalamazsak, yukarıda belirttiğimiz şekilde Spring Kafka'nın hata yönetim mekanizması devreye girecektir. Bu durumda, mesajın tekrar işlenmesi için belirlediğimiz yeniden deneme mekanizması çalışacaktır.


Ancak bazı senaryolarda belirli bir hata için yeniden deneme mekanizmasını çalıştırmak istemeyebiliriz. Örneğin, belirli bir e-posta adresi geçersizse, bu mesajı sürekli yeniden işlemek istemeyiz.


Bu tür senaryolarda, özelleştirilmiş hata yönetimi yapmak için emailSender.send() metodunu bir try-catch bloğu içerisine alabiliriz:


java

Copy code

@Service

public class EmailService {


    @Autowired

    private EmailSender emailSender;


    @KafkaListener(topics = "user-operations", groupId = "email-service-group")

    public void listenForUserRegistrations(String message) {

        try {

            emailSender.send("notification@example.com", "New User Registered", message);

        } catch (InvalidEmailAddressException e) { // Bu sadece örnek bir exception

            // Geçersiz e-posta adresi için özel bir işlem yapabiliriz

            // Örneğin, bu tür hataları ayrı bir log dosyasına yazabiliriz.

            log.error("Invalid email address error: " + e.getMessage());

        } catch (EmailSendingFailedException e) { // Bu da örnek bir exception

            // E-postanın gönderilmesi ile ilgili bir hata meydana geldiyse, bu hatayı yukarıya fırlatarak 

            // Spring Kafka'nın hata yönetim mekanizmasının devreye girmesini sağlayabiliriz.

            throw e;

        }

    }

}

Bu yaklaşımla, belirli hatalara özgü özel işlemler yapabilir ve gerekirse Spring Kafka'nın hata yönetim mekanizmasını devreye sokabilirsiniz.


Unutulmamalıdır ki, bu tür bir hata yönetimi mekanizması, iş uygulamanızın gereksinimlerine göre özelleştirilmelidir. Örneğin, bazı hatalar için alarm mekanizmaları tetiklenebilir veya otomatik düzeltme adımları başlatılabilir.


Saga ekleyip rollback yapmayı kodlayabilir misin?


Saga deseni, birçok mikroservis arasında işlemleri koordine etmek için kullanılır. Bir işlemde hata olduğunda, Saga deseni bu işlemi geri almak (rollback) için bir dizi kompansasyon işlemini tetikler. Özellikle, Kafka'nın olay tabanlı doğasını kullanarak Sagaları gerçekleştirebiliriz.


Basit bir senaryo ile devam edelim. Diyelim ki bir kullanıcının kaydedilmesi sürecinde, hem kullanıcının veritabanına kaydedilmesi hem de e-posta gönderilmesi gerekiyor. E-posta gönderiminde bir hata oluştuğunda, veritabanına eklenen kullanıcı kaydını geri almak istiyoruz.


Saga İçin Event Tanımlama

İlk olarak, Saga'da kullanılacak eventleri tanımlayalım:


UserRegistrationEvent: Kullanıcının başarıyla kaydedildiğini belirten bir olay.

UserRegistrationFailedEvent: Kullanıcının kaydedilmesi sırasında bir hata oluştuğunu belirten bir olay.

Saga'nın Gerçekleştirilmesi

E-posta servisi, e-posta gönderme sırasında bir hata olursa UserRegistrationFailedEvent'i yayınlar. Bu eventi userService dinler ve kullanıcının kaydını geri alır (rollback).


java

Copy code

@Service

public class EmailService {


    @Autowired

    private KafkaTemplate<String, String> kafkaTemplate;


    @KafkaListener(topics = "user-registration", groupId = "email-service-group")

    public void listenForUserRegistrations(UserRegistrationEvent event) {

        try {

            emailSender.send(event.getEmail(), "Welcome!", "Thanks for registering!");

        } catch (Exception e) {

            // E-posta gönderimi başarısız olursa, bir hata eventi yayınla

            kafkaTemplate.send("user-registration-failed", new UserRegistrationFailedEvent(event.getUserId()));

        }

    }

}


@Service

public class UserService {


    @Autowired

    private UserRepository userRepository;


    @KafkaListener(topics = "user-registration-failed", groupId = "user-service-group")

    public void handleFailedUserRegistration(UserRegistrationFailedEvent event) {

        // Kullanıcı kaydını geri al (rollback)

        userRepository.deleteById(event.getUserId());

    }

}

Kompansasyon İşlemleri

Yukarıda, EmailService'te bir hata olması durumunda UserService'te bir kompansasyon işlemi (kullanıcı kaydının silinmesi) gerçekleştirdik. Ancak Saga desenini kullanırken daha karmaşık senaryolar ve çok sayıda mikroservis olabilir. Her servisin kendi kompansasyon işlemlerini tanımlaması ve Saga'da bir hata olduğunda bu işlemleri gerçekleştirmesi gerekir.


Bu örnek, Saga deseninin temel bir uygulamasını gösterir. Gerçekte, saga desenini uygularken, daha karmaşık senaryolar, hata yönetimi, işlem süreleri, timeoutlar ve daha fazlasını dikkate almanız gerekir. Saga deseni, CAP teoreminin gerekliliklerinden dolayı, tutarlılık ve kullanılabilirlik arasında bir denge sağlar ve bu, genellikle duruma özgü özelleştirilmiş çözümlerle başarılır.



Hiç yorum yok

Rastgele İçerik

DonanımHaber

© tüm hakları saklıdır
made with by templateszoo