Event Store Projections – Temporal projection for generating alarms
To detect whether a player is gambling irresponsibly, we total the amounts won and lost over all games played in the last 24 hours. If the net loss exceeds 500€, we generate an alarm. Since we don’t want to spam our players, we only do so if the previous alarm was raised more than 24 hours ago.
The projection takes all GameWon and GameLost events from all Player streams as input and keeps each player’s games from the last 24 hours in its state. When a player’s net loss exceeds 500€ within a 24-hour span, it generates an alarm by emitting an IrresponsibleGamblerDetected event to the IrresponsibleGamblingAlarms stream.
JavaScript Implementation
The irresponsibleGamblingDetector module implements the core projection logic:
```javascript
/// <reference path="References\1Prelude.js" />
/// <reference path="References\Projections.js" />

var irresponsibleGamblingDetector = function irresponsibleGamblingDetectorConstructor($eventServices) {

    // Use the injected services when given (the tests pass a spy);
    // otherwise fall back to the projection runtime's global emit.
    var eventServices = !$eventServices ? { emit: emit } : $eventServices;

    var millisecondsPerDay = 24 * 60 * 60 * 1000;
    var amountLostThreshold = 500;

    var emitAlarm = function (state, playerId, total, timestamp) {
        state.LastAlarm = timestamp;
        eventServices.emit('IrresponsibleGamblingAlarms', 'IrresponsibleGamblerDetected', {
            PlayerId: playerId,
            AmountSpentLast24Hours: total,
            Timestamp: timestamp
        });
    };

    var init = function () {
        return {
            LastAlarm: null,
            GamesLast24Hour: []
        };
    };

    // True when timestamp is strictly more than 24 hours after reference,
    // or when there is no reference at all.
    var isMoreAs24HoursDifference = function (timestamp, reference) {
        if (reference === null) return true;

        var millisecondsDifference = new Date(timestamp).getTime() - new Date(reference).getTime();
        return millisecondsDifference > millisecondsPerDay;
    };

    var removeGamesOlderThan24HoursFromCache = function (gamesResults, timestamp) {
        // Games are appended in order, so the oldest one sits at index 0.
        // shift() removes that oldest entry; pop() would drop the newest one instead.
        while (gamesResults.length > 0
            && isMoreAs24HoursDifference(timestamp, gamesResults[0].Timestamp)) {
            gamesResults.shift();
        }
    };

    var addNewGameToCache = function (gamesResults, timestamp, amount, gameId) {
        gamesResults.push({
            Timestamp: timestamp,
            Amount: amount,
            GameId: gameId
        });
    };

    var updateCachedGamesLast24Hour = function (gamesResults, timestamp, amount, gameId) {
        removeGamesOlderThan24HoursFromCache(gamesResults, timestamp);
        addNewGameToCache(gamesResults, timestamp, amount, gameId);
    };

    // Wins are positive amounts and losses negative, so this is the net result.
    var calculateTotalAmount = function (gamesResults) {
        var total = 0;
        for (var resultIndex = 0; resultIndex < gamesResults.length; resultIndex++) {
            var result = gamesResults[resultIndex];
            total += result.Amount;
        }
        return total;
    };

    // Idempotency check: has this game already been recorded in the state?
    var duplicated = function (gamesResults, gameId) {
        for (var resultIndex = 0; resultIndex < gamesResults.length; resultIndex++) {
            var result = gamesResults[resultIndex];
            if (result.GameId == gameId) {
                return true;
            }
        }
        return false;
    };

    var process = function (state, playerId, timestamp, amount, gameId) {
        var gamesResults = state.GamesLast24Hour;

        if (duplicated(gamesResults, gameId)) {
            return state;
        }

        updateCachedGamesLast24Hour(gamesResults, timestamp, amount, gameId);

        // Stay silent while the previous alarm is 24 hours old or less.
        if (!isMoreAs24HoursDifference(timestamp, state.LastAlarm)) {
            return state;
        }

        var total = calculateTotalAmount(gamesResults);
        if (total < -amountLostThreshold) {
            emitAlarm(state, playerId, total, timestamp);
        }
        return state;
    };

    var processEvent = function (state, event) {
        var playerId = event.body.PlayerId;
        var timestamp = event.body.Timestamp;
        var amount = event.body.Amount;
        var gameId = event.body.GameId;

        return process(state, playerId, timestamp, amount, gameId);
    };

    return {
        init: init,
        processGameLost: processEvent,
        processGameWon: processEvent
    };
};
```
The init method generates the initial state, which contains the timestamp of the last generated alarm (initially null) and an empty list of the games that player has played in the last 24 hours.
The processGameLost and processGameWon methods process the corresponding events. First, they update the list of games played in the last 24 hours, removing games that were played more than 24 hours ago and adding newly received games. Next, they check whether an alarm was generated in the last 24 hours: if this is the case, processing halts immediately. Finally, they calculate the total amount won or lost in all games over the last 24 hours. Losses are recorded as negative amounts, so if the total drops below -500, an IrresponsibleGamblerDetected event is emitted to the IrresponsibleGamblingAlarms stream.
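The three steps above can be traced in isolation. The following is a condensed sketch, not the module itself: the names step, msBetween, MS_PER_DAY and LOSS_THRESHOLD are made up for illustration, and the alarm is returned as a value instead of being emitted to a stream. It shows the one case the scenario list does not spell out, namely that a player who keeps losing gets only one alarm per 24 hours.

```javascript
// Condensed, hypothetical restatement of the rules; not the author's module.
var MS_PER_DAY = 24 * 60 * 60 * 1000;
var LOSS_THRESHOLD = 500;

function msBetween(later, earlier) {
    return new Date(later).getTime() - new Date(earlier).getTime();
}

// Returns the alarm payload that would be emitted, or null when no alarm is due.
function step(state, game) {
    // 1. Slide the window: drop games more than 24 hours older than the new one.
    state.games = state.games.filter(function (g) {
        return msBetween(game.Timestamp, g.Timestamp) <= MS_PER_DAY;
    });
    state.games.push(game);

    // 2. Cool-down: stay silent while the previous alarm is 24 hours old or less.
    if (state.lastAlarm !== null &&
        msBetween(game.Timestamp, state.lastAlarm) <= MS_PER_DAY) {
        return null;
    }

    // 3. Net result over the window; losses are negative amounts.
    var total = state.games.reduce(function (sum, g) { return sum + g.Amount; }, 0);
    if (total < -LOSS_THRESHOLD) {
        state.lastAlarm = game.Timestamp;
        return { Total: total, Timestamp: game.Timestamp };
    }
    return null;
}

var state = { lastAlarm: null, games: [] };
var alarms = [
    { Amount: -300, Timestamp: '2013-12-18T08:00:00.000Z' }, // net -300: below threshold
    { Amount: -300, Timestamp: '2013-12-18T12:00:00.000Z' }, // net -600: alarm
    { Amount: -300, Timestamp: '2013-12-18T20:00:00.000Z' }, // net -900, but inside the cool-down
    { Amount: -300, Timestamp: '2013-12-19T13:00:00.000Z' }  // cool-down over, window holds -600: alarm
].map(function (game) { return step(state, game); });
```

After the run, alarms holds a payload at positions 1 and 3 and null elsewhere. By the last event the two oldest games have left the window, so the second alarm reports the two remaining losses only.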
Since we can only guarantee that events are processed at least once, we also have to ensure that our projection is idempotent. We need to check whether an incoming event has already been processed, or else we’ll get strange results if we process the results of the same game twice (or more). By storing the GameIds of each game we process in the state, we can determine whether we’ve already seen a specific game or not before we process it.
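As a minimal sketch of that check, assuming hypothetical helper names (alreadySeen and record are not part of the module), delivering the same game twice leaves both the list and the total untouched:

```javascript
// Hypothetical helpers; the comparison mirrors the GameId check described above.
function alreadySeen(games, gameId) {
    return games.some(function (g) { return g.GameId === gameId; });
}

function record(games, game) {
    if (alreadySeen(games, game.GameId)) {
        return games; // duplicate delivery: leave the state untouched
    }
    games.push(game);
    return games;
}

var delivered = { GameId: 'Game-1', Amount: -100 };
var games = [];
record(games, delivered);
record(games, delivered); // same event delivered a second time

var totalLost = games.reduce(function (sum, g) { return sum + g.Amount; }, 0);
```

One consequence of keeping the GameIds in the 24-hour list is that the check only covers games still inside the window. Going by the code shown here, a duplicate of a game that has already been evicted would not be recognized.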
This module is instantiated once globally and then referenced from the projection’s definition:
```javascript
var detector = irresponsibleGamblingDetector();

fromCategory('Player')
    .foreachStream()
    .when({
        $init: detector.init,
        GameLost: detector.processGameLost,
        GameWon: detector.processGameWon
    });
```
This definition indicates that the projection is only interested in GameLost and GameWon events from Player streams.
Testing the projection
To test the projection, we’ll define four scenarios. The first tests a lost game that stays below the threshold, so no alarm is generated. The second asserts that an alarm is generated when the threshold is exceeded within 24 hours. In the third scenario the losses also add up to more than the threshold, but they are spread over more than 24 hours, so no alarm is generated. The last scenario checks that a duplicated event is only counted once.
```javascript
/// <reference path="../References/jasmine/jasmine.js" />
/// <reference path="../IrresponsibleGamblingDetector.js" />

describe("when detecting irresponsible gamblers", function () {

    var detector;
    var projections;

    beforeEach(function () {
        // Replace the projection runtime's emit with a spy so emitted events can be asserted.
        projections = jasmine.createSpyObj('projections', ['emit']);
        detector = irresponsibleGamblingDetector(projections);
    });

    describe("when processing GameLost event", function () {

        describe("given amount lost is less than 500", function () {

            var state;

            beforeEach(function () {
                state = detector.init();
                state = detector.processGameLost(state, {
                    sequenceNumber: 10,
                    body: {
                        PlayerId: 'Player-1',
                        GameId: 'Game-1',
                        Amount: -100,
                        Timestamp: '2013-12-18T08:02:39.687Z'
                    }
                });
            });

            it("state should contain the single amount", function () {
                expect(state).toEqual({
                    LastAlarm: null,
                    GamesLast24Hour: [
                        { Timestamp: '2013-12-18T08:02:39.687Z', Amount: -100, GameId: 'Game-1' }
                    ]
                });
            });

            it("no event should be emitted", function () {
                expect(projections.emit).not.toHaveBeenCalled();
            });
        });

        describe("given amount lost is greater than 500 in last 24 hours", function () {

            var state;

            beforeEach(function () {
                state = detector.init();
                state = detector.processGameLost(state, {
                    sequenceNumber: 1,
                    body: {
                        PlayerId: 'Player-1',
                        GameId: 'Game-1',
                        Amount: -100,
                        Timestamp: '2013-12-18T08:02:39.687Z'
                    }
                });
                // Exactly 24 hours later: not "more than" 24 hours, so Game-1 stays in the window.
                state = detector.processGameLost(state, {
                    sequenceNumber: 2,
                    body: {
                        PlayerId: 'Player-1',
                        GameId: 'Game-2',
                        Amount: -401,
                        Timestamp: '2013-12-19T08:02:39.687Z'
                    }
                });
            });

            it("state should contain both amounts", function () {
                expect(state).toEqual({
                    LastAlarm: '2013-12-19T08:02:39.687Z',
                    GamesLast24Hour: [
                        { Timestamp: '2013-12-18T08:02:39.687Z', Amount: -100, GameId: 'Game-1' },
                        { Timestamp: '2013-12-19T08:02:39.687Z', Amount: -401, GameId: 'Game-2' }
                    ]
                });
            });

            it("should emit an alarm", function () {
                expect(projections.emit).toHaveBeenCalledWith(
                    "IrresponsibleGamblingAlarms",
                    "IrresponsibleGamblerDetected",
                    {
                        PlayerId: 'Player-1',
                        AmountSpentLast24Hours: -501,
                        Timestamp: '2013-12-19T08:02:39.687Z'
                    }
                );
            });
        });

        describe("given amount lost is greater than 500 but not in the last 24 hours", function () {

            var state;

            beforeEach(function () {
                state = detector.init();
                state = detector.processGameLost(state, {
                    body: {
                        PlayerId: 'Player-1',
                        GameId: 'Game-1',
                        Amount: -100,
                        Timestamp: '2013-12-18T08:02:39.687Z'
                    }
                });
                // 25 hours later: Game-1 has left the window.
                state = detector.processGameLost(state, {
                    body: {
                        PlayerId: 'Player-1',
                        GameId: 'Game-2',
                        Amount: -401,
                        Timestamp: '2013-12-19T09:02:39.687Z'
                    }
                });
            });

            it("state should contain only the last amount", function () {
                expect(state).toEqual({
                    LastAlarm: null,
                    GamesLast24Hour: [
                        { Timestamp: '2013-12-19T09:02:39.687Z', Amount: -401, GameId: 'Game-2' }
                    ]
                });
            });

            it("should not emit an alarm", function () {
                expect(projections.emit).not.toHaveBeenCalled();
            });
        });

        describe("given an event has already been processed", function () {

            var state;

            beforeEach(function () {
                var event = {
                    body: {
                        PlayerId: 'Player-1',
                        GameId: 'Game-1',
                        Amount: -100,
                        Timestamp: '2013-12-18T08:02:39.687Z'
                    }
                };
                state = detector.init();
                state = detector.processGameLost(state, event);
                state = detector.processGameLost(state, event);
                state = detector.processGameLost(state, event);
            });

            it("state should contain only one event", function () {
                expect(state).toEqual({
                    LastAlarm: null,
                    GamesLast24Hour: [
                        { Timestamp: '2013-12-18T08:02:39.687Z', Amount: -100, GameId: 'Game-1' }
                    ]
                });
            });
        });
    });
});
```
Once again, we see how well event processing lends itself to testing. Because every event carries its own timestamp, we can place messages in the past or the future without touching the system clock.
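To make such scenarios easier to write, the timestamps can be derived from a fixed base time. The helper below is only a sketch and is not part of the example project: gameLostAt, baseTime and msPerHour are hypothetical names. It builds an event in the shape the tests pass to processGameLost.

```javascript
// Hypothetical test helper, not part of the example project.
var baseTime = Date.parse('2013-12-18T08:00:00.000Z');
var msPerHour = 60 * 60 * 1000;

// Builds a GameLost-style event whose timestamp lies hoursFromBase hours
// after (or, for negative values, before) the fixed base time.
function gameLostAt(hoursFromBase, gameId, amount) {
    return {
        body: {
            PlayerId: 'Player-1',
            GameId: gameId,
            Amount: amount,
            Timestamp: new Date(baseTime + hoursFromBase * msPerHour).toISOString()
        }
    };
}

var yesterday = gameLostAt(-24, 'Game-0', -50);    // 24 hours before the base time
var nextWeek = gameLostAt(7 * 24, 'Game-9', -50);  // one week after the base time
```

Since the detector only ever compares event timestamps with each other, events built this way behave the same no matter when the test suite runs.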
Processing the generated alarms
Since we want to send email and text messages based on the output of this projection, we’ll be implementing a projection in C# to handle the alarms.
Source code
A working project for this example can be found on GitHub: https://github.com/tim-cools/EventStore-Examples
This post is part of the series Event Store Projections by Example.