January 15, 2023

Event Sourcing - что там после баланса пользователя?

Будет много кода, приготовьтесь.

Event Sourcing - это подход сохранения сущности как набор выполняемых над ней операций с момента её создания. Самый распространенный пример со счетом баланса пользователя, у каждого пользователя есть баланс в системе с его деньгами. Баланс - это очень важная область, возникшие ошибки могут быть очень критичны для проекта, и бизнеса в целом. Нужна надежная система, с надежным хранилищем, и желательно с историей начисления и списания, чтобы всегда можно было провести аудит баланса. Давайте рассмотрим табличные представления баланса в разрезе CRUD и Event Sourcing.

В случае CRUD была бы таблица balance с полями id - ид баланса, user_id - привязка пользователю, total - сумма на балансе. При списании или начислении система обновляет поле total. На этом вся логика: никакого журнала событий, никаких гарантий, что поля total не обновится по ошибке или установит корректное значение, или не обновится вовсе.

id user_id total
1 1 100
2 2 200
2 3 500

Что предлагает Event Sourcing - предлагает все изменения записывать отдельной строкой в таблицу, выполняется операция INSERT и таблица в БД уже выглядит так:

balance_id user_id operation_type value datetime
1 1 accrue 10 000 2021-12-20 14:50:41
2 2 accrue 5 000 2021-12-21 15:21:32
1 1 write_off 3 500 2021-12-21 15:22:59
2 3 accrue 4 000 2021-12-21 16:50:12
1 1 write_off 2 000 2021-12-22 23:53:31
1 1 accrue 750 2021-12-22 14:50:41

Столбцы таблицы:

Теперь в системе есть журнал событий, и есть возможность проанализировать баланс, списания и начисления, когда и сколько. Чтобы получить текущий баланс пользователя, необходимо получить все операции и посчитать, например, баланс пользователя с ид = 1, будет равен 4750.

balance_id user_id operation_type value datetime
1 1 accrue 10 000 2021-12-20 14:50:41
1 1 write_off 3 500 2021-12-21 15:22:59
1 1 write_off 2 000 2021-12-22 23:53:31
1 1 accrue 750 2021-12-22 14:50:41

Пользователь с ид 1, 20 числа положил 10 000 на баланс, затем 21 числа списал 3500, 22 числа списал ещё 2500 и положил 750, итого 10000-3500-2500+750 = 4750.

Это очень легкий и детский пример Event Sourcing, но ярко раскрывающий его суть:

Состояние сущности хранится как набор операций (событий) выполненных над ней.

Event Sourcing - могут применять как в отдельных модулях, так и во всем проекте ко всем сущностям. Event Sourcing очень выигрывает в подобных системах типа Баланс или счет, пользователя, складской учет и т.д. А также очень полезен, когда в системе есть сложные сущности, которые требуют особого внимания и аудита, это может быть пользователь в системе sso, заказ в интернет-магазине, объект недвижимости в росреестре, или в системе технадзора.

Event Sourcing во всей красе

Далее Event Sourcing мы продолжим рассматривать совместно с CQRS, давайте вспомним его суть. CQRS разбивает действия в системе на два вида операций, а также модели:

CQRS - говорит, что наши модели записи могут храниться как в одной БД (даже в тех же таблицах), так и в разных БД. Event Sourcing - определяет способ хранения WriteModel, когда модель записи хранится в виде набора событий.

Событие

Событие - это простой объект, описывающий какое-либо действие, которое произошло, и все связанные данные с ним. Они неизменяемые, сохраняются только с помощью операции добавления, не обновляют состояния системы, они просто записываются для обработки. Для отмены события необходимо использовать компенсирующее событие, что позволяет сохранять историю и поведение пользователя. Список событий можно использовать для анализа производительности приложения и выявлять тенденции поведения пользователя. А также можно его использовать для любой другой бизнес-информации.

