Great summary, Chris. I agree that we should really strive not to have another mandatory operational dependency for out-of-the-box usage.
To clarify what I was saying: I am less concerned about the partition count, which is readily discoverable and basically just part of the "DDL" for the topic regardless of how you create it; I was actually concerned about the partition key. I.e., create stream partitioned_page_views as select * from page_views produces different output depending on the partition key if you consider the ordering of the stream relevant (which I do).

So I think the discussion item there is really about time- vs. tuple-driven production, which amounts to CQL vs. StreamBase semantics. I don't think I understand all the tradeoffs, but some of the corner cases of time-based production were quite odd (see the paper I linked from SAMZA-390) when you have duplicate timestamps (and when you have a million messages per second, millisecond resolution on time means you will have hundreds of thousands of messages with each timestamp).

Personally, I really think supporting multiple schemas that disagree on the fields is going to take you outside the realm of SQL and be quite difficult to use. I think for databases they should publish each table to its own topic and include a txn/SCN type thing to allow downstream users who want to merge into a total order to do so.

-Jay

On Sat, Jan 31, 2015 at 12:50 PM, Chris Riccomini <criccom...@apache.org> wrote:

> Hey all,
>
> Trying to respond in order.
>
> > If one wants finer-grained control over what the output topic would be like, wouldn't it make sense to use a CREATE TABLE AS <query> statement?
>
> Yes. CREATE TABLE AS could be used to define all three things that I mentioned (partition key, partition count, and schema). I think the discussion is about whether we *should* use it, and if so, for which ones. See Jay's comments about wanting to explicitly define partitions in the language. I agree with you, though. On the producer side, between SELECT ... clauses and CREATE TABLE AS, we have enough to define outgoing schemas.
>
> The point of my comments was just to explore what we could do with 1) no schema registry and 2) no SQL-level stream-specific clauses. Once we've established that spectrum, I think we can argue where along it we'd like to be.
>
> > How would you know that even if you did look at the records? I'm pretty sure that's impossible with Avro.... Isn't it mandatory to have some sort of metadata repository for Samza SQL to work?
>
> It really depends on the serde. JSON, BSON, and MessagePack can all be decoded (and the schema inferred) just by reading the message. The three you mention (Avro, Thrift, Protobuf) might require a schema registry, though all three could be encoded with per-message schemas as well. It depends on the serde. I'm not saying that this is a good idea, but there are certainly a lot of ways to make SQL work (for some definition of work) without a schema registry.
>
> > Given the solutions that already exist though, would it really be that much effort to leverage one of them?
>
> I might be overly pessimistic, but adding a new piece of software to the stack almost always has a non-trivial amount of overhead to it: operational, mental-model, code, etc. Samza is already so heavyweight as it is (ZK, YARN, Kafka). I agree with your statement that half the power of SQL comes from the DDL. What I'm trying to do is figure out what the MVP is. It would totally be useful to have a schema registry that defines partition keys, partition counts, schemas, etc. What I was suggesting is that we might be able to forgo it initially, and try to implement the SQL in a future-proof way, such that adding it later isn't a big problem.
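A purely hypothetical sketch of the no-registry case being discussed here, i.e. giving the schema (and partitioning) with the query itself: CREATE STREAM with an explicit column list is not settled syntax, and the column names and types are invented for illustration.

    -- Hypothetical syntax: declare the stream's schema and partition key inline
    -- so that no external schema registry is consulted. Column names and types
    -- are invented for illustration.
    CREATE STREAM page_views (
      member_id  BIGINT,
      page_key   VARCHAR(255),
      view_time  TIMESTAMP
    ) PARTITION BY member_id;

    -- Queries over the stream could then be validated against the declared columns.
    SELECT STREAM member_id, page_key FROM page_views;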
> Basically, I want to limit scope, so we could have something to show for ourselves fast, rather than trying to do everything up front.
>
> > I think the schema repository acts as the stand-in for the database catalog or hcat. People who don't have that will have to give their schema with the query. I think that can be some kind of plugin to provide the schema so it is automatically inferred for the Avro people and manually provided for others.
>
> I agree. If you have a schema repo, we can use it. If you don't, you can either 1) define the schema in your query or 2) you'll fail at runtime. That seems OK to me. The JSON model that Julian and Milinda mention also seems like a viable solution for those without a schema registry, though it's a bit of a pain. Basically, to get the best experience, you need a schema repo.
>
> But this still leaves us with the partition key/partition count discussion. It sounds like there's some consensus on the partition key being defined in SQL (CREATE TABLE foo PARTITION BY member_id AS SELECT ..., or something).
>
> Is it fair to say that the last remaining point of my original comment is whether the partition count is defined in the SQL or not? Personally, I tend to agree with Jay's comments, but I'm not picky about whether the partition count is defined directly in the query, or simply as a hint:
>
> CREATE TABLE foo PARTITION BY member_id WITH 200 PARTITIONS SELECT ...
>
> Or:
>
> -- partition_count=200
> CREATE TABLE foo PARTITION BY member_id SELECT ...
>
> I'm mostly just concerned that we have no way to define this now, so it seems mandatory that we have *some* way of defining it (unless we expect users to manually run a create-topic command before executing every query).
>
> I guess I have two remaining questions, then:
>
> 1. What should we do about the partition count definition? (Jay's discussion)
> 2. We had been building the operator layer (Yi's stuff) to support heterogeneous schema types in a single stream. The schema discussion we've been having seems to suggest that we only want to support one schema per stream. Since Yi's stuff is more general, it could be made to work with just one schema per stream, but I just wanted to call it out as a conscious decision. This seems as though it would prevent us from being able to select over streams that want a strong time-order amongst messages of different types (e.g., a changelog from a DB shard that had more than one table in it).
>
> Cheers,
> Chris
>
> On Fri, Jan 30, 2015 at 3:50 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>
> > Chris,
> >
> > I think the schema repository acts as the stand-in for the database catalog or hcat. People who don't have that will have to give their schema with the query. I think that can be some kind of plugin to provide the schema so it is automatically inferred for the Avro people and manually provided for others.
> >
> > -Jay
> >
> > On Fri, Jan 30, 2015 at 12:06 PM, Chris Riccomini <criccom...@apache.org> wrote:
> >
> > > Hey all,
> > >
> > > I have a few more comments on the metadata issue that I brought up. The three things that we lack right now are:
> > >
> > > 1. Partition count.
> > > 2. Partition key.
> > > 3. Stream (or message) schema.
> > >
> > > These have to be evaluated both on the ingest (first consumers) and egress (final producers) of a query.
> > >
> > > On the consumer side, (1) can be determined simply by asking Kafka. On the producer side, we have no way to determine how many partitions an output stream should be right now, short of having the user specify it in some way.
> > >
> > > On the producer side, (2) must be defined in SQL, or via a hint, for final output to a stream. Intermediate streams can be keyed based on the consumers within the session. On the consumer side, the consumer can always re-partition an input stream based on the partition key that it needs, so it doesn't need to know a priori what the key was.
> > >
> > > Message schemas (3) can be evaluated on the consumer side at runtime by poking at the incoming messages, which should be self-describing. Output message schemas can be derived from the transformations applied to incoming messages on the producer side.
> > >
> > > If we're OK with these strategies, then it seems we only really need to worry about:
> > >
> > > 1. Producer-side partition count.
> > > 2. Producer-side partition key.
> > >
> > > One other annoying property of (3) is that compile-time validation of a query won't be able to check that a field for a given stream actually exists--it'll fail at runtime. This could perhaps be accelerated by sampling some messages before deploying the job, to check that the messages have the appropriate fields, but it's still a runtime check.
> > >
> > > Cheers,
> > > Chris
> > >
> > > On Fri, Jan 30, 2015 at 10:43 AM, Chris Riccomini <criccom...@apache.org> wrote:
> > >
> > > > Hey all,
> > > >
> > > > Just catching up on this thread. The Calcite + Samza approach seems pretty compelling to me. I think most of what Julian is arguing for makes sense. My main concern is with practicalities.
> > > >
> > > > One specific case of this is the discussion about the partitioning model. In an ideal world, I agree, developers wouldn't need to define partitions in SQL. Practically speaking, though, Samza (Kafka, really) currently doesn't have a metadata repository. Without a metadata repository, we have no way of knowing 1) which key a topic is partitioned by, and 2) what the schema of the topic is (without looking at records). We know this *within* a query (or session), but not *between* queries (or sessions) from disjoint users.
> > > >
> > > > One could argue that we should spend time defining such a metadata repository, which is a reasonable argument. But that's also a fairly large effort. I wonder if we might be able to cut some corners in a clean-ish, forward-compatible way, so that developers don't have to wait for us to fully implement (or integrate with) something like a Hive metadata store.
> > > >
> > > > In person, one thing you mentioned, Julian, was using hints, rather than stuff baked into the syntax. If that's stomach-able, we could support partitioning through hints, until we have a full-blown metadata store.
> > > >
> > > > Thoughts?
> > > >
> > > > Cheers,
> > > > Chris
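A hypothetical illustration of the hint idea: the /*+ ... */ hint below is invented syntax, not an existing Calcite or Samza feature, and simply complements the comment-style hint Chris sketches elsewhere in the thread.

    -- Invented, illustrative syntax only: partitioning metadata carried as an
    -- inline hint rather than as an extension to the SQL grammar itself.
    CREATE TABLE foo /*+ PARTITION_KEY(member_id), PARTITION_COUNT(200) */
    AS SELECT * FROM page_views;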
> > > >
> > > > On Thu, Jan 29, 2015 at 5:10 PM, Julian Hyde <jul...@hydromatic.net> wrote:
> > > >
> > > >> > On Jan 29, 2015, at 4:38 PM, Yi Pan <nickpa...@gmail.com> wrote:
> > > >> >
> > > >> > I am wondering if I can get averages per non-overlapping 30-minute window. I.e., the following are the input events in a stream:
> > > >> > {10:01, ORCL, 10, 10}
> > > >> > {10:02, MSFT, 30, 30}
> > > >> > {10:03, ORCL, 100, 110}
> > > >> > {10:17, MSFT, 45, 75}
> > > >> > {10:59, ORCL, 20, 130}
> > > >> > {11:02, ORCL, 50, 50}
> > > >> > Can I get the non-overlapping window averages from 10:00-10:29, 10:30-10:59, ...? Could you give an example of how to define that window operation in your model? Note that in this use case, although I may have one trading record per minute from the stream, I only generate two average records from 10:00-11:00.
> > > >>
> > > >> That takes a bit of messy date arithmetic, made even more messy by the fact that you can't regard a SQL timestamp as a "milliseconds" value as one would in C or Java, nor can you divide it. Hence the trick of subtracting a constant timestamp, which yields an interval value, and then doing integer division on that interval to find the number of 30-minute periods since the epoch.
> > > >>
> > > >>   select stream rowtime, ticker, amount,
> > > >>     sum(amount) over (
> > > >>       partition by ticker,
> > > >>         (rowtime - TIMESTAMP '1970-01-01 00:00:00') MINUTE / 30
> > > >>       order by rowtime)
> > > >>   from StockTrades;
> > > >>
> > > >> If I were doing this kind of calculation often I'd define a UDF, or even that user-defined window SPI I mentioned earlier.
> > > >>
> > > >> > {quote}
> > > >> > CREATE TABLE Emp(empno INTEGER, name VARCHAR(20), department VARCHAR(20))
> > > >> > PARTITION BY HASHCODE (department);
> > > >> > {quote}
> > > >> > That's good! At least it resolves my question on "which field is the partition key". However, I still have a question about the number of partitions. As I stated, since the system currently does not have an "auto-scaling" feature, the number of partitions for a stream has to be explicitly specified. Where do you suggest putting this information without breaking SQL syntax?
> > > >>
> > > >> I imagine you could start the system on Tuesday with 10 partitions per stream and restart it on Wednesday with 8 or 12? You wouldn't want to change the SQL, because that's in the application. But you could change the definition of the stream, either in the DDL or by changing some other system configuration. The partitioning function (applied by the system to route the record) could then, say, take the value of p modulo the current number of partitions.
> > > >>
> > > >> Julian
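For the non-overlapping form Yi asked for (two averages per ticker per hour), one possible sketch reuses Julian's 30-minute bucket expression as a GROUP BY key instead of a window partition. This assumes Calcite-style streaming SQL with GROUP BY on a rowtime-derived (monotonic) expression and has not been validated against any released version.

    -- Sketch only: groups by the same bucket expression Julian uses above,
    -- emitting one averaged row per ticker per 30-minute bucket rather than a
    -- running sum per input row. Assumes streaming GROUP BY is supported on a
    -- monotonic, rowtime-derived key.
    SELECT STREAM
      ticker,
      (rowtime - TIMESTAMP '1970-01-01 00:00:00') MINUTE / 30 AS window_id,
      AVG(amount) AS avg_amount
    FROM StockTrades
    GROUP BY ticker,
      (rowtime - TIMESTAMP '1970-01-01 00:00:00') MINUTE / 30;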