How do I get all the messages from a Kafka topic and count them using Java?
This code prints "running" and then "waiting", and after that it just keeps waiting for messages indefinitely:
import java.util.hashmap; import java.util.list; import java.util.map; import java.util.properties; import kafka.consumer.consumerconfig; import kafka.consumer.consumeriterator; import kafka.consumer.kafkastream; import kafka.javaapi.consumer.consumerconnector; import kafka.message.messageandmetadata; public class testconsumer{ public static void main(string[] args) { consumerconfig config; properties props = new properties(); props.put("zookeeper.connect","sandbox.hortonworks.com:2181"); props.put("group.id", "group-4"); props.put("zookeeper.session.timeout.ms", "400"); props.put("zookeeper.sync.time.ms", "200"); props.put("auto.commit.interval.ms", "200"); config = new consumerconfig(props); consumerconnector consumer = kafka.consumer.consumer.createjavaconsumerconnector (config); string topic = "news"; system.out.println("running"); run(consumer,topic); } public static void run(consumerconnector consumer,string topic){ hashmap<string,integer> topiccountmap = new hashmap<string,integer>(); topiccountmap.put(topic, 1); map<string,list<kafkastream<byte[],byte[]>>> consumermap = consumer.createmessagestreams(topiccountmap); kafkastream<byte[],byte[]> stream = consumermap.get(topic).get(0); consumeriterator<byte[],byte[]> = stream.iterator(); list<string> msgtopiclist = new arraylist<string>(); int count = 0; system.out.println("waiting"); while(it.hasnext()){ messageandmetadata<byte[],byte[]> msganddata = it.next(); string msg = new string(msganddata.message()); msgtopiclist.add(msg); string key = "nokey"; system.out.println(msg); count++; } } }
I want to get all the messages that have been sent to the topic, show them to the user, and count them.
What is the best way to do this?
version kafka_2.10-0.8.1.2.2.4.2-2
Here is an example.
The important parts here are these Kafka consumer configuration properties:
This makes the consumer start from the beginning of the queue:
props.put("auto.offset.reset", "smallest");
This prevents the consumer from storing its offsets:
props.put("auto.commit.enable", "false");
This makes the consumer wait 5 seconds for messages and give up if no more messages are available:
props.put("consumer.timeout.ms", "5000");
The whole example:
package com.xxx.yyy.zzz;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

/**
 * Reads every message currently available in a Kafka topic (0.8 high-level
 * consumer API), prints them, and stores their count.
 *
 * The key configuration is {@code auto.offset.reset=smallest} (start from
 * the beginning), {@code auto.commit.enable=false} (do not persist offsets),
 * and {@code consumer.timeout.ms=5000} — the iterator throws
 * {@link ConsumerTimeoutException} after 5 s with no new message, which is
 * how this class detects "no more messages".
 */
public class KafkaConsumer {

    private final ConsumerConnector consumer;
    private final String topic;
    private int count = 0;

    public KafkaConsumer(final String zookeeper, final String groupId, final String topic) {
        this.consumer = Consumer.createJavaConsumerConnector(createConsumerConfig(zookeeper, groupId));
        this.topic = topic;
    }

    /** Builds the connection properties for Kafka and ZooKeeper. */
    private static ConsumerConfig createConsumerConfig(final String zookeeper, final String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "2000");
        props.put("zookeeper.sync.time.ms", "250");
        props.put("auto.commit.interval.ms", "1000");
        // Start from the beginning of the queue.
        props.put("auto.offset.reset", "smallest");
        // Don't store offsets for this consumer.
        props.put("auto.commit.enable", "false");
        // Wait 5 s for messages; if none arrive, the iterator throws
        // ConsumerTimeoutException, which ends the read loop below.
        props.put("consumer.timeout.ms", "5000");
        return new ConsumerConfig(props);
    }

    /** Drains the topic, prints every message, and records the count. */
    private void getData() {
        // FIX: original used a raw "new arraylist()" — use the typed form.
        List<byte[]> msgs = new ArrayList<>();
        Map<String, Integer> topicMap = new HashMap<>();
        // Define a single thread for the topic.
        topicMap.put(topic, 1);
        try {
            Map<String, List<KafkaStream<byte[], byte[]>>> listMap =
                    consumer.createMessageStreams(topicMap);
            List<KafkaStream<byte[], byte[]>> kafkaStreams = listMap.get(topic);
            // Collect all messages; iteration ends via the timeout exception.
            kafkaStreams.forEach(ks -> ks.forEach(mam -> msgs.add(mam.message())));
        } catch (ConsumerTimeoutException exception) {
            // No more messages available -> we are done.
            // FIX: printing a byte[] directly would print its identity hash
            // (e.g. "[B@1b6d3586"); decode each payload to text first.
            msgs.forEach(msg -> System.out.println(new String(msg)));
            // Count them.
            count = msgs.size();
        } finally {
            // FIX: the original was missing the "finally" keyword here,
            // leaving a bare "{ ... }" block that does not compile as intended.
            if (consumer != null) {
                consumer.shutdown();
            }
        }
    }
}
Comments
Post a Comment