On Thu, Aug 24, 2023 at 12:58 PM Chamikara Jayalath <chamik...@google.com> wrote:
> On Thu, Aug 24, 2023 at 12:27 PM Robert Bradshaw <rober...@google.com> wrote:
>
>> I would like to figure out a way to get the stream-y interface to work, as I think it's more natural overall.
>>
>> One hypothesis is that if any elements are carried over loop iterations, there will likely be some that are carried over beyond the loop (after all, the callee doesn't know when the loop is supposed to end). We could reject "plain" elements that are emitted after this point, requiring one to emit timestamp-windowed-values.
>
> Are you assuming that the same stream (or overlapping sets of data) is pushed to multiple workers? I thought that the data streamed here belongs to the current bundle (and hence is already assigned to the current worker), so any output from the current bundle invocation would be a valid output of that bundle.

Yes, the content of the stream is exactly the contents of the bundle. The question is how to do the input_element:output_element correlation for automatically propagating metadata.

>> Related to this, we could enforce that the only (user-accessible) way to get such a timestamped value is to start with one, e.g. a WindowedValue<T>.withValue(O) produces a WindowedValue<O> with the same metadata but a new value. Thus a user wanting to do anything "fancy" would have to explicitly request iteration over these windowed values rather than over the raw elements. (This is also forward compatible with expanding the metadata that can get attached, e.g. pane infos, and makes the right thing the easiest/most natural.)
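To make the withValue idea concrete, here is a minimal sketch of what such a metadata-preserving wrapper could look like. The Window and WindowedValue types and the withValue method below are illustrative only, not the Swift SDK's actual API:

import Foundation

// Illustrative stand-ins for the SDK's windowing types (names are hypothetical).
struct Window {
    let maxTimestamp: Date
}

struct WindowedValue<T> {
    let value: T
    let timestamp: Date
    let window: Window

    // The key property: the only way to produce a new windowed value is to
    // start from an existing one, so timestamp and window always carry over.
    func withValue<O>(_ newValue: O) -> WindowedValue<O> {
        WindowedValue<O>(value: newValue, timestamp: timestamp, window: window)
    }
}

// Deriving an output element while preserving the input element's metadata.
let input = WindowedValue(value: "gs://bucket/shakespeare.txt",
                          timestamp: Date(),
                          window: Window(maxTimestamp: .distantFuture))
let output = input.withValue(input.value.count) // WindowedValue<Int>, same timestamp and window

Anything emitted without going through an existing windowed value has no metadata to borrow, which is exactly the case the hypothesis above would reject or force to be explicit.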
On Thu, Aug 24, 2023 at 12:10 PM Byron Ellis <byronel...@google.com> wrote:

Ah, that is a good point—being element-wise would make managing windows and timestamps easier for the user. Fortunately it's a fairly easy change to make, and maybe even less typing for the user. I was originally thinking side inputs and metrics would happen outside the loop, but I think you want a class and not a closure at that point, for sanity.

On Thu, Aug 24, 2023 at 12:02 PM Robert Bradshaw <rober...@google.com> wrote:

Ah, I see.

Yeah, I've thought about using an iterable for the whole bundle rather than start/finish bundle callbacks, but one of the questions is how that would impact implicit passing of the timestamp (and other) metadata from input elements to output elements. You can of course attach the metadata to any output that happens in the loop body, but it's very easy to implicitly break the 1:1 relationship here (e.g. by doing buffering or otherwise modifying local state) and this would be hard to detect. (I suppose trying to output after the loop finishes could require something more explicit.)

On Wed, Aug 23, 2023 at 6:56 PM Byron Ellis <byronel...@google.com> wrote:

Oh, I also forgot to mention that I included element-wise collection operations like "map" that eliminate the need for pardo in many cases. The groupBy command is actually a map + groupByKey under the hood. That was to be more consistent with Swift's collection protocol (and is also why PCollection and PCollectionStream are different types... PCollection implements map and friends as pipeline construction operations, whereas PCollectionStream is an actual stream).

I just happened to push some "IO primitives" that use map rather than pardo in a couple of places to do a true wordcount using good ol' Shakespeare and very, very primitive GCS IO.

Best,
B

On Wed, Aug 23, 2023 at 6:08 PM Byron Ellis <byronel...@google.com> wrote:

Indeed :-) Yeah, I went back and forth on the pardo syntax quite a bit before settling on where I ended up. Ultimately I decided to go with something that felt more Swift-y than anything else, which means that rather than dealing with a single element like you do in the other SDKs you're dealing with a stream of elements (which of course will often be of size 1). That's a really natural paradigm in the Swift world, especially with the async / await structures. So when you see something like:

pardo(name: "Read Files") { filenames, output, errors in
    for try await (filename, _, _) in filenames {
        ...
        output.emit(data)
    }
}

filenames is the input stream, and output and errors are both output streams. In theory you can have as many output streams as you like, though at the moment there's a compiler bug in the new type pack feature that limits it to "as many as I felt like supporting". Presumably this will get fixed before the official 5.9 release, which will probably be in the October timeframe if history is any guide.

If you had parameterization you wanted to send, that would look like pardo("Parameter") { param, filenames, output, error in ... } where "param" would take on the value of "Parameter". All of this is being typechecked at compile time, BTW.

The (filename, _, _) is a tuple spreading construct like you have in ES6 and other things, where "_" is Swift for "ignore". In this case PCollectionStreams have an element signature of (Of, Date, Window), so you can optionally extract the timestamp and the window if you want to manipulate them somehow.

That said, it would also be natural to provide element-wise pardos---that would probably mean having explicit type signatures in the closure. I had that at one point, but it felt less natural the more I used it. I'm also slowly working towards adding a more "traditional" DoFn implementation approach where you implement the DoFn as an object type. In that case it would be very, very easy to support both by having a default stream implementation call the equivalent of processElement. To make that performant I need to implement a @DoFn macro, and I just haven't gotten to it yet.
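A rough sketch of how that object-style DoFn could coexist with the stream form, assuming a hypothetical protocol (none of these names are the SDK's current API): the stream entry point gets a default implementation that just loops over the bundle and delegates to an element-wise processElement.

// Hypothetical object-style DoFn protocol with both entry points.
protocol DoFn {
    associatedtype Input
    associatedtype Output
    func processElement(_ element: Input, emit: (Output) -> Void) async throws
    func process(_ elements: AsyncStream<Input>, emit: (Output) -> Void) async throws
}

extension DoFn {
    // Default stream implementation: one processElement call per element.
    func process(_ elements: AsyncStream<Input>, emit: (Output) -> Void) async throws {
        for await element in elements {
            try await processElement(element, emit: emit)
        }
    }
}

// A trivial element-wise DoFn written as an object type.
struct ExtractWords: DoFn {
    func processElement(_ line: String, emit: (String) -> Void) async throws {
        for word in line.split(separator: " ") {
            emit(String(word))
        }
    }
}

A stream-oriented DoFn would simply provide its own process implementation, so both styles can share one execution path.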
It's a bit more work, and I've been prioritizing implementing composite and external transforms for the reasons you suggest. :-) I've got the basics of a composite transform (there's an equivalent wordcount example) and am hooking it into the pipeline generation, which should also give me everything I need to successfully hook in external transforms as well. That will give me the jump on IOs, as you say. I can also treat the pipeline itself as a composite transform, which lets me get rid of the Pipeline { pipeline in ... } and instead have things attach themselves to the pipeline implicitly.

That said, there are some interesting IO possibilities that would be Swift native. In particular, I've been looking at the native Swift binding for DuckDB (which is C++ based). DuckDB is SQL based but not distributed in the same way as, say, Beam SQL... but it would allow for SQL statements on individual files, with projection pushdown supported for things like Parquet, which could have some cool and performant data lake applications. I'll probably do a couple of the simpler IOs as well---there's a Swift AWS SDK binding that's pretty good that would give me S3, and there's a Cloud auth library as well that makes it pretty easy to work with GCS.

In any case, I'm updating the branch as I find a minute here and there.

Best,
B

On Wed, Aug 23, 2023 at 5:02 PM Robert Bradshaw <rober...@google.com> wrote:

Neat.

Nothing like writing an SDK to actually understand how the FnAPI works :). I like the use of groupBy. I have to admit I'm a bit mystified by the syntax for parDo (I don't know Swift at all, which is probably tripping me up). The addition of external (cross-language) transforms could let you steal everything (e.g. IOs) pretty quickly from other SDKs.
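For anyone trying to picture the groupBy mentioned here: the map + groupByKey composition Byron describes can be illustrated with ordinary Swift collections. This is just a toy on arrays, not the SDK's deferred PCollection API:

// Toy illustration of "groupBy = map + groupByKey" on plain Swift arrays.
func groupBy<T, K: Hashable>(_ elements: [T], key: (T) -> K) -> [K: [T]] {
    // map: pair every element with its key
    let keyed = elements.map { (key($0), $0) }
    // groupByKey: gather the values that share a key
    var groups: [K: [T]] = [:]
    for (k, v) in keyed {
        groups[k, default: []].append(v)
    }
    return groups
}

let words = "to be or not to be".split(separator: " ").map(String.init)
let counts = groupBy(words) { $0 }.mapValues { $0.count }
// ["to": 2, "be": 2, "or": 1, "not": 1]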
On Fri, Aug 18, 2023 at 7:55 AM Byron Ellis via user <user@beam.apache.org> wrote:

For everyone who is interested, here's the draft PR:

https://github.com/apache/beam/pull/28062

I haven't had a chance to test it on my M1 machine yet though (there's a good chance there are a few places that need to properly address endianness: specifically, timestamps in windowed values and lengths in iterable coders, as those both use big-endian representations).
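As a reminder of what those endian checks involve: anything written to the wire has to go through an explicit big-endian conversion rather than relying on the host's byte order. A tiny sketch of the kind of helper this implies (hypothetical, not code from the PR):

// Beam's standard coders use big-endian encodings for multi-byte integers
// (e.g. the millisecond timestamp in a windowed value and the length prefix
// in the iterable coder), so the bytes must not depend on host byte order.
func bigEndianBytes(of value: Int64) -> [UInt8] {
    withUnsafeBytes(of: value.bigEndian) { Array($0) }
}

let millis: Int64 = 1_692_345_600_000     // some timestamp, in milliseconds
let encoded = bigEndianBytes(of: millis)  // identical bytes on Intel and ARM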
On Thu, Aug 17, 2023 at 8:57 PM Byron Ellis <byronel...@google.com> wrote:

Thanks Cham,

Definitely happy to open a draft PR so folks can comment---there's not as much code as it looks like, since most of the LOC is just generated protobuf. As for the support, I definitely want to add external transforms and may actually add that support before adding the ability to make composites in the language itself. With the way the SDK is laid out, adding composites to the pipeline graph is a separate operation from defining a composite.

On Thu, Aug 17, 2023 at 4:28 PM Chamikara Jayalath <chamik...@google.com> wrote:

Thanks Byron. This sounds great. I wonder if there is interest in a Swift SDK from folks currently subscribed to the user@beam.apache.org list.

> On Wed, Aug 16, 2023 at 6:53 PM Byron Ellis via dev <d...@beam.apache.org> wrote:
>
> Hello everyone,
>
> A couple of months ago I decided that I wanted to really understand how the Beam FnApi works and how it interacts with the Portable Runner. For me, at least, that usually means I need to write some code so I can see things happening in a debugger. To really prove to myself that I understood what was going on, I decided I couldn't use an existing SDK language to do it, since there would be the temptation to read some code and convince myself that I actually understood what was going on.
>
> One thing led to another and it turns out that to get a minimal FnApi integration going you end up writing a fair bit of an SDK. So I decided to take things to a point where I had an SDK that could execute a word count example via a portable runner backend. I've now reached that point and would like to submit my prototype SDK to the list for feedback.
>
> It's currently living in a branch on my fork here:
>
> https://github.com/byronellis/beam/tree/swift-sdk/sdks/swift
>
> At the moment it runs via the most recent Xcode beta using Swift 5.9 on Intel Macs, but it should also work using beta builds of 5.9 for Linux running on Intel hardware. I haven't had a chance to try it on ARM hardware and make sure all of the endian checks are complete. The "IntegrationTests.swift" file contains a word count example that reads some local files (as well as a missing file to exercise DLQ functionality) and outputs counts through two separate group-by operations to get it past the "map reduce" size of pipeline. I've tested it against the Python Portable Runner. Since my goal was to learn the FnApi there is no Direct Runner at this time.
>
> I've shown it to a couple of folks already and have incorporated some of that feedback (for example, pardo was originally called dofn when defining pipelines). In general I've tried to make the API as "Swift-y" as possible, hence the heavy reliance on closures, and while there aren't yet composite PTransforms there are the beginnings of what would be needed for a SwiftUI-like declarative API for creating them.
>
> There are of course a ton of missing bits still to be implemented, like counters, metrics, windowing, state, timers, etc.

This should be fine and we can get the code documented without these features. I think support for composites, plus adding an external transform (see Java <https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java>, Python <https://github.com/apache/beam/blob/c7b7921185686da573f76ce7320817c32375c7d0/sdks/python/apache_beam/transforms/external.py#L556>, Go <https://github.com/apache/beam/blob/c7b7921185686da573f76ce7320817c32375c7d0/sdks/go/pkg/beam/xlang.go#L155>, TypeScript <https://github.com/apache/beam/blob/master/sdks/typescript/src/apache_beam/transforms/external.ts>) to enable multi-language pipelines, will bring in a lot of features (for example, I/O connectors) for free.

> Any and all feedback welcome, and happy to submit a PR if folks are interested, though the "Swift Way" would be to have it in its own repo so that it can easily be used from the Swift Package Manager.

+1 for creating a PR (maybe as a draft initially). Also, it'll be easier to comment on a PR :)

- Cham

> Best,
> B
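For readers who haven't run into the external transform mechanism before: in the other SDKs the pattern boils down to handing a transform URN plus a configuration payload to an expansion service and splicing the returned sub-graph into the pipeline. A purely illustrative sketch of that request shape in Swift follows; none of these names come from the Swift SDK, and the URN is a placeholder:

// Purely illustrative: the information an SDK needs in order to ask an
// expansion service for a cross-language transform. Names and URN are placeholders.
struct ExternalTransform {
    let urn: String                      // identifies the transform to expand
    let configuration: [String: String]  // simplified stand-in for the real payload schema
    let expansionService: String         // host:port of a running expansion service
}

let externalRead = ExternalTransform(
    urn: "beam:external:some:read:v1",   // placeholder URN
    configuration: ["path": "/tmp/input/*"],
    expansionService: "localhost:8097")

// The SDK would serialize this into an ExpansionRequest, call the expansion
// service over gRPC, and merge the expanded transforms into the pipeline proto.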