Gotcha, I think there's a fairly easy solution to link input and output
streams. Let me try it out. It might even be possible to have both
element-wise and stream-wise closure pardos. It's definitely possible to have that at
the DoFn level (called SerializableFn in the SDK because I want to
use @DoFn as a macro).

On Thu, Aug 24, 2023 at 1:09 PM Robert Bradshaw <rober...@google.com> wrote:

> On Thu, Aug 24, 2023 at 12:58 PM Chamikara Jayalath <chamik...@google.com>
> wrote:
>
>>
>>
>> On Thu, Aug 24, 2023 at 12:27 PM Robert Bradshaw <rober...@google.com>
>> wrote:
>>
>>> I would like to figure out a way to get the stream-y interface to work,
>>> as I think it's more natural overall.
>>>
>>> One hypothesis is that if any elements are carried over loop iterations,
>>> there will likely be some that are carried over beyond the loop (after all,
>>> the callee doesn't know when the loop is supposed to end). We could reject
>>> "plain" elements that are emitted after this point, requiring one to emit
>>> timestamp-windowed-values.
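
A minimal sketch of the rule proposed above, assuming hypothetical `Metadata`, `Output`, and `BundleStream` types (none of these are the Swift SDK's API): while the input loop runs, a plain `emit` borrows the metadata of the element currently being processed; once the input is exhausted, plain emits throw and only explicitly windowed values are accepted.

```swift
import Foundation

// Hypothetical per-element metadata (timestamp + window); not the SDK's real type.
struct Metadata: Equatable {
    let timestamp: Date
    let window: String
}

enum EmitError: Error, Equatable {
    case noCurrentElement  // plain emit after the input loop has finished
}

// Hypothetical output that implicitly copies metadata from the current input element.
final class Output<T> {
    private(set) var emitted: [(T, Metadata)] = []
    var current: Metadata?  // maintained by BundleStream while the loop runs

    // "Plain" emit: only valid while an input element is being processed.
    func emit(_ value: T) throws {
        guard let meta = current else { throw EmitError.noCurrentElement }
        emitted.append((value, meta))
    }

    // Explicitly windowed emit: always accepted.
    func emit(_ value: T, metadata: Metadata) {
        emitted.append((value, metadata))
    }
}

// Hypothetical bundle stream: advancing it sets output.current,
// exhausting it clears output.current.
struct BundleStream<In, Out>: Sequence, IteratorProtocol {
    var remaining: [(In, Metadata)]
    let output: Output<Out>

    mutating func next() -> In? {
        guard !remaining.isEmpty else {
            output.current = nil
            return nil
        }
        let (value, meta) = remaining.removeFirst()
        output.current = meta
        return value
    }
}

let t0 = Date(timeIntervalSince1970: 0)
let t1 = Date(timeIntervalSince1970: 60)
let out = Output<String>()
let bundle = BundleStream(remaining: [("a", Metadata(timestamp: t0, window: "w0")),
                                      ("b", Metadata(timestamp: t1, window: "w1"))],
                          output: out)

var buffered: [String] = []
for x in bundle {
    try out.emit(x.uppercased())  // inherits x's timestamp and window
    buffered.append(x)            // state carried across iterations
}

// After the loop, a plain emit is rejected...
var rejected = false
do { try out.emit("late") } catch { rejected = (error as? EmitError) == .noCurrentElement }
// ...so carried-over data has to be emitted with explicit metadata.
out.emit(buffered.joined(), metadata: Metadata(timestamp: t1, window: "w1"))
```

In this sketch the stream, not the user, tracks the "current" element, so buffering inside the loop still works as long as the buffered data is re-emitted with explicit metadata.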
>>>
>>
>> Are you assuming that the same stream (or overlapping sets of data) is
>> pushed to multiple workers? I thought that the set of data streamed here
>> is the data that belongs to the current bundle (hence already assigned to
>> the current worker), so any output from the current bundle invocation would
>> be a valid output of that bundle.
>>
>>>
> Yes, the content of the stream is exactly the contents of the bundle. The
> question is how to do the input_element:output_element correlation for
> automatically propagating metadata.
>
>
>>> Related to this, we could enforce that the only (user-accessible) way to
>>> get such a timestamped value is to start with one, e.g. a
>>> WindowedValue<T>.withValue(O) produces a WindowedValue<O> with the same
>>> metadata but a new value. Thus a user wanting to do anything "fancy" would
>>> have to explicitly request iteration over these windowed values rather than
>>> over the raw elements. (This is also forward compatible with expanding the
>>> metadata that can get attached, e.g. pane infos, and makes the right thing
>>> the easiest/most natural.)
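
A minimal sketch of the `withValue` idea, using a hypothetical generic `WindowedValue` (the field names, and `paneIndex` as a stand-in for pane info, are assumptions rather than the SDK's type): the output copies every piece of metadata from the input, and only the value changes.

```swift
import Foundation

// Hypothetical windowed value. In an SDK, construction would be restricted
// to the runner so that users can only derive outputs via withValue(_:).
struct WindowedValue<T> {
    let value: T
    let timestamp: Date
    let window: String
    let paneIndex: Int  // placeholder for future metadata such as pane info

    // New value, same metadata; adding a metadata field only needs a change here.
    func withValue<O>(_ newValue: O) -> WindowedValue<O> {
        WindowedValue<O>(value: newValue, timestamp: timestamp, window: window, paneIndex: paneIndex)
    }
}

let input = WindowedValue(value: "to be or not to be",
                          timestamp: Date(timeIntervalSince1970: 0),
                          window: "global", paneIndex: 0)
let output = input.withValue(input.value.split(separator: " ").count)
```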
>>>
>>> On Thu, Aug 24, 2023 at 12:10 PM Byron Ellis <byronel...@google.com>
>>> wrote:
>>>
>>>> Ah, that is a good point: being element-wise would make managing windows
>>>> and timestamps easier for the user. Fortunately it's a fairly easy change
>>>> to make and maybe even less typing for the user. I was originally thinking
>>>> side inputs and metrics would happen outside the loop, but I think you want
>>>> a class and not a closure at that point for sanity.
>>>>
>>>> On Thu, Aug 24, 2023 at 12:02 PM Robert Bradshaw <rober...@google.com>
>>>> wrote:
>>>>
>>>>> Ah, I see.
>>>>>
>>>>> Yeah, I've thought about using an iterable for the whole bundle rather
>>>>> than start/finish bundle callbacks, but one of the questions is how that
>>>>> would impact implicit passing of the timestamp (and other) metadata from
>>>>> input elements to output elements. (You can of course attach the metadata
>>>>> to any output that happens in the loop body, but it's very easy to
>>>>> implicitly break the 1:1 relationship here, e.g. by doing buffering or
>>>>> otherwise modifying local state, and this would be hard to detect. I
>>>>> suppose trying to output after the loop finishes could require
>>>>> something more explicit.)
>>>>>
>>>>>
>>>>> On Wed, Aug 23, 2023 at 6:56 PM Byron Ellis <byronel...@google.com>
>>>>> wrote:
>>>>>
>>>>>> Oh, I also forgot to mention that I included element-wise collection
>>>>>> operations like "map" that eliminate the need for pardo in many cases.
>>>>>> The groupBy command is actually a map + groupByKey under the hood. That
>>>>>> was to be more consistent with Swift's collection protocol (and is also
>>>>>> why PCollection and PCollectionStream are different types: PCollection
>>>>>> implements map and friends as pipeline construction operations, whereas
>>>>>> PCollectionStream is an actual stream).
>>>>>>
>>>>>> I just happened to push some "IO primitives" that use map rather
>>>>>> than pardo in a couple of places to do a true wordcount using good ol'
>>>>>> Shakespeare and very, very primitive GCS IO.
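
An in-memory analogy for groupBy being map plus groupByKey, applied to a tiny word count. These are plain Swift arrays and illustrative free functions, not PCollections or the SDK's implementation.

```swift
// groupByKey over (key, value) pairs, preserving first-seen key order.
func groupByKey<K: Hashable, V>(_ pairs: [(K, V)]) -> [(K, [V])] {
    var order: [K] = []
    var groups: [K: [V]] = [:]
    for (k, v) in pairs {
        if groups[k] == nil { order.append(k) }
        groups[k, default: []].append(v)
    }
    return order.map { ($0, groups[$0]!) }
}

// groupBy expressed as map (derive a key per element) followed by groupByKey.
func groupBy<K: Hashable, V>(_ values: [V], by key: (V) -> K) -> [(K, [V])] {
    groupByKey(values.map { (key($0), $0) })
}

let lines = ["to be or not to be", "that is the question"]
let words = lines.flatMap { $0.split(separator: " ").map(String.init) }
let counts = groupBy(words, by: { $0 }).map { ($0.0, $0.1.count) }
```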
>>>>>>
>>>>>> Best,
>>>>>> B
>>>>>>
>>>>>> On Wed, Aug 23, 2023 at 6:08 PM Byron Ellis <byronel...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Indeed :-) Yeah, I went back and forth on the pardo syntax quite a
>>>>>>> bit before settling on where I ended up. Ultimately I decided to go with
>>>>>>> something that felt more Swift-y than anything else, which means that
>>>>>>> rather than dealing with a single element like you do in the other SDKs,
>>>>>>> you're dealing with a stream of elements (which of course will often be
>>>>>>> of size 1). That's a really natural paradigm in the Swift world,
>>>>>>> especially with the async/await structures. So when you see something
>>>>>>> like:
>>>>>>>
>>>>>>> pardo(name: "Read Files") { filenames, output, errors in
>>>>>>>   for try await (filename, _, _) in filenames {
>>>>>>>     ...
>>>>>>>     output.emit(data)
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>> filenames is the input stream, and output and errors are both
>>>>>>> output streams. In theory you can have as many output streams as you
>>>>>>> like, though at the moment there's a compiler bug in the new type pack
>>>>>>> feature that limits it to "as many as I felt like supporting".
>>>>>>> (Presumably this will get fixed before the official 5.9 release, which
>>>>>>> will probably be in the October timeframe if history is any guide.)
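
A self-contained sketch of the stream-wise shape described above. `Output`, `bundleStream`, and this `pardo` are hypothetical stand-ins (the real SDK's signatures differ and support more output streams): the closure receives the whole bundle as an `AsyncThrowingStream` of `(value, timestamp, window)` tuples plus two outputs, and consumes it with `for try await`. Top-level `await` assumes Swift 5.7 or later.

```swift
import Foundation

// Hypothetical element shape mirroring (Of, Date, Window); the window is a String here.
typealias StreamElement<Of> = (Of, Date, String)
typealias ElementStream<Of> = AsyncThrowingStream<StreamElement<Of>, Error>

// Hypothetical output stream that simply records what was emitted.
final class Output<Of> {
    private(set) var emitted: [Of] = []
    func emit(_ value: Of) { emitted.append(value) }
}

// Presents a bundle's contents as a single async stream (often of size 1).
func bundleStream<Of>(_ values: [Of]) -> ElementStream<Of> {
    AsyncThrowingStream { continuation in
        for value in values {
            continuation.yield((value, Date(timeIntervalSince1970: 0), "global"))
        }
        continuation.finish()
    }
}

// Hypothetical stream-wise pardo with one main output and one error output.
func pardo<In, Out, Err>(
    _ input: ElementStream<In>,
    _ output: Output<Out>,
    _ errors: Output<Err>,
    _ body: (ElementStream<In>, Output<Out>, Output<Err>) async throws -> Void
) async throws {
    try await body(input, output, errors)
}

let output = Output<Int>()
let errors = Output<String>()
try await pardo(bundleStream(["a.txt", "", "notes.md"]), output, errors) { filenames, output, errors in
    for try await (filename, _, _) in filenames {
        if filename.isEmpty {
            errors.emit("empty filename")  // route bad input to the error stream
        } else {
            output.emit(filename.count)    // stand-in for reading the file's data
        }
    }
}
```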
>>>>>>>
>>>>>>> If you had parameterization you wanted to send, that would look like
>>>>>>> pardo("Parameter") { param, filenames, output, error in ... }, where
>>>>>>> "param" would take on the value of "Parameter". All of this is
>>>>>>> type-checked at compile time, BTW.
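
To illustrate how such a parameter can be type-checked at compile time, here is a hypothetical synchronous `pardo` overload (not the SDK's signature) that passes its first argument straight through to the closure as `param`, with arrays standing in for streams and an `emit` closure standing in for an output:

```swift
// Hypothetical overload: `param` is handed to the closure unchanged, so its
// type is fixed by the first argument and checked by the compiler.
func pardo<Param, In, Out>(
    _ param: Param,
    _ input: [In],
    _ body: (Param, [In], (Out) -> Void) -> Void
) -> [Out] {
    var results: [Out] = []
    body(param, input) { results.append($0) }
    return results
}

let tagged: [String] = pardo("Parameter", ["a.txt", "b.txt"]) { param, filenames, emit in
    for filename in filenames {
        emit("\(param):\(filename)")
    }
}
```

Because `Param` is a generic parameter inferred from the first argument, a closure that treated `param` as an `Int` here would be rejected by the compiler rather than failing at run time.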
>>>>>>>
>>>>>>>
>>>>>>> The (filename, _, _) is a tuple destructuring pattern, like
>>>>>>> destructuring in ES6 and other languages, where "_" is Swift for
>>>>>>> "ignore." In this case PCollectionStreams have an element signature of
>>>>>>> (Of, Date, Window), so you can optionally extract the timestamp and the
>>>>>>> window if you want to manipulate them somehow.
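
A short example of that destructuring pattern on a plain array of `(value, timestamp, window)` tuples (a `String` window is an assumption made for brevity): `_` drops fields you don't need, and naming them lets you manipulate the timestamp.

```swift
import Foundation

// (value, timestamp, window) tuples; a String window is an assumption for brevity.
let elements: [(String, Date, String)] = [
    ("a.txt", Date(timeIntervalSince1970: 0), "w0"),
    ("b.txt", Date(timeIntervalSince1970: 3600), "w1"),
]

// Ignore the timestamp and window with `_`.
var names: [String] = []
for (filename, _, _) in elements {
    names.append(filename)
}

// Or bind them, e.g. to shift every timestamp by one minute.
var shifted: [(String, Date, String)] = []
for (filename, timestamp, window) in elements {
    shifted.append((filename, timestamp.addingTimeInterval(60), window))
}
```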
>>>>>>>
>>>>>>> That said, it would also be natural to provide element-wise pardos.
>>>>>>> That would probably mean having explicit type signatures in the
>>>>>>> closure. I had that at one point, but it felt less natural the more I
>>>>>>> used it. I'm also slowly working towards adding a more "traditional"
>>>>>>> DoFn implementation approach where you implement the DoFn as an object
>>>>>>> type. In that case it would be very, very easy to support both by
>>>>>>> having a default stream implementation call the equivalent of
>>>>>>> processElement. To make that performant I need to implement an @DoFn
>>>>>>> macro, and I just haven't gotten to it yet.
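
A minimal sketch of the "default stream implementation" idea using a protocol extension. `ElementwiseDoFn`, `processBundle`, and `SplitWords` are hypothetical names, and arrays stand in for streams: implementers write only `processElement`, and the extension supplies a bundle-level loop that calls it.

```swift
// Hypothetical object-style DoFn: implementers supply only processElement.
protocol ElementwiseDoFn {
    associatedtype Input
    associatedtype Output
    func processElement(_ element: Input, emit: (Output) -> Void)
}

extension ElementwiseDoFn {
    // Default stream-wise implementation: loop over the bundle and
    // call processElement once per element.
    func processBundle<S: Sequence>(_ bundle: S) -> [Output] where S.Element == Input {
        var results: [Output] = []
        for element in bundle {
            processElement(element) { results.append($0) }
        }
        return results
    }
}

// Example DoFn that splits lines into words.
struct SplitWords: ElementwiseDoFn {
    func processElement(_ element: String, emit: (String) -> Void) {
        for word in element.split(separator: " ") {
            emit(String(word))
        }
    }
}

let words = SplitWords().processBundle(["to be", "or not"])
```

Note that a method defined only in a protocol extension is statically dispatched; a DoFn that wants its own stream-wise behavior to be picked up through the protocol would need `processBundle` declared as a protocol requirement.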
>>>>>>>
>>>>>>> It's a bit more work, and I've been prioritizing implementing
>>>>>>> composite and external transforms for the reasons you suggest. :-) I've
>>>>>>> got the basics of a composite transform (there's an equivalent wordcount
>>>>>>> example) and am hooking it into the pipeline generation, which should
>>>>>>> also give me everything I need to successfully hook in external
>>>>>>> transforms. That will give me the jump on IOs, as you say. I can also
>>>>>>> treat the pipeline itself as a composite transform, which lets me get
>>>>>>> rid of the Pipeline { pipeline in ... } and just have things attach
>>>>>>> themselves to the pipeline implicitly.
>>>>>>>
>>>>>>> That said, there are some interesting IO possibilities that would be
>>>>>>> Swift native. In particular, I've been looking at the native Swift
>>>>>>> binding for DuckDB (which is C++ based). DuckDB is SQL-based but not
>>>>>>> distributed in the same way as, say, Beam SQL. Still, it would allow for
>>>>>>> SQL statements on individual files, with projection pushdown supported
>>>>>>> for things like Parquet, which could have some cool and performant data
>>>>>>> lake applications. I'll probably do a couple of the simpler IOs as well:
>>>>>>> there's a pretty good Swift AWS SDK binding that would give me S3, and
>>>>>>> there's a Cloud auth library that makes it pretty easy to work with GCS.
>>>>>>>
>>>>>>> In any case, I'm updating the branch as I find a minute here and
>>>>>>> there.
>>>>>>>
>>>>>>> Best,
>>>>>>> B
>>>>>>>
>>>>>>> On Wed, Aug 23, 2023 at 5:02 PM Robert Bradshaw <rober...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Neat.
>>>>>>>>
>>>>>>>> Nothing like writing an SDK to actually understand how the FnAPI
>>>>>>>> works :). I like the use of groupBy. I have to admit I'm a bit
>>>>>>>> mystified by the syntax for parDo (I don't know Swift at all, which is
>>>>>>>> probably tripping me up). The addition of external (cross-language)
>>>>>>>> transforms could let you steal everything (e.g. IOs) pretty quickly
>>>>>>>> from other SDKs.
>>>>>>>>
>>>>>>>> On Fri, Aug 18, 2023 at 7:55 AM Byron Ellis via user <
>>>>>>>> u...@beam.apache.org> wrote:
>>>>>>>>
>>>>>>>>> For everyone who is interested, here's the draft PR:
>>>>>>>>>
>>>>>>>>> https://github.com/apache/beam/pull/28062
>>>>>>>>>
>>>>>>>>> I haven't had a chance to test it on my M1 machine yet, though
>>>>>>>>> (there's a good chance there are a few places that need to properly
>>>>>>>>> address endianness, specifically timestamps in windowed values and
>>>>>>>>> lengths in iterable coders, as those both use big-endian
>>>>>>>>> representations).
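
A sketch of host-independent big-endian encoding in Swift: bytes are shifted out explicitly, so the result is the same on Intel and ARM. The helper names are hypothetical, and the exact value Beam writes for a windowed-value timestamp (units, any offset) is not modeled here, only the byte order. Using `value.bigEndian` with `withUnsafeBytes(of:)` is a standard-library alternative.

```swift
// Hypothetical helpers: encode/decode a fixed-width integer most-significant
// byte first, using shifts so the result does not depend on host byte order.
func bigEndianBytes<T: FixedWidthInteger>(_ value: T) -> [UInt8] {
    let byteCount = T.bitWidth / 8
    return (0..<byteCount).map { i in
        UInt8(truncatingIfNeeded: value >> (8 * (byteCount - 1 - i)))
    }
}

func decodeBigEndian<T: FixedWidthInteger>(_ bytes: [UInt8], as type: T.Type) -> T {
    bytes.prefix(T.bitWidth / 8).reduce(T.zero) { acc, byte in
        (acc << 8) | T(truncatingIfNeeded: byte)
    }
}

let timestampBytes = bigEndianBytes(Int64(1_692_316_800_000))  // an arbitrary Int64 value
let lengthBytes = bigEndianBytes(Int32(3))                      // e.g. a length prefix
```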
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Aug 17, 2023 at 8:57 PM Byron Ellis <byronel...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks Cham,
>>>>>>>>>>
>>>>>>>>>> Definitely happy to open a draft PR so folks can
>>>>>>>>>> comment. There's not as much code as it looks like, since most of
>>>>>>>>>> the LOC is just generated protobuf. As for the support, I definitely
>>>>>>>>>> want to add external transforms and may actually add that support
>>>>>>>>>> before adding the ability to make composites in the language itself.
>>>>>>>>>> With the way the SDK is laid out, adding composites to the pipeline
>>>>>>>>>> graph is a separate operation from defining a composite.
>>>>>>>>>>
>>>>>>>>>> On Thu, Aug 17, 2023 at 4:28 PM Chamikara Jayalath <
>>>>>>>>>> chamik...@google.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks Byron. This sounds great. I wonder if there is interest
>>>>>>>>>>> in a Swift SDK from folks currently subscribed to the +user
>>>>>>>>>>> <u...@beam.apache.org> list.
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Aug 16, 2023 at 6:53 PM Byron Ellis via dev <
>>>>>>>>>>> dev@beam.apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hello everyone,
>>>>>>>>>>>>
>>>>>>>>>>>> A couple of months ago I decided that I wanted to really
>>>>>>>>>>>> understand how the Beam FnApi works and how it interacts with the
>>>>>>>>>>>> Portable Runner. For me, at least, that usually means I need to
>>>>>>>>>>>> write some code so I can see things happening in a debugger, and to
>>>>>>>>>>>> really prove to myself I understood what was going on, I decided I
>>>>>>>>>>>> couldn't use an existing SDK language to do it, since there would
>>>>>>>>>>>> be the temptation to read some code and convince myself that I
>>>>>>>>>>>> actually understood what was going on.
>>>>>>>>>>>>
>>>>>>>>>>>> One thing led to another and it turns out that to get a minimal
>>>>>>>>>>>> FnApi integration going you end up writing a fair bit of an SDK. So
>>>>>>>>>>>> I decided to take things to a point where I had an SDK that could
>>>>>>>>>>>> execute a word count example via a portable runner backend. I've now
>>>>>>>>>>>> reached that point and would like to submit my prototype SDK to the
>>>>>>>>>>>> list for feedback.
>>>>>>>>>>>>
>>>>>>>>>>>> It's currently living in a branch on my fork here:
>>>>>>>>>>>>
>>>>>>>>>>>> https://github.com/byronellis/beam/tree/swift-sdk/sdks/swift
>>>>>>>>>>>>
>>>>>>>>>>>> At the moment it runs via the most recent Xcode beta using
>>>>>>>>>>>> Swift 5.9 on Intel Macs, but should also work using beta builds of
>>>>>>>>>>>> 5.9 for Linux running on Intel hardware. I haven't had a chance to
>>>>>>>>>>>> try it on ARM hardware and make sure all of the endian checks are
>>>>>>>>>>>> complete. The "IntegrationTests.swift" file contains a word count
>>>>>>>>>>>> example that reads some local files (as well as a missing file to
>>>>>>>>>>>> exercise DLQ functionality) and outputs counts through two separate
>>>>>>>>>>>> group by operations to get it past the "map reduce" size of
>>>>>>>>>>>> pipeline. I've tested it against the Python Portable Runner. Since
>>>>>>>>>>>> my goal was to learn FnApi, there is no Direct Runner at this time.
>>>>>>>>>>>>
>>>>>>>>>>>> I've shown it to a couple of folks already and incorporated
>>>>>>>>>>>> some of that feedback (for example, pardo was originally called
>>>>>>>>>>>> dofn when defining pipelines). In general I've tried to make the API
>>>>>>>>>>>> as "Swift-y" as possible, hence the heavy reliance on closures, and
>>>>>>>>>>>> while there aren't yet composite PTransforms, there are the
>>>>>>>>>>>> beginnings of what would be needed for a SwiftUI-like declarative
>>>>>>>>>>>> API for creating them.
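
A minimal sketch of what a SwiftUI-like declarative API for composites could look like, using a Swift result builder (`@resultBuilder`, Swift 5.4+). `Transform`, `Step`, `Composite`, and `TransformBuilder` are hypothetical, and a "transform" here is just a list of step names; defining a composite and expanding it into steps are kept as separate operations.

```swift
// Hypothetical model: a transform expands into a flat list of named steps.
protocol Transform {
    var steps: [String] { get }
}

// A primitive step, e.g. a single pardo.
struct Step: Transform {
    let name: String
    var steps: [String] { [name] }
}

// Collects the transforms listed in a composite's body, SwiftUI-style.
@resultBuilder
enum TransformBuilder {
    static func buildExpression(_ transform: any Transform) -> [any Transform] { [transform] }
    static func buildBlock(_ parts: [any Transform]...) -> [any Transform] { parts.flatMap { $0 } }
}

// Defining a composite only records its parts; expansion happens in `steps`.
struct Composite: Transform {
    let name: String
    let parts: [any Transform]

    init(_ name: String, @TransformBuilder _ body: () -> [any Transform]) {
        self.name = name
        self.parts = body()
    }

    var steps: [String] {
        parts.flatMap { part in part.steps.map { step in "\(name)/\(step)" } }
    }
}

let wordCount = Composite("WordCount") {
    Step(name: "Split")
    Composite("Count") {
        Step(name: "PairWithOne")
        Step(name: "GroupByKey")
    }
}
```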
>>>>>>>>>>>>
>>>>>>>>>>>> There are of course a ton of missing bits still to be
>>>>>>>>>>>> implemented, like counters, metrics, windowing, state, timers, etc.
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> This should be fine, and we can get the code documented without
>>>>>>>>>>> these features. I think support for composites and adding an
>>>>>>>>>>> external transform (see Java
>>>>>>>>>>> <https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java>,
>>>>>>>>>>> Python
>>>>>>>>>>> <https://github.com/apache/beam/blob/c7b7921185686da573f76ce7320817c32375c7d0/sdks/python/apache_beam/transforms/external.py#L556>,
>>>>>>>>>>> Go
>>>>>>>>>>> <https://github.com/apache/beam/blob/c7b7921185686da573f76ce7320817c32375c7d0/sdks/go/pkg/beam/xlang.go#L155>,
>>>>>>>>>>> TypeScript
>>>>>>>>>>> <https://github.com/apache/beam/blob/master/sdks/typescript/src/apache_beam/transforms/external.ts>)
>>>>>>>>>>> to add support for multi-lang will bring in a lot of features (for 
>>>>>>>>>>> example,
>>>>>>>>>>> I/O connectors) for free.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Any and all feedback welcome, and happy to submit a PR if folks
>>>>>>>>>>>> are interested, though the "Swift Way" would be to have it in its
>>>>>>>>>>>> own repo so that it can easily be used from the Swift Package
>>>>>>>>>>>> Manager.
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> +1 for creating a PR (maybe as a draft initially). Also it'll
>>>>>>>>>>> be easier to comment on a PR :)
>>>>>>>>>>>
>>>>>>>>>>> - Cham
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> B
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
