Event Sourcing
Architecture

decidr · core platform

What is Event Sourcing?

Traditional systems store current state — a row that gets overwritten.
Event Sourcing stores what happened — an append-only log of immutable events.


  Traditional:   Entity { name: "Acme Corp", status: "active" }  ← overwrites

  Event Sourced:
    1  TenantRegistered  { name: "Acme" }
    2  TenantUpdated     { name: "Acme Corp" }
    3  TenantSuspended   { reason: "..." }
                ↑ nothing is ever deleted or overwritten
  
Current state is always derived by replaying events from the most recent snapshot (or, in the worst case, from the beginning of the stream).
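Concretely, replay is just a left fold over the event list. A minimal sketch in plain Elixir (no Commanded), using hypothetical tenant events that mirror the log above:

```elixir
defmodule Replay do
  # Hypothetical event tuples standing in for real event structs.
  def apply_event(state, {:tenant_registered, name}),
    do: %{state | name: name, status: :active}

  def apply_event(state, {:tenant_updated, name}),
    do: %{state | name: name}

  def apply_event(state, {:tenant_suspended, _reason}),
    do: %{state | status: :suspended}

  # Current state = left fold of every event over the initial state.
  def current_state(events) do
    Enum.reduce(events, %{name: nil, status: nil}, &apply_event(&2, &1))
  end
end

Replay.current_state([
  {:tenant_registered, "Acme"},
  {:tenant_updated, "Acme Corp"},
  {:tenant_suspended, "..."}
])
# => %{name: "Acme Corp", status: :suspended}
```

Snapshots are just a cached intermediate fold result: start from the snapshot state and fold only the events recorded after it.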

How It Helps Us

  Concern       Traditional                              Event Sourced
  Audit trail   Extra tables, easy to miss               Free — it's the event log
  Debugging     "How did it get into this state?"        Replay and find out
  Read models   One schema for everything                Optimise each independently
  Integrations  Poll for changes, risk missing updates   Subscribe to event stream
  Recovery      Complex compensating logic               Replay to any point in time

Our Goals

  1. Write side — all mutations through Commanded aggregates. No direct DB writes.
  2. Read side — Ash resources projected from events, optimised per query pattern.
  3. Multiple read models — same event stream fans out to PostgreSQL, Cassandra, and beyond.
  4. Compute pipeline — formula / rollup / lookup values computed reactively from events.
  5. Workflow engine — task execution driven by events, not polling.
  6. Multi-region — event log is source of truth; read models rebuilt anywhere.

The Model — Write Side (Aggregates)

Each aggregate is a consistency boundary — handles its own commands, emits events, never reads another aggregate's state.

flowchart LR
  subgraph identity["identity"]
    U[User]
  end
  subgraph schema["schema_definition"]
    Sp[Space] --> Co[Collection]
    Co --> At[Attribute]
    Co --> Vi[View] --> Va[ViewAttribute]
  end
  subgraph data["data"]
    En[Entity]
  end
  subgraph workflow["workflow"]
    Go[Goal]
    Fl[Flow]
    Ta[Task] -->|depends_on| Tb[Task]
  end
  Co --> En
  Fl --> En

The Model — Read Side (Ash Resources)

Read models are projections — built from events, never written to directly. Each is optimised for its access pattern.

flowchart LR
  subgraph pg["PostgreSQL — primary reads"]
    T[Tenant] --> U[User]
    T --> Sp[Space] --> Co[Collection]
    Co --> At[Attribute]
    Co --> Vi[View] --> Va[ViewAttribute]
    Co --> En[Entity]
    T --> Go[Goal]
    T --> Ta[Task]
    Ta -.->|optional| Fl[Flow]
  end
  subgraph cass["Cassandra — high-throughput"]
    CE["Entity (denormalised)"]
  end
  subgraph ets["ETS — hot cache"]
    CS[Collection schema]
  end

The Event Sourcing Flow

One step at a time

Step 1

Client sends a GraphQL request


# Mutation → write path → command dispatched to Commanded
mutation {
  createEntity(collectionId: "...", values: { name: "Acme" }) {
    id
  }
}

# Query → read path → Ash resource, no aggregates involved
query {
  entities(collectionId: "...") { id values }
}
  
Mutations trigger the write path  ·  Queries go directly to the read model — no aggregates involved.
Step 2

Resolver translates mutation → Command


# apps/core_web/lib/core_web/graphql/resolvers/data_resolver.ex

def create_entity(%{input: input}, %{context: %{core_context: ctx}}) do
  cmd = %Core.Data.Commands.CreateEntity{
    id:            Ecto.UUID.generate(),
    tenant_id:     ctx.tenant_id,
    collection_id: input.collection_id,
    values:        input.values || %{}
  }

  # Absinthe resolvers must return {:ok, value} | {:error, reason};
  # dispatch returns :ok or {:error, reason}.
  with :ok <- Core.CommandedApp.dispatch(cmd) do
    {:ok, %{id: cmd.id}}
  end
end
  
The resolver's only job: translate GraphQL input → Command struct → dispatch. No business logic lives here.
Step 3

Router routes Command to the Aggregate


# apps/core/lib/core/router.ex

identify(Core.Data.Aggregates.Entity, by: :id)

dispatch(
  [Core.Data.Commands.CreateEntity, ...],
  to: Core.Data.Aggregates.Entity
)
  
Commanded loads the aggregate by replaying its event stream from the EventStore, then calls execute/2 with the reconstituted struct.
Step 4

Aggregate validates → returns an Event


# apps/core/lib/core/data/aggregates/entity.ex

def execute(%__MODULE__{id: nil}, %CreateEntity{} = cmd) do
  # ✓ valid — emit the event
  %EntityCreated{
    id:            cmd.id,
    tenant_id:     cmd.tenant_id,
    collection_id: cmd.collection_id,
    values:        cmd.values
  }
end

def execute(%__MODULE__{id: id}, %CreateEntity{}) when not is_nil(id),
  do: {:error, :entity_already_exists}
  
The aggregate never writes to a database — it returns an event struct (or an error).
Step 5

Event persisted to the EventStore


  EventStore (PostgreSQL, append-only)
  ┌──────────────────────────────────────────────────────────┐
  │ stream: "entity-550e8400-..."                            │
  │                                                          │
  │  seq │ event_type        │ data                          │
  │    1 │ EntityCreated     │ { id, tenant_id, values… }    │
  │    2 │ AttributeValueSet │ { attribute_id, value… }      │
  │    3 │ EntitiesLinked    │ { target_id… }                │
  └──────────────────────────────────────────────────────────┘
  
Immutable. Nothing is ever updated or deleted. This log is the permanent source of truth — everything else is derived from it.
Step 6

Aggregate state updated in memory


def apply(%__MODULE__{} = e, %EntityCreated{} = ev) do
  %__MODULE__{e |
    id:            ev.id,
    tenant_id:     ev.tenant_id,
    collection_id: ev.collection_id,
    values:        ev.values,
    status:        :active
  }
end
  
apply/2 is pure — no IO, no side effects. The updated struct is held in memory (via Commanded's process registry) for the next command.
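The same apply/2 is what rebuilds a cold aggregate: fold every stored event over the empty struct. A self-contained sketch using simplified stand-in event tuples (not the real Core.Data modules):

```elixir
defmodule EntityState do
  # Simplified stand-in for the Entity aggregate struct.
  defstruct [:id, :values, status: nil]

  # apply/2 is pure: it only transforms state, no IO.
  def apply(%EntityState{} = e, {:entity_created, id, values}),
    do: %EntityState{e | id: id, values: values, status: :active}

  def apply(%EntityState{} = e, {:attribute_value_set, key, value}),
    do: %EntityState{e | values: Map.put(e.values, key, value)}
end

events = [
  {:entity_created, "e-1", %{"name" => "Acme"}},
  {:attribute_value_set, "name", "Acme Corp"}
]

# Reconstitution: fold the stream over the empty struct.
state = Enum.reduce(events, %EntityState{}, &EntityState.apply(&2, &1))
# state.status == :active, state.values["name"] == "Acme Corp"
```

Commanded does this fold for you on dispatch; purity is what makes the rebuilt state identical no matter when or where the fold runs.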
Step 7

Event broadcast to Projectors

flowchart LR
  ES[EventStore] -->|broadcast| P1["EntityProjector (PostgreSQL)"]
  ES -->|broadcast| P2["ComputeTrigger Projector"]
  ES -->|broadcast| P3["EntityProjector (Cassandra)"]
Every projector receives every event. Each independently decides what to do. A slow or failed projector does not block the others.
Step 8

Projector writes to the Read Model


defmodule Core.Data.Projectors.EntityProjector do
  use Commanded.Projections.Ecto,
    application: Core.CommandedApp,
    repo:        Core.Repo,
    name:        "entity_projector"

  project(%EntityCreated{} = e, _meta, fn multi ->
    Ecto.Multi.insert(multi, :entity, %Core.Data.Resources.Entity{
      id:            e.id,
      tenant_id:     e.tenant_id,
      collection_id: e.collection_id,
      values:        e.values,
      status:        :active
    })
  end)
end
  
Projectors track their last processed event position — safe to restart, idempotent.
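That position tracking is what makes replays safe. A toy, in-memory sketch of the idea (Commanded's real mechanism persists the position in a projections table):

```elixir
defmodule ToyProjector do
  # Replay-safe projection: skip any event at or below the stored position,
  # advance the position as each new event is applied.
  def project(events, %{position: pos} = state) do
    events
    |> Enum.filter(fn {seq, _event} -> seq > pos end)
    |> Enum.reduce(state, fn {seq, event}, acc ->
      %{acc | position: seq, rows: [event | acc.rows]}
    end)
  end
end

state0 = %{position: 0, rows: []}
batch  = [{1, :entity_created}, {2, :attribute_value_set}]
state1 = ToyProjector.project(batch, state0)

# Re-delivering the same batch (crash + restart) changes nothing:
^state1 = ToyProjector.project(batch, state1)
```

The same property lets a brand-new projector start at position 0 and build its read model by consuming the whole stream.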
Step 9

Query reads from Read Model via Ash


# Resolver:
Core.Data.Resources.Entity
|> Ash.Query.for_read(:in_collection, %{collection_id: collection_id})
|> Ash.read(actor: ctx)

# Ash resource:
defmodule Core.Data.Resources.Entity do
  use Ash.Resource,
    domain: Core.Data.Domain,
    data_layer: AshPostgres.DataLayer

  actions do
    read :in_collection do
      argument :collection_id, :uuid, allow_nil?: false
      filter expr(collection_id == ^arg(:collection_id) and status == :active)
    end
  end
end
  
No aggregate involved at all. Queries hit the projected table directly.

Full Flow Summary

flowchart LR
  C([Client])
  ES[(EventStore)]
  RM[(Read Models)]
  C -->|GraphQL mutation| R[Resolver]
  R -->|Command| Router[Router]
  Router --> Exec[Aggregate]
  Exec -->|Event| ES
  Exec --> Mem["In-memory state"]
  ES -->|broadcast| Proj[Projectors]
  Proj -->|write| RM
  C -->|GraphQL query| Ash["Ash Resource"]
  Ash --> RM

Questions

Adding new things to the model

Example: adding Comment on an Entity

  1. Command struct — AddComment { id, entity_id, author_id, body }
  2. Event struct — CommentAdded { id, entity_id, author_id, body }
  3. Aggregate — execute/2 + apply/2 (~20 lines)
  4. Router entry — identify + dispatch (3 lines)
  5. Migration — create comments table
  6. Ash resource — use Ash.Resource with attributes and read actions
  7. Projector — project(%CommentAdded{}, ...) writes to table
  8. Resolver — translate GraphQL ↔ Command / Ash
Each step is isolated and independently testable. The EventStore and existing aggregates are untouched. No migration of existing data.
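As a sketch of steps 1–3, the whole aggregate side of Comment fits in roughly 20 lines. The module path and fields follow the naming pattern above but are assumptions; in the real codebase the command and event structs would live in their own files:

```elixir
defmodule Core.Data.Aggregates.Comment do
  defstruct [:id, :entity_id, :author_id, :body]

  # Nested here for brevity — hypothetical structs, not the real modules.
  defmodule AddComment,   do: defstruct([:id, :entity_id, :author_id, :body])
  defmodule CommentAdded, do: defstruct([:id, :entity_id, :author_id, :body])

  # Fresh aggregate (id: nil) accepts the command and emits the event.
  def execute(%__MODULE__{id: nil}, %AddComment{} = cmd) do
    %CommentAdded{id: cmd.id, entity_id: cmd.entity_id,
                  author_id: cmd.author_id, body: cmd.body}
  end

  # Anything else is a duplicate.
  def execute(%__MODULE__{}, %AddComment{}),
    do: {:error, :comment_already_exists}

  def apply(%__MODULE__{} = state, %CommentAdded{} = ev) do
    %__MODULE__{state | id: ev.id, entity_id: ev.entity_id,
                author_id: ev.author_id, body: ev.body}
  end
end
```

The remaining steps (router entry, migration, Ash resource, projector, resolver) reuse the exact shapes shown in the flow above.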

New projection for a novel database

Example: Dgraph (property graph database) — relevant because our Entity link / backlink attributes are literally a graph. Multi-hop traversals are expensive in PostgreSQL, trivial in Dgraph.


defmodule Core.Data.Projectors.EntityProjector.Dgraph do
  use Commanded.Event.Handler, application: Core.CommandedApp,
                               name: "entity_dgraph_projector"

  def handle(%EntityCreated{} = e, _meta) do
    Dlex.mutate(conn(), %{"uid" => "_:#{e.id}",
      "dgraph.type" => "Entity", "tenant_id" => e.tenant_id})
  end

  def handle(%EntitiesLinked{} = e, _meta) do
    Dlex.mutate(conn(), %{"uid" => e.from_id,
      "links" => [%{"uid" => e.to_id}]})
  end

  def handle(_, _), do: :ok
end
  
The EventStore is unchanged. A new projector starts with no stored position, so it catches up by replaying the event stream from the beginning — Commanded tracks each handler's position independently.

How do we scale?

  Layer            Approach
  Elixir / OTP     Each aggregate is a process — thousands run concurrently.
                   Cold aggregates load on demand by replaying their stream from
                   the last snapshot.
  Oban Pro         Compute jobs are Oban workers — horizontally scalable, retried
                   automatically. Workflows handle multi-step compute graphs with
                   guaranteed exactly-once execution.
  EventStore (PG)  Read replicas for subscription fan-out; partition by tenant.
  Read model (PG)  Read replicas; table partitioning by tenant_id.
  Cassandra        Add nodes — linear horizontal scale. Partition key = tenant_id
                   gives physical per-tenant isolation.
  App nodes        Stateless Phoenix nodes behind a load balancer. BEAM clustering
                   via libcluster for a distributed aggregate registry.

Failure recovery

  Point             Failure                    Recovery
  Command dispatch  Aggregate process crashes  OTP supervisor restarts it; state
                                               rebuilt by replaying the event
                                               stream from the last snapshot
  EventStore write  PostgreSQL unavailable     Returns {:error, …} to the caller;
                                               no partial state; safe to retry
  Projector         Crashes mid-batch          Restarts from the last confirmed
                                               event position (stored in the
                                               projections table)
  Oban job          Worker crashes             Retried automatically with backoff;
                                               max attempts configurable
  Read model        Corrupted / out of sync    Drop and rebuild — replay the event
                                               stream through the projector from
                                               the beginning

Oban Pro Workflows guarantee durable multi-step execution — each step is persisted before execution. If a node dies mid-workflow, it resumes from the last completed step on restart.

Multi-region deployment

flowchart TD
  subgraph regionA["Region A - primary write"]
    PA[Phoenix nodes] --> ES_A[(EventStore)]
    ES_A --> OB[Oban workers]
  end
  subgraph regionB["Region B - read replica"]
    PB[Phoenix nodes] --> PG_B[(PostgreSQL)]
  end
  subgraph regionC["Region C - read replica"]
    PC[Phoenix nodes] --> CASS[(Cassandra)]
  end
  ES_A -->|stream replication| PG_B
  ES_A -->|stream replication| CASS

All command dispatch routes to the primary region. Reads served from any region. Tenant affinity (consistent hash on tenant_id) avoids distributed transaction complexity.
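Tenant affinity can be as simple as a deterministic hash of tenant_id over the region list. A sketch (region names are placeholders; :erlang.phash2/2 is deterministic for a given term, but real routing would also need to handle region failover and rebalancing):

```elixir
defmodule TenantAffinity do
  # Placeholder region identifiers — not real deployment names.
  @regions ["region-a", "region-b", "region-c"]

  # Map a tenant to one region so all of that tenant's commands land on the
  # same primary, avoiding cross-region coordination on the write path.
  def region_for(tenant_id) do
    Enum.at(@regions, :erlang.phash2(tenant_id, length(@regions)))
  end
end

TenantAffinity.region_for("tenant-123")
# Always the same region for the same tenant_id
```

Because the hash is a pure function of tenant_id, any node in any region can compute the routing decision locally with no shared lookup table.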