The Easy Path to Event Sourcing with Marten and PostgreSQL

Intro
What is event sourcing? First of all, to avoid misconception it is a design pattern rather than a non architecture pattern.
-
A design pattern is focuses on solving specific coding issues (e.g., how to create objects, handle communication, or manage state). In a nutshell examples are GOF patterns like Singleton, Factory Method, Observer, Strategy etc...
-
An architecture pattern is define the overall structure and orgnization of the sytem/systems and how the components should interact at a larger scale. In a contrast examples are CQRS, Hexagonal Architecture, Domain Driven Design and so on...
Focused on storing application state by recording all immutable events that change its state. The main purpose that event sourcing provides us is:
- auditability/traceability
- system resilience
- replaying past reconstruction to the last stage etc..
To be more precise, you can literally take snapshots of stream and events copy it no another environment and replay it from zero. To see what went wrong. It is a common need for very sensitive systems like banking system, financial trading system, e-commerce platform paltform, but almost every event system needs something like an audit log, so it might be worth considering event stores.
If your choice is a PostgreSQL database that supports a great type jsonb. That allows you to use it as a relational database with NoSQL features. Is there a good library for that? Does any exist?? Yes, there is such a thing as Marten. It's an open-source .NET library that provides event sourcing capabilities on top of PostgreSQL. Marten has two feature one is an abstraction for PostgreSQL to use as Document Database that supports to use our favorits and love feature - linq extensions and queries another is a EventStore.
Lets integrate the Marten in our project.
- nuget:
dotnet add package Marten
- configuration Asp.net core
// This is the absolute, simplest way to integrate Marten into your
// .NET application with Marten's default configuration
builder.Services.AddMarten(options =>
{
// Establish the connection string to your Marten database
options.Connection(builder.Configuration.GetConnectionString("Marten")!);
// Specify that we want to use STJ as our serializer
options.UseSystemTextJsonForSerialization();
// If we're running in development mode, let Marten just take care
// of all necessary schema building and patching behind the scenes
if (builder.Environment.IsDevelopment())
{
options.AutoCreateSchemaObjects = AutoCreate.All;
}
});
- Let's create our first stream (aka aggregate root in term of DDD) github repository with full example
public record Entity(
Guid Id,
string Name,
string Description,
string UserId,
DateTime OccuredAt,
bool IsDeleted)
In event stores, we have basic terms like stream and events. Lets assume the stream is a collection of events that happend in time order. Each event has unique id and version number.
And some events as an a example crud operation:
public record OnCreatedEntity(
Guid Id,
string Name,
string Description,
string UserId,
DateTime OccuredAt
);
public record OnUpdatedEntity(
Guid Id,
string Name,
string Description,
string UserId,
DateTime OccuredAt
);
public record OnDescriptionUpdatedEntity(
Guid Id,
string Description,
string UserId,
DateTime OccuredAt
);
public record OnDeletedEntity(
Guid Id,
string UserId,
DateTime OccuredAt
);
Now we need to register our events in Marten
builder.Services.AddMarten(o =>
{
o.DatabaseSchemaName = "postgres";
o.Events.AddEventType<OnCreatedEntity>();
o.Events.AddEventType<OnUpdatedEntity>();
o.Events.AddEventType<OnDescriptionUpdatedEntity>();
o.Events.AddEventType<OnDeletedEntity>();
After running with option AutoCreate.All
Marten will create tables for us automatically. Here is several options of this behavior:
// Marten will create any new objects that are missing or
// attempt to update tables if it can. Will *never* drop
// any existing objects, so no data loss
o.AutoCreateSchemaObjects = AutoCreate.CreateOrUpdate;
//
//
// Marten will create missing objects on demand, but
// will not change any existing schema objects
o.AutoCreateSchemaObjects = AutoCreate.CreateOnly;
We might assume that stream is our entity in terms of Marten. It has some db structure with important fields like:
create table mt_streams
(
id uuid not null constraint pkey_mt_streams_id primary key,
type varchar,
version bigint,
-- ... ect
);
where is:
id
is a stream identifier (our entity identifier), is a FK field that has relation with mt_streams tabletype
is a name of aggregate root class in C# code to be base applay events,version
is current version of stream - .
create table mt_streams
(
id uuid not null constraint pkey_mt_streams_id primary key,
type varchar,
version bigint,
-- ... ect
);
The sturcture of events in a table: mt_events
create table mt_events
(
seq_id bigint not null
constraint pkey_mt_events_seq_id
primary key,
id uuid not null,
stream_id uuid
constraint fkey_mt_events_stream_id
references mt_streams
version bigint not null,
data jsonb not null,
type varchar(500)
-- ... ect
);
where is:
seq_id
is a sequence id of event in streamid
is a just indentifierstream_id
is a foreign key to mt_streams tableversion
is a version of stream when event was created to avoid race conditions and concurrency controldata
is payload of eventtype
is a name of event class in C# code is goint to be (our on_created_entity, on_updated_entity etc...)
It is remainds me a dictionary where keys are streams and values are lists of events.
Then all logic of each event should be add in Entity with method Apply(...) by Marten convention.
public record Entity(
Guid Id,
string Name,
string Description,
string UserId,
DateTime OccuredAt,
bool IsDeleted)
{
public static Entity Create(OnCreatedEntity @event)
{
ArgumentException.ThrowIfNullOrWhiteSpace(@event.Name);
ArgumentException.ThrowIfNullOrWhiteSpace(@event.Description);
return new Entity(@event.Id, @event.Name, @event.Description, @event.UserId, @event.OccuredAt, false);
}
public Entity Apply(OnUpdatedEntity @event)
{
if (IsDeleted)
{
throw new ArgumentException("Current entity already removed");
}
ArgumentException.ThrowIfNullOrWhiteSpace(@event.Name);
ArgumentException.ThrowIfNullOrWhiteSpace(@event.Description);
return new Entity(Id: @event.Id, Name: @event.Name, Description: @event.Description, UserId: @event.UserId,
OccuredAt: @event.OccuredAt, false);
}
public Entity Apply(OnDescriptionUpdatedEntity @event)
{
if (IsDeleted)
{
throw new ArgumentException("Current entity already removed");
}
return this with
{
Id = @event.Id,
Description = @event.Description,
UserId = @event.UserId,
OccuredAt = @event.OccuredAt
};
}
public Entity Apply(OnDeletedEntity deleted) =>
this with
{
Id = deleted.Id,
UserId = deleted.UserId,
OccuredAt = deleted.OccuredAt,
IsDeleted = true
};
}
It is very convinuent to have all logic for handling event inside apropreate metnod. Sutch a validation or another logic in place of applying event.
IDocumentSession as an a access to database
Marten has a data provider called a IDoumentSession. We can assume it as a DBContext in EF Core and has set of extensions methods to work with events and streams. it is going to be as an example of API to create event:
api.MapPost("/create",
async ([FromBody] OnCreatedEntity @event,
[FromServices] IDocumentSession documentSession,
CancellationToken cancellationToken) =>
{
await documentSession.Events.WriteToAggregate<Entity>( // where is is going to be create stream and append event
@event.Id,
stream => stream.AppendOne(@event),
cancellationToken);
return Results.Ok(@event.Id);
});
to Update/assume is working in a same way WriteToAggregate
api.MapPut("/update",
async ([FromBody] OnUpdatedEntity @event,
[FromServices] IDocumentSession documentSession,
CancellationToken cancellationToken) =>
{
await documentSession.Events.WriteToAggregate<Entity>(
@event.Id,
stream => stream.AppendOne(@event),
cancellationToken);
return Results.Ok(@event.Id);
});
after append each event the fucntion WriteToAggregate save to database changes and increments version of stream.
to retrieve an event in API
api.MapGet("/events/root/{id:Guid}",
async ([FromRoute] Guid id,
[FromServices] IDocumentSession documentSession) =>
{
var categoryAggregateRoot = await documentSession.Events.AggregateStreamAsync<Entity>(id);
return Results.Ok(categoryAggregateRoot);
});
the AggregateStreamAsync function returns a list of events per stream id(entity id) that happened in the stream identified by the given id. And applay to our Entity each event through Apply method sequntially.
Projection a game changer
You might how I can retrive the latest state of entity. But without loading all events due to it is heavy operation. For that purpose Marten has a concept of projection. Even you can have a any flat view - that is going to react on event as you defined and use for serach and search and retrive any information about entity.
To create a projection for your entity you need to implelent IProjection interface where is similar concept of conversion of methoded Apply, and lest assuming that if entity ahd been deleted then you need to delete it from projection as non softdeteled:
public class EntityStatus : IProjection
{
public Guid Id { get; set; }
public string Name { get; set; }
public string Description { get; set; }
public int Version { get; set; }
public string UserId { get; set; }
public DateTime OccuredAt { get; set; }
public bool IsDeleted { get; set; }
public void Apply(
IDocumentOperations operations,
IReadOnlyList<StreamAction> streams) =>
ApplyAsync(operations, streams, CancellationToken.None).Wait();
public async Task ApplyAsync(
IDocumentOperations operations,
IReadOnlyList<StreamAction> streams,
CancellationToken cancellationToken)
{
foreach (var data in streams.SelectMany(x => x.Events).OrderBy(x => x.Sequence).Select(x => x.Data))
{
switch (data)
{
case OnCreatedEntity published:
operations.Store(
new EntityStatus
{
Id = published.Id,
Name = published.Name,
Description = published.Description,
OccuredAt = published.OccuredAt,
UserId = published.UserId,
IsDeleted = false,
});
break;
case OnUpdatedEntity updated:
operations.Store(
new EntityStatus
{
Id = updated.Id,
Name = updated.Name,
Description = updated.Description,
OccuredAt = updated.OccuredAt,
UserId = updated.UserId,
IsDeleted = false,
});
break;
case OnDescriptionUpdatedEntity @event:
var existingEntity = await operations.LoadAsync<EntityStatus>(@event.Id, cancellationToken);
if (existingEntity != null)
{
existingEntity.Description = @event.Description;
existingEntity.OccuredAt = @event.OccuredAt;
existingEntity.UserId = @event.UserId;
// Store the updated projection
operations.Store(existingEntity);
}
break;
case OnDeletedEntity deleted:
operations.HardDelete<EntityStatus>(deleted.Id);
break;
}
}
}
finally just need to register a new projection in Marten:
builder.Services.AddMarten(o =>{
// ...
o.Projections.Add(new EntityStatus(), ProjectionLifecycle.Async);
// ...
});
the enum ProjectionLifecycle.Async means that projection will be executed asynchronously. If you prefere a strong consistency you can use ProjectionLifecycle.Synchronous instead. To simplify it will create a background service that periodically checks for new events and applies them to projections.. And it ia awesome because you can build any datastructore and view based on your business requirements and query them eficeiently.
Lets create another projection for audit log:
public record AuditLog(Guid Id, AuditLogEntry[] Entries);
public record AuditLogEntry(
string EventType,
DateTime OccuredAt,
string Description,
object? Metadata);
public class AuditLogProjection : SingleStreamProjection<AuditLog>
{
public static AuditLog Create(OnCreatedEntity createdEntity) =>
new(createdEntity.Id, [
new AuditLogEntry(
nameof(OnCreatedEntity),
createdEntity.OccuredAt,
$"The Entity {createdEntity.Id} was published by user:{createdEntity.UserId} at:{createdEntity.OccuredAt}",
createdEntity)
]);
public AuditLog Apply(OnUpdatedEntity updatedEntity, AuditLog current) =>
current with
{
Entries = current.Entries.Union([
new AuditLogEntry(
nameof(OnUpdatedEntity),
updatedEntity.OccuredAt,
$"The Entity updated: {current.Id} by user:{updatedEntity.UserId} at:{updatedEntity.OccuredAt}",
updatedEntity)
]).ToArray()
};
public AuditLog Apply(OnDescriptionUpdatedEntity updatedEntity, AuditLog current) =>
current with
{
Entries = current.Entries.Union([
new AuditLogEntry(
nameof(OnDescriptionUpdatedEntity),
updatedEntity.OccuredAt,
$"The Description: `{updatedEntity.Description}` was update version of Entity: {current.Id} by user:{updatedEntity.UserId} at:{updatedEntity.OccuredAt}",
updatedEntity)
]).ToArray()
};
public AuditLog Apply(OnDeletedEntity onDeletedEntity, AuditLog current) =>
current with
{
Entries = current.Entries.Union([
new AuditLogEntry(
nameof(OnDeletedEntity),
onDeletedEntity.OccuredAt,
$"The Entity: {current.Id} was deleted by user:{onDeletedEntity.UserId} at:{onDeletedEntity.OccuredAt}",
onDeletedEntity)
]).ToArray()
};
}
How it is time to register it in Marten:
o.Projections.Add<AuditLogProjection>(ProjectionLifecycle.Async);
It is awesome to have human readable logs of what happen with entities already ready.
It is possible to create any format of view based on your needs.
public class EntityFlatView : FlatTableProjection
{
public EntityFlatView() : base("entity_flat_view", SchemaNameSource.DocumentSchema)
{
Table.AddColumn<Guid>("id").AsPrimaryKey();
Table.AddColumn<string>("user_id").AllowNulls();
Table.AddColumn<DateTime>("occured_at").AllowNulls();
Table.AddColumn<bool>("is_deleted").NotNull();
Table.AddColumn<string>("name").AllowNulls();
Table.AddColumn<string>("description").AllowNulls();
Project<OnCreatedEntity>(map =>
{
map.Map(x => x.Name, "name").AllowNulls();
map.Map(x => x.Description, "description").AllowNulls();
map.Map(x => x.UserId, "user_id").AllowNulls();
map.SetValue("is_deleted", "FALSE");
});
Project<OnDescriptionUpdatedEntity>(map =>
{
map.Map(x => x.Description, "description").AllowNulls();
map.Map(x => x.UserId, "user_id").AllowNulls();
map.SetValue("is_deleted", "FALSE");
});
Delete<OnDeletedEntity>();
}
}
you sould register it also in Marten:
o.Projections.Add<EntityFlatView>(ProjectionLifecycle.Async);
Indexing
Let say I was to search EntityStatus by Name and Description:
for this purpuce just need to create index in Marten it is going to be reflect on database depens of approach:
builder.Services.AddMarten(o =>{
// ...
o.Schema.For<EntityStatus>()
.NgramIndex(x => x.Name)
.NgramIndex(x => x.Description);
// ...
});
A potential API endpoint could look like this:
api.MapGet("/events/lastStatus/search/{phrase}",
async ([FromRoute] string phrase, [FromServices] IDocumentSession documentSession) =>
{
var results = await documentSession
.Query<EntityStatus>()
.Where(x => x.Name.NgramSearch(phrase) || x.Description.NgramSearch(phrase)).ToListAsync();
return Results.Ok(results);
});
The query is going to generate use index for Name or Description columns and find all records that match the search term using Ngram Search algorithm.
Conclusion
Marten provides a powerful event store and document database abstraction for PostgreSQL. It supports event sourcing, projections, indexing, and more, making it a great choice for developers working with event-driven architectures. For further details, check out the official documentation.
GitHub Source Code You can find the complete source code on GitHub.
Happy coding! 😊