On Tue, May 25, 2021 at 11:14 AM Sozonoff Serge <se...@sozonoff.com> wrote:
> Hi,
>
> Thanks for the clarification.
>
> > What is an issue with applying windowing/triggering strategy for your case?
>
> The problem was actually not the trigger but the whole approach I took.
>
> I guess fundamentally the whole issue for me boils down to the fact that
> with bounded pipelines we have quite a few approaches which can be taken
> to enrich data, and with unbounded pipelines we have very few. Obviously
> in a bounded pipeline you don't really need to worry about refreshing your
> enriching data either, since it's all built when the pipeline launches.
>
> So, I had this perfectly working batch pipeline and everything fell apart
> when it became unbounded. In an ideal world we could mix an unbounded
> pipeline with a bounded pipeline. The requirement was fairly simple:
> process a bunch of CSV lines when a new file arrives. The only unbounded
> element in this pipeline is when the file arrives and what its path and
> name are. From the moment the file becomes available, everything else is
> batch processing.
>
> This left me looking at options for streaming pipelines.
>
> The side input approach seemed to fit best, but since I needed a
> refreshing side input I quickly stumbled over the fact that you can't use
> the Beam connectors for this and you need to write your own code to fetch
> the data! To me it made no sense: Beam has all these IO connectors but I
> can't use them for my side input!
>
> It could be that with IO connectors which implement ReadAll my statement
> is no longer true (I did not find any examples). I have not tested it,
> but in any case I would have needed DynamoDBIO, which does not implement
> ReadAll.

I have a long-standing MR that implements readAll for the Cassandra API, and
our company is using it in production for both batch and streaming jobs. We
use it in streaming mode to enrich data being read off of a Redis stream
(similar to a Kafka stream). I'm hoping it gets merged soon. It shouldn't be
hard to add similar functionality to the DynamoDB client.
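To give a feel for it, the enrichment step with the readAll-style API from
that MR looks roughly like the snippet below. Treat it as a sketch: the
method names come from the unmerged branch and could still change, and the
keyspace, table and Customer entity are made up for illustration.

// Sketch only: readAll() below is the API from the not-yet-merged MR, and the
// keyspace, table and Customer entity are made up for illustration.
PCollection<String> customerIds = ...;  // e.g. IDs parsed off the incoming stream

PCollection<Customer> customers = customerIds
    .apply("ToReadSpec", MapElements.via(
        new SimpleFunction<String, CassandraIO.Read<Customer>>() {
          @Override
          public CassandraIO.Read<Customer> apply(String id) {
            // One Read spec per element; readAll() runs the query for each one.
            return CassandraIO.<Customer>read()
                .withHosts(Collections.singletonList("cassandra-host"))
                .withKeyspace("crm")
                .withTable("customers")
                .withEntity(Customer.class)
                .withCoder(SerializableCoder.of(Customer.class))
                .withQuery("SELECT * FROM crm.customers WHERE id = '" + id + "'");
          }
        }))
    .apply("LookupEachId", CassandraIO.<Customer>readAll()
        .withCoder(SerializableCoder.of(Customer.class)));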
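On the refreshing side input point above: it's true that you can't plug an
IO connector straight into a side input, but the "slowly updating global
window side inputs" pattern from the Beam docs needs very little custom
code; it's basically a periodic impulse plus whatever client call you like
in a DoFn. A minimal sketch, assuming a five-minute refresh and a
placeholder fetchReferenceData() helper:

// Sketch of the "slowly updating side input" pattern from the Beam docs:
// a periodic tick re-fetches the reference data and the side input always
// exposes the latest snapshot to the main pipeline.
PCollectionView<Map<String, String>> refData = p
    .apply("Tick", GenerateSequence.from(0).withRate(1, Duration.standardMinutes(5)))
    .apply("OnePanePerTick", Window.<Long>into(new GlobalWindows())
        .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes())
    .apply("Fetch", ParDo.of(new DoFn<Long, Map<String, String>>() {
      @ProcessElement
      public void process(ProcessContext c) {
        // Placeholder: call DynamoDB, Cassandra, a REST service, etc. here.
        c.output(fetchReferenceData());
      }
    }))
    .apply(Latest.globally())
    .apply(View.asSingleton());

The enrichment DoFn then declares it with .withSideInputs(refData) and reads
the current snapshot with c.sideInput(refData); each time the tick fires,
workers pick up the newest value.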
> So after having spent the weekend playing around with various thoughts and
> ideas, I did end up coding a lookup against DynamoDB in a DoFn, using the
> AWS SDK :-)
>
> Kind Thanks,
> Serge
>
> On 25 May 2021 at 18:18:31, Alexey Romanenko (aromanenko....@gmail.com) wrote:
>
> > You don't need to use a windowing strategy or aggregation triggers for a
> > pipeline with a bounded source to perform GbK-like transforms, but since
> > you started to use an unbounded source, your PCollections became
> > unbounded and you need to do that. Otherwise, it's unknown at which point
> > in time your GbK transforms will have all the data to process (in theory,
> > it will never happen, because of the "unbounded" definition).
> >
> > What is an issue with applying windowing/triggering strategy for your case?
> >
> > —
> > Alexey
> >
> > On 24 May 2021, at 10:25, Sozonoff Serge <se...@sozonoff.com> wrote:
> >
> > > Hi,
> > >
> > > Referring to the explanation found at the following link under "Stream
> > > processing triggered from an external source":
> > >
> > > https://beam.apache.org/documentation/patterns/file-processing/
> > >
> > > While implementing this solution I am trying to figure out how to deal
> > > with the fact that my pipeline, which was bounded, has now become
> > > unbounded. It exposes me to windowing/triggering concerns which I did
> > > not have to deal with before and which in essence are unnecessary,
> > > since I am still fundamentally dealing with bounded data. The only
> > > reason I have an unbounded source involved is as a trigger and provider
> > > of the file to be processed.
> > >
> > > Since my pipeline uses GroupByKey transforms I get the following error:
> > >
> > > Exception in thread "main" java.lang.IllegalStateException: GroupByKey
> > > cannot be applied to non-bounded PCollection in the GlobalWindow
> > > without a trigger. Use a Window.into or Window.triggering transform
> > > prior to GroupByKey.
> > >
> > > Do I really need to add windowing/triggering semantics to the
> > > PCollections which are built from bounded data?
> > >
> > > Thanks for any pointers.
> > >
> > > Serge

~Vincent
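P.S. On the GroupByKey question at the bottom of the thread: yes, once the
file-watching source makes the pipeline unbounded, every downstream
GroupByKey needs some window or trigger, but it can be a nearly trivial one.
A rough sketch of the shape I'd expect, where the file pattern, the
one-minute delay and ExtractKeyFn are placeholders:

// The continuously() match is what makes the pipeline unbounded: it keeps
// polling for new files instead of finishing after a single expansion.
PCollection<String> lines = p
    .apply(FileIO.match()
        .filepattern("s3://my-bucket/incoming/*.csv")   // placeholder pattern
        .continuously(Duration.standardSeconds(30), Watch.Growth.never()))
    .apply(FileIO.readMatches())
    .apply(TextIO.readFiles());

// Because the collection is unbounded, give GroupByKey a trigger so it knows
// when a pane may be emitted; tune the delay to how long a file takes to read.
PCollection<KV<String, Iterable<String>>> grouped = lines
    .apply("KeyLines", ParDo.of(new ExtractKeyFn()))    // placeholder DoFn
    .apply(Window.<KV<String, String>>into(new GlobalWindows())
        .triggering(Repeatedly.forever(
            AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardMinutes(1))))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes())
    .apply(GroupByKey.create());

Another route is Window.into(FixedWindows.of(...)), but if each file really
is a self-contained batch, a trigger in the global window is the smallest
change that gets past the IllegalStateException.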
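And since the thread ends with a DynamoDB lookup coded directly in a DoFn
with the AWS SDK, for the archives this is roughly the shape that usually
takes. It's a sketch using AWS SDK v2 with a made-up table name and
attributes, not Serge's actual code:

// Sketch of a per-element DynamoDB lookup in a DoFn (AWS SDK v2).
// Table name and attributes are made up for illustration.
class EnrichFromDynamoFn extends DoFn<KV<String, String>, KV<String, String>> {
  private transient DynamoDbClient dynamo;

  @Setup
  public void setup() {
    dynamo = DynamoDbClient.create();  // one client per DoFn instance
  }

  @ProcessElement
  public void process(ProcessContext c) {
    String id = c.element().getKey();
    GetItemResponse resp = dynamo.getItem(GetItemRequest.builder()
        .tableName("customers")
        .key(Collections.singletonMap("id", AttributeValue.builder().s(id).build()))
        .build());
    AttributeValue segment = resp.hasItem() ? resp.item().get("segment") : null;
    String enrichment = (segment != null) ? segment.s() : "unknown";
    c.output(KV.of(id, c.element().getValue() + "," + enrichment));
  }

  @Teardown
  public void teardown() {
    if (dynamo != null) {
      dynamo.close();
    }
  }
}

If per-element round trips ever become a bottleneck, grouping elements with
GroupIntoBatches and switching to BatchGetItem is the usual next step.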