I figured out the issue.
My table2Mapped, which is created from stream2, has some records that
arrive later than the records with the same key arrive in stream1.
After checking the stream-to-table join semantics, I found that a record on
the left (stream) side is joined only with the record that exists for that
key on the right (table) side at that time.
It will not be joined if the matching record arrives on the right side
later, since windowed joins are not applicable to stream-to-table joins.
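
As a rough illustration of the behaviour described above, here is a small
plain-Java model of a stream-to-table inner join. It is not the Kafka
Streams API: the class StreamTableJoinSketch, the Event type, and the
sample keys and values are made up for the sketch, and the events are simply
processed in the order given.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of stream-to-table inner join semantics; not the Kafka Streams API.
final class StreamTableJoinSketch {

    // One input event: the side it arrived on, its key, and its value.
    static final class Event {
        final boolean fromTable;
        final String key;
        final String value;

        Event(boolean fromTable, String key, String value) {
            this.fromTable = fromTable;
            this.key = key;
            this.value = value;
        }
    }

    // Processes events in the given order and returns the joined outputs.
    static List<String> join(List<Event> events) {
        Map<String, String> table = new HashMap<>(); // latest value per key
        List<String> joined = new ArrayList<>();
        for (Event e : events) {
            if (e.fromTable) {
                // A table update only changes state; it never emits a join result.
                table.put(e.key, e.value);
            } else {
                // A stream record joins against whatever the table holds right now.
                String right = table.get(e.key);
                if (right != null) {
                    joined.add(e.key + ":" + e.value + "+" + right);
                }
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event(true, "k1", "t1"),   // table record for k1 arrives first
                new Event(false, "k1", "s1"),  // stream record for k1 finds a match
                new Event(false, "k2", "s2"),  // no table entry for k2 yet: dropped
                new Event(true, "k2", "t2"));  // table record for k2 arrives too late
        System.out.println(join(events));      // prints [k1:s1+t1]
    }
}
```

Two stream records go in but only one joined record comes out, which matches
the single peek call I was seeing.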

Anyway, a question here is: does a windowed stream-to-table join make sense
in a case like this?

The reason I mapped stream2 to table2Mapped is that stream2 usually has
only one record per key; in some cases it may have multiple records with
the same value for the same key.
Hence converting it to a table made sense, as I am only interested in the
latest record for a key.

But I guess if that record arrives later than a record in stream1 for the
same key, the stream1 record won't get joined.

So now I have switched back to a stream-to-stream windowed join.
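
For comparison, here is a plain-Java model of a windowed stream-to-stream
inner join. Again this is only a sketch and not the Kafka Streams API: the
class WindowedJoinSketch, the Rec type, the 5 second window, and the sample
timestamps are all assumptions, and the model buffers every record forever
instead of expiring old ones.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of a windowed stream-to-stream inner join; not the Kafka Streams API.
final class WindowedJoinSketch {

    // One record: the side it arrived on, its key, its value, and its timestamp in ms.
    static final class Rec {
        final boolean left;
        final String key;
        final String value;
        final long ts;

        Rec(boolean left, String key, String value, long ts) {
            this.left = left;
            this.key = key;
            this.value = value;
            this.ts = ts;
        }
    }

    // Joins each arriving record with already-seen records of the other side
    // that share its key and lie within windowMs of its timestamp.
    static List<String> join(List<Rec> arrivals, long windowMs) {
        List<Rec> seen = new ArrayList<>(); // both sides are buffered
        List<String> joined = new ArrayList<>();
        for (Rec r : arrivals) {
            for (Rec other : seen) {
                boolean match = other.left != r.left
                        && other.key.equals(r.key)
                        && Math.abs(other.ts - r.ts) <= windowMs;
                if (match) {
                    Rec l = r.left ? r : other;
                    Rec rt = r.left ? other : r;
                    joined.add(l.key + ":" + l.value + "+" + rt.value);
                }
            }
            seen.add(r);
        }
        return joined;
    }

    public static void main(String[] args) {
        List<Rec> arrivals = List.of(
                new Rec(true, "k2", "s2", 1_000),    // stream1 record arrives first
                new Rec(false, "k2", "t2", 3_000),   // late stream2 record still joins
                new Rec(false, "k2", "t2", 4_000),   // duplicate on stream2 joins again
                new Rec(true, "k3", "s3", 1_000),
                new Rec(false, "k3", "t3", 10_000)); // outside the window: no join
        System.out.println(join(arrivals, 5_000));   // prints [k2:s2+t2, k2:s2+t2]
    }
}
```

The late record on the right side now joins, but a repeated record for the
same key in stream2 produces a repeated join result, which is the case the
table was meant to avoid.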

Let me know if there is any other way to handle such a case.

Thanks
Sachin





On Sat, Dec 7, 2019 at 12:08 PM Sachin Mittal <sjmit...@gmail.com> wrote:

> Hi,
> I am facing some weird problems when joining two streams or a stream/table.
> The joined stream does not contain all the joined records.
>
> Also note that my keys are custom keys for which I have implemented the
> equals and hashCode methods.
> Is there anything else I need to do to ensure that key1 equals key2?
>
> Let me illustrate it with an example:
> //created a new stream with a new custom key
> stream1Mapped = stream1.map((k,v) -> ...)
> //checked the re-partition data and it has 2 records
>
> // created a new stream with a new custom key and converted the stream to
> // a table in the standard way
> table2Mapped = stream2.map((k,v) -> ...).groupByKey()
>     .reduce((av, nv) -> nv)
> //checked the re-partition and change-log data and it has 3 records
>
> //now I join the stream with table on the new custom key
> joinedStream = stream1Mapped.join(table2Mapped, (lv, rv) -> ..)
>
> //printed the data for stream
> joinedStream.peek((k, v) -> print(v))
> //is called only once ??
>
> This should be called twice, as the keys for both records in the stream
> are present in the table too.
>
> Please let me know if I understood the case well enough and if there is
> anything I can do to debug this problem better.
>
> Thanks
> Sachin
>
>
>
>
>
