Examples and Tutorials of Event Sourcing in NodeJS
CC-BY-SA-4.0 License
Published by oskardudycz over 1 year ago
Event Sourcing is perceived as a complex pattern. Some believe that it's like Nessie, everyone's heard about it, but rarely seen it. In fact, Event Sourcing is a pretty practical and straightforward concept. It helps build predictable applications closer to business. Nowadays, storage is cheap, and information is priceless. In Event Sourcing, no data is lost.
The workshop aims to build the knowledge of the general concept and its related patterns for the participants. The acquired knowledge will allow for the conscious design of architectural solutions and the analysis of associated risks.
The emphasis will be on a pragmatic understanding of architectures and applying it in practice using Marten and EventStoreDB.
You can do the workshop as a self-paced kit. That should give you a good foundation for starting your journey with Event Sourcing and learning tools like Marten and EventStoreDB. If you'd like to get full coverage with all nuances of the private workshop, feel free to contact me via email.
Read more in my article: Introduction to Event Sourcing - Self Paced Kit.
Follow the instructions in exercises folders.
Run npm install.
Run npm run build.
Run docker-compose up to start the EventStoreDB Docker image.
Run npm run test:solved. If all is fine, then all tests should be green.

Run docker-compose up to start the EventStoreDB Docker image. You should automatically get the EventStoreDB UI: http://localhost:2113/
Run npm run build:ts:watch.
Run npm run test:exercise. For solutions, run npm run test:solved; for all tests, run npm run test.
Run npm run test:exercise:watch.
Published by oskardudycz almost 2 years ago
Added code for the entire sample with optimistic concurrency and eventual consistency described in articles:
Published by oskardudycz almost 2 years ago
Nowadays, storage is cheap, but the information is priceless. Event sourcing is a valuable pattern that gives you more options around running, tracking and understanding the business workflow. This sample shows how you could benefit and migrate from the traditional approach.
Imagine that the company you work for has an efficiently developed ECommerce platform. The business situation looks so stable that the business is looking for new opportunities. Your current system is now made classically: monolith in Cloud, normalized relational base, ORM, etc.
The business concluded that the information about the final state of the shopping cart is not sufficient. Having the entire workflow history, it would be possible to analyze better the operations performed by the user (e.g. analysis of related products, products taken out of the basket, abandoned baskets, etc.). The business also wants to handle diagnostics and support better to solve reported problems.
The sample focuses on the specific shopping cart workflow, assuming you can reuse the strategies for other functionalities accordingly.
Having the following shopping cart process:
The current application is a NodeJS monolith application. It has two endpoints:
The assumption to have such a generic approach was that we're using a rich front-end Single Page Application that drives the business logic. For such a case, it may be reasonable just to validate data from the client and put them into the database. It also assumes that the database will enforce invariants/constraints. It may be valid in the first phase, where workflow is simple, and we do not require more sophisticated business logic and are fine with losing business context on each update.
Published by oskardudycz over 2 years ago
Added code samples for "Introduction to Event Sourcing in TypeScript and NodeJS with EventStoreDB"
Code: https://github.com/oskardudycz/EventSourcing.NodeJS/tree/main/samples/webinar
Published by oskardudycz over 3 years ago
In Event Sourcing, the state is stored in events. Events are logically grouped into streams. Streams can be thought of as the entities' representation. Traditionally (e.g. in relational or document approach), each entity is stored as a separate record.
Id | IssuerName | IssuerAddress | Amount | Number | IssuedAt |
---|---|---|---|---|---|
e44f813c | Oscar the Grouch | 123 Sesame Street | 34.12 | INV/2021/11/01 | 2021-11-01 |
In Event Sourcing, the entity is stored as the series of events that happened for this specific object, e.g. InvoiceInitiated, InvoiceIssued, InvoiceSent.
[
{
"id": "e44f813c-1a2f-4747-aed5-086805c6450e",
"type": "invoice-initiated",
"streamId": "INV/2021/11/01",
"streamPosition": 1,
"timestamp": "2021-11-01T00:05:32.000Z",
"data":
{
"issuedTo": {
"name": "Oscar the Grouch",
"address": "123 Sesame Street"
},
"amount": 34.12,
"number": "INV/2021/11/01",
"initiatedAt": "2021-11-01T00:05:32.000Z"
}
},
{
"id": "5421d67d-d0fe-4c4c-b232-ff284810fb59",
"type": "invoice-issued",
"streamId": "INV/2021/11/01",
"streamPosition": 2,
"timestamp": "2021-11-01T00:11:32.000Z",
"data":
{
"issuedBy": "Cookie Monster",
"issuedAt": "2021-11-01T00:11:32.000Z"
}
},
{
"id": "637cfe0f-ed38-4595-8b17-2534cc706abf",
"type": "invoice-sent",
"streamId": "INV/2021/11/01",
"streamPosition": 3,
"timestamp": "2021-11-01T00:12:01.000Z",
"data":
{
"sentVia": "email",
"sentAt": "2021-11-01T00:12:01.000Z"
}
}
]
All of those events share the stream id ("streamId": "INV/2021/11/01") and have an incremented stream position.
We can conclude that in Event Sourcing an entity is represented by a stream: a sequence of events correlated by the stream id and ordered by stream position.
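As a minimal sketch of that conclusion, the snippet below groups a flat list of recorded events by stream id and orders each group by stream position. The RecordedEvent shape mirrors the JSON sample above; the groupByStream helper is a hypothetical name used only for illustration, not part of any library.

```typescript
// Shape mirroring the JSON sample above (only the fields needed here).
type RecordedEvent = Readonly<{
  id: string;
  type: string;
  streamId: string;
  streamPosition: number;
}>;

// Hypothetical helper: groups events by stream id and sorts each stream by position.
function groupByStream(
  events: readonly RecordedEvent[]
): Record<string, RecordedEvent[]> {
  const streams: Record<string, RecordedEvent[]> = {};
  for (const recorded of events) {
    const stream = streams[recorded.streamId] ?? [];
    stream.push(recorded);
    streams[recorded.streamId] = stream;
  }
  for (const streamId of Object.keys(streams)) {
    streams[streamId].sort((a, b) => a.streamPosition - b.streamPosition);
  }
  return streams;
}

// Events from two invoices, deliberately out of order.
const allEvents: RecordedEvent[] = [
  { id: '3', type: 'invoice-sent', streamId: 'INV/2021/11/01', streamPosition: 3 },
  { id: '1', type: 'invoice-initiated', streamId: 'INV/2021/11/01', streamPosition: 1 },
  { id: '4', type: 'invoice-initiated', streamId: 'INV/2021/11/02', streamPosition: 1 },
  { id: '2', type: 'invoice-issued', streamId: 'INV/2021/11/01', streamPosition: 2 },
];

const eventsByStream = groupByStream(allEvents);
const firstInvoiceTypes = eventsByStream['INV/2021/11/01'].map((e) => e.type);
console.log(firstInvoiceTypes);
```

A real event store would usually do this filtering and ordering for you when reading a stream; the sketch only makes the idea explicit.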
To get the current state of an entity, we need to perform the stream aggregation process: we translate the set of events into a single entity by reading the stream's events and applying them one by one in the order of their stream position.
This process is also called state rehydration.
For this process we'll use the reduce function. It executes a reducer function (that you can provide) on each array element, resulting in a single output value. TypeScript extends it with type guarantees: the accumulated state can be typed as Partial<Invoice>, because only the first event (InvoiceInitiated) will provide all required fields. The other events will just do a partial update (InvoiceSent only changes the status and sets the sending method and date).
Having event types defined as:
type InvoiceInitiated = Event<
'invoice-initiated',
{
number: string;
amount: number;
issuedTo: Person;
initiatedAt: Date;
}
>;
type InvoiceIssued = Event<
'invoice-issued',
{
number: string;
issuedBy: string;
issuedAt: Date;
}
>;
type InvoiceSent = Event<
'invoice-sent',
{
number: string;
sentVia: InvoiceSendMethod;
sentAt: Date;
}
>;
Entity as:
type Invoice = Readonly<{
number: string;
amount: number;
status: InvoiceStatus;
issuedTo: Person;
initiatedAt: Date;
issued?: Readonly<{
by?: string;
at?: Date;
}>;
sent?: Readonly<{
via?: InvoiceSendMethod;
at?: Date;
}>;
}>;
We can rebuild the state with events using the reduce function:
const result = events.reduce<Partial<Invoice>>((currentState, event) => {
switch (event.type) {
case 'invoice-initiated':
return {
number: event.data.number,
amount: event.data.amount,
status: InvoiceStatus.INITIATED,
issuedTo: event.data.issuedTo,
initiatedAt: event.data.initiatedAt,
};
case 'invoice-issued': {
return {
...currentState,
status: InvoiceStatus.ISSUED,
issued: {
by: event.data.issuedBy,
at: event.data.issuedAt,
},
};
}
case 'invoice-sent': {
return {
...currentState,
status: InvoiceStatus.SENT,
sent: {
via: event.data.sentVia,
at: event.data.sentAt,
},
};
}
default:
throw new Error('Unexpected event type');
}
}, {});
The only thing left is to translate Partial<Invoice> into a properly typed Invoice. We'll use a type guard for that:
function isInvoice(invoice: Partial<Invoice>): invoice is Invoice {
return (
!!invoice.number &&
invoice.amount !== undefined && // a zero amount is still a valid value
!!invoice.status &&
!!invoice.issuedTo &&
!!invoice.initiatedAt &&
(!invoice.issued || (!!invoice.issued.at && !!invoice.issued.by)) &&
(!invoice.sent || (!!invoice.sent.via && !!invoice.sent.at))
);
}
if (!isInvoice(result)) throw new Error('Invoice state is not valid!');
const invoice: Invoice = result;
Thanks to that, we have a proper type definition. We can make the stream aggregation more generic and reusable:
export function aggregateStream<Aggregate, StreamEvents extends Event>(
events: StreamEvents[],
when: (
currentState: Partial<Aggregate>,
event: StreamEvents,
currentIndex: number,
allEvents: StreamEvents[]
) => Partial<Aggregate>,
check?: (state: Partial<Aggregate>) => state is Aggregate
): Aggregate {
const state = events.reduce<Partial<Aggregate>>(when, {});
if (!check) {
console.warn('No type check method was provided in the aggregate method');
return <Aggregate>state;
}
if (!check(state)) throw 'Aggregate state is not valid';
return state;
}
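To show how the generic function might be called, here is a minimal usage sketch. The base Event type and a trimmed aggregateStream are repeated so the snippet is self-contained. The shopping cart events, the ShoppingCart type and the helper names (whenShoppingCart, isShoppingCart) are assumptions made for illustration and do not come from the sample.

```typescript
// Base event type and a trimmed aggregateStream, repeated so the sketch is self-contained.
type Event<
  EventType extends string = string,
  EventData extends Record<string, unknown> = Record<string, unknown>
> = Readonly<{ type: EventType; data: EventData }>;

function aggregateStream<Aggregate, StreamEvents extends Event>(
  events: StreamEvents[],
  when: (currentState: Partial<Aggregate>, event: StreamEvents) => Partial<Aggregate>,
  check: (state: Partial<Aggregate>) => state is Aggregate
): Aggregate {
  const state = events.reduce<Partial<Aggregate>>(when, {});
  if (!check(state)) throw new Error('Aggregate state is not valid');
  return state;
}

// Hypothetical shopping cart events and entity.
type ShoppingCartOpened = Event<'shopping-cart-opened', { cartId: string; clientId: string }>;
type ProductItemAdded = Event<'product-item-added', { productId: string; quantity: number }>;
type ShoppingCartEvent = ShoppingCartOpened | ProductItemAdded;

type ShoppingCart = Readonly<{
  id: string;
  clientId: string;
  items: Readonly<Record<string, number>>;
}>;

// Applies a single event to the (possibly incomplete) cart state.
function whenShoppingCart(
  state: Partial<ShoppingCart>,
  cartEvent: ShoppingCartEvent
): Partial<ShoppingCart> {
  switch (cartEvent.type) {
    case 'shopping-cart-opened':
      return { id: cartEvent.data.cartId, clientId: cartEvent.data.clientId, items: {} };
    case 'product-item-added': {
      const items = state.items ?? {};
      const quantity = (items[cartEvent.data.productId] ?? 0) + cartEvent.data.quantity;
      return { ...state, items: { ...items, [cartEvent.data.productId]: quantity } };
    }
    default:
      throw new Error('Unexpected event type');
  }
}

// Type guard confirming that all required fields were provided by the events.
function isShoppingCart(state: Partial<ShoppingCart>): state is ShoppingCart {
  return state.id !== undefined && state.clientId !== undefined && state.items !== undefined;
}

const cartEvents: ShoppingCartEvent[] = [
  { type: 'shopping-cart-opened', data: { cartId: 'cart-1', clientId: 'client-1' } },
  { type: 'product-item-added', data: { productId: 'shoes', quantity: 1 } },
  { type: 'product-item-added', data: { productId: 'shoes', quantity: 2 } },
];

const cart = aggregateStream<ShoppingCart, ShoppingCartEvent>(
  cartEvents,
  whenShoppingCart,
  isShoppingCart
);
console.log(cart);
```

Passing the type guard explicitly means an incomplete stream (for example one missing the opening event) fails loudly instead of producing a half-filled entity.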
See full sample: link.
Read more in my article:
Published by oskardudycz over 3 years ago
Event Sourcing is a design pattern in which results of business operations are stored as a series of events.
It is an alternative way to persist data. In contrast with state-oriented persistence that only keeps the latest version of the entity state, Event Sourcing stores each state change as a separate event.
Thanks to that, no business data is lost. Each operation results in an event stored in the database. That enables extended auditing and diagnostics capabilities (both technically and business-wise). What's more, as events contain the business context, they allow wide business analysis and reporting.
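The difference between the two persistence styles can be sketched with a tiny in-memory shopping cart. The event names and product ids below are hypothetical and only serve the illustration.

```typescript
// Hypothetical cart events, reduced to the minimum needed for the comparison.
type CartEvent =
  | { type: 'product-added'; productId: string }
  | { type: 'product-removed'; productId: string };

// State-oriented persistence: every operation overwrites the previous state.
let cartState: string[] = [];
cartState = [...cartState, 'shoes'];
cartState = [...cartState, 't-shirt'];
cartState = cartState.filter((id) => id !== 'shoes');

// Event Sourcing: every operation is appended as a separate fact.
const recordedCartEvents: CartEvent[] = [
  { type: 'product-added', productId: 'shoes' },
  { type: 'product-added', productId: 't-shirt' },
  { type: 'product-removed', productId: 'shoes' },
];

// The current state can still be derived from the events...
const currentProducts = recordedCartEvents.reduce<string[]>(
  (products, cartEvent) =>
    cartEvent.type === 'product-added'
      ? [...products, cartEvent.productId]
      : products.filter((id) => id !== cartEvent.productId),
  []
);

// ...and so can answers that the final state alone cannot give.
const removedProducts = recordedCartEvents
  .filter((cartEvent) => cartEvent.type === 'product-removed')
  .map((cartEvent) => cartEvent.productId);

console.log(cartState, currentProducts, removedProducts);
```

Both approaches end up with the same current cart, but only the event list still knows that the shoes were added and then taken out.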
In this repository I'm showing different aspects and patterns around Event Sourcing, from the basic to advanced practices.
Events represent facts in the past. They carry information about something accomplished. They should be named in the past tense, e.g. "user added", "order status changed to confirmed". Events are not directed to a specific recipient; they're broadcast information. It's like telling a story at a party. We hope that someone listens to us, but we may quickly realise that no one is paying attention.
Events:
Read more in my blog posts:
Events are logically grouped into streams. In Event Sourcing, streams are the representation of the entities. All the entity state mutations end up as persisted events. Entity state is retrieved by reading all the stream events and applying them one by one in the order of appearance.
A stream should have a unique identifier representing the specific object. Each event has its own unique position within a stream. This position is usually represented by a numeric, incremental value. This number can be used to define the order of the events while retrieving the state. It can be also used to detect concurrency issues.
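To illustrate how the stream position can help detect concurrency issues, here is a minimal in-memory sketch of an optimistic concurrency check. The appendToStream signature with an expectedPosition argument is an assumption made for this example; it is not the API of any particular event store.

```typescript
type StoredEvent = Readonly<{
  streamId: string;
  streamPosition: number;
  type: string;
  data: unknown;
}>;

const storedStreams: Record<string, StoredEvent[]> = {};

// Hypothetical append: succeeds only if the caller saw the latest stream position.
function appendToStream(
  streamId: string,
  expectedPosition: number,
  type: string,
  data: unknown
): number {
  const stream = storedStreams[streamId] ?? [];
  // Positions start at 1, so the stream length equals the last used position.
  const currentPosition = stream.length;
  if (currentPosition !== expectedPosition) {
    throw new Error(
      `Expected stream position ${expectedPosition}, but the stream is at ${currentPosition}`
    );
  }
  const streamPosition = currentPosition + 1;
  storedStreams[streamId] = [...stream, { streamId, streamPosition, type, data }];
  return streamPosition;
}

// Two writers both read the (empty) stream at position 0 and try to append.
const firstWriterPosition = appendToStream('INV/2021/11/01', 0, 'invoice-initiated', {
  amount: 34.12,
});

let conflictDetected = false;
try {
  // The second writer still expects position 0, so its decision is based on stale data.
  appendToStream('INV/2021/11/01', 0, 'invoice-initiated', { amount: 99 });
} catch (error) {
  conflictDetected = true;
}

console.log(firstWriterPosition, conflictDetected);
```

The second writer can then re-read the stream, rebuild the state and decide again, instead of silently overwriting the first writer's change.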
Technically events are messages.
They may be represented, e.g. in JSON, binary, or XML format. Besides the data, they usually contain an id, a type, a stream id, a stream position, a timestamp, and other metadata such as correlation id, causation id, etc.
A sample event JSON can look like:
{
"id": "e44f813c-1a2f-4747-aed5-086805c6450e",
"type": "invoice-issued",
"streamId": "INV/2021/11/01",
"streamPosition": 1,
"timestamp": "2021-11-01T00:05:32.000Z",
"data":
{
"issuer": {
"name": "Oscar the Grouch",
"address": "123 Sesame Street"
},
"amount": 34.12,
"number": "INV/2021/11/01",
"issuedAt": "2021-11-01T00:05:32.000Z"
},
"metadata":
{
"correlationId": "1fecc92e-3197-4191-b929-bd306e1110a4",
"causationId": "c3cf07e8-9f2f-4c2d-a8e9-f8a612b4a7f1"
}
}
This structure could be translated directly into the TypeScript class. However, to make the code less redundant and ensure that all events follow the same convention, it's worth adding the base type. It could look as follows:
type Event<
EventType extends string = string,
EventData extends Record<string, unknown> = Record<string, unknown>
> = {
readonly type: EventType;
readonly data: EventData;
};
Several things are going on there:
The event type is not a plain string but a generic parameter constrained to string (EventType extends string = string). It's added to be able to define the alias for the event type. Thanks to that, we're getting compiler checks and IntelliSense support.
The event data is a generic parameter constrained to a record (EventData extends Record<string, unknown> = Record<string, unknown>). It is the way of telling the TypeScript compiler that it may expect any type but allows you to specify your own and get a proper type check.
type and data are marked as readonly. Having that, the compiler won't allow us to change the value after the initial object assignment. Thanks to that, we're getting immutability.
Having that, we can define the event as, e.g.:
// alias for event type
type INVOICE_ISSUED = 'invoice-issued';
// issuer DTO used in event data
type Issuer = {
readonly name: string,
readonly address: string,
}
// event type definition
type InvoiceIssued = Event<
INVOICE_ISSUED,
{
readonly issuer: Issuer,
readonly amount: number,
readonly number: string,
readonly issuedAt: Date
}
>
then create it as:
const invoiceIssued: InvoiceIssued = {
type: 'invoice-issued',
data: {
issuer: {
name: 'Oscar the Grouch',
address: '123 Sesame Street',
},
amount: 34.12,
number: 'INV/2021/11/01',
issuedAt: new Date()
},
}
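One benefit of the literal event type alias is that a union of events can be narrowed by its type field. The sketch below assumes a second, simplified InvoiceSent event with a hypothetical sentVia field, and a describeEvent helper that exists only for this illustration.

```typescript
type Event<
  EventType extends string = string,
  EventData extends Record<string, unknown> = Record<string, unknown>
> = {
  readonly type: EventType;
  readonly data: EventData;
};

// Simplified event definitions; sentVia and its values are assumptions.
type InvoiceIssued = Event<
  'invoice-issued',
  { readonly number: string; readonly amount: number }
>;
type InvoiceSent = Event<
  'invoice-sent',
  { readonly number: string; readonly sentVia: 'email' | 'post' }
>;
type InvoiceEvent = InvoiceIssued | InvoiceSent;

function describeEvent(invoiceEvent: InvoiceEvent): string {
  switch (invoiceEvent.type) {
    case 'invoice-issued':
      // Narrowed to InvoiceIssued: data.amount is known to exist here.
      return `Invoice ${invoiceEvent.data.number} issued for ${invoiceEvent.data.amount}`;
    case 'invoice-sent':
      // Narrowed to InvoiceSent: data.sentVia is known to exist here.
      return `Invoice ${invoiceEvent.data.number} sent via ${invoiceEvent.data.sentVia}`;
    default: {
      // Exhaustiveness check: stops compiling if a new event type is not handled above.
      const unhandled: never = invoiceEvent;
      throw new Error(`Unexpected event: ${JSON.stringify(unhandled)}`);
    }
  }
}

const issued: InvoiceIssued = {
  type: 'invoice-issued',
  data: { number: 'INV/2021/11/01', amount: 34.12 },
};

// issued.type = 'invoice-sent'; // would not compile: type is read-only

const description = describeEvent(issued);
console.log(description);
```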
Event Sourcing is not tied to any type of storage implementation. As long as it fulfils the assumptions, it can be implemented with any backing database (relational, document, etc.). The state has to be represented by an append-only log of events. The events are stored in chronological order, and new events are appended after the previous ones. Event stores are a category of databases explicitly designed for this purpose.
The simplest (dummy and in-memory) Event Store can be defined in TypeScript as:
class EventStore {
private events: { readonly streamId: string; readonly data: string }[] = [];
appendToStream(streamId: string, ...events: any[]): void {
const serialisedEvents = events.map((event) => {
return { streamId: streamId, data: JSON.stringify(event) };
});
this.events.push(...serialisedEvents);
}
readFromStream<T = any>(streamId: string): T[] {
return this.events
.filter((event) => event.streamId === streamId)
.map<T>((event) => JSON.parse(event.data));
}
}
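A short usage sketch of this dummy store is shown below, with the class repeated so the snippet is self-contained. It also shows one consequence of serialising through JSON: a Date put into the store comes back as an ISO string. The StoredInvoiceIssued type is a hypothetical read model introduced for this example.

```typescript
class EventStore {
  private events: { readonly streamId: string; readonly data: string }[] = [];

  appendToStream(streamId: string, ...events: any[]): void {
    const serialisedEvents = events.map((event) => {
      return { streamId: streamId, data: JSON.stringify(event) };
    });
    this.events.push(...serialisedEvents);
  }

  readFromStream<T = any>(streamId: string): T[] {
    return this.events
      .filter((event) => event.streamId === streamId)
      .map<T>((event) => JSON.parse(event.data));
  }
}

// Hypothetical shape of the event as it comes back from the store.
type StoredInvoiceIssued = {
  type: 'invoice-issued';
  data: { number: string; issuedAt: string };
};

const eventStore = new EventStore();

eventStore.appendToStream('INV/2021/11/01', {
  type: 'invoice-issued',
  data: { number: 'INV/2021/11/01', issuedAt: new Date('2021-11-01T00:05:32.000Z') },
});
eventStore.appendToStream('INV/2021/11/02', {
  type: 'invoice-issued',
  data: { number: 'INV/2021/11/02', issuedAt: new Date('2021-11-02T10:00:00.000Z') },
});

// Only the events of the requested stream are returned, in append order.
const invoiceEvents = eventStore.readFromStream<StoredInvoiceIssued>('INV/2021/11/01');

// JSON.stringify turned the Date into an ISO string, so it has to be parsed back if needed.
const issuedAt = new Date(invoiceEvents[0].data.issuedAt);
console.log(invoiceEvents.length, typeof invoiceEvents[0].data.issuedAt, issuedAt.getTime());
```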
In the further samples, I'll use EventStoreDB. It's the battle-tested OSS database created and maintained by the Event Sourcing authorities (e.g. Greg Young). It supports many dev environments via gRPC clients, including NodeJS.
Published by oskardudycz over 3 years ago
SuperTest is a useful library that allows testing Express HTTP applications.
To install it run:
npm i -D supertest @types/supertest
SuperTest takes an Express application as input. We have to structure our code to return it, e.g.
import express, { Application, Request, Response } from 'express';
import { getGreeting } from './greetings/getGreeting';
const app: Application = express();
app.get('/', (_req: Request, res: Response) => {
res.json(getGreeting());
});
export default app;
Our updated index will look like:
import app from './app';
import http from 'http';
const server = http.createServer(app);
const PORT = 5000;
server.listen(PORT);
server.on('listening', () => {
console.info('server up listening');
});
Let's create the test for the default route. For that, create a file, e.g. getGreetings.api.test.ts. We'll be using a different suffix, api.test.ts, as those tests are not unit but integration/acceptance tests. They will be running the Express server. Having the Express app extracted, we can use the SuperTest library as:
import request from 'supertest';
import app from '../app';
describe('GET /', () => {
it('should return greeting "Hello World!"', () => {
return request(app)
.get('/')
.expect('Content-Type', /json/)
.expect(200, { greeting: 'Hello World!' });
});
});
SuperTest wraps the Express app and makes the API calls easier. It also provides a set of useful methods to check the response params.
As the final step we'll add a separate NPM script to package.json for running API tests and also script to run all of them.
{
"scripts": {
"test": "npm run test:unit && npm run test:api", // <-- added
"test:unit": "jest unit",
"test:api": "jest api" // <-- added
}
}
Published by oskardudycz over 3 years ago
It's important to have your changes verified during the pull request process. We'll use GitHub Actions as a sample of how to do that. You need to create the .github/workflows folder and put a new file there (e.g. samples_simple.yml). This file will contain the YAML configuration for your action.
The simplest setup will look like this:
name: Node.js CI
on:
  # run it on push to the default repository branch (replace main with yours)
  push:
    branches: [main]
  # run it during pull request
  pull_request:
defaults:
  run:
    # relative path to the place where source code (with package.json) is located
    working-directory: samples/simple
jobs:
  build:
    # operating system to run the job on
    runs-on: windows-latest
    steps:
      - uses: actions/checkout@v2
      - name: Use Node.js 14.x
        uses: actions/setup-node@v1
        with:
          # node version to use
          node-version: 14.x
      # install dependencies based on the package lock
      - run: npm ci
      # run linting (ESLint and Prettier)
      - run: npm run lint
      # run build
      - run: npm run build:ts
      # run tests
      - run: npm run test:unit
If you want to make sure that your code will be running properly for a few Node.js versions and different operating systems (e.g. because developers may have different environment configuration) then you can use matrix tests:
name: Node.js CI
on:
  # run it on push to the default repository branch (replace main with yours)
  push:
    branches: [main]
  # run it during pull request
  pull_request:
defaults:
  run:
    # relative path to the place where source code (with package.json) is located
    working-directory: samples/simple
jobs:
  build:
    # use system defined below in the tests matrix
    runs-on: ${{ matrix.os }}
    strategy:
      # define the test matrix
      matrix:
        # selected operating systems to run CI
        os: [windows-latest, ubuntu-latest, macos-latest]
        # selected node versions to run CI
        node-version: [14.x, 15.x]
    steps:
      - uses: actions/checkout@v2
      - name: Use Node.js ${{ matrix.node-version }}
        uses: actions/setup-node@v1
        with:
          # use the node version defined in matrix above
          node-version: ${{ matrix.node-version }}
      # install dependencies based on the package lock
      - run: npm ci
      # run linting (ESLint and Prettier)
      - run: npm run lint
      # run build
      - run: npm run build:ts
      # run tests
      - run: npm run test:unit
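GitHub Actions runs one job for every combination of the matrix variables, so the configuration above results in six jobs. The sketch below only illustrates that expansion; expandMatrix is a hypothetical helper and not part of GitHub Actions.

```typescript
type MatrixJob = Readonly<{ os: string; nodeVersion: string }>;

// Hypothetical helper: builds every combination of operating system and Node.js version.
function expandMatrix(
  operatingSystems: readonly string[],
  nodeVersions: readonly string[]
): MatrixJob[] {
  const jobs: MatrixJob[] = [];
  for (const os of operatingSystems) {
    for (const nodeVersion of nodeVersions) {
      jobs.push({ os, nodeVersion });
    }
  }
  return jobs;
}

const matrixJobs = expandMatrix(
  ['windows-latest', 'ubuntu-latest', 'macos-latest'],
  ['14.x', '15.x']
);

for (const job of matrixJobs) {
  console.log(`${job.os} / Node.js ${job.nodeVersion}`);
}
```

Keep that multiplication in mind when adding more values: each extra operating system or Node.js version adds a whole row of jobs.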
Published by oskardudycz over 3 years ago
npm i -D jest @types/jest ts-jest
npx ts-jest config:init
module.exports = {
preset: 'ts-jest',
testEnvironment: 'node',
};
src/greetings/getGreeting.ts
export function getGreeting() {
return {
greeting: 'Hello World!',
};
}
src/greetings/getGreetings.unit.test.ts
import { getGreeting } from './getGreeting';
describe('getGreeting', () => {
it('should return greeting "Hello World!"', () => {
const result = getGreeting();
expect(result).toBeDefined();
expect(result.greeting).toBe('Hello World!');
});
});
{
"scripts": {
"test:unit": "jest unit"
}
}
Now you can run them with:
npm run test:unit
Jest will be smart enough to find by convention all files with the .unit.test.ts suffix.
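The unit argument in jest unit is treated as a regular expression matched against test file paths. The sketch below is a simplified model of that filter (it ignores Jest's other settings such as testMatch) and uses hypothetical file names. It shows that a short pattern can match more than intended.

```typescript
// Hypothetical test files in a project.
const testFiles = [
  'src/greetings/getGreetings.unit.test.ts',
  'src/greetings/getGreetings.api.test.ts',
  'src/units/convert.api.test.ts',
];

// Simplified model of the path filter: keep files whose path matches the pattern.
function selectTests(pattern: string, files: readonly string[]): string[] {
  const regex = new RegExp(pattern);
  return files.filter((file) => regex.test(file));
}

// "unit" also matches the "units" folder, so an API test slips in.
const looseSelection = selectTests('unit', testFiles);

// A stricter pattern anchored on the suffix selects only the unit tests.
const strictSelection = selectTests('\\.unit\\.test\\.ts$', testFiles);

console.log(looseSelection, strictSelection);
```

If folder or file names in the project can contain the word unit, a stricter pattern in the NPM script may be the safer choice.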
To be able to debug our tests, we have to add new debug configurations to launch.json. We'll be using watch settings, so we don't have to re-run tests when we update the logic or test code.
{
"version": "0.2.0",
"configurations": [
{
"name": "Jest all tests",
"type": "node",
"request": "launch",
"program": "${workspaceFolder}/node_modules/jest/bin/jest.js",
"args": ["--verbose", "-i", "--no-cache", "--watchAll"],
"console": "integratedTerminal",
"internalConsoleOptions": "neverOpen"
},
{
"name": "Jest current test",
"type": "node",
"request": "launch",
"program": "${workspaceFolder}/node_modules/jest/bin/jest.js",
"args": [
"${fileBasename}",
"--verbose",
"-i",
"--no-cache",
"--watchAll"
],
"console": "integratedTerminal",
"internalConsoleOptions": "neverOpen"
}
]
}
See more in https://github.com/oskardudycz/EventSourcing.JS/pull/3.
Published by oskardudycz over 3 years ago
To configure VSCode debugging, you need to add a launch.json file in the .vscode folder.
To avoid synchronising two separate configurations, we'll reuse the existing NPM script dev:start that starts the application.
{
"version": "0.2.0",
"configurations": [
{
"name": "Debug",
"type": "node",
"request": "launch",
"runtimeExecutable": "npm",
"runtimeArgs": ["run-script", "dev:start", "--", "--inspect-brk=9229"],
"port": 9229
}
]
}
As we have TypeScript configured, we don't need any additional setup. We're reusing the native Node.js debugging capabilities by using the --inspect-brk=9229 parameter. Read more in the Node.js documentation.
See more in https://github.com/oskardudycz/EventSourcing.JS/pull/2.
Published by oskardudycz over 3 years ago
Install NodeJS - https://nodejs.org/en/download/. Using NVM is recommended.
Create project:
npm init -y
ExpressJS - Web Server for REST API.
npm i express
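TypeScript - We'll be doing Type Driven Development. Install TypeScript together with types for NodeJS and Express, and TS Node:
npm i -D typescript @types/express @types/node ts-node
You can also install the TypeScript compiler globally:
npm i -g typescript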
{
"scripts": {
"build:ts": "tsc"
}
}
If you installed tsc globally, you can init the TypeScript config by running:
tsc --init
{
"compilerOptions": {
"target": "es2020",
"module": "commonjs",
"outDir": "./dist",
"strict": true,
"strictNullChecks": true,
"noUnusedLocals": true,
"noImplicitReturns": true,
"esModuleInterop": true,
"skipLibCheck": true,
"forceConsistentCasingInFileNames": true,
},
"include": ["./src"],
}
ESLint - We'd like to have static code analysis:
npx eslint --init
✔ How would you like to use ESLint? · style
✔ What type of modules does your project use? · esm
✔ Which framework does your project use? · none
✔ Does your project use TypeScript? · No / Yes
✔ Where does your code run? · node
✔ How would you like to define a style for your project? · guide
✔ Which style guide do you want to follow? · standard
✔ What format do you want your config file to be in? · JSON
Install the required packages with npm:
npm i -D @typescript-eslint/eslint-plugin eslint-config-standard eslint eslint-plugin-import eslint-plugin-node eslint-plugin-promise @typescript-eslint/parser
{
"env": {
"es2020": true,
"node": true
},
"extends": [
"standard"
],
"parser": "@typescript-eslint/parser",
"parserOptions": {
"ecmaVersion": 12,
"sourceType": "module"
},
"plugins": [
"@typescript-eslint"
],
"rules": {
}
}
/node_modules/*
# build artifacts
dist/*
coverage/*
# data definition files
**/*.d.ts
# custom definition files
/src/types/
Prettier, as we aim to write pretty code:
npm i -D prettier eslint-config-prettier eslint-plugin-prettier
{
"tabWidth": 2,
"singleQuote": true
}
{
"env": {
"es2020": true,
"node": true
},
"extends": [
"plugin:@typescript-eslint/recommended",
"prettier/@typescript-eslint", // <-- added
"plugin:prettier/recommended" // <-- added
],
"parser": "@typescript-eslint/parser",
"parserOptions": {
"ecmaVersion": 12,
"sourceType": "module"
},
"plugins": [
"@typescript-eslint"
],
"rules": {
}
}
Define tasks for ESLint and Prettier in package.json:
{
"scripts": {
"lint": "npm run lint:eslint && npm run lint:prettier",
"lint:prettier": "prettier --check \"src/**/**/!(*.d).{ts,json,md}\"",
"lint:eslint": "eslint src/**/*.ts"
}
}
{
"scripts": {
"lint:eslint": "eslint src/**/*.ts",
"prettier:fix": "prettier --write \"src/**/**/!(*.d).{ts,json,md}\""
}
}
Husky is a tool that lets you run scripts on the pre-commit Git hook. We'll use it to run ESLint and Prettier to make sure that the code is formatted and follows the rules.
npm i -D husky@4
{
"husky": {
"hooks": {
"pre-commit": "npm run lint"
}
}
}
Nodemon to have hot-reload of the running Express server code.
npm i -D nodemon
{
"scripts": {
"dev:start": "nodemon src/index.ts"
}
}
npm run dev:start
{ "greeting": "Hello World!" }
See more in https://github.com/oskardudycz/EventSourcing.JS/pull/1.