I am not convinced that the partitioning and partition key issue is quite
equivalent to partitions in a relational database. In a relational database
partitions have no semantic meaning and are just a way to organize data for
query efficiency. That is not the case in Kafka. Let me try to argue that
partitioning and ordering in streams is not the same as which disk you
store data on or which CPU processes your query, which are true
implementation details.

Kafka's model is a partitioned, ordered log or stream, and Samza borrows
this data model from Kafka. So ordering, or time, is something that is
defined within a partition, not between partitions. This offset-based time
and ordering is actually the most natural "timestamp" since it represents
the "time" at which the data was published. This time is precise in the
sense that no two events have the same time and perfect order is
maintained. In some cases processing won't make use of the natural time
implied by the ordering but will instead re-sort by some kind of
timestamp. But this natural notion of order/time is actually pretty
important: consider a case where you model mutations as a stream of
changes. If you publish two updates to a row, the order of those updates
is preserved in the natural ordering no matter how close together they
occur. This is not true of a physical timestamp, which is ambiguous beyond
some precision, incorrect when compared between machines, and
non-monotonic under some circumstances even on a single machine.
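
To make that concrete, here is a minimal sketch with the plain Java
producer (the broker address, topic name, key, and values are made up for
illustration). Because both updates carry the same key they land in the
same partition, and the offsets the broker assigns preserve exactly the
order in which they were published, however close together that happens:

  import java.util.Properties;
  import java.util.concurrent.Future;
  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.ProducerRecord;
  import org.apache.kafka.clients.producer.RecordMetadata;

  public class RowChangePublisher {
    public static void main(String[] args) throws Exception {
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("key.serializer",
          "org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer",
          "org.apache.kafka.common.serialization.StringSerializer");

      KafkaProducer<String, String> producer = new KafkaProducer<>(props);

      // Two mutations to the same row, published back to back. Keying by
      // the row id sends both to the same partition of the changes topic.
      Future<RecordMetadata> first =
          producer.send(new ProducerRecord<>("row-changes", "row-42", "name=a"));
      Future<RecordMetadata> second =
          producer.send(new ProducerRecord<>("row-changes", "row-42", "name=b"));

      // Same partition, and the second offset is strictly greater than the
      // first: the partition's offset order is the publication order.
      System.out.println("update 1 -> partition " + first.get().partition()
          + ", offset " + first.get().offset());
      System.out.println("update 2 -> partition " + second.get().partition()
          + ", offset " + second.get().offset());

      producer.close();
    }
  }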

There would be two ways to deal with this. One would be to say "this notion
of a stream is too mechanical" and try to paper over it in the language;
the other would be to embrace this data model and make the query model
support it.

I think you can argue that this is a good or natural data model for stream
processing.

You could maybe argue that a more intuitive notion of order would be a
total order over all records. However in practice there are several issues
with this:
1. No one knows how to scale a total order that is sensible from the user's
point of view.
2. In practice events themselves don't actually have a total order. That
is, the events in the world are occurring in some number of "threads";
within each of these threads events are ordered, but between threads there
is no true order you can reason about absent communication. A partition
represents a fixed merging of the sequences emitted by these "threads of
control" (a toy sketch of this follows below).

Sometimes people will make use of timestamps instead. But timestamps
generated across a bunch of machines are actually a fairly nuanced thing
to reason about. The natural order/time has the property that once you
have seen offset 32 it is complete and will never recur; timestamps are
not aligned in this way, so just because you have seen unix timestamp
1422660730932 doesn't mean you won't see earlier timestamps in the future.
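
Here is a minimal sketch of that distinction (broker address, topic, and
group names are invented; it assumes the records carry timestamps the
consumer can read back). The offset check can never fire within a single
pass over a partition, but the timestamp check easily can when producers'
clocks disagree:

  import java.time.Duration;
  import java.util.Collections;
  import java.util.HashMap;
  import java.util.Map;
  import java.util.Properties;
  import org.apache.kafka.clients.consumer.ConsumerRecord;
  import org.apache.kafka.clients.consumer.ConsumerRecords;
  import org.apache.kafka.clients.consumer.KafkaConsumer;
  import org.apache.kafka.common.TopicPartition;

  public class OffsetVsTimestamp {
    public static void main(String[] args) {
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", "offset-vs-timestamp-demo");
      props.put("key.deserializer",
          "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("value.deserializer",
          "org.apache.kafka.common.serialization.StringDeserializer");

      Map<TopicPartition, Long> lastOffset = new HashMap<>();
      Map<TopicPartition, Long> lastTimestamp = new HashMap<>();

      try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
        consumer.subscribe(Collections.singletonList("row-changes"));
        while (true) {
          ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
          for (TopicPartition tp : records.partitions()) {
            for (ConsumerRecord<String, String> rec : records.records(tp)) {
              // Within one pass over a partition, offsets only move
              // forward: having consumed offset 32, the next record from
              // this partition always has a larger offset.
              long prevOff = lastOffset.getOrDefault(tp, -1L);
              if (rec.offset() <= prevOff) {
                throw new IllegalStateException("offset went backwards");
              }
              // The timestamps carried in the records make no such
              // promise: they can repeat or jump backwards between
              // adjacent offsets.
              long prevTs = lastTimestamp.getOrDefault(tp, Long.MIN_VALUE);
              if (rec.timestamp() < prevTs) {
                System.out.println(tp + ": timestamp went backwards at offset "
                    + rec.offset());
              }
              lastOffset.put(tp, rec.offset());
              lastTimestamp.put(tp, rec.timestamp());
            }
          }
        }
      }
    }
  }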

This leaves you with a partitioned total order as the kind of fundamental
data structure for streams.

Thus two streams partitioned by different keys are actually not the same
stream, because they have different orders and hence different natural
clocks. If that is the case, I think we should not try to hide it.

-Jay



On Fri, Jan 30, 2015 at 10:43 AM, Chris Riccomini <criccom...@apache.org>
wrote:

> Hey all,
>
> Just catching up on this thread. The Calcite + Samza approach seems pretty
> compelling to me. I think most of what Julian is arguing for makes sense.
> My main concern is with practicalities.
>
> One specific case of this is the discussion about the partitioning model.
> In an ideal world, I agree, developers wouldn't need to define partitions
> in SQL. Practically speaking, though, Samza (Kafka, really) currently
> doesn't have a metadata repository. Without a metadata repository, we have
> no way of knowing 1) which key a topic is partitioned by, and 2) what the
> schema of the topic is (without looking at records). We know this *within*
> a query (or session), but not *between* queries (or sessions) from disjoint
> users.
>
> One could argue that we should spend time defining such a metadata
> repository, which is a reasonable argument. But that's also a fairly large
> effort. I wonder if we might be able to cut some corners in a clean-ish
> forwards-compatible way, so that developers don't have to wait for us to
> fully implement (or integrate with) something like a Hive metadata store.
>
> In person, one thing you mentioned, Julian, was using hints, rather than
> stuff baked into the syntax. If that's stomach-able, we could support
> partitioning through hints, until we have a full blown metadata store.
>
> Thoughts?
>
> Cheers,
> Chris
>
> On Thu, Jan 29, 2015 at 5:10 PM, Julian Hyde <jul...@hydromatic.net>
> wrote:
>
> >
> > > On Jan 29, 2015, at 4:38 PM, Yi Pan <nickpa...@gmail.com> wrote:
> > >
> > > I am wondering if I can get per-30-min window averages?
> > > I.e. the following is the input events in a stream:
> > >  {10:01, ORCL, 10, 10}
> > >  {10:02, MSFT, 30, 30}
> > >  {10:03, ORCL, 100, 110}
> > >  {10:17, MSFT, 45, 75}
> > >  {10:59, ORCL, 20, 130}
> > >  {11:02, ORCL, 50, 50}
> > > Can I get the non-overlapping window average from 10:00-10:29, and
> > > 10:30-10:59, ... ? Could you give an example of how to define that window
> > > operation in your model? Note that in this use case, although I may have 1
> > > trading record per minute from the stream, I only generate 2 average
> > > records from 10:00-11:00.
> >
> > That takes a bit of messy date arithmetic, made even more messy by the
> > fact that you can't regard a SQL timestamp as a "milliseconds" value as one
> > would in C or Java, nor can you divide it. Hence the trick of subtracting a
> > constant timestamp, which yields an interval value, and then doing integer
> > division on that interval to find the number of 30-minute periods since the
> > epoch.
> >
> > select stream rowtime, ticker, amount,
> >   sum(amount) over (
> >     partition by ticker,
> >       (rowtime - TIMESTAMP '1970-01-01 00:00:00') MINUTE / 30
> >     order by rowtime)
> > from StockTrades;
> >
> > If I were doing this kind of calculation often I'd define a UDF, or even
> > that user-defined window SPI I mentioned earlier.
> >
> > > {quote}
> > > CREATE TABLE Emp(empno INTEGER, name VARCHAR(20), department VARCHAR(20))
> > >  PARTITION BY HASHCODE (department);
> > > {quote}
> > > That's good! At least it resolves my question on: "which field is the
> > > partition key". However, I still have the question on the number of
> > > partitions. As I stated, since the system currently does not have an
> > > "auto-scaling" feature, the number of partitions for a stream has to be
> > > explicitly specified. Where do you suggest putting this information w/o
> > > breaking SQL syntax?
> >
> > I imagine you could start the system on Tuesday with 10 partitions per
> > stream and restart it on Wednesday with 8 or 12? You wouldn't want to
> > change the SQL, because that's in the application. But you could change the
> > definition of the stream, either in the DDL or by changing some other
> > system configuration. Then the partitioning function (applied by the
> > system to route the record) could, say, take the value of p modulo the
> > current number of partitions.
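> >
> > Purely as a sketch of that idea (this is a hypothetical helper, not an
> > actual Samza API), the routing could be as simple as hashing the
> > partition key and taking it modulo whatever partition count the system
> > was started with:
> >
> >   public class KeyRouter {
> >     // currentNumPartitions comes from the stream definition / system
> >     // configuration at startup, not from the SQL text.
> >     static int partitionFor(Object partitionKey, int currentNumPartitions) {
> >       return Math.floorMod(partitionKey.hashCode(), currentNumPartitions);
> >     }
> >
> >     public static void main(String[] args) {
> >       System.out.println(partitionFor("ORCL", 10)); // Tuesday: 10 partitions
> >       System.out.println(partitionFor("ORCL", 12)); // Wednesday: 12 partitions
> >     }
> >   }
> >
> > Restarting with a different partition count only changes the modulus;
> > the SQL text itself stays the same.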
> >
> > Julian
> >
> >
>
