package kafka_lwt
OCaml bindings for Kafka, Lwt bindings
Install
Dune Dependency
Authors
Maintainers
Sources
kafka-0.5.tbz
sha256=7ec32681c104062a4fad43e2736e206128eb88273118071a044081abbc082255
sha512=7485d83cb20705f21b39c7e40cc6564cee30dba2c1993dc93c2791b4527488a33ef557e9fdaa47d3c0777986468f42460bb16c4d6d4076b1c43443f2cb5c6e3f
README.md.html
OCaml bindings for Kafka
Pre-requisites
OCaml (
ocaml-version >= "4.02.3"
)librdkafka (
version >= 0.8.6
)
License
MIT License
Install
$ opam install kafka
From source:
$ make # use dune
$ make test # assuming kafka is running at localhost:9092 with a 'test' topic.
$ make install # use opam
Usage
#use "topfind";;
#require "kafka";;
(* Prepare a producer handler. *)
let producer = Kafka.new_producer ["metadata.broker.list","localhost:9092"];;
let producer_topic = Kafka.new_topic producer "test" ["message.timeout.ms","10000"];;
(* Prepare a consumer handler *)
let consumer = Kafka.new_consumer ["metadata.broker.list","localhost:9092"];;
let consumer_topic = Kafka.new_topic consumer "test" ["auto.commit.enable","false"];;
let partition = 0;;
let timeout_ms = 1000;;
(* Start collecting messages *)
(* Here we start from offset_end, i.e. we will consume only messages produced from now. *)
Kafka.consume_start consumer_topic partition Kafka.offset_end;;
(* Produce some messages *)
Kafka.produce producer_topic partition "message 0";;
Kafka.produce producer_topic partition "message 1";;
Kafka.produce producer_topic partition "message 2";;
(* Consume messages *)
let rec consume t p = match Kafka.consume ~timeout_ms t p with
| Kafka.Message(_,_,_,msg,_) -> msg
| Kafka.PartitionEnd(_,_,_) -> consume t p
| exception Kafka.Error(Kafka.TIMED_OUT,_) ->
(Printf.fprintf stderr "Timeout after: %d ms\n%!" timeout_ms; consume t p)
in
let msg = consume consumer_topic partition in assert (msg = "message 0");
let msg = consume consumer_topic partition in assert (msg = "message 1");
let msg = consume consumer_topic partition in assert (msg = "message 2");
(* Stop collecting messages. *)
Kafka.consume_stop consumer_topic partition;;
(* Topics, consumers and producers must be released. *)
Kafka.destroy_topic producer_topic;;
Kafka.destroy_handler producer;;
Kafka.destroy_topic consumer_topic;;
Kafka.destroy_handler consumer;;
Documentation
The API is documented in lib/kafka.mli, and the Lwt extension is documented in lib_lwt/kafka_lwt.mli.
See bin/tail_kafka_topic.ml for a consumer using queues, batches and lwt. See bin/sendto_kafka_topic.ml for a producer.
For Async producer and consumer, see test_async/producer/producer.ml and test_async/consumer/consumer.ml.
The configuration options for producers, consumers and topics are inherited from librdkafka/CONFIGURATION.
sectionYPositions = computeSectionYPositions($el), 10)"
x-init="setTimeout(() => sectionYPositions = computeSectionYPositions($el), 10)"
>