Re: Is there a way to decide what RDDs get cached in the Spark Runner?

2019-05-15 Thread Robert Bradshaw
Just to clarify, do you need direct control over what to cache, or would it be OK to let Spark decide the minimal set of RDDs to cache as long as we didn't cache all intermediates? From: Augusto Ribeiro Date: Wed, May 15, 2019 at 8:37 AM To: > Hi Kyle, > > Thanks for the help. It seems like I h

Re: Problem with gzip

2019-05-15 Thread Robert Bradshaw
Unfortunately the "write" portion of the reshuffle cannot be parallelized more than the source that it's reading from. In my experience, generally the read is the bottleneck in this case, but it's possible (e.g. if the input compresses extremely well) that it is the write that is slow (which you se

Re: Problem with gzip

2019-05-15 Thread Robert Bradshaw
On Wed, May 15, 2019 at 1:00 PM Robert Bradshaw wrote: >> >> Unfortunately the "write" portion of the reshuffle cannot be >> parallelized more than the source that it's reading from. In my >> experience, generally the read is the bottleneck in this case, but

Re: Problem with gzip

2019-05-15 Thread Robert Bradshaw
On Wed, May 15, 2019 at 8:43 PM Allie Chen wrote: > Thanks all for your reply. I will try each of them and see how it goes. > > The experiment I am working now is similar to > https://stackoverflow.com/questions/48886943/early-results-from-groupbykey-transform, > which tries to get early results

Re: Question about --environment_type argument

2019-05-27 Thread Robert Bradshaw
On Mon, May 27, 2019 at 2:24 PM 青雉(祁明良) wrote: > > Just now I try to use the PROCESS environment type, the Flink taskmanager > complains about "/tmp/beam-artifact-staging/job_xxx" not found. And I found > this directory is only created on the machine with beam job endpoint. I guess > maybe I sh

Re: Question about --environment_type argument

2019-05-28 Thread Robert Bradshaw
or.run(SerializingExecutor.java:123) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > > On 27 May 2019, at 9:

Re: Question about --environment_type argument

2019-05-28 Thread Robert Bradshaw
at > org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) > at > org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolE

Re: Question about --environment_type argument

2019-05-28 Thread Robert Bradshaw
va.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > > > On 28 May 2019, at 5:01 PM, Robert Bradshaw wrote: > > IIRC, the default artifact directory is local, not HDFS, which would > of course not be readable

Re: Question about --environment_type argument

2019-05-28 Thread Robert Bradshaw
hadoop dependency > is missing. > Is there a simple way to built all depenpencies into > beam-runners-flink_2.11-job-server distribution? The easiest would probably be to create a project that depends on both the job server and the hadoop filesystem and then build that as a fat jar. >

Re: Question about --environment_type argument

2019-05-29 Thread Robert Bradshaw
naryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:171) >>> > > at >>> > > >>> > org.apache.beam.vendor.grpc.v1p13p1.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35) >>> > > at >>> > >

Re: Question about --environment_type argument

2019-05-29 Thread Robert Bradshaw
"/usr/local/lib/python2.7/dist-packages/apache_beam/io/filebasedsource.py", > line 193, in estimate_size > match_result = FileSystems.match([pattern])[0] > File > "/usr/local/lib/python2.7/dist-packages/apache_beam/io/filesystems.py", > line 186, in match >

Re: Writing and serializing a custom WindowFn

2019-06-03 Thread Robert Bradshaw
Hi Chad! As you've discovered, fully custom merging window fns are not yet supported portably, though this is on our todo list. https://issues.apache.org/jira/browse/BEAM-5601 This involves calling back into the SDK to perform the actual merging logic (and also, for full generality, being able

Re: Windows were processed out of order

2019-06-03 Thread Robert Bradshaw
The elements of a PCollection are unordered. This includes the results of a GBK--there is no promise that the output will be processed in any particular order (in particular, windows are not guaranteed to be processed in timestamp order). DoFns, especially ones with side effects, should be written with this in mind. (There is actually ongoing dis
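To illustrate "written with this in mind": a plain-Python sketch (not Beam API) that feeds the same per-window results to a function in every possible order. A function that depends on arrival order gives different answers; one that keys off the window timestamp, or uses a commutative and associative aggregate, does not. The record shape (window_start_seconds, value) and all function names are hypothetical.

```python
import itertools


def last_seen(records):
    """Order-DEPENDENT: returns the value of whichever record arrived last."""
    return records[-1][1]


def latest_window_value(records):
    """Order-independent: picks the value of the window with the greatest start."""
    return max(records, key=lambda r: r[0])[1]


def total(records):
    """Order-independent: addition is commutative and associative."""
    return sum(value for _, value in records)


def outcomes_over_all_orders(fn, records):
    """Apply fn to every permutation of records and collect the distinct results."""
    return {fn(list(p)) for p in itertools.permutations(records)}


# Hypothetical hourly windows as a GroupByKey might emit them, in no fixed order.
records = [(0, 5), (3600, 7), (7200, 2)]  # (window_start_seconds, value)
```

Only the order-independent functions yield a single outcome across all permutations.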

Re: Windows were processed out of order

2019-06-03 Thread Robert Bradshaw
doing a replay of past data (we rewind our > kafka consumer groups), in 1h of processing time, there might be multiple 1h > windows for a given GBK hence theses windows are fired on any arbitrary order? > > Thanks for the insight! > JC > > On Mon, Jun 3, 2019 at 1:50 PM Robert

Re: How to convert python dictionary / JSON to a pcollection

2019-06-05 Thread Robert Bradshaw
I assume you have the dictionary in memory? If so, you can do pcoll = p | beam.Create(my_dict.items()) On Wed, Jun 5, 2019 at 8:10 AM Anjana Pydi wrote: > > Hi, > > Can some one please let me know how to convert a dictionary / JSON to a > pcollection ? > > Thanks, > Anjana > ---

Re: [ANNOUNCE] Spark portable runner (batch) now available for Java, Python, Go

2019-06-17 Thread Robert Bradshaw
Excellent work, very excited to see this! On Fri, Jun 14, 2019 at 11:02 PM Kyle Weaver wrote: > Hello Beamers, > > I'm happy to announce that the portable Spark runner is now mostly > feature-complete [0] for BATCH processing (STREAMING is not yet available). > This means you can run your new or

Re: Can we do pardo inside a pardo?

2019-06-17 Thread Robert Bradshaw
You want beam.FlatMap(get_response) here so that the output PCollection contains all the elements of all the lists returned by get_response rather than the lists themselves. On Tue, Jun 18, 2019, 3:27 AM Chamikara Jayalath wrote: > > Probably you should be returning "response" from the "get_res
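The Map vs. FlatMap distinction can be modeled in plain Python (this is a sketch of the semantics, not Beam's implementation). Map produces exactly one output per input, so a returned list stays a single list element; FlatMap expands each returned iterable into separate elements. get_response here is a hypothetical stand-in for the user's function.

```python
def get_response(x):
    """Hypothetical stand-in for a function that returns a list per input."""
    return [f"{x}-a", f"{x}-b"]


def map_semantics(fn, elements):
    # Like beam.Map: one output per input, so each list stays one element.
    return [fn(e) for e in elements]


def flat_map_semantics(fn, elements):
    # Like beam.FlatMap: each returned iterable is flattened into the output.
    return [out for e in elements for out in fn(e)]
```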

Re: [Python] Accessing the DAG object from my apache beam pipeline?

2019-06-24 Thread Robert Bradshaw
Probably the easiest thing to do here would be to implement your own runner that wraps another runner, and overrides https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/runner.py#L154 to do whatever inspection you want of the graph before passing it on. (You would also want t

Re: gRPC method to get a pipeline definition?

2019-06-26 Thread Robert Bradshaw
Yes, offering a way to get a pipeline from the job service directly would be a completely reasonable thing to do (and likely not hard at all). We welcome pull requests. Alternative UIs built on top of this abstraction would be an interesting project to explore. On Wed, Jun 26, 2019 at 8:44 AM Cha

Re: [python] ReadFromPubSub broken in Flink

2019-07-15 Thread Robert Bradshaw
On Mon, Jul 15, 2019 at 5:42 AM Chamikara Jayalath wrote: > > On Sat, Jul 13, 2019 at 7:41 PM Chad Dombrova wrote: >> >> Hi Chamikara, why not make this part of the pipeline options? does it really need to vary from transform to transform? >>> >>> It's possible for the same pipel

Re: Portability framework: multiple environments in one pipeline

2019-07-22 Thread Robert Bradshaw
Yes, for sure. Support for this is available in some runners (like the Python Universal Local Runner and Flink) and actively being added to others (e.g. Dataflow). There are still some rough edges however--one currently must run an expansion service to define a pipeline step in an alternative envir

Re: Any way to profile speed for each transforms ?

2019-07-24 Thread Robert Bradshaw
Beam tracks the amount of time spent in each transform in profile counters. There is ongoing work to expose these in a uniform way for all runners (e.g. in Dataflow they're displayed on the UI), but for the direct runner you can see an example at https://github.com/apache/beam/blob/release-2.14.0/s

Re: Any way to profile speed for each transforms ?

2019-07-24 Thread Robert Bradshaw
Also, take note that these counters will only be available if Beam has been compiled with Cython (e.g., installed from a wheel). Of course if you care about performance you'd want that anyway. On Wed, Jul 24, 2019, 5:15 PM Robert Bradshaw wrote: > Beam tracks the amount of time spent

Re: Stateful ParDo on Non-Keyed PCollection

2019-07-25 Thread Robert Bradshaw
Though it's not obvious in the name, Stateful ParDos can only be applied to keyed PCollections, similar to GroupByKey. (You could, however, assign every element to the same key and then apply a Stateful DoFn, though in that case all elements would get processed on the same worker.) On Thu, Jul 25,
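The "assign every element to the same key" workaround can be modeled in plain Python to show its consequence: after keying by a constant, grouping yields a single key, so all elements would be handled by one stateful instance (and one worker). In a Beam pipeline the keying step would be something like beam.Map(lambda x: (None, x)). The helper names below are hypothetical.

```python
from collections import defaultdict


def with_constant_key(elements, key=None):
    """Pair every element with the same key (plain-Python model of the keying step)."""
    return [(key, e) for e in elements]


def group_by_key(pairs):
    """Group values by key, as state for a stateful DoFn is partitioned per key."""
    groups = defaultdict(list)
    for k, v in pairs:
        groups[k].append(v)
    return dict(groups)
```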

Re: Stateful ParDo on Non-Keyed PCollection

2019-07-25 Thread Robert Bradshaw
like System.identityHashCode(this) in the body of a DoFn might be sufficient. > On Thu, Jul 25, 2019 at 9:54 PM Robert Bradshaw wrote: >> >> Though it's not obvious in the name, Stateful ParDos can only be >> applied to keyed PCollections, similar to GroupByKey. (You c

Re: [ANNOUNCE] Beam 2.14.0 Released!

2019-08-02 Thread Robert Bradshaw
Lots of improvements all around. Thank you for pushing this through, Anton! On Fri, Aug 2, 2019 at 1:37 AM Chad Dombrova wrote: > > Nice work all round! I love the release blog format with the highlights and > links to issues. > > -chad > > > On Thu, Aug 1, 2019 at 4:23 PM Anton Kedin wrote: >

Re: Try to understand "Output timestamps must be no earlier than the timestamp of the current input"

2019-08-20 Thread Robert Bradshaw
The original timestamps are probably being assigned in the watchForNewFiles transform, which is also setting the watermark: https://github.com/apache/beam/blob/release-2.15.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L668 Until https://issues.apache.org/jira/browse/BEAM-644

Re: Beam/flink/kubernetes/minikube/wordcount example

2019-09-11 Thread Robert Bradshaw
Is your input on a file system accessible to the workers? (Including, from within Docker, if the workers are running in docker.) On Wed, Sep 11, 2019 at 12:03 PM Matthew Patterson wrote: > > Hi Beamers, > > > > I am running the `wordcount` example, but following example from > https://beam.apach

Re: How do you write portable runner pipeline on separate python code ?

2019-09-12 Thread Robert Bradshaw
On this note, making local files easy to read is something we'd definitely like to improve, as the current behavior is quite surprising. This could be useful not just for running with docker and the portable runner locally, but more generally when running on a distributed system (e.g. a Flink/Spark

Re: How do you write portable runner pipeline on separate python code ?

2019-09-13 Thread Robert Bradshaw
ome other >>>>> non-docker environment, as Docker adds some operational complexity that >>>>> isn't really needed to run a word count example. For example, Yu's >>>>> pipeline >>>>> errored here because the expected Docker container

Re: Looping in Dataflow(Creating multiple jobs for a while loop)

2019-09-16 Thread Robert Bradshaw
You can put your pipeline definition inside your while loop, i.e. while True: to_date = time.time() p = beam.Pipeline(options=PipelineOptions()) (p | 'create surveys' >> beam.Create(id_list) | 'get data' >> beam.FlatMap(lambda id: get_api_data(id, from_date, to_date))
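A minimal sketch of the loop pattern, assuming each iteration covers the interval from the previous to_date up to now and then advances from_date. run_batch_job is a hypothetical callback where one would build and run a new beam.Pipeline; the loop is bounded and the clock is injectable only so the sketch can be tested. As noted later in this thread, each iteration launches a separate job.

```python
import time


def run_in_intervals(run_batch_job, iterations, clock=time.time,
                     sleep=time.sleep, period_s=60.0):
    """Launch one batch job per [from_date, to_date) interval, then advance."""
    from_date = clock()
    launched = []
    for _ in range(iterations):
        sleep(period_s)
        to_date = clock()
        # Hypothetical hook: build and run a fresh pipeline for this interval.
        run_batch_job(from_date, to_date)
        launched.append((from_date, to_date))
        from_date = to_date  # the next job starts where this one ended
    return launched
```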

Re: The state of external transforms in Beam

2019-09-16 Thread Robert Bradshaw
Thanks for bringing this up again. My thoughts on the open questions below. On Mon, Sep 16, 2019 at 11:51 AM Chad Dombrova wrote: > That commit solves 2 problems: > > Adds the pubsub Java deps so that they’re available in our portable pipeline > Makes the coder for the PubsubIO message-holder typ

Re: Looping in Dataflow(Creating multiple jobs for a while loop)

2019-09-16 Thread Robert Bradshaw
An external scheduler would also create a new job every time. The only way I see to continuously process results in a single job is to have a streaming job. On Mon, Sep 16, 2019 at 12:22 PM Anjana Pydi wrote: > > Hi Juan, > > Thanks for the reply ! I want to know if there is any way in dataflow t

Re: Running Python Wordcount issues

2019-09-16 Thread Robert Bradshaw
What errors did you get with the release-2.16.0 branch? On Mon, Sep 16, 2019 at 4:43 PM Benjamin Tan wrote: > > I can confirm. I used the master branch to get this to work. I tried the > "release-2.16.0" branch but that didn't work either. > > On 2019/09/16 23:30:54, Tom Barber wrote: > > Hello

Re: Flink Runner logging FAILED_TO_UNCOMPRESS

2019-09-17 Thread Robert Bradshaw
On Tue, Sep 17, 2019 at 1:43 PM Thomas Weise wrote: > > +1 for making --experiments=beam_fn_api default. > > Can the Dataflow runner driver just remove the setting if it is not > compatible? The tricky bit would be undoing the differences in graph construction due to this flag flip. But I would

Re: Flink Runner logging FAILED_TO_UNCOMPRESS

2019-09-19 Thread Robert Bradshaw
ableRunner. > > > > > > > > Kyle Weaver | Software Engineer | github.com/ibzib > > <http://github.com/ibzib> > > > <http://github.com/ibzib> > > > > <http://github.com/ibzib> | kcwea...@g

Re: Publish message to pubsub topic after processing current input in beam streaming pipeline

2019-09-19 Thread Robert Bradshaw
On Thu, Sep 19, 2019 at 11:05 AM Anjana Pydi wrote: > > Hi , > > I have a beam streaming pipeline which reads data from pubsub topic, use it > to call an API and get responses, apply some transformations on the obtained > responses and writes to output sinks. > > Now, I need to add logic to writ

Re: Publish message to pubsub topic after processing current input in beam streaming pipeline

2019-09-19 Thread Robert Bradshaw
message for > every element of the list. Is there any way that I can do it only once at the > end? > > beam.Map(lambda output: send_to_output(output)) | 'process completed' >> > beam.io.WriteStringsToPubSub(topic=known_args.output_topic) > > Regards, > Anjana

Re: Flink Runner logging FAILED_TO_UNCOMPRESS

2019-09-19 Thread Robert Bradshaw
unner adds the Impulse override. That > way also the Python SDK would not have to have separate code paths for > Reads. Or, rather, that the Runner adds the non-Impulse override (in Java and Python). > On 19.09.19 11:46, Robert Bradshaw wrote: > > On Thu, Sep 19, 2019 at 11:22 AM Maxi

Re: How to reference manifest from apache flink worker node ?

2019-09-23 Thread Robert Bradshaw
You need to set your artifact directory to point to a distributed filesystem that's also accessible to the workers (when starting up the job server). On Mon, Sep 23, 2019 at 5:43 AM Yu Watanabe wrote: > Hello. > > I am working on flink runner (2.15.0) and would like to ask question about > how t

Re: How to reference manifest from apache flink worker node ?

2019-09-26 Thread Robert Bradshaw
2da7214c82e368ef7d7fd679faaef1/sdks/python/apache_beam/runners/portability/job_server.py#L203 >>> artifacts_dir = self.local_temp_dir(prefix='artifacts') >>> >>> => >>> >>> https://github.com/apache/beam/blob/7931ec055e2da7214c82e368ef7d7fd6

Re: Python Portable Runner Issues

2019-10-01 Thread Robert Bradshaw
Different runners have different characteristics, pros, and cons, which is part of the value proposition for Beam. We have some comparisons at https://beam.apache.org/documentation/runners/capability-matrix/ but these were put together a while ago and don't really take into account the state of thi

Re: Joining PCollections to aggregates of themselves

2019-10-10 Thread Robert Bradshaw
Looking at the naive solution PCollectionView agg = input .apply(Windows.sliding(10mins, 1sec hops).trigger(Repeatedly.forever(AfterPane.elementCountAtLeast(1 .apply(Mean.globally()) .apply(View.asSingleton()); PCollection output = input .apply(ParDo.of(new Joiner().withSi

Re: ETL with Beam?

2019-10-11 Thread Robert Bradshaw
I would like to call out that Beam itself can be directly used for ETL, no extra framework required (not to say that both of these frameworks don't provide additional value, e.g. GUI-style construction of pipelines). On Fri, Oct 11, 2019 at 9:29 AM Ryan Skraba wrote: > > Hello! Talend has a big

Re: ETL with Beam?

2019-10-11 Thread Robert Bradshaw
't have to write that part, > that would be really cool! > > On Fri, Oct 11, 2019 at 1:28 PM Robert Bradshaw wrote: >> >> I would like to call out that Beam itself can be directly used for >> ETL, no extra framework required (not to say that both of these >>

Re: Joining PCollections to aggregates of themselves

2019-10-11 Thread Robert Bradshaw
ate with these > new elements, so the side-input having triggered at least once is not a > guarantee it is up to date. > > On 2019/10/10 18:35:21, Robert Bradshaw wrote: > > > Time: 00:08:00 > > Input: > Output: > > > > > Time: 00:13:00 > >

Re: Python vs Java SDK Performance

2019-10-30 Thread Robert Bradshaw
Python does not allow as much customization of serialization as is available in Java, in part due to often not explicitly knowing the types at each point in the pipeline (though Udi is working on making this better, and there's ongoing work for adding explicit schema support as well). Somewhat to c

Re: Python vs Java SDK Performance

2019-10-30 Thread Robert Bradshaw
uncan wrote: > > I was about to ask if cython would work with the Beam SDK. I just started > building the pipes to support cython in modules. > > On Wed, Oct 30, 2019 at 2:53 PM Robert Bradshaw wrote: >> >> Python does not allow as much customization of serialization as is &

Re: Is the following workflow suitable for apache beam

2019-12-02 Thread Robert Bradshaw
Yes, one could use Apache Beam to execute such a pipeline. The main question seems to be where the set of remote directories would come from (the input) and where you would write your results (the output). And, yes, you would have to write this to not use asyncio. There's also the question of what

Re: Sub-second Beam

2019-12-13 Thread Robert Bradshaw
In general, sub-second latencies are difficult because one must wait for the watermark to catch up before actually firing. This would require the oldest item in flight across all machines to be almost exactly the same timestamp as the newest. Furthermore most sources cannot provide sub-second water

Re: Sub-second Beam

2019-12-13 Thread Robert Bradshaw
> Besides DirectRunner which as I understand it is primarily for testing, what > are other in-process runners? Flink and Spark have (test) all-in-process modes too. > > > > > > > > > > > On Fri, Dec 13, 2019 at 1:45 PM Robert Bradshaw wrote: >> >

Re: proto in pubsub

2019-12-16 Thread Robert Bradshaw
This should work. An example of what you're trying to do and what errors/unexpected behavior you're getting would be helpful. On Sun, Dec 15, 2019 at 10:13 PM Austin Bennett wrote: > > Hi All, > > Struggling with reading a proto message from pubsub and writing to > BigQuery in Beam (Direct Runner

Re: proto in pubsub

2019-12-16 Thread Robert Bradshaw
hat things work. I should actually put together a working > example/demo, as extensive searching didn't yield much usable. > > > On Mon, Dec 16, 2019 at 12:06 PM Robert Bradshaw wrote: > > > > This should work. An example of what you're trying to do and what >

Re: Please assist; how do i use a Sample transform ?

2019-12-17 Thread Robert Bradshaw
beam.transforms.combiners.Sample is a container class that dates back to the days when folks more familiar with Java were just copying things over, and is just an empty class containing actual transforms (as Kyle indicates). These are shorthand for beam.CombineGlobally(beam.transforms.combiners.Sam

Re: Scio 0.8.0 released

2020-01-08 Thread Robert Bradshaw
Nice! On Wed, Jan 8, 2020 at 10:03 AM Neville Li wrote: > Hi all, > > We just released Scio 0.8.0. This is based on the most recent Beam 2.17.0 > release and includes a lot of new features & bug fixes over the past 10 > months. > > Cheers, > Neville > > https://github.com/spotify/scio/releases/t

Re: beam main file with dependencies

2020-01-16 Thread Robert Bradshaw
Yes, you'll need to bundle up these dependencies in a way that they can be shipped to the workers. See https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/ On Thu, Jan 16, 2020 at 2:00 PM Marco Mistroni wrote: > > Hello all > i have written an apache beam workflow which i hav

Re: Unable to reliably have multiple cores working on a dataset with DirectRunner

2020-01-29 Thread Robert Bradshaw
> > before the commit: execution time is 50 seconds using the fn_api_runner in > multiprocess mode on 4 workers > after the commit: execution time is 18 seconds using the fn_api_runner in > multiprocess mode on 4 workers > > commit e0adc9a256cdcf73d172d1c6bd6153d0840d488d (HEAD,

Re: snowflake source/sink

2020-01-29 Thread Robert Bradshaw
You could use the beam.util.BatchElements transform to batch rows into larger chunks. On Wed, Jan 29, 2020 at 12:01 PM Alan Krumholz wrote: > > Thanks Brice! I'll look into wrapping the connector. > > One more question. > I'm trying now to develop a sink too. This is what I have: > > def writeSno
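The idea of batching rows into larger chunks for a sink can be sketched as a fixed-size batcher in plain Python. This is a simplified stand-in, not Beam's BatchElements, which also adapts the batch size between configurable minimum and maximum bounds. A sink would then issue one bulk write per yielded batch.

```python
def batch_elements(elements, batch_size):
    """Yield lists of up to batch_size elements; the final batch may be smaller."""
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    batch = []
    for e in elements:
        batch.append(e)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:  # flush the remainder so no rows are dropped
        yield batch
```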

Re: Unable to reliably have multiple cores working on a dataset with DirectRunner

2020-01-31 Thread Robert Bradshaw
> achieve any throughput. >>>> >>>> What do you mean by this? >>>> >>>> On Wed, Jan 29, 2020 at 1:20 PM Julien Lafaye wrote: >>>>> >>>>> I confirm the situation gets better after the commit: 4 cores used for

Re: Local error when using requirements_file for external runner.

2020-02-03 Thread Robert Bradshaw
Hmm... that error doesn't give much information. Could you perhaps try running the failing command with a '-v' (for more verbosity)? On Mon, Feb 3, 2020 at 4:43 AM Alan Krumholz wrote: > > I have a simple python pipeline that uses a publicly available (PyPI) library. > > I can run my pipeline fin

Re: seems beam.util.GroupIntoBatches is not supported in DataFlow. Any alternative?

2020-02-05 Thread Robert Bradshaw
Yes, you should use BatchElements. Stateful DoFns are not yet supported for Python Dataflow. (The difference is that GroupIntoBatches has the capability to batch across bundles, which can be important for streaming.) On Wed, Feb 5, 2020 at 7:53 AM Alan Krumholz wrote: > > OK, seems like beam.Ba

Re: Local error when using requirements_file for external runner.

2020-02-05 Thread Robert Bradshaw
. On Mon, Feb 3, 2020 at 11:13 AM Alan Krumholz wrote: > > Hi Robert, > > Please find attached the output of running it with -v > > Thank you > > On Mon, Feb 3, 2020 at 10:54 AM Robert Bradshaw wrote: >> >> Hmm... that error doesn't give much informat

Re: GCS numShards doubt

2020-02-14 Thread Robert Bradshaw
To let Dataflow choose the optimal number of shards and maximize performance, it's often significantly better to simply leave it unspecified. A higher numShards only helps if you have at least that many workers. On Thu, Feb 13, 2020 at 10:24 PM vivek chaurasiya wrote: > > hi folks, I have this in co

Re: Hello Beam Community!

2020-03-13 Thread Robert Bradshaw
Welcome! On Fri, Mar 13, 2020 at 7:41 AM Ismaël Mejía wrote: > Welcome ! > > On Fri, Mar 13, 2020 at 3:00 PM Connell O'Callaghan > wrote: > >> Welcome Brittany >> >> On Fri, Mar 13, 2020 at 6:45 AM Rustam Mehmandarov >> wrote: >> >>> Welcome Brittany! >>> >>> Cheers, >>> Rustam >>> @rmehma

Re: Type hints do not work on multi-output PTransforms?

2020-03-30 Thread Robert Bradshaw
That is correct, type hints unfortunately are not yet supported for multiple-output PTransforms. On Thu, Mar 26, 2020 at 10:05 PM Joshua B. Harrison wrote: > Hello all, > > I am working on adding type hints to my pipeline, and ran into an issue > with PTransforms that produce multiple, tagged ou

Re: Type hints do not work on multi-output PTransforms?

2020-03-30 Thread Robert Bradshaw
ed as a Union of > all the component collection types. Would it make sense to do the same for > the output types? Is this a better discussion for the dev group? > +1 to taking this to the dev group. > Thanks again for your time and help. > > Best, > Joshua > > On Mon,

Re: Calling external programs in the pipeline - is it feasible?

2020-04-10 Thread Robert Bradshaw
In general, runners like to schedule more than one task per worker (to take advantage of multiple cores, etc.). The mitigation to this is likely to be runner-specific. E.g. For Dataflow the number of tasks/threads per machine is by default chosen to be the number of cores of that VM. I think Flink

Re: Stateful & Timely Call

2020-04-23 Thread Robert Bradshaw
If you're using State solely to batch elements together, I would recommend avoiding state altogether. You can instead have a DoFn that holds a List as a member variable, add to it, and possibly emit the list at the threshold, in your ProcessElement method, and also emit the batch in FinishBundle.

Re: Stateful & Timely Call

2020-04-23 Thread Robert Bradshaw
I may have misinterpreted your email; I thought you didn't have a need for keys at all. If this is actually the case, you don't need a GroupByKey: just have your DoFn take Rows as input and emit List<Row> as output. That is, it's a DoFn<Row, List<Row>>. You can buffer multiple Rows in an instance variable between pr
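The buffering pattern described in these two replies (hold a list in a member variable, emit it at a threshold in process, and flush the rest in finish_bundle) can be modeled in plain Python. BufferingBatcher and run_bundle are hypothetical; run_bundle only simulates how a runner calls the methods for one bundle. In a real Beam DoFn, anything emitted from finish_bundle must carry explicit timestamp/window information, which this model omits.

```python
class BufferingBatcher:
    """Plain-Python model of a DoFn that batches elements in a member variable."""

    def __init__(self, threshold):
        self.threshold = threshold
        self.buffer = []

    def start_bundle(self):
        self.buffer = []

    def process(self, element):
        self.buffer.append(element)
        if len(self.buffer) >= self.threshold:
            batch, self.buffer = self.buffer, []
            yield batch

    def finish_bundle(self):
        # Flush whatever is left so nothing is lost when the bundle ends.
        if self.buffer:
            batch, self.buffer = self.buffer, []
            yield batch


def run_bundle(fn, bundle):
    """Hypothetical driver simulating a runner processing one bundle."""
    fn.start_bundle()
    out = []
    for element in bundle:
        out.extend(fn.process(element))
    out.extend(fn.finish_bundle())
    return out
```

Batches never span bundles here, which matches the trade-off mentioned elsewhere in this archive: cross-bundle batching needs state (e.g. GroupIntoBatches).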

Re: Kafka IO: value of expansion_service

2020-04-28 Thread Robert Bradshaw
Java dependencies are not yet fully propagated over the expansion service, which might be what you're running into. I'm actually in the process of putting together a PR to fix this; I'll let you know when it's ready. On Mon, Apr 27, 2020 at 9:14 AM Kyle Weaver wrote: > I'm not sure about the org

Re: Kafka IO: value of expansion_service

2020-04-28 Thread Robert Bradshaw
https://github.com/apache/beam/pull/11557 On Tue, Apr 28, 2020 at 9:28 AM Robert Bradshaw wrote: > Java dependencies are not yet fully propagated over the expansion service, > which might be what you're running into. I'm actually in the process of > putting together a PR to

Re: adding apt-get to setup.py fails passing apt-get commands

2020-05-04 Thread Robert Bradshaw
Printing the result out shouldn't matter, but as mentioned in the docs, Popen.communicate is not intended to be used when the amount of output is large. If you need to just run the process, I would recommend a simple subprocess.check_output(). On Mon, May 4, 2020 at 9:00 AM OrielResearch Eila Arich
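A minimal sketch of the suggested subprocess.check_output() approach: it runs the command, returns its stdout, and raises CalledProcessError on a nonzero exit status, so failures are not silently ignored. The demo runs the current Python interpreter purely for portability; in a setup.py the argument list would be the actual command (e.g. an apt-get invocation), which is not shown here.

```python
import subprocess
import sys


def run_command(args):
    """Run args; return stdout as text, or raise CalledProcessError on failure."""
    return subprocess.check_output(args, text=True)


if __name__ == "__main__":
    # Portable demo: use the current interpreter as the child process.
    print(run_command([sys.executable, "-c", "print('hello')"]), end="")
```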

Re: Portable Runner performance optimisation

2020-05-15 Thread Robert Bradshaw
I don't think this is being worked on, but given that Java already supports the LOOPBACK environment (which is a special case of EXTERNAL) it would just be a matter of properly parsing the flags. On Fri, May 15, 2020 at 9:52 AM Alexey Romanenko wrote: > Thanks! It looks that this is exactly what

Re: Timers exception on "Job Drain" while using stateful beam processing in global window

2020-05-18 Thread Robert Bradshaw
Glad you were able to get this working; thanks for following up. On Mon, May 18, 2020 at 10:35 AM Mohil Khare wrote: > Hi, > On another note, I think I was unnecessarily complicating things by > applying a sliding window here and then an extra global window to remove > duplicates. I replaced th

Re: PaneInfo showing UNKNOWN State

2020-05-26 Thread Robert Bradshaw
To clarify, PaneInfo is supported on the FnAPI local runner, but not on the bundle based one. Unfortunately, Streaming is not supported on the FnAPI one (yet), but work there is ongoing. On Tue, May 26, 2020 at 11:46 AM Pablo Estrada wrote: > Hi Jayadeep, > Unfortunately, it seems that PaneInfo

Re: Understanding combiner's distribution logic

2020-07-01 Thread Robert Bradshaw
On Tue, Jun 30, 2020 at 3:32 PM Julien Phalip wrote: > Thanks Luke! > > One part I'm still a bit unclear about is how exactly the PreCombine stage > works. In particular, I'm wondering how it can perform the combination > before the GBK. Is it because it can already compute the combination on > a
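A conceptual sketch of how a combine can run partly before the GBK (often called combiner lifting). Each bundle is folded into a small accumulator locally; only accumulators are shuffled, then merged and turned into the final output. MeanFn mirrors the method names of Beam's Python CombineFn (create_accumulator, add_input, merge_accumulators, extract_output) but is a plain class, and lifted_combine is a hypothetical simulation, not runner code.

```python
class MeanFn:
    """Mirrors Beam's CombineFn method names; plain class, not a subclass."""

    def create_accumulator(self):
        return (0, 0)  # (sum, count)

    def add_input(self, acc, value):
        total, count = acc
        return total + value, count + 1

    def merge_accumulators(self, accs):
        total, count = 0, 0
        for s, c in accs:
            total += s
            count += c
        return total, count

    def extract_output(self, acc):
        total, count = acc
        return total / count if count else float("nan")


def lifted_combine(fn, bundles):
    # Pre-combine: fold each bundle locally, before any shuffle.
    partials = []
    for bundle in bundles:
        acc = fn.create_accumulator()
        for value in bundle:
            acc = fn.add_input(acc, value)
        partials.append(acc)
    # After the shuffle: merge the partial accumulators and extract the result.
    return fn.extract_output(fn.merge_accumulators(partials))
```

This only works because merging accumulators gives the same answer as combining all inputs at once, which is the contract a CombineFn must satisfy.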

Re: Not able to see WordCount output in docker /tmp/...

2020-07-07 Thread Robert Bradshaw
Does it work when you write to a distributed filesystem? (One issue with Docker is that the manager and each of their workers have their own local filesystem.) On Tue, Jul 7, 2020 at 2:17 PM Avijit Saha wrote: > > While trying to run the Beam WordCount example on Flink runner using Job > Manage

Re: Testing Apache Beam pipelines / python SDK

2020-07-13 Thread Robert Bradshaw
You can use apache_beam.testing.util.assert_that to write tests of Beam pipelines. This is what Beam uses for its tests, e.g. https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util_test.py#L80 On Mon, Jul 13, 2020 at 2:36 PM Sofia’s World wrote: > > Hi all > i was wo

Re: Testing Apache Beam pipelines / python SDK

2020-07-17 Thread Robert Bradshaw
test_pipeline.get_full_options_as_args(**extra_opts)) > > print(result) > > Basically, i would expect a PCollection as result of the pipeline, and i > would be testing the content of the PCollection > > Running this results in this messsage > > IT is skipped b

Re: Testing Apache Beam pipelines / python SDK

2020-07-21 Thread Robert Bradshaw
ditionally, what is the best way to test writing to BigQuery? > I have seen this file > > https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/cookbook/bigtableio_it_test.py > but it appears it writes to real big query? > > kind regards > Marco > > &g

Re: ReadFromKafka returns error - RuntimeError: cannot encode a null byte[]

2020-07-22 Thread Robert Bradshaw
On Sat, Jul 18, 2020 at 12:08 PM Chamikara Jayalath wrote: > > > On Fri, Jul 17, 2020 at 10:04 PM ayush sharma <1705ay...@gmail.com> wrote: > >> Thank you guys for the reply. I am really stuck and could not proceed >> further. >> Yes, the previous trial published message had null key. >> But when

Re: Scio 0.9.3 released

2020-08-05 Thread Robert Bradshaw
Thanks for the update! On Wed, Aug 5, 2020 at 11:46 AM Neville Li wrote: > > Hi all, > > We just released Scio 0.9.3. This bumps Beam SDK to 2.23.0 and includes a lot > of improvements & bug fixes. > > Cheers, > Neville > > https://github.com/spotify/scio/releases/tag/v0.9.3 > > "Petrificus Tota

Re: Staged PIP package mysteriously ungzipped, non-installable inside the worker

2020-08-17 Thread Robert Bradshaw
+1 Thanks, Eugene, for finding and fixing this! FWIW, most use of Python from the Python Portable Runner used the embedded environment (this is the default direct runner), so dependencies are already present. On Mon, Aug 17, 2020 at 3:19 PM Daniel Oliveira wrote: > > Normally I'd say not to che

Re: Staged PIP package mysteriously ungzipped, non-installable inside the worker

2020-08-17 Thread Robert Bradshaw
I checked Java, it looks like the way things are structured we do not have that bug there. On Mon, Aug 17, 2020 at 3:31 PM Robert Bradshaw wrote: > > +1 > > Thanks, Eugene, for finding and fixing this! > > FWIW, most use of Python from the Python Portable Runner used the >

Re: Resource Consumption increase With TupleTag

2020-08-19 Thread Robert Bradshaw
Is this 2kps coming out of Filter1 + 2kps coming out of Filter2 (which would be 4kps total), or only 2kps coming out of KafkaIO and MessageExtractor? Though it /shouldn't/ matter, due to sibling fusion, there's a chance things are getting fused poorly and you could write Filter1 and Filter2 instea

Re: Out-of-orderness of window results when testing stateful operators with TextIO

2020-08-24 Thread Robert Bradshaw
As for the question of writing tests in the face of non-determinism, you should look into TestStream. MyStatefulDoFn still needs to be updated to not assume an ordering. (This can be done by setting timers that provide guarantees that (modulo late data) one has seen all data up to a certain timesta

Re: Info needed - pmc mailing list

2020-08-25 Thread Robert Bradshaw
Try priv...@beam.apache.org. On Tue, Aug 25, 2020 at 6:18 AM D, Anup (Nokia - IN/Bangalore) wrote: > > Hi, > > > > We would like to know if there is a way to reach out to members of the pmc > group. > > We tried sending email to p...@beam.apache.org but it got bounced. > > > > Thanks > > Anup

Re: [DISCUSS] Deprecation of AWS SDK v2 IO connectors

2020-09-15 Thread Robert Bradshaw
> should > deprecate a v1 IO ONLY when we have full feature parity in the v2 version. > I think we don't have a replacement for AWSv1 S3 IO so that one should not > be > deprecated. > > On Tue, Sep 15, 2020 at 6:07 PM Robert Bradshaw > wrote: > > > > The

Re: Upload third party runtime dependencies for expanding transform like KafkaIO.Read in Python Portable Runner

2020-10-02 Thread Robert Bradshaw
:177) > at > org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157) > > On Fri, Oct 2, 2020 at 5:14 PM Robert Bradshaw wrote: >> >> Could you clarify a bit exactly what you're trying to

Re: Support for Flink 1.11

2020-10-16 Thread Robert Bradshaw
Support for Flink 1.11 is https://issues.apache.org/jira/browse/BEAM-10612 . It has been implemented and will be included in the next release (Beam 2.25). In the meantime, you could try building yourself from head. On Fri, Oct 16, 2020 at 4:39 AM Kishor Joshi wrote: > > Hi Team, > > Since the bea

Re: is apache beam go sdk supported by spark runner?

2020-11-25 Thread Robert Bradshaw
Yes, it should be for batch (just like for Python). There is ongoing work to make it work for Streaming as well. On Sat, Nov 21, 2020 at 2:57 PM Meriem Sara wrote: > > Hello everyone. I am trying to use apache beam with Golang to execute a data > processing workflow using apache Spark. However,

Re: Help measuring upcoming performance increase in flink runner on production systems

2020-12-21 Thread Robert Bradshaw
I agree. Borrowing the mutation detection from the direct runner as an intermediate point sounds like a good idea. On Mon, Dec 21, 2020 at 8:57 AM Kenneth Knowles wrote: > I really think we should make a plan to make this the default. If you test > with the DirectRunner it will do mutation check
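The mutation detection mentioned above works roughly like this: snapshot an encoded copy of the input, run the user code, then re-encode the input and compare the two. The sketch below is plain Python and makes some assumptions. `pickle` stands in for the element's coder. The names `process_with_mutation_check` and `MutationDetectedError` are hypothetical and are not part of any runner.

```python
import pickle
from typing import Any, Callable, Iterable, List


class MutationDetectedError(Exception):
    """Hypothetical error raised when user code modifies its input in place."""


def process_with_mutation_check(
    fn: Callable[[Any], Iterable[Any]], element: Any
) -> List[Any]:
    # Encode the element before handing it to user code
    # (pickle stands in for the element's coder here).
    before = pickle.dumps(element)
    outputs = list(fn(element))
    # If the encoded form changed, the function mutated its input.
    if pickle.dumps(element) != before:
        raise MutationDetectedError(f"input element was mutated: {element!r}")
    return outputs


def sum_values(xs: List[int]) -> Iterable[int]:
    yield sum(xs)  # reads only; never modifies xs


print(process_with_mutation_check(sum_values, [1, 2, 3]))  # [6]
```

The check costs an extra encode per element, which is why it suits testing or an opt-in mode rather than every production run.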

Re: Is there an array explode function/transform?

2021-01-13 Thread Robert Bradshaw
Ah, thanks for the clarification. UNNEST does sound like what you want here, and would likely make sense as a top-level relational transform as well as being supported by SQL. On Wed, Jan 13, 2021 at 10:53 AM Tao Li wrote: > @Kyle Weaver sure thing! So the input/output > definition for the Flat

Re: Is there an array explode function/transform?

2021-01-14 Thread Robert Bradshaw
ly could be a top-level transform. Should it automatically >>>>> unnest all arrays, or just the fields specified? >>>>> >>>>> We do have to define the semantics for nested arrays as well. >>>>> >>>>> On Wed, Jan 13, 2021 at 1

Re: Is there an array explode function/transform?

2021-01-14 Thread Robert Bradshaw
t of all the different array > elements? > > On Thu, Jan 14, 2021 at 11:25 AM Robert Bradshaw > wrote: > >> I think it makes sense to allow specifying more than one, if desired. >> This is equivalent to just stacking multiple Unnests. (Possibly one could >> even hav
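The UNNEST semantics discussed in this thread can be sketched on plain dicts. In this sketch each array element becomes its own output row and all other fields are copied unchanged. Unnesting two fields is the same as stacking two single-field unnests, which yields the cross product of the arrays. This is a plain-Python illustration, not Beam's schema API. The `unnest` helper is hypothetical. Treating an empty array as producing no rows is an assumed choice, one of the semantic questions the thread raises.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def unnest(rows: List[Row], field: str) -> List[Row]:
    """Hypothetical helper: emit one output row per element of the array in
    `field`, copying every other field unchanged. Rows whose array is empty
    produce no output (an assumed semantic choice)."""
    out: List[Row] = []
    for row in rows:
        for item in row[field]:
            new_row = dict(row)   # shallow copy of the other fields
            new_row[field] = item
            out.append(new_row)
    return out


rows = [{"id": 1, "tags": ["a", "b"], "nums": [10, 20]}]
# Stacking two unnests gives the cross product of the two arrays.
for r in unnest(unnest(rows, "tags"), "nums"):
    print(r)
# {'id': 1, 'tags': 'a', 'nums': 10}
# {'id': 1, 'tags': 'a', 'nums': 20}
# {'id': 1, 'tags': 'b', 'nums': 10}
# {'id': 1, 'tags': 'b', 'nums': 20}
```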

Re: Overwrite support from ParquetIO

2021-01-27 Thread Robert Bradshaw
Fortunately making deleting files idempotent is much easier than writing them :). But one needs to handle the case of concurrent execution as well as sequential re-execution due to possible zombie workers. On Wed, Jan 27, 2021 at 5:04 PM Reuven Lax wrote: > Keep in mind that DoFns might be reex
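A delete becomes idempotent when "file already gone" counts as success. That covers a retried bundle and also a zombie worker that deletes the same file concurrently. The sketch below assumes local files and uses `os.remove`. It does not show Beam's filesystem abstraction, and `delete_idempotently` is a hypothetical name.

```python
import os
import tempfile
from typing import Iterable


def delete_idempotently(paths: Iterable[str]) -> int:
    """Hypothetical helper: delete each path, treating a missing file as
    success so the step can be re-executed or run concurrently."""
    deleted = 0
    for path in paths:
        try:
            os.remove(path)
            deleted += 1
        except FileNotFoundError:
            # A previous attempt (or a concurrent zombie) already removed it.
            pass
    return deleted


tmp_dir = tempfile.mkdtemp()
target = os.path.join(tmp_dir, "part-00000.parquet")
open(target, "w").close()
print(delete_idempotently([target]))  # 1
print(delete_idempotently([target]))  # 0, and no error on the retry
```

Catching only `FileNotFoundError` keeps other failures, such as permission errors, visible instead of silently swallowing them.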

Re: [DISCUSS] Drop support for Flink 1.8 and 1.9

2021-03-12 Thread Robert Bradshaw
Do we now support 1.8 through 1.12? Unless there are specific objections, makes sense to me. On Fri, Mar 12, 2021 at 8:29 AM Alexey Romanenko wrote: > +1 too but are there any potential objections for this? > > On 12 Mar 2021, at 11:21, David Morávek wrote: > > +1 > > D. > > On Thu, Mar 11, 20

Re: Write to multiple IOs in linear fashion

2021-03-24 Thread Robert Bradshaw
Returning PDone is an anti-pattern that should be avoided, but changing it now would be backwards incompatible. PRs to add non-PDone returning variants (probably as another option to the builders) that compose well with Wait, etc. would be welcome. On Wed, Mar 24, 2021 at 11:14 AM Alexey Romanenko

Re: Write to multiple IOs in linear fashion

2021-03-24 Thread Robert Bradshaw
nd just add new > functionality. Though, we need to follow the same pattern for user API and > maybe even naming for this feature across different IOs (like we have for > "readAll()" methods). > > > > I agree that we have to avoid returning PDone for such cases. > >

Re: Write to multiple IOs in linear fashion

2021-03-24 Thread Robert Bradshaw
>> Am I reading this wrong? >> >> Kenn >> >> On Wed, Mar 24, 2021 at 4:35 PM Alex Amato wrote: >> >>> How about a PCollection containing every element which was successfully >>> written? >>> Basically the same things which were passed into it
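The proposal above has a write return the elements it wrote successfully instead of PDone. A second write can then consume that output, which in Beam would compose with Wait.on. The plain-Python sketch below shows only the data dependency. It does not use Beam. The names `write_returning_successes`, `write_first` and `write_second` are hypothetical, and the two sinks are in-memory lists.

```python
from typing import Callable, Iterable, List, TypeVar

T = TypeVar("T")


def write_returning_successes(
    elements: Iterable[T], write_one: Callable[[T], bool]
) -> List[T]:
    """Hypothetical sink wrapper: attempt each write and return the elements
    that were written successfully, instead of a terminal 'done' marker."""
    return [e for e in elements if write_one(e)]


first_sink: List[int] = []
second_sink: List[int] = []


def write_first(x: int) -> bool:
    # Pretend the first sink rejects negative values.
    if x < 0:
        return False
    first_sink.append(x)
    return True


def write_second(x: int) -> bool:
    second_sink.append(x)
    return True


# Because the first write returns data, the second write can be chained
# after it and only sees elements the first sink accepted.
written_first = write_returning_successes([3, -1, 4], write_first)
written_both = write_returning_successes(written_first, write_second)
print(written_both)  # [3, 4]
```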
