Event Store Projection in C#

To process the irresponsible gambler alarms generated in our last post, we’ll be implementing an Event Store projection in C#. Why C# and not JavaScript, you ask? It’s true that this projection could be written in any language, but since there are plenty of C# projects and developers around the world, it only makes sense to demonstrate how to write an Event Store projection in C#.

Projection IrresponsibleGamblerAlarmPublisher

To retrieve the generated alarms, we’ll subscribe to all the events of the IrresponsibleGamblingAlarms stream. When we receive an alarm, we handle it by publishing it on a bus. After processing, we store the position of the last processed event in a checkpoint stream. These checkpoints allow us to continue from the last processed event in the event the projection is restarted or the connection to Event Store is dropped. In this example, we store a checkpoint every time an event is processed; however, if we need to reduce the load on Event Store, we can instead store checkpoints every, say, 1000 events.

In order to handle the generated alarm in this example, we will publish the generated alarms to a bus. This could be NServiceBus, some internal bus, or any other kind of bus we prefer. The advantage of this is that we can have multiple handlers for the same event.

C# implementation

Starting the projection

When the projection starts, it reads the last position from the checkpoint stream, which is passed as a starting point to SubscribeToStreamFrom. If no checkpoint is found, null is passed instead, indicating that the subscription should be read from the start.

Handling the event

When an event is received, it is serialized to a typed irresponsiblegamblerdetected event and published on a (fake) bus. The position of the last handled event is then stored to the checkpoint stream. The first time a position is stored, the checkpoint stream’s metadata is set to $maxCount: 1. This indicates that we only need one event in the stream (i.e. the last one) and that the other events are safe to remove when the database is scavenged.

What if the projection crashes?

On the rare occasion our projection crashes after publishing to the bus but before storing a checkpoint, some events will end up published to the bus twice. Therefore, we can only guarantee that events are published at least once, and all our handlers should be idempotent or otherwise ready to handle the same events twice.

Concurrency

Because we will have only one projection writing to the CheckpointStream, and from a single thread at that, we can safely ignore concurrency checks when we write the projection using ExpectedVersion.Any.

Reconnecting

If the TCP connection drops, all we need to do to reconnect is call the Connect() method again. The projection will wait until the connection is reestablished, and then it will continue processing events as usual. This, of course, assumes that we’ve specified the KeepReconnecting connection setting when creating our Event Store connection. Otherwise, we won’t be able to reconnect at all if the connection drops.

Source code

A working project for this example can be found on github: https://github.com/tim-cools/EventStore-Examples

EventStore Projections by Example