Событие в коде представляет собой класс, который содержит простые примитивы, что позволяет его легко сериализовать, сохранять в хранилище событий и помещать в очередь для обработки.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<?php

class UserWasRegistered extends Shared\Domain\Bus\Event\Event
{
public function __construct(
string $aggregateId,
public readonly string $username,
public readonly string $email,
public readonly string $passwordHash,
) {
parent::__construct($aggregateId);
}

public static function eventName(): string
{
return 'auth.domain.user.registered';
}
}

WriteModel

WriteModel - агрегат, модель записи, бизнес модель - главной особенностью при использовании Event Sourcing становится механизм изменения состояния модели данных, состояние изменяется через генерацию и применения события, давайте взглянем на абстракцию.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
<?php

abstract class AggregateRoot implements Aggregate, AggregateEventable, AggregateReconstructable
{
private ?EventStream $events = null;
private int $version = 0;

final protected function __construct(
protected AggregateId $id
) {
}

public function id(): AggregateId
{
return $this->id;
}

public function pullEvents(): EventStream
{
$events = $this->getEventStream();
$this->events = new EventStream($this->id());

return $events;
}

public static function reconstruct(EventStream $eventStream): static
{
$aggregate = new static($eventStream->aggregateId);

$aggregate->replay($eventStream);

return $aggregate;
}

protected function getEventStream(): EventStream
{
if ($this->events) {
return $this->events;
}

$this->events = new EventStream($this->id());

return $this->events;
}

protected function record(Event $events): void
{
$events->setVersion($this->version + 1);
$this->getEventStream()->append($events);
}

public function apply(Event $event): self
{
$classPart = explode('\\', get_class($event));
$name = 'on'.array_pop($classPart);

if (method_exists($this, $name)) {
$this->{$name}($event);
$this->version = $event->getVersion();
}

return $this;
}

protected function recordAndApply(Event $event): void
{
$this->record($event);
$this->apply($event);
}

public function version(): int
{
return $this->version;
}

public function replay(EventStream $events): void
{
foreach ($events as $event) {
$this->apply($event);
}
}
}

Теперь давайте взглянем как меняется работа с самим агрегатом

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
<?php

class User extends AggregateRoot implements Aggregate, AggregateEventable, AggregateReconstructable
{
protected Username $username;
protected UserEmail $email;
protected UserPasswordHash $password;

public static function register(
Username $username,
UserEmail $email,
UserPasswordHash $passwordHash,
): static {
$user = new static(UserId::random());

$user->recordAndApply(
new UserWasRegistered(
$user->id()->value(),
$username->value(),
$email->value(),
$passwordHash->value()
)
);

return $user;
}

protected function onUserWasRegistered(UserWasRegistered $event): void
{
$this->id = UserId::fromString($event->getAggregateId());
$this->username = Username::fromString($event->username);
$this->email = UserEmail::fromString($event->email);
$this->password = UserPasswordHash::fromString($event->passwordHash);
}

public function id(): AggregateId
{
return $this->id;
}

public function changeEmail(UserEmail $email): void
{
$this->recordAndApply(
new UserEmailWasChanged(
$this->id()->value(),
$email->value(),
)
);
}

protected function onUserEmailWasChanged(UserEmailWasChanged $event): void
{
$this->email = new UserEmail($event->email);
}

public function changePasswordHash(UserPasswordHash $passwordHash): void
{
$this->recordAndApply(
new UserPasswordHashWasChanged(
$this->id()->value(),
$passwordHash->value(),
)
);
}

protected function onUserPasswordHashWasChanged(UserPasswordHashWasChanged $event): void
{
$this->password = new UserPasswordHash($event->passwordHash);
}
}

Обратите внимание, что публичные методы для изменения состояния register, changePasswordHash, changeEmail не изменяют состояние объекта напрямую, они генерируют события UserWasRegistered, UserPasswordHashWasChanged, UserEmailWasChanged записывают и применяют с помощью метода recordAndApply. Но чтобы событие применилось, необходимо реализовать обработчики события, для этого на каждое событие есть хендлер внутри агрегата onUserWasRegistered, onUserPasswordHashWasChanged, onUserEmailWasChanged, который применяет событие и изменяет состояние модели.

