I have been using the Kafka connector successfully for a while now, but I am 
getting weird results in one case.

I have a test that submits 3 streams to Kafka topics and monitors them in a 
separate process.  The Flink job has a source for each topic, and one of those 
sources is fed to 3 separate map functions that lead to other operators.  For 
this topic, only 6097 of the 30000 published messages show up, and the map 
functions following the source show only a fraction of that as received.  The 
consumer is configured to start at the beginning, and in other cases the same 
code receives all messages published.  The parallelism is 6, if that makes a 
difference, and the topics have 6 partitions as well.

The code that creates the consumer for the topic is below.

Any suggestions on why it is missing so many messages would be welcome.

Michael

      String topic = a.kafkaTopicName(ets);
      Properties props = new Properties();
      props.setProperty("bootstrap.servers", servers);
      // Fresh group id on every run, so no committed offsets are picked up.
      props.setProperty("group.id", UUID.randomUUID().toString());
      props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
      // Reuse the source if one was already created for this event type.
      DataStream<String> ds = consumers.get(a.eventType);
      if (ds == null) {
        FlinkKafkaConsumer011<String> cons = new FlinkKafkaConsumer011<String>(
            topic, new SimpleStringSchema(), props);
        // Read the topic from the earliest available offset.
        cons.setStartFromEarliest();
        ds = env.addSource(cons).name(et.name).rebalance();
        consumers.put(a.eventType, ds);
      }
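
The lookup above creates one source per event type and reuses it for every 
later caller.  As a minimal sketch of that get-or-create pattern, here it is 
in plain Java with no Flink dependency.  The names SourceCache, sourceFor and 
createdCount are hypothetical, and a String stands in for the DataStream.  It 
also shows one property of the pattern: the cache key is the event type, not 
the topic, so a second topic with the same event type would get the first 
topic's entry back.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the "consumers" map in the snippet above.
public class SourceCache {
    // Keyed by event type, as in the original code; the value is a String
    // placeholder for the DataStream<String> that Flink would return.
    private final Map<String, String> consumers = new HashMap<>();
    private int created = 0;

    // Returns the cached source for eventType, creating it on first use only.
    public String sourceFor(String eventType, String topic) {
        return consumers.computeIfAbsent(eventType, key -> {
            created++;
            return "source(" + topic + ")";
        });
    }

    // Number of sources actually created so far.
    public int createdCount() {
        return created;
    }

    public static void main(String[] args) {
        SourceCache cache = new SourceCache();
        // Three consumers of the same event type share one source.
        System.out.println(cache.sourceFor("order", "orders-topic"));
        System.out.println(cache.sourceFor("order", "orders-topic"));
        // Same event type, different topic: the cached entry is returned.
        System.out.println(cache.sourceFor("order", "other-topic"));
        System.out.println(cache.createdCount());
    }
}
```

Whether this matters for the missing messages depends on how event types map 
to topics in the real job, which the snippet does not show.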
