Posts

Event Store Projection in C#

To process the irresponsible gambler alarms generated in our last post, we’ll be implementing an Event Store projection in C#. Why C# and not JavaScript, you ask? It’s true that this projection could be written in any language, but since there are plenty of C# projects and developers around the world, it only makes sense to demonstrate how to write an Event Store projection in C#.

Projection IrresponsibleGamblerAlarmPublisher

To retrieve the generated alarms, we’ll subscribe to all the events of the IrresponsibleGamblingAlarms stream. When we receive an alarm, we handle it by publishing it on a bus. After processing, we store the position of the last processed event in a checkpoint stream. These checkpoints allow us to continue from the last processed event in the event the projection is restarted or the connection to Event Store is dropped. In this example, we store a checkpoint every time an event is processed; however, if we need to reduce the load on Event Store, we can instead store checkpoints every, say, 1000 events.

To handle the generated alarms in this example, we publish them to a bus. This could be NServiceBus, some internal bus, or any other kind of bus we prefer. The advantage of this is that we can have multiple handlers for the same event.

C# implementation

Starting the projection

When the projection starts, it reads the last position from the checkpoint stream, which is passed as a starting point to SubscribeToStreamFrom. If no checkpoint is found, null is passed instead, indicating that the subscription should be read from the start.

Handling the event

When an event is received, it is deserialized to a typed IrresponsibleGamblerDetected event and published on a (fake) bus. The position of the last handled event is then stored to the checkpoint stream. The first time a position is stored, the checkpoint stream’s metadata is set to $maxCount: 1. This indicates that we only need one event in the stream (i.e. the last one) and that the other events are safe to remove when the database is scavenged.
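The start-up and handling steps can be sketched without any Event Store dependency. The following is a minimal JavaScript model, not the C# implementation from the repository: plain arrays stand in for the alarm stream, the checkpoint stream, and the bus, and createProjection is a hypothetical name chosen for this illustration.

```javascript
// In-memory stand-ins for the alarm stream, the checkpoint stream and the bus.
// None of these names come from the Event Store client API.
function createProjection(alarmStream, checkpointStream, bus) {
  return {
    run() {
      // Read the last stored position; null means "start from the beginning".
      const last = checkpointStream.length > 0
        ? checkpointStream[checkpointStream.length - 1]
        : null;
      const start = last === null ? 0 : last + 1;

      for (let position = start; position < alarmStream.length; position++) {
        bus.push(alarmStream[position]);  // 1. handle the event by publishing it
        checkpointStream.length = 0;      // emulate $maxCount: 1 (keep one entry)
        checkpointStream.push(position);  // 2. store the checkpoint afterwards
      }
    }
  };
}

const alarms = ["alarm-1", "alarm-2"];
const checkpoints = [];
const published = [];

createProjection(alarms, checkpoints, published).run();
alarms.push("alarm-3");
// A "restarted" projection resumes right after the stored checkpoint.
createProjection(alarms, checkpoints, published).run();
```

Because the checkpoint is written only after the event is published, a restart never skips an event, which is the property the real projection relies on.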

What if the projection crashes?

On the rare occasion our projection crashes after publishing to the bus but before storing a checkpoint, some events will end up published to the bus twice. Therefore, we can only guarantee that events are published at least once, and all our handlers should be idempotent or otherwise ready to handle the same events twice.
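One way to make a handler tolerate duplicates is to remember which events it has already seen. The sketch below assumes every event carries a unique id property; makeIdempotent and the event shape are hypothetical and only illustrate the idea.

```javascript
// Hypothetical wrapper: remembers which event ids were already handled.
function makeIdempotent(handler) {
  const seen = new Set();
  return function (event) {
    if (seen.has(event.id)) {
      return false; // duplicate delivery, ignore it
    }
    seen.add(event.id);
    handler(event);
    return true;
  };
}

const sentMails = [];
const sendMail = makeIdempotent((event) => sentMails.push(event.playerId));

sendMail({ id: "e1", playerId: "Player-1" });
sendMail({ id: "e1", playerId: "Player-1" }); // redelivered after a crash
sendMail({ id: "e2", playerId: "Player-2" });
```

An in-memory set does not survive a restart of the handler itself, so a real handler would need to keep the seen ids in durable storage.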

Concurrency

Because only one projection writes to the checkpoint stream, and from a single thread at that, we can safely skip concurrency checks and write the checkpoint using ExpectedVersion.Any.

Reconnecting

If the TCP connection drops, all we need to do to reconnect is call the Connect() method again. The projection will wait until the connection is reestablished, and then it will continue processing events as usual. This, of course, assumes that we’ve specified the KeepReconnecting connection setting when creating our Event Store connection. Otherwise, we won’t be able to reconnect at all if the connection drops.

Source code

A working project for this example can be found on github: https://github.com/tim-cools/EventStore-Examples

EventStore Projections by Example

This post is part of a series:

  1. EventStore Client API Basics (C#)
  2. Counting events of a specific type
  3. Partition events based on data found in previous events
  4. Calculating an average per day
  5. The irresponsible gambler
  6. Distribute events to other streams
  7. Temporal Projection to generate alarms
  8. Projection in C#

Event Store Projections – Temporal projection for generating alarms

To detect whether a player is gambling irresponsibly, we total the amounts won and lost over all games played in the last 24 hours. If the net amount lost exceeds 500€, we generate an alarm – but since we don’t want to spam our players, we only do so if the previous alarm was more than 24 hours ago.

Projection IrresponsibleGamblingDetector

The projection takes all GameWon and GameLost events from all Player streams as input and stores all games from the last 24 hours in its state. When the amount of money a player loses exceeds 500€ in a 24-hour span, it generates an alarm by emitting an event to the IrresponsibleGamblingAlarms stream.

Projection IrresponsibleGamblingDetector-Events

JavaScript Implementation

The irresponsibleGamblingDetector module implements the core projection logic:

The init method generates the initial state, which contains the timestamp of the last generated alarm and an empty list of the games that player has played in the last 24 hours.

The processGameLost and processGameWon methods process the corresponding events. First, they update the list of games played in the last 24 hours, removing games that were played more than 24 hours ago and adding newly received games. Next, they check whether an alarm was generated in the last 24 hours: if this is the case, processing halts immediately. Finally, they calculate the total amount won or lost in all games over the last 24 hours, and if the amount lost exceeds the threshold, then an IrresponsibleGamblerDetected event is emitted to the IrresponsibleGamblingAlarms stream.

Since we can only guarantee that events are processed at least once, we also have to ensure that our projection is idempotent. We need to check whether an incoming event has already been processed, or else we’ll get strange results if we process the results of the same game twice (or more). By storing the GameIds of each game we process in the state, we can determine whether we’ve already seen a specific game or not before we process it.
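The logic described above can be sketched roughly as follows. This is not the module from the repository: the event shape (gameId, amount, timestamp), the state property names, and the injected emit function are assumptions made so that the sketch runs outside Event Store.

```javascript
const DAY_MS = 24 * 60 * 60 * 1000;
const THRESHOLD = 500;

// `emit` is injected so the sketch can run (and be tested) outside Event Store.
function irresponsibleGamblingDetector(emit) {
  function init() {
    return { lastAlarm: null, gamesLast24Hours: [] };
  }

  function processGame(state, playerId, game, amount) {
    const now = Date.parse(game.timestamp);

    // Idempotency: skip games whose id is already stored in the state.
    if (state.gamesLast24Hours.some((g) => g.gameId === game.gameId)) {
      return state;
    }

    // Drop games older than 24 hours, then add the new one.
    state.gamesLast24Hours = state.gamesLast24Hours
      .filter((g) => now - g.timestamp < DAY_MS);
    state.gamesLast24Hours.push({ gameId: game.gameId, timestamp: now, amount });

    // At most one alarm per 24 hours.
    if (state.lastAlarm !== null && now - state.lastAlarm < DAY_MS) {
      return state;
    }

    const total = state.gamesLast24Hours.reduce((sum, g) => sum + g.amount, 0);
    if (-total > THRESHOLD) {
      emit("IrresponsibleGamblingAlarms", "IrresponsibleGamblerDetected",
        { playerId, amountLost: -total, timestamp: game.timestamp });
      state.lastAlarm = now;
    }
    return state;
  }

  return {
    init,
    // Losses count negatively, winnings positively.
    processGameLost: (state, playerId, game) =>
      processGame(state, playerId, game, -game.amount),
    processGameWon: (state, playerId, game) =>
      processGame(state, playerId, game, game.amount)
  };
}

const emitted = [];
const detector = irresponsibleGamblingDetector((stream, type, data) =>
  emitted.push({ stream, type, data }));

let state = detector.init();
const first = { gameId: 1, amount: 300, timestamp: "2014-01-01T10:00:00Z" };
const second = { gameId: 2, amount: 300, timestamp: "2014-01-01T12:00:00Z" };
state = detector.processGameLost(state, "Player-1", first);
state = detector.processGameLost(state, "Player-1", second);
state = detector.processGameLost(state, "Player-1", second); // duplicate delivery
```

Note that this sketch only remembers game ids for as long as the game stays inside the 24-hour window, so a duplicate arriving later than that would be processed again.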

This module is instantiated once globally and then referenced from the projection’s definition:

This definition indicates that the projection is only interested in GameLost and GameWon events from Player streams.

Testing the projection

To test the projection, we’ll define four scenarios. The first will test a lost game, but without exceeding the threshold, so no alarm will be generated. The second will assert that an alarm is generated when the threshold is exceeded. The third scenario will also exceed the threshold, but in more than 24 hours, so no alarm will be generated. The last scenario will check for event duplication.

Once again, we see how powerful it is to test event processing this way: we can arrange messages in the past and future without any issue.

Processing the generated alarms

Since we want to send email and text messages based on the output of this projection, we’ll be implementing a projection in C# to handle the alarms.

Source code

A working project for this example can be found on github: https://github.com/tim-cools/EventStore-Examples

Event Store Projections – The irresponsible gambler

Imagine a world where online gambling sites were nice enough to notify you when you’re gambling irresponsibly: if you spent too much money, they’d take notice and send you an alert via email or text message. Of course, online gambling sites bank on people being just that frivolous with their money, but there’s nothing stopping us from dreaming – or from implementing such a system as our next example.

First off, we’ll assume we have a Game aggregate which raises a GameOver event that contains a list of the players that participated in the game and the amounts they lost or won.

Our irresponsible gambler alarm system needs three projections. The first projection will distribute the GameOver events to GameWon and GameLost events emitted to all participating players. The second projection is a temporal projection that will generate alarms based on the results of all games played in the last 24 hours. The last projection, implemented in C#, will send emails and text messages to any players who lost more than 500€ in the last 24 hours, our eponymous irresponsible gamblers. To process these alarms, we will publish all alarms generated in the second projection on a bus in C#.
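The first projection's job, turning one GameOver event into one event per player, can be illustrated with a small pure function. The GameOver shape used here (playerResults with signed amounts) is an assumption for the sake of the example, not the format used in the repository.

```javascript
// Assumed GameOver shape: { gameId, timestamp, playerResults: [{ playerId, amount }] }
// where a negative amount means the player lost.
function distributeGameOver(gameOver) {
  return gameOver.playerResults.map((result) => ({
    streamId: "Player-" + result.playerId,
    eventType: result.amount < 0 ? "GameLost" : "GameWon",
    data: {
      gameId: gameOver.gameId,
      timestamp: gameOver.timestamp,
      amount: Math.abs(result.amount)
    }
  }));
}

const distributed = distributeGameOver({
  gameId: 42,
  timestamp: "2014-01-01T10:00:00Z",
  playerResults: [
    { playerId: 1, amount: -50 },
    { playerId: 2, amount: 50 }
  ]
});
```

In a real projection, each returned entry would be emitted to the corresponding Player stream instead of being collected in an array.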

You can find the projections here:

EventStore Projections – Calculating daily averages

This projection will calculate the daily average consumption per device. We take our Device streams as input and store an intermediate calculation in the state for each one. This calculation contains the total sum of measurements for the current day. The projection’s result is a new stream for each device which contains a new calculated average for each day.

Projection-MeasurementReadAveragePerDayCalculator

The $init method initializes the state. Each subsequent MeasurementRead event will invoke the calculate method, which will update the state with the total number of measurements, the sum of the measurements, and the timestamp of the last measurement. When we determine from the timestamp that a full day has passed, we emit an event which contains the date, the sum, and the calculated average, obtained by dividing the sum of the measurements by the number of measurements.
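As a rough sketch of this calculation, assuming events with a timestamp and a numeric reading: the state property names, the output stream name MeasurementAverageDay-*, and the injected emit and formatDay functions are all hypothetical and chosen only for this illustration.

```javascript
// `emit` and `formatDay` are injected; event and state shapes are assumptions.
function averagePerDayCalculator(emit, formatDay) {
  function init() {
    return { count: 0, total: 0, lastTimestamp: null };
  }

  function calculate(state, streamId, event) {
    const day = formatDay(new Date(event.timestamp));

    // A measurement on a new day closes the previous day: emit its average.
    if (state.lastTimestamp !== null) {
      const previousDay = formatDay(new Date(state.lastTimestamp));
      if (previousDay !== day && state.count > 0) {
        emit("MeasurementAverageDay-" + streamId, "MeasurementAverageDay", {
          date: previousDay,
          total: state.total,
          average: state.total / state.count
        });
        state.count = 0;
        state.total = 0;
      }
    }

    state.count += 1;
    state.total += event.reading;
    state.lastTimestamp = event.timestamp;
    return state;
  }

  return { init, calculate };
}

const averages = [];
const calculator = averagePerDayCalculator(
  (stream, type, data) => averages.push({ stream, type, data }),
  (date) => date.toISOString().slice(0, 10));

let dayState = calculator.init();
dayState = calculator.calculate(dayState, "Device-0", { timestamp: "2014-01-01T08:00:00Z", reading: 10 });
dayState = calculator.calculate(dayState, "Device-0", { timestamp: "2014-01-01T20:00:00Z", reading: 20 });
dayState = calculator.calculate(dayState, "Device-0", { timestamp: "2014-01-02T08:00:00Z", reading: 40 });
```

The average for a day is only emitted once the first measurement of the next day arrives, which is why a device that stops sending never gets its last day closed.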

Projection-MeasurementReadAveragePerDayCalculator-Events

JavaScript implementation

First, we implement a daily timestamp formatter which will handle formatting the date part of a JavaScript Date object. Normally, we’d use an existing library such as momentjs to format the date part, but Event Store doesn’t currently let us include external libraries. One workaround for this would be to inject the library in the projection source when the projection is created. I might cover this option in a future blog post, but for now, we’ll have to roll our own function.
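A hand-rolled formatter along these lines needs nothing beyond the built-in Date object. The function name formatDatePart and the choice of UTC and a yyyy-MM-dd layout are assumptions of this sketch.

```javascript
// Formats the UTC date part of a Date as yyyy-MM-dd without external libraries.
function formatDatePart(date) {
  function pad(value) {
    return value < 10 ? "0" + value : String(value);
  }
  return date.getUTCFullYear() + "-" +
    pad(date.getUTCMonth() + 1) + "-" + // getUTCMonth() is zero-based
    pad(date.getUTCDate());
}

const formatted = formatDatePart(new Date("2014-03-05T23:30:00Z"));
```

Using the UTC accessors keeps the result independent of the time zone of the machine running the projection.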

Next, we implement the projection module with the init and the calculate methods:

Just like previous projections, we instantiate the calculator module once globally and then reference it in the projection’s definition:

Again we use the fromCategory("Device") definition to tell the engine that this projection’s events come from all streams in the Device category. Streams are categorized based on the stream name before the final dash by default: for example, Device-0 and Device-1 are both in the Device category. To enable categories, the $stream_by_category and $by_category system projections should be enabled. We delegate $init and MeasurementRead event handling to the calculator module, and we’re done.

Testing the projection

First, we test the formatting of the date part of the timestamp:

Next, we define the scenarios for testing the projection module:

The first two examples are almost too simple to be worth writing tests for at all, but this example demonstrates how easy it is to test projections with JavaScript and Jasmine. All you have to do is call the projection module with the event stream as its input and then check whether the resulting state matches the expected state. We can also verify the emitted events by testing whether the mocked emit methods are called with the correct events.

Querying the results with the Client API

All new events are emitted to a stream, so reading them is straightforward:

The implementation of the ReadStreamEventsBackward method is explained in the Client API post.

Conclusion

This is a fairly naive implementation. First of all, the calculation doesn’t handle processing the same event twice properly: it will just add the value to the total twice, throwing off the average. It also expects events in some semblance of chronological order: if it receives an event whose timestamp indicates that a day has passed before it’s actually processed all the events for the day, then the emitted average event will be erroneous. Finally, it will fail to calculate the average entirely if a device stops sending messages during the day. To fix this, we’ll need to implement some kind of timeout mechanism. I’ll cover how to fix these issues in a future post.

Source code

A working project for this example can be found on github: https://github.com/tim-cools/EventStore-Examples

Event Store Projections – Partitioning events based on data found in previous events

This example is a variation on the default indexing projection described on the Event Store blog. Another example of partition projections can be found on Rob Ashton’s blog.

Projection-DeviceTypePartitioner

An indexing projection partitions events from multiple streams by linking said events to new streams based on common properties. These new streams can be used as input for another projection or to retrieve the events from the store in a single read operation. In this example, all MeasurementRead events from all streams in the Device category are partitioned based on their DeviceType. The difference between this example and a basic indexing projection is that the DeviceType property used for the partitioning is not available in every event: it is only available in the first DeviceConfigured event that happens before the MeasurementRead events. So in order to create the index, we store the DeviceType in the projection’s state (per aggregate stream) when the projection handles the DeviceConfigured event. This allows the projection to link each subsequent MeasurementRead event to the corresponding DeviceType-* stream.

Projection-DeviceTypePartitioner-Events

JavaScript implementation

The projection contains two methods: storeDeviceType, which stores the DeviceType in the state, and linkToDeviceType, which links the subsequent events to the DeviceType-* stream. This module accepts an $eventServices argument in its constructor which is used to pass in a mock object when the module is tested. This mock object is used to verify whether the linkTo method is called with the correct argument. When no $eventServices are passed in, the default Event Store linkTo implementation is used.
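A module of this shape might look roughly like the sketch below. The event shape (event.body.deviceType), the state layout, and the extra init method are assumptions for illustration; only the method names storeDeviceType and linkToDeviceType and the injected services argument come from the description above.

```javascript
function deviceTypePartitioner($eventServices) {
  // Without injected services, defer to the global linkTo that Event Store
  // provides inside a projection. It is only looked up when actually called.
  const services = $eventServices ||
    { linkTo: (streamId, event) => linkTo(streamId, event) };

  function init() {
    return { deviceType: null };
  }

  function storeDeviceType(state, event) {
    state.deviceType = event.body.deviceType;
    return state;
  }

  function linkToDeviceType(state, event) {
    if (state.deviceType === null) {
      return state; // no DeviceConfigured event seen yet, nothing to link
    }
    services.linkTo("DeviceType-" + state.deviceType, event);
    return state;
  }

  return { init, storeDeviceType, linkToDeviceType };
}

// Usage with a hand-written mock that records every link.
const links = [];
const partitioner = deviceTypePartitioner({
  linkTo: (streamId, event) => links.push({ streamId, eventType: event.eventType })
});

let partitionState = partitioner.init();
partitionState = partitioner.linkToDeviceType(partitionState,
  { eventType: "MeasurementRead", body: { reading: 20 } }); // ignored, type unknown
partitionState = partitioner.storeDeviceType(partitionState,
  { eventType: "DeviceConfigured", body: { deviceType: "Thermostat" } });
partitionState = partitioner.linkToDeviceType(partitionState,
  { eventType: "MeasurementRead", body: { reading: 21 } });
```

Injecting the services object keeps the module free of any hard dependency on the projection runtime, which is what makes it testable from a plain JavaScript test runner.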

The partitioner module is instantiated once globally and is referenced from the projection’s definition:

The fromCategory("Device") definition tells the engine that this projection’s events come from all streams in the Device category. Streams are categorized based on the stream name before the final dash by default: for example, Device-0 and Device-1 are both in the Device category. To enable categories, the $stream_by_category and $by_category system projections should be enabled.

Next, we delegate the handling of the DeviceConfigured and MeasurementRead events to the partitioner module:

Testing the projection

The first scenario tests how the DeviceType is stored and verifies that the state is updated correctly. The other scenarios test how the MeasurementRead events are linked, validating the cases where the DeviceType is not set and the resulting state after a call to the linkTo method of the $projectionServices mock object.

Even though this projection requires practically no testing at all, it’s still a good example of how to use the Jasmine JavaScript test framework’s mock objects.

Querying the results with the Client API

Since the partitioned events are all linked to streams, reading them is straightforward. All we have to do is subscribe to the streams’ updates:

In most cases, the output of an indexing projection will be used as the input for another projection. Combining projections in this fashion provides us with a powerful way to process events. Future examples will show you how this works in practice.

Source code

A working project for this example can be found on github: https://github.com/tim-cools/EventStore-Examples

Event Store Projections – Counting events of a specific type

Our first Event Store Projections example simply counts the number of events of a specific type. Pretty mundane, I know, but it’s a good place to start.

Projection-MeasurementRead

In this example, the projection will take all MeasurementRead events from all streams and increase a counter in its global state for each one. The end result is a global state containing the number of MeasurementRead events.

Projection-MeasurementReadCounter-Events

JavaScript implementation

The projection logic is implemented as a revealing module. This pattern allows us to decide which methods of a module should be accessible from outside, much like creating a class with public and private methods. The pattern is probably overkill for this projection, since it only has public methods, but it allows us to create unit tests for its logic:

The init method initializes the state for the first time by setting the count property to 0. The increase method increases the count property by 1 when called.
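A counter module in this style could be sketched as follows. The factory name measurementReadCounter is hypothetical; the init and increase methods and the count property follow the description above.

```javascript
// Revealing module: functions are defined privately, then exposed by name.
function measurementReadCounter() {
  function init() {
    return { count: 0 };
  }

  function increase(state) {
    state.count += 1;
    return state;
  }

  // Only what is listed here is accessible from outside the module.
  return { init: init, increase: increase };
}

const counter = measurementReadCounter();

let counterState = counter.init();
counterState = counter.increase(counterState);
counterState = counter.increase(counterState);
```

Because the module has no dependency on the projection runtime, it can be exercised directly like this in a unit test.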

This module is instantiated once globally and then referenced from the projection’s definition:

The fromAll definition tells the engine that the projection’s events come from all streams. Inside, we delegate $init and MeasurementRead to our counter module implementation. The $init field defines the method that initializes the projection’s state, and the MeasurementRead field defines that all MeasurementRead events are to be handled by the counter.increase method.

producesResults is set to true so that the projection’s state is stored in a stream. This allows us to subscribe to projection state updates without polling and use the output of the projection as input for other projections, as I’ll demonstrate later. In the future, we can replace this with the outputState method.

Testing the projection

I used Jasmine to test this logic. Jasmine is a really nice BDD framework for testing JavaScript code that encourages developers to write readable test scenarios:

The test contains two cases: the first one verifies the initial state of the projection, and the second one verifies that the count property is increased by 1 for each call of the increase method. The first two lines define the referenced JavaScript files, which enables IntelliSense in Visual Studio and allows R# to run the tests from your IDE.

Querying the results with the Client API

To read the value of the projection, we use the getState method of the ProjectionsManager class.

Since we specified that the output of the state should be stored in a stream, we can subscribe to event updates without polling:

Source code

A working project for this example can be found on github: https://github.com/tim-cools/EventStore-Examples

Event Store Client API Basics (C#)

Here I’ll give an overview of how to use the Event Store Client API from C#. I’ll be highlighting the features used to implement the examples described in the following blog posts. If you’re only interested in projections written in JavaScript, you can safely skip this post and go directly to the first example.

Running Event Store

These examples are based on the latest main branch version available at the time of writing (RC3.0). Since you have to run the Client API against the same version of Event Store, I’ve included the binaries in the github repository. This means you can check out the examples and run them without worrying about version compatibility or building Event Store yourself.

To run the examples, run Event Store as an administrator with --run-projections=all. The following command will run Event Store with projections enabled and an in-memory database:

Event Store Client API Connections

Event Store is accessible through two protocols: HTTP and TCP. The default TCP endpoint is port 1113 and the default HTTP endpoint is port 2113, though both of these can be modified from the command line using --tcp-port and --http-port:

Creating a connection

The Event Store Client API connection is accessible through the IEventStoreConnection interface and connects to the TCP endpoint:

Most connection methods accept credentials for connecting to Event Store. These examples always use the default admin account:

Converting events to and from JSON

These examples use statically typed events which are serialized to JSON when they are sent to Event Store. When they are retrieved from the database, they are deserialized from JSON back into statically typed events. Two extension methods take care of this process:

These methods use JSON.NET for JSON serialization.

Appending an event to a stream

Now that we’ve created a connection and know how to serialize and deserialize events, we can append an event to a stream. We don’t have to create the stream first in this case: if we try to append an event to a stream that doesn’t exist, the stream is automatically created for us.

Note that concurrency control in Event Store is handled through versioning. In this example, we skip concurrency control by passing ExpectedVersion.Any when an event is appended to a stream.
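To make the versioning idea concrete, here is a small in-memory model in JavaScript. It is not the Client API: createStore and its append method are hypothetical, and the numeric values given to ExpectedVersion.Any and ExpectedVersion.NoStream are assumptions of this sketch.

```javascript
// Sentinel values for this sketch only.
const ExpectedVersion = { Any: -2, NoStream: -1 };

function createStore() {
  const streams = new Map();
  return {
    append(streamId, expectedVersion, event) {
      const events = streams.get(streamId) || [];
      const currentVersion = events.length - 1; // -1 when the stream does not exist

      // ExpectedVersion.Any skips the check; anything else must match exactly.
      if (expectedVersion !== ExpectedVersion.Any &&
          expectedVersion !== currentVersion) {
        throw new Error("WrongExpectedVersion: expected " + expectedVersion +
          ", current " + currentVersion);
      }

      events.push(event);
      streams.set(streamId, events); // the stream is created on first append
      return events.length - 1;      // number of the event just written
    }
  };
}

const store = createStore();
const v0 = store.append("Device-0", ExpectedVersion.NoStream, { type: "DeviceConfigured" });
const v1 = store.append("Device-0", ExpectedVersion.Any, { type: "MeasurementRead" });

let conflict = null;
try {
  store.append("Device-0", 0, { type: "MeasurementRead" }); // stale version
} catch (error) {
  conflict = error.message;
}
```

A writer that passes the version it last read gets an error when someone else has appended in the meantime, whereas ExpectedVersion.Any always succeeds.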

Getting the last event number from a stream

To read events from a stream in reverse order, we need to know the number of the last event in the stream. To get this number, we ask for the last event of the stream by passing an EventNumber of -1 when calling the ReadEvent method:

Getting events from a stream in reverse order

Now we can read events in reverse order starting from the number of the last event. This example reads events in pages of 10 at a time:
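The paging loop itself does not depend on the client library. As a minimal JavaScript sketch over an in-memory array (readAllBackward is a hypothetical helper, and array indexes stand in for event numbers):

```javascript
// Reads a stream backward in pages, starting from the last event number.
function readAllBackward(events, pageSize) {
  const pages = [];
  let from = events.length - 1; // number of the last event in the stream

  while (from >= 0) {
    const page = [];
    // Collect up to pageSize events, walking toward event number 0.
    for (let i = from; i > from - pageSize && i >= 0; i--) {
      page.push(events[i]);
    }
    pages.push(page);
    from -= pageSize; // the next page starts just below this one
  }
  return pages;
}

const stream = Array.from({ length: 25 }, (_, number) => number);
const pages = readAllBackward(stream, 10);
```

With 25 events and a page size of 10, this yields two full pages followed by a final page of 5 events ending at event number 0.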

Subscribing to all new events

We can use the connection’s SubscribeToAll method to subscribe to all new events. The callback passed to this method is called each time a new event is stored in Event Store:

Subscribing to new events from a specific stream

Similarly to SubscribeToAll, we can also use SubscribeToStream to retrieve events for a specific stream:

Managing projections

We can manage projections using the ProjectionsManager class of the Client API. This class allows us to retrieve information about projections as well as enable, create, and update them.

Creating a ProjectionsManager

This ProjectionsManager will use HTTP (default port 2113) to connect to Event Store:

In the example, we’re using the ConsoleLogger provided by the Client API. You can replace it with your own logger if necessary.

Manipulating projections

The methods of the ProjectionsManager class are fairly straightforward:

Subscribing to a projection’s state change

We can store the state of a projection in a stream. This allows us to subscribe to the stream and retrieve updates when the state changes. If there’s only one state stored for all projected streams, the stream with the state is named “$projections-ProjectionName-result”, where ProjectionName is the name of the projection.

Keep in mind that the projection needs to know that it should store the state in the stream. We can do this by setting the producesResults option to true or by calling the outputState method in the JavaScript projection definition:

Source code

A working project for these examples can be found on github: https://github.com/tim-cools/EventStore-Examples

Event Store Projections By Example

I’ve spent some time recently exploring Event Store Projections written in JavaScript. Since there isn’t a lot of documentation yet, I’ll be publishing my findings in a series of blog posts.

Event Store Projections

Event Store is an open-source functional database that stores data as a series of immutable events. The concept isn’t new: logs have been used to store data in this exact fashion for decades, but it’s still garnered much attention from the DDD community in the past few years, especially from people implementing CQRS+ES. Event Store Projections allow us to process events, filter events, and even produce new events, features we can leverage for event processing in systems based on Event-Driven Architecture.

Why JavaScript?

Although it’s possible to write projections in any language, Event Store supports JavaScript natively. This means we don’t have to rely on external programs to process our projections if we write them in JavaScript, and we don’t have to manage their progress and state manually.

I used to hate having to deal with JavaScript; in most projects where I’ve seen it used, it’s an absolute mess. But the language has matured in the past few years: now we have test frameworks, design patterns, and guidelines to work with. Thanks to these tools, I’ve actually learned to like programming in JavaScript.

Examples

In the following posts, I provide examples of solutions to real-world problems I’ve faced using JavaScript and Event Store Projections. These include developing maintainable and testable code, filtering data, and calculating averages. Here are the topics I intend to write about in the future:

  1. EventStore Client API Basics (C#)
  2. Counting events of a specific type
  3. Partition events based on data found in previous events
  4. Calculating an average per day
  5. The irresponsible gambler
  6. Distribute events to other streams
  7. Temporal Projection to generate alarms
  8. Projection in C#

Source code

The source for these examples can be found on github:

https://github.com/tim-cools/EventStore-Examples

Be sure to read the README file first.

More Event Store Projections documentation

  1. Official Wiki
  2. Greg Young’s Blog
  3. Rob Ashton’s Blog