How to get an OffSet from a ByteBufferMessageSet returned by Simple Consumer of Kafka in Java? -
consider following code:
public static long offset = 0l; fetchrequest req = new fetchrequest(kafkaproperties.topic, 0, offset,10485760); bytebuffermessageset messageset = simpleconsumer.fetch(req);
the question how last offset , set variable offset
read next batch of data kafka?
update: when print data i.e.:
for (messageandoffset messageandoffset : messageset) { system.out.println(messageandoffset); }
the output follows:
messageandoffset(message(magic = 1, attributes = 0, crc = 2000130375, payload = java.nio.heapbytebuffer[pos=0 lim=176 cap=176]),296215) messageandoffset(message(magic = 1, attributes = 0, crc = 956398356, payload = java.nio.heapbytebuffer[pos=0 lim=196 cap=196]),298144) .... .... messageandoffset(message(magic = 1, attributes = 0, crc = 396743887, payload = java.nio.heapbytebuffer[pos=0 lim=179 cap=179]),299136)
the docs says last number offset
messageandoffset(message: message, offset: long)
that in above case, last offset read 299136
does ? 1 bad thing loop forever.
long offset = 0; while (true) { fetchrequest fetchrequest = new fetchrequest(topicname, 0, offset, 10485760); bytebuffermessageset messages = consumer.fetch(fetchrequest); (messageandoffset msg : messages) { system.out.println("consumed: " + utils.tostring(msg.message().payload(), "utf-8")); offset = msg.offset(); } }
also in 0.8 kafka simpleconsumer example, have thing below
long numread = 0; (messageandoffset messageandoffset : fetchresponse.messageset(a_topic, a_partition)) { long currentoffset = messageandoffset.offset(); if (currentoffset < readoffset) { system.out.println("found old offset: " + currentoffset + " expecting: " + readoffset); continue; } readoffset = messageandoffset.nextoffset(); bytebuffer payload = messageandoffset.message().payload(); byte[] bytes = new byte[payload.limit()]; payload.get(bytes); system.out.println(string.valueof(messageandoffset.offset()) + ": " + new string(bytes, "utf-8")); numread++; a_maxreads--; }
but mentioned application expects a_maxread
(maximum number of messages read) parameter passed argument don’t loop forever. new kafka , not sure if looking for.
Comments
Post a Comment