Re: Histogram metrics in Dataflow/Beam

2022-04-04 Thread Jeff Klukas
Siyu - The Beam metrics interface includes the Distribution metric type which can be used for histograms: https://beam.apache.org/documentation/programming-guide/#types-of-metrics Particulars of support depend on the runner. For Cloud Dataflow, the reported values are MAX, MIN, MEAN, and COUNT, s
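A minimal sketch of recording a Distribution from a DoFn in the Java SDK (the namespace, metric name, and element type here are illustrative, not from the thread):

    import org.apache.beam.sdk.metrics.Distribution;
    import org.apache.beam.sdk.metrics.Metrics;
    import org.apache.beam.sdk.transforms.DoFn;

    public class RecordLatencyFn extends DoFn<Long, Long> {
      // On Dataflow, this metric surfaces as MIN, MAX, MEAN, and COUNT.
      private final Distribution latencyMs =
          Metrics.distribution("my-namespace", "request_latency_ms");

      @ProcessElement
      public void processElement(@Element Long latency, OutputReceiver<Long> out) {
        latencyMs.update(latency); // record one observation
        out.output(latency);
      }
    }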

Re: How to flush window when draining a Dataflow pipeline?

2021-05-21 Thread Jeff Klukas
at? > On Fri, May 21, 2021 at 5:10 AM Jeff Klukas wrote: > >> Beam users, >> >> We're attempting to write a Java pipeline that uses Count.perKey() to >> collect event counts, and then flush those to an HTTP API every ten minutes >> based on proc

How to flush window when draining a Dataflow pipeline?

2021-05-21 Thread Jeff Klukas
Beam users, We're attempting to write a Java pipeline that uses Count.perKey() to collect event counts, and then flush those to an HTTP API every ten minutes based on processing time. We've tried expressing this using GlobalWindows with an AfterProcessingTime trigger, but we find that when we dra
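For context, a sketch of the triggering setup the message describes (the element type and keyed input are assumptions; as the thread discusses, a drain may not flush the final pane of a processing-time trigger like this):

    import org.apache.beam.sdk.transforms.Count;
    import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
    import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
    import org.apache.beam.sdk.transforms.windowing.Repeatedly;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.joda.time.Duration;

    // "keyedEvents" is an assumed upstream PCollection<KV<String, String>>.
    PCollection<KV<String, Long>> counts = keyedEvents
        .apply(Window.<KV<String, String>>into(new GlobalWindows())
            .triggering(Repeatedly.forever(
                AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(Duration.standardMinutes(10))))
            .discardingFiredPanes()
            .withAllowedLateness(Duration.ZERO))
        .apply(Count.perKey());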

Re: DataflowRunner and PubSub Acknowledge

2020-10-05 Thread Jeff Klukas
Hi Thiago, Note that Dataflow has a custom implementation of PubSub interaction, so the code you see in PubsubIO in the Beam codebase does not necessarily reflect Pubsub handling in Dataflow. Dataflow acks messages as soon as they are first checkpointed, so the first step in your pipeline that in

Re: WriteToBigQuery - performance issues?

2020-07-14 Thread Jeff Klukas
not sure if these errors are problematic) > > I’ll definitely give the SSD option a shot. > > Thanks. > > -- > Mark Kelly > Sent with Airmail > > On 14 July 2020 at 15:12:46, Jeff Klukas (jklu...@mozilla.com) wrote: > > In my experience with writing to BQ via Bi

Re: WriteToBigQuery - performance issues?

2020-07-14 Thread Jeff Klukas
In my experience with writing to BQ via BigQueryIO in the Java SDK, the bottleneck tends to be disk I/O. The BigQueryIO logic requires several shuffles that cause checkpointing even in the case of streaming inserts, which in the Dataflow case means writing to disk. I assume the Python logic is simi

Re: TableRow class is not the same after serialization

2020-07-09 Thread Jeff Klukas
ttp-java-client/json.html >> >> >> >> On Thu, Jul 9, 2020 at 10:02 AM Kirill Zhdanovich >> wrote: >> >>> Thanks for explaining. Is it documented somewhere that TableRow contains >>> Map? >>> I don't construct it, I fetch from Google

Re: TableRow class is not the same after serialization

2020-07-09 Thread Jeff Klukas
t, I fetch from Google Analytics export to BigQuery > table. > > On Thu, 9 Jul 2020 at 16:40, Jeff Klukas wrote: > >> I would expect the following line to fail: >> >> List h = ((List) bigQueryRow.get("hits")); >> >> The top-level bigQuery

Re: TableRow class is not the same after serialization

2020-07-09 Thread Jeff Klukas
dRunnable.run(ThreadFactoryImpl.java:55) > at java.base/java.lang.Thread.run(Thread.java:834) > Caused by: java.lang.ClassCastException: class java.util.LinkedHashMap > cannot be cast to class com.google.api.services.bigquery.model.TableRow > (java.util.LinkedHashMap is in module java.base of loader 'bootst

Re: TableRow class is not the same after serialization

2020-07-08 Thread Jeff Klukas
ssions", > ParDo.of(new CreateSessionMetrics(boolean and > string params))) >// few more transformations > > } > > This is basically similar to examples you can find here > https://beam.apache.org/documentation/io/built-in/goo

Re: TableRow class is not the same after serialization

2020-07-08 Thread Jeff Klukas
On Wed, Jul 8, 2020 at 3:54 PM Kirill Zhdanovich wrote: > So from what I understand, it works like this by design and it's not > possible to test my code with the current coder implementation. Is that > correct? > I would argue that this test failure is indicating an area of potential failure in

Re: TableRow class is not the same after serialization

2020-07-08 Thread Jeff Klukas
On Wed, Jul 8, 2020 at 1:38 PM Kirill Zhdanovich wrote: > So it's correct implementation of TableRow that encode(decode(a)) != a? > A TableRow can contain fields of any map implementation. It makes sense to me that once a TableRow object is serialized and deserialized, that the coder must make a

Re: TableRow class is not the same after serialization

2020-07-08 Thread Jeff Klukas
> I don't have issues running this pipeline in production. I have this issue only when I try to write an end-to-end test. Do you know if there are existing coders for TableRow that I can use? I've tried TableRowJsonCoder, but it seems like it converts all objects inside Ta

Re: TableRow class is not the same after serialization

2020-07-08 Thread Jeff Klukas
Kirill - Can you tell us more about what Job.runJob is doing? I would not expect the Beam SDK itself to do any casting to TableRow, so is there a line in your code where you're explicitly casting to TableRow? There may be a point where you need to explicitly set the coder on a PCollection to deseri
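A minimal sketch of explicitly setting a TableRow coder on a PCollection (the table spec is a placeholder; note that later messages in this thread point out that TableRowJsonCoder decodes nested fields as generic Maps rather than TableRow):

    import com.google.api.services.bigquery.model.TableRow;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
    import org.apache.beam.sdk.values.PCollection;

    PCollection<TableRow> rows = pipeline
        .apply("ReadRows", BigQueryIO.readTableRows().from("my-project:analytics.ga_sessions"))
        .setCoder(TableRowJsonCoder.of());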

Re: BigQuery query caching?

2020-07-02 Thread Jeff Klukas
>> Yeah - we are issuing a query rather than reading a table. Materializing >> the results myself and reading them back seems simple enough. I will give >> that a try! >> >> Thanks, >> Matt >> >> On Thu, Jul 2, 2020 at 9:42 AM Jeff Klukas wrote: >> >&

Re: BigQuery query caching?

2020-07-02 Thread Jeff Klukas
It sounds like your pipeline is issuing a query rather than reading a whole table. Are you using Java or Python? I'm only familiar with the Java SDK so my answer may be Java-biased. I would recommend materializing the query results to a table, and then configuring your pipeline to read that table
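A sketch of the suggested approach, assuming the query results have already been materialized to a table (project, dataset, and table names are placeholders):

    import com.google.api.services.bigquery.model.TableRow;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.values.PCollection;

    PCollection<TableRow> rows = pipeline.apply(
        BigQueryIO.readTableRows()
            .from("my-project:my_dataset.materialized_query_results"));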

Re: Exceptions PubsubUnbounded source ACK

2020-06-01 Thread Jeff Klukas
and if a single element in the > bundle fails and we ignore the error on the single element, then the bundle > is considered still successfully processed am I correct? Then it would just > ACK everything in the bundle > > Kishore > > On Mon, Jun 1, 2020 at 10:27 AM Jeff Kluka

Re: Exceptions PubsubUnbounded source ACK

2020-06-01 Thread Jeff Klukas
Is this a Python or Java pipeline? I'm familiar with PubsubIO in Java, though I expect the behavior in Python is similar. It will ack messages at the first checkpoint step in the pipeline, so the behavior in your case depends on whether there is a GroupByKey operation happening before the exceptio

Re: Pattern for sharing a resource across all worker threads in Beam Java SDK

2020-05-01 Thread Jeff Klukas
like Guavas memoize[1] > functions and use them as they are very lightweight. > > 1: > https://guava.dev/releases/19.0/api/docs/com/google/common/base/Suppliers.html#memoize(com.google.common.base.Supplier) > > On Thu, Apr 30, 2020 at 12:45 PM Jeff Klukas wrote: > >> B

Pattern for sharing a resource across all worker threads in Beam Java SDK

2020-04-30 Thread Jeff Klukas
Beam Java users, I've run into a few cases where I want to present a single thread-safe data structure to all threads on a worker, and I end up writing a good bit of custom code each time involving a synchronized method that handles creating the resource exactly once, and then each thread has its
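A sketch of the kind of boilerplate described, using a synchronized static accessor so the resource is created exactly once per worker JVM and shared across threads (ExpensiveClient is a hypothetical thread-safe resource; a later reply in this thread suggests Guava's Suppliers.memoize as a lighter-weight alternative):

    import org.apache.beam.sdk.transforms.DoFn;

    public class EnrichFn extends DoFn<String, String> {
      // One instance per worker JVM, shared by all DoFn instances and threads.
      private static ExpensiveClient sharedClient;

      private static synchronized ExpensiveClient getClient() {
        if (sharedClient == null) {
          sharedClient = ExpensiveClient.create(); // created exactly once per JVM
        }
        return sharedClient;
      }

      @ProcessElement
      public void processElement(@Element String element, OutputReceiver<String> out) {
        out.output(getClient().enrich(element));
      }
    }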

Re:

2020-04-23 Thread Jeff Klukas
Aniruddh - Using BigQueryIO.Read with EXPORT method involves a potentially long wait on BQ to complete the export. I have experience with running Dataflow batch jobs which use this read method to ingest ~10 TB of data in a single job. The behavior I generally see is that the job will progress thro

Re: Large public Beam projects?

2020-04-21 Thread Jeff Klukas
Mozilla hosts the code for our data ingestion system publicly on GitHub. A good chunk of that architecture consists of Beam pipelines running on Dataflow. See: https://github.com/mozilla/gcp-ingestion/tree/master/ingestion-beam and rendered usage documentation at: https://mozilla.github.io/gcp-

Re: Fanout and OOMs on Dataflow while streaming to BQ

2019-11-22 Thread Jeff Klukas
We also had throughput issues in writing to BQ in a streaming pipeline and we mitigated by provisioning a large quantity of SSD storage to improve I/O throughput to disk for checkpoints. I also second Erik's suggestion to look into Streaming Engine. We are currently looking into migrating our streaming u
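For reference, these are roughly the Dataflow pipeline options involved (values are placeholders; check the Dataflow documentation for the exact disk type resource format for your project and zone):

    # Larger/faster persistent disks for checkpoint I/O:
    --workerDiskType=compute.googleapis.com/projects/<project>/zones/<zone>/diskTypes/pd-ssd
    --diskSizeGb=500

    # Or the alternative mentioned above, offloading state to the Streaming Engine service:
    --enableStreamingEngine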

Re: Memory profiling on Dataflow with java

2019-11-18 Thread Jeff Klukas
That's normal; I also never saw those heap dump options display in the Dataflow UI. I think Dataflow doesn't show any options that originate from "Debug" options interfaces. > On Mon, Nov 18, 2019 at 11:59 AM Jeff Klukas wrote: >> Using default

Re: Memory profiling on Dataflow with java

2019-11-18 Thread Jeff Klukas
Using default Dataflow workers, this is the set of options I passed: --dumpHeapOnOOM --saveHeapDumpsToGcsPath=$MYBUCKET/heapdump --diskSizeGb=100 On Mon, Nov 18, 2019 at 11:57 AM Jeff Klukas wrote: > It sounds like you're generally doing the right thing. I've succes

Re: Memory profiling on Dataflow with java

2019-11-18 Thread Jeff Klukas
It sounds like you're generally doing the right thing. I've successfully used --saveHeapDumpsToGcsPath in a Java pipeline running on Dataflow and inspected the results in Eclipse MAT. I think that --saveHeapDumpsToGcsPath will automatically turn on --dumpHeapOnOOM but worth setting that explicitly

Re: Encoding Problem: Kafka - DataFlow

2019-10-31 Thread Jeff Klukas
I ran into exactly this same problem of finding some accented characters getting replaced with "?" in a pipeline only when running on Dataflow and not when using the Direct Runner. KafkaIO was not involved, but I'd bet the root cause is the same. In my case, the input turned out to be properly UTF

Re: A couple questions from someone new to Beam

2019-09-25 Thread Jeff Klukas
(1) I don't have direct experience with contacting MongoDB in Beam, but my general expectation is that yes, Beam IOs will do reasonable things for connection setup and teardown. For the case of MongoDbIO in the Java SDK, it looks like this connection setup happens in BoundedMongoDbReader [0]. In ge

Re: Prevent Shuffling on Writing Files

2019-09-18 Thread Jeff Klukas
What you propose with a writer per bundle is definitely possible, but I expect the blocker is that in most cases the runner has control of bundle sizes and there's nothing exposed to the user to control that. I've wanted to do similar, but found average bundle sizes in my case on Dataflow to be so

Re: Setting environment and system properties on Dataflow workers

2019-08-30 Thread Jeff Klukas
edit the environment variables and you have to > resort to some hackery that is JVM version dependent[2]. > > 1: > https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/harness/JvmInitializer.java > 2: https://blog.sebastian-daschner.com/entri

Setting environment and system properties on Dataflow workers

2019-08-30 Thread Jeff Klukas
I just spent the past two days debugging a character corruption issue in a Dataflow pipeline. It turned out that we had encoded a json object to a string and then called getBytes() without specifying a charset. In our testing infrastructure, this didn't cause a problem because the default charset o
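A minimal illustration of the failure mode described (the JSON value is made up):

    import java.nio.charset.StandardCharsets;

    String json = "{\"name\": \"José\"}";
    byte[] platformDependent = json.getBytes();                      // uses the JVM default charset, which differed on the Dataflow workers
    byte[] alwaysUtf8 = json.getBytes(StandardCharsets.UTF_8);       // explicit charset behaves the same everywhere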

Re: Cost efficient loading of Kafka high throughput event stream to Bigquery

2019-05-17 Thread Jeff Klukas
I have also wrestled with throughput for FileIO and BigQueryIO on Dataflow, and in my case the bottleneck came down to disk I/O throughput on the worker machines. Writing with FileIO or BigQueryIO involves several group by key operations that in the Dataflow case require checkpointing state to disk

Re: Reading and Writing Binary files in Beam

2019-04-26 Thread Jeff Klukas
You can use the read* and write* methods of FileIO to read and write arbitrary binary files. The examples in the Javadoc for FileIO [0] include an example of reading the entire contents of a file as a string into a Beam record, along with metadata about the file. If a one-to-one mapping of files t
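A sketch along the lines of that Javadoc example, reading each matched file fully into a single record (the filepattern is a placeholder; for binary payloads, readFullyAsBytes() can be used in place of readFullyAsUTF8String()):

    import java.io.IOException;
    import org.apache.beam.sdk.io.FileIO;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TypeDescriptors;

    PCollection<KV<String, String>> fileContents = pipeline
        .apply(FileIO.match().filepattern("gs://my-bucket/input/*.bin"))
        .apply(FileIO.readMatches())
        .apply(MapElements
            .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
            .via((FileIO.ReadableFile f) -> {
              try {
                return KV.of(f.getMetadata().resourceId().toString(), f.readFullyAsUTF8String());
              } catch (IOException e) {
                throw new RuntimeException(e);
              }
            }));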

Re: Do I have any control over bundle sizes?

2019-04-05 Thread Jeff Klukas
>> operation itself to remove duplicates (PubSub is at least once >> delivery). Drain should work for a pipeline like this without any >> issues. >> >> On Fri, Apr 5, 2019 at 3:33 PM Jeff Klukas wrote: >> > >> > Thanks for the suggestions. Ideally, I'd

Re: Do I have any control over bundle sizes?

2019-04-05 Thread Jeff Klukas
it with the State API. See the discussion on > > this ticket: > > https://issues.apache.org/jira/browse/BEAM-6886 > > > > On Thu, Apr 4, 2019 at 9:42 PM Jeff Klukas wrote: > > > > > > As far as I can tell, Beam expects runners to have full control over > separa

Do I have any control over bundle sizes?

2019-04-04 Thread Jeff Klukas
As far as I can tell, Beam expects runners to have full control over separation of individual elements into bundles and this is something users have no control over. Is that true? Or are there any ways that I might exert some influence over bundle sizes? My main interest at the moment is investiga

Re: Push Kafka data to S3 based on window size in bytes

2019-02-14 Thread Jeff Klukas
I'm not aware that there's currently any way to trigger based on data size. As you state, AfterPane.elementCountAtLeast lets you trigger based on number of elements, but from my reading of the implementations of triggers in the Java SDK, triggers don't have access to sufficient data to maintain sum
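For reference, a sketch of an element-count trigger as the closest available proxy for a size-based trigger (the count and element type are placeholders):

    import org.apache.beam.sdk.transforms.windowing.AfterPane;
    import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
    import org.apache.beam.sdk.transforms.windowing.Repeatedly;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.joda.time.Duration;

    // "records" is an assumed upstream PCollection<String> read from Kafka.
    records.apply(Window.<String>into(new GlobalWindows())
        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(100_000)))
        .discardingFiredPanes()
        .withAllowedLateness(Duration.ZERO));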

Re: Beam pipeline should fail when one FILE_LOAD fails for BigQueryIO on Flink

2019-02-13 Thread Jeff Klukas
To question 1, I also would have expected the pipeline to fail in the case of files failing to load; I'm not sure why it's not. I thought the BigQuery API returns a 400 level response code in the case of files failing and that would bubble up to a pipeline execution error, but I haven't dug through

Re: Tuning BigQueryIO.Write

2019-01-30 Thread Jeff Klukas
ou maybe share the code of your pipeline? > > Cheers, > Tobi > > On Tue, Jan 22, 2019 at 9:28 PM Jeff Klukas wrote: > >> I'm attempting to deploy a fairly simple job on the Dataflow runner that >> reads from PubSub and writes to BigQuery using file loads, but I h

Re: How to disable sharding with FileIO.write()/FileIO.writeDynamic

2019-01-24 Thread Jeff Klukas
Hi Sri, To Question 1, you should be able to set `... .to("/tmp/parquet/").withNumShards(1)` to produce a single output file. To Question 2, yes if your desired output file name depends on contents of the record itself, that's exactly what FileIO.writeDynamic() is for. If you can get the name fro
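Roughly what the single-shard suggestion looks like in context. This sketch assumes Avro GenericRecord elements written through a ParquetIO sink, which may not match Sri's pipeline exactly:

    // "input" is an assumed PCollection<GenericRecord>; "schema" is its Avro Schema.
    input.apply(
        FileIO.<GenericRecord>write()
            .via(ParquetIO.sink(schema))
            .to("/tmp/parquet/")
            .withNumShards(1)); // forces a single output file per window/pane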

Tuning BigQueryIO.Write

2019-01-22 Thread Jeff Klukas
I'm attempting to deploy a fairly simple job on the Dataflow runner that reads from PubSub and writes to BigQuery using file loads, but I have so far not been able to tune it to keep up with the incoming data rate. I have configured BigQueryIO.write to trigger loads every 5 minutes, and I've let t
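For context, a sketch of the file-loads configuration described (the table name and shard count are placeholders):

    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.joda.time.Duration;

    // "rows" is an assumed PCollection<TableRow> derived from the PubSub input.
    rows.apply(
        BigQueryIO.writeTableRows()
            .to("my-project:my_dataset.events")
            .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
            .withTriggeringFrequency(Duration.standardMinutes(5))
            .withNumFileShards(100) // required when a triggering frequency is set
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));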

Re: KafkaIO error

2019-01-22 Thread Jeff Klukas
> this means that I am losing data or if this will be retried by the sink? I don't have direct experience with KafkaIO, but noting that this exception happened in the finishBundle method, Beam will not have committed the bundle. More specifically, looking at the KafkaWriter code, I see that fin

Recommendations for controlling FileIO.Write size

2019-01-16 Thread Jeff Klukas
Related to a previous thread about custom triggering on GlobalWindows [0], are there general recommendations for controlling size of output files from FileIO.Write? A general pattern I've seen in systems that need to batch individual records to files is that they offer both a maximum file size and

Re: Possible to use GlobalWindows for writing unbounded input to files?

2019-01-11 Thread Jeff Klukas
PM Reuven Lax wrote: > Ah, > > numShards = 0 is explicitly not supported in unbounded mode today, for the > reason mentioned above. If FileIO doesn't reject the pipeline in that case, > we should fix that. > > Reuven > > On Fri, Jan 11, 2019 at 9:23 AM Je

Re: Possible to use GlobalWindows for writing unbounded input to files?

2019-01-11 Thread Jeff Klukas
to >> do with Flattening two PCollections together) with their original trigger. >> Without this, we also know that you can have three PCollections with >> identical triggering and you can CoGroupByKey them together but you cannot >> do this three-way join as a sequence of binary jo

Re: Possible to use GlobalWindows for writing unbounded input to files?

2019-01-09 Thread Jeff Klukas
derlying issue here but writing > unbounded input to files when using GlobalWindows for unsharded output is a > valid usecase so sounds like a bug. Feel free to create a JIRA. > > - Cham > > On Wed, Jan 9, 2019 at 10:00 AM Jeff Klukas wrote: > >> I've read more

Re: Possible to use GlobalWindows for writing unbounded input to files?

2019-01-09 Thread Jeff Klukas
different code path that doesn't flatten collections and no exception is thrown. So, this might really be considered a bug of WriteFiles (and thus FileIO). But I'd love to hear other interpretations. On Wed, Jan 9, 2019 at 11:25 AM Jeff Klukas wrote: > I'm building a pipeline th

Possible to use GlobalWindows for writing unbounded input to files?

2019-01-09 Thread Jeff Klukas
I'm building a pipeline that streams from Pubsub and writes to files. I'm using FileIO's dynamic destinations to place elements into different directories according to date and I really don't care about ordering of elements beyond the date buckets. So, I think GlobalWindows is appropriate in this

Re: Using gRPC with PubsubIO?

2019-01-02 Thread Jeff Klukas
//github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java#L373 > > On Wed, Jan 2, 2019 at 12:11 PM Jeff Klukas wrote: > >> I see that the Beam codebase includes a PubsubGrpcClient, but there >> doesn

Using gRPC with PubsubIO?

2019-01-02 Thread Jeff Klukas
I see that the Beam codebase includes a PubsubGrpcClient, but there doesn't appear to be any way to configure PubsubIO to use that client over the PubsubJsonClient. There's even a PubsubIO.Read#withClientFactory, but it's marked as for testing only. Is gRPC support something that's still in devel

Re: Providing default values for ValueProvider

2018-12-20 Thread Jeff Klukas
runtime. On Thu, Dec 20, 2018 at 4:47 PM Jeff Klukas wrote: > I did some more testing this afternoon, and it looks like I'm incorrect on > some details here. > > Parameters implemented as ValueProviders can be provided at compile time > and they do end up in the template.

Re: Providing default values for ValueProvider

2018-12-20 Thread Jeff Klukas
e? Or experience handling default values for ValueProviders? On Thu, Dec 20, 2018 at 3:49 PM Jeff Klukas wrote: > I am deploying Beam pipelines with the DataflowRunner and would like to > move more of the pipeline options to use the ValueProvider interface so > they can be specified at

Providing default values for ValueProvider

2018-12-20 Thread Jeff Klukas
I am deploying Beam pipelines with the DataflowRunner and would like to move more of the pipeline options to use the ValueProvider interface so they can be specified at runtime rather than at template compile time, but running into various issues. First, it's been unclear to the operations enginee
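For reference, a minimal sketch of declaring a ValueProvider option with a default (the option name and default value are placeholders; the rest of this thread discusses how such defaults interact with Dataflow templates):

    import org.apache.beam.sdk.options.Default;
    import org.apache.beam.sdk.options.Description;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.ValueProvider;

    public interface MyOptions extends PipelineOptions {
      @Description("Output path")
      @Default.String("gs://my-bucket/output")
      ValueProvider<String> getOutputPath();
      void setOutputPath(ValueProvider<String> value);
    }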

Re: [Dataflow] Delaying PubSubIO acks until FileIO completes writes

2018-12-06 Thread Jeff Klukas
imple pipeline like this? In effect, would you recommend we write a custom application using the Pub/Sub and Storage SDKs directly rather than trying to use Beam's abstractions? On Wed, Dec 5, 2018 at 2:24 PM Raghu Angadi wrote: > > On Wed, Dec 5, 2018 at 11:13 AM Jeff Klukas wr

[Dataflow] Delaying PubSubIO acks until FileIO completes writes

2018-12-05 Thread Jeff Klukas
We are attempting to build a Beam pipeline running on Dataflow that reads from a Cloud Pub/Sub subscription and writes 10 minute windows to files on GCS via FileIO. At-least-once completeness is critical in our use case, so we are seeking the simplest possible solution with obvious and verifiable d

Re: TextIO setting file dynamically issue

2018-11-28 Thread Jeff Klukas
You can likely achieve what you want using FileIO with dynamic destinations, which is described in the "Advanced features" section of the TextIO docs [0]. Your case might look something like: PCollection events = ...; events.apply(FileIO.writeDynamic() .by(event -> formatAsHHMMSS(event.t
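A fuller version of that sketch, with assumptions filled in for the parts the snippet leaves out (the Event class, formatAsHHMMSS, and the JSON serialization are all hypothetical):

    // "events" is the PCollection<Event> from above.
    events.apply(
        FileIO.<String, Event>writeDynamic()
            .by(event -> formatAsHHMMSS(event.getTimestamp()))
            .withDestinationCoder(StringUtf8Coder.of())
            .via(Contextful.fn(Event::toJson), TextIO.sink())
            .to("gs://my-bucket/output")
            .withNaming(hhmmss -> FileIO.Write.defaultNaming("events-" + hhmmss, ".json")));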

BigQueryIO failure handling for writes

2018-11-16 Thread Jeff Klukas
I'm trying to write a robust pipeline that takes input from PubSub and writes to BigQuery. For every PubsubMessage that is not successfully written to BigQuery, I'd like to get the original PubsubMessage back and be able to write to an error output collection. I'm not sure this is quite possible, t
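A sketch of what is available today with streaming inserts: the failed rows come back as TableRow rather than the original PubsubMessage, which is the gap described above (the table name and format function are placeholders):

    import com.google.api.services.bigquery.model.TableRow;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
    import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
    import org.apache.beam.sdk.values.PCollection;

    // "messages" is an assumed PCollection<PubsubMessage>.
    WriteResult result = messages.apply(
        BigQueryIO.<PubsubMessage>write()
            .to("my-project:my_dataset.events")
            .withFormatFunction(msg -> convertToTableRow(msg)) // hypothetical conversion
            .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
            .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));

    // Failed rows are TableRow, not the original PubsubMessage.
    PCollection<TableRow> failedRows = result.getFailedInserts();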

Re: Is it possible to run a perl scrip in Dataflow worker?

2018-10-24 Thread Jeff Klukas
Another option here would be to make the perl script operate on batches. Your DoFn could then store the records to a buffer rather than outputting them and then periodically flush the buffer, sending records through the perl script and sending to output. On Wed, Oct 24, 2018 at 3:03 PM Robert Brad
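A rough sketch of that batching pattern as a DoFn (runPerlScript is a hypothetical stand-in for invoking the external process; the batch size and the timestamping in finishBundle are simplified):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Consumer;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
    import org.joda.time.Instant;

    public class BatchedPerlFn extends DoFn<String, String> {
      private static final int MAX_BATCH_SIZE = 500;
      private transient List<String> buffer;

      @StartBundle
      public void startBundle() {
        buffer = new ArrayList<>();
      }

      @ProcessElement
      public void processElement(@Element String record, OutputReceiver<String> out) {
        buffer.add(record);
        if (buffer.size() >= MAX_BATCH_SIZE) {
          flush(out::output);
        }
      }

      @FinishBundle
      public void finishBundle(FinishBundleContext ctx) {
        // Flush whatever remains when the runner closes the bundle.
        flush(line -> ctx.output(line, Instant.now(), GlobalWindow.INSTANCE));
      }

      private void flush(Consumer<String> emit) {
        for (String line : runPerlScript(buffer)) {
          emit.accept(line);
        }
        buffer.clear();
      }

      private List<String> runPerlScript(List<String> batch) {
        // Hypothetical: pipe the batch through the perl script and collect its output lines.
        throw new UnsupportedOperationException("placeholder for external process invocation");
      }
    }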

Re: [Proposal] Add a static PTransform.compose() method for composing transforms in a lambda expression

2018-09-19 Thread Jeff Klukas
der is inferred? > * can't supply display data? > > +user@beam.apache.org , do users think that the > provided API would be useful enough for it to be added to the core SDK or > would the addition of the method provide noise/detract from the existing > API? > > On Mon, Sep