= clj-kafka-x
A Clojure library for Apache Kafka, a distributed stream-processing platform.
It uses the Kafka protocol and does not rely on ZooKeeper.

NOTE: ZooKeeper is still included as a dependency.
It tries to be as lightweight as possible and therefore depends only on:

* `org.apache.kafka/kafka_2.12 "3.4.0"`
* `org.apache.kafka/kafka-clients "3.4.0"`
* `org.apache.zookeeper/zookeeper "3.8.1"`

excluding `jms`, `jmx*`, and logging.

NOTE: Some builds (for instance, of the `v0.4.x` branch) may be partially or even fully incompatible with certain versions of other libraries that also use NIO. If you run into build problems, or your application crashes unexpectedly on start, inspect your project dependencies more closely: you may need to adjust the versions of existing dependencies or add a current version of the full `[io.netty/netty-all]` artifact.

Current library info:
image:https://img.shields.io/github/license/source-c/clj-kafka-x?style=for-the-badge[GitHub] image:https://img.shields.io/clojars/v/net.tbt-post/clj-kafka-x.svg?style=for-the-badge[] image:https://img.shields.io/clojars/dt/net.tbt-post/clj-kafka-x?style=for-the-badge[ClojarsDownloads] image:https://img.shields.io/github/v/release/source-c/clj-kafka-x?style=for-the-badge[GitHub release (latest by date)] image:https://img.shields.io/github/release-date/source-c/clj-kafka-x?style=for-the-badge[GitHub Release Date] image:https://img.shields.io/github/v/tag/source-c/clj-kafka-x?style=for-the-badge[GitHub tag (latest by date)] image:https://img.shields.io/github/last-commit/source-c/clj-kafka-x?style=for-the-badge[GitHub last commit]
== Installation
Add the following to your http://github.com/technomancy/leiningen[Leiningen] `project.clj`:
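
As a minimal sketch, the dependency entry presumably looks like the following. The `net.tbt-post/clj-kafka-x` coordinate is taken from the Clojars badge above; the project name `my-app` and the Clojure version are illustrative assumptions, and `VERSION` is a placeholder for the release shown on that badge.

```clojure
;; project.clj (sketch): `my-app` and the Clojure version are placeholders
(defproject my-app "0.1.0-SNAPSHOT"
  :dependencies [[org.clojure/clojure "1.11.1"]
                 ;; replace VERSION with the release shown on the Clojars badge
                 [net.tbt-post/clj-kafka-x "VERSION"]])
```
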
== Usage
=== Producer
[source,clojure]
----
(require '[clj-kafka-x.producer :as kp])
----
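
A producer session might then look roughly like the sketch below. Only the `clj-kafka-x.producer` namespace is confirmed by the text above; the functions `kp/producer`, `kp/string-serializer`, `kp/record`, and `kp/send`, as well as the broker address and topic name, are assumptions that should be checked against the namespace before use.

```clojure
(require '[clj-kafka-x.producer :as kp])

;; ASSUMPTION: kp/producer takes a config map plus key and value serializers
;; and returns a closeable producer, so with-open can release it.
(with-open [p (kp/producer {"bootstrap.servers" "localhost:9092"}
                           (kp/string-serializer)
                           (kp/string-serializer))]
  ;; ASSUMPTION: kp/send returns a future; dereferencing it blocks
  ;; until the broker has acknowledged the record.
  @(kp/send p (kp/record "topic-a" "Hi there!")))
```

Running this requires a reachable broker at the configured address.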
=== Consumer
[source,clojure]
----
(require '[clj-kafka-x.consumers.simple :as kc])
----

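
A consumer session might look roughly like the sketch below. The namespace and `kc/subscribe` appear in this document; `kc/consumer`, `kc/string-deserializer`, and `kc/messages` are assumptions about the API, and the broker address, group id, and topic are placeholders.

```clojure
(require '[clj-kafka-x.consumers.simple :as kc])

;; ASSUMPTION: kc/consumer takes a config map plus key and value deserializers
;; and returns a closeable consumer.
(with-open [c (kc/consumer {"bootstrap.servers" "localhost:9092"
                            "group.id" "consumer-id"}
                           (kc/string-deserializer)
                           (kc/string-deserializer))]
  ;; Subscribe with explicit partitions, in the form used elsewhere in this README
  (kc/subscribe c [{:topic "topic-a" :partitions #{0}}])
  ;; ASSUMPTION: kc/messages polls once and returns the fetched messages
  (kc/messages c))
```

Running this requires a reachable broker at the configured address.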
NOTE: When a topic has multiple partitions, you must specify them explicitly when subscribing, for example:

[source,clojure]
----
(kc/subscribe c [{:topic "topic-a" :partitions #{0 1}}
                 {:topic "topic-b" :partitions #{0 1 2}}])
----

[source,clojure]
----
(ns buzz.consumer.kafka
  (:require [clj-kafka-x.consumers.simple :as kc]
            [clojure.tools.logging :as log]))

(defn processor [msg schema] msg)
(def schema nil)
(def config {"bootstrap.servers" "localhost:9092"
             "group.id" "consumer-id"})

(defn process-message [msg]
  (let [{:keys [value topic partition offset]} msg
        processor processor ;; choose one by topic name
        schema schema]      ;; choose one by topic name
    (if (fn? processor)
      (processor value schema)
      value)))
----

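
The "choose one by topic name" comments can be made concrete with a lookup table. The sketch below is one possible approach, not part of the library: the `handlers` registry and its entries are hypothetical, and messages are assumed to be maps with `:topic` and `:value` keys, as in the destructuring above.

```clojure
;; Hypothetical registry: topic name -> {:processor fn, :schema any}
(def handlers
  {"topic-a" {:processor (fn [value schema] (str "a:" value)) :schema nil}
   "topic-b" {:processor (fn [value schema] (str "b:" value)) :schema nil}})

(defn process-message
  "Looks up the processor and schema by topic name; returns the raw
  value when no processor is registered for the topic."
  [{:keys [value topic]}]
  (let [{:keys [processor schema]} (get handlers topic)]
    (if (fn? processor)
      (processor value schema)
      value)))

(process-message {:topic "topic-a" :value "hi"}) ;; => "a:hi"
(process-message {:topic "unknown" :value "hi"}) ;; => "hi"
```

Keeping the registry as plain data means new topics can be added without touching `process-message`.
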
You may also use the form with explicit timeouts.

The number of messages returned per poll can be set with the `max.poll.records` configuration field.
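
As an illustration, the limit can be added to the consumer configuration map. `max.poll.records` is a standard Kafka consumer setting; the value `"100"` and the name `limited-config` are arbitrary choices for this sketch.

```clojure
(def config {"bootstrap.servers" "localhost:9092"
             "group.id" "consumer-id"})

;; Cap each poll at 100 records; the number is illustrative
(def limited-config
  (assoc config "max.poll.records" "100"))
```
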
== Manual Build
== License
Copyright © 2016-2023
Distributed under the http://www.apache.org/licenses/LICENSE-2.0[Apache License v 2.0]