This post is part of a series:

  1. EventStore Client API Basics (C#)
  2. Counting events of a specific type
  3. Partition events based on data found in previous events
  4. Calculating an average per day
  5. The irresponsible gambler
  6. Distribute events to other streams
  7. Temporal Projection to generate alarms
  8. Projection in C#

Event Store Projections – Temporal projection for generating alarms

To detect whether a player is gambling irresponsibly, we total the amounts won and lost over all games played in the last 24 hours. If this amount exceeds 500€, we generate an alarm – but since we don’t want to spam our players, we only do so if the previous alarm was more than 24 hours ago.

Projection IrresponsibleGamblingDetector

The projection takes all GameWon and GameLost events from all Player streams as input and stores all games from the last 24 hours in its state. When the amount of money a player loses exceeds 500€ in a 24-hour span, it generates an alarm by emitting an event to the IrresponsibleGambleAlarm stream.

Projection IrresponsibleGamblingDetector-Events

JavaScript Implementation

The irresponsibleGamblingDetector module implements the core projection logic:

The init method generates the initial state, which contains the timestamp of the last generated alarm and an empty list of the games that player has played in the last 24 hours.

The processGameLost and processGameWon methods process the corresponding events. First, they update the list of games played in the last 24 hours, removing games that were played more than 24 hours ago and adding newly received games. Next, they check whether an alarm was generated in the last 24 hours: if this is the case, processing halts immediately. Finally, they calculate the total amount won or lost in all games over the last 24 hours, and if the amount lost exceeds the threshold, then an IrresponsibleGamblerDetected event is emitted to the IrresponsibleGamblingAlarms stream.

Since we can only guarantee that events are processed at least once, we also have to ensure that our projection is idempotent. We need to check whether an incoming event has already been processed, or else we’ll get strange results if we process the results of the same game twice (or more). By storing the GameIds of each game we process in the state, we can determine whether we’ve already seen a specific game or not before we process it.

This module is instantiated once globally and then referenced from the projection’s definition:

This definition indicates that the projection is only interested in GameLost and GameWon events from Player streams.

Testing the projection

To test the projection, we’ll define four scenarios. The first will test a lost game, but without exceeding the threshold, so no alarm will be generated. The second will assert that an alarm is generated when the threshold is exceeded. The third scenario will also exceed the threshold, but in more than 24 hours, so no alarm will be generated. The last scenario will check for event duplication.

Once again, we see the power of event processing testing. We can arrange messages in the past and future without any issue.

Processing the generated alarms

Since we want to send email and text messages based on the output of this projection, we’ll be implementing a projection in C# to handle the alarms.

Source code

A working project for this example can be found on github: https://github.com/tim-cools/EventStore-Examples

Event Store Projections by Example

This post is part of a series:

  1. EventStore Client API Basics (C#)
  2. Counting events of a specific type
  3. Partition events based on data found in previous events
  4. Calculating an average per day
  5. The irresponsible gambler
  6. Distribute events to other streams
  7. Temporal Projection to generate alarms
  8. Projection in C#

Event Store Projections – Distributing events to other streams

Since the GameOver events are appended to the Game stream, we need a way to distribute them to all participating players’ streams. The GameOver event contains a list of participating players, and the distributor emits an event for each player. The event type emitted depends on how much the player has won or lost: if the amount is less than zero, a GameLost event is emitted, but if the amount is greater than zero, a GameWon event is emitted instead. (For simplicity’s sake, we’ll assume GameDraw events don’t exist.)

Note that we don’t technically have to split the event in two to implement our alarm system, but it’s useful for the purposes of this example.

Projection GameOverToPlayerDistributor

This projection takes GameOver events from all Game streams as input. For each GameOver event, it will loop over the list of players and distribute (i.e. emit) GameLost and GameWon events to the players’ streams as appropriate.

Projection-GameOverToPlayerDistributor-Events

JavaScript implementation

This projection module has a single public method, process, which processes the GameOver events by looping over the participating players and emitting the appropriate events:

This module is instantiated once globally and then referenced from the projection’s definition:

Testing the projection

To test this projection, we’ll define three scenarios. The first will arrange an event with a positive amount and assert that a GameWon event is emitted. The second will arrange an event with a negative amount and assert that a GameLost event is emitted. The last will check for a combination of both.

Querying the results with the Client API

The results of this projection are used solely as input for another stream, so there’s no need to read them with the C# Client API.

Source code

A working project for this example can be found on github: https://github.com/tim-cools/EventStore-Examples

Event Store Projections by Example

This post is part of a series:

  1. EventStore Client API Basics (C#)
  2. Counting events of a specific type
  3. Partition events based on data found in previous events
  4. Calculating an average per day
  5. The irresponsible gambler
  6. Distribute events to other streams
  7. Temporal Projection to generate alarms
  8. Projection in C#

Event Store Projections – The irresponsible gambler

Imagine a world where online gambling sites were nice enough to notify you when you’re gambling irresponsibly: if you’d spend too much money, they’d take notice and send you an alert via email or text message. Of course, online gambling sites bank on people being just so frivolous with their money, but there’s nothing stopping us from dreaming – or from implementing such a system as our next example.

First off, we’ll assume we have a Game aggregate which raises a GameOver that contains a list of the players that participated in the game and the amounts they lost or won.

Our irresponsible gambler alarm system needs three projections. The first projection will distribute the GameOver events to GameWon and GameLost events emitted to all participating players. The second projection is a temporal projection that will generate alarms based on the results of all games played in the last 24 hours. The last projection, implemented in C#, will send emails and text messages to any players who lost more than 500€ in the last 24 hours, our eponymous irresponsible gamblers. To process these alarms, we will publish all alarms generated in the second projection on a bus in C#.

You can find the projections here:

EventStore Projections by Example

This post is part of a series:

  1. EventStore Client API Basics (C#)
  2. Counting events of a specific type
  3. Partition events based on data found in previous events
  4. Calculating an average per day
  5. The irresponsible gambler
  6. Distribute events to other streams
  7. Temporal Projection to generate alarms
  8. Projection in C#