Wow, this is really coming together, congratulations and thanks for the great work!
On Wed, Aug 29, 2018 at 1:40 AM Reuven Lax <re...@google.com> wrote:

> I wanted to send a quick note to the community about the current status
> of schema-aware PCollections in Beam. As some might remember, we had a
> good discussion last year about the design of these schemas, involving
> many folks from different parts of the community. I sent a summary
> earlier this year explaining how schemas have been integrated into the
> DoFn framework. Much has happened since then, and here are some of the
> highlights.
>
> First, I want to emphasize that all the schema-aware classes are
> currently marked @Experimental. Nothing is set in stone yet, so if you
> have questions about any decisions made, please start a discussion!
>
> SQL
>
> The first big milestone for schemas was porting all of BeamSQL to use
> the framework, which was done in pr/5956. This was a lot of work and
> exposed many bugs in the schema implementation, but it now provides
> great evidence that schemas work!
>
> Schema inference
>
> Beam can automatically infer schemas from Java POJOs (objects with
> public fields) or JavaBean objects (objects with getter/setter
> methods). Often you can do this by simply annotating the class. For
> example:
>
> @DefaultSchema(JavaFieldSchema.class)
> public class UserEvent {
>   public String userId;
>   public LatLong location;
>   public String countryCode;
>   public long transactionCost;
>   public double transactionDuration;
>   public List<String> traceMessages;
> }
>
> @DefaultSchema(JavaFieldSchema.class)
> public class LatLong {
>   public double latitude;
>   public double longitude;
> }
>
> Beam will automatically infer schemas for these classes!
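(Interjecting a note for readers: conceptually, this kind of field-based inference amounts to walking a class's public fields. The sketch below is only an illustration of that idea in plain Java reflection — Beam's actual implementation is far richer (nested types, arrays, generated bytecode), and all names here are made up for the example.)

```java
import java.lang.reflect.Field;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only, NOT Beam's implementation: derive a simple
// field-name -> type-name "schema" from a POJO's public fields.
public class SchemaSketch {
  public static class LatLong {
    public double latitude;
    public double longitude;
  }

  public static Map<String, String> inferSchema(Class<?> clazz) {
    Map<String, String> schema = new LinkedHashMap<>();
    for (Field f : clazz.getFields()) {  // public fields only
      schema.put(f.getName(), f.getType().getSimpleName());
    }
    return schema;
  }

  public static void main(String[] args) {
    // Prints a map of field names to type names, e.g. latitude -> double
    // (reflection does not guarantee field order).
    System.out.println(inferSchema(LatLong.class));
  }
}
```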
> So if you have a PCollection<UserEvent>, it will automatically get the
> following schema:
>
> UserEvent:
>   userId: STRING
>   location: ROW(LatLong)
>   countryCode: STRING
>   transactionCost: INT64
>   transactionDuration: DOUBLE
>   traceMessages: ARRAY[STRING]
>
> LatLong:
>   latitude: DOUBLE
>   longitude: DOUBLE
>
> Now it's not always possible to annotate the class like this (you may
> not own the class definition), so you can also explicitly register the
> class using Pipeline.getSchemaRegistry().registerPOJO(), and the same
> for JavaBeans.
>
> Coders
>
> Beam has a built-in coder for any schema-aware PCollection, largely
> removing the need for users to care about coders. We generate low-level
> bytecode (using ByteBuddy) to implement the coder for each schema, so
> these coders are quite performant. This also provides a better default
> coder for Java POJO objects. In the past users were recommended to use
> AvroCoder for POJOs, which many have found inefficient. Now there's a
> more efficient solution.
>
> Utility Transforms
>
> Schemas are already useful for implementers of extensions such as SQL,
> but the goal was to use them to make Beam itself easier to use. To this
> end, I've been implementing a library of transforms that allow for easy
> manipulation of schema PCollections. So far Filter and Select are
> merged, Group is about to go out for review (it needs some more javadoc
> and unit tests), and Join is being developed but doesn't yet have a
> final interface.
>
> Filter
>
> Given a PCollection<LatLong>, I want to keep only those in an area of
> southern Manhattan. Well, this is easy!
>
> PCollection<LatLong> manhattanEvents = allEvents.apply(Filter
>     .whereFieldName("latitude", lat -> lat < 40.720 && lat > 40.699)
>     .whereFieldName("longitude", lng -> lng < -73.969 && lng > -74.747));
>
> Schemas along with lambdas allow us to write this transform
> declaratively.
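(A quick aside for readers: the idea of a per-field predicate filter can be sketched in plain Java, outside Beam. This is only a conceptual analogue — Beam's Filter works by field name against the schema, while this sketch just uses field accessors, and every name in it is invented for the example.)

```java
import java.util.List;
import java.util.function.DoublePredicate;
import java.util.stream.Collectors;

// Illustration only, NOT the Beam API: keep records where a predicate
// holds on each "field", mirroring the spirit of per-field filtering.
public class FilterSketch {
  public static class LatLong {
    public double latitude;
    public double longitude;
    public LatLong(double lat, double lng) { latitude = lat; longitude = lng; }
  }

  public static List<LatLong> filter(List<LatLong> events,
                                     DoublePredicate latPred,
                                     DoublePredicate lngPred) {
    return events.stream()
        .filter(e -> latPred.test(e.latitude) && lngPred.test(e.longitude))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<LatLong> events = List.of(
        new LatLong(40.710, -74.000),   // lower Manhattan
        new LatLong(51.507, -0.128));   // London
    List<LatLong> manhattan = filter(events,
        lat -> lat < 40.720 && lat > 40.699,
        lng -> lng < -73.969 && lng > -74.747);
    System.out.println(manhattan.size());  // 1
  }
}
```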
> The Filter transform also allows you to register filter functions that
> operate on multiple fields at the same time.
>
> Select
>
> Let's say that I don't need all the fields in a row. For instance, I'm
> only interested in the userId and traceMessages, and don't care about
> the location. In that case I can write the following:
>
> PCollection<Row> selected =
>     allEvents.apply(Select.fieldNames("userId", "traceMessages"));
>
> BTW, Beam also keeps track of which fields are accessed by a transform.
> In the future we can automatically insert Selects in front of subgraphs
> to drop fields that are not referenced in that subgraph.
>
> Group
>
> Group is one of the more advanced transforms. In its most basic form,
> it provides a convenient way to group by key:
>
> PCollection<KV<Row, Iterable<UserEvent>>> byUserAndCountry =
>     allEvents.apply(Group.byFieldNames("userId", "countryCode"));
>
> Notice how much more concise this is than using GroupByKey directly!
>
> The Group transform really starts to shine, however, when you start
> specifying aggregations. You can aggregate any field (or fields) and
> build up an output schema based on these aggregations. For example:
>
> PCollection<KV<Row, Row>> aggregated = allEvents.apply(
>     Group.byFieldNames("userId", "countryCode")
>         .aggregateField("transactionCost", Sum.ofLongs(), "total_cost")
>         .aggregateField("transactionCost", Top.<Long>largestFn(10),
>             "top_purchases")
>         .aggregateField("transactionDuration",
>             ApproximateQuantilesCombineFn.create(21), "durationHistogram"));
>
> This will individually aggregate the specified fields of the input items
> (by user and country), and generate an output schema for these
> aggregations.
> In this case, the output schema will be the following:
>
> AggregatedSchema:
>   total_cost: INT64
>   top_purchases: ARRAY[INT64]
>   durationHistogram: ARRAY[DOUBLE]
>
> There are some more utility transforms I've written that are worth
> looking at, such as Convert (which can convert between user types that
> share a schema) and Unnest (which flattens nested schemas). There are
> also some others, such as Pivot, that we should consider writing.
>
> There is still a lot to do. All the todo items are reflected in JIRA;
> however, here are some examples of current gaps:
>
> - Support for read-only POJOs (those with final fields) and JavaBeans
>   (objects without setters).
> - Automatic schema inference from more Java types: protocol buffers,
>   Avro, AutoValue, etc.
> - Integration with sources (BigQueryIO, JdbcIO, AvroIO, etc.)
> - Support for JsonPath expressions so users can better express nested
>   fields. E.g. support expressions of the form
>   Select.fields("field1.field2", "field3.*", "field4[0].field5");
> - Schemas still need to be defined in our portability layer so they can
>   be used cross-language.
>
> If anyone is interested in helping close these gaps, you'll be helping
> make Beam a better, more usable system!
>
> Reuven
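For anyone reading along who wants a feel for the Group-with-aggregation semantics outside of Beam, here is a rough plain-Java analogue. To be clear, this is only a sketch of the concept, not Beam code: Beam groups per-key in parallel and emits Rows with a generated schema, while this uses streams over a list, and all class and method names below are invented for the example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Illustration only, NOT the Beam API: group events by two "fields" and
// aggregate another field per group (here, a sum of costs).
public class GroupSketch {
  public static class UserEvent {
    public String userId;
    public String countryCode;
    public long transactionCost;
    public UserEvent(String u, String c, long cost) {
      userId = u; countryCode = c; transactionCost = cost;
    }
  }

  // Key = "userId|countryCode", value = sum of transactionCost per key.
  public static Map<String, Long> totalCostByUserAndCountry(List<UserEvent> events) {
    return events.stream().collect(Collectors.groupingBy(
        e -> e.userId + "|" + e.countryCode,
        Collectors.summingLong(e -> e.transactionCost)));
  }

  public static void main(String[] args) {
    List<UserEvent> events = Arrays.asList(
        new UserEvent("alice", "US", 5),
        new UserEvent("alice", "US", 7),
        new UserEvent("bob", "DE", 3));
    System.out.println(totalCostByUserAndCountry(events).get("alice|US"));  // 12
  }
}
```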