Re: Is there a way to decide what RDDs get cached in the Spark Runner?

2019-05-15 Thread Jan Lukavský
Hi, I think this thread is another manifestation of a problem discussed recently in [1]. Long story short - users in certain situations might legitimately need finer control over how their pipeline is translated into the runner's operators. The case of caching is another one, where looking at the

Re: Is there a way to decide what RDDs get cached in the Spark Runner?

2019-05-15 Thread Jan Lukavský
The DAG was formatted strangely; the intended formatting was PCollectionA -> PCollectionB -> PCollectionC     \-> PCollectionD Jan On 5/15/19 11:19 AM, Jan Lukavský wrote: Hi, I think this thread is another manifestation of a problem discussed recently in

Re: Why is my RabbitMq message never acknowledged ?

2019-06-13 Thread Jan Lukavský
Hi Nicolas, what runner do you use? Have you configured checkpoints (if it is one that needs checkpoints to be configured - e.g. Flink)? Jan On 6/13/19 3:47 PM, Nicolas Delsaux wrote: I'm having big troubles reading data from RabbitMQ. To understand my troubles, I've simplified my previous

Re: Design question regarding streaming and sorting

2019-08-26 Thread Jan Lukavský
Hi, I have seen cases like that, and my general conclusion is that Beam (or Flink for that matter) is not the best match for solving this. Let me explain:  a) Beam (and Flink) tries hard to deliver exactly-once semantics for the data processing and this inevitably introduces some latency  

Re: # of Readers < MaxNumWorkers for UnboundedSource. Causing deadlocks

2019-09-19 Thread Jan Lukavský
Hi Ken, I have seen some deadlock behavior with custom sources earlier (different runner, but that might not be important). Some lessons learned:  a) please make sure your advance() or start() methods do not block, as that will cause issues and possibly the deadlocks you describe  b) if you want t

Re: # of Readers < MaxNumWorkers for UnboundedSource. Causing deadlocks

2019-09-19 Thread Jan Lukavský
Barr wrote: The Start() seems to be working so focus on advance(). Is there any way to prove if I am blocked in advance() for dataflow runner? I have been through code and cannot see anything. But I know that does not mean much. Ken On 2019/09/19 14:57:09, Jan Lukavský wrote: Hi Ken, I

Re: # of Readers < MaxNumWorkers for UnboundedSource. Causing deadlocks

2019-09-20 Thread Jan Lukavský
Yes, that should be fine. But what do you mean by re-entrant in this context? All accesses to reader should be single-threaded. On 9/20/19 6:11 PM, Ken Barr wrote: Is the IO SDK re-entrant? Is it safe to call advance() from within start()? On 2019/09/19 14:57:09, Jan Lukavský wrote: Hi Ken

Re: # of Readers < MaxNumWorkers for UnboundedSource. Causing deadlocks

2019-09-25 Thread Jan Lukavský
timed manner and properly returning false on read timeout. I have implemented a set of statistics to enumerate advance() behavior: the number of times it successfully reads a message, fails to read a message, etc. On 2019/09/19 19:45:49, Jan Lukavský wrote: You can ssh to the dataflow worker and

Re: # of Readers < MaxNumWorkers for UnboundedSource. Causing deadlocks

2019-09-26 Thread Jan Lukavský
UnboundedSource splits? On 2019/09/25 08:13:43, Jan Lukavský wrote: I have a feeling there has to be something going on wrong with your source. If I were you, I would probably do the following:  - verify that I never produce an UnboundedSource split with no queue associated (Preconditions.checkState

Re: Multiple iterations after GroupByKey with SparkRunner

2019-09-26 Thread Jan Lukavský
Hi Gershi, could you please outline the pipeline you are trying to execute? Basically, you cannot iterate the Iterable multiple times in a single ParDo. It should be possible, though, to apply multiple ParDos to the output from GroupByKey. Jan On 9/26/19 3:32 PM, Gershi, Noam wrote: Hi, I want
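The workaround Jan describes can be sketched in the Java SDK as follows; element types and transform names are illustrative. Rather than iterating the grouped Iterable twice inside one DoFn, each iteration becomes its own ParDo over the GroupByKey output:

```java
// Two independent ParDos over the same GBK output, instead of iterating
// the Iterable twice inside a single ParDo (Spark's iterator-backed
// Iterable cannot be re-iterated).
PCollection<KV<String, Iterable<Integer>>> grouped =
    input.apply(GroupByKey.<String, Integer>create());

PCollection<Long> counts = grouped.apply("Count", ParDo.of(
    new DoFn<KV<String, Iterable<Integer>>, Long>() {
      @ProcessElement
      public void process(ProcessContext c) {
        long n = 0;
        for (Integer ignored : c.element().getValue()) { n++; }
        c.output(n);
      }
    }));

PCollection<Integer> sums = grouped.apply("Sum", ParDo.of(
    new DoFn<KV<String, Iterable<Integer>>, Integer>() {
      @ProcessElement
      public void process(ProcessContext c) {
        int sum = 0;
        for (Integer v : c.element().getValue()) { sum += v; }
        c.output(sum);
      }
    }));
```

Each ParDo consumes the grouped values independently, so no single DoFn ever needs to traverse the Iterable twice.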

Re: Multiple iterations after GroupByKey with SparkRunner

2019-09-27 Thread Jan Lukavský
. Is this something that Beam on Spark Runner never supported? On Thu, Sep 26, 2019 at 6:50 AM Jan Lukavský <mailto:je...@seznam.cz>> wrote: Hi Gershi, could you please outline the pipeline you are trying to execute? Basically, you cannot iterate the Iterable multiple

Re: Multiple iterations after GroupByKey with SparkRunner

2019-09-27 Thread Jan Lukavský
imes. Jan On 9/27/19 9:27 AM, Jan Lukavský wrote: I pretty much think so, because that is how Spark works. The Iterable inside is really an Iterator, which cannot be iterated multiple times. Jan On 9/27/19 2:00 AM, Lukasz Cwik wrote: Jan, in Beam users expect to be able to iterate the

Re: Multiple iterations after GroupByKey with SparkRunner

2019-09-30 Thread Jan Lukavský
ors larger than available memory by paging them in from shuffle, which still allows for reiterating. It sounds like Spark is less flexible here? Reuven On Fri, Sep 27, 2019 at 3:04 AM Jan Lukavský mailto:je...@seznam.cz>> wr

Re: Iterating filtered Grouped elements on SparkRunner

2019-10-24 Thread Jan Lukavský
Hi Noam, we are working towards fixing this bug so that your code would work, but that will not be sooner than version 2.18.0 (and I cannot promise even that :-)). In the meantime, you have several options:   a) use merging windowing - that will unfortunately mean some performance penalty a

Re: Iterating filtered Grouped elements on SparkRunner

2019-10-24 Thread Jan Lukavský
it with merging-windows (just for knowledge) ? *From:*[seznam.cz] Jan Lukavský *Sent:* Thursday, October 24, 2019 2:19 PM *To:* user@beam.apache.org *Subject:* Re: Iterating filtered Grouped elements on SparkRunner Hi Noam, we are working towards fixing this bug so that your code would work

Re: Dropping expired sessions with Apache Beam

2020-02-05 Thread Jan Lukavský
Hi Juliana, I'm not quite familiar with the python SDK, so I can give only generic advice. The problem you describe seems to be handled well via a stateful dofn [1], where you would hold the last timestamp of an event per session and set up a timer on each incoming event to the expiration time (if the
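The stateful-DoFn pattern referred to here looks roughly like this in the Java SDK (the Python state/timer API is analogous); the 30-minute gap, key type, and class names are illustrative:

```java
// Keep the last event timestamp per session key and push an event-time
// timer forward on every element; the timer firing means the session
// expired without further activity.
class SessionExpiryFn extends DoFn<KV<String, String>, String> {
  @StateId("lastSeen")
  private final StateSpec<ValueState<Long>> lastSeenSpec = StateSpecs.value();

  @TimerId("expiry")
  private final TimerSpec expirySpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(ProcessContext c,
      @StateId("lastSeen") ValueState<Long> lastSeen,
      @TimerId("expiry") Timer expiry) {
    lastSeen.write(c.timestamp().getMillis());
    // Reset the expiration to (event time + session gap) on each event.
    expiry.set(c.timestamp().plus(Duration.standardMinutes(30)));
  }

  @OnTimer("expiry")
  public void onExpiry(OnTimerContext c) {
    c.output("session expired at " + c.timestamp());
  }
}
```

Because the timer is re-set on every incoming element, it only ever fires once the gap has elapsed with no new events for that key.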

Re: Non-trivial joins examples

2020-05-01 Thread Jan Lukavský
Interestingly, I'm currently also working on a proposal for generic join semantics. I plan to send a proposal for review, but unfortunately, there are still other things keeping me busy. I take this opportunity to review high-level thoughts, maybe someone can give some points. The general idea

Re: Beam 2.25.0 / Flink 1.11.2 - Job failing after upgrading from 2.24.0 / Flink 1.10.2

2020-11-04 Thread Jan Lukavský
Hi Tobias, this looks like a bug; the clearGlobalState method was introduced in 2.25.0, and it seems it might have issues related to RocksDB. Can you file a Jira for that, please? Thanks,  Jan On 11/4/20 9:50 AM, Kaymak, Tobias wrote: When running our Kafka-To-BigQuery pipeline with

Re: Question regarding GoupByKey operator on unbounded data

2020-12-14 Thread Jan Lukavský
Hi, I think what you might be looking for is "stateful processing", please have a look at [1]. Note that input to stateful DoFn must be of type KV, which then ensures similar behavior to Flink's keyBy. Best,  Jan [1] https://beam.apache.org/blog/stateful-processing/ On 12/13/20 6:27 AM, Ta

Re: Looping timer, global windows, and direct runner

2021-01-12 Thread Jan Lukavský
Hi Raman, can you share the details of the pipeline? How exactly are you using the looping timer? A timer as described in the linked blog post should be deterministic even when the order of the input elements is undefined. Does your logic depend on element ordering?  Jan On 1/12/21 3:18 PM, Ra

Re: Looping timer, global windows, and direct runner

2021-01-13 Thread Jan Lukavský
that the timestamp should have been 07:30:00.000Z. Is there something wrong in my pipeline that is causing this non-deterministic behavior? Thanks, Raman On Tue, Jan 12, 2021 at 9:47 AM Jan Lukavský <mailto:je...@seznam.cz>> wrote: Hi Raman, can you share the details of

Re: Looping timer, global windows, and direct runner

2021-01-13 Thread Jan Lukavský
On Wed, Jan 13, 2021 at 3:40 AM Jan Lukavský <mailto:je...@seznam.cz>> wrote: Hi, yes, there is a possible non-determinism, that is related to the timestamp combiner. Timestamp combiners combine only elements, that are not 'late' ([1]), meaning that their ti

Re: Looping timer, global windows, and direct runner

2021-01-13 Thread Jan Lukavský
ra...@gmail.com>> wrote: (Replying to Reza) Yes, I am using TestStream for my unit test. Other replies below. On Wed, Jan 13, 2021 at 3:40 AM Jan Lukavský mailto:je...@seznam.cz>> wrote: Hi, yes, there is a possible non-determinism, that is related to

Re: General guidance

2021-03-29 Thread Jan Lukavský
Hi Julius, which version of Beam do you run? There has been a fix for 2.25.0 [1] which could address what you see.  Jan [1] https://issues.apache.org/jira/browse/BEAM-10760 On 3/25/21 8:11 PM, Kenneth Knowles wrote: This is a Beam issue indeed, though it is an issue with the FlinkRunner. So

Re: Beam 2.28.0 objectReuse and fasterCopy for FlinkPipelineOption

2021-04-09 Thread Jan Lukavský
Hi Eleanore, the --fasterCopy option disables clone between operators (see [1]). It should be safe to use it, unless your pipeline outputs an object and later modifies the same instance. This is generally not supported by the Beam model and is considered to be a user error. FlinkRunner histo
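The unsupported pattern mentioned here - emitting an object and later modifying the same instance - looks like this (illustrative sketch; without defensive copies between operators, a downstream operator may observe the later mutation):

```java
// WRONG with --fasterCopy: the same mutable instance is emitted and then
// mutated on the next element, so downstream may see the later state.
class BadFn extends DoFn<Integer, List<Integer>> {
  private final List<Integer> buffer = new ArrayList<>();

  @ProcessElement
  public void process(ProcessContext c) {
    buffer.add(c.element());
    c.output(buffer);                       // emits the live instance
  }
}

// Correct: emit a copy so later mutations cannot leak downstream, e.g.
//   c.output(new ArrayList<>(buffer));
```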

Re: Beam 2.28.0 objectReuse and fasterCopy for FlinkPipelineOption

2021-04-12 Thread Jan Lukavský
py vs objectReuse option? Eleanore On Fri, Apr 9, 2021 at 11:53 AM Jan Lukavský <mailto:je...@seznam.cz>> wrote: Hi Eleanore, the --fasterCopy option disables clone between operators (see [1]). It should be safe to use it, unless your pipeline outputs an object and late

Re: Getting null pointer exception in a basic setup, don't know why

2021-05-06 Thread Jan Lukavský
Hi Teodor, can you share (maybe a github link, if you have it in a public repo) the implementation of CountSource and Printer? What changed in Beam 2.25.0 (if I recall correctly) is how the Read transform is translated. It uses SDF now, so there might be something that was broken before, but the chang

Re: DirectRunner, Fusion, and Triggers

2021-05-17 Thread Jan Lukavský
Hi Bashir, the behavior you describe should be expected. DirectRunner splits the input work into bundles, processing each bundle might result in zero, one or more new bundles. The executor executes the work associated with these bundles, enqueuing new bundles into a queue, until there are no

Re: DirectRunner, Fusion, and Triggers

2021-05-17 Thread Jan Lukavský
ing of log messages. GBK is a stateful operation that has to wait for a trigger - in simple batch case the trigger is the end of input, which is why you cannot see outputs of GBK being interleaved with reading inputs. All inputs have had to be read before GBK can proceed and output any bundl

Re: KafkaIO with DirectRunner is creating tons of connections to Kafka Brokers

2021-05-24 Thread Jan Lukavský
Hi Serge, I posted an answer to the SO question, hope that helps. One question - frequent creation of consumers is to be expected with DirectRunner, but there should be only a limited number of them at a time. Do you see many of them present simultaneously? Or are they correctly closed and rel

Re: KafkaIO with DirectRunner is creating tons of connections to Kafka Brokers

2021-05-24 Thread Jan Lukavský
:35:50, Jan Lukavský (je...@seznam.cz <mailto:je...@seznam.cz>) wrote: Hi Serge, I posted answer to the SO question, hope that helps. One question - a frequent creation of consumers should be expected with DirectRunner, but there should be only a limited number of them at a time. Do yo

Re: [DISCUSS] Drop support for Flink 1.10

2021-05-31 Thread Jan Lukavský
Hi, +1 to remove the support for 1.10.  Jan On 5/28/21 10:00 PM, Ismaël Mejía wrote: Hello, With Beam support for Flink 1.13 just merged it is the time to discuss the end of support for Flink 1.10 following the agreed policy on supporting only the latest three Flink releases [1]. I would

Re: GroupIntoShards not sending bytes further when dealing with huge amount of data

2021-06-14 Thread Jan Lukavský
Hi Eddy, does your data get buffered in a state - e.g. does the size of the state grow over time? Do you see the watermark being updated in your Flink WebUI? When a stateful operation (and GroupByKey is a stateful operation) does not output any data, the first place to look is whether the watermark cor

Re: GroupIntoShards not sending bytes further when dealing with huge amount of data

2021-06-14 Thread Jan Lukavský
e using KinesisIO for reading messages. Kinesis uses UnboundedSource, which is expanded to SDF starting from Beam 2.25.0. The flag should change that as well. Can you try the --experiments=use_deprecated_read and see if your Pipeline DAG changes (should not contain the Impulse transform at the begin

Re: GroupIntoShards not sending bytes further when dealing with huge amount of data

2021-06-14 Thread Jan Lukavský
mentations? Are there any posts of possible breaking changes? On 2021/06/14 13:19:39, Jan Lukavský wrote: Hi Eddy, answers inline. On 6/14/21 3:05 PM, Eddy G wrote: Hi Jan, Thanks for replying so fast! Regarding your questions, - "Does your data get buffered in a state?" Yes, I

Re: Using Beam to generate unique ids with unbounded sources

2021-07-22 Thread Jan Lukavský
Hi Cristian, I didn't try that, so I'm not 100% sure it would work, but you probably could try using a custom timestamp policy for the KafkaIO, which will shift the timestamp to BoundedWindow.TIMESTAMP_MAX_VALUE once you know you have reached the head of the state topic. That would probably require read

Re: Kafka IO using Python 3.8, Beam 2.31, and Flink 1.13; NoClassDefFoundError HpackDecoder

2021-08-26 Thread Jan Lukavský
Hi Jeremy, unfortunately, there are several bugs affecting KafkaIO with Python on FlinkRunner in current releases.  a) there are some limitations to portable SDF support on Flink [1]  b) the use_deprecated_read flag cannot be passed to ExpansionService, that is fixed for upcoming 2.32.0 in [

Terasort-like pipeline

2017-07-19 Thread Jan Lukavský
Hi all, I'm trying to get a better understanding of Beam's internals for the sake of integration with the Euphoria API as a DSL ([1]), and while trying to wrap Euphoria's abstractions of outputs, I came across an issue that I'm currently a little stuck with. The issue is not important to thi

Re: Terasort-like pipeline

2017-07-21 Thread Jan Lukavský
doesn't support global sorting; [1] discusses this in detail and you might find it useful. [1] https://lists.apache.org/thread.html/bc0e65a3bb653b8fd0db96bcd4c9da5af71a71af5a5639a472167808@1464278191@%3Cdev.beam.apache.org%3E On 19 July 2017 at 02:45, Jan Lukavský <mailto:je...@seznam.cz>

Re: Will Beam add any overhead or lack certain API/functions available in Spark/Flink?

2019-05-02 Thread Jan Lukavský
Hi, I'd say that what Pankaj meant could be rephrased as "What if I want to manually tune or tweak my Pipeline for a specific runner? Do I have any options for that?". As I understand it, currently the answer is no - PTransforms are somewhat hardwired into runners and the way they expand cannot

Re: Will Beam add any overhead or lack certain API/functions available in Spark/Flink?

2019-05-02 Thread Jan Lukavský
Just to clarify - the code I posted is just a proposal, it is not actually possible currently. On 5/2/19 11:05 AM, Jan Lukavský wrote: Hi, I'd say that what Pankaj meant could be rephrased as "What if I want to manually tune or tweak my Pipeline for specific runner? Do I have a

Re: Will Beam add any overhead or lack certain API/functions available in Spark/Flink?

2019-05-02 Thread Jan Lukavský
he current solution is to adapt/extend FlinkRunner (possibly forking code) to understand this operation and its substitution. On Thu, May 2, 2019 at 11:09 AM Jan Lukavský wrote: Just to clarify - the code I posted is just a proposal, it is not actually possible currently. On 5/2/19 11:05 AM, Ja

Re: Will Beam add any overhead or lack certain API/functions available in Spark/Flink?

2019-05-02 Thread Jan Lukavský
rather than using the generic pair-with-random-key implementation). If you really want to do this for MyFancyFlinkOperator, the current solution is to adapt/extend FlinkRunner (possibly forking code) to understand this operation and its substitution. On Thu, May 2, 2019 at 11:09 AM Jan Lukavský wr

Re: Will Beam add any overhead or lack certain API/functions available in Spark/Flink?

2019-05-03 Thread Jan Lukavský
theory able to do via some annotations of sources, but the fundamental question here is - do you really want to do that? Or just let the user perform some hard coding when he knows that it might help in his particular case (possibly even a corner case)? Jan Cheers, Max On 02.05.19 22:44, Ja

Re: Kafka IO using Python 3.8, Beam 2.31, and Flink 1.13; NoClassDefFoundError HpackDecoder

2021-08-26 Thread Jan Lukavský
ners one running the python SDK harness and the other running the java SDK harness. So the environment config would need to be different for the two languages. Thanks J J On Thu, Aug 26, 2021 at 3:14 AM Jan Lukavský <mailto:je...@seznam.cz>> wrote: Hi Jeremy, unfortunately, the

Re: Kafka IO using Python 3.8, Beam 2.31, and Flink 1.13; NoClassDefFoundError HpackDecoder

2021-08-30 Thread Jan Lukavský
hank you very much for the pointers. I'm working on getting the code built from 2.33 branch and trying that out. J On Thu, Aug 26, 2021 at 6:35 AM Jan Lukavský mailto:je...@seznam.cz>> wrote: Hi Jeremy,

Re: [GENERAL QUESTION] How independent are worker nodes

2021-09-07 Thread Jan Lukavský
Hi Ana, in general, worker nodes do not share any state, and cannot themselves decide which work to accept and which to reject. How the work is distributed to downstream processing is defined by a runner, not the Beam model. On the other hand, what you ask for might be possibly accomplished u

Re: [GENERAL QUESTION] How independent are worker nodes

2021-09-07 Thread Jan Lukavský
/org.crossflow.tests/src/org/crossflow/tests/opinionated> On Tue, 7 Sept 2021 at 13:57, Jan Lukavský <mailto:je...@seznam.cz>> wrote: Hi Ana, in general, worker nodes do not share any state, and cannot themselves decide which work to accept and which to reject. How the work

Re: Using Beam to generate unique ids with unbounded sources

2021-09-10 Thread Jan Lukavský
evious run by looking them up from an external source. 1: https://stackoverflow.com/questions/33085493/how-to-hash-a-password-with-sha-512-in-java On Thu, Jul 22, 2021 at 7:40 AM

Re: Beam with Flink runner - Issues when writing to S3 in Parquet Format

2021-09-14 Thread Jan Lukavský
Hi Sandeep, a few questions:  a) which state backend do you use for Flink?  b) what is your checkpointingInterval set for FlinkRunner?  c) how much data is there in your input Kafka topic(s)? FileIO has to buffer all elements per window (by default) into state, so this might create a high pressu

Re: Beam with Flink runner - Issues when writing to S3 in Parquet Format

2021-09-15 Thread Jan Lukavský
deep *From: *Jan Lukavský *Date: *Tuesday, September 14, 2021 at 10:47 AM *To: *"user@beam.apache.org" *Cc: *user *Subject: *Re: Beam with Flink runner - Issues when writing to S3 in Parquet Format This email is from an external sender. Hi Sandeep, a few questions:  a) which state b

Re: [Question] How to manually commit Kafka offset in Apache Beam at the end of specific doFun execution

2021-09-22 Thread Jan Lukavský
Hi, are you using KafkaIO? If yes, then you can enable offsets commit in bundle finalize via [1]. Note on the other hand, that KafkaIO stores offsets in checkpoint, so - provided you run your Beam Pipeline on a runner with enabled checkpointing - it should not be necessary to commit offsets t
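Enabling the commit described in [1] is a one-liner on the KafkaIO read transform; the broker address, topic, and deserializers below are illustrative:

```java
// Commit consumed offsets back to Kafka when each bundle is finalized.
// Note: with a checkpointing runner this is informational only - recovery
// is driven by the offsets stored in the checkpoint, not by Kafka.
pipeline.apply(KafkaIO.<String, String>read()
    .withBootstrapServers("broker:9092")
    .withTopic("events")
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .commitOffsetsInFinalize());
```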

Re: [Question] How to manually commit Kafka offset in Apache Beam at the end of specific doFun execution

2021-09-22 Thread Jan Lukavský
not tell for sure how it works in Spark, but the good news is that it should not matter to the user code.  Jan Best Regards, *From: *Jan Lukavský *Reply-To: *"user@beam.apache.org" *Date: *Wednesday, September 22, 2021 at 4:39 PM *To: *"user@beam.apache.org" *Subject:

Importing dependencies of Python Pipeline

2021-09-23 Thread Jan Lukavský
Hi, I'm facing issues importing dependencies of my Python Pipeline. I intend to use gRPC to communicate with remote RPC service, hence I have the following project structure: script.py     | service_pb2.py     | service_pb2_grpc.py I created setup.py with something like setup(name

Re: Importing dependencies of Python Pipeline

2021-09-23 Thread Jan Lukavský
Oops, sorry, the illustration of the three files is wrong. It was meant to be src/  | script.py  | service_pb2.py  | service_pb2_grpc.py The three files are in the same directory. On 9/23/21 3:08 PM, Jan Lukavský wrote: Hi, I'm facing issues importing dependencies

Re: Importing dependencies of Python Pipeline

2021-09-24 Thread Jan Lukavský
the --setup_file works fine, but there is a more general problem (or misunderstanding on my side) with importing modules. Can this be runner-dependent? I use FlinkRunner and submit jobs with --flink_submit_uber_jar, could that be the problem?  Jan On 9/23/21 3:12 PM, Jan Lukavský wrote:

Re: Importing dependencies of Python Pipeline

2021-09-25 Thread Jan Lukavský
ps://cloud.google.com/dataflow/docs/resources/faq#how_do_i_handle_nameerrors On 9/24/21 6:14 PM, Robert Bradshaw wrote: On Fri, Sep 24, 2021 at 6:33 AM Jan Lukavský wrote: +dev I hit very similar issue even with stand

Re: How can I gracefully stop unbounded KafkaIO consumer?

2021-09-30 Thread Jan Lukavský
Hi Marco, what is your intention? You want to upgrade the pipeline? Flink uses checkpoints / savepoints (see [1]), so cancelling pipeline to savepoint and then resuming from the savepoint should be safe. Another option is to enable offset commit to Kafka via [2]. That way you should be able to

Re: How can I gracefully stop unbounded KafkaIO consumer?

2021-09-30 Thread Jan Lukavský
streaming in logs) I've tried 'stopping' the job via the REST API but it gives a response like "the module is already in Finished state. Not stopping". It is correct in that one of my two pipeline stages is finished but one is in RUNNING. Any tips to clean this mess up

Re: side input terribly slow when using Flink runner

2021-10-12 Thread Jan Lukavský
Hi Stefan, could you verify what coder you use for the PCollection which you materialize as a side-input? I'm not sure from the flame-graph itself, but could it be SerializableCoder?  Jan On 10/12/21 12:23 PM, Stefan Wachter wrote: Hi, I have a pipeline where a PCollection is fed a

Re: Performance of Apache Beam

2021-10-18 Thread Jan Lukavský
Hi Azhar, -dev +user this kind of question cannot be answered in general. The overhead will depend on the job and the SDK you use. Using Java SDK with (classical) FlinkRunner should give the best performance on Flink, although the ov

Re: [Question] Beam+Python+Flink

2021-10-18 Thread Jan Lukavský
Hi Chiara, environment_type LOOPBACK is meant for local execution only. The default is docker, which is not ideal when you use docker-compose (docker in docker), so the other option is to use EXTERNAL environment. With this environment, you must manually start the Python SDK harness as a sepa

Re: Stateful processing of session data in order

2021-10-18 Thread Jan Lukavský
Hi Fabian, you can use (experimental) @RequiresTimeSortedInput [1] annotation for that. I believe it should be supported by Dataflow in batch mode (because as you noticed Dataflow sorts inputs to stateful DoFn in batch by default; maybe someone can correct me if I'm wrong). It is supported by
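A minimal Java sketch of the annotation usage (key and value types are illustrative); the annotation goes on the @ProcessElement method of a stateful DoFn:

```java
// With @RequiresTimeSortedInput, supporting runners deliver the elements
// for each key in event-time order to this method.
class OrderedFn extends DoFn<KV<String, Long>, String> {
  @StateId("count")
  private final StateSpec<ValueState<Integer>> countSpec = StateSpecs.value();

  @RequiresTimeSortedInput
  @ProcessElement
  public void process(ProcessContext c,
      @StateId("count") ValueState<Integer> count) {
    Integer current = count.read();
    int n = (current == null ? 0 : current) + 1;
    count.write(n);
    c.output(c.element().getKey() + " element #" + n + " @ " + c.timestamp());
  }
}
```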

Re: Beam on Flink runner not able to advance watermarks on a high load

2021-11-16 Thread Jan Lukavský
Hi Sandeep, - dev@beam The watermark estimation itself should not be related to load. Can you please clarify, if  a) you are using any custom timestamp policy?  b) you see any backpressure in Flink's UI? Backpressure could - under some circumstances - cause del

Re: Beam on Flink runner not able to advance watermarks on a high load

2021-11-18 Thread Jan Lukavský
slots. Thanks, Sandeep *From: *Jan Lukavský *Reply-To: *"user@beam.apache.org" *Date: *Tuesday, November 16, 2021 at 3:11 AM *To: *"user@beam.apache.org" *Subject: *Re: Beam on Flink runner not able to advance watermarks on a high load This email is from an external

Re: Beam on Flink runner not able to advance watermarks on a high load

2021-11-30 Thread Jan Lukavský
Thanks, Sandeep *From: *Jan Lukavský *Date: *Thursday, November 18, 2021 at 4:03 AM *To: *"Kathula, Sandeep" , "user@beam.apache.org" *Subject: *Re: Beam on Flink runner not able to advance watermarks on a high load This email is from an external sender. Hi Sa

Re: Problem with windowing in python

2022-01-17 Thread Jan Lukavský
Hi Giovanni, one thing that I overlooked when answering your SF question is that your read_records method ignores the provided offset_range_tracker. That seems like it could be the root of the issue - the FileBasedSource is based on splittable DoFn [1], where your logic must cooperate with the

Re: Flink JobManager getting issue while running a Beam pipeline

2022-01-18 Thread Jan Lukavský
Hi Sujay, can you please provide more information about how you run the job? FlinkRunner is definitely compiled against Flink 1.12.2, but that should not be an issue. FlinkRunner contains flink-runtime, you should be fine simply excluding it and replacing with the version 1.12.7 (maybe better

Re: Beam State with the Flink Runner when things go wrong

2022-02-04 Thread Jan Lukavský
+dev Hi Cristian, the savepointPath should not be ignored. We need to verify if local environment supports savepoints (I suppose it does) and in that case we should use it. In the case it does not we should throw exception as silent ignoring of the savepoint is m

Re: Beam on Flink not processing input splits in parallel

2022-03-09 Thread Jan Lukavský
Hi Janek, I think you hit a deficiency in the FlinkRunner's SDF implementation. AFAIK the runner is unable to do dynamic splitting, which is what you are probably looking for. What you describe essentially works in the model, but FlinkRunner does not implement the complete contract to make us

Re: Beam on Flink not processing input splits in parallel

2022-03-09 Thread Jan Lukavský
l each of these inputs process its files on one worker, but even the inputs are fused together. So even if you resolved the glob locally and then added one input for each individual file, all of that would still run sequentially. On 09/03/2022 17:14, Jan Lukavský wrote: Hi Janek, I thi

Re: session window question

2022-04-27 Thread Jan Lukavský
Hi Sigalit, if I understand correctly, what you want is deduplication of the outputs coming from your GBK logic, is that correct? If so, you can use a stateful DoFn and a ValueState holding the last value seen per key in the global window. There is an implementation of this approach in Deduplicate.
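Besides hand-rolling the stateful DoFn, the built-in Deduplicate transform implements this pattern directly; the element type and duration below are illustrative:

```java
// Deduplicate keeps per-element (or per-key) state and drops repeats
// seen within the configured duration.
PCollection<String> deduped = input.apply(
    Deduplicate.<String>values().withDuration(Duration.standardHours(1)));
```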

Re: Beam slowness compared to flink-native

2022-05-12 Thread Jan Lukavský
Hi Ifat, can you try adding 'use_deprecated_read' experiment to the PipelineOptions? IIRC the default expansion for KafkaIO uses splittable DoFn now, which could be the cause for the performance difference you see.  You can add the option on command line using "--experiments=use_deprecated_re
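The experiment can also be added programmatically when building the pipeline options (a sketch assuming the Java SDK; equivalent to passing the flag on the command line):

```java
// Equivalent to --experiments=use_deprecated_read on the command line.
FlinkPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
ExperimentalOptions.addExperiment(
    options.as(ExperimentalOptions.class), "use_deprecated_read");
```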

Re: RDD (Spark dataframe) into a PCollection?

2022-05-24 Thread Jan Lukavský
Hi, I think this feature request is valid. Every runner for which Beam is not a 'native' SDK uses some form of translation context, which maps PCollection to the internal representation of the particular SDK of the runner (RDD in this case). It should be possible to "import" an RDD into the specific runne

Re: RDD (Spark dataframe) into a PCollection?

2022-05-24 Thread Jan Lukavský
+dev@beam <mailto:d...@beam.apache.org> On 5/24/22 11:40, Jan Lukavský wrote: Hi, I think this feature is valid. Every runner for which Beam is not a 'native' SDK uses some form of translation context, which maps PCollection to internal representation of the particular SD

Re: RDD (Spark dataframe) into a PCollection?

2022-05-24 Thread Jan Lukavský
timestamp associated with every element, so the importRDD function probably needs a way to specify the timestamp (could be an attribute name for dataframes or a timestamp extraction function for regular RDDs). On Tue, May 24, 2022 at 2:40 AM Jan Lukavský wrote: Hi, I think this featur

Re: Flink task stuck - MapPartition WAITING on java.util.concurrent.CompletableFuture$Signaller

2022-06-01 Thread Jan Lukavský
Hi Gorjan, +user@beam The trace you posted is just waiting for a bundle to finish in the SDK harness. I would suspect there is a problem in the logs of the harness. Did you look for possible errors there?  Jan On 5/31/22 13:54, Gorjan Todorovski wrote: Hi, I

Re: Flink task stuck - MapPartition WAITING on java.util.concurrent.CompletableFuture$Signaller

2022-06-02 Thread Jan Lukavský
Starting worker with command ['/opt/apache/beam/boot', '--id=3-3', '--logging_endpoint=localhost:38683', '--artifact_endpoint=localhost:44867', '--provision_endpoint=localhost:34833', '--control_endpoint=localhost:44351'] Startin

Re: Flink task stuck - MapPartition WAITING on java.util.concurrent.CompletableFuture$Signaller

2022-06-07 Thread Jan Lukavský
Jun 2, 2022 at 4:45 PM Jan Lukavský wrote: -user@flink <http://u...@flink.apache.org> as this looks like purely beam issue Could you please elaborate more about what "stuck" means? Does the watermark stop progressing? Does that happen at any specific instant (

Re: sink triggers

2022-07-25 Thread Jan Lukavský
Hi Sigalit, there might be several options, which one is the best would depend on the actual use-case. You might:  a) use the CoGroupByKey transform with a fixed window [1], or  b) use stateful processing [2] with timer triggering your output Which one is the best depends on if you can use w
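Option (a) can be sketched in Java as follows; the key/value types and window size are illustrative, and `a` and `b` are assumed to be keyed PCollections:

```java
// Window both inputs identically, then join per key and window.
FixedWindows windowFn = FixedWindows.of(Duration.standardMinutes(5));
TupleTag<Long> aTag = new TupleTag<>();
TupleTag<String> bTag = new TupleTag<>();

PCollection<KV<String, CoGbkResult>> joined =
    KeyedPCollectionTuple
        .of(aTag, a.apply(Window.<KV<String, Long>>into(windowFn)))
        .and(bTag, b.apply(Window.<KV<String, String>>into(windowFn)))
        .apply(CoGroupByKey.create());
```

Each output element then carries all values from both inputs for one key in one window, which the downstream sink logic can consume atomically.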

Re: UNIMPLEMENTED method: org.apache.beam.model.fn_execution.v1.BeamFnExternalWorkerPool/StartWorker

2022-09-20 Thread Jan Lukavský
Hi Lydian, there are two parts involved.  a) the expansion service (which you run on port 8097) - this service expands ReadFromKafka, which is a Java transform  b) the Java SDK environment, which is not the expansion service; it must be some environment that is able to run the Java ReadFromKafka t

Re: UNIMPLEMENTED method: org.apache.beam.model.fn_execution.v1.BeamFnExternalWorkerPool/StartWorker

2022-09-21 Thread Jan Lukavský
incerely, Lydian Lee On Tue, Sep 20, 2022 at 1:57 AM Jan Lukavský wrote: Hi Lydian, there are two parts involved.  a) expansion service (which you run on port 8097) - this service expands the ReadFromKafka which is Java transform  b) Java SDK environment, which is not the

Re: Join streams with different frequencies

2023-01-04 Thread Jan Lukavský
Hi, the general pattern here would be to map both the PCollections to a common type, e.g. PCollection> and then flatten them into one PCollection, onto which you apply a stateful DoFn, see [1]. You would hold the DataY value of your ID in the state and match it against events coming from Data
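A Java sketch of this pattern; the union type `EitherXY`, the `enrich` helper, the `Enriched` output type, and the pre-mapped inputs `keyedX`/`keyedY` are all illustrative names, not Beam APIs:

```java
// Both inputs mapped to a common keyed type and flattened into one stream.
PCollection<KV<String, EitherXY>> unified =
    PCollectionList.of(keyedX).and(keyedY).apply(Flatten.pCollections());

unified.apply(ParDo.of(new DoFn<KV<String, EitherXY>, Enriched>() {
  @StateId("lastY")
  private final StateSpec<ValueState<EitherXY>> lastYSpec = StateSpecs.value();

  @ProcessElement
  public void process(ProcessContext c,
      @StateId("lastY") ValueState<EitherXY> lastY) {
    EitherXY e = c.element().getValue();
    if (e.isY()) {
      lastY.write(e);                     // cache the slow side (DataY)
    } else {
      c.output(enrich(e, lastY.read())); // join fast DataX with cached DataY
    }
  }
}));
```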

Re: Is there any way to set the parallelism of operators like group by, join?

2023-04-20 Thread Jan Lukavský
Hi, this topic was discussed many years ago and the conclusion there was that setting the parallelism of individual operators via FlinkPipelineOptions (or ResourceHints) would be possible, but would be somewhat cumbersome. Although I understand that it "feels" weird to have high parallelism for

Re: Is there any way to set the parallelism of operators like group by, join?

2023-04-20 Thread Jan Lukavský
it, setting a single parallelism for the whole pipeline is not ideal because it limits the scalability of your pipeline significantly. Ning. On Thu, Apr 20, 2023 at 5:53 AM Jan Lukavský wrote: Hi, this topic was discussed many years ago and the conclusion there was that setting the paral

Re: [Question] - Time series - cumulative sum in right order with python api in a batch process

2023-04-25 Thread Jan Lukavský
Hi, there is (rather old and long) discussion of this for Java SDK in [1]. This discussion resulted in adding @RequiresTimeSortedInput annotation [2]. Unfortunately this probably has not been transferred to Python SDK. I'll sum up reasons why it was added:  a) inputs to stateful DoFn are nat
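The guarantee that `@RequiresTimeSortedInput` provides — elements delivered to a stateful DoFn in event-time order — can be emulated for a batch cumulative sum by buffering and sorting by timestamp before accumulating. A minimal pure-Python sketch (the function name is illustrative, not Beam API):

```python
# Sketch of why time-sorted input matters for a cumulative sum: a batch
# runner may deliver elements in arbitrary order, so we emulate the sorted
# delivery by sorting buffered (timestamp, value) pairs before accumulating.

def cumulative_sum(timestamped_values):
    """timestamped_values: iterable of (timestamp, value) in arbitrary order."""
    running = 0
    out = []
    # Emulate time-sorted delivery: process in event-time order.
    for ts, v in sorted(timestamped_values, key=lambda tv: tv[0]):
        running += v
        out.append((ts, running))
    return out

# Out-of-order input, as a batch runner may deliver it ...
events = [(3, 30), (1, 10), (2, 20)]
# ... still yields the correct time-ordered cumulative sums.
print(cumulative_sum(events))  # [(1, 10), (2, 30), (3, 60)]
```

In a real streaming pipeline the sort cannot be global; the annotation lets the runner do this buffering per key up to the watermark instead.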

Re: [Exception] Output timestamps must be no earlier than the timestamp of the current input or timer.

2023-05-29 Thread Jan Lukavský
Hi Mario, PubSub allows to store record metadata into "attributes". You can write the timestamp into an attribute of your choice and then pass the name of this attribute into the Read transform [1]. This will cause PubSubIO to assign timestamps based on the real timestamp, when the event occur

Re: Is Flink >1.14 really supported by the runners?

2023-06-13 Thread Jan Lukavský
Hi Edgar, the website was mistakenly not updated when support for newer Flink versions was added. This should be fixed [1], the runner is stable on versions up to 1.16. Regarding Flink Operator, I'm not 100% familiar with it, but given that Beam Pipeline is translated into standard Fli

Re: Is Flink >1.14 really supported by the runners?

2023-06-13 Thread Jan Lukavský
Probably better for dev@ <mailto:d...@beam.apache.org> (added).  Jan On 6/13/23 12:43, Edgar H wrote: Got you, thanks! Are there any plans on supporting 1.17 anytime soon too? El mar, 13 jun 2023, 12:27, Jan Lukavský escribió: Hi Edgar, the website seems to be mistaken

Re: [Question] check if pipeline is still running in pipeline runner

2023-07-07 Thread Jan Lukavský
Hi, if I understand correctly, you have a 'program runner' (sometimes called a driver), which is supposed to be long-running and watching if the submitted Pipeline runs or not. If not, then the driver resubmits the job. If my understanding is correct, I would suggest looking into the reasons

Re: [Question] check if pipeline is still running in pipeline runner

2023-07-10 Thread Jan Lukavský
won't know the job id so we can't use that to query the REST API. Like I said, we were looking into that method that initializes an AppName, but that was written in Java. What do you think we should do? Thank you so much for your help! Best, Adlae D'Orazio On Fri, Jul 7, 2023

Re: Count Incoming Unbounded records using Apache Beam java

2023-07-14 Thread Jan Lukavský
Hi, thanks for your interest in Apache Beam. I answered your question, see [1] Best,  Jan [1] https://stackoverflow.com/questions/76681800/count-incoming-unbouned-messages-from-pubsub-using-apache-beam-java/76685799#76685799 On 7/13/23 19:17, Phani Geeth wrote: Hi, I am trying to count the

Re: Seeking Assistance to Resolve Issues/bug with Flink Runner on Kubernetes

2023-08-15 Thread Jan Lukavský
Hi Kapil, if you don't have a special reason for running the jobserver manually, you can let the Beam Python SDK run it for you (and have it configured accordingly). You just need to pass `--runner=flink` (and --flink_master) to your flink_options (or via command-line). As Sam suggested, it woul

Re: "Decorator" pattern for PTramsforms

2023-09-18 Thread Jan Lukavský
Do we have a defined way for a PTransform to create bounded PCollection from an unbounded one (a typical example would be LIMIT acting on unbounded input)? AFAIK, we can use SDF to manipulate watermark, but that requires terminating the Pipeline even though there are still upstream running tran

Re: "Decorator" pattern for PTramsforms

2023-09-19 Thread Jan Lukavský
time. Though, at the same time we had a discussion that it should not be used anymore and considered as obsolete transform. On 18 Sep 2023, at 09:28, Jan Lukavský wrote: Do we have a defined way for a PTransform to create bounded PCollection from an unbounded one (a typical example would be L

Re: simplest way to do exponential moving average?

2023-10-02 Thread Jan Lukavský
Hi, this depends on how exactly you plan to calculate the average. The original definition is based on exponentially decreasing weight of more distant (older if time is on the x-axis) data points. This (technically) means that this average at any point X1 depends on all values X0 <= X1. This
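The "classic" definition mentioned here — each new point contributes with weight alpha and all older points decay geometrically, so the value at any point depends on the whole history — is a few lines of plain Python (a sketch for illustration, not Beam code):

```python
# Minimal exponential moving average: s_t = alpha * x_t + (1 - alpha) * s_{t-1}.
# Every older data point keeps a geometrically decaying weight, which is why
# the EMA at any point depends on all preceding values.

def ema(values, alpha):
    """values: data points ordered by time; 0 < alpha <= 1."""
    avg = None
    out = []
    for x in values:
        avg = x if avg is None else alpha * x + (1 - alpha) * avg
        out.append(avg)
    return out

print(ema([1.0, 1.0, 3.0], 0.5))  # [1.0, 1.0, 2.0]
```

In a streaming pipeline the single `avg` variable becomes per-key state, and the dependence on all earlier values is exactly what makes windowed approximations (e.g. decaying per-window averages) attractive.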

[DISCUSS] Drop Euphoria extension

2023-10-13 Thread Jan Lukavský
Hi, it has been some time since Euphoria extension [1] has been adopted by Beam as a possible "Java 8 API". Beam has evolved from that time a lot, the current API seems actually more elegant than the original Euphoria's and last but not least, it has no maintainers and no known users. If ther

Re: [DISCUSS] Drop Euphoria extension

2023-10-16 Thread Jan Lukavský
Jan On 10/16/23 15:10, Alexey Romanenko wrote: Can we just deprecate it for a while and then remove completely? — Alexey On 13 Oct 2023, at 18:59, Jan Lukavský wrote: Hi, it has been some time since Euphoria extension [1] has been adopted by Beam as a possible "Java 8 API". Beam

Re: Pipeline Stalls at GroupByKey Step

2023-11-20 Thread Jan Lukavský
Hi Sigalit, you should set TimestampPolicyFactory [1] to the source, because otherwise resetting the timestamp in a plain ParDo (ConvertFromKafkaRecord) can cause the element's timestamp to shift back in time behind the watermark and subsequently cause the data to get dropped by the GroupByKey tr
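The dropping behavior warned about here can be illustrated with a small watermark simulation in plain Python (not Beam API; zero allowed lateness is assumed for simplicity): once the watermark has advanced past an element's rewritten timestamp, a grouping stage treats the element as droppably late.

```python
# Illustration of why shifting timestamps backwards in a ParDo can make
# GroupByKey drop data. The watermark is driven by timestamps already
# observed; anything arriving with a timestamp behind it is dropped.

def filter_droppable(elements):
    """elements: (timestamp, value) pairs in arrival order."""
    watermark = float("-inf")
    kept, dropped = [], []
    for ts, value in elements:
        if ts < watermark:
            dropped.append(value)          # late: behind the watermark
        else:
            kept.append(value)
            watermark = max(watermark, ts)
    return kept, dropped

# A ParDo that rewrites the third element's timestamp back to 5 makes it
# arrive behind the watermark (already at 20), so it is dropped:
kept, dropped = filter_droppable([(10, "a"), (20, "b"), (5, "c")])
print(kept, dropped)  # ['a', 'b'] ['c']
```

Setting the timestamp policy on the source instead keeps the watermark consistent with the timestamps the elements actually carry downstream.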

Re: Can apache beam be used for control flow (ETL workflow)

2023-12-14 Thread Jan Lukavský
Hi, can you give an example of what you mean for better understanding? Do you mean using Beam as a scheduler of other ETL workflows?  Jan On 12/14/23 13:17, data_nerd_666 wrote: Hi all, I am new to apache beam, and am very excited to find beam in apache community. I see lots of use cases o
