Re: Generating Heartbeats Using Looping Timer

2022-07-09 Thread Reuven Lax via user
On Fri, Jul 8, 2022 at 1:37 PM gaurav mishra wrote: > Maybe the previous post was too verbose so I will try to summarize my > question - > If one instance of DoFn tries to set a timer for a time which is behind > the pipeline's watermark, can this cause the pipeline to stall for other > keys as w

Re: GroupIntoBatches not working on Flink?

2022-07-26 Thread Reuven Lax via user
This might be a bug in the Flink runner, because it is implemented here. On Tue, Jul 26, 2022 at 9:14 AM Cristian Constantin

Re: Using a non-AutoValue member with AutoValueSchema

2022-08-04 Thread Reuven Lax via user
We do have JavaBeanSchema which might work, depending on whether your thrift class conforms to java beans. On Thu, Aug 4, 2022 at 2:06 PM Binh Nguyen Van wrote: > Hi, > > I have an AutoValue class and it looks like this > > @AutoValue > @DefaultSchema( AutoValueSchema.class ) > public abstract c

Re: Using a non-AutoValue member with AutoValueSchema

2022-08-04 Thread Reuven Lax via user
- I think we should support mixing and matching SchemaProviders > for nested types. > > [1] https://github.com/apache/beam/issues/20359 > > On Thu, Aug 4, 2022 at 2:45 PM Reuven Lax via user > wrote: > >> We do have JavaBeanSchema which might work, depending on wheth

Re: Checkpointing on Google Cloud Dataflow Runner

2022-08-29 Thread Reuven Lax via user
Google Cloud Dataflow does support snapshots. Is this what you were looking for? On Mon, Aug 29, 2022 at 4:04 PM Kenneth Knowles wrote: > Hi Will, David, > > I think you'll find the best source of answer for this sort of question on

Re: Checkpointing on Google Cloud Dataflow Runner

2022-08-30 Thread Reuven Lax via user
Snapshots are expected to happen nearly instantaneously. Although processing is paused while the snapshot is in progress, the pause should usually be very brief. It's true that Dataflow does not support automated snapshots - you would have to create them yourself using a cron job. Checkpoints on Flink ar

Re: [Question] Using KafkaIO without a data loss

2022-09-25 Thread Reuven Lax via user
On Sun, Sep 25, 2022 at 4:56 AM Yomal de Silva wrote: > Hi all, > > I have started using KafkaIO to read a data stream and have the following > questions. Appreciate it if you could provide a few clarifications on the > following. > > 1. Does KafkaIO ignore the offset stored in the broker and use

Re: [Question] Using KafkaIO without a data loss

2022-09-25 Thread Reuven Lax via user
and B is committed, > will those messages(A) get consumed again from Kafka or will the messages > get recovered from the checkpoint and retried in that specific operator? > > On Sun, Sep 25, 2022 at 10:45 PM Reuven Lax via user > wrote: > >> >> >> On Sun, Sep 25

Re: Why is BigQueryIO.withMaxFileSize() not public?

2022-09-29 Thread Reuven Lax via user
It's not public because it was added for use in unit tests, and modifying this value can have very unexpected results (e.g. making it smaller can push the write onto a completely different codepath, the one used when there are too many files, leading to unexpected cost increases in the pipeline). Out of c

Re: Why is BigQueryIO.withMaxFileSize() not public?

2022-09-29 Thread Reuven Lax via user
The default max file size is 4 TiB. BigQuery supports files up to 5 TiB, but there might be some slop in our file-size estimation, which is why Beam sets a slightly lower limit. In any case, you won't be able to increase that value by much, or BigQuery will reject the load job. The default max byt
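
The limits quoted above work out as follows. This is a stdlib Python sketch of the arithmetic only. The constant and function names are illustrative and are not BigQueryIO fields.

```python
# Limits as stated in the reply above (names are illustrative, not BigQueryIO fields).
TIB = 2 ** 40  # bytes in one tebibyte

BEAM_DEFAULT_MAX_FILE_SIZE = 4 * TIB
BIGQUERY_MAX_FILE_SIZE = 5 * TIB


def headroom_fraction(configured_max: int, hard_limit: int = BIGQUERY_MAX_FILE_SIZE) -> float:
    """Share of BigQuery's per-file limit left unused by Beam's cap."""
    return (hard_limit - configured_max) / hard_limit


def tolerated_estimation_error(configured_max: int, hard_limit: int = BIGQUERY_MAX_FILE_SIZE) -> float:
    """Relative amount a file's real size may overshoot the cap before BigQuery rejects it."""
    return hard_limit / configured_max - 1


print(BEAM_DEFAULT_MAX_FILE_SIZE)                              # 4398046511104
print(headroom_fraction(BEAM_DEFAULT_MAX_FILE_SIZE))           # 0.2
print(tolerated_estimation_error(BEAM_DEFAULT_MAX_FILE_SIZE))  # 0.25
```

In other words, the 4 TiB default still works if the size estimate is up to 25% too low. Raising the cap toward 5 TiB shrinks that margin toward zero.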

Re: Staging a PCollection in Beam | Dataflow Runner

2022-10-19 Thread Reuven Lax via user
PCollections are usually persistent within a pipeline, so you can reuse them in other parts of a pipeline with no problem. There is no notion of state across pipelines - every pipeline is independent. If you want state across pipelines you can write the PCollection out to a set of files which ar

Re: Single side input to multiple transforms

2022-11-07 Thread Reuven Lax via user
Is this a Python job? On Mon, Nov 7, 2022 at 12:38 AM Binh Nguyen Van wrote: > Hi, > > I am writing a pipeline where I have one singleton side input that I want > to use in multiple different transforms. When I run the pipeline in Google > Dataflow I see multiple entries in the logs that have a

Re: Beam saves filepaths in Flink's state

2022-12-08 Thread Reuven Lax via user
This doesn't sound ideal to me. For contrast, Dataflow doesn't save any of these things (coders, transforms, configs) in state, which makes it easier for Dataflow to update pipelines. On Thu, Dec 8, 2022 at 7:48 AM Cristian Constantinescu wrote: > Hi everyone, > > I noticed that the Flink state

Re: getFailedInsertsWithErr and Storage Write API

2023-03-01 Thread Reuven Lax via user
Correct; however, if you are using a recent version of Beam you can call WriteResult.getFailedStorageApiInserts. On Wed, Mar 1, 2023 at 3:00 PM Matthew Ouyang wrote: > The documentation says WriteResult.getFailedInserts won’t return anything > when used with the Storage Write API ( > https://beam.

Re: Deduplicate usage

2023-03-02 Thread Reuven Lax via user
State is per-key, and keys are distributed across workers. Two workers should not be working on the same state. On Thu, Mar 2, 2023 at 10:48 AM Binh Nguyen Van wrote: > Thank you Ankur, > > This is the current source code of Deduplicate transform. > > Boolean seen = seenState.read(); >
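
The per-key point can be shown with a toy model of the quoted `seenState` logic. This is plain Python, not Beam's implementation. The SHA-256 key-to-worker routing and the fixed worker count are assumptions for illustration. Real runners assign keys, and may rebalance them, in their own way, but a given key's state is only worked on by one worker at a time.

```python
import hashlib
from collections import defaultdict


def worker_for_key(key: str, num_workers: int) -> int:
    """Hypothetical routing: hash the key so it always lands on the same worker."""
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_workers


def deduplicate(keys, num_workers: int = 3):
    """Toy per-key dedup: each worker holds a 'seen' flag only for the keys it owns."""
    seen_state = [{} for _ in range(num_workers)]  # one state store per worker
    owners = defaultdict(set)  # key -> workers that ever touched its state
    output = []
    for key in keys:
        worker = worker_for_key(key, num_workers)
        owners[key].add(worker)
        state = seen_state[worker]
        if not state.get(key, False):  # like `Boolean seen = seenState.read()`
            state[key] = True          # mark this key as seen in its owner's state
            output.append(key)
    return output, owners


out, owners = deduplicate(["a", "b", "a", "c", "b", "a"])
print(out)                                          # ['a', 'b', 'c']
print(all(len(ws) == 1 for ws in owners.values()))  # True
```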

Re: Successful Inserts for Storage Write API?

2023-03-02 Thread Reuven Lax via user
Are you trying to do this in order to use Wait.on? getSuccessfulInserts is not currently supported for Storage Write API. On Thu, Mar 2, 2023 at 1:44 PM Matthew Ouyang wrote: > Thank you to Ahmed and Reuven for the tip on > WriteResult::getFailedStorageApiInserts. > > When I tried to get the suc

Re: Why is FlatMap different from composing Flatten and Map?

2023-03-15 Thread Reuven Lax via user
In Apache Beam, Flatten is a union operation - it takes multiple PCollections (of the same type) and merges them into a single PCollection. On Mon, Mar 13, 2023 at 11:32 AM Godefroy Clair wrote: > Hi, > I am wondering about the way `Flatten()` and `FlatMap()` are implemented > in Apache Beam Pyt
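
Here is a plain-Python model of the distinction. Lists stand in for PCollections. Real PCollections are unordered, so the list order here is only for readability. In the Python SDK the union itself is written as `(pcoll1, pcoll2) | beam.Flatten()`. The helper names below are hypothetical.

```python
from itertools import chain


def flatten(*pcollections):
    """Like Beam's Flatten: the union of several collections of the same type."""
    return list(chain.from_iterable(pcollections))


def flat_map(fn, pcollection):
    """Like FlatMap: call fn on each element and emit every item of the iterable it returns."""
    return [item for element in pcollection for item in fn(element)]


print(flatten(["x", "y"], ["z"]))  # ['x', 'y', 'z']

lines = ["a b", "c"]
print(flat_map(str.split, lines))  # ['a', 'b', 'c']

# Map followed by Flatten is not the same thing: Map yields ONE collection whose
# elements are lists, and Flatten over a single collection leaves them untouched.
mapped = [line.split() for line in lines]
print(flatten(mapped))             # [['a', 'b'], ['c']]
```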

Re: Successful Inserts for Storage Write API?

2023-03-21 Thread Reuven Lax via user
similar with Storage Write. > > On Thu, Mar 2, 2023 at 4:48 PM Reuven Lax via user > wrote: > >> Are you trying to do this in order to use Wait.on? getSuccessfulInserts >> is not currently supported for Storage Write API. >> >> On Thu, Mar 2, 2023 at 1:44 PM

Re: major reduction in performance when using schema registry - KafkaIO

2023-04-09 Thread Reuven Lax via user
How are you using the schema registry? Do you have a code sample? On Sun, Apr 9, 2023 at 3:06 AM Sigalit Eliazov wrote: > Hello, > > I am trying to understand the effect of schema registry on our pipeline's > performance. In order to do so we created a very simple pipeline that reads > from kafka

Re: Is there any way to set the parallelism of operators like group by, join?

2023-04-15 Thread Reuven Lax via user
The maximum parallelism is always determined by the parallelism of your data. If you do a GroupByKey for example, the number of keys in your data determines the maximum parallelism. Beyond the limitations in your data, it depends on your execution engine. If you're using Dataflow, Dataflow is desi
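
The upper bound described above can be shown with a small stdlib sketch. After a GroupByKey, each key's values form a single group, so the number of distinct keys caps how many groups can be processed in parallel, whatever the worker count. The helpers are illustrative, not Beam APIs.

```python
from collections import defaultdict


def group_by_key(pairs):
    """Collect the values for each key, as GroupByKey does."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)


def effective_parallelism(pairs, num_workers: int) -> int:
    """Each group is processed as a unit, so distinct keys cap the parallelism."""
    return min(num_workers, len(group_by_key(pairs)))


pairs = [("us", 1), ("eu", 2), ("us", 3), ("eu", 4), ("us", 5)]
print(group_by_key(pairs))                # {'us': [1, 3, 5], 'eu': [2, 4]}
print(effective_parallelism(pairs, 100))  # 2: only two groups, however many workers
```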

Re: Is there any way to set the parallelism of operators like group by, join?

2023-04-16 Thread Reuven Lax via user
>>> >>> Here's an example of flink >>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/#operator-level >>> Spark also support to set operator level parallelism (see groupByKey >>> and reduceByKey): >

Re: Is there any way to set the parallelism of operators like group by, join?

2023-04-16 Thread Reuven Lax via user
>> level. And the input size of the operator is unknown at compiling stage if >>>>> it is not a source >>>>> operator, >>>>> >>>>> Here's an example of flink >>>>> https://nightlies.apache.org/flink/flink-docs-ma

Re: Is there any way to set the parallelism of operators like group by, join?

2023-04-16 Thread Reuven Lax via user
ve you tried setting spark.sql.adaptive.enabled & >> spark.sql.adaptive.coalescePartitions.enabled >> >> >> >> On Mon, Apr 17, 2023 at 10:34 AM Reuven Lax via user < >> user@beam.apache.org> wrote: >> >>> I see. Robert - what is the story

Re: Losing records when using BigQuery IO Connector

2023-04-17 Thread Reuven Lax via user
What version of Beam are you using? There are no known data-loss bugs in the connector, however if there has been a regression we would like to address it with high priority. On Mon, Apr 17, 2023 at 12:47 AM Binh Nguyen Van wrote: > Hi, > > I have a job that uses BigQuery IO Connector to write t

Re: Is there any way to set the parallelism of operators like group by, join?

2023-04-18 Thread Reuven Lax via user
>>> wrote: >>> >>>> To a (small) degree Sparks “new” AQE might be able to help depending on >>>> what kind of operations Beam is compiling it down to. >>>> >>>> Have you tried setting spark.sql.adaptive.enabled & >>>>

Re: Apache Beam pipeline stuck indefinitely using Wait.on transform with JdbcIO

2023-04-21 Thread Reuven Lax via user
I believe you have to call withResults() on the JdbcIO transform in order for this to work. On Fri, Apr 21, 2023 at 10:35 PM Juan Cuzmar wrote: > I hope you all are doing well. I am facing an issue with an Apache Beam > pipeline that gets stuck indefinitely when using the Wait.on transform > alo

Re: Apache Beam pipeline stuck indefinitely using Wait.on transform with JdbcIO

2023-04-22 Thread Reuven Lax via user
1, element); > }) > .withResults() > ); > result.apply(Wait.on(insert)) > .apply("Selecting", new SomeTransform()) > .apply("PubsubMessaging", ParDo.of(new NextTransformer())); > > htt

Re: Apache Beam pipeline stuck indefinitely using Wait.on transform with JdbcIO

2023-04-22 Thread Reuven Lax via user
}) > .withResults() > ); > result.apply(Wait.on(insert)) > .apply("Selecting", new SomeTransform()) > .apply("PubsubMessaging", ParDo.of(new NextTransformer())); > p.run(); > > update

Re: Apache Beam pipeline stuck indefinitely using Wait.on transform with JdbcIO

2023-04-22 Thread Reuven Lax via user
r 22, 2023 at 10:28 AM Juan Cuzmar wrote: > I'm developing with direct runner. but should go to dataflow when > deployed. > > > Original Message ---- > On Apr 22, 2023, 13:13, Reuven Lax via user < user@beam.apache.org> wrote: > > > What runner are

Re: How can we get multiple side inputs from a single pipeline ?

2023-08-28 Thread Reuven Lax via user
This looks fine. One caveat: there currently appears to be a bug in Beam when you apply a combiner followed by View.asSingleton. I would recommend replacing these lines: .apply(Latest.globally()) .apply(View.asSingleton()) With the following: .apply(Reify.timestamps()) .apply(Combine.globally(Lat

Re: "Decorator" pattern for PTransforms

2023-09-15 Thread Reuven Lax via user
Creating composite DoFns is tricky today due to how they are implemented (via annotated methods). However providing such a method to compose DoFns would be very useful IMO. On Fri, Sep 15, 2023 at 9:33 AM Joey Tran wrote: > Yeah for (1) the concern would be adding a shuffle/fusion break and (2)

Re: "Decorator" pattern for PTransforms

2023-09-15 Thread Reuven Lax via user
Correct - I was referring to Java. On Fri, Sep 15, 2023 at 9:55 AM Robert Bradshaw wrote: > On Fri, Sep 15, 2023 at 9:46 AM Reuven Lax via user > wrote: > >> Creating composite DoFns is tricky today due to how they are implemented >> (via annotated methods). >>

Re: @FieldAccess parameter types not being enforced vs corresponding schema types in Java DoFn

2023-09-18 Thread Reuven Lax via user
Good question - I know it will be enforced at runtime (I think you'll get a ClassCastException if things don't match) but I'd have to check to see if we enforce it at graph-submission time. If we don't have this validation in place, adding it would be an improvement. On Mon, Sep 18, 2023 at 3:04 

Re: [QUESTION] Why no auto labels?

2023-10-01 Thread Reuven Lax via user
Are you talking about transform names? The main reason was because for runners that support updating pipelines in place, the only way to do so safely is if the runner can perfectly identify which transforms in the new graph match the ones in the old graph. There's no good way to auto generate names
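
The update-matching argument can be shown with a toy model. The `<Type>_<n>` auto-naming scheme below is hypothetical and is not how any Beam SDK names transforms. It shows why position-based auto names break in-place updates: inserting one transform shifts every later name, so state gets paired with the wrong step.

```python
from collections import Counter


def auto_names(transform_types):
    """Hypothetical auto-labeling scheme: '<Type>_<n>', numbered in graph order."""
    counts = Counter()
    names = []
    for t in transform_types:
        names.append(f"{t}_{counts[t]}")
        counts[t] += 1
    return names


def match_by_name(old_names, new_names):
    """Map each new transform's index to the old transform with the same name."""
    old_index = {name: i for i, name in enumerate(old_names)}
    return {j: old_index[name] for j, name in enumerate(new_names) if name in old_index}


old_types = ["ParDo", "ParDo", "GroupByKey"]
new_types = ["ParDo"] + old_types  # the update inserts one new ParDo at the front

# Auto-generated names shift: the NEW ParDo (index 0) is matched to the old ParDo_0
# and would inherit its state, while the old ParDo_1's state lands on the wrong step.
print(match_by_name(auto_names(old_types), auto_names(new_types)))  # {0: 0, 1: 1, 3: 2}

# Stable, user-chosen names keep every existing transform paired with itself.
old_labels = ["ParseEvents", "EnrichEvents", "GroupByUser"]
new_labels = ["ValidateInput"] + old_labels
print(match_by_name(old_labels, new_labels))  # {1: 0, 2: 1, 3: 2}
```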

Re: simplest way to do exponential moving average?

2023-10-02 Thread Reuven Lax via user
On Mon, Oct 2, 2023 at 2:00 AM Jan Lukavský wrote: > Hi, > > this depends on how exactly you plan to calculate the average. The > original definition is based on exponentially decreasing weight of more > distant (older if time is on the x-axis) data points. This (technically) > means that this av
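
Jan's description uses weights that decay exponentially with distance. Here is a minimal plain-Python sketch of two common EMA forms. The first is the fixed-smoothing-factor recurrence. The second is a variant where the decay depends on the time gap between points. Both assume the points arrive in time order. In a Beam pipeline elements are unordered, so you would need to sort or buffer them first (for example in per-key state). Names and parameters are illustrative.

```python
import math


def ema(values, alpha: float):
    """Fixed-factor EMA: s_t = alpha * x_t + (1 - alpha) * s_{t-1}, seeded with x_0."""
    if not 0 < alpha <= 1:
        raise ValueError("alpha must be in (0, 1]")
    smoothed, s = [], None
    for x in values:
        s = x if s is None else alpha * x + (1 - alpha) * s
        smoothed.append(s)
    return smoothed


def time_decayed_ema(points, tau: float):
    """EMA over (timestamp, value) pairs in time order; older values decay as exp(-dt / tau)."""
    s, last_t = None, None
    for t, x in points:
        if s is None:
            s = x
        else:
            a = 1 - math.exp(-(t - last_t) / tau)  # bigger gap -> more weight on x
            s = a * x + (1 - a) * s
        last_t = t
    return s


print(ema([10.0, 20.0, 30.0], alpha=0.5))  # [10.0, 15.0, 22.5]
```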

Re: Questions about writing to BigQuery using storage api

2023-12-07 Thread Reuven Lax via user
This is the stack trace of the rethrown exception. The log should also contain a "caused by" log somewhere detailing the original exception. Do you happen to have that? On Thu, Dec 7, 2023 at 8:46 AM hsy...@gmail.com wrote: > Here is the complete stacktrace It doesn't even hit my code and it >

Re: [Dataflow][Java][2.52.0] Upgrading to 2.52.0 Surfaces Pubsub Coder Error

2023-12-12 Thread Reuven Lax via user
Are you setting the enable_custom_pubsub_source experiment by any chance? On Tue, Dec 12, 2023 at 3:24 PM Evan Galpin wrote: > Hi all, > > When attempting to upgrade a running Dataflow pipeline from SDK 2.51.0 to > 2.52.0, an incompatibility warning is surfaced that prevents pipeline > upgrade:

Re: Using Dataflow with Pubsub input connector in batch mode

2024-01-18 Thread Reuven Lax via user
Some comments here: 1. "All messages in a PubSub topic" is not a well-defined set, as more messages can always be published. You may know that nobody will publish any more messages, but the pipeline does not. 2. While it's possible to read from Pub/Sub in batch, it's usually not rec

Re: Using Dataflow with Pubsub input connector in batch mode

2024-01-21 Thread Reuven Lax via user
> is a bounded input. > > _/ > _/ Alex Van Boxel > > > On Fri, Jan 19, 2024 at 12:18 AM Reuven Lax via user > wrote: > >> Some comments here: >>1. All messages in a PubSub topic is not a well-defined statement, as >> there can always be more messages p

Re: usage of dynamic schema in BEAM SQL

2024-01-28 Thread Reuven Lax via user
Can you explain the use case a bit more? In order to write a SQL statement (at least one that doesn't use wildcard selection) you also need to know the schema ahead of time. What are you trying to accomplish with these dynamic schemas? Reuven On Sun, Jan 28, 2024 at 2:30 AM Sigalit Eliazov wrote

Re: usage of dynamic schema in BEAM SQL

2024-01-28 Thread Reuven Lax via user
> I would like to be able to define the sql query via configuration. > In addition in our use case the kafka message schema and the row schema > are pretty much the same. So i wonder if i could reuse it. > > Thanks > Sigalit > > On Sun, Jan 28, 2024, 20:23, Reu

Re: Some events are discarded from a FixedWindow

2024-02-21 Thread Reuven Lax via user
On Wed, Feb 21, 2024 at 12:39 PM Ifat Afek (Nokia) via user < user@beam.apache.org> wrote: > Hi, > > > > We have a Beam-SQL pipeline on top of Flink, that once in 5 min gets a > bunch of events from Kafka and should execute an SQL command on a 1-hour > window. Some of the events arrive late. > > I
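
The reply is cut off in this listing. For context, here is a toy model (plain Python, millisecond timestamps) of how a fixed-window pipeline decides that an element is too late. The element is dropped once the watermark has passed the end of its window plus the allowed lateness. Treat the exact boundary check as an approximation of the runner's behavior, and the function names as hypothetical.

```python
WINDOW_SIZE_MS = 60 * 60 * 1000  # 1-hour fixed windows, as in the question


def window_bounds(ts_ms: int, size_ms: int = WINDOW_SIZE_MS):
    """Return [start, end) of the fixed window that contains ts_ms."""
    start = ts_ms - ts_ms % size_ms
    return start, start + size_ms


def is_dropped_as_late(ts_ms: int, watermark_ms: int, allowed_lateness_ms: int = 0,
                       size_ms: int = WINDOW_SIZE_MS) -> bool:
    """Toy rule: drop once the watermark is past the window's last instant plus allowed lateness."""
    _, end = window_bounds(ts_ms, size_ms)
    max_timestamp = end - 1  # last millisecond that still belongs to the window
    return max_timestamp + allowed_lateness_ms < watermark_ms


event = 30 * 60 * 1000      # event time 00:30
watermark = 65 * 60 * 1000  # watermark already at 01:05
print(is_dropped_as_late(event, watermark))                                      # True
print(is_dropped_as_late(event, watermark, allowed_lateness_ms=10 * 60 * 1000))  # False
```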

Re: KV with AutoValueSchema

2024-04-04 Thread Reuven Lax via user
There are some sharp edges unfortunately around auto-inference of KV coders and schemas. Is there a previous PCollection of type SharedCoreEvent, or is the SharedCoreEvent created in ProcessEvents? On Thu, Apr 4, 2024 at 2:12 PM Ruben Vargas wrote: > Hello guys > > I have a question, is it possi

Re: KV with AutoValueSchema

2024-04-04 Thread Reuven Lax via user
wrote: > ProcessEvents receives as input a Session object and creates a KV SharedCoreEvent> as output > > On Thu, Apr 4, 2024 at 8:52 PM, Reuven Lax via user < > user@beam.apache.org> wrote: > >> There are some sharp edges unfortunately around auto-in

Re: How to handle Inheritance with AutoValueSchema

2024-04-09 Thread Reuven Lax via user
I don't see any unit tests for inherited AutoValue accessors, so I suspect it simply does not work today with AutoValueSchema. This is something that's probably fixable (though such a fix does risk breaking some users). On Mon, Apr 8, 2024 at 11:21 PM Ruben Vargas wrote: > Hello Guys > > I have

Re: Any recommendation for key for GroupIntoBatches

2024-04-15 Thread Reuven Lax via user
There are various strategies. Here is an example of how Beam does it (taken from Reshuffle.viaRandomKey().withNumBuckets(N)). Note that this does some extra hashing to work around issues with the Spark runner. If you don't care about that, you could implement something simpler (e.g. initialize shard
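
Here is a minimal alternative sketch. It assumes you only need a bounded, roughly even spread of keys and do not need the Spark-specific hashing mentioned above. Each element gets a random key in `[0, num_buckets)` before grouping. This is not the truncated suggestion above. The function name is hypothetical, and the seeded `random.Random` is only there to make the example repeatable.

```python
import random


def assign_shard_keys(elements, num_buckets: int, rng=None):
    """Pair every element with a random key in [0, num_buckets) ahead of GroupIntoBatches."""
    rng = rng or random.Random()
    return [(rng.randrange(num_buckets), element) for element in elements]


keyed = assign_shard_keys(range(10), num_buckets=4, rng=random.Random(42))
print(sorted({key for key, _ in keyed}))  # some subset of [0, 1, 2, 3]
```

In a Python pipeline the same idea would be a `beam.Map` that emits `(key, element)` pairs ahead of `beam.GroupIntoBatches(batch_size)`. The number of buckets then roughly bounds how many batches are being filled at once.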

Re: Questions about file_upload method in BigQueryIO

2024-10-08 Thread Reuven Lax via user
;>>>>> The temp tables are only created if file sizes are too large for a >>>>>> single load into BQ (if you use an AVRO formatter you might be able to >>>>>> reduce file size enough to avoid this). In this case, Beam will issue a >>>>>

Re: Questions about file_upload method in BigQueryIO

2024-10-08 Thread Reuven Lax via user
n this case, Beam will issue a >>>> copy job to copy all the temp tables to the final table. >>>> >>>> On Wed, Oct 2, 2024 at 2:42 PM hsy...@gmail.com >>>> wrote: >>>> >>>>> @Reuven Lax I do see the file_upload create

Re: Questions about file_upload method in BigQueryIO

2024-10-08 Thread Reuven Lax via user
2024 at 2:42 PM hsy...@gmail.com wrote: >> >>> @Reuven Lax I do see the file_upload create tons of >>> temp tables, but when does BQ load temp tables to the final table? >>> >>> On Wed, Oct 2, 2024 at 1:17 PM Reuven Lax via user >>> wrote: &g

Re: Questions about file_upload method in BigQueryIO

2024-10-06 Thread Reuven Lax via user
> > On Wed, Oct 2, 2024 at 1:17 PM Reuven Lax via user > wrote: > >> File load does not return per-row errors (unlike storage API which does). >> Dataflow will generally retry the entire file load on error (indefinitely >> for streaming and up to 3 times for batch). You

Re: Questions about file_upload method in BigQueryIO

2024-10-02 Thread Reuven Lax via user
File load does not return per-row errors (unlike the Storage API, which does). Dataflow will generally retry the entire file load on error (indefinitely for streaming and up to 3 times for batch). You can look at the logs to find the specific error; however, it can be tricky to associate it with a specif
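
The retry behavior described above can be sketched as follows, assuming "up to 3 times" means three attempts in total. The whole load is the unit of retry, since no per-row errors come back. `run_load_with_retries` and `FlakyLoad` are hypothetical names, not Dataflow internals.

```python
def run_load_with_retries(load_fn, streaming: bool, batch_max_attempts: int = 3):
    """Re-run the whole load on failure: without limit when streaming, bounded in batch."""
    attempt = 0
    while True:
        attempt += 1
        try:
            return load_fn()
        except Exception:
            if not streaming and attempt >= batch_max_attempts:
                raise  # batch gives up and surfaces the last error


class FlakyLoad:
    """Hypothetical load job that fails a fixed number of times before succeeding."""

    def __init__(self, failures: int):
        self.failures = failures
        self.calls = 0

    def __call__(self):
        self.calls += 1
        if self.calls <= self.failures:
            raise RuntimeError("load job failed")
        return "DONE"


job = FlakyLoad(failures=2)
print(run_load_with_retries(job, streaming=False), job.calls)  # DONE 3
```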

Re: File_Upload even works? and how do I even debug ?

2024-10-03 Thread Reuven Lax via user
Copy jobs are not always generated. If the files are within the limits of a single load, we issue a load job to load them straight into BQ. We only issue a copy job if the data is too large for a single load. In that case we load into multiple temporary tables and then copy to the final table. Reuv
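
Here is a toy planner for the decision described above. The per-load byte and file limits are hypothetical parameters, not BigQueryIO's actual thresholds. Files are grouped greedily in arrival order.

```python
def plan_load(file_sizes, max_bytes_per_load: int, max_files_per_load: int):
    """Greedily pack files (in order) into load jobs under hypothetical per-load limits."""
    groups, current, current_bytes = [], [], 0
    for size in file_sizes:
        too_big = current_bytes + size > max_bytes_per_load
        too_many = len(current) >= max_files_per_load
        if current and (too_big or too_many):
            groups.append(current)
            current, current_bytes = [], 0
        current.append(size)
        current_bytes += size
    if current:
        groups.append(current)
    multi = len(groups) > 1
    return {
        "load_jobs": len(groups),
        # One group loads straight into the final table; several go to temp tables.
        "temp_tables": len(groups) if multi else 0,
        "copy_job": multi,  # temp tables are then copied into the final table
    }


print(plan_load([40, 30, 20], max_bytes_per_load=100, max_files_per_load=10))
# {'load_jobs': 1, 'temp_tables': 0, 'copy_job': False}
print(plan_load([60, 60, 60], max_bytes_per_load=100, max_files_per_load=10))
# {'load_jobs': 3, 'temp_tables': 3, 'copy_job': True}
```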

Re: Beam dropping events from Kafka after reshuffle ?

2024-09-18 Thread Reuven Lax via user
How are you doing this aggregation? On Wed, Sep 18, 2024 at 3:11 PM Lydian Lee wrote: > Hi Jan, > > Thanks for the recommendation. In our case, we are windowing with the > processing time, which means that there should be no late event at all. > > You’ve mentioned that GroupByKey is stateful and

Re: Beam dropping events from Kafka after reshuffle ?

2024-09-18 Thread Reuven Lax via user
ions. Kafka have 16 partition, but we only want to > generate 2 files every minute > ) > | "Group by randomly-assigned integer key" >> beam.GroupByKey() > | "Abandon Dummy Key" >> beam.MapTuple(lambda key, val: val) > | "Writing event data batches

Re: Beam dropping events from Kafka after reshuffle ?

2024-09-23 Thread Reuven Lax via user
imestamp" >> beam.Map(lambda event: >>> window.TimestampedValue(event, time.time())) >>> | "Window into Fixed Intervals" >> beam.WindowInto( >>> beam.transforms.window.FixedWindows(fixed_window_size), # >>> fixed_window_size is 1 min. >

Re: Beam dropping events from Kafka after reshuffle ?

2024-09-23 Thread Reuven Lax via user
; duplicates of data written, but we are more concerned about the missing > data, which is the issue we are facing right now. > > On Mon, Sep 23, 2024 at 11:54 AM Reuven Lax via user > wrote: > >> Do you close the write afterwards? If not, I wonder if you could lose >> rec
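
The pipeline in this thread is Python, so here is a minimal stdlib sketch of the point about closing the write. Buffered output is only guaranteed to leave Python's buffer once the handle is flushed or closed. A `with` block closes the handle even if an exception is raised. `write_batch` is a hypothetical helper. Inside a DoFn you would use whatever file API your sink already uses.

```python
import os
import tempfile


def write_batch(path: str, records) -> None:
    """Write one batch, one record per line; leaving the `with` block closes the file."""
    with open(path, "w", encoding="utf-8") as f:
        for record in records:
            f.write(record + "\n")
    # The file is closed here (even if a write raised), so nothing is left in
    # Python's write buffer. Without close() or flush(), a small batch could still
    # be sitting in that buffer if the worker were killed at this point.


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "batch-0.txt")
    write_batch(path, ["event-1", "event-2"])
    with open(path, encoding="utf-8") as f:
        print(f.read().splitlines())  # ['event-1', 'event-2']
```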

Re: IllegalArgumentException: Received null value for non-nullable when useBeamSchema to write beam row directly to bigquery

2024-10-01 Thread Reuven Lax via user
or > beam row format > > On Tue, Oct 1, 2024 at 8:13 AM Reuven Lax via user > wrote: > >> Can you explain what you are trying to do here? BigQuery requires schema >> to be known before we write. Beam schemas similarly must be known at graph >> construction time - thoug

Re: IllegalArgumentException: Received null value for non-nullable when useBeamSchema to write beam row directly to bigquery

2024-10-01 Thread Reuven Lax via user
Can you explain what you are trying to do here? BigQuery requires schema to be known before we write. Beam schemas similarly must be known at graph construction time - though this isn't quite the same as Java compile time. Reuven On Tue, Oct 1, 2024 at 12:44 AM hsy...@gmail.com wrote: > I mean

Re: Questions about file_upload method in BigQueryIO

2024-10-02 Thread Reuven Lax via user
Not all data is kept in memory. However, if you have too few shards and writing the files is slow, data has to be held in memory while the file write is in progress. On Wed, Oct 2, 2024 at 11:16 AM hsy...@gmail.com wrote: > We are trying to process over 150TB data(streaming unbound) per day and > save

Re: [Question] Best Practices for Managing Persistent State with Bigtable in Streaming Beam Pipelines

2025-04-29 Thread Reuven Lax via user
Pipeline state persists across pipeline updates - i.e. if you update the job to a new one. If you cancel the job and restart, then you generally lose the state. Writing to an external store such as BigTable from your DoFn can be tricky both from a performance perspective and a correctness perspec

Re: Catching Up with a Streaming Pipeline

2025-05-21 Thread Reuven Lax via user
If you create a Pub/Sub subscription before you start the Beam pipeline, the subscription will capture all of those messages (as long as you start the Beam pipeline within 7 days). You can then start the Beam pipeline against that subscription, which should do what you want. On Fri, May 16, 2025 a
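
Here is a toy timeline model of this advice. It is plain Python and ignores acknowledgements, seek, and non-default retention settings. A subscription only collects messages published after it exists. The reply above notes that the pipeline must start within 7 days for those messages to still be there. The function name is hypothetical.

```python
from datetime import datetime, timedelta

RETENTION = timedelta(days=7)  # window quoted in the reply above


def readable_messages(published, subscription_created: datetime, pipeline_started: datetime):
    """Messages a pipeline started later can still read from a pre-created subscription."""
    return [
        msg
        for published_at, msg in published
        if published_at >= subscription_created           # captured by the subscription
        and pipeline_started - published_at <= RETENTION  # not yet expired
    ]


day0 = datetime(2025, 5, 1)
published = [
    (day0 - timedelta(days=1), "before-subscription"),
    (day0 + timedelta(days=1), "day-1"),
    (day0 + timedelta(days=3), "day-3"),
]
print(readable_messages(published, day0, day0 + timedelta(days=5)))  # ['day-1', 'day-3']
print(readable_messages(published, day0, day0 + timedelta(days=9)))  # ['day-3']
```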

Re: Question: Best Practice for periodic file flushing in streaming DoFn

2025-08-01 Thread Reuven Lax via user
This is incorrect - FinishBundleContext is only valid inside of finishBundle. You cannot save it beyond the method scope. Have you looked at the existing files sinks? Do those not work for your use case? On Fri, Aug 1, 2025 at 9:29 AM Jin An via user wrote: > Hi Beam Community, > > I'm reac