kafka-connect-orientdb

Kafka Connect sink connector for OrientDB, available on Confluent Hub: https://www.confluent.io/hub/sanjuthomas/kafka-connect-orientdb

MIT License

Overview

Kafka Connect OrientDB is a sink-only connector that pulls messages from Kafka and stores them in OrientDB as JSON documents.

Prerequisites

Apache ZooKeeper and Apache Kafka must be installed and running on your machine. Please refer to the respective project sites to download, install, and start ZooKeeper and Kafka.

What is OrientDB

OrientDB is an open-source NoSQL database management system written in Java. It is a multi-model database that supports graph, document, key/value, and object models, with relationships managed as in graph databases, through direct connections between records. It supports schema-less, schema-full, and schema-mixed modes. For more details about OrientDB, please refer to OrientDB's official website.

What is Apache Kafka

Apache Kafka is an open-source stream processing platform developed by the Apache Software Foundation and written in Scala and Java. The project aims to provide a unified, high-throughput, low-latency platform for handling real-time data feeds. For more details, please refer to the Kafka home page.

Configuration

Please take a look at the orientdb-sink.properties file:

name=orientdb-sink
connector.class=com.sanjuthomas.orientdb.OrientDBSinkConnector
tasks.max=10
#topics to consume from [comma separated list for multiple topics]
topics=quote_request,open_weather_data
databaseConfigFileLocation={absolute or relative location of the config files for the topic}
write.retries=2
retry.back.off.seconds=1

The connector expects one .yml file per topic at the location given in databaseConfigFileLocation. For example, if your topic name is test, the connector looks for a test.yml file in databaseConfigFileLocation. A sample topic-to-database mapping file looks like this:

connectionString: {OrientDB connection string. eg - remote:hostname}
database: {name of the database. database must exist in the server}
username: {username to connect to database}
password: {password to connect to database}
className: {name of the class to which the JSON document is written. If this class does not exist, the connector will create one.}
keyField: {name of the document key/id element/field. Please note that this key is not the record id. Ideally, this key should be distinct, and a unique index should be in place so that UPSERT works as expected.}
writeMode: {INSERT or UPSERT}

Please create the database in the OrientDB server in advance. The connector will not start if the database is not present.
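
The per-topic lookup described above can be checked before starting the connector. The following is a minimal sketch, not the connector's own code: it assumes the mapping file is named after the topic with a .yml suffix, and the helper names mapping_file_for and missing_mapping_files are hypothetical.

```python
from pathlib import Path
from typing import List


def mapping_file_for(topic: str, config_dir: str) -> Path:
    """Return the path where the mapping file for `topic` is expected."""
    # Assumption: the file name is the topic name plus ".yml".
    return Path(config_dir) / f"{topic}.yml"


def missing_mapping_files(topics: str, config_dir: str) -> List[str]:
    """List the topics in a comma-separated `topics` value that have no .yml file."""
    names = [t.strip() for t in topics.split(",") if t.strip()]
    return [t for t in names if not mapping_file_for(t, config_dir).is_file()]
```

For example, calling missing_mapping_files with the topics value from orientdb-sink.properties and the directory given in databaseConfigFileLocation returns the topics that still need a mapping file.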

Write Modes

INSERT - The connector assumes that every message is a new document. If a duplicate is encountered, the error is ignored.

UPSERT - The connector inserts the document if it is new and updates it if it already exists in the database class.
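
The difference between the two modes can be illustrated with an in-memory stand-in for a class that has a unique index on keyField. This is only a sketch of the semantics described above, not the connector's implementation. The write function is hypothetical, and it assumes that an update replaces the stored document as a whole; whether the connector merges fields instead is not stated here.

```python
from typing import Any, Dict

Document = Dict[str, Any]


def write(store: Dict[Any, Document], doc: Document, key_field: str, mode: str) -> None:
    """Apply one document to `store`, keyed by the value of `key_field`."""
    key = doc[key_field]
    if mode == "INSERT":
        # A duplicate key would violate the unique index. The error is
        # ignored, so the document already stored is left untouched.
        if key not in store:
            store[key] = dict(doc)
    elif mode == "UPSERT":
        # Insert when the key is new, otherwise overwrite the stored document
        # (assumption: whole-document replacement).
        store[key] = dict(doc)
    else:
        raise ValueError(f"unknown write mode: {mode}")


store: Dict[Any, Document] = {}
write(store, {"id": 1, "status": "new"}, "id", "INSERT")
write(store, {"id": 1, "status": "dup"}, "id", "INSERT")     # ignored
write(store, {"id": 1, "status": "updated"}, "id", "UPSERT")  # replaces
```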

Tested Version

Name              Version
Java              11
OrientDB          3.1.10
Apache Kafka      2.6.0 (Scala 2.12 build)
Apache ZooKeeper  3.6.1

Data Mapping

OrientDB can operate in both schema-full and schemaless modes. This sink connector assumes that OrientDB is operating in schemaless mode. Upon receiving a collection of messages from the broker, the connector transforms each message into a format that can be written to the OrientDB document store. As of today, this connector supports JSON messages. If anyone wants, I'm happy to add support for other serialization formats, such as Apache Avro.

For stand-alone mode, please copy kafka_home/config/connect-standalone.properties to create kafka_home/config/orientdb-connect-standalone.properties file. Open kafka_home/config/orientdb-connect-standalone.properties and set the following properties to false.

key.converter.schemas.enable=false
value.converter.schemas.enable=false

For distributed mode, please copy kafka_home/config/connect-distributed.properties to create kafka_home/config/orientdb-connect-distributed.properties file. Open kafka_home/config/orientdb-connect-distributed.properties and set the following properties to false.

key.converter.schemas.enable=false
value.converter.schemas.enable=false
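
To make the effect of schemas.enable concrete: with the setting at false, Kafka Connect's JsonConverter treats the message value as a bare JSON document, whereas with it at true the converter expects a schema/payload envelope. The sketch below shows both shapes. The record and its field names are made up for illustration, and is_schemaless is a hypothetical helper.

```python
import json

# Hypothetical record; the field names are illustrative only.
record = {"id": "q-1", "symbol": "AAPL", "quantity": 100}

# schemas.enable=false: the message value is the bare JSON document.
plain_value = json.dumps(record)

# schemas.enable=true: the value must be wrapped in a schema/payload envelope.
enveloped_value = json.dumps({
    "schema": {
        "type": "struct",
        "optional": False,
        "fields": [
            {"field": "id", "type": "string", "optional": False},
            {"field": "symbol", "type": "string", "optional": False},
            {"field": "quantity", "type": "int32", "optional": False},
        ],
    },
    "payload": record,
})


def is_schemaless(value: str) -> bool:
    """Return True if `value` is plain JSON rather than a schema/payload envelope."""
    decoded = json.loads(value)
    return not (isinstance(decoded, dict) and set(decoded) == {"schema", "payload"})
```

With the properties above set to false, producers should send values shaped like plain_value.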

In distributed mode, if you run more than one worker per host, the rest.port settings must have different values for each instance. By default, the REST interface is available at 8083.

How to deploy the connector in Kafka

This is a Maven project. To create an uber jar, run the following Maven goals.

mvn clean install

Copy the artifact kafka-connect-orientdb-x.x.x-SNAPSHOT-shaded.jar to the kafka_home/lib folder.

Copy the orientdb-sink.properties file into the kafka_home/config folder. Update the content of the properties file according to your environment.

How to start the connector in stand-alone mode

Open a shell prompt, move to kafka_home and execute the following.

bin/connect-standalone.sh config/orientdb-connect-standalone.properties config/orientdb-sink.properties

How to start the connector in distributed mode

Open a shell prompt, move to kafka_home and execute the following.

bin/connect-distributed.sh config/orientdb-connect-distributed.properties config/orientdb-sink.properties
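
As far as I know, a distributed worker reads only the worker properties at startup and receives connector configurations through its REST interface. If the connector does not show up after the worker starts, one option is to convert orientdb-sink.properties to JSON and POST it to the /connectors endpoint. The sketch below assumes the worker listens on the default port 8083 on localhost. The helper names are hypothetical, and the properties parser is deliberately simplified: it handles only key=value lines and # comments.

```python
import json
import urllib.request
from typing import Dict


def parse_properties(text: str) -> Dict[str, str]:
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props: Dict[str, str] = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def connector_payload(props: Dict[str, str]) -> dict:
    """Build the {"name": ..., "config": {...}} body expected by POST /connectors."""
    config = dict(props)
    name = config.pop("name")
    return {"name": name, "config": config}


def register(payload: dict, base_url: str = "http://localhost:8083") -> int:
    """POST the payload to the Connect REST interface and return the HTTP status."""
    request = urllib.request.Request(
        f"{base_url}/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return response.status
```

A typical use would read config/orientdb-sink.properties, pass its text through parse_properties and connector_payload, and hand the result to register.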

Contact

Please send a note to [email protected] or create an issue in GitHub.

License

Please feel free to rip it apart. This project is licensed under the MIT License.