Gotcha, I think there's a fairly easy solution to link input and output streams... Let me try it out. It might even be possible to have both element-wise and stream-wise closure pardos. It's definitely possible to have that at the DoFn level (called SerializableFn in the SDK because I want to use @DoFn as a macro).
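Below is a minimal Swift sketch of supporting both styles at the DoFn level. It follows the idea, described further down the thread, of "having a default stream implementation call the equivalent of processElement". Every name here (`Stamped`, `StreamDoFn`, `ElementDoFn`, `ClosureDoFn`) is hypothetical and is not the SDK's actual API. Plain arrays stand in for the SDK's async streams to keep the sketch short.

```swift
import Foundation

// Hypothetical element shape: (value, timestamp, window), mirroring the
// (Of, Date, Window) signature described in the thread. Window is a String here.
typealias Stamped<T> = (T, Date, String)

// Stream-wise DoFn: sees the whole bundle (an array stands in for a stream).
protocol StreamDoFn {
    associatedtype Input
    associatedtype Output
    func process(_ bundle: [Stamped<Input>], emit: (Stamped<Output>) -> Void)
}

// Element-wise DoFn: only implements processElement on the raw value.
protocol ElementDoFn: StreamDoFn {
    func processElement(_ value: Input) -> [Output]
}

// Default stream implementation: call processElement for each element and
// copy that element's timestamp and window onto every output it produces,
// so the input/output correlation is kept by the framework, not the user.
extension ElementDoFn {
    func process(_ bundle: [Stamped<Input>], emit: (Stamped<Output>) -> Void) {
        for (value, timestamp, window) in bundle {
            for output in processElement(value) {
                emit((output, timestamp, window))
            }
        }
    }
}

// An element-wise closure pardo expressed as an ElementDoFn.
struct ClosureDoFn<In, Out>: ElementDoFn {
    typealias Input = In
    typealias Output = Out
    let body: (In) -> [Out]
    func processElement(_ value: In) -> [Out] { body(value) }
}

// Usage: a word-splitting pardo written element-wise.
let splitWords = ClosureDoFn<String, String> { line in
    line.split(separator: " ").map(String.init)
}
let start = Date(timeIntervalSince1970: 0)
var emitted: [Stamped<String>] = []
splitWords.process([("to be", start, "global")]) { emitted.append($0) }
```

A DoFn that genuinely needs the whole bundle (for example, to buffer) would implement `process` directly. That is exactly where the metadata-correlation question discussed below comes back.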
On Thu, Aug 24, 2023 at 1:09 PM Robert Bradshaw <rober...@google.com> wrote:

> On Thu, Aug 24, 2023 at 12:58 PM Chamikara Jayalath <chamik...@google.com> wrote:
>
>> On Thu, Aug 24, 2023 at 12:27 PM Robert Bradshaw <rober...@google.com> wrote:
>>
>>> I would like to figure out a way to get the stream-y interface to work, as I think it's more natural overall.
>>>
>>> One hypothesis is that if any elements are carried over loop iterations, there will likely be some that are carried over beyond the loop (after all, the callee doesn't know when the loop is supposed to end). We could reject "plain" elements that are emitted after this point, requiring one to emit timestamp-windowed-values.
>>
>> Are you assuming that the same stream (or overlapping sets of data) is pushed to multiple workers? I thought that the set of data streamed here is the data that belongs to the current bundle (hence already assigned to the current worker), so any output from the current bundle invocation would be a valid output of that bundle.
>
> Yes, the content of the stream is exactly the contents of the bundle. The question is how to do the input_element:output_element correlation for automatically propagating metadata.
>
>>> Related to this, we could enforce that the only (user-accessible) way to get such a timestamped value is to start with one, e.g. a WindowedValue<T>.withValue(O) produces a WindowedValue<O> with the same metadata but a new value. Thus a user wanting to do anything "fancy" would have to explicitly request iteration over these windowed values rather than over the raw elements. (This is also forward compatible with expanding the metadata that can get attached, e.g. pane infos, and makes the right thing the easiest/most natural.)
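Here is a minimal Swift sketch of the `WindowedValue<T>.withValue(O)` idea above. It assumes a simplified, hypothetical `WindowedValue` struct (a String window, no pane info) rather than any actual SDK type. The point it illustrates is that a body which buffers elements still emits correctly stamped outputs, even when it flushes after the loop, because it holds on to the windowed value rather than the raw element.

```swift
import Foundation

// Hypothetical windowed value. In a real SDK the initializer would be
// restricted to the harness, so user code could only derive new values
// from existing ones via withValue(_:).
struct WindowedValue<T> {
    let value: T
    let timestamp: Date
    let window: String  // simplified stand-in for a real window type

    // Same metadata, new value: the user-facing way to produce an output.
    func withValue<O>(_ newValue: O) -> WindowedValue<O> {
        WindowedValue<O>(value: newValue, timestamp: timestamp, window: window)
    }
}

// A buffering body that emits string lengths in pairs. Outputs produced later
// (including the flush after the loop) still carry their own input's metadata.
func lengthsInPairs(_ inputs: [WindowedValue<String>]) -> [WindowedValue<Int>] {
    var buffer: [WindowedValue<String>] = []
    var outputs: [WindowedValue<Int>] = []
    for input in inputs {
        buffer.append(input)
        if buffer.count == 2 {
            outputs += buffer.map { $0.withValue($0.value.count) }
            buffer.removeAll()
        }
    }
    // Flushing after the loop is safe: metadata travels with each buffered value.
    outputs += buffer.map { $0.withValue($0.value.count) }
    return outputs
}
```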
>>>
>>> On Thu, Aug 24, 2023 at 12:10 PM Byron Ellis <byronel...@google.com> wrote:
>>>
>>>> Ah, that is a good point: being element-wise would make managing windows and timestamps easier for the user. Fortunately it's a fairly easy change to make, and maybe even less typing for the user. I was originally thinking side inputs and metrics would happen outside the loop, but I think you want a class and not a closure at that point, for sanity.
>>>>
>>>> On Thu, Aug 24, 2023 at 12:02 PM Robert Bradshaw <rober...@google.com> wrote:
>>>>
>>>>> Ah, I see.
>>>>>
>>>>> Yeah, I've thought about using an iterable for the whole bundle rather than start/finish bundle callbacks, but one of the questions is how that would impact implicit passing of the timestamp (and other) metadata from input elements to output elements. You can of course attach the metadata to any output that happens in the loop body, but it's very easy to implicitly break the 1:1 relationship here (e.g. by buffering or otherwise modifying local state), and this would be hard to detect. (I suppose trying to output after the loop finishes could require something more explicit.)
>>>>>
>>>>> On Wed, Aug 23, 2023 at 6:56 PM Byron Ellis <byronel...@google.com> wrote:
>>>>>
>>>>>> Oh, I also forgot to mention that I included element-wise collection operations like "map" that eliminate the need for pardo in many cases. The groupBy command is actually a map + groupByKey under the hood. That was to be more consistent with Swift's collection protocol (and is also why PCollection and PCollectionStream are different types...
>>>>>> PCollection implements map and friends as pipeline construction operations, whereas PCollectionStream is an actual stream.)
>>>>>>
>>>>>> I just happened to push some "IO primitives" that use map rather than pardo in a couple of places to do a true wordcount using good ol' Shakespeare and very, very primitive GCS IO.
>>>>>>
>>>>>> Best,
>>>>>> B
>>>>>>
>>>>>> On Wed, Aug 23, 2023 at 6:08 PM Byron Ellis <byronel...@google.com> wrote:
>>>>>>
>>>>>>> Indeed :-) Yeah, I went back and forth on the pardo syntax quite a bit before settling on where I ended up. Ultimately I decided to go with something that felt more Swift-y than anything else, which means that rather than dealing with a single element like you do in the other SDKs, you're dealing with a stream of elements (which of course will often be of size 1). That's a really natural paradigm in the Swift world, especially with the async/await structures. So when you see something like:
>>>>>>>
>>>>>>>     pardo(name: "Read Files") { filenames, output, errors in
>>>>>>>         for try await (filename, _, _) in filenames {
>>>>>>>             ...
>>>>>>>             output.emit(data)
>>>>>>>         }
>>>>>>>     }
>>>>>>>
>>>>>>> filenames is the input stream, and output and errors are both output streams. In theory you can have as many output streams as you like, though at the moment there's a compiler bug in the new type pack feature that limits it to "as many as I felt like supporting". Presumably this will get fixed before the official 5.9 release (which will probably be in the October timeframe, if history is any guide).
>>>>>>>
>>>>>>> If you had parameterization you wanted to send, that would look like pardo("Parameter") { param, filenames, output, error in ... } where "param" would take on the value of "Parameter". All of this is being typechecked at compile time, BTW.
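The sketch below shows the shape of the `pardo(name: "Read Files")` closure body quoted above, and it runs without the SDK. `Collector`, `makeStream`, and `readFiles` are hypothetical stand-ins, since the thread doesn't show the SDK's real pardo and output stream types. An `AsyncStream` of `(String, Date, String)` tuples stands in for a PCollectionStream with its (Of, Date, Window) element signature, and `errors` plays the role of the DLQ output.

```swift
import Foundation

// Hypothetical output stream: just records what was emitted.
final class Collector<T> {
    private(set) var emitted: [T] = []
    func emit(_ value: T) { emitted.append(value) }
}

// Builds a finite async stream from an array, standing in for an input stream.
func makeStream<T>(_ items: [T]) -> AsyncStream<T> {
    AsyncStream { continuation in
        for item in items { continuation.yield(item) }
        continuation.finish()
    }
}

// Same shape as the quoted closure: iterate the input stream, ignore the
// timestamp and window with `_`, and route each element to one of two outputs
// (file contents to `output`, unreadable filenames to `errors`).
func readFiles(_ filenames: AsyncStream<(String, Date, String)>,
               _ output: Collector<String>,
               _ errors: Collector<String>) async {
    for await (filename, _, _) in filenames {
        if let data = try? String(contentsOfFile: filename, encoding: .utf8) {
            output.emit(data)
        } else {
            errors.emit(filename)
        }
    }
}
```

A parameter, as in `pardo("Parameter") { param, filenames, output, error in ... }`, would simply be one more leading argument to a function like this.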
>>>>>>>
>>>>>>> The (filename, _, _) is a tuple destructuring pattern, like destructuring in ES6 and other languages, where "_" is Swift for "ignore". In this case PCollectionStreams have an element signature of (Of, Date, Window), so you can optionally extract the timestamp and the window if you want to manipulate them somehow.
>>>>>>>
>>>>>>> That said, it would also be natural to provide element-wise pardos; that would probably mean having explicit type signatures in the closure. I had that at one point, but it felt less natural the more I used it. I'm also slowly working towards adding a more "traditional" DoFn implementation approach where you implement the DoFn as an object type. In that case it would be very, very easy to support both by having a default stream implementation call the equivalent of processElement. To make that performant I need to implement an @DoFn macro, and I just haven't gotten to it yet.
>>>>>>>
>>>>>>> It's a bit more work, and I've been prioritizing implementing composite and external transforms for the reasons you suggest. :-) I've got the basics of a composite transform (there's an equivalent wordcount example) and am hooking it into the pipeline generation, which should also give me everything I need to hook in external transforms. That will give me the jump on IOs, as you say. I can also treat the pipeline itself as a composite transform, which lets me get rid of the Pipeline { pipeline in ... } wrapper and instead have things attach themselves to the pipeline implicitly.
>>>>>>>
>>>>>>> That said, there are some interesting IO possibilities that would be Swift native. In particular, I've been looking at the native Swift binding for DuckDB (which is C++ based).
>>>>>>> DuckDB is SQL based but not distributed in the same way as, say, Beam SQL... but it would allow for SQL statements on individual files, with projection pushdown supported for things like Parquet, which could have some cool and performant data lake applications. I'll probably do a couple of the simpler IOs as well: there's a pretty good Swift AWS SDK binding that would give me S3, and there's a Cloud auth library as well that makes it pretty easy to work with GCS.
>>>>>>>
>>>>>>> In any case, I'm updating the branch as I find a minute here and there.
>>>>>>>
>>>>>>> Best,
>>>>>>> B
>>>>>>>
>>>>>>> On Wed, Aug 23, 2023 at 5:02 PM Robert Bradshaw <rober...@google.com> wrote:
>>>>>>>
>>>>>>>> Neat.
>>>>>>>>
>>>>>>>> Nothing like writing an SDK to actually understand how the FnAPI works :). I like the use of groupBy. I have to admit I'm a bit mystified by the syntax for parDo (I don't know Swift at all, which is probably tripping me up). The addition of external (cross-language) transforms could let you steal everything (e.g. IOs) pretty quickly from other SDKs.
>>>>>>>>
>>>>>>>> On Fri, Aug 18, 2023 at 7:55 AM Byron Ellis via user <u...@beam.apache.org> wrote:
>>>>>>>>
>>>>>>>>> For everyone who is interested, here's the draft PR:
>>>>>>>>>
>>>>>>>>> https://github.com/apache/beam/pull/28062
>>>>>>>>>
>>>>>>>>> I haven't had a chance to test it on my M1 machine yet, though (there's a good chance there are a few places that need to properly address endianness.
>>>>>>>>> Specifically, timestamps in windowed values and lengths in iterable coders, as those both use big-endian representations.)
>>>>>>>>>
>>>>>>>>> On Thu, Aug 17, 2023 at 8:57 PM Byron Ellis <byronel...@google.com> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks Cham,
>>>>>>>>>>
>>>>>>>>>> Definitely happy to open a draft PR so folks can comment. There's not as much code as it looks like, since most of the LOC is just generated protobuf. As for the support, I definitely want to add external transforms and may actually add that support before adding the ability to make composites in the language itself. With the way the SDK is laid out, adding composites to the pipeline graph is a separate operation from defining a composite.
>>>>>>>>>>
>>>>>>>>>> On Thu, Aug 17, 2023 at 4:28 PM Chamikara Jayalath <chamik...@google.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks Byron. This sounds great. I wonder if there is interest in a Swift SDK from folks currently subscribed to the +user <u...@beam.apache.org> list.
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Aug 16, 2023 at 6:53 PM Byron Ellis via dev <dev@beam.apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hello everyone,
>>>>>>>>>>>>
>>>>>>>>>>>> A couple of months ago I decided that I wanted to really understand how the Beam FnApi works and how it interacts with the Portable Runner.
>>>>>>>>>>>> For me at least, that usually means I need to write some code so I can see things happening in a debugger. To really prove to myself that I understood what was going on, I decided I couldn't use an existing SDK language to do it, since there would be the temptation to read some code and convince myself that I actually understood what was going on.
>>>>>>>>>>>>
>>>>>>>>>>>> One thing led to another, and it turns out that to get a minimal FnApi integration going you end up writing a fair bit of an SDK. So I decided to take things to a point where I had an SDK that could execute a word count example via a portable runner backend. I've now reached that point and would like to submit my prototype SDK to the list for feedback.
>>>>>>>>>>>>
>>>>>>>>>>>> It's currently living in a branch on my fork here:
>>>>>>>>>>>>
>>>>>>>>>>>> https://github.com/byronellis/beam/tree/swift-sdk/sdks/swift
>>>>>>>>>>>>
>>>>>>>>>>>> At the moment it runs via the most recent Xcode beta using Swift 5.9 on Intel Macs, but it should also work using beta builds of 5.9 for Linux running on Intel hardware. I haven't had a chance to try it on ARM hardware and make sure all of the endian checks are complete. The "IntegrationTests.swift" file contains a word count example that reads some local files (as well as a missing file to exercise DLQ functionality) and outputs counts through two separate group-by operations to get it past the "map reduce" size of pipeline. I've tested it against the Python Portable Runner. Since my goal was to learn the FnApi, there is no Direct Runner at this time.
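The endian checks come up here and in the draft PR notes above, so here is a minimal sketch of host-independent big-endian encoding for the two spots mentioned there: a windowed value's timestamp and an iterable's length prefix. It assumes the conventions used by the Java and Python SDKs. Timestamps are int64 milliseconds, shifted by Int64.min so that byte order matches numeric order, and a known-size iterable's length is a big-endian int32. The function names are hypothetical; check them against the Beam coder definitions before relying on them.

```swift
// Builds big-endian bytes with shifts, so the result does not depend on the
// host's byte order (no copying of an integer's in-memory representation).
func bigEndianBytes(_ value: UInt64, count: Int) -> [UInt8] {
    (0..<count).reversed().map { UInt8(truncatingIfNeeded: value >> (8 * $0)) }
}

// Timestamp in a windowed value: int64 millis with the sign bit flipped
// (equivalent to millis - Int64.min), written as 8 big-endian bytes.
func encodeTimestamp(millis: Int64) -> [UInt8] {
    bigEndianBytes(UInt64(bitPattern: millis) ^ (1 << 63), count: 8)
}

// Inverse of encodeTimestamp: reassemble the bytes, then flip the sign bit back.
func decodeTimestamp(_ bytes: [UInt8]) -> Int64 {
    precondition(bytes.count == 8, "timestamp must be 8 bytes")
    let raw = bytes.reduce(UInt64(0)) { ($0 << 8) | UInt64($1) }
    return Int64(bitPattern: raw ^ (1 << 63))
}

// Length prefix of an iterable with a known size: 4 big-endian bytes.
func encodeLength(_ count: Int32) -> [UInt8] {
    bigEndianBytes(UInt64(UInt32(bitPattern: count)), count: 4)
}
```

Flipping the sign bit means negative timestamps encode to byte strings that sort before non-negative ones, so encoded timestamps compare correctly as raw bytes.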
>>>>>>>>>>>>
>>>>>>>>>>>> I've shown it to a couple of folks already and incorporated some of that feedback (for example, pardo was originally called dofn when defining pipelines). In general I've tried to make the API as "Swift-y" as possible, hence the heavy reliance on closures. While there aren't yet composite PTransforms, there are the beginnings of what would be needed for a SwiftUI-like declarative API for creating them.
>>>>>>>>>>>>
>>>>>>>>>>>> There are of course a ton of missing bits still to be implemented, like counters, metrics, windowing, state, timers, etc.
>>>>>>>>>>>
>>>>>>>>>>> This should be fine, and we can get the code documented without these features. I think support for composites and adding an external transform (see Java <https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java>, Python <https://github.com/apache/beam/blob/c7b7921185686da573f76ce7320817c32375c7d0/sdks/python/apache_beam/transforms/external.py#L556>, Go <https://github.com/apache/beam/blob/c7b7921185686da573f76ce7320817c32375c7d0/sdks/go/pkg/beam/xlang.go#L155>, TypeScript <https://github.com/apache/beam/blob/master/sdks/typescript/src/apache_beam/transforms/external.ts>) to add support for multi-lang will bring in a lot of features (for example, I/O connectors) for free.
>>>>>>>>>>>
>>>>>>>>>>>> Any and all feedback is welcome, and I'm happy to submit a PR if folks are interested, though the "Swift Way" would be to have it in its own repo so that it can easily be used from the Swift Package Manager.
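For the "SwiftUI-like declarative API" mentioned above, Swift's result builders are the language feature SwiftUI's view syntax is built on. The sketch below is minimal and hypothetical: `Step`, `Composite`, and `CompositeBuilder` are illustrative names, not the SDK's. It declares a composite as a list of steps, and any nested composite is flattened into step names prefixed with that composite's name.

```swift
// Hypothetical primitive transform: only a name, for illustration.
struct Step {
    let name: String
}

// Result builder collecting steps; a nested composite contributes its own
// steps, prefixed with the composite's name.
@resultBuilder
enum CompositeBuilder {
    static func buildExpression(_ step: Step) -> [Step] { [step] }
    static func buildExpression(_ composite: Composite) -> [Step] {
        composite.steps.map { Step(name: "\(composite.name)/\($0.name)") }
    }
    static func buildBlock(_ parts: [Step]...) -> [Step] { parts.flatMap { $0 } }
}

// Hypothetical composite transform declared SwiftUI-style.
struct Composite {
    let name: String
    let steps: [Step]
    init(_ name: String, @CompositeBuilder _ body: () -> [Step]) {
        self.name = name
        self.steps = body()
    }
}

// Usage: a word-count-shaped composite with a nested composite inside.
let wordCount = Composite("WordCount") {
    Step(name: "Read Files")
    Composite("Split") {
        Step(name: "Tokenize")
        Step(name: "Lowercase")
    }
    Step(name: "Count")
}
```

Declaring a composite this way only builds a description. Attaching it to a pipeline graph would be a separate step, matching the separation between defining and adding composites described above.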
>>>>>>>>>>>
>>>>>>>>>>> +1 for creating a PR (maybe as a draft initially). Also, it'll be easier to comment on a PR :)
>>>>>>>>>>>
>>>>>>>>>>> - Cham
>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> B