I suggest that we write out a design of what schemas in Go would look like and how they would interact with coders. We'll then be in a much better position to decide what the right short-term path forward is. Even if we decide it makes more sense to build up the coder support first, I think this will guide us; e.g. we can build up the coder support in a way that can be extended to full schemas later.
Writing up an overview design shouldn't take too much time, and I think it is definitely worth it.

Reuven

On Mon, Jan 7, 2019 at 2:12 PM Robert Burke <[email protected]> wrote:

> Kenn has pointed out to me that Coders are not likely to vanish in the next while, in particular over the FnAPI, so having a coder registry does remain useful, as described by an early adopter in another thread.
>
> On Fri, Jan 4, 2019, 10:51 AM Robert Burke <[email protected]> wrote:
>
>> I think you're right Kenn.
>>
>> Reuven alluded to the difficulty in inference of what to use between AtomicType and the rest, in particular Struct<Schema>.
>>
>> Go has the additional concern of pointer vs. non-pointer types, which isn't a concern in either Python or Java but has implications for pipeline efficiency that need addressing, in particular being able to use them in a useful fashion in the Go SDK.
>>
>> I agree that long term, having schemas as a default codec would be hugely beneficial for readability and composability, and would allow more processing to be on the Runner Harness side of a worker. (I'll save the rest of my thoughts on Schemas in Go for the other thread, and say no more of it here.)
>>
>> *Regarding my proposal for User Defined Coders:*
>>
>> To avoid users accidentally preventing themselves from using Schemas in the future, I need to remove the ability to override the default coder *(4)*. Then instead of JSON coding by default *(5)*, the SDK should be doing Schema coding. The SDK is already doing the recursive type analysis on types at pipeline construction time, so it's not a huge stretch to support Schemas using that information in the future, once Runner & FnAPI support begins to exist.
>>
>> *(1)* doesn't seem to need changing, as this is the existing AtomicType definition Kenn pointed out.
>>
>> *(2)* is the specific AtomicType override.
>>
>> *(3)* is the broader Go-specific override for Go's unique interface semantics.
>> This covers most of the cases *(4)* would have covered anyway, but in a targeted way.
>>
>> This should still allow Go users to better control their pipeline, and the associated performance implications (which is my goal in this change), while not making an overall incompatible choice for powerful Beam features for the common case in the future.
>>
>> Does that sound right?
>>
>> On Fri, 4 Jan 2019 at 10:05 Kenneth Knowles <[email protected]> wrote:
>>
>>> On Thu, Jan 3, 2019 at 4:33 PM Reuven Lax <[email protected]> wrote:
>>>
>>>> If a user wants custom encoding for a primitive type, they can create a byte-array field and wrap that field with a Coder
>>>
>>> This is the crux of the issue, right?
>>>
>>> Roughly, today, we've got:
>>>
>>>     Schema ::= [ (fieldname, Type) ]
>>>
>>>     Type ::= AtomicType | Array<Type> | Map<Type, Type> | Struct<Schema>
>>>
>>>     AtomicType ::= bytes | int{16, 32, 64} | datetime | string | ...
>>>
>>> To fully replace custom encodings as they exist, you need:
>>>
>>>     AtomicType ::= bytes<CustomCoder> | ...
>>>
>>> At this point, an SDK need not surface the concept of "Coder" to a user at all outside the bytes field concept, and the wire encoding and efficiency should be identical or nearly identical to what we do with coders today. PCollections in such an SDK have schemas, not coders, so we have successfully turned it completely inside-out relative to how the Java SDK does it. Is that what you have in mind?
>>>
>>> I really like this, but I agree with Robert that this is a major change that takes a bunch of work and a lot more collaborative thinking in design docs if we hope to get it right/stable.
>>>
>>> Kenn
>>>
>>>> (this is why I said that today's Coders are simply special cases); this should be very rare though, as users rarely should care how Beam encodes a long or a double.
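To make the grammar Kenn wrote out a bit more concrete, here is a rough sketch of how it might be modelled as Go types, including the `bytes<CustomCoder>` case. None of these names (`Kind`, `Type`, `Field`, `Schema`, the `Coder` string) exist in the Go SDK; they are placeholders for discussion only, and only a few atomic types are shown.

```go
package main

import (
	"fmt"
	"strings"
)

// Kind enumerates a few of the type constructors from the grammar (hypothetical names).
type Kind int

const (
	KindBytes Kind = iota
	KindInt64
	KindString
	KindArray
	KindMap
	KindStruct
)

// Type is one node of the grammar: an atomic type, Array<Type>, Map<Type, Type> or Struct<Schema>.
type Type struct {
	Kind   Kind
	Key    *Type   // map key type, only for KindMap
	Elem   *Type   // array element or map value type
	Fields []Field // nested schema, only for KindStruct
	Coder  string  // assumed identifier of a custom coder, only for KindBytes
}

// Field is a (fieldname, Type) pair.
type Field struct {
	Name string
	Type Type
}

// Schema is an ordered list of fields.
type Schema []Field

// show renders a possibly missing child type.
func show(t *Type) string {
	if t == nil {
		return "?"
	}
	return t.String()
}

// String renders a type in the notation used by the grammar.
func (t Type) String() string {
	switch t.Kind {
	case KindBytes:
		if t.Coder != "" {
			return "bytes<" + t.Coder + ">"
		}
		return "bytes"
	case KindInt64:
		return "int64"
	case KindString:
		return "string"
	case KindArray:
		return "Array<" + show(t.Elem) + ">"
	case KindMap:
		return "Map<" + show(t.Key) + ", " + show(t.Elem) + ">"
	case KindStruct:
		return "Struct<" + Schema(t.Fields).String() + ">"
	}
	return "unknown"
}

// String renders a schema as a list of (fieldname, Type) pairs.
func (s Schema) String() string {
	parts := make([]string, len(s))
	for i, f := range s {
		parts[i] = "(" + f.Name + ", " + f.Type.String() + ")"
	}
	return "[" + strings.Join(parts, ", ") + "]"
}

func main() {
	s := Schema{
		{Name: "user", Type: Type{Kind: KindString}},
		{Name: "tags", Type: Type{Kind: KindArray, Elem: &Type{Kind: KindString}}},
		{Name: "blob", Type: Type{Kind: KindBytes, Coder: "my.coder"}},
	}
	fmt.Println(s)
}
```

The point of the sketch is only that a custom coder becomes a property of a bytes field rather than of the whole PCollection element.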
>>>>
>>>>> Offhand, Schemas seem to be an alternative to pipeline construction, rather than coders for value serialization, allowing manual field extraction code to be omitted. They do not appear to be a fundamental approach to achieve it. For example, the grouping operation still needs to encode the whole of the object as a value.
>>>>
>>>> Schemas are properties of the data - essentially a Schema is the data type of a PCollection. In Java, Schemas are also understood by ParDo, so you can write a ParDo like this:
>>>>
>>>>     @ProcessElement
>>>>     public void process(@Field("user") String userId, @Field("country") String countryCode) {
>>>>     }
>>>>
>>>> These extra functionalities are part of the graph, but they are enabled by schemas.
>>>>
>>>>> As mentioned, I'm hoping to have a solution for existing coders by January's end, so waiting for your documentation doesn't work on that timeline.
>>>>
>>>> I don't think we need to wait for all the documentation to be written.
>>>>
>>>>> That said, they aren't incompatible ideas, as demonstrated by the Java implementation. The Go SDK remains in an experimental state. We can change things should the need arise in the next few months. Further, whenever Generics in Go <https://go.googlesource.com/proposal/+/master/design/go2draft-generics-overview.md> crop up, the existing user surface and execution stack will need to be re-written to take advantage of them anyway. That provides an opportunity to invert Coder vs Schema dependence while getting a nice performance boost, and cleaner code (and deleting much of my code generator).
>>>>>
>>>>> ----
>>>>>
>>>>> Were I to implement schemas to get the same syntactic benefits as the Java API, I'd be leveraging the field annotations Go has.
>>>>> This satisfies the protocol buffer issue as well, since generated Go protos have name & json annotations. Schemas could be extracted that way. These are also available to anything using static analysis for more direct generation of accessors. The reflective approach would also work, which is excellent for development purposes.
>>>>>
>>>>> The rote code that the schemas were replacing would be able to be cobbled together into efficient DoFns and CombineFns for serialization. At present, it seems like it could be implemented as a side package that uses beam, rather than changing portions of the core beam Go packages. The real trick would be to do so without "apply", since that's not how the Go SDK is shaped.
>>>>>
>>>>> On Thu, 3 Jan 2019 at 15:34 Gleb Kanterov <[email protected]> wrote:
>>>>>
>>>>>> Reuven, it sounds great. I see there is a similar thing to Row coders happening in Apache Arrow <https://arrow.apache.org>, and there is a similarity between Apache Arrow Flight <https://www.slideshare.net/wesm/apache-arrow-at-dataengconf-barcelona-2018/23> and data exchange service in portability. How do you see these two things relate to each other in the long term?
>>>>>>
>>>>>> On Fri, Jan 4, 2019 at 12:13 AM Reuven Lax <[email protected]> wrote:
>>>>>>
>>>>>>> The biggest advantage is actually readability and usability. A secondary advantage is that it means that Go will be able to interact seamlessly with BeamSQL, which would be a big win for Go.
>>>>>>>
>>>>>>> A schema is basically a way of saying that a record has a specific set of (possibly nested, possibly repeated) fields. So for instance let's say that the user's type is a struct with fields named user, country, purchaseCost. This allows us to provide transforms that operate on field names.
>>>>>>> Some examples (using the Java API):
>>>>>>>
>>>>>>>     PCollection users = events.apply(Select.fields("user")); // Select out only the user field.
>>>>>>>
>>>>>>>     PCollection joinedEvents = queries.apply(Join.innerJoin(clicks).byFields("user")); // Join two PCollections by user.
>>>>>>>
>>>>>>>     // For each country, calculate the total purchase cost as well as the top 10 purchases.
>>>>>>>     // A new schema is created containing fields total_cost and top_purchases, and rows are created with the aggregation results.
>>>>>>>     PCollection purchaseStatistics = events.apply(
>>>>>>>         Group.byFieldNames("country")
>>>>>>>             .aggregateField("purchaseCost", Sum.ofLongs(), "total_cost")
>>>>>>>             .aggregateField("purchaseCost", Top.largestLongs(10), "top_purchases"));
>>>>>>>
>>>>>>> This is far more readable than what we have today, and what unlocks this is that Beam actually knows the structure of the record instead of assuming records are uncrackable blobs.
>>>>>>>
>>>>>>> Note that a coder is basically a special case of a schema that has a single field.
>>>>>>>
>>>>>>> In BeamJava we have a SchemaRegistry which knows how to turn user types into schemas. We use reflection to analyze many user types (e.g. simple POJO structs, JavaBean classes, Avro records, protocol buffers, etc.) to determine the schema; however, this is done only when the graph is initially generated. We do use code generation (in Java we do bytecode generation) to make this somewhat more efficient. I'm willing to bet that the code generator you've written for structs could be very easily modified for schemas instead, so it would not be wasted work if we went with schemas.
>>>>>>>
>>>>>>> One of the things I'm working on now is documenting Beam schemas.
>>>>>>> They are already very powerful and useful, but since there is still nothing in our documentation about them, they are not yet widely used. I expect to finish draft documentation by the end of January.
>>>>>>>
>>>>>>> Reuven
>>>>>>>
>>>>>>> On Thu, Jan 3, 2019 at 11:32 PM Robert Burke <[email protected]> wrote:
>>>>>>>
>>>>>>>> That's an interesting idea. I must confess I don't rightly know the difference between a schema and coder, but here's what I've got with a bit of searching through memory and the mailing list. Please let me know if I'm off track.
>>>>>>>>
>>>>>>>> As near as I can tell, a schema, as far as Beam takes it <https://github.com/apache/beam/blob/f66eb5fe23b2500b396e6f711cdf4aeef6b31ab8/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java>, is a mechanism to define what data is extracted from a given row of data. So in principle, there's an opportunity to be more efficient with data with many columns that aren't being used, and only extract the data that's meaningful to the pipeline.
>>>>>>>> The trick then is how to apply the schema to a given serialization format, which is something I'm missing in my mental model (and then how to do it efficiently in Go).
>>>>>>>>
>>>>>>>> I do know that the Go client package for BigQuery <https://godoc.org/cloud.google.com/go/bigquery#hdr-Schemas> does something like that, using field tags. Similarly, the "encoding/json" <https://golang.org/doc/articles/json_and_go.html> package in the Go Standard Library permits annotating fields and it will read out and deserialize the JSON fields and that's it.
>>>>>>>>
>>>>>>>> A concern I have is that Go (at present) would require pre-compile time code generation for schemas to be efficient, and they would still mostly boil down to turning []byte into real structs. Go reflection doesn't keep up.
>>>>>>>> Go has no mechanism I'm aware of to Just In Time compile more efficient processing of values.
>>>>>>>> It's also not 100% clear how Schemas would play with protocol buffers or similar.
>>>>>>>> BigQuery has a mechanism of generating a JSON schema from a proto file <https://github.com/GoogleCloudPlatform/protoc-gen-bq-schema>, but that's only the specification half, not the using half.
>>>>>>>>
>>>>>>>> As it stands, the code generator I've been building these last months could (in principle) statically analyze a user's struct, and then generate an efficient dedicated coder for it. It just has nowhere to put them such that the Go SDK would use them.
>>>>>>>>
>>>>>>>> On Thu, Jan 3, 2019 at 1:39 PM Reuven Lax <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> I'll make a different suggestion. There's been some chatter that schemas are a better tool than coders, and that in Beam 3.0 we should make schemas the basic semantics instead of coders. Schemas provide everything a coder provides, but also allow for far more readable code. We can't make such a change in Beam Java 2.X for compatibility reasons, but maybe in Go we're better off starting with schemas instead of coders?
>>>>>>>>>
>>>>>>>>> Reuven
>>>>>>>>>
>>>>>>>>> On Thu, Jan 3, 2019 at 8:45 PM Robert Burke <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> One area that the Go SDK currently lacks is the ability for users to specify their own coders for types.
>>>>>>>>>>
>>>>>>>>>> I've written a proposal document <https://docs.google.com/document/d/1kQwx4Ah6PzG8z2ZMuNsNEXkGsLXm6gADOZaIO7reUOg/edit#>, and while I'm confident about the core, there are certainly some edge cases that require discussion before getting on with the implementation.
>>>>>>>>>>
>>>>>>>>>> At present, the SDK only permits primitive value types (all numeric types but complex, strings, and []byte), which are coded with beam coders, and structs whose exported fields are of those types, which are then encoded as JSON. Protocol buffer support is hacked in to avoid the type analyzer, and represents the current workaround for this issue.
>>>>>>>>>>
>>>>>>>>>> The high-level proposal is to catch up with Python and Java, and have a coder registry. In addition, arrays and maps should be permitted as well.
>>>>>>>>>>
>>>>>>>>>> If you have alternatives, or other suggestions and opinions, I'd love to hear them! Otherwise my intent is to get a PR ready by the end of January.
>>>>>>>>>>
>>>>>>>>>> Thanks!
>>>>>>>>>> Robert Burke
>>>>>>>>
>>>>>>>> --
>>>>>>>> http://go/where-is-rebo
>>>>>>
>>>>>> --
>>>>>> Cheers,
>>>>>> Gleb
