Event Store Projection in C#
To process the irresponsible gambler alarms generated in our last post, we’ll be implementing an Event Store projection in C#. Why C# and not JavaScript, you ask? It’s true that this projection could be written in any language, but since there are plenty of C# projects and developers around the world, it only makes sense to demonstrate how to write an Event Store projection in C#.
To retrieve the generated alarms, we’ll subscribe to all the events of the IrresponsibleGamblingAlarms stream. When we receive an alarm, we handle it by publishing it on a bus. After processing, we store the position of the last processed event in a checkpoint stream. These checkpoints allow us to resume from the last processed event if the projection is restarted or the connection to Event Store is dropped. In this example, we store a checkpoint every time an event is processed; however, if we need to reduce the load on Event Store, we can instead store a checkpoint only every, say, 1000 events.
To handle the generated alarms in this example, we publish them to a bus. This could be NServiceBus, an internal bus, or any other kind of bus we prefer. The advantage is that multiple handlers can subscribe to the same event.
C# implementation
```csharp
using System;
using EventStore.ClientAPI;

// Note: IBus, IConsole, EventStoreCredentials, ParseJson, AsJsonEvent and GetLastEvent
// are presumably helpers from the example project, not part of the Event Store client API.
public class IrresponsibleGamblerAlarmPublisher
{
    private const string CheckpointStream = "$publisher-IrresponsibleGamblerAlarmPublisher-checkpoint";
    private const string AlarmStream = "IrresponsibleGamblingAlarms";

    private readonly IEventStoreConnection _eventStoreConnection;
    private readonly IBus _bus;
    private readonly IConsole _console;

    private bool _running;

    public IrresponsibleGamblerAlarmPublisher(IEventStoreConnection eventStoreConnection, IBus bus, IConsole console)
    {
        if (eventStoreConnection == null) throw new ArgumentNullException("eventStoreConnection");
        if (bus == null) throw new ArgumentNullException("bus");
        if (console == null) throw new ArgumentNullException("console");

        _eventStoreConnection = eventStoreConnection;
        _bus = bus;
        _console = console;
    }

    public void Start()
    {
        if (_running) throw new InvalidOperationException("Projection already running");

        _running = true;

        Connect();
    }

    public void Stop()
    {
        if (!_running) throw new InvalidOperationException("Projection not running");

        _running = false;
    }

    // Subscribes to the alarm stream, starting after the last checkpoint (or from the start if there is none).
    private void Connect()
    {
        var position = GetLastCheckpoint(CheckpointStream);

        _eventStoreConnection.SubscribeToStreamFrom(AlarmStream, position, true, ProcessEvent,
            userCredentials: EventStoreCredentials.Default,
            subscriptionDropped: TryToReconnect);
    }

    // Called when the subscription is dropped; logs the reason and subscribes again.
    private void TryToReconnect(EventStoreCatchUpSubscription catchUpSubscription, SubscriptionDropReason reason, Exception exception)
    {
        _console.Error("Projection subscription dropped: " + reason, exception);

        Connect();
    }

    // Called for every event: publish first, then record the position.
    private void ProcessEvent(EventStoreCatchUpSubscription subscription, ResolvedEvent resolvedEvent)
    {
        var alarm = resolvedEvent.ParseJson<IrresponsibleGamblerDetected>();

        Publish(alarm);

        StoreCheckpoint(resolvedEvent);
    }

    private void Publish(IrresponsibleGamblerDetected alarm)
    {
        _bus.Publish(alarm);
    }

    private void StoreCheckpoint(ResolvedEvent resolvedEvent)
    {
        var eventNumber = resolvedEvent.Event.EventNumber;

        var checkpoint = new IrresponsibleGamblerAlarmPublisherCheckpoint(eventNumber)
            .AsJsonEvent();

        SetCheckpointStreamMaxCount(eventNumber);

        _eventStoreConnection.AppendToStream(CheckpointStream, ExpectedVersion.Any, EventStoreCredentials.Default, checkpoint);
    }

    // Only the latest checkpoint matters, so on the first event limit the stream to one event.
    private void SetCheckpointStreamMaxCount(int eventNumber)
    {
        if (eventNumber != 0) return;

        var metadata = StreamMetadata.Build().SetMaxCount(1);

        _eventStoreConnection.SetStreamMetadata(CheckpointStream, ExpectedVersion.Any, metadata, EventStoreCredentials.Default);
    }

    // Returns the number of the last processed event, or null if no checkpoint exists yet.
    private int? GetLastCheckpoint(string stateStream)
    {
        var state = _eventStoreConnection.GetLastEvent<IrresponsibleGamblerAlarmPublisherCheckpoint>(stateStream);

        return state != null ? state.LastEventProcessed : (int?) null;
    }
}
```
Starting the projection
When the projection starts, it reads the last processed position from the checkpoint stream and passes it to SubscribeToStreamFrom as the starting point. If no checkpoint is found, null is passed instead, indicating that the subscription should start from the beginning of the stream.
Handling the event
When an event is received, it is deserialized into a typed IrresponsibleGamblerDetected event and published on a (fake) bus. The position of the last handled event is then stored in the checkpoint stream. The first time a position is stored, the checkpoint stream’s metadata is set to $maxCount: 1. This indicates that we only need one event in the stream (i.e. the last one) and that the older events are safe to remove when the database is scavenged.
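The implementation above stores a checkpoint after every event. As mentioned earlier, checkpoints could instead be stored only every N events to reduce the load on Event Store. The following is a minimal sketch of how that decision might be isolated; CheckpointPolicy and ShouldCheckpoint are hypothetical names introduced here for illustration and are not part of the example project or the Event Store client API.

```csharp
using System;

// Hypothetical helper (not part of the Event Store client API):
// decides when a checkpoint is due, based on a fixed interval.
public class CheckpointPolicy
{
    private readonly int _interval;
    private int _eventsSinceCheckpoint;

    public CheckpointPolicy(int interval)
    {
        if (interval < 1) throw new ArgumentOutOfRangeException("interval");

        _interval = interval;
    }

    // Call once per processed event; returns true on every Nth call.
    public bool ShouldCheckpoint()
    {
        _eventsSinceCheckpoint++;

        if (_eventsSinceCheckpoint < _interval) return false;

        _eventsSinceCheckpoint = 0;
        return true;
    }
}
```

With a policy like this, ProcessEvent would call StoreCheckpoint only when ShouldCheckpoint() returns true. Two trade-offs are worth keeping in mind. A larger interval means more events are published again after a restart. And because the implementation above sets $maxCount only when it checkpoints event number 0, that call would probably need to move elsewhere (for example, to Start) once the first event is no longer guaranteed to be checkpointed.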
What if the projection crashes?
On the rare occasion our projection crashes after publishing to the bus but before storing a checkpoint, some events will end up published to the bus twice. Therefore, we can only guarantee that events are published at least once, and all our handlers should be idempotent or otherwise ready to handle the same events twice.
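One way a handler might cope with duplicates is to remember which messages it has already handled and skip repeats. The sketch below assumes that each published message carries a unique identifier (for example, the ID of the originating event), which the publisher shown above does not currently include. IdempotentHandler is a hypothetical name, and the in-memory set is for illustration only; a real handler would likely need durable storage to survive its own restarts.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical wrapper: invokes the inner handler at most once per message ID.
public class IdempotentHandler<TMessage>
{
    private readonly HashSet<Guid> _handledIds = new HashSet<Guid>();
    private readonly Action<TMessage> _inner;

    public IdempotentHandler(Action<TMessage> inner)
    {
        if (inner == null) throw new ArgumentNullException("inner");

        _inner = inner;
    }

    // Returns true if the message was handled, false if it was a duplicate.
    public bool Handle(Guid messageId, TMessage message)
    {
        if (_handledIds.Contains(messageId)) return false;

        _inner(message);

        // Record the ID only after the inner handler succeeded,
        // so a failed attempt can be retried.
        _handledIds.Add(messageId);
        return true;
    }
}
```

When the same message ID arrives a second time, Handle returns false without invoking the inner handler, so a republished alarm has no further effect.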
Concurrency
Because we will have only one projection writing to the CheckpointStream, and from a single thread at that, we can safely skip concurrency checks when writing checkpoints by using ExpectedVersion.Any.
Reconnecting
If the TCP connection drops, all we need to do to reconnect is call the Connect() method again. The projection will wait until the connection is reestablished, and then it will continue processing events as usual. This, of course, assumes that we’ve specified the KeepReconnecting connection setting when creating our Event Store connection. Otherwise, we won’t be able to reconnect at all if the connection drops.
```csharp
var settings = ConnectionSettings.Create()
    .KeepReconnecting()
    .UseConsoleLogger();

var connection = EventStoreConnection.Create(settings, IPEndPointFactory.DefaultTcp());
connection.Connect();
```
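One detail to consider: in the implementation above, TryToReconnect calls Connect() whether or not Stop() has been called. If that is not the intended behaviour, the reconnect could be guarded by the running flag. The sketch below shows only that guard, under the assumption that the drop callback may run on a different thread than Stop(); ReconnectGuard is a hypothetical name, and the connect delegate stands in for the projection’s Connect() method.

```csharp
using System;

// Hypothetical helper: reconnects after a drop only while the projection is running.
public class ReconnectGuard
{
    private readonly Action _connect;

    // volatile, because the drop callback may run on another thread.
    private volatile bool _running;

    public ReconnectGuard(Action connect)
    {
        if (connect == null) throw new ArgumentNullException("connect");

        _connect = connect;
    }

    public void Start()
    {
        _running = true;
        _connect();
    }

    public void Stop()
    {
        _running = false;
    }

    // Intended to be called from the subscriptionDropped callback.
    public void OnSubscriptionDropped()
    {
        if (!_running) return;

        _connect();
    }
}
```

This only prevents a new subscription from being created after Stop(); it does not close a subscription that is still active.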
Source code
A working project for this example can be found on GitHub: https://github.com/tim-cools/EventStore-Examples
EventStore Projections by Example
This post is part of a series: