Re: KafkaIO - Deadletter output

2018-10-24 Thread Raghu Angadi
Well, if every input record's timestamp is X, the watermark staying at X is the right answer, no? But I am not sure where the disagreement is, actually. I might be mistaken. KafkaIO has a few built-in policies for watermark and timestamp that cover most use cases (including server time, which has a be…

Re: KafkaIO - Deadletter output

2018-10-24 Thread Kenneth Knowles
Forgive me if this is naive or missing something, but here are my thoughts on these alternatives: (0) Timestamp has to be pulled out in the source to control the watermark. Luke's point is important. (1) If bad records get min_timestamp, and they occur infrequently enough, then the watermark will adva…

Re: KafkaIO - Deadletter output

2018-10-24 Thread Lukasz Cwik
That depends on the user's pipeline and how watermark advancement of the source may impact elements becoming droppably late if they are emitted with the minimum timestamp. On Wed, Oct 24, 2018 at 4:42 PM Raghu Angadi wrote: > I see. > > What I meant was to return min_timestamp for bad records in…

Re: KafkaIO - Deadletter output

2018-10-24 Thread Raghu Angadi
I see. What I meant was to return min_timestamp for bad records in the timestamp handler passed to KafkaIO itself, and the correct timestamp for parsable records. That should work too, right? On Wed, Oct 24, 2018 at 4:21 PM Lukasz Cwik wrote: > Yes, that would be fine. > > The user could then use a…
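For illustration, a minimal sketch of such a timestamp handler, assuming records are read as bytes; parseTimestamp is a hypothetical application-level parser. Unparsable records fall back to the minimum timestamp so a downstream ParDo can drop or redirect them:

    import org.apache.beam.sdk.io.kafka.KafkaRecord;
    import org.apache.beam.sdk.transforms.SerializableFunction;
    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
    import org.joda.time.Instant;

    // Sketch: timestamp function handed to KafkaIO. Bad records get the
    // minimum timestamp instead of throwing, so the read itself never fails.
    SerializableFunction<KafkaRecord<byte[], byte[]>, Instant> timestampFn =
        rec -> {
          try {
            return parseTimestamp(rec.getKV().getValue()); // hypothetical parser
          } catch (Exception e) {
            return BoundedWindow.TIMESTAMP_MIN_VALUE;
          }
        };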

Re: Is it possible to run a perl script in Dataflow worker?

2018-10-24 Thread Reza Rokni
Hi, Not directly connected (it's for the Java SDK), but some of the concepts in these materials may be useful: https://cloud.google.com/blog/products/gcp/running-external-libraries-with-cloud-dataflow-for-grid-computing-workloads https://github.com/apache/beam/tree/master/examples/java/src/main/java…

Re: KafkaIO - Deadletter output

2018-10-24 Thread Lukasz Cwik
Yes, that would be fine. The user could then use a ParDo which outputs to a DLQ for things it can't parse the timestamp for and use outputWithTimestamp[1] for everything else. 1: https://beam.apache.org/releases/javadoc/2.7.0/org/apache/beam/sdk/transforms/DoFn.WindowedContext.html#outputWithTime…
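For illustration, a rough sketch of that ParDo; parse and extractTimestamp are hypothetical helpers, and the tag names are made up. Parse failures go to a dead-letter tag, while successes get their real timestamp via outputWithTimestamp (shifting a timestamp forward from the source-assigned minimum is allowed):

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.TupleTag;
    import org.joda.time.Instant;

    // Hypothetical tags: main output for parsed records, side output for the DLQ.
    final TupleTag<KV<String, String>> parsedTag = new TupleTag<KV<String, String>>() {};
    final TupleTag<byte[]> deadLetterTag = new TupleTag<byte[]>() {};

    class ParseWithTimestampFn extends DoFn<byte[], KV<String, String>> {
      @ProcessElement
      public void process(ProcessContext c) {
        try {
          KV<String, String> parsed = parse(c.element());  // hypothetical parser
          Instant ts = extractTimestamp(parsed);           // hypothetical extractor
          c.outputWithTimestamp(parsed, ts);               // forward shift from min is fine
        } catch (Exception e) {
          c.output(deadLetterTag, c.element());            // unparsable: send to the DLQ
        }
      }
    }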

Re: Is it possible to run a perl script in Dataflow worker?

2018-10-24 Thread Jeff Klukas
Another option here would be to make the perl script operate on batches. Your DoFn could then store the records in a buffer rather than outputting them, then periodically flush the buffer, sending records through the perl script and on to the output. On Wed, Oct 24, 2018 at 3:03 PM Robert Brad…
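A sketch of that buffering pattern, assuming a line-oriented perl script named transform.pl and a made-up batch size; the script is started once per batch rather than once per record (a production version would read the child's stdout concurrently to avoid pipe-buffer deadlock on large batches):

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.io.OutputStreamWriter;
    import java.io.Writer;
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
    import org.joda.time.Instant;

    class BatchedPerlFn extends DoFn<String, String> {
      private static final int BATCH_SIZE = 500; // hypothetical batch size
      private transient List<String> buffer;

      @StartBundle
      public void startBundle() {
        buffer = new ArrayList<>();
      }

      @ProcessElement
      public void process(ProcessContext c) throws Exception {
        buffer.add(c.element());
        if (buffer.size() >= BATCH_SIZE) {
          for (String result : runPerl(buffer)) {
            c.output(result);
          }
          buffer.clear();
        }
      }

      @FinishBundle
      public void finishBundle(FinishBundleContext c) throws Exception {
        if (buffer.isEmpty()) {
          return;
        }
        for (String result : runPerl(buffer)) {
          // Bundle-end outputs need an explicit timestamp and window
          // (simplified here to now() and the global window).
          c.output(result, Instant.now(), GlobalWindow.INSTANCE);
        }
        buffer.clear();
      }

      // One perl process per batch: write all lines to stdin, read stdout.
      private List<String> runPerl(List<String> batch) throws Exception {
        Process p = new ProcessBuilder("perl", "transform.pl").start();
        try (Writer w = new OutputStreamWriter(p.getOutputStream(), StandardCharsets.UTF_8)) {
          for (String line : batch) {
            w.write(line);
            w.write('\n');
          }
        }
        List<String> results = new ArrayList<>();
        try (BufferedReader r = new BufferedReader(
            new InputStreamReader(p.getInputStream(), StandardCharsets.UTF_8))) {
          String line;
          while ((line = r.readLine()) != null) {
            results.add(line);
          }
        }
        p.waitFor();
        return results;
      }
    }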

Re: KafkaIO - Deadletter output

2018-10-24 Thread Raghu Angadi
Thanks. So returning min timestamp is OK, right (assuming the application is fine with what it means)? On Wed, Oct 24, 2018 at 1:17 PM Lukasz Cwik wrote: > All records in Apache Beam have a timestamp. The default timestamp is the > min timestamp defined here: > https://github.com/apache/beam/blob/2…

Re: KafkaIO - Deadletter output

2018-10-24 Thread Lukasz Cwik
All records in Apache Beam have a timestamp. The default timestamp is the min timestamp defined here: https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java#L48 On Wed, Oct 24, 2018 at 1…
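Per the linked source, that constant looks approximately like this (see the URL above for the authoritative definition):

    // From BoundedWindow.java: the global minimum timestamp, i.e. the default
    // timestamp a record carries until one is explicitly assigned.
    public static final Instant TIMESTAMP_MIN_VALUE =
        new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE));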

Re: KafkaIO - Deadletter output

2018-10-24 Thread Raghu Angadi
On Wed, Oct 24, 2018 at 12:33 PM Lukasz Cwik wrote: > You would have to return min timestamp for all records otherwise the > watermark may have advanced and you would be outputting records that are > droppably late. > That would be fine I guess. What’s the timestamp for a record that doesn’t hav…

Re: KafkaIO - Deadletter output

2018-10-24 Thread Lukasz Cwik
You would have to return min timestamp for all records otherwise the watermark may have advanced and you would be outputting records that are droppably late. On Wed, Oct 24, 2018 at 12:25 PM Raghu Angadi wrote: > To be clear, returning min_timestamp for unparsable records should not > affect the…

Re: KafkaIO - Deadletter output

2018-10-24 Thread Raghu Angadi
To be clear, returning min_timestamp for unparsable records should not affect the watermark. On Wed, Oct 24, 2018 at 10:32 AM Raghu Angadi wrote: > How about returning min_timestamp? They would be dropped or redirected by > the ParDo after that. > Btw, TimestampPolicyFactory.withTimestampFn() is…

Re: Is it possible to run a perl script in Dataflow worker?

2018-10-24 Thread Robert Bradshaw
While one does want to watch out for expensive per-record operations, this may still be preferable to (and cheaper than) setting up a server and making RPC requests. It depends on the nature of the operation. If executing the perl script is (say) 100ms of "startup" for 1ms of actually processing…
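Working through those example numbers: invoking the script per record costs about 100 ms + 1 ms = 101 ms per record, while a batch of 1,000 records costs about (100 ms + 1,000 × 1 ms) / 1,000 ≈ 1.1 ms per record, so batching amortizes the startup cost almost entirely (the batch size of 1,000 here is just an illustrative assumption).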

Re: write to a kafka topic that is set in data

2018-10-24 Thread Raghu Angadi
On Wed, Oct 24, 2018 at 10:47 AM Raghu Angadi wrote: > My bad Alexey, I will review today. I had skimmed through the patch on my > phone. You are right, exactly-once sink support is not required for now. > > It is quite a different beast and necessarily coupled with transactions > on a speci…

Re: write to a kafka topic that is set in data

2018-10-24 Thread Raghu Angadi
My bad Alexey, I will review today. I had skimmed through the patch on my phone. You are right, exactly-once sink support is not required for now. It is quite a different beast and necessarily coupled with transactions on specific topic-partitions for correctness. The primary concern is with t…

Re: KafkaIO - Deadletter output

2018-10-24 Thread Raghu Angadi
How about returning min_timestamp? They would be dropped or redirected by the ParDo after that. Btw, TimestampPolicyFactory.withTimestampFn() is not a public API; is this pipeline defined under the kafkaio package? On Wed, Oct 24, 2018 at 10:19 AM Lukasz Cwik wrote: > In this case, the user is attemp…

Re: KafkaIO - Deadletter output

2018-10-24 Thread Lukasz Cwik
In this case, the user is attempting to handle errors when parsing the timestamp. The timestamp controls the watermark for the UnboundedSource, so how would they control the watermark in a downstream ParDo? On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi wrote: > On Wed, Oct 24, 2018 at 7:19 AM Chamik…

Re: KafkaIO - Deadletter output

2018-10-24 Thread Raghu Angadi
On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath wrote: > Ah nice. Yeah, if the user can return full bytes instead of applying a > function that would result in an exception, this can be extracted by a > ParDo down the line. > KafkaIO does return bytes, and I think most sources should, unless the…

Re: KafkaIO - Deadletter output

2018-10-24 Thread Chamikara Jayalath
Ah nice. Yeah, if the user can return full bytes instead of applying a function that would result in an exception, this can be extracted by a ParDo down the line. On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia wrote: > As Raghu said, > > Just apply a regular ParDo and return a PCollectionTuple…
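Wiring-wise, that might look like the following sketch, reusing the hypothetical ParseWithTimestampFn and tags from the sketch further up; rawBytes stands for the byte records read from KafkaIO:

    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionTuple;
    import org.apache.beam.sdk.values.TupleTagList;

    // Sketch: one ParDo, two outputs -- parsed records and a dead-letter queue.
    PCollectionTuple results =
        rawBytes.apply("ParseBytes",
            ParDo.of(new ParseWithTimestampFn())
                .withOutputTags(parsedTag, TupleTagList.of(deadLetterTag)));

    PCollection<KV<String, String>> parsed = results.get(parsedTag);
    PCollection<byte[]> deadLetters = results.get(deadLetterTag); // write to a DLQ sink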

Re: Unbalanced FileIO writes on Flink

2018-10-24 Thread Maximilian Michels
The FlinkRunner uses a hash function (MurmurHash) on each key which places keys somewhere in the hash space. The hash space (2^32) is split among the partitions (5 in your case). Given enough keys, the chance increases that they are spread equally. This should be similar to what the other Runners d…
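As an illustration of the mechanism only (hashCode stands in for MurmurHash, and the modulo stands in for the hash-space split):

    // Simplified sketch of key-to-partition assignment. The FlinkRunner
    // actually applies MurmurHash to the key; hashCode is a stand-in here.
    static int partitionFor(Object key, int numPartitions) {
      return Math.floorMod(key.hashCode(), numPartitions);
    }

With only 50 shard keys hashed into 5 partitions, an uneven split like the one reported below is quite likely; the spread evens out only as the number of distinct keys grows.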

Re: write to a kafka topic that is set in data

2018-10-24 Thread Alexey Romanenko
I added simple support for this for the usual type of Kafka sink (PR: https://github.com/apache/beam/pull/6776, welcome for review, btw :) ). At the same time, there is another, more complicated, type of sink - EOS (Exactly Once Sink). In this case the d…
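For context, a sketch of what per-record topic routing could look like on the sink side; this shape is an assumption (the actual API is whatever the PR settles on after review), with the destination topic carried inside each ProducerRecord rather than fixed on the transform:

    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    // Sketch: each element names its own destination topic.
    static void writeDynamic(PCollection<ProducerRecord<String, String>> records) {
      records.apply(
          KafkaIO.<String, String>writeRecords()            // hypothetical entry point
              .withBootstrapServers("broker:9092")          // hypothetical address
              .withKeySerializer(StringSerializer.class)
              .withValueSerializer(StringSerializer.class));
    }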

Re: Unbalanced FileIO writes on Flink

2018-10-24 Thread Jozef Vilcek
So if I run 5 workers with 50 shards, I end up with:

    Duration   Bytes received   Records received
    2m 39s     900 MB           465,525
    2m 39s     1.76 GB          930,720
    2m 39s     789 MB           407,315
    2m 39s     1.32 GB          698,262
    2m 39s     788 MB           …

Re: Unbalanced FileIO writes on Flink

2018-10-24 Thread Jozef Vilcek
cc (dev) I tried to run the example with FlinkRunner in batch mode and again saw a bad data spread among the workers. When I tried to remove the number of shards for batch mode in the above example, the pipeline crashed before launch: Caused by: java.lang.IllegalStateException: Inputs to Flatten had incom…