Repository

Repository - репозиторий, должен отвечать за получение модели чтения по ид, и её сохранение.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
<?php


interface UserRepositoryPersistence
{
/**
* @throws UserNotFoundException
*/
public function ofId(AggregateId $userId): User;

public function persist(User $user): void;
}

class EsUserRepositoryPersistence implements UserRepositoryPersistence
{
public function __construct(
private EventStore $eventStore,
private Projector $projector,
)
{
}

public function ofId(AggregateId $userId): User
{
$events = $this->eventStore->getEventStream($userId);
return User::reconstruct($events);
}

public function persist(User $user): void
{
$events = $user->pullEvents();
$this->eventStore->append($events);
$this->projector->project($events);
}
}

Метод public function ofId(AggregateId $userId): User возвращает пользователя по id, обращаясь к Event Store, для получения всех событий и затем воссоздает агрегат с помощью User::reconstruct($events).

Метод public function persist(User $user): void отвечает за то, чтобы сохранить модель записи, получает все события изменения состояния, сохраняет их в Event Store и вызывает механизм проекций.

Event Store

Event Store - хранилище событий, где каждое событие, происходящее в системе, записывает друг за другом, в качестве хранилища может выступать что угодно: postgresql, mysql, mongodb, redis и т.д.

Мы рассмотрим реализацию на основе реляционной БД. В качестве хранилища будет выступать таблица event_store со следующими полями:

Простая реализация на PHP может выглядеть так

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
<?php

class DbalEventStore implements EventStore
{
public function __construct(
private readonly Connection $connection,
private readonly Serializer $serializer,
private readonly Deserializer $deserializer,
private readonly string $tableName = 'event_store',
) {
}

public function append(EventStream $eventStream): EventStore
{
$this->connection->beginTransaction();

try {
$version = $this->getCurrentVersion($eventStream->aggregateId);

foreach ($eventStream as $event) {
$this->connection->insert(
$this->tableName,
[
'id' => $event->getEventId(),
'aggregate_id' => $event->getAggregateId(),
'version' => $version++,
'occurred_on' => $event->getOccurredOn()->format(\DateTime::ATOM),
'payload' => $this->serializer->serialize($event),
]
);
}

$this->connection->commit();
} catch (\Exception $exception) {
$this->connection->rollBack();
throw new DbalEventStoreNotAppend('', 0, $exception);
}

return $this;
}

public function getEventStream(AggregateId $aggregateId): EventStream
{
$rawEvents = $this->connection->fetchAllAssociative(
'SELECT "id", "aggregate_id", "occurred_on", "version", "payload" '
. ' FROM ' . $this->tableName
. ' WHERE "aggregate_id" = ?'
. ' ORDER BY "version" ASC',
[$aggregateId->value()]
);

$eventStream = new EventStream($aggregateId);
foreach ($rawEvents as $rawEvent) {
try {
$event = $this->deserializer->deserialize((string)$rawEvent['payload']);
} catch (EventException) {
continue;
}

$eventStream->append($event);
}

return $eventStream;
}

public function getCurrentVersion(AggregateId $aggregateId): int
{
return (int) $this->connection->fetchOne(
'SELECT MAX(version)'
. ' FROM ' . $this->tableName
. ' WHERE "aggregate_id" = ?',
[$aggregateId->value()]
);
}
}

Event Store - является постоянным источником информации, поэтому данные о событиях никогда не должны обновляться.

Projection

Projection - проекция данных, позволяет создавать представления, ReadModel, и восстанавливать состояние системы на любую дату и время. Происходит это за счет того, что события проигрываются один за другим в порядке добавления в хранилище. Таким образом, projection, грубо говоря, это механизм синхронизации WriteModel с ReadModel.

