How to get an offset from a ByteBufferMessageSet returned by the Kafka SimpleConsumer in Java?


Consider the following code:

    public static long offset = 0L;
    FetchRequest req = new FetchRequest(KafkaProperties.topic, 0, offset, 10485760);
    ByteBufferMessageSet messageSet = simpleConsumer.fetch(req);

The question is: how do I get the last offset, so that I can set the variable offset to read the next batch of data from Kafka?


Update: when I print the data, i.e.:

    for (MessageAndOffset messageAndOffset : messageSet) {
        System.out.println(messageAndOffset);
    }

the output is as follows:

    MessageAndOffset(message(magic = 1, attributes = 0, crc = 2000130375, payload = java.nio.HeapByteBuffer[pos=0 lim=176 cap=176]),296215)
    MessageAndOffset(message(magic = 1, attributes = 0, crc = 956398356, payload = java.nio.HeapByteBuffer[pos=0 lim=196 cap=196]),298144)
    ....
    ....
    MessageAndOffset(message(magic = 1, attributes = 0, crc = 396743887, payload = java.nio.HeapByteBuffer[pos=0 lim=179 cap=179]),299136)

The docs say that the last number is the offset:

    MessageAndOffset(message: Message, offset: Long)

That means that in the above case, the last offset read is 299136.

Does this work? The one bad thing is that it will loop forever.

    long offset = 0;

    while (true) {
        FetchRequest fetchRequest = new FetchRequest(topicName, 0, offset, 10485760);

        ByteBufferMessageSet messages = consumer.fetch(fetchRequest);
        for (MessageAndOffset msg : messages) {
            System.out.println("consumed: " + Utils.toString(msg.message().payload(), "UTF-8"));
            // Remember the offset of the last message seen; it is used for the next fetch.
            offset = msg.offset();
        }
    }
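One way to avoid looping forever is to stop as soon as a fetch comes back empty. The following is only a minimal sketch of that idea and does not use the Kafka client at all: `Entry`, `fetch` and `drain` are hypothetical stand-ins for `MessageAndOffset`, `consumer.fetch(...)` and the loop above, and the in-memory list plays the role of the partition. It assumes, as the loop above does, that the offset attached to a message is the position to fetch from next.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of the fetch loop; NOT the Kafka API.
class BoundedFetchSketch {

    // Stand-in for MessageAndOffset: a payload plus the offset to fetch from next.
    static final class Entry {
        final String payload;
        final long offset;

        Entry(String payload, long offset) {
            this.payload = payload;
            this.offset = offset;
        }
    }

    // Stand-in for consumer.fetch(...): up to batchSize entries located after fromOffset.
    static List<Entry> fetch(List<Entry> log, long fromOffset, int batchSize) {
        List<Entry> batch = new ArrayList<>();
        for (Entry e : log) {
            if (e.offset > fromOffset && batch.size() < batchSize) {
                batch.add(e);
            }
        }
        return batch;
    }

    // Fetches batch after batch until one comes back empty; returns the last offset seen.
    static long drain(List<Entry> log, long startOffset, int batchSize) {
        long offset = startOffset;
        while (true) {
            List<Entry> batch = fetch(log, offset, batchSize);
            if (batch.isEmpty()) {
                break; // nothing new to read, so stop instead of spinning
            }
            for (Entry e : batch) {
                System.out.println("consumed: " + e.payload);
                offset = e.offset; // carry the last offset into the next fetch
            }
        }
        return offset;
    }

    public static void main(String[] args) {
        List<Entry> log = new ArrayList<>();
        log.add(new Entry("a", 176));
        log.add(new Entry("b", 372));
        log.add(new Entry("c", 551));
        System.out.println("last offset: " + drain(log, 0, 2));
    }
}
```

A long-running consumer would probably sleep and retry on an empty fetch rather than exit; breaking out is just the simplest way to show the termination condition.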

Also, in the 0.8 Kafka SimpleConsumer example, they have something like the below:

    long numRead = 0;
    for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
        long currentOffset = messageAndOffset.offset();
        if (currentOffset < readOffset) {
            System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
            continue;
        }
        // The next fetch should start right after this message.
        readOffset = messageAndOffset.nextOffset();
        ByteBuffer payload = messageAndOffset.message().payload();

        byte[] bytes = new byte[payload.limit()];
        payload.get(bytes);
        System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
        numRead++;
        a_maxReads--;
    }

But they also mention that the application expects an a_maxReads (maximum number of messages to read) parameter to be passed as an argument, so that it doesn't loop forever. I am new to Kafka, and not sure if this is what you are looking for.
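To make the offset bookkeeping in that 0.8-style loop easier to follow, here is a small self-contained sketch. It is not the Kafka API: `Entry` and `consume` are hypothetical names, and `nextOffset()` is assumed to be simply `offset + 1`. Unlike the snippet above, it checks the read limit inside the per-message loop, which is a variation chosen to keep the example short.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the skip-old / nextOffset / maxReads logic; NOT the Kafka API.
class MaxReadsSketch {

    // Stand-in for MessageAndOffset where the offset identifies the message itself.
    static final class Entry {
        final long offset;
        final String payload;

        Entry(long offset, String payload) {
            this.offset = offset;
            this.payload = payload;
        }

        // Assumed here to be the offset of the following message.
        long nextOffset() {
            return offset + 1;
        }
    }

    // Processes one fetched batch and returns the offset to request next.
    static long consume(List<Entry> batch, long readOffset, int maxReads) {
        int remaining = maxReads;
        for (Entry e : batch) {
            if (remaining <= 0) {
                break; // read limit reached
            }
            if (e.offset < readOffset) {
                // The batch may start before the requested offset; skip those messages.
                System.out.println("Found an old offset: " + e.offset + " Expecting: " + readOffset);
                continue;
            }
            System.out.println(e.offset + ": " + e.payload);
            readOffset = e.nextOffset();
            remaining--;
        }
        return readOffset;
    }

    public static void main(String[] args) {
        List<Entry> batch = Arrays.asList(
                new Entry(3, "old"), new Entry(4, "a"), new Entry(5, "b"), new Entry(6, "c"));
        System.out.println("next fetch offset: " + consume(batch, 4, 2));
    }
}
```

The value returned by `consume` is what would be passed as the offset of the next fetch request, which is the role `readOffset` plays in the example above.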

