On Fri, Jan 4, 2019 at 7:05 PM Kenneth Knowles <[email protected]> wrote:
>
> On Thu, Jan 3, 2019 at 4:33 PM Reuven Lax <[email protected]> wrote:
>>
>> If a user wants custom encoding for a primitive type, they can create a 
>> byte-array field and wrap that field with a Coder

I don't think the primary use of coders is custom encodings for
primitive types; rather, it's to provide any encoding at all for a
custom type. This is what schemas are lacking right now.

On the other hand, using coders at the Runner level gives the runner
flexibility in how even well-known types (e.g. windowed types, lazy
iterables) are encoded, so that it can manipulate them easily.

> This is the crux of the issue, right?
>
> Roughly, today, we've got:
>
>         Schema ::= [ (fieldname, Type) ]
>
>         Type ::= AtomicType | Array<Type> | Map<Type, Type> | Struct<Schema>
>
>         AtomicType ::= bytes | int{16, 32, 64} | datetime | string | ...

This is starting to look a lot like a re-implementation of protocol
buffers... Perhaps the advantage is that it's more lightweight?

If you squint, it also looks isomorphic to Type == WellKnownCoder,
which, if we allow extensibility, becomes Type == Coder. (Possibly we
could simplify the nested/unnested/deterministic logic by constraining
unknown coders, though.)
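
For concreteness, Kenn's grammar could be transcribed into Go roughly
as below. This is only a sketch; none of these type or field names are
an existing Beam API, they just mirror the grammar:

    // Schema ::= [ (fieldname, Type) ]
    type Schema struct {
        Fields []Field
    }

    type Field struct {
        Name string
        Type FieldType
    }

    // Type ::= AtomicType | Array<Type> | Map<Type, Type> | Struct<Schema>
    type FieldType struct {
        Kind    TypeKind   // ATOMIC, ARRAY, MAP, or STRUCT
        Atomic  AtomicType // set when Kind == ATOMIC
        Element *FieldType // set when Kind == ARRAY
        Key     *FieldType // set when Kind == MAP
        Value   *FieldType // set when Kind == MAP
        Struct  *Schema    // set when Kind == STRUCT
    }

    type TypeKind int
    type AtomicType int // bytes | int{16,32,64} | datetime | string | ...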

> To fully replace custom encodings as they exist, you need:
>
>         AtomicType ::= bytes<CustomCoder> | ...
>
> At this point, an SDK need not surface the concept of "Coder" to a user at
> all outside the bytes field concept, and the wire encoding and efficiency
> should be identical, or nearly so, to what we do with coders today.
> PCollections in such an SDK have schemas, not coders, so we have
> successfully turned it completely inside-out relative to how the Java SDK
> does it. Is that what you have in mind?

If a schema had a Type parameterizable by an encode/decode method
(a.k.a. a Coder), we could do this inversion. The primary difference is
that one would now assert that all PCollections have a composite
record structure, even if they only contain one field. (Under what
conditions would we flatten [ fieldname: SchemaX ] to SchemaX?)
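
As a sketch of what "parameterizable by an encode/decode method" could
mean, continuing the hypothetical types above (again, not an existing
API), the bytes case from Kenn's AtomicType extension would carry an
optional coder:

    // A custom coder is just an encode/decode pair over a bytes field.
    type CustomCoder struct {
        Encode func(interface{}) ([]byte, error)
        Decode func([]byte) (interface{}, error)
    }

    // bytes<CustomCoder>: plain bytes when Coder is nil; otherwise the
    // field is logically the user's type, physically its encoded bytes.
    type BytesType struct {
        Coder *CustomCoder
    }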

> I really like this, but I agree with Robert that this is a major change that 
> takes a bunch of work and a lot more collaborative thinking in design docs if 
> we hope to get it right/stable.
>
> Kenn
>
>>
>> (this is why I said that today's Coders are simply special cases); this 
>> should be very rare though, as users should rarely care how Beam encodes a 
>> long or a double.
>>>
>>>
>>> Offhand, Schemas seem to be an aid to pipeline construction, rather than 
>>> an alternative to coders for value serialization, in that they allow 
>>> manual field-extraction code to be omitted. They do not appear to be a 
>>> fundamental approach to achieving serialization. For example, the grouping 
>>> operation still needs to encode the whole of the object as a value.
>>
>>
>> Schemas are properties of the data - essentially a Schema is the data type 
>> of a PCollection. In Java Schemas are also understood by ParDo, so you can 
>> write a ParDo like this:
>>
>> @ProcessElement
>> public void process(@Field("user") String userId, @Field("country") String countryCode) {
>> }
>>
>> These extra functionalities are part of the graph, but they are enabled by 
>> schemas.
>>>
>>>
>>> As mentioned, I'm hoping to have a solution for existing coders by 
>>> January's end, so waiting for your documentation doesn't work on that 
>>> timeline.
>>
>>
>> I don't think we need to wait for all the documentation to be written.
>>
>>>
>>>
>>> That said, they aren't incompatible ideas, as demonstrated by the Java 
>>> implementation. The Go SDK remains in an experimental state. We can change 
>>> things should the need arise in the next few months. Further, whenever 
>>> Generics in Go crop up, the existing user surface and execution stack will 
>>> need to be rewritten to take advantage of them anyway. That provides an 
>>> opportunity to invert the Coder-vs-Schema dependence while getting a nice 
>>> performance boost and cleaner code (and deleting much of my code 
>>> generator).
>>>
>>> ----
>>>
>>> Were I to implement schemas to get the same syntactic benefits as the Java 
>>> API, I'd be leveraging the field annotations (struct tags) Go has. This 
>>> satisfies the protocol buffer issue as well, since generated Go protos 
>>> have name & json annotations, so schemas could be extracted that way. 
>>> These tags are also available to anything using static analysis for more 
>>> direct generation of accessors. The reflective approach would also work, 
>>> which is excellent for development purposes.
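
(For illustration, since tags came up: something like the struct below
is the sort of annotation meant here. The "beam" tag key is made up;
only the "json" tags exist on generated Go protos today. The reflection
loop is just a sketch of how a schema could be pulled out.)

    import "reflect"

    type Purchase struct {
        User         string `beam:"user" json:"user,omitempty"`
        Country      string `beam:"country" json:"country,omitempty"`
        PurchaseCost int64  `beam:"purchaseCost" json:"purchase_cost,omitempty"`
    }

    // fieldNames reads the hypothetical "beam" tags off a struct value,
    // e.g. fieldNames(Purchase{}) -> ["user", "country", "purchaseCost"].
    func fieldNames(v interface{}) []string {
        t := reflect.TypeOf(v)
        var names []string
        for i := 0; i < t.NumField(); i++ {
            if tag := t.Field(i).Tag.Get("beam"); tag != "" {
                names = append(names, tag)
            }
        }
        return names
    }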
>>>
>>> The rote code that the schemas would be replacing could be cobbled 
>>> together into efficient DoFns and CombineFns for serialization. At 
>>> present, it seems like this could be implemented as a side package that 
>>> uses beam, rather than changing portions of the core beam Go packages. The 
>>> real trick would be to do so without "apply", since that's not how the Go 
>>> SDK is shaped.
>>>
>>>
>>>
>>>
>>> On Thu, 3 Jan 2019 at 15:34 Gleb Kanterov <[email protected]> wrote:
>>>>
>>>> Reuven, it sounds great. I see there is something similar to Row coders 
>>>> happening in Apache Arrow, and there is a similarity between Apache Arrow 
>>>> Flight and the data exchange service in portability. How do you see these 
>>>> two things relating to each other in the long term?
>>>>
>>>> On Fri, Jan 4, 2019 at 12:13 AM Reuven Lax <[email protected]> wrote:
>>>>>
>>>>> The biggest advantage is actually readability and usability. A secondary 
>>>>> advantage is that it means that Go will be able to interact seamlessly 
>>>>> with BeamSQL, which would be a big win for Go.
>>>>>
>>>>> A schema is basically a way of saying that a record has a specific set of 
>>>>> (possibly nested, possibly repeated) fields. So, for instance, let's say 
>>>>> that the user's type is a struct with fields named user, country, and 
>>>>> purchaseCost. This allows us to provide transforms that operate on field 
>>>>> names. Some examples (using the Java API):
>>>>>
>>>>> // Select out only the user field.
>>>>> PCollection users = events.apply(Select.fields("user"));
>>>>>
>>>>> // Join two PCollections by user.
>>>>> PCollection joinedEvents = queries.apply(Join.innerJoin(clicks).byFields("user"));
>>>>>
>>>>> // For each country, calculate the total purchase cost as well as the top 10 purchases.
>>>>> // A new schema is created containing fields total_cost and top_purchases,
>>>>> // and rows are created with the aggregation results.
>>>>> PCollection purchaseStatistics = events.apply(
>>>>>     Group.byFieldNames("country")
>>>>>          .aggregateField("purchaseCost", Sum.ofLongs(), "total_cost")
>>>>>          .aggregateField("purchaseCost", Top.largestLongs(10), "top_purchases"));
>>>>>
>>>>>
>>>>> This is far more readable than what we have today, and what unlocks this 
>>>>> is that Beam actually knows the structure of the record instead of 
>>>>> assuming records are uncrackable blobs.
>>>>>
>>>>> Note that a coder is basically a special case of a schema that has a 
>>>>> single field.
>>>>>
>>>>> In Beam Java we have a SchemaRegistry which knows how to turn user types 
>>>>> into schemas. We use reflection to analyze many user types (e.g. simple 
>>>>> POJO structs, JavaBean classes, Avro records, protocol buffers, etc.) to 
>>>>> determine the schema; however, this is done only when the graph is 
>>>>> initially generated. We do use code generation (in Java we do bytecode 
>>>>> generation) to make this somewhat more efficient. I'm willing to bet that 
>>>>> the code generator you've written for structs could be very easily 
>>>>> modified for schemas instead, so it would not be wasted work if we went 
>>>>> with schemas.
>>>>>
>>>>> One of the things I'm working on now is documenting Beam schemas. They 
>>>>> are already very powerful and useful, but since there is still nothing in 
>>>>> our documentation about them, they are not yet widely used. I expect to 
>>>>> finish draft documentation by the end of January.
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Thu, Jan 3, 2019 at 11:32 PM Robert Burke <[email protected]> wrote:
>>>>>>
>>>>>> That's an interesting idea. I must confess I don't rightly know the 
>>>>>> difference between a schema and coder, but here's what I've got with a 
>>>>>> bit of searching through memory and the mailing list. Please let me know 
>>>>>> if I'm off track.
>>>>>>
>>>>>> As near as I can tell, a schema, as far as Beam takes it, is a mechanism 
>>>>>> to define what data is extracted from a given row of data. So in 
>>>>>> principle, there's an opportunity to be more efficient with data that 
>>>>>> has many columns that aren't being used, and only extract the data 
>>>>>> that's meaningful to the pipeline.
>>>>>> The trick then is how to apply the schema to a given serialization 
>>>>>> format, which is something I'm missing in my mental model (and then how 
>>>>>> to do it efficiently in Go).
>>>>>>
>>>>>> I do know that the Go client package for BigQuery does something like 
>>>>>> that, using field tags. Similarly, the "encoding/json" package in the Go 
>>>>>> standard library permits annotating fields, and it will read out and 
>>>>>> deserialize only the fields declared on the struct.
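
(To make the encoding/json point concrete, this is roughly the pattern:
only the fields declared on the struct get populated, and any other keys
in the input JSON are ignored. Event and its fields are just an example
type, not anything in Beam.)

    import "encoding/json"

    type Event struct {
        User    string `json:"user"`
        Country string `json:"country"`
    }

    // decodeEvent fills in only User and Country, even if the input JSON
    // carries many more keys.
    func decodeEvent(data []byte) (Event, error) {
        var e Event
        err := json.Unmarshal(data, &e)
        return e, err
    }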
>>>>>>
>>>>>> A concern I have is that Go (at present) would require pre-compile-time 
>>>>>> code generation for schemas to be efficient, and they would still mostly 
>>>>>> boil down to turning []bytes into real structs. Go reflection doesn't 
>>>>>> keep up.
>>>>>> Go has no mechanism I'm aware of to just-in-time compile more efficient 
>>>>>> processing of values.
>>>>>> It's also not 100% clear how Schemas would play with protocol buffers 
>>>>>> or similar.
>>>>>> BigQuery has a mechanism for generating a JSON schema from a proto file, 
>>>>>> but that's only the specification half, not the using half.
>>>>>>
>>>>>> As it stands, the code generator I've been building these last months 
>>>>>> could (in principle) statically analyze a user's struct, and then 
>>>>>> generate an efficient dedicated coder for it. It just has nowhere to 
>>>>>> put that coder such that the Go SDK would use it.
>>>>>>
>>>>>>
>>>>>> On Thu, Jan 3, 2019 at 1:39 PM Reuven Lax <[email protected]> wrote:
>>>>>>>
>>>>>>> I'll make a different suggestion. There's been some chatter that 
>>>>>>> schemas are a better tool than coders, and that in Beam 3.0 we should 
>>>>>>> make schemas the basic semantics instead of coders. Schemas provide 
>>>>>>> everything a coder provides, but also allow for far more readable 
>>>>>>> code. We can't make such a change in Beam Java 2.X for compatibility 
>>>>>>> reasons, but maybe in Go we're better off starting with schemas instead 
>>>>>>> of coders?
>>>>>>>
>>>>>>> Reuven
>>>>>>>
>>>>>>> On Thu, Jan 3, 2019 at 8:45 PM Robert Burke <[email protected]> wrote:
>>>>>>>>
>>>>>>>> One area that the Go SDK currently lacks is the ability for users to 
>>>>>>>> specify their own coders for types.
>>>>>>>>
>>>>>>>> I've written a proposal document, and while I'm confident about the 
>>>>>>>> core, there are certainly some edge cases that require discussion 
>>>>>>>> before getting on with the implementation.
>>>>>>>>
>>>>>>>> At present, the SDK only permits primitive value types (all numeric 
>>>>>>>> types but complex, strings, and []bytes), which are coded with beam 
>>>>>>>> coders, and structs whose exported fields are of those types, which 
>>>>>>>> are then encoded as JSON. Protocol buffer support is hacked in to 
>>>>>>>> avoid the type analyzer, and represents the current workaround for 
>>>>>>>> this issue.
>>>>>>>>
>>>>>>>> The high-level proposal is to catch up with Python and Java and have 
>>>>>>>> a coder registry. In addition, arrays and maps should be permitted as 
>>>>>>>> well.
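
(As a strawman for what "a coder registry" could look like on the user
side, with every name below hypothetical and the exact signature being
what the proposal doc is meant to settle:)

    import (
        "reflect"
        "strconv"

        "github.com/apache/beam/sdks/go/pkg/beam"
    )

    type MyType struct{ ID int64 }

    func encodeMyType(v MyType) []byte {
        return []byte(strconv.FormatInt(v.ID, 10))
    }

    func decodeMyType(data []byte) MyType {
        id, _ := strconv.ParseInt(string(data), 10, 64)
        return MyType{ID: id}
    }

    func init() {
        // beam.RegisterCoder is hypothetical here, named by analogy with
        // the existing beam.RegisterType.
        beam.RegisterCoder(reflect.TypeOf(MyType{}), encodeMyType, decodeMyType)
    }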
>>>>>>>>
>>>>>>>> If you have alternatives, or other suggestions and opinions, I'd love 
>>>>>>>> to hear them! Otherwise my intent is to get a PR ready by the end of 
>>>>>>>> January.
>>>>>>>>
>>>>>>>> Thanks!
>>>>>>>> Robert Burke
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> http://go/where-is-rebo
>>>>
>>>>
>>>>
>>>> --
>>>> Cheers,
>>>> Gleb