DomainUserRepository умеет находить пользователя по ид, но для авторизации нужен логин и пароль. Выполнить поиск по логину в Event Store мы не можем, это будет очень трудозатратно и не прагматично. Поэтому, нужна ReadModel.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<?php

class UserSecurity
{
public function __construct(
public readonly string $id,
public readonly string $username,
public readonly string $passwordHash,
) {
}
}

interface UserSecurityRepository
{
/**
* @throws NotFoundException
*/
public function findByUsername(string $username): UserSecurity;
}

Также, необходим механизм синхронизации, в это роли будет выступать класс ProjectorInMemory, реализующий интерфейс Projector.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
<?php

interface Projector
{
public function project(EventStream $eventStream): void;
}

class ProjectorInMemory implements Projector
{
/**
* @var array<string, Projection[]>
*/
private array $projections = [];

/**
* @param Projection[] $projections
*/
public function __construct(iterable $projections)
{
foreach ($projections as $projection) {
foreach ($projection->listenTo() as $classEvent) {
$this->projections[$classEvent][] = $projection;
}
}
}

public function project(EventStream $eventStream): void
{
foreach ($eventStream as $event) {
if (isset($this->projections[get_class($event)]) === false) {
continue;
}

$projections = $this->projections[get_class($event)];

foreach ($projections as $projection) {
$projection->project($event);
}
}
}
}

И сами проекции UserWasPasswordHashChangedProjection и UserWasRegisteredProjection, которые обрабатывают события UserWasPasswordHashChanged и UserWasRegistered, соответственно, и выполняют синхронизацию с ReadModel.

Projection & Symfony DI

Не забываем сконфигурировать DI, на примере Symfony, пометим все проекции с интерфейсом Projection тегом shared.domain.bus.projector.projection, и попросим DI при создании объекта ProjectorInMemory передать отмеченные объекты этим тегом в конструктор.

1
2
3
4
5
6
7
8
9
10
11
12
13
...

services:
...

_instanceof:
\Shared\Domain\Bus\Projection\Projection:
tags: [ 'shared.domain.bus.projector.projection' ]

Shared\Infrastructure\Bus\Projection\Projector\InMemory\ProjectorInMemory:
arguments: [ !tagged shared.domain.bus.projector.projection ]

...

Query

Один из кейсов - это аутентификация и авторизация пользователя. Аутентификация - будет в примере происходить с помощью логина и пароля, в ответ клиент получит JWT токен, которым можно будет авторизоваться.

Для этого создаем Query GetAuthTokenQuery

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
<?php

class GetAuthTokenQuery implements Query
{
public function __construct(
public readonly string $username,
public readonly string $password,
) {
}
}

class GetAuthTokenResponse implements QueryResponse
{
public function __construct(
public readonly string $token
) {
}
}

class GetAuthTokenHandler implements QueryHandler
{
public function __construct(
private readonly TokenService $tokenService,
private readonly UserSecurityRepository $repository,
private readonly PasswordEncryptor $passwordEncryptor,
) {
}

public function __invoke(GetAuthTokenQuery $query): GetAuthTokenResponse
{
try {
$user = $this->repository->findByUsername($query->username);
} catch (NotFoundException) {
throw new GenerateTokenException('Check credentials');
}

$password = $this->passwordEncryptor->encrypt($query->password);
if ($user->passwordHash === $password) {
return new GetAuthTokenResponse(
$this->tokenService->create($user)
);
}

throw new GenerateTokenException('Check credentials');
}
}

В обработчике пытаемся получить ReadModel пользователя по username, и проверяем hash пароля, если совпало, генерируем jwt токен с помощью сервиса TokenService.

Контроллер может быть реализован следующим образом:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
<?php

