How to get all the messages from kafka topic and count them using java? -


This code gives me the messages from the beginning and then keeps waiting for another message; it never stops waiting.

import java.util.hashmap; import java.util.list; import java.util.map; import java.util.properties; import kafka.consumer.consumerconfig; import kafka.consumer.consumeriterator; import kafka.consumer.kafkastream; import kafka.javaapi.consumer.consumerconnector; import kafka.message.messageandmetadata;  public class testconsumer{  public static void main(string[] args) {     consumerconfig config;     properties props = new properties();      props.put("zookeeper.connect","sandbox.hortonworks.com:2181");     props.put("group.id", "group-4");     props.put("zookeeper.session.timeout.ms", "400");     props.put("zookeeper.sync.time.ms", "200");     props.put("auto.commit.interval.ms", "200");     config = new consumerconfig(props);     consumerconnector consumer = kafka.consumer.consumer.createjavaconsumerconnector             (config);     string topic = "news";      system.out.println("running");     run(consumer,topic);  }  public static void run(consumerconnector consumer,string topic){     hashmap<string,integer> topiccountmap =              new hashmap<string,integer>();     topiccountmap.put(topic, 1);     map<string,list<kafkastream<byte[],byte[]>>>      consumermap = consumer.createmessagestreams(topiccountmap);     kafkastream<byte[],byte[]> stream = consumermap.get(topic).get(0);     consumeriterator<byte[],byte[]> =  stream.iterator();     list<string> msgtopiclist = new arraylist<string>();     int count = 0;     system.out.println("waiting");     while(it.hasnext()){         messageandmetadata<byte[],byte[]> msganddata = it.next();          string msg = new string(msganddata.message());         msgtopiclist.add(msg);         string key = "nokey";         system.out.println(msg);         count++;     } } } 

What I have to do is get all the messages from the topic, send them to the user, and count them.

What is the best way to do this?

The version is kafka_2.10-0.8.1.2.2.4.2-2.

Here is an example.

The important part here is the Kafka consumer configuration properties:

This will start from the beginning of the queue:

props.put("auto.offset.reset", "smallest");   

This won't store offsets for the consumer:

props.put("auto.commit.enable", "false"); 

This will wait 5 seconds for messages and, if no more messages are available, give up:

props.put("consumer.timeout.ms", "5000"); 

The whole example:

package com.xxx.yyy.zzz;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

/**
 * Reads every message currently in a topic, prints them and records how
 * many there were, using the Kafka 0.8 high-level consumer.
 */
public class KafkaConsumer {
    private final ConsumerConnector consumer;
    private final String topic;
    private int count = 0;

    /**
     * @param zookeeper ZooKeeper connect string passed as {@code zookeeper.connect}
     * @param groupId   consumer group id passed as {@code group.id}
     * @param topic     topic to read
     */
    public KafkaConsumer(final String zookeeper, final String groupId, final String topic) {
        this.consumer = Consumer.createJavaConsumerConnector(createConsumerConfig(zookeeper, groupId));
        this.topic = topic;
    }

    /**
     * Builds the connection properties for Kafka and ZooKeeper.
     *
     * @param zookeeper ZooKeeper connect string
     * @param groupId   consumer group id
     * @return the consumer configuration
     */
    private static ConsumerConfig createConsumerConfig(final String zookeeper, final String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "2000");
        props.put("zookeeper.sync.time.ms", "250");
        props.put("auto.commit.interval.ms", "1000");
        // Start from the beginning of the queue.
        props.put("auto.offset.reset", "smallest");
        // Do not store offsets for this consumer.
        props.put("auto.commit.enable", "false");
        // Wait 5 seconds for messages; give up if none arrive.
        props.put("consumer.timeout.ms", "5000");

        return new ConsumerConfig(props);
    }

    /**
     * Collects messages from the topic until the consumer times out, then
     * prints them and stores their number in {@code count}. The connector is
     * shut down afterwards whether or not reading succeeded.
     */
    private void getData() {
        List<byte[]> msgs = new ArrayList<>();
        Map<String, Integer> topicMap = new HashMap<>();

        // Define a single thread for the topic.
        topicMap.put(topic, 1);
        try {
            Map<String, List<KafkaStream<byte[], byte[]>>> listMap = consumer.createMessageStreams(topicMap);
            List<KafkaStream<byte[], byte[]>> kafkaStreams = listMap.get(topic);

            // Collect the messages; iteration ends with a ConsumerTimeoutException.
            kafkaStreams.forEach(ks -> ks.forEach(mam -> msgs.add(mam.message())));
        } catch (ConsumerTimeoutException exception) {
            // No more messages are available, so we are done.

            // Print the messages as text; printing the byte[] directly would
            // only show the array's identity.
            // NOTE(review): assumes the payload is UTF-8 text; confirm against the producer.
            msgs.forEach(m -> System.out.println(new String(m, StandardCharsets.UTF_8)));

            // Count them.
            count = msgs.size();
        } finally {
            // Always release the connector, even if an unexpected exception propagates.
            consumer.shutdown();
        }
    }
}

Comments

Popular posts from this blog

javascript - Laravel datatable invalid JSON response -

java - Exception in thread "main" org.springframework.context.ApplicationContextException: Unable to start embedded container; -

sql server 2008 - My Sql Code Get An Error Of Msg 245, Level 16, State 1, Line 1 Conversion failed when converting the varchar value '8:45 AM' to data type int -