Event Sourcing - что там после баланса пользователя?
Будет много кода, приготовьтесь.
Event Sourcing - это подход сохранения сущности как набор выполняемых над ней операций с момента её создания. Самый
распространенный пример со счетом баланса пользователя, у каждого пользователя есть баланс в системе с его деньгами.
Баланс - это очень важная область, возникшие ошибки могут быть очень критичны для проекта, и бизнеса в целом. Нужна надежная
система, с надежным хранилищем, и желательно с историей начисления и списания, чтобы всегда можно было провести аудит
баланса. Давайте рассмотрим табличные представления баланса в разрезе CRUD и Event Sourcing.
В случае CRUD была бы таблица balance с полями id - ид баланса, user_id - привязка пользователю, total - сумма на
балансе. При списании или начислении система обновляет поле total. На этом вся логика: никакого журнала событий,
никаких гарантий, что поля total не обновится по ошибке или установит корректное значение, или не обновится вовсе.
id
user_id
total
1
1
100
2
2
200
2
3
500
Что предлагает Event Sourcing - предлагает все изменения записывать отдельной строкой в таблицу, выполняется операция
INSERT и таблица в БД уже выглядит так:
balance_id
user_id
operation_type
value
datetime
1
1
accrue
10 000
2021-12-20 14:50:41
2
2
accrue
5 000
2021-12-21 15:21:32
1
1
write_off
3 500
2021-12-21 15:22:59
2
3
accrue
4 000
2021-12-21 16:50:12
1
1
write_off
2 000
2021-12-22 23:53:31
1
1
accrue
750
2021-12-22 14:50:41
Столбцы таблицы:
balance_id - ид баланса, над котором выполнилась операция;
user_id - ид пользователя, который выполнил операцию;
operation_type - тип операции, accrue - начисление, write_off - списание
value - сумма денег, участвующих в операции;
datetime - дата и время выполненной операции.
Теперь в системе есть журнал событий, и есть возможность проанализировать баланс, списания и начисления, когда и сколько.
Чтобы получить текущий баланс пользователя, необходимо получить все операции и посчитать, например, баланс пользователя с
ид = 1, будет равен 4750.
balance_id
user_id
operation_type
value
datetime
1
1
accrue
10 000
2021-12-20 14:50:41
1
1
write_off
3 500
2021-12-21 15:22:59
1
1
write_off
2 000
2021-12-22 23:53:31
1
1
accrue
750
2021-12-22 14:50:41
Пользователь с ид 1, 20 числа положил 10 000 на баланс, затем 21 числа списал 3500, 22 числа списал ещё 2500 и положил
750, итого 10000-3500-2500+750 = 4750.
Это очень легкий и детский пример Event Sourcing, но ярко раскрывающий его суть:
Состояние сущности хранится как набор операций (событий) выполненных над ней.
Event Sourcing - могут применять как в отдельных модулях, так и во всем проекте ко всем сущностям. Event Sourcing
очень выигрывает в подобных системах типа Баланс или счет, пользователя, складской учет и т.д. А также очень полезен,
когда в системе есть сложные сущности, которые требуют особого внимания и аудита, это может быть пользователь в системе
sso, заказ в интернет-магазине, объект недвижимости в росреестре, или в системе технадзора.
Event Sourcing во всей красе
Далее Event Sourcing мы продолжим рассматривать совместно с CQRS, давайте вспомним его суть. CQRS разбивает действия в системе
на два вида операций, а также модели:
WriteModel - также модель записи, агрегат, бизнес модель, содержит бизнес правила, операции выполняющие действия над
ней называют командами (Command);
ReadModel - также модель чтения, DTO, является представлением данных, не содержит бизнес логику, над моделью не выполняется
никаких команд, модель чтения можно получить с помощью запроса (Query).
CQRS - говорит, что наши модели записи могут храниться как в одной БД (даже в тех же таблицах), так и в разных БД.
Event Sourcing - определяет способ хранения WriteModel, когда модель записи хранится в виде набора событий.
Событие
Событие - это простой объект, описывающий какое-либо действие, которое произошло, и все связанные данные с ним. Они
неизменяемые, сохраняются только с помощью операции добавления, не обновляют состояния системы, они просто записываются
для обработки. Для отмены события необходимо использовать компенсирующее событие, что позволяет сохранять историю и поведение
пользователя. Список событий можно использовать для анализа производительности приложения и выявлять тенденции поведения
пользователя. А также можно его использовать для любой другой бизнес-информации.
Событие в коде представляет собой класс, который содержит простые примитивы, что позволяет его легко сериализовать, сохранять
в хранилище событий и помещать в очередь для обработки.
WriteModel - агрегат, модель записи, бизнес модель - главной особенностью при использовании Event Sourcing становится
механизм изменения состояния модели данных, состояние изменяется через генерацию и применения события, давайте
взглянем на абстракцию.
private ?EventStream $events = null; - набор событий, которые выполнились над объектом до момента сохранения его состояния;
public function pullEvents(): EventStream - метод возвращает набор событий и очищает;
public static function reconstruct(EventStream $eventStream): static - метод, который воссоздает агрегат из
последовательности событий, один из ключевых методов, благодаря ему, появляется возможность из сохраненных в Event Store
событий воссоздать модель данных с актуальной информацией;
protected function record(Event $events): void - записывает событие в массив;
public function apply(Event $event): self - применяет событие к агрегату;
protected function recordAndApply(Event $event): void - записывает и применяет событие, дальше будем использовать
это метод в “сеттерах”;
public function replay(EventStream $events): void - применяет события к агрегату.
Теперь давайте взглянем как меняется работа с самим агрегатом
Обратите внимание, что публичные методы для изменения состояния register, changePasswordHash, changeEmail не изменяют
состояние объекта напрямую, они генерируют события UserWasRegistered, UserPasswordHashWasChanged, UserEmailWasChanged
записывают и применяют с помощью метода recordAndApply. Но чтобы событие применилось, необходимо реализовать обработчики
события, для этого на каждое событие есть хендлер внутри агрегата onUserWasRegistered, onUserPasswordHashWasChanged,
onUserEmailWasChanged, который применяет событие и изменяет состояние модели.
Repository
Repository - репозиторий, должен отвечать за получение модели чтения по ид, и её сохранение.
Метод public function ofId(AggregateId $userId): User возвращает пользователя по id, обращаясь к Event Store, для получения
всех событий и затем воссоздает агрегат с помощью User::reconstruct($events).
Метод public function persist(User $user): void отвечает за то, чтобы сохранить модель записи, получает все события
изменения состояния, сохраняет их в Event Store и вызывает механизм проекций.
Event Store
Event Store - хранилище событий, где каждое событие, происходящее в системе, записывает друг за другом, в качестве
хранилища может выступать что угодно: postgresql, mysql, mongodb, redis и т.д.
Мы рассмотрим реализацию на основе реляционной БД. В качестве хранилища будет выступать таблица event_store со
следующими полями:
id, uuid - ид события;
aggregate_id, uuid - ид сущности, также в других вариациях это поле может называться stream_id;
version, int - версия сущности, с каждым новым событием к определенной сущности значение в этом поле увеличивается;
occurred_on, datetime - дата и время, когда произошло событие;
publicfunctiongetCurrentVersion(AggregateId $aggregateId): int { return (int) $this->connection->fetchOne( 'SELECT MAX(version)' . ' FROM ' . $this->tableName . ' WHERE "aggregate_id" = ?', [$aggregateId->value()] ); } }
Event Store - является постоянным источником информации, поэтому данные о событиях никогда не должны обновляться.
Projection
Projection - проекция данных, позволяет создавать представления, ReadModel, и восстанавливать состояние системы на любую
дату и время. Происходит это за счет того, что события проигрываются один за другим в порядке добавления в хранилище.
Таким образом, projection, грубо говоря, это механизм синхронизации WriteModel с ReadModel.
DomainUserRepository умеет находить пользователя по ид, но для авторизации нужен логин и пароль. Выполнить поиск по
логину в Event Store мы не можем, это будет очень трудозатратно и не прагматично. Поэтому, нужна ReadModel.
И сами проекции UserWasPasswordHashChangedProjection и UserWasRegisteredProjection, которые обрабатывают события
UserWasPasswordHashChanged и UserWasRegistered, соответственно, и выполняют синхронизацию с ReadModel.
Projection & Symfony DI
Не забываем сконфигурировать DI, на примере Symfony, пометим все проекции с интерфейсом Projection тегом
shared.domain.bus.projector.projection, и попросим DI при создании объекта ProjectorInMemory передать отмеченные
объекты этим тегом в конструктор.
Один из кейсов - это аутентификация и авторизация пользователя. Аутентификация - будет в примере происходить с
помощью логина и пароля, в ответ клиент получит JWT токен, которым можно будет авторизоваться.
В обработчике пытаемся получить ReadModel пользователя по username, и проверяем hash пароля, если совпало, генерируем
jwt токен с помощью сервиса TokenService.
Контроллер может быть реализован следующим образом:
Для авторизации можно применить lexik/LexikJWTAuthenticationBundle,
нам остается реализовать Symfony\Component\Security\Core\User\UserInterface и Symfony\Component\Security\Core\User\UserProviderInterface
Для этого создадим декоратор User над моделью чтения UserSecurity
Рассмотрим регистрацию пользователя и смену пароля в качестве команд.
UserRegisterCommand
Команды работают на запись, и работают непосредственно с WriteModel, в обработчике вызываем метод User::register,
который, в свою очередь, сгенерирует событие UserWasRegistered, и сохраняем пользователя при помощи UserRepositoryPersistence,
который позаботится о том, чтобы события были сохранены и переданы в механизм синхронизации моделей записи и чтения.
По аналогии с предыдущей командой мы работаем c WriteModel, вызываем соответствующий метод, который сгенерирует событие,
и оно будет сохранено благодаря реализации UserRepositoryPersistence.
Обратите внимание, для смены пароля необходима авторизация, чтобы выполнить команду, необходимо передать id пользователя.
В контроллере symfony мы можем получить текущего пользователя с помощью метода $this->getUser(), и, соответственно, его
идентификатор
Приложение может быть иметь множество различных ReadModel, которые обновлять в реалтайме или на пользовательском хите будет
очень непроизводительно, поэтому в целях оптимизации следует реализовать асинхронный прожектор. В данном примере используется
Redis в качестве очереди.
protectedfunctionexecute(InputInterface $input, OutputInterface $output): int { $this->workerProjectionRedis->consume(); returnCommand::SUCCESS; } }
Да, консюмер очень похож на синхронный прожектор, за тем исключением, что события поступают из очереди, а не передаются
в качестве аргументов в метод.
Особо внимательные из вас, наверное, заметили, что это очень похоже на обычных подписчиков доменных событий. Так и есть,
единственное отличие, что projections нацелены на синхронизацию ReadModel-ей. Так, синхронизация данных может иметь более
высокий приоритет. В примерах выше это примитивные реализации, чтобы показать механику максимально доступно, вы можете
использовать существующую шину событий у вас на проекте, настроить роуторы на Rabbit MQ, чтобы разделить потоки обработки
событий и процесс синхронизации, либо использовать kafka, помимо этого есть готовые решения предоставляющие реализацию
шин, например, тот же symfony/messenger.
Приложение может в любой момент прочитать журнал событий, а затем использовать для воссоздания или обновлений
ReadModel, как в примере: в порядке очереди, по запросу или в рамках cron заданий.
Snapshot
Снапшоты - ещё один трюк по оптимизации. Представьте, ваша WriteModel будет иметь через полгода 1 000 или даже 100 000
событий, согласитесь, получать такое количество событий и воспроизводить будет весьма затратно. Идея заключается в том, чтобы
создавать слепки WriteModel через какое-то количество событий, например, каждое пятое:
Репозиторий WriteModel должен будет получить последний снапшот, а затем применить свежие события из event store.
Обратите внимание как выполняется серилаизация и десериализация агрегата, строки 36 и 68, применяется функции PHP
serialize и unserialize.
В данном подходе, могут возникнуть проблемы при рефакторинге, которые повлекут за собой пересборку всех снапшотов.
Upcasting
Рано или поздно системы меняются вместе с бизнесом, а значит сохраненные ранее события могут стать невалидными в новой
версии приложения, может быть переименовано поле, удаленно или добавлено новое.
Обновление формата события потребует выполнить миграцию, и обновить существующие события. Но процесс может оказаться длительным.
Поэтому, используют подход upcasting, суть его заключается в том, чтобы старое событие привести к новому.
Представим, что к приложению поступили новые требования - у пользователя должны быть роли. Необходимо создать новое
событие с новым полем.
Upcasting позволяет не обновлять события в истории, можно не поддерживать старые версии в коде, но появляется новый код
в виде “конвертатора” версий, а также такой подход влияет на производительность получения модели чтения из хранилища.
Выбор как всегда между компромиссами: обновлять формат сообщений в хранилище или upcasting. Я бы применил upcasting
на время, а затем обновил бы не спеша события в хранилище, это позволит быстро перейти на новую версию приложения и даст
время на обновление хранилища без последствий.
Когда стоит применять Event Sourcing
CRUD имеет ряд ограничений, т.к. операции выполняются непосредственно в хранилище, то это может замедлить производительность,
а при работе нескольких пользователей над одним ресурсом может приводить к блокировкам и конфликтам данных.
Стоит использовать Event Sourcing:
когда необходимо знать причину обновления данных;
свести к минимуму конфликт обновления данных;
когда нужна высокая производительность;
когда нужен журнал аудита, воспроизвести события, привести систему / сущность к определенной точки времени, отменить
события;
когда нужна гибкость с ReadModel, создавать и изменять разные представления в условиях быстро изменяющихся требований
к проекту;
Не стоит применять Event Sourcing в простых доменных, с маленькой бизнес-логикой, вне домена, там где достаточно простого
CRUD, и не нужен журнал аудита.