Here I’ll give a overview on how to use the Event Store Client API from C#. I’ll be highlighting the features used to implement the examples described in the following blog posts. If you’re only interested in projections written in JavaScript, you can safely skip this post and go directly to the first example.
Running Event Store
These examples are based on the latest main branch version available at the time of writing (RC3.0). Since you have to run the Client API against the same version of Event Store, I’ve included the binaries in the github repository. This means you can check out the examples and run them without worrying about version compatibility or building Event Store yourself.
To run the examples, run Event Store as an administrator with –run-projections=all. The following command will run Event Store with projections enabled and an in-memory database:
| lib\EventStore\EventStore.SingleNode.exe --run-projections=all --mem-db |
Event Store Client API Connections
Event Store is accessible through two protocols: HTTP and TCP. The default HTTP endpoint is port 1113 and the default TCP endpoint to 2113, though both of these can be modified from the command line using –tcp-port and –http-port:
| EventStore.SingleNode.exe --tcp-port=5555 --http-port=5556 |
Creating a connection
The Event Store Client API connection is accessible through the IEventStoreConnection interface and connects to the TCP endpoint:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | internal class IPEndPointFactory { public static IPEndPoint DefaultTcp() { return CreateIPEndPoint(1113); } private static IPEndPoint CreateIPEndPoint(int port) { var address = IPAddress.Parse("127.0.0.1"); return new IPEndPoint(address, port); } } public static class EventStoreConnectionFactory { public static IEventStoreConnection Default() { var connection = EventStoreConnection.Create(IPEndPointFactory.DefaultTcp()); connection.Connect(); return connection; } } |
Most connection methods accept credentials for connecting to Event Store. These examples always use the default admin account:
| public class EventStoreCredentials { private static readonly UserCredentials _credentials = new UserCredentials("admin", "changeit"); public static UserCredentials Default { get { return _credentials; } } } |
Converting events to and from JSON
These examples use static-typed events which are serialized to JSON when they are sent to Event Store. When they are retrieved from the database, they are deserialized from JSON into static-typed events. Two extension methods take care of this process:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | public static EventData AsJson(this object value) { if (value == null) throw new ArgumentNullException("value"); var json = JsonConvert.SerializeObject(value); var data = Encoding.UTF8.GetBytes(json); var eventName = value.GetType().Name; return new EventData(Guid.NewGuid(), eventName, true, data, new byte[] { }); } public static T ParseJson<T>(this RecordedEvent data) { if (data == null) throw new ArgumentNullException("data"); var value = Encoding.UTF8.GetString(data.Data); return JsonConvert.DeserializeObject<T>(value); } |
These methods use JSON.NET for JSON serialization.
Appending an event to a stream
Now that we’ve created a connection and know how to serialize and deserialize events, we can append an event to a stream. We don’t have to create the stream first in this case: if we try to append an event to a stream that doesn’t exist, the stream is automatically created for us.
| var @event = new MeasurementRead(DateTime.Now, 10.12m) .AsJson(); _connection.AppendToStream(deviceName, ExpectedVersion.Any, new[] { @event }); |
Note that concurrency control in Event Store is handled through versioning. In this example, we skip concurrency control by passing ExpectedVersion.Any when an event is appended to a stream.
Getting the last event number from a stream
To read events from a stream in reverse order, we need to know the number of the last event in the stream. To get this number, we ask for the last event of the stream by passing an EventNumber of -1 when calling the ReadEvent method:
| private static int? GetLastEventNumber(this IEventStoreConnection connection, string streamName) { var lastEvent = connection.ReadEvent(streamName, -1, false, EventStoreCredentials.Default); if (lastEvent == null || lastEvent.Event == null) return null; return lastEvent.Event.Value.OriginalEventNumber; } |
Getting events from a stream in reverse order
Now we can read events in reverse order starting from the number of the last event. This example reads events in pages of 10 at a time:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | private const int PageSize = 10; public static IEnumerable<T> ReadStreamEventsBackward<T>(this IEventStoreConnection connection, string streamName) { if (connection == null) throw new ArgumentNullException("connection"); var lastEventNumber = connection.GetLastEventNumber(streamName); return lastEventNumber == null ? new T[0] : ReadResult<T>(connection, streamName, lastEventNumber.Value); } private static IEnumerable<T> ReadResult<T>(IEventStoreConnection connection, string streamName, int lastEventNumber) { var result = new List<T>(); do { var events = connection.ReadStreamEventsBackward(streamName, lastEventNumber, PageSize, false, EventStoreCredentials.Default); result.AddRange(events.Events.Select(e => e.ParseJson<T>())); lastEventNumber = events.NextEventNumber; } while (lastEventNumber != -1); return result; } |
Subscribing to all new events
We can use the connection’s SubscribeToAll method to subscribe to all new events. This method is called each time a new event is stored in Event Store:
| public void StartReading() { _connection.SubscribeToAll(true, Appeared, Dropped, EventStoreCredentials.Default); } private void Appeared(EventStoreSubscription subscription, ResolvedEvent resolvedEvent) { // do something with the events here // var @event = resolvedEvent.ParseJson(); } private void Dropped(EventStoreSubscription subscription, SubscriptionDropReason subscriptionDropReason, Exception exception) { // is called when the tcp connection is dropped, we could // implement recovery here } |
Subscribing to new events from a specific stream
Similarly to SubscribeToAll, we can also use SubscribeToStream to retrieve events for a specific stream:
| public void SubscribeValueChange(string eventName) { _connection.SubscribeToStream(eventName, false, ValueChanged, Dropped, EventStoreCredentials.Default); } private void ValueChanged(EventStoreSubscription eventStoreSubscription, ResolvedEvent resolvedEvent) { // do something with the events here // var @event = resolvedEvent.ParseJson(); } private void Dropped(EventStoreSubscription subscription, SubscriptionDropReason subscriptionDropReason, Exception exception) { // is called when the tcp connection is dropped, we could // implement recovery here } |
Managing projections
We can manage projections using the ProjectionsManager class of the Client API. This class allows us to retrieve information about projections as well as enable, create, and update them.
Creating a ProjectionsManager
This ProjectionsManager will use HTTP (default port 2113) to connect to Event Store:
| internal class IPEndPointFactory { public static IPEndPoint DefaultHttp() { return CreateIPEndPoint(2113); } private static IPEndPoint CreateIPEndPoint(int port) { var address = IPAddress.Parse("127.0.0.1"); return new IPEndPoint(address, port); } } var logger = new ConsoleLogger(); var projectionsManager = new ProjectionsManager(logger, IPEndPointFactory.DefaultHttp()); |
In the example, we’re using the ConsoleLogger provided by the Client API. You can replace it with your own logger if necessary.
Manipulating projections
The methods of the ProjectionsManager class are fairly straightforward:
| var all = projectionsManager.ListAll(EventStoreCredentials.Default); var json = JsonConvert.DeserializeObject(all); foreach (var projection in json.projections) { var name = projection.name.Value; var status = projection.status.Value; //Do something with the projection } |
| var name = "MyNewContinuousProjection"; var projection = @"fromAll().when({ $any: function (s, e) { s.lastEventName = e.eventType; return s; } } )"; projectionsManager.CreateContinuous(name, projection, EventStoreCredentials.Default); |
| var name = "MyNewContinuousProjection"; var expectedQuery = @"fromAll().when({ $any: function (s, e) { s.lastEventName = e.eventType; return s; } } )"; var currentQuery = _projectionsManager.GetQuery(name, EventStoreCredentials.Default); if (expectedQuery != currentQuery) { _projectionsManager.UpdateQuery(name, expectedQuery, EventStoreCredentials.Default); } |
| public T GetState(string projectionName) { var state = _projectionsManager.GetState(projectionName); return state.ParseJson(); } |
Subscribing to a projection’s state change
We can store the state of an event in a stream. This allows us to subscribe to the stream and retrieve updates when its state changes. If there’s only one state stored for all projected streams, the stream with the state is named “$projections-ProjectionName-result”, where ProjectionName is the name of the stream.
| _connection.SubscribeToStream($projections-MeasurementReadCounter-result", false, ValueChanged, Dropped, EventStoreCredentials.Default); |
Keep in mind that the projection needs to know that it should store the state in the stream. We can do this by setting the producesResults option to true or by calling the outputState method in the JavaScript projection definition:
| options({ producesResults: true }); // or fromAll() .when({ // ... }) .outputState() |
Source code
A working project for these examples can be found on github: https://github.com/tim-cools/EventStore-Examples
Event Store Projections by Example
This post is part of a series:
- EventStore Client API Basics (C#)
- Counting events of a specific type
- Partition events based on data found in previous events
- Calculating an average per day
- The irresponsible gambler
- Distribute events to other streams
- Temporal Projection to generate alarms
- Projection in C#