EventStore Projections – Calculating daily averages

This projection will calculate the daily average consumption per device. We take our Device streams as input and store an intermediate calculation in the state for each one. This calculation contains the total sum of measurements for the current day. The projection’s result is a new stream for each device which contains a new calculated average for each day.

Projection-MeasurementReadAveragePerDayCalculator

The $init method initializes the state. Each subsequent MeasurementRead event will invoke the calculate method, which will update the state with the total number of measurements, the sum of the measurements, and the timestamp of the last measurement. When we determine from the timestamp that a full day has passed, we emit an event which contains the date, the sum, and the calculated average, obtained by dividing the sum of the measurements by the number of measurements.

Projection-MeasurementReadAveragePerDayCalculator-Events

JavaScript implementation

First, we implement a daily timestamp formatter which will handle formatting the date part of a JavaScript Date object. Normally, we’d use an existing library such as momentjs to format the date part, but Event Store doesn’t currently let us include external libraries. One workaround for this would be to inject the library in the projection source when the projection is created. I might cover this option in a future blog post, but for now, we’ll have to roll our own function.

Next, we implement the projection module with the init and the cacluate methods:

Just like previous projections, we instantiate the calculator module once globally and then reference it in the projection’s definition:

Again we use the fromCategory(“Device”) definition to tell the engine that this projection’s events come from all streams in the Device category. Streams are categorized based on the stream name before the final dash by default: for example, Device-0 and Device-1 are both in the Device category. To enable categories, the $stream_by_category and $by_category system projections should be enabled. We delegate $init and MeasurementRead event handling to the calculator module, and we’re done.

Testing the projection

First, we test the formatting of the date part of the timestamp:

Next, we define the scenarios for testing the projection module:

The first two examples are almost too simple to be worth writing tests for at all, but this example demonstrates how easy it is to test projections with JavaScript and Jasmine. All you have to do is call the projection module with the event stream as its input and then check whether the resulting state matches the expected state. We can also verify the emitted events by testing whether the mocked emit methods are called with the correct events.

Querying the results with the Client API

All new events are emitted to a stream, so reading them is straightforward:

The implementation of the ReadStreamEventsBackward method is explained in the Client API post.

Conclusion

This a fairly naive implementation. First of all, the calculation doesn’t handle processing the same event twice properly: it will just add the value to the total twice, throwing off the average. It also expects events in some semblance of chronological order: if it receives an event whose timestamp indicates that a day has passed before it’s actually processed all the events for the day, then the emitted average event will be erroneous. Finally, it will fail to calculate the average entirely if a device stops sending messages during the day. To fix this, we’ll need to implement some kind of timeout mechanism. I’ll cover how to fix these issues in a future post.

Source code

A working project for this example can be found on github: https://github.com/tim-cools/EventStore-Examples

Event Store Projections by Example

This post is part of a series:

  1. EventStore Client API Basics (C#)
  2. Counting events of a specific type
  3. Partition events based on data found in previous events
  4. Calculating an average per day
  5. The irresponsible gambler
  6. Distribute events to other streams
  7. Temporal Projection to generate alarms
  8. Projection in C#

Event Store Projections – Partitioning events based on data found in previous events

This example is a variation on the default indexing projection described on the Event Store blog. Another example of partition projections can be found on Rob Aston’s blog.

Projection-DeviceTypePartitioner

An indexing projection partitions events from multiple streams by linking said events to new streams to based on common properties. These new streams can be used as input for another projection or to retrieve the events from the store in a single read operation. In this example, all MeasurementRead events from all streams in the Device category are partitioned based on their DeviceType. The difference between this example and a basic indexing projection is that the DeviceType property used for the partitioning is not available in every event: it is only available in the first DeviceConfigured event that happens before the MeasurementRead events. So in order to create the index, we store the DeviceType in the projection’s state (per aggregate stream) when the projection handles the DeviceConfigured event. This allows the projection to link each subsequent MeasurementRead event to the corresponding DeviceType-* stream.
Projection-DeviceTypePartitioner-Events

JavaScript implementation

The projection contains two methods: storeDeviceType, which stores the DeviceType in the state, and linkToDeviceType, which links the subsequent events to the DeviceType-* stream. This module accepts an $eventServices argument in its constructor which is used to pass in a mock object when the module is tested. This mock object is used to verify whether the linkTo method is called with the correct argument. When no $eventServices are passed in, the default Event Store linkTo implementation is used.

The partitioner module is instantiated once globally and is referenced from the projection’s definition:

The fromCategory(“Device”) definition tells the engine that this projection’s events come from all streams in the Device category. Streams are categorized based on the stream name before the final dash by default: for example, Device-0 and Device-1 are both in the Device category. To enable categories, the $stream_by_category and $by_category system projections should be enabled.

Next, we delegate the handling of the DeviceConfigured and MeasurementRead events to the partitioner module:

Testing the projection

The first scenario tests how the DeviceType is stored and verifies that the state is updated correctly. The other scenarios test how the MeasurementRead events are linked, validating the cases where the DeviceType is not set and the resulting state after a call to the linkTo method of the $projectionServices mock object.

Even though this project requires practically no testing at all, it’s still a good example for how to use the Jasmine JavaScript test framework’s mock objects.

Querying the results with the Client API

Since the partitioned events are all linked to streams, reading them is straightforward. All we have to do is subscribe to the streams’ updates:

In most cases, the output of an indexing projection will be used as the input for another projection. Combining projections in this fashion provides us with a powerful way to process events. Future examples will show you how this works in practice.

Source code

A working project for this example can be found on github: https://github.com/tim-cools/EventStore-Examples

EventStore Projections by Example

This post is part of a series:

  1. EventStore Client API Basics (C#)
  2. Counting events of a specific type
  3. Partition events based on data found in previous events
  4. Calculating an average per day
  5. The irresponsible gambler
  6. Distribute events to other streams
  7. Temporal Projection to generate alarms
  8. Projection in C#