Outbox implementation for Kafka with Couchbase, MSSQL and PostgreSQL.
MIT License
git clone https://github.com/Trendyol/PollingOutboxPublisher.git

- Set your configuration in the config file (`/src/config/config.json`) and the secret file (`/src/config/secret.json`).
- Insert `0` as the first row in the `OutboxOffset` data store.
- Insert your messages into the `OutboxEvents` data store.

The data stores are:

- `OutboxEvents`: The data store that holds the messages to be published.
- `MissingEvents`: The data store that holds the messages that could not be published.
- `ExceededEvents`: The data store that holds the messages that could not be published after the retry limit is exceeded.
- `OutboxOffset`: The data store that holds the last published `OutboxEvents` ID.

The algorithm works on these data stores as follows:

- [...] `OutboxEvents` data store.
- [...] `MissingEvents` [...]
- The `OutboxEvents` incremental ID list is expected to be continuous. If there [...] `MissingEvents` and retry the [...] `ExceededEvents`.
- [...] `OutboxOffset` with the [...] `OutboxEvents` data store.
- [...] `OutboxOffset` data store.

You can see some basic diagrams below to summarize the algorithm:
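
Alongside the diagrams, the flow can also be read as code. The following is a minimal Python sketch, not the project's implementation: the `Stores` class, `poll_once`, `retry_missing`, the in-memory dictionaries and the `publish` callback are hypothetical names chosen for illustration, and the exact order of the steps is an assumption based on the data store descriptions above.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List


@dataclass
class Stores:
    """In-memory stand-ins for the four data stores (hypothetical shapes)."""
    outbox_events: Dict[int, str] = field(default_factory=dict)   # id -> payload
    missing_events: Dict[int, int] = field(default_factory=dict)  # id -> retry count
    exceeded_events: List[int] = field(default_factory=list)      # ids past the retry limit
    outbox_offset: int = 0                                         # last processed id


def poll_once(stores: Stores, publish: Callable[[str], bool], batch_size: int) -> None:
    """Process one batch of OutboxEvents that come after the stored offset."""
    new_ids = sorted(i for i in stores.outbox_events if i > stores.outbox_offset)[:batch_size]
    if not new_ids:
        return
    # IDs are expected to be continuous: any ID skipped between the offset
    # and the end of this batch is recorded as missing (assumed behaviour).
    for expected in range(stores.outbox_offset + 1, new_ids[-1] + 1):
        if expected not in stores.outbox_events:
            stores.missing_events.setdefault(expected, 0)
    # Events that fail to publish are also recorded as missing.
    for event_id in new_ids:
        if not publish(stores.outbox_events[event_id]):
            stores.missing_events.setdefault(event_id, 0)
    stores.outbox_offset = new_ids[-1]


def retry_missing(stores: Stores, publish: Callable[[str], bool], max_retry: int) -> None:
    """Retry missing events; move them to ExceededEvents after max_retry attempts."""
    for event_id in list(stores.missing_events):
        payload = stores.outbox_events.get(event_id)
        if payload is not None and publish(payload):
            del stores.missing_events[event_id]
            continue
        stores.missing_events[event_id] += 1
        if stores.missing_events[event_id] >= max_retry:
            del stores.missing_events[event_id]
            stores.exceeded_events.append(event_id)
```

With events `1`, `2` and `4` in the outbox, `poll_once` would record ID `3` as missing and advance the offset to `4`; `retry_missing` then retries ID `3` until it either appears and is published or reaches `max_retry`.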
Multiple instances of the application can be run at the same time to increase the reliability of the system. However, only one instance should be the Master Pod at a time.
`MasterPodLock` is a distributed lock that is used to determine the Master Pod. It has a TTL (time-to-live) value.

- The instance that takes the `MasterPodLock` becomes the Master Pod and starts publishing messages.
- `MasterPodLock` is a simple Redis key. When the lock is taken, the value of the key is set to the pod name, which is read from the environment. If the environment does not provide a pod name, a GUID is used.
- Both the Master Pod and the Follower Pods try to take the `MasterPodLock` at a regular interval (`MasterPodSettings.MasterPodRaceInterval`).
- If the Master Pod stops and the lock expires, a Follower Pod takes the `MasterPodLock` and becomes the new Master Pod.
- If you run a single instance, you can set `MasterPodSettings.IsActive` to `false`, but don't forget to make sure that only one instance is running.

> [!WARNING]
> If there will be multiple instances, `MasterPodSettings.IsActive` should be set to `true`. Otherwise, messages can be duplicated or not published.
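
The lock race can be illustrated with a small Python sketch. It is a simplified model under stated assumptions, not the project's code: `FakeLockStore` is an in-memory stand-in for Redis, `set_if_absent` imitates the semantics of the Redis command `SET key value NX PX ttl`, the environment variable name `POD_NAME` is hypothetical, and the idea that the current owner extends the TTL on each round is an assumption.

```python
import uuid
from typing import Dict, Optional, Tuple


class FakeLockStore:
    """In-memory stand-in for Redis, limited to what this sketch needs."""

    def __init__(self) -> None:
        self._data: Dict[str, Tuple[str, float]] = {}  # key -> (value, expires_at)

    def _live_value(self, key: str, now: float) -> Optional[str]:
        entry = self._data.get(key)
        if entry is None or entry[1] <= now:
            return None  # absent or expired
        return entry[0]

    def set_if_absent(self, key: str, value: str, ttl: float, now: float) -> bool:
        """Succeeds only if no live key exists, like SET ... NX with a TTL."""
        if self._live_value(key, now) is not None:
            return False
        self._data[key] = (value, now + ttl)
        return True

    def refresh_if_owner(self, key: str, value: str, ttl: float, now: float) -> bool:
        """Extends the TTL only when the caller currently owns the key."""
        if self._live_value(key, now) != value:
            return False
        self._data[key] = (value, now + ttl)
        return True


def pod_name(env: Dict[str, str]) -> str:
    """Pod name from the environment, or a GUID when it is not set."""
    return env.get("POD_NAME") or str(uuid.uuid4())


def race_for_master(store: FakeLockStore, key: str, me: str, ttl: float, now: float) -> bool:
    """One race round: the owner extends the lock, everyone else tries to take it."""
    return store.refresh_if_owner(key, me, ttl, now) or store.set_if_absent(key, me, ttl, now)
```

Each pod would call `race_for_master` once per race interval and publish only while the call returns `True`. Passing `now` explicitly keeps the sketch deterministic; a real deployment would rely on the TTL handling of Redis itself.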
The application can be configured using the `config.json` and `secret.json` files. Here are the configurations you can set:
Key | Type | Description |
---|---|---|
`Kafka.SaslUsername` | string | The username for SASL authentication with the Kafka cluster. |
`Kafka.Brokers` | string | The addresses of the Kafka brokers. |
`Kafka.SaslPassword` | string | The password for SASL authentication with the Kafka cluster. |
`Kafka.SslCaLocation` | string | The location of the SSL certificate for the Kafka cluster. |
`Kafka.SslKeystorePassword` | string | The SSL keystore password. |
`Kafka.SaslMechanism` | string | The SASL mechanism. Default: ScramSha512 |
`Kafka.SecurityProtocol` | string | The security protocol. Default: SaslSsl |
`Kafka.BatchSize` | string | The batch size of the Kafka publisher. Default: 512 * 1024 |
`Kafka.LingerMs` | string | The linger time of the Kafka publisher, in milliseconds. Default: 10 |
`Kafka.CompressionType` | string | The compression type of the Kafka publisher. Default: Snappy |
`Kafka.MessageMaxBytes` | string | The maximum message size of the Kafka publisher, in bytes. Default: 30000000 |
`Kafka.Acks` | string | The acks setting of the Kafka publisher. Default: Leader |
`BenchMarkOptions.IsPublishingOn` | bool | A flag indicating whether publishing is on. Can be used for load testing. |
`WorkerSettings.OutboxEventsBatchSize` | int | The batch size for outbox events, i.e. the number of messages published in parallel. |
`WorkerSettings.QueueWaitDuration` | int | The wait duration before taking the next batch of messages. |
`WorkerSettings.MissingEventsBatchSize` | int | The batch size for missing events, i.e. the number of missing messages published in parallel. |
`WorkerSettings.MissingEventsWaitDuration` | int | The wait duration before taking the next batch of missing events. |
`WorkerSettings.MissingEventsMaxRetryCount` | int | The maximum retry count for missing events. |
`WorkerSettings.BrokerErrorsMaxRetryCount` | int | The maximum retry count for broker errors. |
`WorkerSettings.RedeliveryDelayAfterError` | int | The delay after an error before redelivering the batch of missing events. |
`ConnectionString` | string | The connection string for the SQL database. It can be used for both MSSQL and PostgreSQL. |
`ReadOnlyConnectionString` | string | The connection string for the read-only SQL database. This setting is optional and only works for MSSQL. If it is not provided, `ConnectionString` is used. |
`Couchbase.Host` | string | The hostname or IP address of the Couchbase server. |
`Couchbase.Username` | string | The username for authentication with Couchbase. |
`Couchbase.Bucket` | string | The name of the Couchbase bucket to access data. |
`Couchbase.Scope` | string | The scope within the bucket to access data. |
`Couchbase.Password` | string | The password for authentication with Couchbase. |
`DataStoreSettings.DatabaseType` | string | The chosen database type. It can be MSSQL, Couchbase or PostgreSQL. |
`DataStoreSettings.OutboxEvents` | string | The name of the outbox event data store. |
`DataStoreSettings.MissingEvents` | string | The name of the missing event data store. |
`DataStoreSettings.ExceededEvents` | string | The name of the exceeded event data store. |
`DataStoreSettings.OutboxOffset` | string | The name of the outbox offset data store. Holds the last published `OutboxEvents` ID. |
`MasterPodSettings.IsActive` | bool | A flag indicating whether the master pod checker is active. It should be active if multiple pods are used. |
`MasterPodSettings.CacheName` | string | The name of the distributed lock key. This key should be the same for all instances of the app. |
`MasterPodSettings.MasterPodLifetime` | int | The lifetime of the master pod, i.e. the TTL of the distributed lock. |
`MasterPodSettings.MasterPodRaceInterval` | int | The interval at which the Master Pod and the Follower Pods try to take the `MasterPodLock`. |
`MasterPodSettings.IsMasterPodCheckInterval` | int | The check interval for the Follower Pods. Checking without an interval causes high CPU usage, which is why this setting is needed. |
`Redis.Endpoints` | string | The endpoints for the Redis instance. |
`Redis.DefaultDatabase` | int | The default database for the Redis instance. |
`Redis.Config` | string | The configuration for the Redis instance. |
`Redis.Password` | string | The password for the Redis instance. |
`Serilog` | object | The configuration for Serilog. |
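
To make the dotted keys concrete, here is a small Python sketch that expands them into nested JSON sections. It assumes that a key such as `Kafka.Brokers` corresponds to `{"Kafka": {"Brokers": ...}}` in `config.json`; the helper `nest` is hypothetical and every value below is a placeholder, so the files in the examples folder remain the reference for the real layout.

```python
import json
from typing import Any, Dict


def nest(flat: Dict[str, Any]) -> Dict[str, Any]:
    """Turn dotted keys such as 'Kafka.Brokers' into nested sections."""
    root: Dict[str, Any] = {}
    for dotted, value in flat.items():
        *sections, leaf = dotted.split(".")
        node = root
        for section in sections:
            node = node.setdefault(section, {})
        node[leaf] = value
    return root


# Placeholder values for illustration only; they are not recommended settings.
example = nest({
    "Kafka.Brokers": "localhost:9092",
    "WorkerSettings.OutboxEventsBatchSize": 100,
    "WorkerSettings.MissingEventsMaxRetryCount": 5,
    "DataStoreSettings.DatabaseType": "PostgreSQL",
    "DataStoreSettings.OutboxEvents": "outbox_events",
    "DataStoreSettings.MissingEvents": "missing_events",
    "DataStoreSettings.ExceededEvents": "exceeded_events",
    "DataStoreSettings.OutboxOffset": "outbox_offset",
    "MasterPodSettings.IsActive": True,
    "MasterPodSettings.CacheName": "outbox-master-lock",
})

print(json.dumps(example, indent=2))
```

Secrets such as `Kafka.SaslPassword` or `Redis.Password` would presumably live in `secret.json` rather than `config.json`; that split is also an assumption here.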
In the examples folder, you'll find example files for `config.json` and `secret.json`, and implementations showing how to insert messages for each database type.
> [!WARNING]
> For Couchbase, an incremental ID is used for the `OutboxEvents` data store. If you want to use Couchbase, you should use a Counter for the ID. You can find the example code in the `CouchbaseExample` class.
Released under the MIT License.
See the CONTRIBUTING file for details.