A .NET 8 library built on top of Akka.NET and Akka.Streams for building event-sourcing projections. It reads a stream of events, routes them to per-document actor pipelines, and persists the resulting read-model documents — all with configurable batching, restarts, and storage backends.
| Package | Description |
|---|---|
MJ.Akka.Projections |
Core library — projection pipeline, in-memory storage, event/position batching |
MJ.Akka.Projections.Storage.RavenDb |
RavenDB storage backend and RavenDbProjection base class |
MJ.Akka.Projections.Storage.InfluxDb |
InfluxDB storage backend |
MJ.Akka.Projections.Cluster.Sharding |
Cluster Singleton and Sharded Daemon coordinator modes |
MJ.Akka.Projections.TestKit |
ProjectionTestKit base class for xUnit tests |
- .NET 8
- An Akka.NET
ActorSystem - A storage backend (in-memory, RavenDB, …)
Subclass the storage-specific base class, implement Configure to wire events to handlers, and implement StartSource to provide the upstream event stream.
// RavenDB-backed projection
public class OrderProjection : RavenDbProjection<OrderDocument>
{
private readonly ActorSystem _system;
public OrderProjection(ActorSystem system) => _system = system;
public override ISetupProjectionHandlers<SimpleIdContext<string>, RavenDbProjectionContext<OrderDocument, SimpleIdContext<string>>>
Configure(ISetupProjection<SimpleIdContext<string>, RavenDbProjectionContext<OrderDocument, SimpleIdContext<string>>> config)
{
return config
.On<OrderPlaced>().WithId(e => e.OrderId)
.CreateDocument(e => new OrderDocument { Id = e.OrderId, Status = "Placed" })
.On<OrderShipped>().WithId(e => e.OrderId)
.ModifyDocument((e, doc) => doc! with { Status = "Shipped" });
}
public override Source<EventWithPosition, NotUsed> StartSource(long? fromPosition)
{
// Return your Akka.Streams Source of events here.
// fromPosition is the last stored position, or null if starting fresh.
return MyEventStore.ReadAll(fromPosition);
}
}For an in-memory projection (useful for tests and benchmarks):
public class OrderInMemoryProjection : InMemoryProjection<SimpleIdContext<string>, OrderDocument>
{
public override ISetupProjectionHandlers<...> Configure(...) { ... }
public override Source<EventWithPosition, NotUsed> StartSource(long? fromPosition) { ... }
}Call the Projections extension method on your ActorSystem:
IProjectionsCoordinator coordinator = await actorSystem
.Projections(config => config
.WithProjection(new OrderProjection(actorSystem)),
storageSetup) // e.g. new SetupRavenDbStorage(documentStore, new BulkInsertOptions())
.Start();IProjectionProxy proxy = coordinator.Get("OrderProjection")!;
// Wait until the source stream completes (one-time projections)
await proxy.WaitForCompletion(timeout: TimeSpan.FromSeconds(30));
// Stop the projection
await proxy.Stop();Inside Configure, chain calls to build the handler pipeline:
config
// Map an event to a document id
.On<MyEvent>().WithId(e => e.Id)
// Option A – mutate the document (RavenDb / InMemory helpers)
.ModifyDocument((evnt, doc) => doc! with { Name = evnt.Name })
// Option B – full async handler
.WhenAny(h => h.HandleWith(async (evnt, context, position, ct) =>
{
context.Document!.Name = evnt.Name;
}))
// Transform one event into multiple before routing
.On<OriginalEvent>().Transform(e =>
ImmutableList.Create<object>(new DerivedEvent(e.Id), new AnotherEvent(e.Id)))
// Filter: only run handlers when a condition is met
.On<MyEvent>().WithId(e => e.Id)
.When(filter => filter.WithEventFilter(e => e.IsRelevant), h => h.HandleWith(...))Multiple .HandleWith calls on the same .On<> chain execute in order.
Sometimes routing or handling an event requires data that isn't in the event itself — for example, looking up a related document in a database. Use .WithData to fetch that data once per event; it is then carried alongside the event through WithId and into the handler, so no extra round-trips occur.
config
.On<OrderShipped>()
.WithData(async evnt => await orderRepository.LoadAsync(evnt.OrderId))
.WithId((evnt, order) => new SimpleIdContext<string>(order.CustomerId)) // data available here
.WhenAny(h => h.HandleWith(async (evnt, ctx, order, position, ct) => // and here
{
ctx.ModifyDocument(doc =>
{
doc ??= new CustomerDocument { Id = order.CustomerId };
doc.ShippedOrders = doc.ShippedOrders.Add(evnt.OrderId);
return doc;
});
}))You can also use the fetched data inside Transform to decide which derived events to produce:
config
.On<OrderPlaced>()
.WithData(async evnt => await catalogService.GetProductAsync(evnt.ProductId))
.Transform((evnt, product) => product.RequiresWarehouseUpdate
? ImmutableList.Create<object>(new WarehouseReservation(evnt.OrderId, product.Sku))
: ImmutableList<object>.Empty)How it works: getData is called exactly once per event, immediately after the transform/flatten step. The result is bundled into an internal envelope that travels through the routing and handler stages — no caching or repeated fetches.
| Extension | Description |
|---|---|
.CreateDocument(e => new Doc()) |
Creates the document when the event arrives |
.ModifyDocument((e, doc) => ...) |
Modifies an existing document |
.DeleteDocument() |
Marks the document for deletion |
.SetMetadata(key, value) |
Sets RavenDB document metadata |
All options are set via the fluent Projections(config => ...) builder:
actorSystem.Projections(config => config
// Restart the stream on failure
.WithRestartSettings(RestartSettings.Create(
minBackoff: TimeSpan.FromSeconds(3),
maxBackoff: TimeSpan.FromSeconds(30),
randomFactor: 0.2))
// Group upstream events before entering the sequencer (default: 100 items / 1 s)
.WithEventBatchingStrategy(new BatchWithinEventBatchingStrategy(maxItems: 200, timeout: TimeSpan.FromSeconds(2)))
// Batch position writes (default: same as above)
.WithPositionStorageBatchingStrategy(new BatchWithinEventPositionBatchingStrategy(200, TimeSpan.FromSeconds(2)))
// Add batched document writes (reduces storage round-trips)
.WithBatchedStorage(parallelism: 4)
.WithProjection(new OrderProjection(actorSystem)),
storageSetup);| Type | Behaviour |
|---|---|
BatchWithinEventBatchingStrategy (default) |
Groups up to N events or until a timeout elapses |
BatchEventBatchingStrategy |
Groups exactly N events |
NoEventBatchingStrategy |
No batching — each event is processed individually |
| Type | Behaviour |
|---|---|
BatchWithinEventPositionBatchingStrategy (default) |
Persists the position up to N times or until a timeout |
NoBatchingPositionStrategy |
Persists after every event |
| Type | Behaviour |
|---|---|
BufferWithinStorageBatchingStrategy |
Batches writes by count + time window |
BatchSizeStorageBatchingStrategy |
Batches writes by count only |
No external dependencies — stores documents in a ConcurrentDictionary. Ideal for tests.
var storageSetup = new SetupInMemoryStorage();var storageSetup = new SetupRavenDbStorage(documentStore, new BulkInsertOptions());Projection position is stored in a dedicated ProjectionPosition document in RavenDB.
Coordinator modes control how and where the projection actor runs.
A single coordinator actor per projection, running inside the current process.
// This is the default — no extra configuration needed.Runs the coordinator as an Akka Cluster Singleton so only one node in the cluster drives the projection at a time. Requires MJ.Akka.Projections.Cluster.Sharding.
config.WithCoordinator(new ClusterSingletonProjectionCoordinator.Setup(
actorSystem,
ClusterSingletonManagerSettings.Create(actorSystem)))Distributes projections across the cluster using Akka ShardedDaemonProcess. Each projection is assigned to a node; if that node leaves, another picks it up.
config.WithCoordinator(new ShardedDaemonProjectionCoordinator.Setup(
actorSystem,
name: "projections",
ShardedDaemonProcessSettings.Create(actorSystem)))The framework spawns one actor per unique document id. To cap memory usage, configure passivation:
// Default: keep the 1,000 most-recently-active projectors
config.WithProjectionFactory(
new KeepTrackOfProjectorsInProc(actorSystem, new MaxNumberOfProjectorsPassivation(maxNumberOfProjectors: 500)))
// Or keep all projectors alive indefinitely
config.WithProjectionFactory(
new KeepTrackOfProjectorsInProc(actorSystem, new KeepAllProjectors()))ActorSystem.Projections(...)
└── IConfigureProjectionCoordinator
└── ProjectionsCoordinator (ReceiveActor)
Reads last position → materialises Source<EventWithPosition>
Groups events via IEventBatchingStrategy
└── ProjectionSequencer (ReceiveActor)
Transform events → flatten
PrepareEvent (fetches WithData payload, wraps in envelope)
Route by id, serialise per id, parallelise across ids
└── DocumentProjection (ReceiveActor, one per id)
Loads context from storage
Runs registered handlers (unwraps envelope if present)
Saves updated context
Advances stream position via IEventPositionBatchingStrategy
Implement IStorageSetup to plug in any storage backend:
public class MyStorageSetup : IStorageSetup
{
public IProjectionStorage CreateProjectionStorage() => new MyProjectionStorage();
public IProjectionPositionStorage CreatePositionStorage() => new MyPositionStorage();
}IProjectionStorage persists the read-model documents; IProjectionPositionStorage persists the last processed stream position so the projection can resume after a restart.
Use ProjectionTestKit<TIdContext, TContext, TStorageSetup> from MJ.Akka.Projections.TestKit as the base class for xUnit tests:
public class OrderProjectionTests
: ProjectionTestKit<SimpleIdContext<string>, InMemoryProjectionContext<SimpleIdContext<string>, OrderDocument>, SetupInMemoryStorage>
{
protected override IProjection<...> GetProjectionToTest() => new OrderInMemoryProjection();
protected override Task Given() =>
// Publish events into the test source
PublishEvent(new OrderPlaced("order-1"));
protected override Task Then()
{
var doc = GetDocument<OrderDocument>("order-1");
Assert.Equal("Placed", doc.Status);
return Task.CompletedTask;
}
}The kit wires up the projection, runs it to completion, and then calls Then().