class AuthController extends AbstractController
{
...

#[Route('/auth/login/', name: 'auth_login', methods: 'POST')]
public function login(Request $request, GetAuthTokenHandler $handler): JsonResponse
{
$data = json_decode($request->getContent(), true, 512, JSON_THROW_ON_ERROR);
$query = new GetAuthTokenQuery(
$data['username'],
$data['password'],
);

return new JsonResponse([
'token' => $handler($query)->token,
]);
}

...
}

Авторизации в Symfony

Для авторизации можно применить lexik/LexikJWTAuthenticationBundle, нам остается реализовать Symfony\Component\Security\Core\User\UserInterface и Symfony\Component\Security\Core\User\UserProviderInterface

Для этого создадим декоратор User над моделью чтения UserSecurity

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
<?php

class User implements UserInterface
{
public function __construct(
protected readonly UserSecurity $securityUser
) {
}

public function getRoles(): array
{
return [];
}

public function eraseCredentials()
{
}

public function getUserIdentifier(): string
{
return $this->securityUser->id;
}

public function getUsername(): string
{
return $this->securityUser->username;
}
}

class UserProvider implements UserProviderInterface
{
public function __construct(
protected readonly UserSecurityRepository $repository
) {
}

public function refreshUser(UserInterface $user): void
{
}

public function supportsClass(string $class): bool
{
return self::class === $class || is_subclass_of($class, self::class);
}

public function loadUserByIdentifier(string $identifier): UserInterface
{
$user = $this->repository->findByUsername($identifier);

return new User($user);
}
}

В security.yaml необходимо указать способ аутентификации и наш новый провайдер пользователя:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
security:
enable_authenticator_manager: true

providers:
app_user_provider:
id: Auth\Infrastructure\Symfony\UserProvider

firewalls:
api:
pattern: ^/.*
stateless: true
entry_point: jwt
provider: app_user_provider
jwt: ~
dev:
pattern: ^/(_(profiler|wdt)|css|images|js)/
security: false
main:
lazy: true
provider: app_user_provider

access_control:
- { path: ^/private, roles: IS_AUTHENTICATED_FULLY }
- { path: ^/auth/change_password, roles: IS_AUTHENTICATED_FULLY }

Command

Рассмотрим регистрацию пользователя и смену пароля в качестве команд.

UserRegisterCommand

Команды работают на запись, и работают непосредственно с WriteModel, в обработчике вызываем метод User::register, который, в свою очередь, сгенерирует событие UserWasRegistered, и сохраняем пользователя при помощи UserRepositoryPersistence, который позаботится о том, чтобы события были сохранены и переданы в механизм синхронизации моделей записи и чтения.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
<?php

class UserRegisterCommand implements Command
{
public function __construct(
public readonly string $username,
public readonly string $email,
public readonly string $password,
) {
}
}

class UserRegisterCommandResponse implements CommandResponse
{
public function __construct(
public readonly string $userId
) {
}
}

class UserRegisterCommandHandler implements CommandHandler
{
public function __construct(
private readonly UserRepositoryPersistence $repositoryPersistence,
private readonly PasswordEncryptor $passwordEncryptor,
) {
}

public function __invoke(UserRegisterCommand $command): UserRegisterCommandResponse
{
$user = User::register(
Username::fromString($command->username),
UserEmail::fromString($command->email),
UserPasswordHash::fromString($this->passwordEncryptor->encrypt($command->password))
);

$this->repositoryPersistence->persist($user);

return new UserRegisterCommandResponse($user->id()->value());
}
}

И реализация контроллера

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
<?php

class AuthController extends AbstractController
{
...

#[Route('/auth/registration/', name: 'auth_registration', methods: 'POST')]
public function registration(Request $request, UserRegisterCommandHandler $handler): JsonResponse
{
$data = json_decode($request->getContent(), true, 512, JSON_THROW_ON_ERROR);
$command = new UserRegisterCommand(
$data['username'],
$data['email'],
$data['password']
);

return new JsonResponse([
'user_id' => $handler($command)->userId,
]);
}

...
}

UserChangePasswordCommand

