KNIME Kafka nodes - Kafka version

Hi,
I'm trying to import data with the new Kafka nodes. They appear to connect to the Kafka broker, but trying to consume data returns nothing.

What version of Kafka are the Kafka nodes built against? The Kafka broker I am using is 0.10.

Hi @markdoutre,
The Kafka nodes are based on the current release, Kafka 1.1.

best Mareike


Hi @markdoutre,

I just tested the new Kafka nodes with a 0.10 broker and did not run into any problems. However, since my cluster configuration very likely differs from yours, this does not prove anything.

KNIME assumes that every message contains a timestamp, which to the best of my knowledge is the default since Kafka 0.10.

Maybe your problem is related to one of the following issues:

Prerequisite:

  • The Kafka Connector node can establish a connection to your cluster/brokers

Guess 1: Kafka Consumer cannot consume messages due to “high” latency.

  1. Increase the Poll timeout (ms) and decrease Max number of messages per poll

Guess 2: No message available

  1. Ensure that since the last execution of the Kafka Consumer messages have been sent to the specified Topic(s) and that no consumer belonging to the same Group ID (Consumer group) already consumed them.

  2. If step 1 of Guess 2 does not help and you are sure that messages have been sent to the selected Topic(s), please try the following. Create a new consumer group by entering a random Group ID (if you are allowed to). Next, go to the Advanced Settings, add the key auto.offset.reset, and set its value to earliest.
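To make step 2 of Guess 2 concrete, here is a minimal sketch of the two settings involved. `group.id` and `auto.offset.reset` are standard Kafka consumer configuration keys; the helper name `fresh_consumer_settings` and the `knime-debug` prefix are made up for illustration, and how the node dialog maps onto these keys is my assumption.

```python
import uuid


def fresh_consumer_settings(prefix="knime-debug"):
    """Build settings for a brand-new consumer group that reads from the start.

    `prefix` is a hypothetical naming choice; any unused group name works.
    """
    return {
        # A group ID that has never been used has no stored offsets,
        # so auto.offset.reset actually takes effect.
        "group.id": f"{prefix}-{uuid.uuid4().hex[:8]}",
        # Start at the oldest available message in each partition.
        "auto.offset.reset": "earliest",
    }


settings = fresh_consumer_settings()
for key, value in settings.items():
    print(f"{key} = {value}")
```

The random suffix matters more than the name itself: reusing a group ID that has consumed before would bring its stored offsets back into play.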

I really hope that this solves your problem and please keep me posted.

Best
Mark


@Mark_Ortmann Eventually got this to work.
I had the Kafka Consumer connected to the broker but was unable to consume any messages. I tried setting auto.offset.reset to earliest but still got nothing. However, when I connected a producer to the broker and sent data through to the topic, I received data at the consumer, but only the new data.

I checked using the normal kafka-console-consumer tool with --from-beginning, and that consumed data from the start of the topic, so the data is still persisted in the Kafka broker. This suggests to me that the auto.offset.reset value is not doing anything: the consumer node only sees new data.

I’m using HDP 2.6 and Kafka 0.10.

Mark

@markdoutre, when you set auto.offset.reset to earliest, did you also create a new consumer group?

By the way, there is a difference between auto.offset.reset and the --from-beginning argument used by the kafka-console-consumer.

  • --from-beginning ignores the offset for your consumer group, which is stored by Kafka/Zookeeper, and just starts reading from the beginning of each of the (topic) partitions. Currently, the only way to reproduce this behavior in the Kafka Consumer node is to create a new consumer group and use auto.offset.reset = earliest.
  • auto.offset.reset only applies if Kafka/Zookeeper does not provide an initial offset for your consumer group. This happens when your consumer group consumes from that topic for the first time, or when the current offset no longer exists on the server. In this case, earliest sets the offset to the oldest available entry in each (topic) partition. The default, latest, results in an offset for each (topic) partition that is higher than the offset of the newest message in that partition. Therefore, when you create a new consumer group and use latest, no messages can be consumed until a producer writes new messages to that topic, whereas with earliest you can consume all messages.
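The rule in the second bullet can be written down as a small function. This is only a sketch of the decision for a single partition, not Kafka's actual code; the function name and its parameters are hypothetical, and it models only the earliest and latest policies.

```python
def resolve_start_offset(committed, log_start, log_end, auto_offset_reset="latest"):
    """Return the offset a consumer group starts reading from in one partition.

    committed  -- offset stored for the group, or None if the group is new
    log_start  -- offset of the oldest message still on the broker
    log_end    -- offset the next produced message will get (newest + 1)
    """
    # A stored offset that still exists on the broker always wins.
    if committed is not None and log_start <= committed <= log_end:
        return committed
    # Otherwise (new group, or the stored offset is gone) the policy applies.
    if auto_offset_reset == "earliest":
        return log_start
    if auto_offset_reset == "latest":
        return log_end
    raise ValueError(f"unsupported auto.offset.reset value: {auto_offset_reset}")


# A partition holding messages at offsets 0..99, so the next offset is 100.
print(resolve_start_offset(None, 0, 100, "earliest"))  # 0: new group, read everything
print(resolve_start_offset(None, 0, 100, "latest"))    # 100: new group, wait for new data
print(resolve_start_offset(100, 0, 100, "earliest"))   # 100: stored offset wins
```

Note the third call: once an offset is stored for the group, the auto.offset.reset value no longer has any influence.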

Example:
Let’s assume you have a topic that contains a certain number of messages, and you now consume messages from that topic with a new consumer group with auto.offset.reset = latest. Kafka/Zookeeper will then initialize the offsets for that consumer group such that each offset is one larger than the offset of the newest message in that (topic) partition. Executing the consumer will result in an empty table. Changing auto.offset.reset to earliest after this initial execution and executing the consumer once more will also result in an empty table. This is expected behavior, since Kafka/Zookeeper already has offsets stored for that consumer group and the auto.offset.reset option is consequently ignored. Had you used earliest for the initial execution, the resulting table would contain messages.
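The same scenario as a toy simulation, in case it helps to step through it. Everything here is illustrative: `PartitionSim` is a made-up class modelling one partition, and it assumes that the offset is stored after every execution and that no old messages have been deleted from the topic.

```python
class PartitionSim:
    """Toy model of one topic partition plus per-group stored offsets."""

    def __init__(self, messages):
        self.messages = list(messages)
        self.committed = {}  # group ID -> next offset to read

    def consume(self, group_id, auto_offset_reset):
        end = len(self.messages)
        if group_id in self.committed:
            # A stored offset exists, so auto_offset_reset is ignored.
            start = self.committed[group_id]
        else:
            start = 0 if auto_offset_reset == "earliest" else end
        batch = self.messages[start:end]
        self.committed[group_id] = end  # remember the position for next time
        return batch


topic = PartitionSim(["m0", "m1", "m2"])
first = topic.consume("group-a", "latest")      # new group + latest
second = topic.consume("group-a", "earliest")   # same group, offset already stored
fresh = topic.consume("group-b", "earliest")    # new group + earliest
topic.messages.append("m3")                     # a producer writes a new message
after_produce = topic.consume("group-a", "earliest")

print(first)          # []
print(second)         # []
print(fresh)          # ['m0', 'm1', 'm2']
print(after_produce)  # ['m3']
```

The last line matches what was reported above: an existing group only sees data produced after its stored offset, regardless of the auto.offset.reset value.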

Please let me know if this explains your problems.

Looking forward to hearing from you
Mark

P.S. I really like your name :smiley:


@Mark_Ortmann Got it. I just tried it with a new consumer group and client name, and it seems to have sorted itself out. I can now consume data as I would expect.
Many thanks
