Hi Jessy,

I had this issue as well, here's the resolution
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Statefun-2-2-2-Checkpoint-restore-NPE-td44022.html>.
I ended up forking the version of statefun I used and removing the null
check to default to empty string, but I'm going to switch to the solution
Igal suggested.

Thanks,

Tim

On Thu, Jun 10, 2021 at 10:46 AM Jessy Ping <tech.user.str...@gmail.com>
wrote:

> Hi all,
>
> I am trying to consume data from azure eventhub using the kafka ingress
> and i am getting the following error.
>
> *java.lang.IllegalStateException: The io.statefun.kafka/ingress ingress
> requires a UTF-8 key set for each record.*
>
> While sending the data to the Event hub using my data producer, I am not
> sending it with any KEY. And the same data can be consumed without any
> issues with a normal flink application .
>
> I can see the error is raising from
> RoutableProtobufKafkaIngressDeserializer.requireNonNullKey()
>
> Why is this check important and how to resolve this for eventhubs.?
>
> private byte[] requireNonNullKey(byte[] key) {
>   if (key == null) {
>     IngressType tpe = 
> ProtobufKafkaIngressTypes.ROUTABLE_PROTOBUF_KAFKA_INGRESS_TYPE;
>     throw new IllegalStateException(
>         "The "
>             + tpe.namespace()
>             + "/"
>             + tpe.type()
>             + " ingress requires a UTF-8 key set for each record.");
>   }
>   return key;
> }
>
> Thanks
> Jessy
> On Thu, 10 Jun 2021 at 20:13, Jessy Ping <tech.user.str...@gmail.com>
> wrote:
>
>> Hi all,
>>
>> I am trying to consume data from azure eventhub using the kafka ingress
>> and i am getting the following error.
>>
>> *java.lang.IllegalStateException: The io.statefun.kafka/ingress ingress
>> requires a UTF-8 key set for each record.*
>>
>> While sending the data to the Event hub using my data producer, I am not
>> sending it with any KEY. And the same data can be consumed without any
>> issues with a normal flink application .
>>
>> I can see the error is raising from
>> RoutableProtobufKafkaIngressDeserializer.requireNonNullKey()
>>
>>
>>

Reply via email to