Hi Elias,

I like the idea of having a trailing / sliding window assigner to perform
your join. However, the result might not be entirely correct with respect
to your initial join specification.

Consider an events data set which contains the elements e1 = (4000, 1, 1)
and e2 = (4500, 2, 2), and a changes data set which contains the element
c1 = (4000, 1, 1). With the window assigners TrailingEventTimeWindows(4000,
1000) and SlidingEventTimeWindows(4000, 1000), c1 would be assigned to
TrailingWindow(1000, 5000), and e1 and e2 would both be assigned, among
others, to SlidingWindow(1000, 5000). Thus, the two windows would be
joined and the result would be (e1, c1) and (e2, c1). However, e2
happened after c1.
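
To make the overlap concrete, here is a small illustration (a sketch of
mine, which roughly mirrors the loop in Flink's
SlidingEventTimeWindows.assignWindows) that computes which sliding
windows of a given size and slide contain a timestamp:

    import java.util.ArrayList;
    import java.util.List;

    public class SlidingAssignment {

        // Returns the [start, end) intervals of all sliding windows of
        // the given size and slide that contain the timestamp.
        static List<long[]> assign(long timestamp, long size, long slide) {
            List<long[]> windows = new ArrayList<>();
            long lastStart = timestamp - (timestamp % slide);
            for (long start = lastStart; start > timestamp - size; start -= slide) {
                windows.add(new long[] {start, start + size});
            }
            return windows;
        }

        public static void main(String[] args) {
            // e2 at t=4500 lands in [4000,8000), [3000,7000), [2000,6000)
            // and [1000,5000); e1 at t=4000 also lands in [1000,5000),
            // the window lining up with c1's TrailingWindow(1000, 5000).
            for (long[] w : assign(4500, 4000, 1000)) {
                System.out.println("e2 in [" + w[0] + ", " + w[1] + ")");
            }
        }
    }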

But if that is ok for your use case, then your solution is great :-)

Cheers,
Till

On Wed, May 25, 2016 at 2:04 PM, Stephan Ewen <se...@apache.org> wrote:

> Hi Elias!
>
> I think you brought up a couple of good issues. Let me try and summarize
> what we have so far:
>
> 1) Joining in a more flexible fashion
>     => The problem you are solving with the trailing / sliding window
> combination: Is the right way to phrase the join problem "join records
> whose keys are equal and whose timestamps are within X seconds
> (millis/minutes/...) of each other"?
>     => That should definitely have an API abstraction. The first version
> could be implemented exactly with a combination of sliding and trailing
> windows (a rough sketch of a possible API shape follows below).
>
>     => For joins between windowed and non-windowed streams in the long
> run: Aljoscha posted the Design Doc on side inputs. Would that cover the
> use case as a long-term solution?
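>
> For what it's worth, one hypothetical shape for such an abstraction
> (this API does not exist in Flink; where()/equalTo() are the existing
> join methods, but between() and the Result type are made up here for
> illustration):
>
>     events.join(changes)
>         .where(e -> e.key)
>         .equalTo(c -> c.key)
>         .between(Time.seconds(-30), Time.seconds(0))  // c at most 30s before e
>         .apply((e, c) -> new Result(e, c));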
>
> 2) Lists that are larger than the memory
>     => The ListState returns an Iterable, but it is eagerly materialized
> from RocksDB. Is there a way to "stream" the bytes from RocksDB? Flink
> could then deserialize them in a streamed fashion as well.
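>
> One way to picture the lazy variant (a sketch, not Flink internals;
> it assumes a TypeSerializer<T> for the element type, and the bytes
> still arrive from RocksDB in one chunk; only element creation is
> deferred):
>
>     import java.io.ByteArrayInputStream;
>     import java.io.IOException;
>     import java.util.Iterator;
>     import org.apache.flink.api.common.typeutils.TypeSerializer;
>     import org.apache.flink.core.memory.DataInputViewStreamWrapper;
>
>     public class LazyListState {
>         static <T> Iterator<T> lazyElements(byte[] raw, TypeSerializer<T> serializer) {
>             ByteArrayInputStream bytes = new ByteArrayInputStream(raw);
>             DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bytes);
>             return new Iterator<T>() {
>                 @Override
>                 public boolean hasNext() {
>                     return bytes.available() > 0;  // serialized elements left
>                 }
>
>                 @Override
>                 public T next() {
>                     try {
>                         return serializer.deserialize(in);  // one element at a time
>                     } catch (IOException e) {
>                         throw new RuntimeException(e);
>                     }
>                 }
>             };
>         }
>     }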
>
> 3) Can you elaborate a bit on the OrderedListState? Do you think of
> multiple values (ordered) per key, or a sequence of key/value pairs,
> ordered by key?
>     => Currently Flink limits the scope of state accesses to the current
> key's values (as defined by the keyBy() function). That way, the system
> can transparently redistribute keys when changing the parallelism.
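>
> To illustrate that scoping (Event and the counter are illustrative):
> the ValueState below holds one value per key, and every access inside
> flatMap() implicitly refers to the key of the current element.
>
>     stream.keyBy(e -> e.key)
>         .flatMap(new RichFlatMapFunction<Event, Long>() {
>             private transient ValueState<Long> seen;
>
>             @Override
>             public void open(Configuration conf) {
>                 seen = getRuntimeContext().getState(
>                     new ValueStateDescriptor<>("seen", Long.class));
>             }
>
>             @Override
>             public void flatMap(Event e, Collector<Long> out) throws Exception {
>                 Long n = seen.value();           // value for e.key only
>                 seen.update(n == null ? 1L : n + 1);
>                 out.collect(seen.value());
>             }
>         });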
>
> Greetings,
> Stephan
>
>
> On Sat, May 21, 2016 at 12:24 AM, Elias Levy <fearsome.lucid...@gmail.com>
> wrote:
>
>> Till,
>>
>> An issue with your suggestion is that the job state may grow unbounded.
>> You are managing expiration of data from the cache in the operator, but
>> the state is partitioned by the stream key. That means if we no longer
>> observe a key, the state associated with that key will never be removed.
>>
>> In my data set keys come and go, and many will never be observed again.
>> That will lead to continuous state growth over time.
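>>
>> In hindsight, one way to bound that growth (a sketch only; it relies
>> on per-key timers, which Flink only gained later with ProcessFunction,
>> and Event, Out, and TTL_MS are illustrative): register a timer whenever
>> a key is seen and clear the key's state if it stays idle past a TTL.
>>
>>     import org.apache.flink.api.common.state.ValueState;
>>     import org.apache.flink.api.common.state.ValueStateDescriptor;
>>     import org.apache.flink.configuration.Configuration;
>>     import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
>>     import org.apache.flink.util.Collector;
>>
>>     public class ExpiringJoinCache extends KeyedProcessFunction<String, Event, Out> {
>>         private static final long TTL_MS = 60 * 60 * 1000;  // illustrative TTL
>>         private transient ValueState<Long> lastSeen;
>>
>>         @Override
>>         public void open(Configuration conf) {
>>             lastSeen = getRuntimeContext().getState(
>>                 new ValueStateDescriptor<>("lastSeen", Long.class));
>>         }
>>
>>         @Override
>>         public void processElement(Event e, Context ctx, Collector<Out> out) throws Exception {
>>             lastSeen.update(ctx.timestamp());
>>             ctx.timerService().registerEventTimeTimer(ctx.timestamp() + TTL_MS);
>>             // ... the actual caching / join logic goes here ...
>>         }
>>
>>         @Override
>>         public void onTimer(long ts, OnTimerContext ctx, Collector<Out> out) throws Exception {
>>             Long last = lastSeen.value();
>>             if (last != null && ts >= last + TTL_MS) {
>>                 lastSeen.clear();  // key idle past TTL: drop its state
>>             }
>>         }
>>     }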
>>
>>
>> On Mon, May 2, 2016 at 6:06 PM, Elias Levy <fearsome.lucid...@gmail.com>
>> wrote:
>>
>>> Thanks for the suggestion.  I ended up implementing it a different way.
>>>
>>> [...]
>>>
>>> On Wed, Apr 20, 2016 at 10:13 AM, Till Rohrmann <trohrm...@apache.org>
>>> wrote:
>>>
>>>> Sorry for the late reply. You're right that with the windowed join you
>>>> would have to deal with pairs where the timestamp of (x,y) is not
>>>> necessarily earlier than the timestamp of z. Moreover, by using sliding
>>>> windows you would receive duplicates as you've described. Using tumbling
>>>> windows would mean that you lose join matches if (x,y) lives in an earlier
>>>> window. Thus, in order to solve your problem you would have to write a
>>>> custom stream operator.
>>>>
>>>> The stream operator would do the following: It collects the inputs
>>>> from (x,y) and z, which are already keyed, so we know that x = z holds
>>>> true. Using priority queues we order the elements, because we don't
>>>> know in which order they arrive at the operator. Whenever we receive a
>>>> watermark indicating that no earlier events can arrive anymore, we can
>>>> go through the two priority queues and join the elements. The queues
>>>> are part of the operator's state, so that we don't lose information in
>>>> case of a recovery.
>>>>
>>>> I've sketched such an operator here [1]. I hope this helps you to get
>>>> started.
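>>>>
>>>> In condensed form, the idea looks roughly like this (signatures are
>>>> simplified relative to Flink's actual TwoInputStreamOperator API, and
>>>> XY, Z, Out, and join() are illustrative; the version in [1] also
>>>> snapshots the queues for recovery):
>>>>
>>>>     public class WatermarkJoinOperator extends AbstractStreamOperator<Out>
>>>>             implements TwoInputStreamOperator<XY, Z, Out> {
>>>>
>>>>         // Buffer both inputs until a watermark guarantees completeness.
>>>>         private final PriorityQueue<XY> xys =
>>>>             new PriorityQueue<>(Comparator.comparingLong((XY e) -> e.timestamp));
>>>>         private final PriorityQueue<Z> zs =
>>>>             new PriorityQueue<>(Comparator.comparingLong((Z e) -> e.timestamp));
>>>>
>>>>         public void processElement1(StreamRecord<XY> record) {
>>>>             xys.add(record.getValue());
>>>>         }
>>>>
>>>>         public void processElement2(StreamRecord<Z> record) {
>>>>             zs.add(record.getValue());
>>>>         }
>>>>
>>>>         // Called when the combined watermark advances: nothing with an
>>>>         // earlier timestamp can arrive anymore, so joining is safe.
>>>>         public void processWatermark(Watermark mark) {
>>>>             while (!zs.isEmpty() && zs.peek().timestamp <= mark.getTimestamp()) {
>>>>                 Z z = zs.poll();
>>>>                 for (XY xy : xys) {
>>>>                     if (xy.timestamp <= z.timestamp) {  // (x,y) before z
>>>>                         output.collect(new StreamRecord<>(join(xy, z), z.timestamp));
>>>>                     }
>>>>                 }
>>>>             }
>>>>             // A full implementation would also evict (x,y) elements that
>>>>             // can no longer match and forward the watermark downstream.
>>>>         }
>>>>     }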
>>>>
>>>
>>>
>>>
>>
>
