Hi Weston,

> Only the minimal schema can be used for the actual compute operations
> while the rest is just carried along and processed by the user at the end
> If all you are doing is custom iterative processing (as opposed to
> running traditional relational algebra operators) the scanner should be
> enough

We would like our users to issue SQL-ish queries to analyze the structured
logs on the fly, and the projected/filtered/aggregated fields could come
from any field extracted from the logs, not just the common minimal
fields. As you pointed out, I think some changes would need to be made to
the regular ExecPlan to make this work. I have a rough idea of this part
now, and I will do some more investigation to see how it could be done.
Thanks so much for the detailed guidance and ideas.

Regards,
Yue

On Wed, Nov 17, 2021 at 7:16 AM Weston Pace <weston.p...@gmail.com> wrote:

> I think the ExecPlan itself would probably need some changes.  Right
> now each node has an output schema.  Most of the node implementations
> depend on this in some way or another.
>
> For example, a filter node binds the expression to the schema once at
> plan construction time.  If the schema is variable what happens to the
> expression "x > 3" when a batch comes by that has no "x" column?  What
> happens if a batch comes by where the "x" column is a string and not a
> number?  You could probably adapt some logic here but it doesn't exist
> today.  The projection node will have pretty much the same dilemma.
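> 
> As a rough illustration (an untested sketch; the helper name BindDemo is
> made up and the exact expression API signatures may vary a bit by version):
> 
> ```
> #include <arrow/api.h>
> #include <arrow/compute/expression.h>
> 
> namespace cp = arrow::compute;
> 
> arrow::Status BindDemo() {
>   // The filter expression "x > 3" as a filter node would hold it.
>   cp::Expression expr = cp::greater(cp::field_ref("x"), cp::literal(3));
> 
>   // A schema with an int64 "x": Bind succeeds and the expression is
>   // resolved against a concrete field and type.
>   auto with_x = arrow::schema({arrow::field("x", arrow::int64())});
>   ARROW_RETURN_NOT_OK(expr.Bind(*with_x).status());
> 
>   // A schema without any "x" (or where "x" is a string): Bind errors out
>   // today, so a variable-schema plan would need some per-batch policy for
>   // a missing or re-typed column rather than binding once at plan time.
>   auto without_x = arrow::schema({arrow::field("y", arrow::utf8())});
>   ARROW_UNUSED(expr.Bind(*without_x));  // returns an error Result
>   return arrow::Status::OK();
> }
> ```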
>
> The aggregate nodes will run into similar problems (How do you build
> an effective hash map if your key column "group_id" is sometimes a
> string and sometimes an integer?)
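> 
> As a toy (non-Arrow) sketch of that question: even if you widen the key
> to "int64 or string", you still have to decide whether 5 and "5" are the
> same group.
> 
> ```
> #include <cstdint>
> #include <iostream>
> #include <string>
> #include <unordered_map>
> #include <variant>
> 
> // A key that tolerates "group_id" being int64 in some batches and a
> // string in others.  It hashes fine, but the grouping semantics are
> // now ambiguous.
> using GroupKey = std::variant<int64_t, std::string>;
> 
> int main() {
>   std::unordered_map<GroupKey, int64_t> counts;
>   counts[GroupKey{int64_t{5}}] += 1;        // batch where group_id is int
>   counts[GroupKey{std::string{"5"}}] += 1;  // batch where group_id is text
>   std::cout << counts.size() << "\n";       // prints 2 -- two groups
>   return 0;
> }
> ```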
>
> Perhaps it would be easier to think about it as having some minimal
> schema with "extra" (e.g. in structured logging you always have
> timestamp, level, and body, then everything else could just be extra).
> Only the minimal schema can be used for the actual compute operations
> while the rest is just carried along and processed by the user at the
> end.
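> 
> Something like this is the kind of schema I mean (just a sketch; picking
> map<utf8, utf8> for the extras is arbitrary):
> 
> ```
> #include <arrow/api.h>
> 
> // A fixed minimal schema for structured logs: typed columns the engine
> // can compute on, plus an "extra" column that is carried along and only
> // unpacked by the user after the relational operators have run.
> std::shared_ptr<arrow::Schema> MakeLogSchema() {
>   return arrow::schema({
>       arrow::field("timestamp", arrow::timestamp(arrow::TimeUnit::MICRO)),
>       arrow::field("level", arrow::utf8()),
>       arrow::field("body", arrow::utf8()),
>       arrow::field("extra", arrow::map(arrow::utf8(), arrow::utf8())),
>   });
> }
> ```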
>
> Another approach could be to adapt the scanner so that it can handle
> outputting batches with different schemas.  If all you are doing is
> custom iterative processing (as opposed to running traditional
> relational algebra operators) the scanner should be enough.  You can
> get an iterator of batches and set up your own custom processing
> pipeline from that.
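> 
> Very roughly (untested, and assuming the datasets API; the exact shape of
> ScanBatches and its iterator may differ in your version):
> 
> ```
> #include <arrow/api.h>
> #include <arrow/dataset/api.h>
> 
> namespace ds = arrow::dataset;
> 
> // Pull record batches out of a scanner and hand them to custom,
> // schema-aware processing instead of an ExecPlan.
> arrow::Status ProcessLogs(const std::shared_ptr<ds::Dataset>& dataset) {
>   ARROW_ASSIGN_OR_RAISE(auto builder, dataset->NewScan());
>   ARROW_ASSIGN_OR_RAISE(auto scanner, builder->Finish());
>   ARROW_ASSIGN_OR_RAISE(auto batches, scanner->ScanBatches());
>   for (auto maybe_batch : batches) {
>     ARROW_ASSIGN_OR_RAISE(auto tagged, maybe_batch);
>     std::shared_ptr<arrow::RecordBatch> batch = tagged.record_batch;
>     // Each batch carries its own schema here, so a hand-rolled pipeline
>     // can dispatch on batch->schema() per batch.
>     // ... custom iterative processing ...
>   }
>   return arrow::Status::OK();
> }
> ```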
>
> These are just ideas from the hip.  I think most relational algebra
> systems rely on fixed schemas, and I would worry a little about the
> additional complexity of trying to fight against that.
>
> -Weston
>
> On Mon, Nov 15, 2021 at 1:11 AM Yue Ni <niyue....@gmail.com> wrote:
> >
> > > This schema evolution work has not been done in the scanner yet so if
> > > you are interested you might want to look at ARROW-11003[1].
> >
> > Thanks. I will keep an eye on it.
> >
> > > Or did you have a different use case for multiple schemas in mind that
> > > doesn't quite fit the "promote to common schema" case?
> >
> > I think my use case is not exactly the same as the dataset scanner use
> > case. In my case, the data set could be a set of JSON files (JSON lines
> > actually, https://jsonlines.org) that are essentially structured logs
> > from other applications (like this example,
> > https://stackify.com/what-is-structured-logging-and-why-developers-need-it/),
> > so the schema may vary a lot from app to app. Currently we have a
> > home-brewed compute engine leveraging Arrow as the data format, and I am
> > researching whether we could leverage Arrow's new C++ compute engine,
> > probably by writing some custom execution operators to make it work.
> >
> > On Mon, Nov 15, 2021 at 4:12 PM Weston Pace <weston.p...@gmail.com> wrote:
> >
> > > > "The consumer of a Scan does not need to know how it is implemented,
> > > > only that a uniform API is provided to obtain the next RecordBatch
> > > > with a known schema.", I interpret this as `Scan` operator may
> > > > produce multiple RecordBatches, and each of them should have a known
> > > > schema, but the next batch's schema could be different from the
> > > > previous batch. Is this understanding correct?
> > >
> > > No.  The compute/query engine expects that all batches share the same
> > > schema.
> > >
> > > However, it is often the case that files in a dataset have different
> > > schemas.  One way to handle this is to "promote up" to a common
> > > data type for a given column.  For example, if some columns are int32
> > > and some are int64 then promote all to int64.  "schema evolution" is
> > > one term I've seen thrown around for this process.
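> > >
> > > As a rough, untested illustration, the promotion is essentially a cast
> > > applied to the narrower column while scanning (PromoteToInt64 is just a
> > > made-up helper name):
> > >
> > > ```
> > > #include <arrow/api.h>
> > > #include <arrow/compute/api.h>
> > >
> > > // Promote an int32 column from one file up to the int64 declared in
> > > // the common dataset schema, so every batch the engine sees has the
> > > // same type for that column.
> > > arrow::Result<std::shared_ptr<arrow::Array>> PromoteToInt64(
> > >     const std::shared_ptr<arrow::Array>& column) {
> > >   ARROW_ASSIGN_OR_RAISE(arrow::Datum promoted,
> > >                         arrow::compute::Cast(column, arrow::int64()));
> > >   return promoted.make_array();
> > > }
> > > ```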
> > >
> > > I believe the plan is to eventually have this promotion happen during
> > > the scanning process.  So the scanner has to deal with fragments that
> > > may have differing schemas but by the time the data reaches the query
> > > engine the record batches all have the consistent common schema.
> > >
> > > This schema evolution work has not been done in the scanner yet so if
> > > you are interested you might want to look at ARROW-11003[1].  Or did
> > > you have a different use case for multiple schemas in mind that
> > > doesn't quite fit the "promote to common schema" case?
> > >
> > > [1] https://issues.apache.org/jira/browse/ARROW-11003
> > >
> > >
> > > On Sun, Nov 14, 2021 at 7:03 PM Yue Ni <niyue....@gmail.com> wrote:
> > > >
> > > > Hi there,
> > > >
> > > > I am evaluating the Apache Arrow C++ compute engine for my project,
> > > > and wonder what the schema assumption is for execution operators in
> > > > the compute engine.
> > > >
> > > > In my use case, multiple record batches for computation may have
> > > > different schemas. I read the Apache Arrow Query Engine for C++ design
> > > > doc (
> > > > https://docs.google.com/document/d/10RoUZmiMQRi_J1FcPeVAUAMJ6d_ZuiEbaM2Y33sNPu4
> > > > ), and for the `Scan` operator, it says "The consumer of a Scan does
> > > > not need to know how it is implemented, only that a uniform API is
> > > > provided to obtain the next RecordBatch with a known schema." I
> > > > interpret this as: the `Scan` operator may produce multiple
> > > > RecordBatches, and each of them should have a known schema, but the
> > > > next batch's schema could be different from the previous batch. Is
> > > > this understanding correct?
> > > >
> > > > And I read Arrow's source code. In `exec_plan.h`:
> > > > ```
> > > > class ARROW_EXPORT ExecNode {
> > > >   ...
> > > >   /// The datatypes for batches produced by this node
> > > >   const std::shared_ptr<Schema>& output_schema() const { return output_schema_; }
> > > >   ...
> > > > ```
> > > > It looks like each `ExecNode` needs to provide an `output_schema`.
> > > > Is it allowed to return an `output_schema` that may change during the
> > > > ExecNode's execution? If I would like to implement an execution node
> > > > that will produce multiple batches that may have different schemas, is
> > > > this feasible within the Arrow C++ compute engine API framework? Thanks.
> > > >
> > > > Regards,
> > > > Yue
> > >
>
