Projections

WARNING

The programming model for projections was completely rewritten for Marten V4.

Marten has a (we hope) strong model for user-defined projections of the raw event data. Projections are used within Marten to create read-side views of the raw event data. The basics of the Marten projection model are shown below:

[Figure: Projection class diagram]

Note that the various types of aggregated projections all inherit from a common base and share the same core set of conventions.

Projection Types

  1. Aggregate Projections combine either a stream or some other related set of events into a single view.
  2. View Projections are a specialized form of aggregate projections that allow you to aggregate against arbitrary groupings of events across streams.
  3. Event Projections are a recipe for building projections that create or delete one or more documents for a single event.
  4. If one of the built-in projection recipes doesn't fit what you want to do, you can build your own custom projection.

Projection Lifecycles

Marten projections can be executed with three different lifecycles:

  1. Inline Projections are executed at the time of event capture, and the projected documents are persisted in the same unit of work as the events.
  2. Live Aggregations are executed on demand by loading event data and creating the projected view in memory, without persisting the projected documents.
  3. Asynchronous Projections are executed by a background process.

For other descriptions of the Projections pattern inside of Event Sourcing architectures, see:

Aggregates

Aggregates condense data described by a single stream. As of v1.0, Marten only supports aggregation via .NET classes. Aggregates are calculated upon every request by running the event stream through them, as compared to inline projections, which are computed at event commit time and stored as documents.

The out-of-the-box convention is to expose public Apply([Event Type]) methods on your aggregate class to do all incremental updates to an aggregate object. This can be customized using AggregatorLookup.

Sticking with the fantasy theme, the QuestParty class shown below could be used to aggregate streams of quest data:

public class QuestParty
{
    public List<string> Members { get; set; } = new();
    public IList<string> Slayed { get; } = new List<string>();
    public string Key { get; set; }
    public string Name { get; set; }

    // In this particular case, this is also the stream id for the quest events
    public Guid Id { get; set; }

    // These methods take in events and update the QuestParty
    public void Apply(MembersJoined joined) => Members.Fill(joined.Members);
    public void Apply(MembersDeparted departed) => Members.RemoveAll(x => departed.Members.Contains(x));
    public void Apply(QuestStarted started) => Name = started.Name;

    public override string ToString()
    {
        return $"Quest party '{Name}' is {Members.Join(", ")}";
    }
}

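
To make the Apply convention more concrete, here is a minimal, Marten-free sketch of how a list of events could be folded into an aggregate by dispatching each event to the matching public Apply method. The names ConventionAggregator and SimpleQuestParty, and the shapes of the event records, are assumptions made for illustration. This is not Marten's actual implementation.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

// Hypothetical event shapes; the real sample events may carry more data.
public record QuestStarted(string Name);
public record MembersJoined(string[] Members);
public record MembersDeparted(string[] Members);

public class SimpleQuestParty
{
    public List<string> Members { get; } = new List<string>();
    public string Name { get; set; } = "";

    public void Apply(QuestStarted started) => Name = started.Name;

    // Only add members that are not already in the party.
    public void Apply(MembersJoined joined) =>
        Members.AddRange(joined.Members.Except(Members).ToList());

    public void Apply(MembersDeparted departed) =>
        Members.RemoveAll(m => departed.Members.Contains(m));
}

public static class ConventionAggregator
{
    // Replays the events in order, calling the public Apply overload whose
    // single parameter matches each event's runtime type.
    public static T Aggregate<T>(IEnumerable<object> events) where T : new()
    {
        var aggregate = new T();
        foreach (var @event in events)
        {
            var apply = typeof(T).GetMethod(
                "Apply",
                BindingFlags.Public | BindingFlags.Instance,
                null,
                new[] { @event.GetType() },
                null);

            // Events without a matching Apply method are skipped.
            apply?.Invoke(aggregate, new[] { @event });
        }
        return aggregate;
    }
}
```

Under these assumptions, calling ConventionAggregator.Aggregate<SimpleQuestParty>(events) with a QuestStarted, a MembersJoined for Frodo and Sam, and a MembersDeparted for Sam leaves a party whose only member is Frodo.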

New in Marten 1.2 is the ability to use Event<T> metadata within your projections, assuming that you're not trying to run the aggregations inline.

The syntax using the built-in aggregation technique is to take in Event<T> as the argument to your Apply(event) methods, where T is the event type you're interested in:

public class QuestPartyWithEvents
{
    private readonly IList<string> _members = new List<string>();

    public string[] Members
    {
        get
        {
            return _members.ToArray();
        }
        set
        {
            _members.Clear();
            _members.AddRange(value);
        }
    }

    public IList<string> Slayed { get; } = new List<string>();

    public void Apply(MembersJoined joined)
    {
        _members.Fill(joined.Members);
    }

    public void Apply(MembersDeparted departed)
    {
        _members.RemoveAll(x => departed.Members.Contains(x));
    }

    public void Apply(QuestStarted started)
    {
        Name = started.Name;
    }

    public string Name { get; set; }

    public Guid Id { get; set; }

    public override string ToString()
    {
        return $"Quest party '{Name}' is {Members.Join(", ")}";
    }
}

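
The Apply methods in the class above take the raw event types. As a rough sketch of the wrapped form described in the text, the example below uses a stand-in EventEnvelope<T> type in place of Marten's Event<T>. The wrapper, its property names (Data, Version, Timestamp) and the QuestPartyWithMetadata class are all assumptions for illustration and should be checked against the Marten version in use.

```csharp
using System;
using System.Collections.Generic;

// Stand-in for Marten's Event<T> wrapper. The property names are
// assumptions for this sketch, not a copy of Marten's type.
public class EventEnvelope<T>
{
    public EventEnvelope(T data, long version, DateTimeOffset timestamp)
    {
        Data = data;
        Version = version;
        Timestamp = timestamp;
    }

    public T Data { get; }
    public long Version { get; }
    public DateTimeOffset Timestamp { get; }
}

// Hypothetical event shape.
public record MembersJoined(string[] Members);

public class QuestPartyWithMetadata
{
    public List<string> Members { get; } = new List<string>();
    public long Version { get; private set; }
    public DateTimeOffset LastUpdated { get; private set; }

    // Taking the wrapper instead of the raw event exposes the metadata
    // alongside the event body.
    public void Apply(EventEnvelope<MembersJoined> joined)
    {
        foreach (var member in joined.Data.Members)
        {
            if (!Members.Contains(member)) Members.Add(member);
        }

        Version = joined.Version;
        LastUpdated = joined.Timestamp;
    }
}
```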

Aggregates Across Multiple Streams

An example is coming soon, and a sample is also expected on Jeremy's blog.

This is currently possible by using either a custom IProjection or the existing aggregation capabilities with a custom IAggregateFinder<T>, where T is the projected view document type.
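
Until an official example is available, the idea can be sketched without Marten: the "finder" step picks the view that an event belongs to using a key carried by the event, rather than the id of the stream it came from. The MonsterSlayed shape, the MonsterTally view and the CrossStreamAggregator helper below are hypothetical and only illustrate the grouping. They do not implement IProjection or IAggregateFinder<T>.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical event: QuestId identifies the stream it was recorded in.
public record MonsterSlayed(Guid QuestId, string Name);

// View keyed by monster name, fed by events from many quest streams.
public class MonsterTally
{
    public string Id { get; set; } = "";
    public int Count { get; private set; }
    public HashSet<Guid> Quests { get; } = new HashSet<Guid>();

    public void Apply(MonsterSlayed slayed)
    {
        Count++;
        Quests.Add(slayed.QuestId);
    }
}

public static class CrossStreamAggregator
{
    public static Dictionary<string, MonsterTally> Aggregate(IEnumerable<MonsterSlayed> events)
    {
        var views = new Dictionary<string, MonsterTally>();
        foreach (var slayed in events)
        {
            // The "finder" step: locate (or create) the view by the
            // monster name, not by the stream id.
            if (!views.TryGetValue(slayed.Name, out var view))
            {
                view = new MonsterTally { Id = slayed.Name };
                views[slayed.Name] = view;
            }

            view.Apply(slayed);
        }
        return views;
    }
}
```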

Aggregator Lookup

EventGraph.UseAggregatorLookup(IAggregatorLookup aggregatorLookup) can be used to register an IAggregatorLookup that is used to look up IAggregator<T> for aggregations. This allows a generic aggregation strategy to be used, rather than registering aggregators case-by-case through EventGraph.AddAggregator<T>(IAggregator<T> aggregator).

A shorthand extension method EventGraph.UseAggregatorLookup(this EventGraph eventGraph, AggregationLookupStrategy strategy) can be used to set the default aggregation lookup, where:

  • AggregationLookupStrategy.UsePublicApply resolves aggregators that use public Apply
  • AggregationLookupStrategy.UsePrivateApply resolves aggregators that use private Apply
  • AggregationLookupStrategy.UsePublicAndPrivateApply resolves aggregators that use public or private Apply

The aggregation lookup can also be set through StoreOptions.Events.UseAggregatorLookup.
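
As a rough illustration of what the three strategies distinguish, the following Marten-free sketch finds Apply methods on a type by visibility using reflection. ApplyVisibility, ApplyMethodFinder and QuestStatus are hypothetical names, and the real lookup in Marten may work differently.

```csharp
using System;
using System.Linq;
using System.Reflection;

// Hypothetical event shapes.
public record QuestStarted(string Name);
public record QuestEnded(string Name);

public class QuestStatus
{
    public string Name { get; private set; } = "";
    public bool Finished { get; private set; }

    public void Apply(QuestStarted started) => Name = started.Name;

    // Only found by a lookup that includes non-public methods.
    private void Apply(QuestEnded ended) => Finished = true;
}

// Hypothetical counterpart of the three lookup strategies.
public enum ApplyVisibility { Public, Private, PublicAndPrivate }

public static class ApplyMethodFinder
{
    public static MethodInfo[] Find(Type aggregateType, ApplyVisibility visibility)
    {
        BindingFlags visibilityFlags;
        switch (visibility)
        {
            case ApplyVisibility.Public:
                visibilityFlags = BindingFlags.Public;
                break;
            case ApplyVisibility.Private:
                visibilityFlags = BindingFlags.NonPublic;
                break;
            default:
                visibilityFlags = BindingFlags.Public | BindingFlags.NonPublic;
                break;
        }

        // Keep instance methods named Apply that take exactly one argument.
        return aggregateType
            .GetMethods(BindingFlags.Instance | BindingFlags.DeclaredOnly | visibilityFlags)
            .Where(m => m.Name == "Apply" && m.GetParameters().Length == 1)
            .ToArray();
    }
}
```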


Live Aggregation via .NET

You can always fetch a stream of events and build an aggregate completely live from the current event data by using this syntax:

using (var session = store.OpenSession())
{
    // questId is the id of the stream
    var party = session.Events.AggregateStream<QuestParty>(questId);
    Console.WriteLine(party);

    // Aggregate the stream as of version 3
    var partyAtVersion3 = await session.Events
        .AggregateStreamAsync<QuestParty>(questId, 3);

    // Aggregate the stream as it stood 24 hours ago
    var partyYesterday = await session.Events
        .AggregateStreamAsync<QuestParty>(questId, timestamp: DateTime.UtcNow.AddDays(-1));
}


There is also a matching asynchronous AggregateStreamAsync() mechanism. Additionally, you can do stream aggregations in batch queries with IBatchQuery.Events.AggregateStream<T>(streamId).
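
Conceptually, the version and timestamp arguments limit which events are replayed into the aggregate. The Marten-free sketch below illustrates that idea. RecordedEvent, PartyRoster and LiveAggregator are hypothetical names, and treating both limits as inclusive is an assumption rather than a statement about Marten's behavior.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical event record that carries its own stream version and timestamp.
public record RecordedEvent(long Version, DateTimeOffset Timestamp, string MemberJoined);

public class PartyRoster
{
    public List<string> Members { get; } = new List<string>();

    public void Apply(RecordedEvent e) => Members.Add(e.MemberJoined);
}

public static class LiveAggregator
{
    // Builds the view in memory and stops replaying once an event is past
    // the requested version or timestamp.
    public static PartyRoster Aggregate(
        IEnumerable<RecordedEvent> stream,
        long? version = null,
        DateTimeOffset? timestamp = null)
    {
        var roster = new PartyRoster();
        foreach (var e in stream.OrderBy(x => x.Version))
        {
            if (version.HasValue && e.Version > version.Value) break;
            if (timestamp.HasValue && e.Timestamp > timestamp.Value) break;
            roster.Apply(e);
        }
        return roster;
    }
}
```

Nothing is persisted in this sketch. Each call replays the events again, which is the trade-off that live aggregation makes against inline projections.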

Inline Projections

First off, be aware that event metadata (e.g. stream version and sequence number) is not available during the execution of inline projections. If you need to use event metadata in your projections, please use asynchronous or live projections.

If you would prefer that the projected aggregate document be updated inline with the events being appended, you simply need to register the aggregation type in the StoreOptions upfront when you build up your document store like this:

var store = DocumentStore.For(_ =>
{
    _.Connection(ConnectionSource.ConnectionString);
    _.Events.TenancyStyle = tenancyStyle;
    _.DatabaseSchemaName = "quest_sample";
    if (tenancyStyle == TenancyStyle.Conjoined)
    {
        _.Schema.For<QuestParty>().MultiTenanted();
    }

    // This is all you need to create the QuestParty projected
    // view
    _.Projections.SelfAggregate<QuestParty>();
});


At this point, you would be able to query against QuestParty as just another document type.

Rebuilding Projections

Projections need to be rebuilt when the code that defines them changes in a way that requires events to be reapplied in order to maintain correct state. Using an IDaemon, this is easy to execute on demand.

Refer to Rebuilding Projections for more details.

WARNING

By default, Marten tries to use the default constructor when creating a new object. The default constructor does not have to be public; it can also be private or protected.

If the class does not have a default constructor, Marten creates an uninitialized object (see here for more info).

Because of that, no member initializers will be run, so all members need to be initialized in the event handler methods.
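
The effect can be reproduced in plain .NET. The sketch below uses RuntimeHelpers.GetUninitializedObject as a stand-in for whatever mechanism Marten uses internally, which is an assumption. PartyWithoutDefaultCtor, UninitializedDemo and the MembersJoined shape are hypothetical names for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;

// Hypothetical event shape.
public record MembersJoined(string[] Members);

public class PartyWithoutDefaultCtor
{
    // The only constructor takes an argument, so there is no default constructor.
    public PartyWithoutDefaultCtor(string name)
    {
        Name = name;
    }

    public string Name { get; set; }

    // Member initializer: it only runs as part of a constructor call.
    public List<string> Members { get; set; } = new List<string>();

    public void Apply(MembersJoined joined)
    {
        // Initialize here in case the member initializer was skipped.
        Members ??= new List<string>();
        Members.AddRange(joined.Members);
    }
}

public static class UninitializedDemo
{
    // Creates an instance without running any constructor or initializer.
    public static PartyWithoutDefaultCtor Create() =>
        (PartyWithoutDefaultCtor)RuntimeHelpers.GetUninitializedObject(
            typeof(PartyWithoutDefaultCtor));
}
```

An object returned by UninitializedDemo.Create() has a null Members property until the first Apply call, which is why the handler initializes the list itself.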