Event Store Projections – Partitioning events based on data found in previous events
This example is a variation on the default indexing projection described on the Event Store blog. Another example of partition projections can be found on Rob Aston’s blog.
An indexing projection partitions events from multiple streams by linking said events to new streams to based on common properties. These new streams can be used as input for another projection or to retrieve the events from the store in a single read operation. In this example, all MeasurementRead events from all streams in the Device category are partitioned based on their DeviceType. The difference between this example and a basic indexing projection is that the DeviceType property used for the partitioning is not available in every event: it is only available in the first DeviceConfigured event that happens before the MeasurementRead events. So in order to create the index, we store the DeviceType in the projection’s state (per aggregate stream) when the projection handles the DeviceConfigured event. This allows the projection to link each subsequent MeasurementRead event to the corresponding DeviceType-* stream.
JavaScript implementation
The projection contains two methods: storeDeviceType, which stores the DeviceType in the state, and linkToDeviceType, which links the subsequent events to the DeviceType-* stream. This module accepts an $eventServices argument in its constructor which is used to pass in a mock object when the module is tested. This mock object is used to verify whether the linkTo method is called with the correct argument. When no $eventServices are passed in, the default Event Store linkTo implementation is used.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | var deviceTypePartitioner = function ($eventServices) { var eventServices = !$eventServices ? { linkTo: linkTo } : $eventServices; var storeDeviceType = function (state, eventEnvelope) { state.deviceType = eventEnvelope.body.DeviceType; return state; }; var linkToDeviceType = function(s, e) { if (!s.deviceType) return s; eventServices.linkTo("DeviceType-" + s.deviceType, e); return s; }; return { storeDeviceType: storeDeviceType, linkToDeviceType: linkToDeviceType }; }; |
The partitioner module is instantiated once globally and is referenced from the projection’s definition:
1 2 3 4 5 6 7 8 9 | var partitioner = deviceTypePartitioner(); fromCategory("Device") .foreachStream() .when({ DeviceConfigured: partitioner.storeDeviceType, MeasurementRead: partitioner.linkToDeviceType }); |
The fromCategory(“Device”) definition tells the engine that this projection’s events come from all streams in the Device category. Streams are categorized based on the stream name before the final dash by default: for example, Device-0 and Device-1 are both in the Device category. To enable categories, the $stream_by_category and $by_category system projections should be enabled.
Next, we delegate the handling of the DeviceConfigured and MeasurementRead events to the partitioner module:
Testing the projection
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 | /// /// describe("when partioning Device MeasurementRead events by type", function () { var partitioner; var projections; beforeEach(function () { projections = jasmine.createSpyObj('projections', ['linkTo']); partitioner = deviceTypePartitioner(projections); }); describe("when storing devicetype", function () { it("should store the deviceType in the state", function () { var state = partitioner.storeDeviceType({}, { body: { DeviceType: 'someDeviceType' } }); expect(state.deviceType).toEqual('someDeviceType'); }); }); describe("when linking devicetype", function () { describe("given devicetype is not", function () { var state; beforeEach(function () { state = partitioner.linkToDeviceType({ }, { body: { } }); }); it("should return the previous state", function () { expect(state).toEqual({ }); }); it("should not link the event", function () { expect(projections.linkTo).wasNotCalled(); }); }); describe("given devicetype is set", function () { var state; beforeEach(function () { state = partitioner.linkToDeviceType( { deviceType: 'someDeviceType' }, { body: { someEvent: 'someDate' } }); }); it("should return the previous state", function () { expect(state).toEqual({ deviceType: 'someDeviceType' }); }); it("should link the event", function () { expect(projections.linkTo) .wasCalledWith("DeviceType-someDeviceType", { body: { someEvent: 'someDate' } }); }); }); }); }); |
The first scenario tests how the DeviceType is stored and verifies that the state is updated correctly. The other scenarios test how the MeasurementRead events are linked, validating the cases where the DeviceType is not set and the resulting state after a call to the linkTo method of the $projectionServices mock object.
Even though this project requires practically no testing at all, it’s still a good example for how to use the Jasmine JavaScript test framework’s mock objects.
Querying the results with the Client API
Since the partitioned events are all linked to streams, reading them is straightforward. All we have to do is subscribe to the streams’ updates:
1 2 3 4 5 6 | _connection.SubscribeToStream( "DeviceType-TV", true, (subscription, resolvedEvent) => // do something with the lineked events, (subscription, subscriptionDropReason, exception) => // do something when subscription is dropped, EventStoreCredentials.Default); |
In most cases, the output of an indexing projection will be used as the input for another projection. Combining projections in this fashion provides us with a powerful way to process events. Future examples will show you how this works in practice.
Source code
A working project for this example can be found on github: https://github.com/tim-cools/EventStore-Examples
EventStore Projections by Example
This post is part of a series:
Leave a Reply
Want to join the discussion?Feel free to contribute!