decidr · core platform
Traditional systems store current state — a row
that gets overwritten.
Event Sourcing stores what happened — an
append-only log of immutable events.
Traditional: Entity { name: "Acme Corp", status: "active" } ← overwrites
Event Sourced:
1 TenantRegistered { name: "Acme" }
2 TenantUpdated { name: "Acme Corp" }
3 TenantSuspended { reason: "..." }
↑ nothing is ever deleted or overwritten
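Current state is never stored; it is recovered by folding the log over an empty struct. A minimal self-contained sketch, using hypothetical event tuples rather than the platform's real event structs:

```elixir
defmodule Tenant do
  defstruct name: nil, status: nil

  # apply/2 is a pure function: (state, event) -> new state
  def apply(%Tenant{} = t, {:tenant_registered, name}),
    do: %Tenant{t | name: name, status: :active}

  def apply(%Tenant{} = t, {:tenant_updated, name}),
    do: %Tenant{t | name: name}

  def apply(%Tenant{} = t, {:tenant_suspended, _reason}),
    do: %Tenant{t | status: :suspended}
end

events = [
  {:tenant_registered, "Acme"},
  {:tenant_updated, "Acme Corp"},
  {:tenant_suspended, "non-payment"}
]

# Replay: a left fold of the event list over an empty struct
state = Enum.reduce(events, %Tenant{}, &Tenant.apply(&2, &1))
# state == %Tenant{name: "Acme Corp", status: :suspended}
```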
| Concern | Traditional | Event Sourced |
|---|---|---|
| Audit trail | Extra tables, easy to miss | Free — it's the event log |
| Debugging | "How did it get into this state?" | Replay and find out |
| Read models | One schema for everything | Optimise each independently |
| Integrations | Poll for changes, risk missing updates | Subscribe to event stream |
| Recovery | Complex compensating logic | Replay to any point in time |
Each aggregate is a consistency boundary — handles its own commands, emits events, never reads another aggregate's state.
Read models are projections — built from events, never written to directly. Each is optimised for its access pattern.
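As a sketch of the idea (hypothetical event tuples), a projection is just a fold from events into a read-optimised shape, here a map keyed by entity id:

```elixir
# Build a read model (id -> values map) from the event stream.
# Deletes drop the row; the events themselves are untouched.
project = fn events ->
  Enum.reduce(events, %{}, fn
    {:entity_created, id, values}, acc -> Map.put(acc, id, values)
    {:entity_deleted, id}, acc -> Map.delete(acc, id)
  end)
end

rows =
  project.([
    {:entity_created, "e1", %{name: "Acme"}},
    {:entity_created, "e2", %{name: "Temp"}},
    {:entity_deleted, "e2"}
  ])
# rows == %{"e1" => %{name: "Acme"}}
```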
Section 1
One step at a time
```graphql
# Mutation → write path → command dispatched to Commanded
mutation {
  createEntity(collectionId: "...", values: { name: "Acme" }) {
    id
  }
}

# Query → read path → Ash resource, no aggregates involved
query {
  entities(collectionId: "...") { id values }
}
```
```elixir
# apps/core_web/lib/core_web/graphql/resolvers/data_resolver.ex
def create_entity(%{input: input}, %{context: %{core_context: ctx}}) do
  cmd = %Core.Data.Commands.CreateEntity{
    id: Ecto.UUID.generate(),
    tenant_id: ctx.tenant_id,
    collection_id: input.collection_id,
    values: input.values || %{}
  }

  Core.CommandedApp.dispatch(cmd)
end
```
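Commanded's `dispatch/1` returns `:ok` or `{:error, reason}` by default, so the resolver needs a small translation into the `{:ok, _}` / `{:error, _}` shape GraphQL expects. A hedged sketch (the helper name is hypothetical):

```elixir
defmodule DispatchReply do
  # Success: echo the generated id back to the client
  def to_resolver_reply(:ok, id), do: {:ok, %{id: id}}

  # Rejection: surface the aggregate's error as a GraphQL error message
  def to_resolver_reply({:error, reason}, _id),
    do: {:error, "command rejected: #{inspect(reason)}"}
end

DispatchReply.to_resolver_reply(:ok, "550e8400")
# => {:ok, %{id: "550e8400"}}
```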
```elixir
# apps/core/lib/core/router.ex
identify(Core.Data.Aggregates.Entity, by: :id)

dispatch(
  [Core.Data.Commands.CreateEntity, ...],
  to: Core.Data.Aggregates.Entity
)
```
Commanded routes the command to its aggregate, rebuilds the aggregate's state from the event stream, and calls execute/2 with the reconstituted struct.
```elixir
# apps/core/lib/core/data/aggregates/entity.ex
def execute(%__MODULE__{id: nil}, %CreateEntity{} = cmd) do
  # ✓ valid — emit the event
  %EntityCreated{
    id: cmd.id,
    tenant_id: cmd.tenant_id,
    collection_id: cmd.collection_id,
    values: cmd.values
  }
end

def execute(%__MODULE__{id: id}, %CreateEntity{}) when not is_nil(id),
  do: {:error, :entity_already_exists}
```
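Because `execute/2` takes plain structs and returns plain data, the invariant is unit-testable with no event store or process machinery. A simplified restatement of the two clauses above (standalone structs, not the real modules):

```elixir
defmodule EntitySketch do
  defmodule Entity, do: defstruct([:id])
  defmodule CreateEntity, do: defstruct([:id])
  defmodule EntityCreated, do: defstruct([:id])

  # Empty aggregate: accept the command, emit the event
  def execute(%Entity{id: nil}, %CreateEntity{id: id}), do: %EntityCreated{id: id}

  # Already created: reject
  def execute(%Entity{id: id}, %CreateEntity{}) when not is_nil(id),
    do: {:error, :entity_already_exists}
end
```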
EventStore (PostgreSQL, append-only)
┌──────────────────────────────────────────────────────────┐
│ stream: "entity-550e8400-..." │
│ │
│ seq │ event_type │ data │
│ 1 │ EntityCreated │ { id, tenant_id, values… } │
│ 2 │ AttributeValueSet │ { attribute_id, value… } │
│ 3 │ EntitiesLinked │ { target_id… } │
└──────────────────────────────────────────────────────────┘
```elixir
def apply(%__MODULE__{} = e, %EntityCreated{} = ev) do
  %__MODULE__{e |
    id: ev.id,
    tenant_id: ev.tenant_id,
    collection_id: ev.collection_id,
    values: ev.values,
    status: :active
  }
end
```
apply/2 is pure — no IO, no side
effects. The updated struct is held in memory (via Commanded's
process registry) for the next command.
```elixir
defmodule Core.Data.Projectors.EntityProjector do
  use Commanded.Projections.Ecto,
    application: Core.CommandedApp,
    repo: Core.Repo,
    name: "entity_projector"

  project(%EntityCreated{} = e, _meta, fn multi ->
    Ecto.Multi.insert(multi, :entity, %Core.Data.Resources.Entity{
      id: e.id,
      tenant_id: e.tenant_id,
      collection_id: e.collection_id,
      values: e.values,
      status: :active
    })
  end)
end
```
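The `name:` above is more than a label: the projection's last processed event number is stored under it, which is what makes restarts and replays idempotent. The mechanism reduced to a pure sketch (not the library's actual storage):

```elixir
defmodule PositionSketch do
  # Skip anything at or below the stored position; otherwise apply and advance.
  def handle({rows, pos}, {seq, _ev}) when seq <= pos, do: {rows, pos}
  def handle({rows, _pos}, {seq, ev}), do: {[ev | rows], seq}
end

# A redelivered event 1 is ignored on replay:
events = [{1, :created}, {1, :created}, {2, :value_set}]
{rows, pos} = Enum.reduce(events, {[], 0}, &PositionSketch.handle(&2, &1))
# rows == [:value_set, :created], pos == 2
```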
```elixir
# Resolver:
Ash.read(Core.Data.Resources.Entity,
  action: :in_collection,
  actor: ctx,
  params: %{collection_id: collection_id}
)
```
```elixir
# Ash resource:
defmodule Core.Data.Resources.Entity do
  use Ash.Resource,
    domain: Core.Data.Domain,
    data_layer: AshPostgres.DataLayer

  actions do
    read :in_collection do
      argument :collection_id, :uuid, allow_nil?: false
      filter expr(collection_id == ^arg(:collection_id) and status == :active)
    end
  end
end
```
Section 2
Example: adding a Comment to an Entity
| Piece | Code |
|---|---|
| Command | `AddComment{ id, entity_id, author_id, body }` |
| Event | `CommentAdded{ id, entity_id, author_id, body }` |
| Aggregate | `execute/2` + `apply/2` (~20 lines) |
| Router | `identify` + `dispatch` (3 lines) |
| Read model | `comments` table; `use Ash.Resource` with attributes and read actions |
| Projector | `project(%CommentAdded{}, ...)` writes to the table |
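The aggregate half of that list fits in a few lines. A simplified sketch with standalone structs (the real modules would live under `Core.Data`):

```elixir
defmodule CommentSketch do
  defmodule Entity, do: defstruct([:id, comments: []])
  defmodule AddComment, do: defstruct([:id, :author_id, :body])
  defmodule CommentAdded, do: defstruct([:id, :author_id, :body])

  # No comments on entities that don't exist yet
  def execute(%Entity{id: nil}, %AddComment{}), do: {:error, :entity_not_found}

  def execute(%Entity{}, %AddComment{} = cmd),
    do: %CommentAdded{id: cmd.id, author_id: cmd.author_id, body: cmd.body}

  # State only needs what future commands will check
  def apply(%Entity{} = e, %CommentAdded{id: id}),
    do: %Entity{e | comments: [id | e.comments]}
end
```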
Example: Dgraph (property graph database) —
relevant because our Entity link /
backlink attributes are literally a graph. Multi-hop
traversals are expensive in PostgreSQL, trivial in Dgraph.
```elixir
defmodule Core.Data.Projectors.EntityProjector.Dgraph do
  use Commanded.Event.Handler,
    application: Core.CommandedApp,
    name: "entity_dgraph_projector"

  def handle(%EntityCreated{} = e, _meta) do
    Dlex.mutate(conn(), %{
      "uid" => "_:#{e.id}",
      "dgraph.type" => "Entity",
      "tenant_id" => e.tenant_id
    })
  end

  def handle(%EntitiesLinked{} = e, _meta) do
    Dlex.mutate(conn(), %{
      "uid" => e.from_id,
      "links" => [%{"uid" => e.to_id}]
    })
  end

  def handle(_, _), do: :ok
end
```
| Layer | Approach |
|---|---|
| Elixir / OTP | Each aggregate is a process — thousands run concurrently. Cold aggregates load on demand by replaying their stream from last snapshot. |
| Oban Pro | Compute jobs are Oban workers — horizontally scalable, retried automatically. Workflows handle multi-step compute graphs with guaranteed exactly-once execution. |
| EventStore (PG) | Read replicas for subscription fan-out; partition by tenant. |
| Read model (PG) | Read replicas; table partitioning by `tenant_id`. |
| Cassandra | Add nodes — linear horizontal scale. Partition key = `tenant_id` gives physical per-tenant isolation. |
| App nodes | Stateless Phoenix nodes behind a load balancer. BEAM clustering via libcluster for a distributed aggregate registry. |
| Point | Failure | Recovery |
|---|---|---|
| Command dispatch | Aggregate process crashes | OTP supervisor restarts it; state rebuilt by replaying event stream from snapshot |
| EventStore write | PostgreSQL unavailable | Returns `{:error, …}` to the caller; no partial state; safe to retry |
| Projector | Crashes mid-batch | Restarts from the last confirmed event position (stored in the projections table) |
| Oban job | Worker crashes | Automatically retried with backoff; max attempts configurable |
| Read model | Corrupted / out of sync | Drop and rebuild — replay event stream through the projector from last snapshot |
All command dispatch routes to the primary region; reads are served from any region. Tenant affinity (a consistent hash on `tenant_id`) avoids distributed-transaction complexity.
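Tenant affinity can be as small as a stable hash of `tenant_id` over the region list. A sketch using `:erlang.phash2/2` (a plain modulo hash for illustration, not a full consistent-hash ring, and the region names are hypothetical):

```elixir
regions = ["us-east", "eu-west", "ap-south"]

# Same tenant_id always hashes to the same index, so every node
# agrees on the owning region without coordination.
region_for = fn tenant_id, regions ->
  Enum.at(regions, :erlang.phash2(tenant_id, length(regions)))
end

region_for.("tenant-acme", regions) == region_for.("tenant-acme", regions)
# => true (deterministic for a given OTP release)
```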