Hi Reme,

The code you provided looks fine to me. To debug it, you could add some
logging to the getKey() and join() functions and check whether any records
are actually being joined. The metrics in the Web UI dashboard should also
help, for example the records received/sent counts and the current watermark
of the join operator.
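
In case it helps while reading those logs, here is a minimal plain-Java
sketch (no Flink dependency) of the condition a tumbling event-time window
join has to satisfy: two records are paired only if they have the same key
and their timestamps fall into the same window. The class, the Event type
and the sample values are all hypothetical, and the sketch assumes a window
offset of 0 and non-negative timestamps in milliseconds. It is not your
pipeline, only a way to sanity-check logged key/timestamp pairs offline.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: counts the pairs an inner tumbling-window join would emit
// for two small lists of (key, event timestamp) samples taken from the logs.
class JoinKeyCheck {

    // Minimal stand-in for a logged record: join key plus event time in ms.
    static final class Event {
        final String key;
        final long timestampMs;

        Event(String key, long timestampMs) {
            this.key = key;
            this.timestampMs = timestampMs;
        }
    }

    // Start of the tumbling window containing the timestamp
    // (assumes offset 0 and a non-negative timestamp).
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    // Records can only be joined if they land in the same (key, window) bucket.
    static String bucket(Event e, long windowSizeMs) {
        return e.key + "@" + windowStart(e.timestampMs, windowSizeMs);
    }

    // Number of (left, right) pairs sharing both key and window.
    static int countJoinedPairs(List<Event> left, List<Event> right, long windowSizeMs) {
        Map<String, Integer> leftPerBucket = new HashMap<>();
        for (Event e : left) {
            leftPerBucket.merge(bucket(e, windowSizeMs), 1, Integer::sum);
        }
        int pairs = 0;
        for (Event e : right) {
            pairs += leftPerBucket.getOrDefault(bucket(e, windowSizeMs), 0);
        }
        return pairs;
    }

    public static void main(String[] args) {
        // Made-up samples: keys as they would come out of the two getKey() calls.
        List<Event> entries = Arrays.asList(
                new Event("42", 1_000L), new Event("42", 31_000L), new Event("7", 5_000L));
        List<Event> history = Arrays.asList(
                new Event("42", 29_999L), new Event("42", 61_000L), new Event("8", 5_000L));
        System.out.println("joined pairs: " + countJoinedPairs(entries, history, 30_000L));
    }
}
```

If the keys logged from the two getKey() calls never land in the same
bucket, the join has nothing to emit even though no error is thrown.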

Best,
Shuiqiang

Reme Ajayi <remeaj...@gmail.com> wrote on Thu, Feb 16, 2023 at 22:20:

> Hi,
> I am trying to join two Kafka data streams and write the result to another
> Kafka topic, but my joined stream does not output any data. After some
> time, my program runs out of memory and crashes, which I think is a result
> of the join not working. My code doesn't throw any errors, but the join
> doesn't produce any output. My join logic is below; please suggest possible
> solutions.
>
> P.S. Things I have tried so far:
>
>    1. Increased task slots on the task manager
>    2. Added Watermarks to my Kafka sources
>
> DataStream<Enhanced> joinedStream = EntriesStream.join(historyStream)
>         .where(new KeySelector<GenericRecord, String>() {
>             @Override
>             public String getKey(GenericRecord value) throws Exception {
>                 return value.get("la_id").toString();
>             }
>         })
>         .equalTo(new KeySelector<GenericRecord, String>() {
>             @Override
>             public String getKey(GenericRecord value) throws Exception {
>                 return value.get("id").toString();
>             }
>         })
>         .window(TumblingEventTimeWindows.of(Time.seconds(30)))
>         .apply(new JoinFunction<GenericRecord, GenericRecord, Enhanced>() {
>             @Override
>             public Enhanced join(GenericRecord first, GenericRecord second) throws Exception {
>                 return new Enhanced(
>                         Long.parseLong(first.get("c_at").toString()),
>                         first.get("c_type").toString(),
>                         first.get("id").toString(),
>                         Integer.parseInt(first.get("d_cts").toString()),
>                         Integer.parseInt(first.get("c_cts").toString()),
>                         second.get("prov").toString(),
>                         second.get("bb_S_T").toString(),
>                         second.get("p_id").toString(),
>                         second.get("s_ccurr").toString()
>                 );
>             }
>         });
>
>