По аналогии с предыдущей командой мы работаем c WriteModel, вызываем соответствующий метод, который сгенерирует событие, и оно будет сохранено благодаря реализации UserRepositoryPersistence.

Обратите внимание, для смены пароля необходима авторизация, чтобы выполнить команду, необходимо передать id пользователя.

В контроллере symfony мы можем получить текущего пользователя с помощью метода $this->getUser(), и, соответственно, его идентификатор

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
<?php

class AuthController extends AbstractController
{
...

#[Route('/auth/change_password/', name: 'auth_change_password', methods: 'POST')]
public function changePassword(Request $request, UserChangePasswordCommandHandler $handler): JsonResponse
{
$data = json_decode($request->getContent(), true, 512, JSON_THROW_ON_ERROR);
$query = new UserChangePasswordCommand(
$this->getUser()->getUserIdentifier(),
$data['new_password'],
);

return new JsonResponse([
'user_id' => $handler($query)->userId,
]);
}

...
}

Раскачиваем архитектуру

Async projection

Приложение может быть иметь множество различных ReadModel, которые обновлять в реалтайме или на пользовательском хите будет очень непроизводительно, поэтому в целях оптимизации следует реализовать асинхронный прожектор. В данном примере используется Redis в качестве очереди.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
<?php

class ProjectorProducer implements Projector
{
public function __construct(
protected readonly ClientInterface $redis,
protected readonly Serializer $serializer,
protected readonly string $queueName = 'projection_event',
)
{
}

public function project(EventStream $eventStream): void
{
foreach ($eventStream as $event) {
$data = $this->serializer->serialize($event);
$this->client->lpush($this->queueName, [$data]);
}
}

}

Также потребуется реализовать консюмер для обработки очереди, и команду, которая запустит консюмер.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
<?php

class ProjectorConsumer
{
/**
* @var array<string, Projection[]>
*/
private array $projections = [];

/**
* @param Projection[] $projections
*/
public function __construct(
iterable $projections,
protected readonly ClientInterface $client,
protected readonly Deserializer $deserializer,
protected readonly string $queueName = 'projection_event',
)
{
foreach ($projections as $projection) {
foreach ($projection->listenTo() as $classEvent) {
$this->projections[$classEvent][] = $projection;
}
}
}

public function consume(): void
{
while ($data = $this->client->lpop($this->queueName)) {
try{
$event = $this->deserializer->deserialize($data);
if (false === isset($this->projections[get_class($event)])) {
continue;
}

$projections = $this->projections[get_class($event)];

foreach ($projections as $projection) {
$projection->project($event);
}
} catch (\Exception) {
}
}
}
}

#[AsCommand(name: 'worker:projection-redis')]
class WorkerProjectionRedis extends Command
{
public function __construct(
protected readonly ProjectorConsumer $workerProjectionRedis
)
{
parent::__construct();
}

protected function execute(InputInterface $input, OutputInterface $output): int
{
$this->workerProjectionRedis->consume();
return Command::SUCCESS;
}
}

Да, консюмер очень похож на синхронный прожектор, за тем исключением, что события поступают из очереди, а не передаются в качестве аргументов в метод.

Особо внимательные из вас, наверное, заметили, что это очень похоже на обычных подписчиков доменных событий. Так и есть, единственное отличие, что projections нацелены на синхронизацию ReadModel-ей. Так, синхронизация данных может иметь более высокий приоритет. В примерах выше это примитивные реализации, чтобы показать механику максимально доступно, вы можете использовать существующую шину событий у вас на проекте, настроить роуторы на Rabbit MQ, чтобы разделить потоки обработки событий и процесс синхронизации, либо использовать kafka, помимо этого есть готовые решения предоставляющие реализацию шин, например, тот же symfony/messenger.

Приложение может в любой момент прочитать журнал событий, а затем использовать для воссоздания или обновлений ReadModel, как в примере: в порядке очереди, по запросу или в рамках cron заданий.

