EventStore Projections – Calculating daily averages
This projection will calculate the daily average consumption per device. We take our Device streams as input and store an intermediate calculation in the state for each one. This calculation contains the total sum of measurements for the current day. The projection’s result is a new stream for each device which contains a new calculated average for each day.
The $init method initializes the state. Each subsequent MeasurementRead event invokes the update method, which updates the state with the number of measurements, the sum of the measurements, and the timestamp of the last measurement. When the timestamp of an incoming event shows that a new day has started, we emit an event which contains the date, the sum, the count, and the calculated average, obtained by dividing the sum of the measurements by the number of measurements.
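To make the flow concrete before looking at the projection itself, here is a minimal standalone sketch of the same accumulate-then-emit idea, independent of Event Store. The function name `dailyAverages` and the `{ day, reading }` input shape are assumptions made purely for this illustration.

```javascript
// Illustrative only: accumulate readings per day and produce one average
// each time the day changes. `dailyAverages` is a hypothetical helper.
function dailyAverages(readings) {
    var emitted = [];
    var state = { total: 0, count: 0, day: null };

    readings.forEach(function (r) {
        if (state.day !== null && r.day !== state.day) {
            // The day changed: emit the average of the day that just ended.
            emitted.push({
                day: state.day,
                total: state.total,
                count: state.count,
                average: state.total / state.count
            });
            state = { total: 0, count: 0, day: r.day };
        }
        // Add the current reading to the running totals for its day.
        state = {
            total: state.total + r.reading,
            count: state.count + 1,
            day: r.day
        };
    });

    return { emitted: emitted, state: state };
}

var result = dailyAverages([
    { day: '2030/12/18', reading: 1 },
    { day: '2030/12/18', reading: 3 },
    { day: '2030/12/19', reading: 5 }
]);
console.log(result.emitted);
```

Note that nothing is emitted for the last day until a reading for a later day arrives; the remaining total simply stays in the state.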
JavaScript implementation
First, we implement a daily timestamp formatter which will handle formatting the date part of a JavaScript Date object. Normally, we’d use an existing library such as momentjs to format the date part, but Event Store doesn’t currently let us include external libraries. One workaround for this would be to inject the library in the projection source when the projection is created. I might cover this option in a future blog post, but for now, we’ll have to roll our own function.
```javascript
var dailyTimestampFormatter = function dailyTimestampFormatterConstructor() {

    // Left-pads a number with zeros up to the given number of places.
    var zeroPad = function (num, places) {
        var zero = places - num.toString().length + 1;
        return Array(+(zero > 0 && zero)).join("0") + num;
    };

    // Formats the date part of a timestamp as yyyy/MM/dd.
    var format = function (timestamp) {
        var date = new Date(timestamp);
        return date.getFullYear().toString() + '/' +
            zeroPad(date.getMonth() + 1, 2) + '/' +
            zeroPad(date.getDate(), 2);
    };

    return {
        format: format
    };
}();
```
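One thing to keep in mind is that `getFullYear`, `getMonth`, and `getDate` read the date in the local time zone of the machine running the code, so a reading taken close to midnight may land on a different day depending on where the projection runs. If you would rather fix the day boundaries to UTC, a variant along these lines could be used. The name `utcDailyTimestampFormatter` is hypothetical and not part of the example project.

```javascript
// Sketch of a UTC-based variant; same output format, but day boundaries
// no longer depend on the local time zone.
var utcDailyTimestampFormatter = (function () {

    var zeroPad = function (num, places) {
        var text = num.toString();
        while (text.length < places) {
            text = '0' + text;
        }
        return text;
    };

    var format = function (timestamp) {
        var date = new Date(timestamp);
        return date.getUTCFullYear().toString() + '/' +
            zeroPad(date.getUTCMonth() + 1, 2) + '/' +
            zeroPad(date.getUTCDate(), 2);
    };

    return {
        format: format
    };
}());

console.log(utcDailyTimestampFormatter.format('2030-12-18T23:59:59.000Z'));
```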
Next, we implement the projection module with the init and update methods:
```javascript
var measurementReadAveragPerDayCalculator = function measurementReadAveragPerDayCalculatorConstuctor($eventServices) {

    // Fall back to the global emit function when no services are injected.
    var eventServices = !$eventServices ? { emit: emit } : $eventServices;

    var isFirstTime = function(previousState) {
        return !previousState.lastTimestamp;
    };

    var isSameTimeSlot = function (currentTimestamp, lastTimestamp) {
        return dailyTimestampFormatter.format(currentTimestamp)
            == dailyTimestampFormatter.format(lastTimestamp);
    };

    var createEvent = function (timeStamp, reading, count) {
        return {
            Timeslot: dailyTimestampFormatter.format(timeStamp),
            Total: reading,
            Count: count,
            Average: reading / count
        };
    };

    var createState = function (reading, count, timestamp) {
        return {
            total: reading,
            count: count,
            average: reading / count,
            lastTimestamp: timestamp
        };
    };

    var emitAverageEvent = function (measurementEvent, previousState) {
        var streamName = 'MeasurementAverageDay-' + measurementEvent.streamId;
        var event = createEvent(previousState.lastTimestamp, previousState.total, previousState.count);

        eventServices.emit(streamName, "MeasurementAverageDay", event);
    };

    var init = function () {
        return createState(0, 0, null);
    };

    var update = function (previousState, measurementEvent) {
        var timestamp = measurementEvent.body.Timestamp;
        var reading = measurementEvent.body.Reading;

        if (isFirstTime(previousState)
         || isSameTimeSlot(timestamp, previousState.lastTimestamp)) {
            // Same day (or very first event): keep accumulating.
            return createState(previousState.total + reading, previousState.count + 1, timestamp);
        } else {
            // A new day started: emit yesterday's average and start over.
            emitAverageEvent(measurementEvent, previousState);
            return createState(reading, 1, timestamp);
        }
    };

    return {
        init: init,
        update: update
    };
};
```
Just like previous projections, we instantiate the calculator module once globally and then reference it in the projection’s definition:
```javascript
var calculator = measurementReadAveragPerDayCalculator();

fromCategory('Device')
    .foreachStream()
    .when({
        $init: calculator.init,
        MeasurementRead: calculator.update
    });
```
Again, we use fromCategory('Device') to tell the engine that this projection's events come from all streams in the Device category. By default, streams are categorized based on the part of the stream name before the final dash: for example, Device-0 and Device-1 are both in the Device category. To enable categories, the $stream_by_category and $by_category system projections should be enabled. We delegate $init and MeasurementRead event handling to the calculator module, and we're done.
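The naming rule described above can be illustrated with a small helper. This is only a sketch of the convention as stated in this post; `categoryOf` is a hypothetical function and says nothing about how Event Store implements categories internally.

```javascript
// Illustrative only: derive the category from a stream name by taking
// everything before the final dash, as described in the text.
function categoryOf(streamName) {
    var index = streamName.lastIndexOf('-');
    // In this sketch, streams without a dash have no category.
    return index < 0 ? null : streamName.substring(0, index);
}

console.log(categoryOf('Device-0'));
console.log(categoryOf('Device-1'));
```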
Testing the projection
First, we test the formatting of the date part of the timestamp:
```javascript
describe("when formatting the timeframe", function () {

    var formattedTimeSlot = function(value) {
        return dailyTimestampFormatter.format(value);
    };

    it("should contain year month and date", function () {
        expect(formattedTimeSlot("2000-01-01T08:02:39.687Z")).toMatch("2000/01/01");
        expect(formattedTimeSlot("2005-06-24T08:02:39.687Z")).toMatch("2005/06/24");
        expect(formattedTimeSlot("2012-10-10T08:02:39.687Z")).toMatch("2012/10/10");
        expect(formattedTimeSlot("2015-11-12T08:02:39.687Z")).toMatch("2015/11/12");
        expect(formattedTimeSlot("2030-12-18T08:02:39.687Z")).toMatch("2030/12/18");
    });
});
```
Next, we define the scenarios for testing the projection module:
```javascript
describe("when projecting measurement reads average per day", function () {

    var handler;
    var defaultEvent;
    var projections;

    var eventEnvelope = function(timestamp, reading) {
        return {
            streamId: "device1",
            body: {
                Timestamp: timestamp,
                Reading: reading
            }
        };
    };

    beforeEach(function () {
        defaultEvent = { body: {} };
        // Mock the emit function so emitted events can be verified.
        projections = jasmine.createSpyObj('projections', ['emit']);
        handler = measurementReadAveragPerDayCalculator(projections);
    });

    describe("given state is empty", function () {
        it("should return initial state for first event", function() {
            var event = { body: { Timestamp: "2030-12-18T08:02:39.687Z", Reading: 1.23 } };
            var state = handler.init();
            var actual = handler.update(state, event);

            expect(actual).toEqual({
                total: 1.23,
                count: 1,
                average: 1.23,
                lastTimestamp: "2030-12-18T08:02:39.687Z"
            });
        });
    });

    describe("given events are in the same timeslot", function () {
        it("should return calculated average for all events", function() {
            var state = handler.init();
            state = handler.update(state, eventEnvelope("2030-12-18T08:00:39.687Z", 1.23));
            state = handler.update(state, eventEnvelope("2030-12-18T09:01:39.687Z", 2.34));
            state = handler.update(state, eventEnvelope("2030-12-18T10:02:39.687Z", 3.45));

            expect(state).toEqual({
                total: 7.02,
                count: 3,
                average: 2.34,
                lastTimestamp: "2030-12-18T10:02:39.687Z"
            });
        });
    });

    describe("given events are in the different timeslot", function () {
        var state;

        beforeEach(function () {
            state = handler.init();
            state = handler.update(state, eventEnvelope("2030-12-18T08:00:39.687Z", 1.23));
            state = handler.update(state, eventEnvelope("2030-12-18T09:01:39.687Z", 2.34));
            state = handler.update(state, eventEnvelope("2030-12-19T10:02:39.687Z", 3.44));
            state = handler.update(state, eventEnvelope("2030-12-19T11:02:39.687Z", 4.56));
        });

        it("an event with calculated average of first timeframe should be emitted", function () {
            expect(projections.emit).toHaveBeenCalledWith(
                'MeasurementAverageDay-device1',
                'MeasurementAverageDay', {
                    Timeslot: '2030/12/18',
                    Total: 3.57,
                    Count: 2,
                    Average: 1.785
                }
            );
        });

        it("should return calculated average for events of new timeframe", function () {
            expect(state).toEqual({
                total: 8,
                count: 2,
                average: 4,
                lastTimestamp: "2030-12-19T11:02:39.687Z"
            });
        });
    });
});
```
The first two examples are almost too simple to be worth writing tests for at all, but this example demonstrates how easy it is to test projections with JavaScript and Jasmine. All you have to do is call the projection module with the event stream as its input and then check whether the resulting state matches the expected state. We can also verify the emitted events by testing whether the mocked emit methods are called with the correct events.
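The same pattern can also be written without a test framework. The sketch below assumes a handler that exposes `init` and `update` methods, like the calculator above; `createRecordingServices`, `runProjection`, and `countingHandler` are hypothetical names used only for this illustration.

```javascript
// Collects emitted events in memory instead of writing them to a stream.
function createRecordingServices() {
    var emitted = [];
    return {
        emitted: emitted,
        emit: function (streamName, eventType, event) {
            emitted.push({ streamName: streamName, eventType: eventType, event: event });
        }
    };
}

// Feeds the events through the handler, starting from its initial state,
// and returns the final state.
function runProjection(handler, events) {
    return events.reduce(function (state, event) {
        return handler.update(state, event);
    }, handler.init());
}

// A stand-in handler: counts events and emits after every second event.
var services = createRecordingServices();
var countingHandler = {
    init: function () {
        return { count: 0 };
    },
    update: function (state, event) {
        var next = { count: state.count + 1 };
        if (next.count % 2 === 0) {
            services.emit('Counts-' + event.streamId, 'CountReached', next);
        }
        return next;
    }
};

var finalState = runProjection(countingHandler, [
    { streamId: 'device1', body: {} },
    { streamId: 'device1', body: {} },
    { streamId: 'device1', body: {} }
]);
console.log(finalState.count, services.emitted.length);
```

Because the handler receives its emit function through an injected object, the same `runProjection` helper could be pointed at the real calculator by passing the recording services into its constructor.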
Querying the results with the Client API
All new events are emitted to a stream, so reading them is straightforward:
```csharp
var projectionResultStream = "MeasurementAverageDay-" + deviceStreamName;

var averages = _connection
    .ReadStreamEventsBackward(projectionResultStream);
```
The implementation of the ReadStreamEventsBackward method is explained in the Client API post.
Conclusion
This is a fairly naive implementation. First of all, the calculation doesn't handle processing the same event twice properly: it will just add the value to the total twice, throwing off the average. It also expects events in some semblance of chronological order: if it receives an event whose timestamp indicates that a day has passed before it has actually processed all the events for that day, then the emitted average event will be erroneous. Finally, it only emits an average when an event for a later day arrives, so if a device stops sending messages during the day, the average for that day is never calculated. To fix this last problem, we'll need to implement some kind of timeout mechanism. I'll cover how to fix these issues in a future post.
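As a rough idea of what a guard against the first issue might look like, one option is to remember the position of the last processed event in the state and ignore anything at or before it. The sketch below assumes each event envelope carries a `sequenceNumber` that increases within its stream; that field and the `withDuplicateGuard` wrapper are assumptions for illustration, not necessarily the approach a later post would take.

```javascript
// Wraps a handler so that events already seen (by sequence number) are skipped.
function withDuplicateGuard(handler) {
    return {
        init: function () {
            return { lastSequenceNumber: -1, inner: handler.init() };
        },
        update: function (state, event) {
            if (event.sequenceNumber <= state.lastSequenceNumber) {
                // Already processed: leave the wrapped state untouched.
                return state;
            }
            return {
                lastSequenceNumber: event.sequenceNumber,
                inner: handler.update(state.inner, event)
            };
        }
    };
}

// Stand-in handler that just sums readings.
var summingHandler = {
    init: function () {
        return { total: 0 };
    },
    update: function (state, event) {
        return { total: state.total + event.body.Reading };
    }
};

var guarded = withDuplicateGuard(summingHandler);
var guardedState = guarded.init();
guardedState = guarded.update(guardedState, { sequenceNumber: 0, body: { Reading: 2 } });
guardedState = guarded.update(guardedState, { sequenceNumber: 0, body: { Reading: 2 } }); // duplicate
guardedState = guarded.update(guardedState, { sequenceNumber: 1, body: { Reading: 3 } });
console.log(guardedState.inner.total);
```

This only addresses repeated delivery of the same event; out-of-order timestamps and silent devices would still need separate handling.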
Source code
A working project for this example can be found on GitHub: https://github.com/tim-cools/EventStore-Examples
This post is part of the series Event Store Projections by Example.