Thanks Austin, glad to hear the commentary isn’t just irritating. 🙂

On Sat, Aug 26, 2023 at 2:29 PM Austin Bennett <aus...@apache.org> wrote:
This is great that it is coming together, and am glad for the messages along the way to understand process, choices, ...!

On Fri, Aug 25, 2023, 2:04 PM Byron Ellis via user <user@beam.apache.org> wrote:

Okay, after a brief detour through "get this working in the Flink Portable Runner" I think I have something pretty workable.

PInput and POutput can actually be structs rather than protocols, which simplifies things quite a bit. It also allows us to use them with property wrappers for a SwiftUI-like experience if we want when defining DoFns (which is what I was originally intending to use them for). That also means the function signature you use for closures would match full-fledged DoFn definitions for the most part, which is satisfying.

On Thu, Aug 24, 2023 at 5:55 PM Byron Ellis <byronel...@google.com> wrote:

Okay, I tried a couple of different things.

Implicitly passing the timestamp and window during iteration did not go well. While physically possible, it introduces an invisible side effect into loop iteration that confused me when I tried to use it, and I'm the one who implemented it. Also, I'm pretty sure there'd end up being some sort of race condition nightmare continuing down that path.

What I decided to do instead was the following:

1. Rename the existing "pardo" functions to "pstream" and require that they always emit a window and timestamp along with their value. This eliminates the side effect but lets us keep iteration in a bundle where that might be convenient. For example, in my cheesy GCS implementation it means that I can keep an OAuth token around for the lifetime of the bundle as a local variable, which is convenient. It's a bit more typing for users of pstream, but the expectation here is that if you're using pstream functions You Know What You Are Doing and most people won't be using it directly.

2. Introduce a new set of pardo functions (I didn't do all of them yet, but enough to test the functionality and decide I liked it) which take a function signature of (any PInput<InputType>, any POutput<OutputType>). PInput takes the (InputType, Date, Window) tuple and converts it into a struct with friendlier names. Not strictly necessary, but it makes the code nicer to read I think. POutput introduces emit functions that optionally allow you to specify a timestamp and a window; if you omit either one, it will take the timestamp and/or window of the input.

That turned out to be pretty pleasant to use, so I think we should continue down that path. If you'd like to see it in use, I reimplemented map() and flatMap() in terms of this new pardo functionality.

Code has been pushed to the branch/PR if you're interested in taking a look.
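A rough sketch of how this element-wise pardo might read in use, going purely by the description above (the member names value/timestamp/window and the emit overloads are illustrative guesses, not necessarily the actual API in the branch):

    // Hypothetical usage of the element-wise pardo described above.
    pardo(name: "Line Lengths") { (input: any PInput<String>, output: any POutput<Int>) in
        // PInput wraps the (value, Date, Window) tuple behind friendlier names.
        let length = input.value.count
        // Emitting with no timestamp/window inherits both from the input element.
        output.emit(length)
        // Either piece of metadata can also be overridden explicitly.
        output.emit(length, timestamp: input.timestamp, window: input.window)
    }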
On Thu, Aug 24, 2023 at 2:15 PM Byron Ellis <byronel...@google.com> wrote:

Gotcha, I think there's a fairly easy solution to link input and output streams.... Let me try it out... might even be possible to have both element and stream-wise closure pardos. Definitely possible to have that at the DoFn level (called SerializableFn in the SDK because I want to use @DoFn as a macro).

On Thu, Aug 24, 2023 at 1:09 PM Robert Bradshaw <rober...@google.com> wrote:

On Thu, Aug 24, 2023 at 12:58 PM Chamikara Jayalath <chamik...@google.com> wrote:
> On Thu, Aug 24, 2023 at 12:27 PM Robert Bradshaw <rober...@google.com> wrote:
>> I would like to figure out a way to get the stream-y interface to work, as I think it's more natural overall.
>>
>> One hypothesis is that if any elements are carried over loop iterations, there will likely be some that are carried over beyond the loop (after all, the callee doesn't know when the loop is supposed to end). We could reject "plain" elements that are emitted after this point, requiring one to emit timestamp-windowed-values.
>
> Are you assuming that the same stream (or overlapping sets of data) is pushed to multiple workers? I thought that the data streamed here is the data that belongs to the current bundle (hence already assigned to the current worker), so any output from the current bundle invocation would be a valid output of that bundle.

Yes, the content of the stream is exactly the contents of the bundle. The question is how to do the input_element:output_element correlation for automatically propagating metadata.

>> Related to this, we could enforce that the only (user-accessible) way to get such a timestamped value is to start with one, e.g. a WindowedValue<T>.withValue(O) produces a WindowedValue<O> with the same metadata but a new value. Thus a user wanting to do anything "fancy" would have to explicitly request iteration over these windowed values rather than over the raw elements. (This is also forward compatible with expanding the metadata that can get attached, e.g. pane infos, and makes the right thing the easiest/most natural.)

On Thu, Aug 24, 2023 at 12:10 PM Byron Ellis <byronel...@google.com> wrote:

Ah, that is a good point—being element-wise would make managing windows and timestamps easier for the user. Fortunately it's a fairly easy change to make and maybe even less typing for the user. I was originally thinking side inputs and metrics would happen outside the loop, but I think you want a class and not a closure at that point for sanity.

On Thu, Aug 24, 2023 at 12:02 PM Robert Bradshaw <rober...@google.com> wrote:

Ah, I see.

Yeah, I've thought about using an iterable for the whole bundle rather than start/finish bundle callbacks, but one of the questions is how that would impact implicit passing of the timestamp (and other) metadata from input elements to output elements. You can of course attach the metadata to any output that happens in the loop body, but it's very easy to implicitly break the 1:1 relationship here (e.g. by doing buffering or otherwise modifying local state) and this would be hard to detect. (I suppose trying to output after the loop finishes could require something more explicit.)
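To make the WindowedValue<T>.withValue(O) idea above concrete, here is a minimal sketch in Swift, with hypothetical types standing in for whatever the SDK actually defines:

    import Foundation

    // Placeholder for the SDK's window type; purely illustrative.
    struct Window {}

    struct WindowedValue<T> {
        let value: T
        let timestamp: Date
        let window: Window

        // The only user-facing way to produce a new windowed value is to
        // derive it from an existing one, so the timestamp and window (and
        // eventually pane info) always carry over and the 1:1 metadata link
        // can't be silently broken.
        func withValue<O>(_ newValue: O) -> WindowedValue<O> {
            WindowedValue<O>(value: newValue, timestamp: timestamp, window: window)
        }
    }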
On Wed, Aug 23, 2023 at 6:56 PM Byron Ellis <byronel...@google.com> wrote:

Oh, I also forgot to mention that I included element-wise collection operations like "map" that eliminate the need for pardo in many cases. The groupBy command is actually a map + groupByKey under the hood. That was to be more consistent with Swift's collection protocol (and is also why PCollection and PCollectionStream are different types... PCollection implements map and friends as pipeline construction operations whereas PCollectionStream is an actual stream).

I just happened to push some "IO primitives" that use map rather than pardo in a couple of places to do a true wordcount using good ol' Shakespeare and very, very primitive GCS IO.

Best,
B

On Wed, Aug 23, 2023 at 6:08 PM Byron Ellis <byronel...@google.com> wrote:

Indeed :-) Yeah, I went back and forth on the pardo syntax quite a bit before settling on where I ended up. Ultimately I decided to go with something that felt more Swift-y than anything else, which means that rather than dealing with a single element like you do in the other SDKs you're dealing with a stream of elements (which of course will often be of size 1). That's a really natural paradigm in the Swift world, especially with the async / await structures. So when you see something like:

    pardo(name: "Read Files") { filenames, output, errors in
        for try await (filename, _, _) in filenames {
            ...
            output.emit(data)
        }
    }

filenames is the input stream and then output and errors are both output streams. In theory you can have as many output streams as you like, though at the moment there's a compiler bug in the new type pack feature that limits it to "as many as I felt like supporting". Presumably this will get fixed before the official 5.9 release, which will probably be in the October timeframe if history is any guide.

If you had parameterization you wanted to send, that would look like pardo("Parameter") { param, filenames, output, error in ... } where "param" would take on the value of "Parameter." All of this is being typechecked at compile time, BTW.

The (filename, _, _) is a tuple spreading construct like you have in ES6 and other things, where "_" is Swift for "ignore." In this case PCollectionStreams have an element signature of (Of, Date, Window) so you can optionally extract the timestamp and the window if you want to manipulate them somehow.

That said, it would also be natural to provide element-wise pardos---that would probably mean having explicit type signatures in the closure. I had that at one point, but it felt less natural the more I used it. I'm also slowly working towards adding a more "traditional" DoFn implementation approach where you implement the DoFn as an object type. In that case it would be very, very easy to support both by having a default stream implementation call the equivalent of processElement. To make that performant I need to implement an @DoFn macro and I just haven't gotten to it yet.
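A rough sketch of what that object-style DoFn with a default stream implementation could look like, assuming a hypothetical protocol (the SDK's actual SerializableFn and stream types will differ):

    import Foundation

    // Placeholder window type, purely for illustration.
    struct Window {}

    // Hypothetical element-wise DoFn protocol.
    protocol SimpleDoFn {
        associatedtype Input
        associatedtype Output
        func processElement(_ element: Input, emit: (Output) -> Void) throws
    }

    extension SimpleDoFn {
        // Default stream implementation: iterate the bundle's
        // (value, timestamp, window) tuples and delegate each element to
        // processElement, forwarding the input's timestamp and window to
        // every output.
        func processStream<S: AsyncSequence>(
            _ elements: S,
            emit: (Output, Date, Window) -> Void
        ) async throws where S.Element == (Input, Date, Window) {
            for try await (value, timestamp, window) in elements {
                try processElement(value) { emit($0, timestamp, window) }
            }
        }
    }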
It's a bit more work and I've been prioritizing implementing composite and external transforms for the reasons you suggest. :-) I've got the basics of a composite transform (there's an equivalent wordcount example) and am hooking it into the pipeline generation, which should also give me everything I need to successfully hook in external transforms as well. That will give me the jump on IOs, as you say. I can also treat the pipeline itself as a composite transform, which lets me get rid of the Pipeline { pipeline in ... } and just instead have things attach themselves to the pipeline implicitly.

That said, there are some interesting IO possibilities that would be Swift native. In particular, I've been looking at the native Swift binding for DuckDB (which is C++ based). DuckDB is SQL based but not distributed in the same way as, say, Beam SQL... but it would allow for SQL statements on individual files with projection pushdown supported for things like Parquet, which could have some cool and performant data lake applications. I'll probably do a couple of the simpler IOs as well---there's a Swift AWS SDK binding that's pretty good that would give me S3, and there's a Cloud auth library as well that makes it pretty easy to work with GCS.

In any case, I'm updating the branch as I find a minute here and there.

Best,
B

On Wed, Aug 23, 2023 at 5:02 PM Robert Bradshaw <rober...@google.com> wrote:

Neat.

Nothing like writing an SDK to actually understand how the FnAPI works :). I like the use of groupBy. I have to admit I'm a bit mystified by the syntax for parDo (I don't know Swift at all, which is probably tripping me up). The addition of external (cross-language) transforms could let you steal everything (e.g. IOs) pretty quickly from other SDKs.
On Fri, Aug 18, 2023 at 7:55 AM Byron Ellis via user <user@beam.apache.org> wrote:

For everyone who is interested, here's the draft PR:

https://github.com/apache/beam/pull/28062

I haven't had a chance to test it on my M1 machine yet though (there's a good chance there are a few places that need to properly address endianness; specifically, timestamps in windowed values and lengths in iterable coders, as those both use big-endian representations).
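The byte swapping itself is straightforward in Swift; a minimal sketch of the idea (illustrative, not the branch's actual coder code) for writing a big-endian value such as a windowed-value timestamp:

    import Foundation

    // Append a 64-bit integer in big-endian byte order, as described above
    // for windowed-value timestamps; .bigEndian performs the swap only on
    // little-endian hosts.
    func appendBigEndian(_ value: Int64, to data: inout Data) {
        withUnsafeBytes(of: value.bigEndian) { data.append(contentsOf: $0) }
    }

    var buffer = Data()
    appendBigEndian(1_692_400_000_000, to: &buffer)  // e.g. a millisecond timestamp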
On Thu, Aug 17, 2023 at 8:57 PM Byron Ellis <byronel...@google.com> wrote:

Thanks Cham,

Definitely happy to open a draft PR so folks can comment---there's not as much code as it looks like, since most of the LOC is just generated protobuf. As for the support, I definitely want to add external transforms and may actually add that support before adding the ability to make composites in the language itself. With the way the SDK is laid out, adding composites to the pipeline graph is a separate operation from defining a composite.

On Thu, Aug 17, 2023 at 4:28 PM Chamikara Jayalath <chamik...@google.com> wrote:

Thanks Byron. This sounds great. I wonder if there is interest in a Swift SDK from folks currently subscribed to the user@beam.apache.org list.

On Wed, Aug 16, 2023 at 6:53 PM Byron Ellis via dev <d...@beam.apache.org> wrote:

> Hello everyone,
>
> A couple of months ago I decided that I wanted to really understand how the Beam FnApi works and how it interacts with the Portable Runner. For me at least that usually means I need to write some code so I can see things happening in a debugger, and to really prove to myself I understood what was going on I decided I couldn't use an existing SDK language to do it, since there would be the temptation to read some code and convince myself that I actually understood what was going on.
>
> One thing led to another and it turns out that to get a minimal FnApi integration going you end up writing a fair bit of an SDK. So I decided to take things to a point where I had an SDK that could execute a word count example via a portable runner backend. I've now reached that point and would like to submit my prototype SDK to the list for feedback.
>
> It's currently living in a branch on my fork here:
>
> https://github.com/byronellis/beam/tree/swift-sdk/sdks/swift
>
> At the moment it runs via the most recent Xcode beta using Swift 5.9 on Intel Macs, but should also work using beta builds of 5.9 for Linux running on Intel hardware. I haven't had a chance to try it on ARM hardware and make sure all of the endian checks are complete. The "IntegrationTests.swift" file contains a word count example that reads some local files (as well as a missing file to exercise DLQ functionality) and outputs counts through two separate group-by operations to get it past the "map reduce" size of pipeline. I've tested it against the Python Portable Runner. Since my goal was to learn the FnApi, there is no Direct Runner at this time.
>
> I've shown it to a couple of folks already and incorporated some of that feedback (for example, pardo was originally called dofn when defining pipelines). In general I've tried to make the API as "Swift-y" as possible, hence the heavy reliance on closures, and while there aren't yet composite PTransforms there's the beginnings of what would be needed for a SwiftUI-like declarative API for creating them.
>
> There are of course a ton of missing bits still to be implemented, like counters, metrics, windowing, state, timers, etc.

This should be fine and we can get the code documented without these features. I think support for composites and adding an external transform (see Java <https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java>, Python <https://github.com/apache/beam/blob/c7b7921185686da573f76ce7320817c32375c7d0/sdks/python/apache_beam/transforms/external.py#L556>, Go <https://github.com/apache/beam/blob/c7b7921185686da573f76ce7320817c32375c7d0/sdks/go/pkg/beam/xlang.go#L155>, TypeScript <https://github.com/apache/beam/blob/master/sdks/typescript/src/apache_beam/transforms/external.ts>) to add support for multi-lang will bring in a lot of features (for example, I/O connectors) for free.

> Any and all feedback welcome and happy to submit a PR if folks are interested, though the "Swift Way" would be to have it in its own repo so that it can easily be used from the Swift Package Manager.

+1 for creating a PR (maybe as a draft initially).
Also it'll be easier to comment on a PR :)

- Cham

> Best,
> B