Event Sourcing
Czym jest event sourcing
W tradycyjnych aplikacjach CRUDowych istotny jest aktualny stan obiektu. Na pierwszy rzut oka może wydawać się to zasadne. Jednak w rzeczywistości bardzo duże znaczenie ma to jak dany stan zmieniał się w czasie. Event sourcing jest sposobem tworzenia systemów, które są oparte na przetwarzaniu zdarzeń. Występuje często razem z CQRS. Nic nie stoi na przeszkodzie aby stosować go wspólnie z architekturą portów i adapterów.
Kod
Kod może powiedzieć więcej niż tysiące słów, więc na początek kawałek kodu.
@Getter @RequiredArgsConstructor(access = AccessLevel.PRIVATE) public class Account { private List<DomainEvent> pendingEvents = new ArrayList<>(); private final String aggregateId; private String ownerName; private String name; private BigDecimal amount; public static Account from(String aggregateId) { return Account.from(aggregateId, new ArrayList<>()); } public static Account from(String aggregateId, List<DomainEvent> events) { Account note = new Account(aggregateId); events.forEach(note::apply); return note; } public void create(String ownerName, String name) { add(new AccountCreatedEvent(aggregateId, LocalDateTime.now(), ownerName, name)); } private void add(DomainEvent accountCreatedEvent) { pendingEvents.add(accountCreatedEvent); apply(accountCreatedEvent); } private void apply(DomainEvent event) { if(event instanceof AccountCreatedEvent) { apply((AccountCreatedEvent)event); } else if (event instanceof DepositCreatedEvent) { apply((DepositCreatedEvent)event); } else if(event instanceof WithdrawCreatedEvent) { apply((WithdrawCreatedEvent)event); } } private void apply(AccountCreatedEvent accountCreatedEvent) { this.ownerName = accountCreatedEvent.getOwnerName(); this.name = accountCreatedEvent.getAccountName(); this.amount = BigDecimal.ZERO; } public void deposit(BigDecimal amount) { add(new DepositCreatedEvent(aggregateId, LocalDateTime.now(), amount)); } private void apply(DepositCreatedEvent depositCreatedEvent) { this.amount = amount.add(depositCreatedEvent.getAmount()); } public void withdraw(BigDecimal amount) { if(this.amount.compareTo(amount)<0) { throw new InsufficientAmountException("Insufficient amount on account"); } add(new WithdrawCreatedEvent(aggregateId, LocalDateTime.now(), amount)); } private void apply(WithdrawCreatedEvent withdrawCreatedEvent) { amount = amount.subtract(withdrawCreatedEvent.getAmount()); } public void clearEvents(){ pendingEvents.clear(); } }
W powyższym kodzie jest agregat który reprezentuje konto bankowe. Nie ma on setterów ale posiada metody, które mogą zmienić jego stan. Takie metody mogą istnieć w Twoim systemie. Dzięki utworzeniu odpowiednich zdarzeń w ich ciele można wprowadzić event sourcing do agregatu. Istotne jest to, że agregat posiada kolekcję oczekujących eventów i każdy z nich może zostać zaaplikowany. Tak samo agregat można odtworzyć na podstawie kolekcji eventów.
Poniżej serwis obsługujący dany agregat
@RequiredArgsConstructor public class AggregateService { private final DomainEventStoreRepository repository; private final ApplicationEventPublisher applicationEventPublisher; public void store(Account aggregate) { List<DomainEvent> pendingEvents = aggregate.getPendingEvents(); pendingEvents.forEach(this::saveAndPublish); aggregate.clearEvents(); } public Account load(String aggregateId) { List<DomainEvent> events = repository.findByAggregateIdOrderByCreatedAt(aggregateId); if(events.isEmpty()) { throw new AccountNotFoundException(String.format("Account %s not found.", aggregateId)); } return Account.from(aggregateId,events); } private void saveAndPublish(DomainEvent domainEvent) { repository.save(domainEvent); applicationEventPublisher.publishEvent(domainEvent); } }
A poniżej kilka use caseów dla przykładu konta bankowego.
@RequiredArgsConstructor public class AccountService { private final AggregateService aggregateService; private final UIDGenerator uidGenerator; public void create(String ownerName, String name) { Account account = Account.from(uidGenerator.generate()); account.create(ownerName,name); store(account); } private void store(Account account){ aggregateService.store(account); } private Account load(String aggregateId) { return aggregateService.load(aggregateId); } public void deposit(String aggregateIdTo, BigDecimal amount) { Account account = load(aggregateIdTo); account.deposit(amount); store(account); } void withdraw(String aggregateId, BigDecimal amount) { Account account = load(aggregateId); account.withdraw(amount); store(account); } public void transfer(String aggregateIdFrom, String aggregateIdTo, BigDecimal amount) { Account from = load(aggregateIdFrom); Account to = load(aggregateIdTo); from.withdraw(amount); store(from); to.deposit(amount); store(to); } }
Dlaczego stosować event sourcing
Wprowadzenie eventów do aplikacji nie musi być skomplikowane. Jednak dlaczego miałbyś to robić? Po pierwsze każdy event jest immutable, co już sporo ułatwia podczas pracy w środowisku wielowątkowym. Kolejnym plusem jest to, że zdarzenia są małe, posiadają tylko niezbędne informacje dla konkretnego eventu. Łącząc to z ich niezmiennością, będą zapisywane tylko raz a więcej razy odczytywane. Wobec tego transakcje będą dosyć szybkie. Oprócz powyższych event sourcing ma również inne zalety:
- skalowalność – łatwo eventy przechowywać w różnych bazach, na różnych maszynach z odpowiednią konfiguracją pod zapis. Odczyt zdarzeń również może być z całkiem innej bazy
- audyt – cała historia zmiany stanu agregatu jest zapisana. Dzięki temu nie musisz wprowadzać, żadnych tabel audytowych. Również w bardzo prosty sposób jesteś w stanie zdiagnozować dlaczego wystąpiły błędy. Ładujesz wszystkie zdarzenia i próbujesz wywołać akcję, która się nie powiodła. Na prawdę event sourcing potrafi ułatwić debugowanie.
- łatwość dodawania nowych funkcjonalności – każdy handler dla eventów jest niezależny od pozostałych. Czyli każda funkcjonalność jest stworzona zgodnie z SRP a Ciebie interesuje tylko to co dany event będzie zawierał.
Kiedy stosować event sourcing
Event sourcing możesz stosować w wielu przypadkach. Pierwszym z nich jest to, gdy Twoja aplikacja zbiera dane z różnych źródeł – integracje z systemami zewnętrznymi, parsowanie plików, dane wprowadzane przez użytkownika. Kolejnym przykładem może być gdy Twój system zajmuje się w głównej mierze przetwarzaniem danych a nie koniecznie ich prezentowaniem. Tak samo, gdy warstwa prezentacji jest niezdefiniowana, ciągle się zmienia to warto rozważyć użycie event sourcingu. A także gdy ważna jest dla Ciebie spójność ostateczna.

