Okay, after a brief detour through "get this working in the Flink Portable Runner" I think I have something pretty workable.
PInput and POutput can actually be structs rather than protocols, which simplifies things quite a bit. It also allows us to use them with property wrappers for a SwiftUI-like experience if we want when defining DoFns (which is what I was originally intending to use them for). That also means the function signature you use for closures would match full-fledged DoFn definitions for the most part, which is satisfying.

On Thu, Aug 24, 2023 at 5:55 PM Byron Ellis <byronel...@google.com> wrote:

> Okay, I tried a couple of different things.
>
> Implicitly passing the timestamp and window during iteration did not go well. While physically possible, it introduces an invisible side effect into loop iteration which confused me when I tried to use it, and I'm the one who implemented it. Also, I'm pretty sure there'd end up being some sort of race condition nightmare continuing down that path.
>
> What I decided to do instead was the following:
>
> 1. Rename the existing "pardo" functions to "pstream" and require that they always emit a window and timestamp along with their value. This eliminates the side effect but lets us keep iteration in a bundle where that might be convenient. For example, in my cheesy GCS implementation it means that I can keep an OAuth token around for the lifetime of the bundle as a local variable, which is convenient. It's a bit more typing for users of pstream, but the expectation here is that if you're using pstream functions You Know What You Are Doing, and most people won't be using it directly.
>
> 2. Introduce a new set of pardo functions (I didn't do all of them yet, but enough to test the functionality and decide I liked it) which take a function signature of (any PInput<InputType>, any POutput<OutputType>). PInput takes the (InputType, Date, Window) tuple and converts it into a struct with friendlier names. Not strictly necessary, but it makes the code nicer to read, I think.
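A minimal sketch of what the struct-based PInput/POutput idea described above could look like. Everything beyond the names PInput, POutput, and emit is an assumption for illustration: Window is reduced to a placeholder and the output sink is just a closure.

```swift
import Foundation

// Placeholder stand-in for the SDK's real window type.
struct Window: Equatable {}

// PInput wraps the raw (value, timestamp, window) tuple with friendlier names.
struct PInput<Of> {
    let value: Of
    let timestamp: Date
    let window: Window

    init(_ element: (Of, Date, Window)) {
        self.value = element.0
        self.timestamp = element.1
        self.window = element.2
    }
}

// POutput's emit defaults the timestamp and window to those of the input
// element when the caller doesn't specify them.
struct POutput<Of> {
    let defaultTimestamp: Date
    let defaultWindow: Window
    let sink: ((Of, Date, Window)) -> Void

    func emit(_ value: Of, timestamp: Date? = nil, window: Window? = nil) {
        sink((value, timestamp ?? defaultTimestamp, window ?? defaultWindow))
    }
}
```

A closure-style pardo body would then just call `output.emit(transformed)` and the element would inherit the input's metadata automatically.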
> POutput introduces emit functions that optionally allow you to specify a timestamp and a window. If you don't for either one, it will take the timestamp and/or window of the input.
>
> That was pretty pleasant to use, so I think we should continue down that path. If you'd like to see it in use, I reimplemented map() and flatMap() in terms of this new pardo functionality.
>
> Code has been pushed to the branch/PR if you're interested in taking a look.
>
> On Thu, Aug 24, 2023 at 2:15 PM Byron Ellis <byronel...@google.com> wrote:
>
>> Gotcha, I think there's a fairly easy solution to link input and output streams.... Let me try it out... might even be possible to have both element and stream-wise closure pardos. Definitely possible to have that at the DoFn level (called SerializableFn in the SDK because I want to use @DoFn as a macro).
>>
>> On Thu, Aug 24, 2023 at 1:09 PM Robert Bradshaw <rober...@google.com> wrote:
>>
>>> On Thu, Aug 24, 2023 at 12:58 PM Chamikara Jayalath <chamik...@google.com> wrote:
>>>
>>>> On Thu, Aug 24, 2023 at 12:27 PM Robert Bradshaw <rober...@google.com> wrote:
>>>>
>>>>> I would like to figure out a way to get the stream-y interface to work, as I think it's more natural overall.
>>>>>
>>>>> One hypothesis is that if any elements are carried over loop iterations, there will likely be some that are carried over beyond the loop (after all, the callee doesn't know when the loop is supposed to end). We could reject "plain" elements that are emitted after this point, requiring one to emit timestamp-windowed-values.
>>>>
>>>> Are you assuming that the same stream (or overlapping sets of data) are pushed to multiple workers?
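The "map() reimplemented in terms of pardo" refactoring mentioned above can be illustrated with a toy model. This is not the SDK's actual API: the bundle is modeled as an array, the window as an Int, and the emit closure fills in the input's metadata when the caller passes nil.

```swift
import Foundation

// A windowed element: value plus propagated metadata.
typealias Windowed<T> = (value: T, timestamp: Date, window: Int)

// pardo runs the body once per element; emitted values default to the
// input element's timestamp and window when none is supplied.
func pardo<In, Out>(
    _ bundle: [Windowed<In>],
    _ body: (Windowed<In>, (Out, Date?, Int?) -> Void) -> Void
) -> [Windowed<Out>] {
    var result: [Windowed<Out>] = []
    for element in bundle {
        body(element) { value, ts, w in
            result.append((value, ts ?? element.timestamp, w ?? element.window))
        }
    }
    return result
}

// map is then just a pardo that emits one transformed element per input,
// inheriting all metadata.
func map<In, Out>(_ bundle: [Windowed<In>], _ fn: (In) -> Out) -> [Windowed<Out>] {
    pardo(bundle) { input, emit in emit(fn(input.value), nil, nil) }
}
```

The point of the model: because map's closure emits exactly once per input, the 1:1 input/output metadata correlation discussed in this thread holds by construction.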
>>>> I thought that the set of data streamed here are the data that belong to the current bundle (hence already assigned to the current worker), so any output from the current bundle invocation would be a valid output of that bundle.
>>>
>>> Yes, the content of the stream is exactly the contents of the bundle. The question is how to do the input_element:output_element correlation for automatically propagating metadata.
>>>
>>>>> Related to this, we could enforce that the only (user-accessible) way to get such a timestamped value is to start with one, e.g. a WindowedValue<T>.withValue(O) produces a WindowedValue<O> with the same metadata but a new value. Thus a user wanting to do anything "fancy" would have to explicitly request iteration over these windowed values rather than over the raw elements. (This is also forward compatible with expanding the metadata that can get attached, e.g. pane infos, and makes the right thing the easiest/most natural.)
>>>>>
>>>>> On Thu, Aug 24, 2023 at 12:10 PM Byron Ellis <byronel...@google.com> wrote:
>>>>>
>>>>>> Ah, that is a good point—being element-wise would make managing windows and timestamps easier for the user. Fortunately it's a fairly easy change to make and maybe even less typing for the user. I was originally thinking side inputs and metrics would happen outside the loop, but I think you want a class and not a closure at that point for sanity.
>>>>>>
>>>>>> On Thu, Aug 24, 2023 at 12:02 PM Robert Bradshaw <rober...@google.com> wrote:
>>>>>>
>>>>>>> Ah, I see.
>>>>>>>
>>>>>>> Yeah, I've thought about using an iterable for the whole bundle rather than start/finish bundle callbacks, but one of the questions is how that would impact implicit passing of the timestamp (and other) metadata from input elements to output elements.
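The WindowedValue<T>.withValue(O) constraint suggested above can be sketched in a few lines: the only way to get a timestamped value is from an existing one, so metadata propagation is always explicit. The paneIndex field is a hypothetical stand-in for richer pane info.

```swift
import Foundation

// A value carrying its timestamp and (simplified) pane metadata.
struct WindowedValue<T> {
    let value: T
    let timestamp: Date
    let paneIndex: Int  // stand-in for real pane info

    // Same metadata, new value (possibly of a new element type) — the
    // only user-accessible way to construct an output WindowedValue.
    func withValue<O>(_ newValue: O) -> WindowedValue<O> {
        WindowedValue<O>(value: newValue, timestamp: timestamp, paneIndex: paneIndex)
    }
}
```

Anything "fancy" (buffering, re-timestamping) would then have to iterate over WindowedValues directly rather than raw elements.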
>>>>>>> (You can of course attach the metadata to any output that happens in the loop body, but it's very easy to implicitly break the 1:1 relationship here (e.g. by doing buffering or otherwise modifying local state) and this would be hard to detect. I suppose trying to output after the loop finishes could require something more explicit.)
>>>>>>>
>>>>>>> On Wed, Aug 23, 2023 at 6:56 PM Byron Ellis <byronel...@google.com> wrote:
>>>>>>>
>>>>>>>> Oh, I also forgot to mention that I included element-wise collection operations like "map" that eliminate the need for pardo in many cases. The groupBy command is actually a map + groupByKey under the hood. That was to be more consistent with Swift's collection protocol (and is also why PCollection and PCollectionStream are different types... PCollection implements map and friends as pipeline construction operations whereas PCollectionStream is an actual stream).
>>>>>>>>
>>>>>>>> I just happened to push some "IO primitives" that use map rather than pardo in a couple of places to do a true wordcount using good ol' Shakespeare and very, very primitive GCS IO.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> B
>>>>>>>>
>>>>>>>> On Wed, Aug 23, 2023 at 6:08 PM Byron Ellis <byronel...@google.com> wrote:
>>>>>>>>
>>>>>>>>> Indeed :-) Yeah, I went back and forth on the pardo syntax quite a bit before settling on where I ended up. Ultimately I decided to go with something that felt more Swift-y than anything else, which means that rather than dealing with a single element like you do in the other SDKs you're dealing with a stream of elements (which of course will often be of size 1).
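The "groupBy is actually a map + groupByKey under the hood" claim above can be shown with a toy sketch. Neither function matches the SDK's real signatures; this only illustrates the composition.

```swift
// Group values by key: the "shuffle" half of the composition.
func groupByKey<K: Hashable, V>(_ pairs: [(K, V)]) -> [K: [V]] {
    var groups: [K: [V]] = [:]
    for (k, v) in pairs { groups[k, default: []].append(v) }
    return groups
}

// groupBy = an element-wise map to (key, value) pairs, then groupByKey.
func groupBy<T, K: Hashable>(_ elements: [T], _ key: (T) -> K) -> [K: [T]] {
    groupByKey(elements.map { (key($0), $0) })
}
```

This mirrors Swift's own collection-style APIs (e.g. `Dictionary(grouping:by:)`), which is the consistency goal described in the email.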
>>>>>>>>> That's a really natural paradigm in the Swift world, especially with the async / await structures. So when you see something like:
>>>>>>>>>
>>>>>>>>> pardo(name: "Read Files") { filenames, output, errors in
>>>>>>>>>   for try await (filename, _, _) in filenames {
>>>>>>>>>     ...
>>>>>>>>>     output.emit(data)
>>>>>>>>>   }
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> filenames is the input stream and then output and errors are both output streams. In theory you can have as many output streams as you like, though at the moment there's a compiler bug in the new type pack feature that limits it to "as many as I felt like supporting". Presumably this will get fixed before the official 5.9 release, which will probably be in the October timeframe if history is any guide.
>>>>>>>>>
>>>>>>>>> If you had parameterization you wanted to send, that would look like pardo("Parameter") { param, filenames, output, error in ... } where "param" would take on the value of "Parameter." All of this is being typechecked at compile time, BTW.
>>>>>>>>>
>>>>>>>>> The (filename, _, _) is a tuple spreading construct like you have in ES6 and other things, where "_" is Swift for "ignore." In this case PCollectionStreams have an element signature of (Of, Date, Window) so you can optionally extract the timestamp and the window if you want to manipulate them somehow.
>>>>>>>>>
>>>>>>>>> That said, it would also be natural to provide elementwise pardos---that would probably mean having explicit type signatures in the closure. I had that at one point, but it felt less natural the more I used it. I'm also slowly working towards adding a more "traditional" DoFn implementation approach where you implement the DoFn as an object type.
>>>>>>>>> In that case it would be very, very easy to support both by having a default stream implementation call the equivalent of processElement. To make that performant I need to implement an @DoFn macro and I just haven't gotten to it yet.
>>>>>>>>>
>>>>>>>>> It's a bit more work, and I've been prioritizing implementing composite and external transforms for the reasons you suggest. :-) I've got the basics of a composite transform (there's an equivalent wordcount example) and am hooking it into the pipeline generation, which should also give me everything I need to successfully hook in external transforms as well. That will give me the jump on IOs, as you say. I can also treat the pipeline itself as a composite transform, which lets me get rid of the Pipeline { pipeline in ... } and instead have things attach themselves to the pipeline implicitly.
>>>>>>>>>
>>>>>>>>> That said, there are some interesting IO possibilities that would be Swift native. In particular, I've been looking at the native Swift binding for DuckDB (which is C++ based). DuckDB is SQL based but not distributed in the same way as, say, Beam SQL... but it would allow for SQL statements on individual files, with projection pushdown supported for things like Parquet, which could have some cool and performant data lake applications. I'll probably do a couple of the simpler IOs as well---there's a Swift AWS SDK binding that's pretty good that would give me S3, and there's a Cloud auth library as well that makes it pretty easy to work with GCS.
>>>>>>>>>
>>>>>>>>> In any case, I'm updating the branch as I find a minute here and there.
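The "traditional DoFn as an object type with a default stream implementation calling processElement" idea above could be sketched as a protocol with a default extension method. All names and shapes here are guesses for illustration, not the SDK's actual API, and the bundle is modeled as a plain array.

```swift
// A DoFn implemented as an object type: define per-element behavior once.
protocol DoFn {
    associatedtype In
    associatedtype Out
    func processElement(_ input: In, emit: (Out) -> Void)
}

extension DoFn {
    // Default stream implementation: loop over the bundle and delegate
    // to processElement, as described in the email above.
    func processBundle(_ elements: [In]) -> [Out] {
        var output: [Out] = []
        for element in elements {
            processElement(element) { output.append($0) }
        }
        return output
    }
}

// Example element-wise DoFn: associated types are inferred as Int/Int.
struct Doubler: DoFn {
    func processElement(_ input: Int, emit: (Int) -> Void) {
        emit(input * 2)
    }
}
```

With this shape, both element-wise and stream-wise styles coexist: implementers write processElement, while the runner only ever calls the bundle-level method.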
>>>>>>>>> Best,
>>>>>>>>> B
>>>>>>>>>
>>>>>>>>> On Wed, Aug 23, 2023 at 5:02 PM Robert Bradshaw <rober...@google.com> wrote:
>>>>>>>>>
>>>>>>>>>> Neat.
>>>>>>>>>>
>>>>>>>>>> Nothing like writing an SDK to actually understand how the FnAPI works :). I like the use of groupBy. I have to admit I'm a bit mystified by the syntax for parDo (I don't know Swift at all, which is probably tripping me up). The addition of external (cross-language) transforms could let you steal everything (e.g. IOs) pretty quickly from other SDKs.
>>>>>>>>>>
>>>>>>>>>> On Fri, Aug 18, 2023 at 7:55 AM Byron Ellis via user <user@beam.apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> For everyone who is interested, here's the draft PR:
>>>>>>>>>>>
>>>>>>>>>>> https://github.com/apache/beam/pull/28062
>>>>>>>>>>>
>>>>>>>>>>> I haven't had a chance to test it on my M1 machine yet, though (there's a good chance there are a few places that need to properly address endianness, specifically timestamps in windowed values and lengths in iterable coders, as those both use big-endian representations).
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Aug 17, 2023 at 8:57 PM Byron Ellis <byronel...@google.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Thanks Cham,
>>>>>>>>>>>>
>>>>>>>>>>>> Definitely happy to open a draft PR so folks can comment---there's not as much code as it looks like, since most of the LOC is just generated protobuf. As for the support, I definitely want to add external transforms and may actually add that support before adding the ability to make composites in the language itself.
>>>>>>>>>>>> With the way the SDK is laid out, adding composites to the pipeline graph is a separate operation from defining a composite.
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Aug 17, 2023 at 4:28 PM Chamikara Jayalath <chamik...@google.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks Byron. This sounds great. I wonder if there is interest in a Swift SDK from folks currently subscribed to the +user <user@beam.apache.org> list.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Aug 16, 2023 at 6:53 PM Byron Ellis via dev <d...@beam.apache.org> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hello everyone,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> A couple of months ago I decided that I wanted to really understand how the Beam FnApi works and how it interacts with the Portable Runner. For me at least that usually means I need to write some code so I can see things happening in a debugger, and to really prove to myself I understood what was going on I decided I couldn't use an existing SDK language to do it, since there would be the temptation to read some code and convince myself that I actually understood what was going on.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> One thing led to another, and it turns out that to get a minimal FnApi integration going you end up writing a fair bit of an SDK. So I decided to take things to a point where I had an SDK that could execute a word count example via a portable runner backend. I've now reached that point and would like to submit my prototype SDK to the list for feedback.
>>>>>>>>>>>>>> It's currently living in a branch on my fork here:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> https://github.com/byronellis/beam/tree/swift-sdk/sdks/swift
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> At the moment it runs via the most recent Xcode beta using Swift 5.9 on Intel Macs, but should also work using beta builds of 5.9 for Linux running on Intel hardware. I haven't had a chance to try it on ARM hardware and make sure all of the endian checks are complete. The "IntegrationTests.swift" file contains a word count example that reads some local files (as well as a missing file to exercise DLQ functionality) and outputs counts through two separate group-by operations to get it past the "map reduce" size of pipeline. I've tested it against the Python Portable Runner. Since my goal was to learn the FnApi there is no Direct Runner at this time.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I've shown it to a couple of folks already and incorporated some of that feedback (for example, pardo was originally called dofn when defining pipelines). In general I've tried to make the API as "Swift-y" as possible, hence the heavy reliance on closures, and while there aren't yet composite PTransforms there's the beginnings of what would be needed for a SwiftUI-like declarative API for creating them.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> There are of course a ton of missing bits still to be implemented, like counters, metrics, windowing, state, timers, etc.
>>>>>>>>>>>>>
>>>>>>>>>>>>> This should be fine, and we can get the code documented without these features.
>>>>>>>>>>>>> I think support for composites and adding an external transform (see Java <https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/External.java>, Python <https://github.com/apache/beam/blob/c7b7921185686da573f76ce7320817c32375c7d0/sdks/python/apache_beam/transforms/external.py#L556>, Go <https://github.com/apache/beam/blob/c7b7921185686da573f76ce7320817c32375c7d0/sdks/go/pkg/beam/xlang.go#L155>, TypeScript <https://github.com/apache/beam/blob/master/sdks/typescript/src/apache_beam/transforms/external.ts>) to add support for multi-lang will bring in a lot of features (for example, I/O connectors) for free.
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Any and all feedback welcome, and happy to submit a PR if folks are interested, though the "Swift Way" would be to have it in its own repo so that it can easily be used from the Swift Package Manager.
>>>>>>>>>>>>>
>>>>>>>>>>>>> +1 for creating a PR (maybe as a draft initially). Also, it'll be easier to comment on a PR :)
>>>>>>>>>>>>>
>>>>>>>>>>>>> - Cham
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>> B