Hi Jessy, I had this issue as well, here's the resolution <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Statefun-2-2-2-Checkpoint-restore-NPE-td44022.html>. I ended up forking the version of statefun I used and removing the null check to default to empty string, but I'm going to switch to the solution Igal suggested.
Thanks, Tim On Thu, Jun 10, 2021 at 10:46 AM Jessy Ping <tech.user.str...@gmail.com> wrote: > Hi all, > > I am trying to consume data from azure eventhub using the kafka ingress > and i am getting the following error. > > *java.lang.IllegalStateException: The io.statefun.kafka/ingress ingress > requires a UTF-8 key set for each record.* > > While sending the data to the Event hub using my data producer, I am not > sending it with any KEY. And the same data can be consumed without any > issues with a normal flink application . > > I can see the error is raising from > RoutableProtobufKafkaIngressDeserializer.requireNonNullKey() > > Why is this check important and how to resolve this for eventhubs.? > > private byte[] requireNonNullKey(byte[] key) { > if (key == null) { > IngressType tpe = > ProtobufKafkaIngressTypes.ROUTABLE_PROTOBUF_KAFKA_INGRESS_TYPE; > throw new IllegalStateException( > "The " > + tpe.namespace() > + "/" > + tpe.type() > + " ingress requires a UTF-8 key set for each record."); > } > return key; > } > > Thanks > Jessy > On Thu, 10 Jun 2021 at 20:13, Jessy Ping <tech.user.str...@gmail.com> > wrote: > >> Hi all, >> >> I am trying to consume data from azure eventhub using the kafka ingress >> and i am getting the following error. >> >> *java.lang.IllegalStateException: The io.statefun.kafka/ingress ingress >> requires a UTF-8 key set for each record.* >> >> While sending the data to the Event hub using my data producer, I am not >> sending it with any KEY. And the same data can be consumed without any >> issues with a normal flink application . >> >> I can see the error is raising from >> RoutableProtobufKafkaIngressDeserializer.requireNonNullKey() >> >> >>