Wow, this is really coming together. Congratulations, and thanks for the
great work!

On Wed, Aug 29, 2018 at 1:40 AM Reuven Lax <re...@google.com> wrote:

> I wanted to send a quick note to the community about the current status of
> schema-aware PCollections in Beam. As some might remember we had a good
> discussion last year about the design of these schemas, involving many
> folks from different parts of the community. I sent a summary earlier this
> year explaining how schemas have been integrated into the DoFn framework.
> Much has happened since then, and here are some of the highlights.
>
> First, I want to emphasize that all the schema-aware classes are currently
> marked @Experimental. Nothing is set in stone yet, so if you have questions
> about any decisions made, please start a discussion!
>
> SQL
>
> The first big milestone for schemas was porting all of BeamSQL to use the
> framework, which was done in pr/5956. This was a lot of work and exposed
> many bugs in the schema implementation, but it now provides great evidence
> that schemas work!
>
> Schema inference
>
> Beam can automatically infer schemas from Java POJOs (objects with public
> fields) or JavaBean objects (objects with getter/setter methods). Often you
> can do this by simply annotating the class. For example:
>
> @DefaultSchema(JavaFieldSchema.class)
> public class UserEvent {
>   public String userId;
>   public LatLong location;
>   public String countryCode;
>   public long transactionCost;
>   public double transactionDuration;
>   public List<String> traceMessages;
> }
>
> @DefaultSchema(JavaFieldSchema.class)
> public class LatLong {
>   public double latitude;
>   public double longitude;
> }
>
> Beam will automatically infer schemas for these classes! So if you have a
> PCollection<UserEvent>, it will automatically get the following schema:
>
> UserEvent:
>   userId: STRING
>   location: ROW(LatLong)
>   countryCode: STRING
>   transactionCost: INT64
>   transactionDuration: DOUBLE
>   traceMessages: ARRAY[STRING]
>
> LatLong:
>   latitude: DOUBLE
>   longitude: DOUBLE
>
> Now it’s not always possible to annotate the class like this (you may not
> own the class definition), so you can also explicitly register this using
> Pipeline:getSchemaRegistry:registerPOJO, and the same for JavaBeans.
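
For anyone wondering what field-based inference amounts to, here is a minimal
sketch using plain Java reflection. It is an illustration only, not Beam's
JavaFieldSchema implementation: FieldSchemaSketch, inferSchema and typeName
are hypothetical names, and only the types that appear in the UserEvent
example are handled.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch, not Beam code: derives a schema from public fields.
class FieldSchemaSketch {
  static class LatLong {
    public double latitude;
    public double longitude;
  }

  static class UserEvent {
    public String userId;
    public LatLong location;
    public String countryCode;
    public long transactionCost;
    public double transactionDuration;
    public List<String> traceMessages;
  }

  // Maps each public instance field to a schema type name. A TreeMap is used
  // because Class.getFields() does not guarantee any particular order.
  static Map<String, String> inferSchema(Class<?> clazz) {
    Map<String, String> schema = new TreeMap<>();
    for (Field field : clazz.getFields()) {
      if (!Modifier.isStatic(field.getModifiers())) {
        schema.put(field.getName(), typeName(field.getGenericType()));
      }
    }
    return schema;
  }

  // Handles only the types used in the example. Any other non-primitive
  // class is assumed to be a nested row.
  static String typeName(Type type) {
    if (type == String.class) {
      return "STRING";
    }
    if (type == long.class || type == Long.class) {
      return "INT64";
    }
    if (type == double.class || type == Double.class) {
      return "DOUBLE";
    }
    if (type instanceof ParameterizedType) {
      ParameterizedType parameterized = (ParameterizedType) type;
      if (parameterized.getRawType() == List.class) {
        return "ARRAY[" + typeName(parameterized.getActualTypeArguments()[0]) + "]";
      }
    }
    if (type instanceof Class && !((Class<?>) type).isPrimitive()) {
      return "ROW(" + ((Class<?>) type).getSimpleName() + ")";
    }
    throw new IllegalArgumentException("Unsupported type: " + type);
  }

  public static void main(String[] args) {
    inferSchema(UserEvent.class)
        .forEach((name, type) -> System.out.println(name + ": " + type));
  }
}
```

The sketch sorts fields by name only to get a stable result; it says nothing
about the field order Beam itself produces.
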
>
> Coders
>
> Beam has a built-in coder for any schema-aware PCollection, largely
> removing the need for users to care about coders. We generate low-level
> bytecode (using ByteBuddy) to implement the coder for each schema, so these
> coders are quite performant. This provides a better default coder for Java
> POJO objects as well. In the past users were advised to use AvroCoder
> for POJOs, which many have found inefficient. Now there’s a more efficient
> solution.
>
> Utility Transforms
>
> Schemas are already useful for implementers of extensions such as SQL, but
> the goal was to use them to make Beam itself easier to use. To this end,
> I’ve been implementing a library of transforms that allow for easy
> manipulation of schema PCollections. So far Filter and Select are merged,
> Group is about to go out for review (it needs some more javadoc and unit
> tests), and Join is being developed but doesn’t yet have a final interface.
>
> Filter
>
> Given a PCollection<LatLong>, I want to keep only those in an area of
> southern Manhattan. Well, this is easy!
>
> PCollection<LatLong> manhattanEvents = allEvents.apply(Filter
>     .whereFieldName("latitude", lat -> lat < 40.720 && lat > 40.699)
>     .whereFieldName("longitude", lng -> lng < -73.969 && lng > -74.747));
>
> Schemas along with lambdas allow us to write this transform
> declaratively. The Filter transform also allows you to register filter
> functions that operate on multiple fields at the same time.
>
> Select
>
> Let’s say that I don’t need all the fields in a row. For instance, I’m
> only interested in the userId and traceMessages, and don’t care about the
> location. In that case I can write the following:
>
> PCollection<Row> selected =
>     allEvents.apply(Select.fieldNames("userId", "traceMessages"));
>
>
> BTW, Beam also keeps track of which fields are accessed by a transform. In
> the future we can automatically insert Selects in front of subgraphs to
> drop fields that are not referenced in that subgraph.
>
> Group
>
> Group is one of the more advanced transforms. In its most basic form, it
> provides a convenient way to group by key:
>
> PCollection<KV<Row, Iterable<UserEvent>>> byUserAndCountry =
>     allEvents.apply(Group.byFieldNames("userId", "countryCode"));
>
> Notice how much more concise this is than using GroupByKey directly!
>
> The Group transform really starts to shine, however, when you start
> specifying aggregations. You can aggregate any field (or fields) and build
> up an output schema based on these aggregations. For example:
>
> PCollection<KV<Row, Row>> aggregated = allEvents.apply(
>     Group.byFieldNames("userId", "countryCode")
>         .aggregateField("transactionCost", Sum.ofLongs(), "total_cost")
>         .aggregateField("transactionCost", Top.<Long>largestFn(10), "top_purchases")
>         .aggregateField("transactionDuration",
>             ApproximateQuantilesCombineFn.create(21), "durationHistogram"));
>
> This will individually aggregate the specified fields of the input items
> (by user and country), and generate an output schema for these
> aggregations. In this case, the output schema will be the following:
>
> AggregatedSchema:
>   total_cost: INT64
>   top_purchases: ARRAY[INT64]
>   durationHistogram: ARRAY[DOUBLE]
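
To make the semantics of the grouped aggregation concrete, here is a rough
equivalent of the total_cost part written with java.util.stream on an
in-memory list. This is only a sketch of what "group by userId and
countryCode, then sum the cost" means; it is not how the Group transform is
implemented, and GroupSketch and totalCost are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch using java.util.stream, not Beam's Group transform.
class GroupSketch {
  static class UserEvent {
    final String userId;
    final String countryCode;
    final long transactionCost;

    UserEvent(String userId, String countryCode, long transactionCost) {
      this.userId = userId;
      this.countryCode = countryCode;
      this.transactionCost = transactionCost;
    }
  }

  // Groups events by the (userId, countryCode) pair and sums transactionCost
  // within each group, which corresponds to the total_cost output field.
  static Map<List<String>, Long> totalCost(List<UserEvent> events) {
    return events.stream().collect(Collectors.groupingBy(
        e -> Arrays.asList(e.userId, e.countryCode),
        Collectors.summingLong(e -> e.transactionCost)));
  }

  public static void main(String[] args) {
    List<UserEvent> events = Arrays.asList(
        new UserEvent("alice", "US", 10L),
        new UserEvent("alice", "US", 5L),
        new UserEvent("alice", "CA", 1L),
        new UserEvent("bob", "CA", 7L));
    System.out.println(totalCost(events));
  }
}
```

The composite key here is a List<String>, which plays the role that the key
Row plays in the KV<Row, Row> output above.
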
>
> There are some more utility transforms I've written that are worth looking
> at such as Convert (which can convert between user types that share a
> schema) and Unnest (flattens nested schemas). There are also some others
> such as Pivot that we should consider writing.
>
> There is still a lot to do. All the todo items are reflected in JIRA;
> however, here are some examples of current gaps:
>
>
>    -
>
>    Support for read-only POJOs (those with final fields) and JavaBean
>    (objects without setters).
>    -
>
>    Automatic schema inference from more Java types: protocol buffers,
>    avro, AutoValue, etc.
>    -
>
>    Integration with sources (BigQueryIO, JdbcIO, AvroIO, etc.)
>    -
>
>    Support for JsonPath expressions so users can better express nested
>    fields. E.g. support expressions of the form Select.fields(“field1.field2”,
>    “field3.*”, “field4[0].field5”);
>    -
>
>    Schemas still need to be defined in our portability layer so they can
>    be used cross language.
>
>
> If anyone is interested in helping close these gaps, you'll be helping
> make Beam a better, more-usable system!
>
> Reuven
>
>