Wady
Oczywiście, nie ma rozwiązań idealnych i każde ma swoje wady. Tak samo jest z event sourcingiem. Pierwszym minusem jest to, że próg wejścia jest wyższy niż do klasycznych aplikacji. Potrzeba tutaj zmiany myślenia a to może być trudne. Nie wolno zapomnieć o tym, że w kodzie nie podążasz co raz głębiej w metody, tylko wyszukujesz eventy oraz ich handlery. Problemem może też być warstwa prezentacji albo właściwie jej brak. Z tym możesz łatwo sobie poradzić poprzez załadowanie całego agregatu na podstawie kolekcji zdarzeń lub stworzenie dedykowanych widoków w handlerach. Nie możesz również zapomnieć o rozwiązaniu konfliktów w sytuacji gdy kilka eventów może być zaaplikowanych jednocześnie.
Jak zacząć
To jest bardzo dobre pytanie. W sieci znajdziesz sposoby aby do event sourcingu użyć kafki, rabbita, frameworków jak axon, bazy danych – event store. Moim zdaniem na początku mogą Ci przynieść więcej szkody niż pożytku. Spróbuj od podstaw zrozumieć co się dzieje a nie polegać na magii narzędzi. Użyj guavowego event busa, springowego event publishera. O takich podstawach w zrozumieniu i łatwości napisania wielu rzeczy jest prezentacja Davida Schmitz. Najpierw zrozum, poznaj od podstaw jak działa event sourcing a w razie potrzeby dodaj bardziej zaawansowane toole. Inne prezentacje które mogą polecić na temat event sourcingu, to którakolwiek Jakuba Pillimona.
Kod znajduje się na githubie.
Warto też wspomnieć o wersjonowaniu event-ów jako minus
Dzięki Patryk za komentarz. Masz rację, że wersjonowanie eventów jest problematyczne.
Jednak rozwiązaniem na to jest aby ich nie wersjonować. W sytuacji gdy masz taką potrzebę tworzysz nowy event i w tej sytuacji gdy stary mógł zostać wywołany wtedy używasz tego nowego. Ewentualnie w razie potrzeby tworzysz event uzupełniający dane dla wszystkich wcześniejszych. W takiej sytuacji tylko podczas ładowanie event streamu wykorzystasz ten stary event i event uzupełniający dla już istniejących a dla reszty będą załadowane tylko te nowe. Rozwiązanie jest omówione w zalinkowanej prezentacji Davida Schmitza.
Chyba, że masz jakieś przykłady, gdzie to rozwiązanie się nie sprawdzi?
Który biznes nie zażąda raportów? Jak sobie wtedy radzić z brakiem sql żeby nie wylądować z systemem który na bieżąco utrzymuje każdy rekord w postaci hybrydowej – snapshot do czytania + eventy do update?
Oczywiście raporty to bardzo ważna rzecz i każdy business powinien do nich mieć dostęp. Jest to dosyć szeroka kwestia ale po pierwsze należy zrozumieć czego dokładnie oczekuje biznes. Może się okazać, że nie potrzebujesz robić dużych sql tylko jakieś konkretne dane, metryki np liczba nowych kont, konta niezweryfikowane, anulowane transakcje. Po drugie bardzo możliwe, że część danych jest w innych systemach jak CRM, fakturowanie i inne. Wobec tego dane też są jakoś łączone ze sobą. Wbrew pozorom przy event sourcingu generowanie raportów biznesowych, które są ważne dla biznesu jest w wielu sytuacjach łatwiejsze, bo jednak masz też większą wiedzę co dzieje się w systemie. Rozwiązanie hybrydowe o którym piszesz to przecież CQRS w wielu systemach też jest używany. I ma to jak najbardziej sens, bo to co generuje raport nie operuje na tych tabelach które są produkcyjne. Co za tym idzie spokojnie może być nawet w innej bazie, gdzie będzie odpowiednio skonfigurowany.