Siyu - The Beam metrics interface includes the Distribution metric type
which can be used for histograms:
https://beam.apache.org/documentation/programming-guide/#types-of-metrics
Particulars of support depend on the runner. For Cloud Dataflow, the
reported values are MAX, MIN, MEAN, and COUNT.
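A minimal sketch of recording a Distribution from inside a DoFn (the class and
metric names here are illustrative, not from the thread):

import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;

class RecordSizeFn extends DoFn<String, String> {
  // One Distribution per DoFn class; Dataflow reports its MIN, MAX, MEAN, and COUNT.
  private final Distribution payloadSizes =
      Metrics.distribution(RecordSizeFn.class, "payload_sizes");

  @ProcessElement
  public void processElement(@Element String element, OutputReceiver<String> out) {
    payloadSizes.update(element.length()); // record one observation
    out.output(element);
  }
}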
> On Fri, May 21, 2021 at 5:10 AM Jeff Klukas wrote:
>
>> Beam users,
>>
>> We're attempting to write a Java pipeline that uses Count.perKey() to
>> collect event counts, and then flush those to an HTTP API every ten minutes
>> based on processing time.
Beam users,
We're attempting to write a Java pipeline that uses Count.perKey() to
collect event counts, and then flush those to an HTTP API every ten minutes
based on processing time.
We've tried expressing this using GlobalWindows with an AfterProcessingTime
trigger, but we find that when we dra
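For reference, the arrangement described above might look roughly like the
following sketch, assuming keyedEvents is a PCollection<KV<String, String>>; the
types and the ten-minute delay are illustrative:

import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

PCollection<KV<String, Long>> counts =
    keyedEvents
        .apply(
            Window.<KV<String, String>>into(new GlobalWindows())
                .triggering(
                    Repeatedly.forever(
                        AfterProcessingTime.pastFirstElementInPane()
                            .plusDelayOf(Duration.standardMinutes(10))))
                .discardingFiredPanes()
                .withAllowedLateness(Duration.ZERO))
        .apply(Count.perKey());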
Hi Thiago,
Note that Dataflow has a custom implementation of PubSub interaction, so
the code you see in PubsubIO in the Beam codebase does not necessarily
reflect Pubsub handling in Dataflow.
Dataflow acks messages as soon as they are first checkpointed, so the first
step in your pipeline that in
not sure if these errors are problematic)
>
> I’ll definitely give the SSD option a shot.
>
> Thanks.
>
> --
> Mark Kelly
> Sent with Airmail
>
> On 14 July 2020 at 15:12:46, Jeff Klukas (jklu...@mozilla.com) wrote:
>
> In my experience with writing to BQ via Bi
In my experience with writing to BQ via BigQueryIO in the Java SDK, the
bottleneck tends to be disk I/O. The BigQueryIO logic requires several
shuffles that cause checkpointing even in the case of streaming inserts,
which in the Dataflow case means writing to disk. I assume the Python logic
is simi
ttp-java-client/json.html
>>
>>
>>
>> On Thu, Jul 9, 2020 at 10:02 AM Kirill Zhdanovich
>> wrote:
>>
>>> Thanks for explaining. Is it documented somewhere that TableRow contains
>>> Map?
>>> I don't construct it, I fetch from Google
t, I fetch from Google Analytics export to BigQuery
> table.
>
> On Thu, 9 Jul 2020 at 16:40, Jeff Klukas wrote:
>
>> I would expect the following line to fail:
>>
>> List h = ((List) bigQueryRow.get("hits"));
>>
>> The top-level bigQuery
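One hedged way around the failing cast is to treat each nested record as a
plain Map rather than a TableRow, since the coder may deserialize nested
records as generic maps such as LinkedHashMap; the "hits" field name follows
the GA export schema discussed here, and the helper is a sketch only:

import com.google.api.services.bigquery.model.TableRow;
import java.util.List;
import java.util.Map;

// Counts nested "hits" records without casting them to TableRow.
@SuppressWarnings("unchecked")
static int countHits(TableRow bigQueryRow) {
  List<Map<String, Object>> hits = (List<Map<String, Object>>) bigQueryRow.get("hits");
  return hits == null ? 0 : hits.size();
}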
dRunnable.run(ThreadFactoryImpl.java:55)
> at java.base/java.lang.Thread.run(Thread.java:834)
> Caused by: java.lang.ClassCastException: class java.util.LinkedHashMap
> cannot be cast to class com.google.api.services.bigquery.model.TableRow
> (java.util.LinkedHashMap is in module java.base of loader 'bootst
ssions",
> ParDo.of(new CreateSessionMetrics(boolean and
> string params)))
>// few more transformations
>
> }
>
> This is basically similar to examples you can find here
> https://beam.apache.org/documentation/io/built-in/goo
On Wed, Jul 8, 2020 at 3:54 PM Kirill Zhdanovich
wrote:
> So from what I understand, it works like this by design and it's not
> possible to test my code with the current coder implementation. Is that
> correct?
>
I would argue that this test failure is indicating an area of potential
failure in
On Wed, Jul 8, 2020 at 1:38 PM Kirill Zhdanovich
wrote:
> So it's correct implementation of TableRow that encode(decode(a)) != a?
>
A TableRow can contain fields of any map implementation. It makes sense to
me that once a TableRow object is serialized and deserialized, the
coder must make a
> I don't have issues running this pipeline in production. I have this
> issue, only when I tried to write end to end test.
> Do you know if there are existing coders for TableRow that I can use? I've
> tried TableRowJsonCoder, but seems like it converts all object inside
> Ta
Kirill - Can you tell us more about what Job.runJob is doing? I would not
expect the Beam SDK itself to do any casting to TableRow, so is there a
line in your code where you're explicitly casting to TableRow? There may be
a point where you need to explicitly set the coder on a PCollection to
deseri
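A sketch of what explicitly setting the coder can look like; "input" is the
assumed upstream PCollection and ParseToTableRowFn stands in for your own DoFn:

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

PCollection<TableRow> rows =
    input
        .apply("ParseToTableRow", ParDo.of(new ParseToTableRowFn()))
        // Pin the coder so tests and production serialize rows the same way.
        .setCoder(TableRowJsonCoder.of());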
>> Yeah - we are issuing a query rather than reading a table. Materializing
>> the results myself and reading them back seems simple enough. I will give
>> that a try!
>>
>> Thanks,
>> Matt
>>
>> On Thu, Jul 2, 2020 at 9:42 AM Jeff Klukas wrote:
>>
It sounds like your pipeline is issuing a query rather than reading a whole
table.
Are you using Java or Python? I'm only familiar with the Java SDK so my
answer may be Java-biased.
I would recommend materializing the query results to a table, and then
configuring your pipeline to read that table
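A sketch of the read side once the query results have been materialized out of
band; "pipeline" is the assumed Pipeline object and the table name is
illustrative:

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.values.PCollection;

PCollection<TableRow> rows =
    pipeline.apply(
        "ReadMaterializedResults",
        BigQueryIO.readTableRows().from("my_project:my_dataset.materialized_results"));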
and if a single element in the
> bundle fails and we ignore the error on the single element, then the bundle
> is considered still successfully processed am I correct? Then it would just
> ACK everything in the bundle
>
> Kishore
>
> On Mon, Jun 1, 2020 at 10:27 AM Jeff Kluka
Is this a Python or Java pipeline?
I'm familiar with PubsubIO in Java, though I expect the behavior in Python
is similar. It will ack messages at the first checkpoint step in the
pipeline, so the behavior in your case depends on whether there is a
GroupByKey operation happening before the exceptio
like Guavas memoize[1]
> functions and use them as they are very lightweight.
>
> 1:
> https://guava.dev/releases/19.0/api/docs/com/google/common/base/Suppliers.html#memoize(com.google.common.base.Supplier)
>
> On Thu, Apr 30, 2020 at 12:45 PM Jeff Klukas wrote:
>
>> B
Beam Java users,
I've run into a few cases where I want to present a single thread-safe data
structure to all threads on a worker, and I end up writing a good bit of
custom code each time involving a synchronized method that handles creating
the resource exactly once, and then each thread has its
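The Suppliers.memoize approach suggested above can look roughly like this
sketch; ExpensiveClient is an illustrative stand-in for the shared,
thread-safe resource:

import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import org.apache.beam.sdk.transforms.DoFn;

class EnrichFn extends DoFn<String, String> {
  // Illustrative stand-in for an expensive, thread-safe resource.
  static class ExpensiveClient {
    String enrich(String s) { return s; }
  }

  // Static + memoized: created at most once per worker JVM, shared by all threads.
  private static final Supplier<ExpensiveClient> CLIENT =
      Suppliers.memoize(ExpensiveClient::new);

  @ProcessElement
  public void processElement(@Element String element, OutputReceiver<String> out) {
    out.output(CLIENT.get().enrich(element));
  }
}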
Aniruddh - Using BigQueryIO.Read with EXPORT method involves a potentially
long wait on BQ to complete the export.
I have experience with running Dataflow batch jobs which use this read
method to ingest ~10 TB of data in a single job. The behavior I generally
see is that the job will progress thro
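For context, a read of the kind being discussed might be configured as in this
sketch; "pipeline" is the assumed Pipeline object and the table name is
illustrative. The BigQuery export job runs before any elements flow:

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.values.PCollection;

PCollection<TableRow> rows =
    pipeline.apply(
        BigQueryIO.readTableRows()
            .from("my_project:my_dataset.big_table")
            .withMethod(TypedRead.Method.EXPORT));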
Mozilla hosts the code for our data ingestion system publicly on GitHub. A
good chunk of that architecture consists of Beam pipelines running on
Dataflow.
See:
https://github.com/mozilla/gcp-ingestion/tree/master/ingestion-beam
and rendered usage documentation at:
https://mozilla.github.io/gcp-
We also had throughput issues in writing to BQ in a streaming pipeline and
we mitigated by provisioning a large quantity of SSD storage to improve I/O
throughput to disk for checkpoints.
I also second Erik's suggestion to look into Streaming Engine. We are currently
looking into migrating our streaming u
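A sketch of the SSD mitigation, assuming the Dataflow worker pool options are
set programmatically from main(String[] args); the project, zone, and sizes are
illustrative, and the exact disk-type URL format should be checked for your
zone:

import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

DataflowPipelineWorkerPoolOptions options =
    PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(DataflowPipelineWorkerPoolOptions.class);
// Request pd-ssd worker disks and more disk space for checkpoint I/O.
options.setWorkerDiskType(
    "compute.googleapis.com/projects/my-project/zones/us-central1-a/diskTypes/pd-ssd");
options.setDiskSizeGb(500);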
That's normal; I also never saw those heap dump options display in the
Dataflow UI. I think Dataflow doesn't show any options that originate from
"Debug" options interfaces.
> On Mon, Nov 18, 2019 at 11:59 AM Jeff Klukas wrote:
>
>> Using default
Using default Dataflow workers, this is the set of options I passed:
--dumpHeapOnOOM --saveHeapDumpsToGcsPath=$MYBUCKET/heapdump --diskSizeGb=100
On Mon, Nov 18, 2019 at 11:57 AM Jeff Klukas wrote:
> It sounds like you're generally doing the right thing. I've succes
It sounds like you're generally doing the right thing. I've successfully
used --saveHeapDumpsToGcsPath in a Java pipeline running on Dataflow and
inspected the results in Eclipse MAT.
I think that --saveHeapDumpsToGcsPath will automatically turn on
--dumpHeapOnOOM but worth setting that explicitly
I ran into exactly this same problem of finding some accented characters
getting replaced with "?" in a pipeline only when running on Dataflow and
not when using the Direct Runner. KafkaIO was not involved, but I'd bet the
root cause is the same.
In my case, the input turned out to be properly UTF
(1) I don't have direct experience with contacting MongoDB in Beam, but my
general expectation is that yes, Beam IOs will do reasonable things for
connection setup and teardown. For the case of MongoDbIO in the Java SDK,
it looks like this connection setup happens in BoundedMongoDbReader [0]. In
ge
What you propose with a writer per bundle is definitely possible, but I
expect the blocker is that in most cases the runner has control of bundle
sizes and there's nothing exposed to the user to control that. I've wanted
to do similar, but found average bundle sizes in my case on Dataflow to be
so
edit the environment variables and you have to
> resort to some hackery that is JVM version dependent[2].
>
> 1:
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/harness/JvmInitializer.java
> 2: https://blog.sebastian-daschner.com/entri
I just spent the past two days debugging a character corruption issue in a
Dataflow pipeline. It turned out that we had encoded a json object to a
string and then called getBytes() without specifying a charset. In our
testing infrastructure, this didn't cause a problem because the default
charset o
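The fix amounts to passing an explicit charset instead of relying on the JVM
default; a minimal before/after:

import java.nio.charset.StandardCharsets;

String json = "{\"name\": \"José\"}";
byte[] platformDependent = json.getBytes();               // uses the default charset
byte[] portable = json.getBytes(StandardCharsets.UTF_8);  // explicit UTF-8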
I have also wrestled with throughput for FileIO and BigQueryIO on Dataflow,
and in my case the bottleneck came down to disk I/O throughput on the
worker machines. Writing with FileIO or BigQueryIO involves several group
by key operations that in the Dataflow case require checkpointing state to
disk
You can use the read* and write* methods of FileIO to read and write
arbitrary binary files. The examples in the Javadoc for FileIO [0] include
an example of reading the entire contents of a file as a string into a Beam
record, along with metadata about the file.
If a one-to-one mapping of files t
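A sketch following the FileIO javadoc pattern: match a filepattern, read each
matched file in full, and emit its contents as one String record. "pipeline" is
the assumed Pipeline object, the filepattern is illustrative, and
readFullyAsBytes() is the binary variant:

import java.io.IOException;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

PCollection<String> contents =
    pipeline
        .apply(FileIO.match().filepattern("gs://my-bucket/input/*.txt"))
        .apply(FileIO.readMatches())
        .apply(
            ParDo.of(
                new DoFn<FileIO.ReadableFile, String>() {
                  @ProcessElement
                  public void processElement(ProcessContext c) throws IOException {
                    // One output record per file, containing the whole file.
                    c.output(c.element().readFullyAsUTF8String());
                  }
                }));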
>> operation itself to remove duplicates (PubSub is at least once
>> delivery). Drain should work for a pipeline like this without any
>> issues.
>>
>> On Fri, Apr 5, 2019 at 3:33 PM Jeff Klukas wrote:
>> >
>> > Thanks for the suggestions. Ideally, I'd
7; it with the State API. See the discussion on
> > this ticket:
> > https://issues.apache.org/jira/browse/BEAM-6886
> >
> > On Thu, Apr 4, 2019 at 9:42 PM Jeff Klukas wrote:
> > >
> > > As far as I can tell, Beam expects runners to have full control over
> separa
As far as I can tell, Beam expects runners to have full control over
separation of individual elements into bundles and this is something users
have no control over. Is that true? Or are there any ways that I might
exert some influence over bundle sizes?
My main interest at the moment is investiga
I'm not aware that there's currently any way to trigger based on data size.
As you state, AfterPane.elementCountAtLeast lets you trigger based on
number of elements, but from my reading of the implementations of triggers
in the Java SDK, triggers don't have access to sufficient data to maintain
sum
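One approximation is a composite trigger that fires on whichever comes first,
an element count (a rough proxy for size) or a processing-time delay; MyRecord
and the constants are illustrative:

import org.apache.beam.sdk.transforms.windowing.AfterFirst;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

Window<MyRecord> window =
    Window.<MyRecord>into(new GlobalWindows())
        .triggering(
            Repeatedly.forever(
                AfterFirst.of(
                    AfterPane.elementCountAtLeast(100000),
                    AfterProcessingTime.pastFirstElementInPane()
                        .plusDelayOf(Duration.standardMinutes(5)))))
        .discardingFiredPanes()
        .withAllowedLateness(Duration.ZERO);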
To question 1, I also would have expected the pipeline to fail in the case
of files failing to load; I'm not sure why it's not. I thought the BigQuery
API returns a 400 level response code in the case of files failing and that
would bubble up to a pipeline execution error, but I haven't dug through
ou maybe share the code of your pipeline?
>
> Cheers,
> Tobi
>
> On Tue, Jan 22, 2019 at 9:28 PM Jeff Klukas wrote:
>
>> I'm attempting to deploy a fairly simple job on the Dataflow runner that
>> reads from PubSub and writes to BigQuery using file loads, but I h
Hi Sri,
To Question 1, you should be able to set `...
.to("/tmp/parquet/").withNumShards(1)` to produce a single output file.
To Question 2, yes if your desired output file name depends on contents of
the record itself, that's exactly what FileIO.writeDynamic() is for. If you
can get the name fro
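A sketch of the single-file write described above, assuming records is a
PCollection<GenericRecord> with the given Avro schema; the schema variable and
paths are illustrative, and a single shard limits write parallelism:

import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.parquet.ParquetIO;

records.apply(
    FileIO.<GenericRecord>write()
        .via(ParquetIO.sink(schema))
        .to("/tmp/parquet/")
        .withSuffix(".parquet")
        .withNumShards(1));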
I'm attempting to deploy a fairly simple job on the Dataflow runner that
reads from PubSub and writes to BigQuery using file loads, but I have so
far not been able to tune it to keep up with the incoming data rate.
I have configured BigQueryIO.write to trigger loads every 5 minutes, and
I've let t
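The write configuration described above might look roughly like this sketch,
assuming rows is a PCollection<TableRow>; the table name and shard count are
illustrative and usually need tuning against the input rate:

import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.joda.time.Duration;

rows.apply(
    BigQueryIO.writeTableRows()
        .to("my_project:my_dataset.events")
        .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
        .withTriggeringFrequency(Duration.standardMinutes(5))
        .withNumFileShards(100)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));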
> this means that I am losing data or if this will be retried by the sink?
I don't have direct experience with KafkaIO, but noting that this exception
happened in the finishBundle method, Beam will not have committed the
bundle.
More specifically, looking at the KafkaWriter code, I see that fin
Related to a previous thread about custom triggering on GlobalWindows [0],
are there general recommendations for controlling size of output files from
FileIO.Write?
A general pattern I've seen in systems that need to batch individual
records to files is that they offer both a maximum file size and
PM Reuven Lax wrote:
> Ah,
>
> numShards = 0 is explicitly not supported in unbounded mode today, for the
> reason mentioned above. If FileIO doesn't reject the pipeline in that case,
> we should fix that.
>
> Reuven
>
> On Fri, Jan 11, 2019 at 9:23 AM Je
to
>> do with Flattening two PCollections together) with their original trigger.
>> Without this, we also know that you can have three PCollections with
>> identical triggering and you can CoGroupByKey them together but you cannot
>> do this three-way join as a sequence of binary jo
derlying issue here but writing
> unbounded input to files when using GlobalWindows for unsharded output is a
> valid usecase so sounds like a bug. Feel free to create a JIRA.
>
> - Cham
>
> On Wed, Jan 9, 2019 at 10:00 AM Jeff Klukas wrote:
>
>> I've read more
different code path that doesn't flatten collections and no
exception is thrown.
So, this might really be considered a bug of WriteFiles (and thus FileIO).
But I'd love to hear other interpretations.
On Wed, Jan 9, 2019 at 11:25 AM Jeff Klukas wrote:
> I'm building a pipeline th
I'm building a pipeline that streams from Pubsub and writes to files. I'm
using FileIO's dynamic destinations to place elements into different
directories according to date and I really don't care about ordering of
elements beyond the date buckets.
So, I think GlobalWindows is appropriate in this
//github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java#L373
>
> On Wed, Jan 2, 2019 at 12:11 PM Jeff Klukas wrote:
>
>> I see that the Beam codebase includes a PubsubGrpcClient, but there
>> doesn
I see that the Beam codebase includes a PubsubGrpcClient, but there doesn't
appear to be any way to configure PubsubIO to use that client over the
PubsubJsonClient.
There's even a PubsubIO.Read#withClientFactory, but it's marked as for
testing only.
Is gRPC support something that's still in devel
runtime.
On Thu, Dec 20, 2018 at 4:47 PM Jeff Klukas wrote:
> I did some more testing this afternoon, and it looks like I'm incorrect on
> some details here.
>
> Parameters implemented as ValueProviders can be provided at compile time
> and they do end up in the template.
e? Or experience handling default values for ValueProviders?
On Thu, Dec 20, 2018 at 3:49 PM Jeff Klukas wrote:
> I am deploying Beam pipelines with the DataflowRunner and would like to
> move more of the pipeline options to use the ValueProvider interface so
> they can be specified at
I am deploying Beam pipelines with the DataflowRunner and would like to
move more of the pipeline options to use the ValueProvider interface so
they can be specified at runtime rather than at template compile time, but
running into various issues.
First, it's been unclear to the operations enginee
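A runtime-settable option of the kind being discussed looks like this sketch;
the option name and default are illustrative:

import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;

public interface MyOptions extends PipelineOptions {
  @Description("Output location; resolved at template execution time via .get().")
  @Default.String("gs://my-bucket/output/")
  ValueProvider<String> getOutputLocation();

  void setOutputLocation(ValueProvider<String> value);
}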
imple pipeline like this?
In effect, would you recommend we write a custom application using the
Pub/Sub and Storage SDKs directly rather than trying to use Beam's
abstractions?
On Wed, Dec 5, 2018 at 2:24 PM Raghu Angadi wrote:
>
> On Wed, Dec 5, 2018 at 11:13 AM Jeff Klukas wr
We are attempting to build a Beam pipeline running on Dataflow that reads
from a Cloud Pub/Sub subscription and writes 10 minute windows to files on
GCS via FileIO. At-least-once completeness is critical in our use case, so
we are seeking the simplest possible solution with obvious and verifiable
d
You can likely achieve what you want using FileIO with dynamic
destinations, which is described in the "Advanced features" section of the
TextIO docs [0].
Your case might look something like:
PCollection events = ...;
events.apply(FileIO.writeDynamic()
.by(event -> formatAsHHMMSS(event.t
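Filled out, that call might look roughly like the following sketch; Event, its
getTimestamp and toJson accessors, formatAsHHMMSS, and the output path are all
illustrative stand-ins:

import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.Contextful;

events.apply(
    FileIO.<String, Event>writeDynamic()
        .by(event -> formatAsHHMMSS(event.getTimestamp()))  // destination = HHMMSS bucket
        .withDestinationCoder(StringUtf8Coder.of())
        .via(Contextful.fn(Event::toJson), TextIO.sink())   // serialize each event as a line
        .to("gs://my-bucket/events/")
        .withNaming(bucket -> FileIO.Write.defaultNaming("events-" + bucket, ".json")));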
I'm trying to write a robust pipeline that takes input from PubSub and
writes to BigQuery. For every PubsubMessage that is not successfully
written to BigQuery, I'd like to get the original PubsubMessage back and be
able to write to an error output collection. I'm not sure this is quite
possible, t
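For reference, the failed-inserts output on WriteResult gives back the failed
TableRows rather than the original PubsubMessages; a hedged sketch, assuming
rows is a PCollection<TableRow> and with an illustrative table name:

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.values.PCollection;

WriteResult result =
    rows.apply(
        BigQueryIO.writeTableRows()
            .to("my_project:my_dataset.events")
            .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
            .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));

// Rows that could not be inserted; route these to an error table or dead letter.
PCollection<TableRow> failedRows = result.getFailedInserts();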
Another option here would be to make the perl script operate on batches.
Your DoFn could then store the records to a buffer rather than outputting
them and then periodically flush the buffer, sending records through the
perl script and sending to output.
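A sketch of that buffering approach, assuming the script returns one output per
input; runPerlScript() is an illustrative stand-in for piping a batch through
the external process, and a size-threshold flush in processElement would be a
natural extension:

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;

class BatchViaScriptFn extends DoFn<String, String> {
  // Buffered inputs plus the metadata needed to re-emit outputs correctly.
  private transient List<String> elements;
  private transient List<Instant> timestamps;
  private transient List<BoundedWindow> windows;

  @StartBundle
  public void startBundle() {
    elements = new ArrayList<>();
    timestamps = new ArrayList<>();
    windows = new ArrayList<>();
  }

  @ProcessElement
  public void processElement(ProcessContext c, BoundedWindow window) {
    // Buffer instead of outputting immediately.
    elements.add(c.element());
    timestamps.add(c.timestamp());
    windows.add(window);
  }

  @FinishBundle
  public void finishBundle(FinishBundleContext c) {
    List<String> results = runPerlScript(elements); // one external call per bundle
    for (int i = 0; i < results.size(); i++) {
      c.output(results.get(i), timestamps.get(i), windows.get(i));
    }
  }

  // Illustrative stand-in for piping a batch of records through the perl script.
  private List<String> runPerlScript(List<String> batch) {
    return new ArrayList<>(batch);
  }
}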
On Wed, Oct 24, 2018 at 3:03 PM Robert Brad
der is inferred?
> * can't supply display data?
>
> +user@beam.apache.org , do users think that the
> provided API would be useful enough for it to be added to the core SDK or
> would the addition of the method provide noise/detract from the existing
> API?
>
> On Mon, Sep