Snapshot

Снапшоты - ещё один трюк по оптимизации. Представьте, ваша WriteModel будет иметь через полгода 1 000 или даже 100 000 событий, согласитесь, получать такое количество событий и воспроизводить будет весьма затратно. Идея заключается в том, чтобы создавать слепки WriteModel через какое-то количество событий, например, каждое пятое:

Репозиторий WriteModel должен будет получить последний снапшот, а затем применить свежие события из event store.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
<?php

class Snapshot
{
public function __construct(
public readonly string $aggregateId,
public readonly int $version,
public readonly string $aggregateSerialized,
) {
}
}

interface SnapshotRepository
{
public function save(Snapshot $snapshot): void;

public function get(string $aggregateId): Snapshot;
}

class DbalSnapshotRepository implements SnapshotRepository
{
public function __construct(
private readonly Connection $connection,
private readonly string $tableName = 'event_store_snapshot',
) {
}

public function save(Snapshot $snapshot): void
{
$data = [
'aggregate_id' => $snapshot->aggregateId,
'version' => $snapshot->version,
'aggregate_serialized' => $snapshot->aggregateSerialized,
];

$snapshotExist = null;

try {
$snapshotExist = $this->get($snapshot->aggregateId);
} catch (SnapshotNotFoundException) {
}

if ($snapshotExist) {
$this->connection->update(
$this->tableName,
$data,
['aggregate_id' => $snapshotExist->aggregateId]
);
} else {
$this->connection->insert($this->tableName, $data);
}
}

public function get(string $aggregateId): Snapshot
{
$rows = $this->connection->fetchAllAssociative(
'SELECT "version", "aggregate_id", "aggregate_serialized"'
.' FROM '.$this->tableName
.' WHERE "aggregate_id" = ?'
.' LIMIT 1',
[$aggregateId]
);

if (0 === count($rows)) {
throw new SnapshotNotFoundException();
}
$snapshot = $rows[0];

if (isset($snapshot['aggregate_id']) && isset($snapshot['version']) && isset($snapshot['aggregate_serialized'])) {
return new Snapshot(
(string) $snapshot['aggregate_id'],
(int) $snapshot['version'],
(string) $snapshot['aggregate_serialized'],
);
}

throw new SnapshotNotFoundException();
}
}

Snapshot должен иметь как минимум id, версию и состояние сущности.

Теперь необходимо добавить описанную логику в репозиторий сущности, сделаем это при помощи декоратора

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
<?php

class UserRepositoryDecorator implements UserRepositoryPersistence
{
public function __construct(
protected readonly UserRepositoryPersistence $repository,
) {
}

public function ofId(AggregateId $userId): User
{
return $this->repository->ofId($userId);
}

public function persist(User $user): void
{
$this->repository->persist($user);
}
}

class SnapshotUserRepositoryDecorator extends UserRepositoryDecorator implements UserRepositoryPersistence
{
public function __construct(
UserRepositoryPersistence $repository,
protected readonly SnapshotRepository $snapshotRepository,
protected readonly EventStore $eventStore,
protected readonly int $every = 100,
) {
parent::__construct($repository);
}

public function ofId(AggregateId $userId): User
{
try {
$snapshot = $this->snapshotRepository->get($userId->value());
$user = unserialize($snapshot->aggregateSerialized);

if (false === $user instanceof User) {
throw new \RuntimeException('Bad snapshot');
}

$newEvents = $this->eventStore->getEventStreamFromVersion($user->id(), $snapshot->version);
$user->replay($newEvents);

return $user;
} catch (\Exception) {
}

return $this->repository->ofId($userId);
}

public function persist(User $user): void
{
$this->repository->persist($user);

$oldSnapshotVersion = 0;
try {
$snapshot = $this->snapshotRepository->get($user->id()->value());
$oldSnapshotVersion = $snapshot->version;
} catch (SnapshotNotFoundException) {
}

if ($user->version() - $oldSnapshotVersion >= $this->every) {
$this->snapshotRepository->save(
new Snapshot(
$user->id()->value(),
$user->version(),
serialize($user),
)
);
}
}
}

