GroupIntoShards not sending bytes further when dealing with huge amount of data

2021-06-14 Thread Eddy G
As seen in this image https://imgur.com/a/wrZET97, I'm trying to deal with late data (I intentionally stopped my consumer, so data has been accumulating for several days now). Now, with the following Window... I'm using Beam 2.27 and Flink 1.12. Window.into(FixedWindow

Re: GroupIntoShards not sending bytes further when dealing with huge amount of data

2021-06-14 Thread Jan Lukavský
Hi Eddy, does your data get buffered in a state, i.e. does the size of the state grow over time? Do you see the watermark being updated in your Flink WebUI? When a stateful operation (and GroupByKey is a stateful operation) does not output any data, the first place to look is whether the watermark cor

Re: GroupIntoShards not sending bytes further when dealing with huge amount of data

2021-06-14 Thread Evan Galpin
I believe that by default windows will only trigger one time [1]. This has definitely caught me by surprise before. I think that default strategy might be fine for a batch pipeline, but typically is not for streaming (which I assume you're using because you mentioned Flink). I believe you'll want
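Evan's message is cut off before his concrete suggestion. One common streaming setup in the Beam Java SDK is an early/on-time/late trigger combined with non-zero allowed lateness, so a window keeps emitting as late data arrives instead of dropping it. This is a sketch, not necessarily what Evan proposed: the 10-minute window size, the 1-minute early-firing delay and the 7 days of allowed lateness are assumptions chosen for illustration.

```java
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

public class RepeatingWindowSketch {
  // Fires early one minute (processing time) after the first element of a pane,
  // on time when the watermark passes the end of the window, and once more for
  // every late element that arrives within the allowed lateness.
  static Trigger earlyOnTimeAndLate() {
    return AfterWatermark.pastEndOfWindow()
        .withEarlyFirings(
            AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardMinutes(1)))
        .withLateFirings(AfterPane.elementCountAtLeast(1));
  }

  // Hypothetical sizes: 10-minute fixed windows, late data accepted for 7 days.
  static <T> Window<T> repeatingFixedWindow() {
    return Window.<T>into(FixedWindows.of(Duration.standardMinutes(10)))
        .triggering(earlyOnTimeAndLate())
        .withAllowedLateness(Duration.standardDays(7))
        .discardingFiredPanes();
  }
}
```

`discardingFiredPanes()` emits only the elements that arrived since the previous firing; `accumulatingFiredPanes()` would re-emit everything seen so far in the window, which would duplicate records in a file sink.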

Re: GroupIntoShards not sending bytes further when dealing with huge amount of data

2021-06-14 Thread Eddy G
Hi Jan, Thanks for replying so fast! Regarding your questions, - "Does your data get buffered in a state?" Yes, I do have a state within a stage prior to the ParquetIO write, together with a Timer with PROCESSING_TIME. The stage which contains the state does send bytes to the next one which is the
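For readers following along: a stage that buffers elements in state and flushes them on a PROCESSING_TIME timer, as Eddy describes, might look roughly like this in the Java SDK. It is a hypothetical reconstruction, not Eddy's code; the class name, the String key/value types and the one-minute flush delay are assumptions.

```java
import org.apache.beam.sdk.coders.BooleanCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

// Hypothetical: buffers values per key and flushes them on a processing-time timer.
public class BufferThenFlushFn extends DoFn<KV<String, String>, String> {
  private static final Duration FLUSH_DELAY = Duration.standardMinutes(1); // assumed value

  @StateId("buffer")
  private final StateSpec<BagState<String>> bufferSpec = StateSpecs.bag(StringUtf8Coder.of());

  @StateId("timerSet")
  private final StateSpec<ValueState<Boolean>> timerSetSpec = StateSpecs.value(BooleanCoder.of());

  @TimerId("flush")
  private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement
  public void processElement(
      @Element KV<String, String> element,
      @StateId("buffer") BagState<String> buffer,
      @StateId("timerSet") ValueState<Boolean> timerSet,
      @TimerId("flush") Timer flush) {
    buffer.add(element.getValue());
    // Arm the timer only for the first buffered element, so a steady stream of
    // input cannot keep pushing the flush further into the future.
    if (timerSet.read() == null) {
      flush.offset(FLUSH_DELAY).setRelative();
      timerSet.write(true);
    }
  }

  @OnTimer("flush")
  public void onFlush(
      OutputReceiver<String> out,
      @StateId("buffer") BagState<String> buffer,
      @StateId("timerSet") ValueState<Boolean> timerSet) {
    // Emit everything buffered for this key, then reset state for the next batch.
    for (String value : buffer.read()) {
      out.output(value);
    }
    buffer.clear();
    timerSet.clear();
  }
}
```

Processing-time timers fire on wall-clock time rather than on the watermark, so a stage like this can keep emitting while a downstream GroupByKey (such as the GroupIntoShards step in the subject line) still waits for the watermark. That is why Jan asks whether the watermark advances.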

Re: GroupIntoShards not sending bytes further when dealing with huge amount of data

2021-06-14 Thread Jan Lukavský
Hi Eddy, answers inline. On 6/14/21 3:05 PM, Eddy G wrote: Hi Jan, Thanks for replying so fast! Regarding your questions, - "Does your data get buffered in a state?" Yes, I do have a state within a stage prior to the ParquetIO write, together with a Timer with PROCESSING_TIME. The stage which co

Re: GroupIntoShards not sending bytes further when dealing with huge amount of data

2021-06-14 Thread Eddy G
Hi Jan, I've added --experiments=use_deprecated_read and it seems to work flawlessly (with my current Window and the one proposed by Evan). Why is this? Do Splittable DoFns now break existing implementations? Are there any posts about possible breaking changes? On 2021/06/14 13:19:39, Jan Lukavský
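The same experiment can be set programmatically instead of on the command line. With it, `Read` transforms use the legacy (pre-SDF) source expansion rather than the Splittable DoFn wrapper that became the default in 2.25.0, which is what the rest of the thread discusses. A minimal sketch using the Java SDK's `ExperimentalOptions` helpers; the class and method names are hypothetical.

```java
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class DeprecatedReadOptions {
  // Parses the usual command-line args, then requests the legacy (non-SDF)
  // Read expansion; equivalent to passing --experiments=use_deprecated_read.
  static PipelineOptions withDeprecatedRead(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    ExperimentalOptions.addExperiment(
        options.as(ExperimentalOptions.class), "use_deprecated_read");
    return options;
  }
}
```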

Re: GroupIntoShards not sending bytes further when dealing with huge amount of data

2021-06-14 Thread Evan Galpin
There have been varied reports of slowness loosely attributed to the SDF default wrapper change in 2.25.0, e.g. https://lists.apache.org/thread.html/re6b0941a8b4951293a0327ce9b25e607cafd6e45b69783f65290edee%40%3Cdev.beam.apache.org%3E https://issues.apache.org/jira/plugins/servlet/mobile#issue/BEAM-

Re: GroupIntoShards not sending bytes further when dealing with huge amount of data

2021-06-14 Thread Jan Lukavský
Hi Eddy, you are probably hitting a not-yet-discovered bug in the SDF implementation in FlinkRunner that (under some currently unknown conditions) seems to stop advancing the watermark. This has been observed in one other instance (that I'm aware of). I think we don't yet have a tracking JIRA for

Re: GroupIntoShards not sending bytes further when dealing with huge amount of data

2021-06-14 Thread Robert Bradshaw
+1, we'd really like to get to the bottom of this, so clear instructions on a pipeline/conditions that can reproduce it would be great. On Mon, Jun 14, 2021 at 7:34 AM Jan Lukavský wrote: > > Hi Eddy, > > you are probably hitting a not-yet discovered bug in SDF implementation in > FlinkRunner th

End to end unit tests for stateful pipeline

2021-06-14 Thread gaurav mishra
Hi, I have a streaming pipeline which reads from Pub/Sub, enriches data using Redis and finally writes to Pub/Sub. The code has some stateful DoFns with timers. I wanted to write unit tests for the whole pipeline that read from TestStream<>, enrich data using a mocked Redis client, and write da
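`TestStream` lets a test script when elements arrive and when the watermark and processing time advance, which is what drives stateful DoFns and their timers. A minimal sketch; the element values, timestamps and durations are hypothetical. The resulting stream would be applied to a `TestPipeline` (for example `p.apply(ScriptedInput.events())`) and checked with `PAssert` on the DirectRunner.

```java
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class ScriptedInput {
  static TestStream<String> events() {
    Instant start = new Instant(0);
    return TestStream.create(StringUtf8Coder.of())
        // Two elements with explicit event-time timestamps.
        .addElements(
            TimestampedValue.of("a", start),
            TimestampedValue.of("b", start.plus(Duration.standardSeconds(5))))
        // Moves the event-time watermark, which drives event-time timers and windows.
        .advanceWatermarkTo(start.plus(Duration.standardMinutes(1)))
        // Moves the test's processing-time clock, so PROCESSING_TIME timers can fire.
        .advanceProcessingTime(Duration.standardMinutes(2))
        // Closes the stream so all windows and remaining timers complete.
        .advanceWatermarkToInfinity();
  }
}
```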

Re: Allyship workshops for open source contributors

2021-06-14 Thread Aizhamal Nurmamat kyzy
Thank you all! Based on the feedback, I will set up a session for a couple of open source groups. Will share more details soon. Stay tuned. On Mon, Jun 7, 2021 at 4:42 PM Kenneth Knowles wrote: > Yes please! > > On Thu, Jun 3, 2021, 18:32 Ratnakar Malla wrote: > >> +1

Re: Allyship workshops for open source contributors

2021-06-14 Thread deepak kumar
+1 On Tue, Jun 15, 2021 at 12:31 AM Aizhamal Nurmamat kyzy wrote: > Thank you all! Based on the feedback, I will set up a session for a couple > open source groups. Will share more details soon. Stay tuned. > > On Mon, Jun 7, 2021 at 4:42 PM Kenneth Knowles wrote: > >> Yes please! >> >> On Thu,

Re: End to end unit tests for stateful pipeline

2021-06-14 Thread Luke Cwik
You can create a PipelineOption which represents your Redis client object. For tests you would set the PipelineOption to a serializable fake/mock that can replay the results you want. The default for the PipelineOption object would instantiate your production client. You can see an example usage of
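Luke's reply is truncated before the linked example. A sketch of the pattern in the Java SDK: `RedisClient`, `ProductionRedisClient` and the factory below are hypothetical stand-ins, not a real Redis library API. The getter is `@JsonIgnore`d, so the client is never serialized with the options, and `@Default.InstanceFactory` builds the production client whenever nothing has been set.

```java
import com.fasterxml.jackson.annotation.JsonIgnore;
import java.io.Serializable;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.PipelineOptions;

public class RedisOptionsSketch {
  /** Hypothetical client abstraction standing in for the real Redis client. */
  public interface RedisClient extends Serializable {
    String get(String key);
  }

  /** Hypothetical production client; a real one would open a Redis connection. */
  public static class ProductionRedisClient implements RedisClient {
    @Override
    public String get(String key) {
      throw new UnsupportedOperationException("connect to Redis here");
    }
  }

  /** Builds the production client when no client has been set explicitly. */
  public static class ProductionRedisClientFactory implements DefaultValueFactory<RedisClient> {
    @Override
    public RedisClient create(PipelineOptions options) {
      return new ProductionRedisClient();
    }
  }

  public interface RedisOptions extends PipelineOptions {
    // Excluded from options serialization; the factory supplies the default.
    @JsonIgnore
    @Default.InstanceFactory(ProductionRedisClientFactory.class)
    RedisClient getRedisClient();

    void setRedisClient(RedisClient client);
  }
}
```

In a DoFn, the client can be fetched from a `PipelineOptions` method parameter with `options.as(RedisOptions.class).getRedisClient()`; a test calls `setRedisClient(...)` with a fake before running. Because `@JsonIgnore` values are not serialized, this assumes the test runs in-process (for example on the DirectRunner); workers that receive serialized options would fall back to the factory.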

Re: GroupIntoShards not sending bytes further when dealing with huge amount of data

2021-06-14 Thread Evan Galpin
@robert I have a pipeline which consistently shows a major slowdown (10 seconds vs. 10 minutes) between versions <=2.23.0 and >=2.25.0 that can be boiled down to: - Read GCS file patterns from PubSub - Window into Fixed windows (repeating every 15 seconds) - Deduplicate/distinct (have tried both) -

Re: GroupIntoShards not sending bytes further when dealing with huge amount of data

2021-06-14 Thread Robert Bradshaw
Is it possible to post the code? (Or the code of a similar, but minimal, pipeline that exhibits the same issues?) On Mon, Jun 14, 2021 at 2:15 PM Evan Galpin wrote: > > @robert I have a pipeline which consistently shows a major slowdown (10 > seconds Vs 10 minutes) between version <=2.23.0 and >

Re: How avoid blocking when decompressing large GZIP files.

2021-06-14 Thread Luke Cwik
Try adding a Reshuffle transform to the pipeline after the ParDo that gives the sequence number. This will cause the data to be materialized and then the subsequent steps happen in parallel. Depending on which IO transform you are using and if splitting support is ever added for compressed files a
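A sketch of Luke's suggestion in the Java SDK, assuming the file is read with `TextIO` (the path is a placeholder) and that the caller already has the sequence-numbering transform from the original question, passed in as a parameter rather than invented here. Because a GZIP file is read by a single worker, `Reshuffle.viaRandomKey()` is placed right after the numbering step so that later steps are not fused with that read and can be spread across workers.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.PCollection;

public class GzipReshuffleSketch {
  // Reads one GZIP file; the compressed source is not split, so one worker reads it.
  static PCollection<String> readGzip(Pipeline p, String path) {
    return p.apply("ReadGzip", TextIO.read().from(path).withCompression(Compression.GZIP));
  }

  // Applies the caller's sequence-numbering transform, then materializes and
  // redistributes its output so downstream steps can run in parallel.
  static <T> PCollection<T> numberAndRebalance(
      PCollection<String> lines,
      PTransform<? super PCollection<String>, PCollection<T>> assignSequenceNumbers) {
    return lines
        .apply("AssignSequenceNumbers", assignSequenceNumbers)
        .apply("MaterializeAndRebalance", Reshuffle.<T>viaRandomKey());
  }
}
```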

CANCELLED: call already cancelled

2021-06-14 Thread Trevor Kramer
Hello, I am trying to run a Beam pipeline on Flink using EMR. I am consistently getting these errors. I found a reference to a bug report that said this issue was fixed in 1.11. I am using 1.12.1. Caused by: org.apache.beam.vendor.grpc.v1p36p0.io.grpc.StatusRuntimeException: CANCELLED: call alre

Re: GroupIntoShards not sending bytes further when dealing with huge amount of data

2021-06-14 Thread Evan Galpin
I’ll try to create something as small as possible from the pipeline I mentioned 👍 I should have time this week to do so. Thanks, Evan On Mon, Jun 14, 2021 at 18:09 Robert Bradshaw wrote: > Is it possible to post the code? (Or the code of a similar, but > minimal, pipeline that exhibits the same

Re: GroupIntoShards not sending bytes further when dealing with huge amount of data

2021-06-14 Thread Robert Bradshaw
Awesome, thanks! On Mon, Jun 14, 2021 at 5:36 PM Evan Galpin wrote: > > I’ll try to create something as small as possible from the pipeline I > mentioned 👍 I should have time this week to do so. > > Thanks, > Evan > > On Mon, Jun 14, 2021 at 18:09 Robert Bradshaw wrote: >> >> Is it possible to

Re: End to end unit tests for stateful pipeline

2021-06-14 Thread gaurav mishra
Hi Luke, I tried going down the path which you suggested but am hitting some roadblocks. Maybe I am doing something wrong. As you said, I created a unit-test-specific class for PipelineOptions and a TestRedisFactory which is set up to return a mock instance of RedisClient. In my test code I have `