It's entirely appropriate to stat files or query databases in order to
determine the initial restrictions and partitions of the data.

On Tue, May 18, 2021, 3:21 PM Miguel Anzo Palomo <miguel.a...@wizeline.com> wrote:

> Hi, I’m looking at how to implement a reader as a SplittableDoFn and I'm
> having some problems with the initial restriction, specifically, how do
> you set the initial restriction if you don’t know the size of the data?
> The DoFn that I'm working on takes a PCollection of Spanner *ReadOperations*...
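A minimal sketch of that "stat the source" approach to @GetInitialRestriction,
assuming a file-based read (for the Spanner case the analogous step would be a
sizing query); the DoFn name and element type are illustrative, and the other
SDF methods are omitted:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;

@DoFn.BoundedPerElement
class ReadFileFn extends DoFn<String, byte[]> {
  @GetInitialRestriction
  public OffsetRange getInitialRestriction(@Element String fileName) throws IOException {
    // Stat the file when the restriction is created to learn the data's size.
    long size = Files.size(Paths.get(fileName));
    return new OffsetRange(0, size);
  }
}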
Hi,
Yes, just like a normal DoFn, a Splittable DoFn preserves the window
information as well.

On Wed, May 5, 2021 at 8:04 PM Evan Galpin wrote:

> Hi folks,
>
> I’d just like to confirm what happens to window assignments through a
> SplittableDoFn. Are output elements automatically assigned to the same
> window as input elements?
>
> Thanks,
> Evan
...HBaseIO <.../io/hbase/HBaseReadSplittableDoFn.java#L38>,
CassandraIO <https://github.com/apache/beam/pull/10546>. Hope this helps :)

On Mon, Jun 15, 2020 at 10:05 AM Pablo Estrada wrote:

> Hi Pedro,
> Boyuan shared her prototype implementation in [1]. If you're coding a
> SplittableDoFn, I'd guess the relevant piece of code is ReadViaSDF.java
> Best
> -P.
> [1] https://github.com/apache/beam/pull/11749/files

On Mon, Jun 15, 2020 at 10:00 AM Pedro H S Teixeira wrote:

> Hi Boyuan,
> Is the implementation (even if incomplete) open source / available at this
> moment?
> Trying to implement an IO for a custom source here using SplittableDoFn,
> and it would be helpful to see more examples :)
> Thanks,
> Pedro
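For later readers, a compact sketch of the shape those examples share
(initial restriction, splitting, tracker, claim loop); all names, sizes, and
the read helper below are illustrative rather than taken from
HBaseReadSplittableDoFn or ReadViaSDF:

import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;

@DoFn.BoundedPerElement
class ReadFromCustomSource extends DoFn<String, byte[]> {

  @GetInitialRestriction
  public OffsetRange getInitialRestriction(@Element String source) {
    return new OffsetRange(0, 1024); // e.g. stat the source for its size
  }

  @SplitRestriction
  public void splitRestriction(
      @Restriction OffsetRange restriction, OutputReceiver<OffsetRange> receiver) {
    for (OffsetRange part : restriction.split(256, 256)) {
      receiver.output(part); // pre-split into smaller ranges
    }
  }

  @NewTracker
  public OffsetRangeTracker newTracker(@Restriction OffsetRange restriction) {
    return new OffsetRangeTracker(restriction);
  }

  @ProcessElement
  public void processElement(
      @Element String source,
      RestrictionTracker<OffsetRange, Long> tracker,
      OutputReceiver<byte[]> out) {
    // Claim positions one by one; stop when the runner takes back work.
    for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
      out.output(readRecordAt(source, i));
    }
  }

  private byte[] readRecordAt(String source, long position) {
    return new byte[0]; // placeholder for the actual source read
  }
}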
Re Alexey:

> 1. One of the demanded features is to discover (and probably remove) new
> partitions and topics at runtime [1].

Yes, that's the major motivation for us to build Kafka read on top of
SplittableDoFn.

> Do you expect any non-compatible API changes by adding SDF Read version
> [...] parallel with old one?

[1] https://issues.apache.org/jira/browse/BEAM-5786
Hi Reuven,
I'm going to use MonotonicallyIncreasing
<https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimators.java#L105>
by default, and in the future we may want to support a custom kind if there
is a request.
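For reference, a sketch of wiring that default inside the SplittableDoFn,
following the stock pattern from the Beam docs (assumes
org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators and
org.joda.time.Instant are imported; the state handling here is the usual
one, not code from this thread):

@GetInitialWatermarkEstimatorState
public Instant getInitialWatermarkEstimatorState(@Timestamp Instant currentElementTimestamp) {
  // Seed the estimator with the current element's timestamp.
  return currentElementTimestamp;
}

@NewWatermarkEstimator
public WatermarkEstimators.MonotonicallyIncreasing newWatermarkEstimator(
    @WatermarkEstimatorState Instant watermarkEstimatorState) {
  // Tracks the maximum timestamp observed on output records.
  return new WatermarkEstimators.MonotonicallyIncreasing(watermarkEstimatorState);
}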
Which WatermarkEstimator do you think should be used?

On Thu, May 28, 2020 at 7:17 PM Boyuan Zhang wrote:
Hi team,
I'm Boyuan, currently working on building a Kafka read PTransform on top of
SplittableDoFn[1][2][3]. There are two questions about Kafka usage I want
to discuss with you:
1. Compared to the KafkaIO.Read
<https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/...
...the performance overhead is not considerably bigger for existing users
(in particular Java users) and (2) that the portability abstractions are
mature. We are getting there, but not yet there.

> Dataflow and its internal coun...

...the closest non-portable equivalent has been. Java has been less of a
focus since it is the most mature non-portable implementation but hopefully
will move in that direction quickly soon. This would be a great time for
any contributors who are interested in specific runners to help migrate
them to portable implementations.

...beam/sdk/transforms/DoFn.java#L1174

On Tue, Mar 3, 2020 at 9:11 AM Ismaël Mejía wrote:

> the unification of bounded/unbounded within SplittableDoFn has always been
> a goal.

I am glad to know that my intuition is correct and that this was envisioned;
the idea of checkpoints for bounded inputs sounds really useful. Eager to
try that in practice. An explicit example (...
...manual, monotonic, and eventually a probabilistic one).

Jan, there are some parts of Apache Beam the watermarks package will likely
rely on (@Experimental annotation, javadoc links) but fundamentally it
should not rely on core, and someone could create a separate package for
this.

Ismael, the unification of bounded/unbounded within SplittableDoFn has
always been a goal. There is a set of features that BoundedSources are
unlikely to use but would still be allowed to use. For example, bounded
sources may...
...by GetInitialRestriction.

NewWatermarkEstimator: Returns a watermark estimator that either the
framework invokes, allowing it to observe the timestamps of output records,
or a manual watermark estimator that can be explicitly invoked to update
the watermark.

See [3] for an initial PR with the public facing additions to the core Java
API related to SplittableDoFn.

This mirrors a bunch of work that was done by Boyuan within the Python SDK
[4, 5] but in the style of new DoFn parameter/method invocation...
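To make the manual style concrete, a sketch assuming the API as it later
landed in the Java SDK (the watermark value and processing body are
illustrative; the state method would look like the MonotonicallyIncreasing
example earlier in this archive, and the same splittabledofn imports apply):

@NewWatermarkEstimator
public ManualWatermarkEstimator<Instant> newWatermarkEstimator(
    @WatermarkEstimatorState Instant state) {
  return new WatermarkEstimators.Manual(state); // explicitly driven kind
}

@ProcessElement
public void processElement(
    ProcessContext context,
    RestrictionTracker<OffsetRange, Long> tracker,
    ManualWatermarkEstimator<Instant> estimator) {
  // The DoFn itself advances the watermark, e.g. from source metadata.
  estimator.setWatermark(new Instant(1234567890000L)); // illustrative value
}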
...more files, then we need to wait for processing all files to
successfully checkpoint.

1. Are my assumptions correct?
2. Is there some possibility to improve the behavior of SplittableDoFn (or
subsequent reading from BoundedSource) on Flink to better propagate the
checkpoint barrier?

For now my fix is...
...since I'll be referring to concepts from it.

After working on the prototype I've changed my mind on the original
decisions to go with an interface approach and a combined restriction +
tracker. But I don't want to go all in and create another doc with a
detailed proposal, so I've laid out a brief summary of the changes to get
some initial feedback before I go ahead and start working on these changes
in detail. Please let me know what you think!

*1. Change from native Go interfaces to dynamic reflection-based API.*
Instead of the native Go interfaces (SplittableDoFn, RProvider, and
RTracker) de...
> Yes we may need a Coder effectively for both sides, the only thing I don’t
> like is the external impact on the API. I mean it is not too complex, but
> adds some extras to...

...is that the Runner may give any arbitrary byte[] backlog to the SDK
during splitting and this coder would need to be able to handle it.

> Ismael, I looked at the API aro...
I previously responded to your post on user@:
https://lists.apache.org/thread.html/5c10b7edf982ef63d1d1d70545e3fe2716d00628ff5c2a7854383413@%3Cuser.beam.apache.org%3E
I've also mirrored my response on StackOverflow:
https://stackoverflow.com/a/53771980/33791

On Thu, Dec 13, 2018 at 4:21 PM Chak-P... wrote:

Hello everyone!
I asked the following question and think I might get some suggestions on
whether what I want is doable or not.
https://stackoverflow.com/questions/53746046/how-can-i-implement-zipwithindex-like-spark-in-apache-beam/53747612#53747612
If I can get `PCollection` id and the number of (c...
> ...I lean to (3) Leave things as they are, save if there are important
> things to change/fix in (1) which I am not aware of.

Sounds good to me.

> ...can't rename because that would be a backwards incompatible change for
> existing users of ByteKeyRange/OffsetRange. This would allow us to add
> methods relevant to SDF and remove methods that aren't needed.
> 2) Rename ByteKeyRangeTracker to ByteKeyRangeRestrictionTracker and
> OffsetRangeTracker to OffsetRangeRestrictionTracker. Not really...
> Some late comments, and my apologies in advance if some questions look
> silly, but the last documents were a lot of info that I have not yet
> fully digested.
>
> I have some questions about the ‘new’ Backlog concept following a
> quick look at the PR
> https://github.com/apache/beam/pull/6969/files
>
> 1. Is the Backlog a specific concept for each IO? Or in other words:
> ByteKeyRestrictionTracker...

...the same restriction tracker will provide the same backlog computation.
For example, if HBase/Bigtable use the ByteKeyRestrictionTracker then they
will use the same backlog calculation. Note that an implementation could
subclass a restriction tracker if the data store could provide additional
information. For example, the default backlog for a
ByteKeyRestrictionTracker over [startKey, endKey) is
distance(currentKey, lastKey) where distance is represented as byte array
subtraction (which can be wildly inaccurate as the density of data is not
well reflected) but if HBase/Bigtable [...] a better representation could
be provided.

Other common examples of backlogs would be:
* files: backlog = length of file - current byte offset
* message queues: backlog = number of outstanding messages

> 2. Since the backlog is a byte[] this means that it is up to the user
> to give it a meaning depending on the situation, is this correct? Also
> since splitRestriction now has the Backlog as an argument, what do we
> expect the person that implements this method in a DoFn to do ideally
> with it? Maybe a more concrete example of how things fit for
> File/Offset or HBase/Bigtable/ByteKey will be helpful (maybe also for
> the BundleFinalizer concept too).

Yes, the restriction tracker/restriction/SplittableDoFn must give the
byte[] a meaning. This can have any meaning, but we would like the backlog
byte[] representation to be lexicographically comparable...
Some late comments, and my apologies in advance if some questions look
silly, but the last documents were a lot of info that I have not yet fully
digested.
I have some questions about the ‘new’ Backlog concept following a
quick look at the PR
https://github.com/apache/beam/pull/6969/files
1. Is the Backlo...
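Following the formulas in this thread (backlog = length of file - current
byte offset), a small illustrative sketch — the class and method names are
invented here, not the PR's actual API — of an offset-based backlog encoded
as a lexicographically comparable byte[]:

import java.nio.ByteBuffer;

class OffsetBacklog {
  // Remaining work for a restriction over [start, end) at `currentOffset`:
  // backlog = end - current byte offset, encoded big-endian so that larger
  // backlogs also compare as larger byte[]s lexicographically.
  static byte[] backlogBytes(long currentOffset, long endOffset) {
    long remaining = Math.max(0, endOffset - currentOffset);
    return ByteBuffer.allocate(Long.BYTES).putLong(remaining).array();
  }
}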
On Wed, Nov 7, 2018 at 8:33 AM Robert Bradshaw wrote:

> I think that not returning the user's specific subclass should be fine.
> Does the removal of markDone imply that the consumer always knows a
> "final" key to claim on any given restriction?

Yes, each restriction needs to support claiming...

On Wed, Nov 7, 2018 at 1:45 AM Lukasz Cwik wrote:

I have started to work on how to change the user facing API within the Java
SDK to support splitting/checkpointing[1], backlog reporting[2] and bundle
finalization[3].
I have this PR[4] which contains minimal interface/type definitions to
convey how the API surface would change with these 4 changes...
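In today's Java API terms, a sketch of the claim-based alternative to
markDone that this exchange describes (the source-checking helper is
hypothetical, and the method belongs inside a SplittableDoFn):

@ProcessElement
public void processElement(
    @Element String source, RestrictionTracker<OffsetRange, Long> tracker) {
  long position = tracker.currentRestriction().getFrom();
  while (hasMoreRecords(source, position)) {
    if (!tracker.tryClaim(position)) {
      return; // runner requested a checkpoint/split
    }
    // ... read and emit the record at `position` ...
    position++;
  }
  // Instead of markDone: claim a "final" position past the end of the
  // range; the failed claim marks the restriction as fully processed.
  tracker.tryClaim(Long.MAX_VALUE);
}

// Hypothetical: returns false once the source is exhausted.
private boolean hasMoreRecords(String source, long position) {
  return false;
}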
...google.com/presentation/d/1dSc6oKh5pZItQPB_QiUyEoLT2TebMnj-pmdGipkVFPk/edit?usp=sharing
(quite proud of the animations though ;-)

 _/
_/ Alex Van Boxel

On Thu, Sep 27, 2018 at 12:04 AM Lukasz Cwik <lc...@google.com> wrote:
Reuven, just inside the restriction tracker itself, which is scoped per
executing SplittableDoFn. A user could incorrectly write the
synchronization since they are currently responsible for writing it,
though.

On Wed, Sep 26, 2018 at 2:51 PM Reuven Lax wrote:

> is synchronization over an ent...

Can you give details on what the synchronization is per? Is it per key, or
global to each worker?

On Fri, Sep 21, 2018 at 2:10 PM Lukasz Cwik wrote:
As I was looking at the SplittableDoFn API while working towards making a
proposal for how the backlog/splitting API could look, I found some sharp
edges that could be improved.

I noticed that:
1) We require users to write thread safe code; this is something that we
haven't asked of users...
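An illustration (a hypothetical hand-written tracker, not Beam code) of the
thread-safety burden point 1 refers to: tryClaim runs on the processing
thread while a checkpoint/split request can arrive from a runner thread, so
the tracker must synchronize its own state:

class MyOffsetTracker {
  private long current;
  private long end;

  MyOffsetTracker(long start, long end) {
    this.current = start;
    this.end = end;
  }

  synchronized boolean tryClaim(long offset) {
    if (offset >= end) {
      return false; // nothing left to claim
    }
    current = offset;
    return true;
  }

  // Called concurrently by the runner to take back unprocessed work.
  synchronized long checkpoint() {
    long split = current + 1;
    long residualEnd = end;
    end = split; // shrink the primary range to what has been claimed
    return residualEnd;
  }
}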
The changes to the API have not been proposed yet. So far it has all been
about what the representation is and why.
For splitting, the current idea has been about using the backlog as a way
of telling the SplittableDoFn where to split, so it would be in terms of
whatever the SDK decided to report...
I came up with a proposal[1] for a progress model solely based off of the
backlog, where splits should be based upon the remaining backlog we want
the SDK to split at. I also give recommendations to runner authors as to
how an autoscaling system could work based upon the measured backlog. A lot
of discussion around progress reporting and splitting in the past has
always been around finding an optimal solution; after reading a lot of
information about work stealing, I don't believe there is a general
solution and it really is up to SplittableDoFns to be well behaved. I did
not do much work in classifying what a well behaved SplittableDoFn is
though. Much of this work builds off ideas that Eugene had documented in
the past[2].

I could use the community's wide knowledge of different I/Os to see if
computing the backlog is practical in the way that I'm suggesting and to
gathe...
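A toy sketch (names invented here, not the proposal's API) of the "split at
a remaining backlog" idea for an offset range: the runner picks how much
backlog the primary should keep, and the split position follows directly:

class SplitAtBacklog {
  // Restriction is [current, end), so remaining backlog = end - current.
  // To keep `backlogToRetain` units in the primary, split at
  // current + backlogToRetain and hand [split, end) back as the residual.
  // e.g. to split off half: splitPosition(current, end, (end - current) / 2)
  static long splitPosition(long current, long end, long backlogToRetain) {
    long split = current + backlogToRetain;
    return Math.min(Math.max(split, current), end); // clamp into the range
  }
}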
Awesome! Thanks Luke!
I plan to work with you and others on this one.
Regards
JB

On 13 August 2018 at 19:14, Lukasz Cwik wrote:

I wanted to reach out that I will be continuing from where Eugene left off
with SplittableDoFn. I know that many of you have done a bunch of work with
IOs and/or runner integration for SplittableDoFn and would appreciate your
help in advancing this awesome idea. If you have questions or things you...