Обратите внимание как выполняется серилаизация и десериализация агрегата, строки 36 и 68, применяется функции PHP serialize и unserialize. В данном подходе, могут возникнуть проблемы при рефакторинге, которые повлекут за собой пересборку всех снапшотов.

Upcasting

Рано или поздно системы меняются вместе с бизнесом, а значит сохраненные ранее события могут стать невалидными в новой версии приложения, может быть переименовано поле, удаленно или добавлено новое.

Обновление формата события потребует выполнить миграцию, и обновить существующие события. Но процесс может оказаться длительным. Поэтому, используют подход upcasting, суть его заключается в том, чтобы старое событие привести к новому.

Представим, что к приложению поступили новые требования - у пользователя должны быть роли. Необходимо создать новое событие с новым полем.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<?php

class UserWasRegisteredV2 extends Event
{
public function __construct(
string $aggregateId,
public readonly string $username,
public readonly string $email,
public readonly string $passwordHash,
public readonly array $roles,
) {
parent::__construct($aggregateId);
}

public static function eventName(): string
{
return 'auth.domain.user.registered.v2';
}
}

Для старых пользователей, созданных когда-то без ролей, по умолчанию должна присваиваться роль member.

На этапе десериализации события можем вызвать механизм upcating-а и получить новый объект события.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
<?php

class UpcastSerializer extends DeserializerDecorator
{
/**
* @var Upcaster[]
*/
protected array $upcasters = [];

public function __construct(
Deserializer $deserializer,
iterable $upcasters
) {
parent::__construct($deserializer);
/**
* @var Upcaster[] $upcasters
*/
foreach ($upcasters as $upcaster) {
$type = $this->extractType(get_class($upcaster));
if (null === $type) {
continue;
}
$this->upcasters[$type] = $upcaster;
}
}

/**
* @param string $stringEvent
* @return Event
*
* @throws \Shared\Domain\Bus\Event\EventException
*
* @psalm-suppress InvalidFunctionCall
*/
public function deserialize(string $stringEvent): Event
{
$event = $this->deserializer->deserialize($stringEvent);
$eventClass = get_class($event);

if (isset($this->upcasters[$eventClass])) {
/** @var Event $event */
$event = $this->upcasters[$eventClass]($event);
}

return $event;
}

/**
* @psalm-param class-string $class
*/
public function extractType(string $class): ?string
{
$reflector = new \ReflectionClass($class);
$method = $reflector->getMethod('__invoke');

$parameters = $method->getParameters();
if (empty($parameters)) {
return null;
}

/** @psalm-var \ReflectionNamedType|null $fistParameterType */
$fistParameterType = $parameters[0]->getType();

if (null === $fistParameterType) {
return null;
}

return $fistParameterType->getName();
}
}

Upcasting позволяет не обновлять события в истории, можно не поддерживать старые версии в коде, но появляется новый код в виде “конвертатора” версий, а также такой подход влияет на производительность получения модели чтения из хранилища.

Выбор как всегда между компромиссами: обновлять формат сообщений в хранилище или upcasting. Я бы применил upcasting на время, а затем обновил бы не спеша события в хранилище, это позволит быстро перейти на новую версию приложения и даст время на обновление хранилища без последствий.

Когда стоит применять Event Sourcing

CRUD имеет ряд ограничений, т.к. операции выполняются непосредственно в хранилище, то это может замедлить производительность, а при работе нескольких пользователей над одним ресурсом может приводить к блокировкам и конфликтам данных.

Стоит использовать Event Sourcing:

Не стоит применять Event Sourcing в простых доменных, с маленькой бизнес-логикой, вне домена, там где достаточно простого CRUD, и не нужен журнал аудита.

Выбор за вами 😉

Исходники на Github

Полезное