
= clj-kafka-x

A Clojure library for Apache Kafka, a distributed stream-processing platform.

Uses the Kafka protocol and does not rely on ZooKeeper.

NOTE: ZooKeeper is still included as a dependency.

Tries to be as lightweight as possible, and thus depends only on

  • org.apache.kafka/kafka_2.12 "3.4.0"
  • org.apache.kafka/kafka-clients "3.4.0"
  • org.apache.zookeeper/zookeeper "3.8.1"

with jms, jmx* and logging excluded.

NOTE: Some builds (for instance, those of the v0.4.x branch) may be partially or even fully incompatible with certain versions of other libraries that also use NIO. If you experience build problems, or your application crashes unexpectedly on startup, inspect your project dependencies more closely: you may need to correct the versions of existing dependencies or add a current version of the full [io.netty/netty-all] artifact.
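As a rough illustration of the note above, a `project.clj` fragment that pins the full Netty artifact might look like the sketch below. The project name, the Clojure version and the Netty version are assumptions chosen for the example, not recommendations from this library; running `lein deps :tree` first shows which transitive versions actually conflict in your project.

```clojure
;; project.clj (fragment). The project name and all versions other than
;; clj-kafka-x are illustrative assumptions; adjust them to your build.
(defproject my-app "0.1.0-SNAPSHOT"
  :dependencies [[org.clojure/clojure "1.11.1"]
                 [net.tbt-post/clj-kafka-x "0.7.5"]
                 ;; Pin a full Netty build explicitly if transitive versions clash.
                 [io.netty/netty-all "4.1.94.Final"]])
```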

Current library info:

image:https://img.shields.io/github/license/source-c/clj-kafka-x?style=for-the-badge[GitHub] image:https://img.shields.io/clojars/v/net.tbt-post/clj-kafka-x.svg?style=for-the-badge[] image:https://img.shields.io/clojars/dt/net.tbt-post/clj-kafka-x?style=for-the-badge[ClojarsDownloads] image:https://img.shields.io/github/v/release/source-c/clj-kafka-x?style=for-the-badge[GitHub release (latest by date)] image:https://img.shields.io/github/release-date/source-c/clj-kafka-x?style=for-the-badge[GitHub Release Date] image:https://img.shields.io/github/v/tag/source-c/clj-kafka-x?style=for-the-badge[GitHub tag (latest by date)] image:https://img.shields.io/github/last-commit/source-c/clj-kafka-x?style=for-the-badge[GitHub last commit]

== Installation

Add the following to your http://github.com/technomancy/leiningen[Leiningen's] project.clj:

[source,clojure]
----
[net.tbt-post/clj-kafka-x "0.7.5"]
----

== Usage

=== Producer

[source,clojure]
----
(require '[clj-kafka-x.producer :as kp])

(with-open [p (kp/producer {"bootstrap.servers" "localhost:9092"}
                           (kp/string-serializer)
                           (kp/string-serializer))]
  @(kp/send p (kp/record "topic-a" "Hi there!")))
----

=== Consumer

[source,clojure]
----
(require '[clj-kafka-x.consumers.simple :as kc])

(with-open [c (kc/consumer {"bootstrap.servers" "localhost:9092"
                            "group.id" "consumer-id"}
                           (kc/string-deserializer)
                           (kc/string-deserializer))]
  (kc/subscribe c "topic-a")
  (kc/messages c))
----

NOTE: When a topic has multiple partitions, you must specify them explicitly when subscribing, e.g. `(kc/subscribe c [{:topic "topic-a" :partitions #{0 1}} {:topic "topic-b" :partitions #{0 1 2}}])`
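Writing the partition sets by hand gets tedious with many topics. The sketch below builds the same subscription data with a small helper; `with-partitions` and `subscriptions` are hypothetical names, not part of clj-kafka-x, and the partition counts are assumed to be known in advance.

```clojure
;; Hypothetical helper (not part of clj-kafka-x): builds one subscription
;; entry covering partitions 0..(n-1) of the given topic.
(defn with-partitions
  [topic n]
  {:topic topic :partitions (set (range n))})

;; Same data as the literal vector in the note above.
(def subscriptions
  [(with-partitions "topic-a" 2)
   (with-partitions "topic-b" 3)])
```

The resulting vector can then be passed to the consumer in place of the literal, i.e. `(kc/subscribe c subscriptions)`.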

.Real-life (almost) example
[source,clojure]
----
(ns buzz.consumer.kafka
  (:require [clj-kafka-x.consumers.simple :as kc]
            [clojure.tools.logging :as log]))

(defn processor [msg schema] msg)
(def schema nil)
(def config {"bootstrap.servers" "localhost:9092"
             "group.id" "consumer-id"})
(def topics "topic-a") ;; placeholder; supply your own topic(s)

(defn process-message [msg]
  (let [{:keys [value topic partition offset]} msg
        processor processor ;; choose one by topic name
        schema schema]      ;; choose one by topic name
    (if (fn? processor)
      (processor value schema)
      value)))

(defn consume []
  (with-open [c (kc/consumer config
                             (kc/byte-array-deserializer)
                             (kc/byte-array-deserializer))]
    (kc/subscribe c topics)
    (let [pool (kc/messages c)]
      (doseq [message pool]
        (log/warn (process-message message))))))
----
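The "choose one by topic name" comments in the example leave the lookup itself open. One possible shape, offered only as a sketch, is a plain map from topic name to processor and schema; the `handlers` map, its topics and both processor functions are hypothetical and use nothing beyond core Clojure.

```clojure
;; Hypothetical lookup table: topic name -> processor fn and schema.
(def handlers
  {"topic-a" {:processor (fn [value _schema]
                           ;; decode the raw bytes as UTF-8 text
                           (String. ^bytes value "UTF-8"))
              :schema    nil}
   "topic-b" {:processor (fn [value schema]
                           ;; report the schema and the payload size
                           {:schema schema :size (count value)})
              :schema    :example-schema}})

(defn process-message
  "Looks up the handler for the message's topic and applies it;
  returns the raw value when no handler is registered."
  [{:keys [value topic]}]
  (let [{:keys [processor schema]} (get handlers topic)]
    (if (fn? processor)
      (processor value schema)
      value)))
```

Keeping the table as data means new topics can be supported without touching `process-message`.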

You may also pass an explicit timeout when fetching messages:

[source,clojure]
----
(defn- consume [instance process-message]
  ;; when-let accepts a single binding, so the messages are bound in a nested let
  (when-let [co (kc/consumer config
                             (kc/byte-array-deserializer)
                             (kc/byte-array-deserializer))]
    (let [messages (kc/messages
                    co
                    :timeout (:request-timeout-ms config))]
      (doall (map process-message messages)))))
----

The maximum number of messages returned per poll can be set with the `max.poll.records` configuration property.
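For instance, a consumer configuration that caps each poll might look like the sketch below. The value 100 is an arbitrary example (Kafka's own default is 500), and `base-config` and `consumer-config` are illustrative names.

```clojure
;; Connection settings reused from the examples above.
(def base-config
  {"bootstrap.servers" "localhost:9092"
   "group.id"          "consumer-id"})

;; Cap each poll at 100 records; the value is an arbitrary example.
(def consumer-config
  (assoc base-config "max.poll.records" "100"))
```

`consumer-config` can then be passed to `kc/consumer` in place of the plain configuration map.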

== Manual Build

[source,text]
----
$ lein install
----

== License

Copyright © 2016-2023

Distributed under the http://www.apache.org/licenses/LICENSE-2.0[Apache License v 2.0]
