Globally will not perform as well because you reduce the inherent level of
parallelism.

Lets tackle this one problem and you can see if you can apply the same
principles to the other problems:
- Any time we see a record (call this type A) with record[“id”] == 1 &&
record[“field_6”] == “some_value” *not* followed by a record (call this
type B) with record[“id”] == 2 && record[“field_7”] == “other_value” in the
subsequent 10 minutes.

One idea:
If type A and type B records are rare. You can use session windows with a
gap duration of 10 minutes. Whenever you see a record of type A or type B,
you convert them to a KV<common key, record data>. You pass this through a
GBK which will produce a KV<common key, iterable<record data>> that is
guaranteed to have all the records which of type A and type B that are
within 10 minutes of each other. Then you scan through the iterable and
output all records of type A that are not followed by a record of type B
within 10 minutes. The reasoning why they need to be rare is that you don't
want the session to continue forever.

Another idea:
Convert all type A and type B records to be a KV<common key, record> using
a sliding window of size 20 mins being output every 10 mins. pass these
through GBK, and similarly as above scan through the iterale output all
records of type A that are not followed by a record of type B within 10
minutes.




On Wed, Dec 21, 2016 at 2:56 PM, Ray Ruvinskiy <ray.ruvins...@arcticwolf.com
> wrote:

> The records have a property value in common, yes. For example,
> record[“record_kind”] == “foo” or record[“record_kind”] == “bar.” However,
> I’d be curious if the answer changes if I wanted to do this globally for
> the whole stream.
>
> Thanks!
>
> Ray
>
> From: Lukasz Cwik <lc...@google.com>
> Reply-To: "u...@beam.incubator.apache.org" <u...@beam.incubator.apache.org
> >
> Date: Wednesday, December 21, 2016 at 5:52 PM
> To: "u...@beam.incubator.apache.org" <u...@beam.incubator.apache.org>
> Subject: Re: One-to-many mapping between unbounded input source and
> pipelines with session windows
>
> My first question was about how do you know two or more records are
> related or is this global for the entire stream?
>
> The reason I was asking about whether you can map the qualifiers onto a
> fixed set of states is because I was wondering if there was a way to either
> use the State API (WIP https://issues.apache.org/jira/browse/BEAM-25) and
> timers API (WIP https://issues.apache.org/jira/browse/BEAM-27) and just
> transition between a fixed number of states or create composite session
> keys based upon the "id" plus some small set of qualifiers and do a GBK to
> do a join.
>
> In this example, how do you know the two records are related to each other
> (do the share a common attribute or can a common attribute be computed)?
> - Any time we see a record with record[“id”] == 1 && record[“field_6”] ==
> “some_value” *not* followed by a record with record[“id”] == 2 &&
> record[“field_7”] == “other_value” in the subsequent 10 minutes.
>
>
>
> On Wed, Dec 21, 2016 at 2:14 PM, Ray Ruvinskiy <
> ray.ruvins...@arcticwolf.com> wrote:
> I’m unsure about your first question. Are you asking whether there’s an
> attribute that all the records have in common?
>
> I think I’m looking for more flexibility than a fixed set of values but
> perhaps I’m overlooking something. To flesh out the example, let’s say the
> records are JSON documents, with fields. So, to express my examples again,
> I want to know:
> - Any time we see record_1[“type”] == “type1” && record_1[“field1”] ==
> “value1”, followed within no more than a minute by record_2[“type”] ==
> “type1” && record_2[“field2”].contains(“some_substring”), followed within
> no more than 5 minutes by record_3[“type”] == “type2” && record_3[“field3”]
> == “value3”
> - Any time we see N records where record[“id”] == 123 within 5 hours of
> each other, followed by another record with record[“id”] == 456 no more
> than an hour later than the group of N records
> - Any time we see a record with record[“id”] == 1 && record[“field_6”] ==
> “some_value” *not* followed by a record with record[“id”] == 2 &&
> record[“field_7”] == “other_value” in the subsequent 10 minutes.
>
> If data is late, *ideally* it’s taken into account, but we don’t need to
> account for data being late for an arbitrary amount of time. We can say
> that if a data is, for instance, less than an hour later it should be taken
> into account, but if it’s more than an hour late we can ignore it.
>
> Thanks!
>
> Ray
>
> From: Lukasz Cwik <lc...@google.com>
> Reply-To: "u...@beam.incubator.apache.org" <u...@beam.incubator.apache.org
> >
> Date: Wednesday, December 21, 2016 at 4:47 PM
> To: "u...@beam.incubator.apache.org" <u...@beam.incubator.apache.org>
> Subject: Re: One-to-many mapping between unbounded input source and
> pipelines with session windows
>
> Do the records have another attribute Z which joins them all together?
> Are the set of attributes A, B, C, X, Y, K, L are from a fixed set of
> values like enums or can be mapped onto a certain number of states (like an
> attribute A > 50 can be mapped onto a state "exceeds threshold")?
> For your examples, what should occur when there is late data in your three
> scenarios?
>
>
> On Wed, Dec 21, 2016 at 9:05 AM, Ray Ruvinskiy <
> ray.ruvins...@arcticwolf.com> wrote:
> Hello,
>
> I am trying to figure out if Apache Beam is the right framework for my use
> case. I have an unbounded stream, and there are a number of questions I
> would like to ask regarding the records in the stream:
>
> - For example, one question may be trying to find a record with attribute
> A followed within no more than a minute by a record with attribute B
> followed within no more than 5 minutes by a record with attribute C.
> - Another question may be trying to find a sequence of at least N records
> with attribute X within 5 hours of each other, followed by an record with
> attribute Y no more than an hour later.
> - A third question would be seeing if there exist a record with attribute
> K *not* followed by a record with attribute L in the next 10 minutes.
>
> Every time I encounter the pattern of records I’m looking for, I would
> like to perform an action. If I understand the Beam model correctly, each
> question would correspond to a separate pipeline I would create, and it
> also sounds like I’m looking for session windows. However, I’m assuming I
> would need to tee the input source to all the separate pipelines? I have
> tried to look for documentation and/or examples on whether Apache Beam can
> be used to express such a setup and how to do it if so, but I haven’t been
> able to find anything concrete. Any help would be appreciated.
>
> Thanks!
>
> Ray
>
>
>
>
>
>
>

Reply via email to