Hi all,

thanks for summarizing the discussion @Shuyi. I think we need to include the "table update mode" problem as it might not be changed easily in the future. Regarding "support row/map/array data type", I don't see why we should not support them now, as the data types are already included in the runtime. The "support custom timestamp extractor" item is solved by the computed columns approach. The "custom watermark strategy" can, in my opinion, be added by supplying a class name as a parameter.

Regarding the comments of Lin and Jark:

@Lin: Instantiating a TableSource/Sink should not cost much, but we should not mix catalog discussion and DDL at this point.

4) Event-Time Attributes and Watermarks
4.b) Regarding `ts AS SYSTEMROWTIME()` and Lin's comment about "will violate the rule": there is no explicit rule against doing so. Computed columns are also not standard compliant; if we can use information that is encoded in constraints, we should use it. Adding more and more top-level properties makes the interaction with connectors more difficult. An additional HEADER keyword sounds too connector-specific and also not SQL compliant to me.

3) SOURCE / SINK / BOTH
GRANT/REVOKE are mutating an existing table, right? In my opinion, independent of SQL databases but focusing on Flink user requirements, a CREATE TABLE statement should be an immutable definition of a connection to an external system.

7) Table Update Mode
As far as I can see, the only thing missing for enabling all table modes is the declaration of a change flag. We could introduce a new keyword here similar to WATERMARK:

CREATE TABLE output_kafka_t1(
  id bigint,
  msg varchar,
  CHANGE_FLAG FOR isRetraction
) WITH (
  type=kafka
  ,...
);

CREATE TABLE output_kafka_t1(
  CHANGE_FLAG FOR isUpsert,
  id bigint,
  msg varchar,
  PRIMARY_KEY(id)
) WITH (
  type=kafka
  ,...
);
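
To make the two modes concrete, here is a minimal Python sketch (not Flink code) of how a consumer could interpret such a change flag. The function names, the record layout, and the flag semantics (`True` means "retract" in the first case and "upsert" in the second, `False` means "add" or "delete by key" respectively) are assumptions for illustration only:

```python
from collections import Counter

def apply_retract(changes):
    """Retract mode: each change is (is_retraction, row); the table is a multiset."""
    table = Counter()
    for is_retraction, row in changes:
        if is_retraction:
            table[row] -= 1
            if table[row] <= 0:
                del table[row]
        else:
            table[row] += 1
    return table

def apply_upsert(changes, key_index=0):
    """Upsert mode: each change is (is_upsert, row); False deletes by primary key."""
    table = {}
    for is_upsert, row in changes:
        key = row[key_index]
        if is_upsert:
            table[key] = row
        else:
            table.pop(key, None)
    return table

retract_result = apply_retract([
    (False, (1, "a")),   # add
    (True, (1, "a")),    # retract the previous row
    (False, (1, "b")),   # add the updated row
])
upsert_result = apply_upsert([
    (True, (1, "a")),
    (True, (1, "b")),    # overwrites key 1
    (True, (2, "c")),
    (False, (2, "c")),   # deletes key 2
])
```

In retract mode an update needs two messages, whereas upsert mode needs one message plus a declared key, which is why the second DDL requires the PRIMARY_KEY.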

What do you think?

@Jark: We should definitely stage the discussions and mention the opinions and advantages/disadvantages that have been proposed already in the FLIP.

Regards,
Timo

On 10.12.18 at 08:10, Jark Wu wrote:
Hi all,

It's great to see we have an agreement on MVP.

4.b) Ingesting and writing timestamps to systems.
I would treat the field as a physical column, not a virtual column. If we
treat it as a computed column, it will be confusing that the behavior
differs depending on whether the table is a source or a sink.
When it is a physical column, the behavior can be unified. Then the
problem is how to map the field to the Kafka message timestamp.
One option is what Lin proposed above, which is also used in KSQL[1].
Another idea is to introduce a HEADER column which strictly maps by name
to the fields in the message header.
For example,

CREATE TABLE output_kafka_t1(
   id bigint,
   ts timestamp HEADER,
   msg varchar
) WITH (
   type=kafka
   ,...
);

This is used at Alibaba but not included in the DDL draft. It would further
extend the SQL syntax, which we should be cautious about. What do you
think about these two solutions?

4.d) Custom watermark strategies:
@Timo,  I don't have a strong opinion on this.

3) SOURCE/SINK/BOTH
Agree with Lin, GRANT/REVOKE [SELECT|UPDATE] ON TABLE is a clean and
standard way to manage permissions, which is also adopted by Hive[2] and
many databases.

[1]: https://docs.confluent.io/current/ksql/docs/tutorials/examples.html
[2]:
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=45876173#Hivedeprecatedauthorizationmode/LegacyMode-Grant/RevokePrivileges

@Timo, it would be great if someone could conclude the discussion and
summarize it in a FLIP.
@Shuyi, thanks a lot for putting it all together. The Google doc looks good
to me, and I left some minor comments there.

Regarding the FLIP, I have some suggestions:
1. The FLIP can contain MILESTONE1 and FUTURE WORKS.
2. The MILESTONE1 is the MVP. It describes the MVP DDL syntax.
3. Separate FUTURE WORKS into two parts: UNDER DISCUSSION and ADOPTED. We
can derive MILESTONE2 from this easily when it is ready.

I summarized the Future Works based on Shuyi's work:

Adopted: (should be described in detail here...)
1. support data type nullability and precision.
2. comment on table and columns.

Under Discussion: (Should briefly describe some options...)
1. Ingesting and writing timestamps to systems.
2. support custom watermark strategy.
3. support table update mode
4. support row/map/array data type
5. support schema derivation
6. support system versioned temporal table
7. support table index

We can continue the discussion here, or split off a separate DISCUSS
thread for sophisticated problems such as Table Update Mode or
Temporal Table.

Best,
Jark

On Mon, 10 Dec 2018 at 11:54, Lin Li <lincoln.8...@gmail.com> wrote:

hi all,
Thanks for your valuable input!

4) Event-Time Attributes and Watermarks:
4.b) @Fabian As you mentioned, using a computed column `ts AS SYSTEMROWTIME()`
for writing out to a Kafka table sink will violate the rule that computed
fields are not emitted.
Since the timestamp column in Kafka's header area is a specific
materialization protocol,
why don't we treat it as a connector property? For example:
```
CREATE TABLE output_kafka_t1(
   id bigint,
   ts timestamp,
   msg varchar
) WITH (
   type=kafka,
   header.timestamp=ts
   ,...
);
```
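
As an illustration of how a sink could honor such a property, here is a minimal Python sketch (not Flink or Kafka client code). The function name `to_kafka_message` is hypothetical, and the sketch assumes the mapped column is moved into the message timestamp rather than duplicated in the value, which the proposal above does not specify:

```python
def to_kafka_message(row, properties):
    """Split a row into (timestamp, value) based on a 'header.timestamp' property."""
    ts_column = properties.get("header.timestamp")
    if ts_column is None:
        # No mapping configured: no explicit timestamp, full row as value.
        return None, dict(row)
    value = {name: v for name, v in row.items() if name != ts_column}
    return row[ts_column], value

timestamp, value = to_kafka_message(
    {"id": 7, "ts": 1544400000000, "msg": "hello"},
    {"type": "kafka", "header.timestamp": "ts"},
)
```

Because the mapping lives in the WITH properties, the table schema stays a plain list of physical columns for both reading and writing.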

4d) For custom watermark strategies
@Fabian I agree with opening another topic about this feature later.

3) SOURCE / SINK / BOTH
I think permissions and availability are two separate things. Permissions
can be managed well with GRANT/REVOKE (you can call it DCL) solutions, which
are commonly used in different DBs. The permission part can be a new topic
for later discussion. What do you think?

For availability, @Fabian @Timo, I have another question:
does instantiating a TableSource/Sink cost much or have some other downsides?
IMO, creating a new source/sink object via the constructor does not seem costly.
When receiving a DDL we should associate it with the catalog object
(reusing an existing one or creating a new one).
Am I missing something important?

5. Schema declaration:
@Timo  yes, your concern about user convenience is very important. But
I haven't seen a clear way to solve this so far.
Shall we postpone it and wait for more input from the community?

On Sat, Dec 8, 2018 at 4:27 PM, Shuyi Chen <suez1...@gmail.com> wrote:

Hi all,

Thanks a lot for the great discussion. I think we can continue the
discussion here while carving out an MVP so that the community can start
working on it. Based on the discussion so far, I have tried to summarize
what we will do for the MVP:

MVP

    1. support CREATE TABLE
    2. support existing data types in Flink SQL, ignore nullability and
    precision
    3. support table comments and column comments
    4. support table constraint PRIMARY KEY and UNIQUE
    5. support table properties using key-value pairs
    6. support partitioned by
    7. support computed column
    8. support from-field and from-source timestamp extractors
    9. support PERIODIC-ASCENDING, PERIODIC-BOUNDED, FROM-SOURCE watermark
    strategies.
    10. support a table property to allow explicit enforcement of
    read/write(source/sink) permission of a table

I tried to put up the DDL grammar (
https://docs.google.com/document/d/1ug1-aVBSCxZQk58kR-yaK2ETCgL3zg0eDUVGCnW2V9E/edit?usp=sharing
)
based on the MVP features above and the previous design docs. Please take a
look and comment on it.


Also, I summarize the future improvements on CREATE TABLE as follows:
    1. support table update mode
    2. support data type nullability and precision
    3. support row/map/array data type
    4. support custom timestamp extractor and watermark strategy
    5. support schema derivation
    6. support system versioned temporal table
    7. support table index

I suggest we first agree on the MVP feature list and the MVP grammar. Then
we can either continue the discussion of the future improvements here,
or create separate JIRAs for each item and discuss further in the JIRA.
What do you guys think?

Shuyi

On Fri, Dec 7, 2018 at 7:54 AM Timo Walther <twal...@apache.org> wrote:

Hi all,

I think we are making good progress. Thanks for all the feedback so far.

3. Sources/Sinks:
It seems that I can not find supporters for explicit SOURCE/SINK
declaration so I'm fine with not using those keywords.
@Fabian: Maybe we don't even have to change the TableFactory interface
but just provide some helper functions in the TableFactoryService. This
would solve the availability problem, but the permission problem would
still not be solved. If you are fine with it, we could introduce a
property instead?

5. Schema declaration:
@Lin: We should find an agreement on this as it requires changes to the
TableFactory interface. We should minimize changes to this interface
because it is user-facing. Especially if format schema and table schema
differ, the need for such functionality is very important. Our goal is
to connect to existing infrastructure. For example, if we are using Avro
and the existing Avro format has enums but Flink SQL does not support
enums, it would be helpful to let the Avro format derive a table schema.
Otherwise you need to declare both schemas, which leads to CREATE TABLE
statements of 400+ lines.
I think the mentioned query:
CREATE TABLE (PRIMARY_KEY(a, c)) WITH (format.type = avro,
format.schema-file = "/my/avrofile.avsc")
is fine and should only be valid if the schema contains no non-computed
columns.
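
To illustrate what "the format derives the table schema" could look like, here is a minimal Python sketch (not Flink code). The type mapping, the function name `derive_table_schema`, and the choice to fall back to VARCHAR for Avro enums are assumptions; unions, nested records, and logical types are deliberately left out:

```python
import json

# Hypothetical mapping from Avro primitive types to SQL types.
AVRO_TO_SQL = {
    "boolean": "BOOLEAN",
    "int": "INT",
    "long": "BIGINT",
    "float": "FLOAT",
    "double": "DOUBLE",
    "string": "VARCHAR",
}

def derive_table_schema(avsc_text):
    """Return a list of (column, sql_type) derived from an Avro record schema."""
    schema = json.loads(avsc_text)
    columns = []
    for field in schema["fields"]:
        avro_type = field["type"]
        if isinstance(avro_type, dict) and avro_type.get("type") == "enum":
            sql_type = "VARCHAR"  # assumption: enums are exposed as strings
        else:
            sql_type = AVRO_TO_SQL[avro_type]
        columns.append((field["name"], sql_type))
    return columns

AVSC = """
{"type": "record", "name": "Order", "fields": [
  {"name": "id", "type": "long"},
  {"name": "status",
   "type": {"type": "enum", "name": "Status", "symbols": ["NEW", "DONE"]}}
]}
"""

columns = derive_table_schema(AVSC)
```

With such a derivation in the format, the DDL only has to add what the format cannot express, such as keys, computed columns, and time attributes.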

7. Table Update Mode:
After thinking about it again, I agree. The mode of the sinks can be
derived from the query and the existence of a PRIMARY KEY declaration.
But Fabian raised a very good point. How do we deal with sources? Shall
we introduce a new keyword similar to WATERMARK such that an
upsert/retract flag is not part of the visible schema?

4a. How to mark a field as attribute?
@Jark: Thanks for the explanation of the WATERMARK clause semantics.
This is a nice way of marking existing fields. This sounds good to me.

4c) WATERMARK as constraint
I'm fine with leaving the WATERMARK clause in the schema definition.

4d) Custom watermark strategies:
I would already think about custom watermark strategies as the current
descriptor design already supports this. ScalarFunctions don't work as
a PeriodicWatermarkAssigner has different semantics. Why not simply
enter a full class name here as it is done in the current design?

4.b) Ingesting and writing timestamps to systems (like Kafka)
@Fabian: Yes, your suggestion sounds good to me. This behavior would be
similar to our current `timestamps: from-source` design.

Once our discussion has found a conclusion, I would like to volunteer
and summarize the outcome of this mailing thread. It nicely aligns with
the update work on the connector improvements document (that I wanted to
do anyway) and the ongoing external catalog discussion. Furthermore, I
would also want to propose how to change existing interfaces by keeping
the DDL, connector improvements, and external catalog support in mind.
Would that be ok for you?

Thanks,
Timo



On 07.12.18 at 14:48, Fabian Hueske wrote:
Hi all,

Thanks for the discussion.
I'd like to share my point of view as well.

4) Event-Time Attributes and Watermarks:
4.a) I agree with Lin and Jark's proposal. Declaring a watermark on an
attribute declares it as an event-time attribute.
4.b) Ingesting and writing timestamps to systems (like Kafka). We could use
a special function like (ts AS SYSTEMROWTIME()). This function will
indicate that we read the timestamp directly from the system (and not the
data). We can also write the field back to the system when emitting the
table (violating the rule that computed fields are not emitted).
4c) I would treat WATERMARK similar to a PRIMARY KEY or UNIQUE KEY
constraint and therefore keep it in the schema definition.
4d) For custom watermark strategies, simple expressions or
ScalarFunctions won't be sufficient. Sophisticated approaches could collect
histograms, etc. But I think we can leave that out for later.

3) SOURCE / SINK / BOTH
As you said, there are two things to consider here: permission and
availability of a TableSource/TableSink.
I think that neither should be a reason to add a keyword at such a
sensitive position.
However, I also see Timo's point that it would be good to know up-front how
a table can be used without trying to instantiate a TableSource/Sink for a
query.
Maybe we can extend the TableFactory such that it provides information
about which sources/sinks it can provide.

7. Table Update Mode
Something that we definitely need to consider is how tables are ingested,
i.e., append, retract or upsert, especially since upsert and retraction
need a meta-data column that indicates whether an event is an insert
(or upsert) or a delete change.
This column needs to be identified somehow, most likely as part of the
input format. Ideally, this column should not be part of the table schema
(as it would always be true).
Emitting tables is not so much of an issue as the properties of the table
tell us what to do (append-only/update, unique key y/n).

Best,
Fabian


On Fri, Dec 7, 2018 at 10:39, Jark Wu <imj...@gmail.com> wrote:
Hi Timo,

Thanks for your quick feedback! Here are some of my thoughts:

Append, upsert, and retract modes on sinks are also a very complex problem.
I think append/upsert/retract is an ability of a table; users do not need to
specify whether a table is used for append, retraction, or upsert. The query
can choose which mode the sink uses. If an unbounded group-by is inserted
into an append sink (the sink only implements/supports append), an exception
can be thrown. A more complex problem is, if we want to write
retractions/upserts to Kafka, how to encode the change flag (add or
retract/delete) on the table? Maybe we should propose some protocol for the
change flag encoding,
but I don't have a clear idea about this right now.
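
The idea that the query picks the sink mode and an exception is thrown on a mismatch can be sketched in a few lines of Python (not Flink code). The mode names, the function `choose_sink_mode`, and the simplified selection rules are assumptions for illustration:

```python
APPEND, RETRACT, UPSERT = "append", "retract", "upsert"

def choose_sink_mode(append_only, has_unique_key, supported):
    """Pick a mode the sink supports for the given query, or raise ValueError."""
    if append_only and APPEND in supported:
        return APPEND
    if has_unique_key and UPSERT in supported:
        return UPSERT
    if RETRACT in supported:
        # A retract sink can accept any query, including append-only ones.
        return RETRACT
    raise ValueError(
        "query result cannot be written to a sink supporting only %s"
        % sorted(supported)
    )

# An updating query with a unique key (e.g. an unbounded GROUP BY) into
# a sink that supports append and upsert.
mode = choose_sink_mode(False, True, {APPEND, UPSERT})

# The same query into an append-only sink is rejected.
try:
    choose_sink_mode(False, True, {APPEND})
    rejected = False
except ValueError:
    rejected = True
```

The table declaration itself carries no mode here; only the sink's capabilities and the query's properties are compared at validation time.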

3. Sources/Sinks: The source/sink tag is similar to the
append/upsert/retract problem. Besides source/sink, we actually have stream
source, stream sink, batch source, and batch sink, and the stream sink also
includes the three modes append/upsert/retract. Should we put all the tags on
the CREATE TABLE? IMO, the table's ability is defined by the table itself;
users do not need to specify it. If it is only a readable table, an
exception can be thrown when writing to it. As the source/sink tag can be
omitted in CREATE TABLE, could we skip it and only support CREATE TABLE in
the first version, and add it back in the future when we really need it?
That keeps the API compatible and makes sure the MVP contains only what we
have considered clearly.

4a. How to mark a field as attribute?
The watermark definition includes two parts: which field to use as the time
attribute and which generation strategy to use.
When we want to mark the `ts` field as attribute: WATERMARK FOR `ts` AS
OFFSET '5' SECOND.
If we have a POJO{id, user, ts} field named "pojo", we can mark it like
this: WATERMARK FOR pojo.ts AS OFFSET '5' SECOND
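
As a rough illustration of what an OFFSET strategy could compute, here is a minimal Python sketch (not Flink code). It assumes that `OFFSET '5' SECOND` means a bounded-delay strategy where the watermark trails the largest timestamp seen so far by the offset; it also emits a watermark per record instead of periodically, which is a simplification:

```python
def bounded_offset_watermarks(timestamps_ms, offset_ms):
    """Yield (timestamp, watermark) pairs: watermark = max timestamp so far - offset."""
    max_seen = None
    for ts in timestamps_ms:
        max_seen = ts if max_seen is None else max(max_seen, ts)
        yield ts, max_seen - offset_ms

# Four events, the third one arriving out of order, with a 5 second offset.
result = list(bounded_offset_watermarks([10_000, 12_000, 11_000, 20_000], 5_000))
```

The out-of-order event at 11,000 ms does not move the watermark backwards, which is the property that makes the marked field usable as a rowtime attribute.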

4b. timestamp write to Kafka message header
Even though we can define multiple time attributes on a table, only one time
attribute can be active/used in a query (in a stream). When we enable
`writeTimestamp`, the only attribute active in the stream will be written to
the Kafka message header. What I mean by the timestamp in StreamRecord is the
time attribute in the stream.

4c. Yes. We introduced the WATERMARK keyword similar to the INDEX and PRIMARY
KEY keywords.

@Timo, do you have any other advice or questions on the watermark syntax?
For example, the built-in strategy name: "BOUNDED WITH OFFSET" vs. "OFFSET"
vs. ...


Cheers,
Jark

On Fri, 7 Dec 2018 at 17:13, Lin Li <lincoln.8...@gmail.com> wrote:

Hi Timo,
Thanks for your feedback, here are some thoughts of mine:

3. Sources/Sinks:
"Let's assume an interactive CLI session, people should be able to list all
source tables and sink tables to know upfront if they can use an INSERT INTO
here or not."
This requirement can simply be resolved by a document that lists all
supported source/sink/both connectors, and the sql-client can perform a
quick check. It's only an implementation choice, not necessary for the
syntax.
For connector implementation, a connector may implement one, some, or all
of the [Stream|Batch]Source/[Stream|Batch]Sink traits, so we can derive the
availability for any given query without the SOURCE/SINK keywords or
specific table properties in the WITH clause.
Since there's still indeterminacy, shall we skip these two keywords for the
MVP DDL? We can discuss further after users' feedback.
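
Deriving availability from the implemented traits could look like the following minimal Python sketch (not Flink code). All class names are hypothetical stand-ins for the [Stream|Batch]Source/[Stream|Batch]Sink traits mentioned above:

```python
# Hypothetical marker traits.
class StreamSource: pass
class StreamSink: pass
class BatchSource: pass
class BatchSink: pass

# Hypothetical connectors implementing a subset of the traits.
class KafkaConnector(StreamSource, StreamSink): pass
class JdbcSinkConnector(StreamSink, BatchSink): pass

def capabilities(connector_cls):
    """Derive read/write availability from the traits a connector implements."""
    return {
        "readable": issubclass(connector_cls, (StreamSource, BatchSource)),
        "writable": issubclass(connector_cls, (StreamSink, BatchSink)),
    }

kafka_caps = capabilities(KafkaConnector)
jdbc_caps = capabilities(JdbcSinkConnector)
```

A CLI could run such a check when listing tables, so the information is available up front without any SOURCE/SINK keyword in the DDL.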

6. Partitioning and keys
I agree with raising the priority of table constraints and partitioned
table support for better connectivity to Hive and Kafka. I'll add
partitioned table syntax (compatible with Hive) to the DDL draft doc
later[1].

5. Schema declaration
"if users want to declare computed columns they have a "schema"
constraints
but without columns
CREATE TABLE (PRIMARY_KEY(a, c)) WITH (format.type = avro,
format.schema-file = "/my/avrofile.avsc") "

From my point of view, this DDL is invalid because the primary key
constraint already references two columns whose types are unseen.
And Xuefu pointed out an important matching problem, so let's put schema
derivation as a follow-up extension?




On Thu, Dec 6, 2018 at 6:05 PM, Timo Walther <twal...@apache.org> wrote:

Hi everyone,

great to have such a lively discussion. My next batch of feedback:

@Jark: We don't need to align the descriptor approach with SQL. I'm open
to different approaches as long as we can serve a broad set of use
cases and systems. The descriptor approach was a first attempt to cover
all aspects and connector/format characteristics. Just another example
that is missing in the DDL design: How can a user decide if append,
retraction, or upserts should be used to sink data into the target
system? Do we want to define all these important properties in the big
WITH property map? If yes, we are already close to the descriptor
approach. Regarding the "standard way", most DDL languages have very
custom syntax so there is not a real "standard".

3. Sources/Sinks: @Lin: If a table has both read/write access it can be
created using a regular CREATE TABLE (omitting a specific source/sink
declaration). Regarding the transition from source/sink to both, yes, we
would need to update the DDL and catalogs. But is this a problem? One
also needs to add new queries that use the tables. @Xuefu: It is not
only about security aspects. Especially for streaming use cases, not
every connector can be used as a source easily. For example, a JDBC sink
is easier than a JDBC source. Let's assume an interactive CLI session:
people should be able to list all source tables and sink tables to know
upfront if they can use an INSERT INTO here or not.

6. Partitioning and keys: @Lin: I would like to include this in the
design given that Hive integration and Kafka key support are in the
making/are on our roadmap for this release.

5. Schema declaration: @Lin: You are right, it is not conflicting. I just
wanted to raise the point because if users want to declare computed
columns they have "schema" constraints but without columns. Are we ok
with a syntax like ...
CREATE TABLE (PRIMARY_KEY(a, c)) WITH (format.type = avro,
format.schema-file = "/my/avrofile.avsc") ?
@Xuefu: Yes, you are right that an external schema might not exactly
match, but this is true for both directions:
table schema "derives" format schema and format schema "derives" table
schema.

7. Hive compatibility: @Xuefu: I agree that Hive is popular but we
should not just adopt everything from Hive as their syntax is very
batch-specific. We should come up with a superset of historical and
future requirements. Supporting Hive queries can be an intermediate
layer on top of Flink's DDL.

4. Time attributes: @Lin: I'm fine with changing the TimestampExtractor
interface as this is also important for better separation of connector
and table module [1]. However, I'm wondering about watermark generation.

4a. timestamps are in the schema twice:
@Jark: "existing field is Long/Timestamp, we can just use it as
rowtime": yes, but we need to mark a field as such an attribute. What
does the syntax for marking look like? Also in case of timestamps that
are nested in the schema?

4b. how can we write out a timestamp into the message header?:
I agree to simply ignore computed columns when writing out. This is like
'field-change: add' that I mentioned in the improvements document.
@Jark: "then the timestamp in StreamRecord will be written to Kafka
message header": Unfortunately, there is no timestamp in the stream
record. Additionally, multiple time attributes can be in a schema. So we
need a constraint that tells the sink which column to use (possibly
computed as well)?

4c. separate all time attribute concerns into a special clause next to
the regular schema?
@Jark: I don't have a strong opinion on this. I just have the feeling
that the "schema part" becomes quite messy because the actual schema
with types and fields is accompanied by so much metadata about
timestamps, watermarks, keys,... and we would need to introduce a new
WATERMARK keyword within a schema that was close to standard up to this
point.

Thanks everyone,
Timo

[1] https://issues.apache.org/jira/browse/FLINK-9461



On 06.12.18 at 07:08, Jark Wu wrote:
Hi Timo,

Thank you for the valuable feedback.

First of all, I think we don't need to align the SQL functionality to
Descriptor. Because SQL is a more standard API, we should be as cautious as
possible about extending the SQL syntax. If something can be done in a
standard way, we shouldn't introduce something new.

Here are some of my thoughts:

1. Scope: Agree.
2. Constraints: Agree.
4. Time attributes:
     4a. timestamps are in the schema twice.
      If an existing field is Long/Timestamp, we can just use it as rowtime;
it is not defined twice. If it is not a Long/Timestamp, we use a computed
column to get the expected timestamp column as rowtime. Is this what you mean
by defined twice? I don't think it is a problem but an advantage,
because it is easy to use: users do not need to consider whether to "replace
the existing column" or "add a new column", and they will not be confused
about what the real schema is or what the index of rowtime in the schema is.
Regarding the optimization, even if timestamps are in the schema twice, when
the original timestamp is never used in the query, the projection pushdown
optimization can cut this field as early as possible, which is exactly the
same as "replacing the existing column" at runtime.

      4b. how can we write out a timestamp into the message header?
       That's a good point. I think a computed column is just a virtual
column on a table which is only relevant to reading. If we want to write to a
table with computed columns defined, we only need to provide the columns
except the computed columns (see SQL Server [1]). The computed column is
ignored in the insert statement. Getting back to the question, how can we
write out a timestamp into the message header? IMO, we can provide a
configuration to support this, such as `kafka.writeTimestamp=true`; then the
timestamp in StreamRecord will be written to the Kafka message header. What
do you think?
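
The "computed columns are ignored on insert" behavior can be sketched in Python as follows (not Flink code). The schema representation, where a computed column carries an expression string and a physical column carries `None`, and the expression `toTimestamp(event_time)` are assumptions for illustration:

```python
def writable_columns(schema):
    """Return the names of physical columns; computed columns have an expression."""
    return [name for name, expression in schema if expression is None]

def to_sink_row(row, schema):
    """Drop computed columns from a row before handing it to the sink."""
    return {name: row[name] for name in writable_columns(schema)}

schema = [
    ("id", None),
    ("event_time", None),
    ("ts", "toTimestamp(event_time)"),  # hypothetical computed column
]

sink_row = to_sink_row(
    {"id": 1, "event_time": 1544400000000, "ts": 1544400000000},
    schema,
)
```

This mirrors the read side: the query sees all three columns, while the sink only receives the two physical ones.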
       4c. separate all time attribute concerns into a special clause next to
the regular schema?
       Separating the watermark into a special clause similar to PARTITIONED
BY is also a good idea. Conceptually, it's fine to put the watermark in the
schema part or outside the schema part. But if we want to support multiple
watermark definitions, maybe it would be better to put it in the schema part.
It is similar to index definitions, where we can define several indexes on a
table in the schema part.

       4d. How can people come up with a custom watermark strategy?
       In most cases, the built-in strategies work well. If we need a
custom one, we can use a scalar function which is restricted to only return a
nullable Long, and use it in SQL like: WATERMARK for rowtime AS
watermarkUdf(a, b, c). The `watermarkUdf` is a user-defined scalar function
that accepts 3 parameters and returns a nullable Long which can be used as a
punctuated watermark assigner. Another choice is implementing a class
extending
`org.apache.flink.table.sources.wmstrategies.WatermarkStrategy` and using it
in SQL: WATERMARK for rowtime AS 'com.my.MyWatermarkStrategy'. But if a
scalar function can cover the requirements here, I would prefer it,
because it stays standard compliant. BTW, this feature is not in the MVP; we
can discuss it in more depth in the future when we need it.
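
A scalar function returning a nullable Long as a punctuated watermark assigner could behave like this minimal Python sketch (not Flink code). The function `watermark_udf`, its two parameters, and the rule "only marker events carry a watermark" are assumptions for illustration:

```python
from typing import Optional

def watermark_udf(event_type: str, ts_ms: int) -> Optional[int]:
    """Hypothetical scalar function: return a watermark or None for 'no watermark'."""
    return ts_ms if event_type == "marker" else None

def punctuated_watermarks(rows):
    """Apply the function per row and emit only strictly increasing watermarks."""
    current = None
    emitted = []
    for event_type, ts_ms in rows:
        candidate = watermark_udf(event_type, ts_ms)
        if candidate is not None and (current is None or candidate > current):
            current = candidate
            emitted.append(candidate)
    return emitted

emitted = punctuated_watermarks([
    ("data", 100),
    ("marker", 150),
    ("data", 120),
    ("marker", 140),   # older than the current watermark, ignored
    ("marker", 300),
])
```

A stateless function like this covers punctuated strategies; strategies that need state across rows, such as histograms, would not fit this shape.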

5. Schema declaration:
I like the proposal to omit the schema if we can get the schema from
external storage or a schema file. Actually, we have already
encountered this requirement in our company.


+1 to @Xuefu that we should be as close as possible to Hive syntax while
keeping to the ANSI SQL standard. This will make it more acceptable and
reduce the learning cost for users.

[1]: https://docs.microsoft.com/en-us/sql/relational-databases/partitions/create-partitioned-tables-and-indexes?view=sql-server-2017

Best,
Jark

On Thu, 6 Dec 2018 at 12:09, Zhang, Xuefu <xuef...@alibaba-inc.com> wrote:
Hi Timo/Shuyi/Lin,

Thanks for the discussions. It seems that we are converging to something
meaningful. Here are some of my thoughts:

1. +1 on MVP DDL
3. Markers for source or sink seem more about permissions on tables that
belong to a security component. Unless the table is created differently
based on source, sink, or both, it doesn't seem necessary to use these
keywords to enforce permissions.
5. It might be okay if schema declaration is always needed. While there
might be some duplication sometimes, that is not always the case. For
example, the external schema may not exactly match the Flink schema, for
instance in data types. Even so, a perfect match is not required. For
instance, the external schema file may evolve while the table schema in
Flink stays unchanged. A responsible reader should be able to scan the file
based on the file schema and return the data based on the table schema.
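
Such a reader can be sketched in Python as a simple projection (not Flink code). The function name and the choice to fill fields missing in the file with `None` are assumptions for illustration:

```python
def read_with_table_schema(records, table_columns):
    """Project records decoded with the file schema onto the table's columns."""
    return [
        {name: record.get(name) for name in table_columns}
        for record in records
    ]

# The file schema has evolved and gained a field; the table schema has not.
file_records = [
    {"id": 1, "msg": "a", "added_later": True},
    {"id": 2, "msg": "b"},
]
rows = read_with_table_schema(file_records, ["id", "msg"])
```

The extra field in the evolved file is silently dropped, so queries against the unchanged table schema keep working.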

Other aspects:

7. Hive compatibility. Since Flink SQL will soon be able to operate on
Hive metadata and data, it's an add-on benefit if we can be compatible with
Hive syntax/semantics while following the ANSI standard. At least we should
be as close as possible. Hive DDL can be found at
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL

Thanks,
Xuefu




------------------------------------------------------------------
Sender:Lin Li <lincoln.8...@gmail.com>
Sent at:2018 Dec 6 (Thu) 10:49
Recipient:dev <dev@flink.apache.org>
Subject:Re: [DISCUSS] Flink SQL DDL Design

Hi Timo and Shuyi,
     thanks for your feedback.

1. Scope
agree with you we should focus on the MVP DDL first.

2. Constraints
yes, this can be a follow-up issue.

3. Sources/Sinks
If a TABLE has both read/write access requirements, should we declare it
using
`CREATE [SOURCE_SINK|BOTH] TABLE tableName ...`? A further question: if a
TABLE t1 is first declared as read-only (as a source table), and then for
some new requirements t1 changes to a sink table, we need to update both the
DDL and catalogs.
Furthermore, let's think about BATCH queries, where updating one table
in-place can be a common case.
e.g.,
```
CREATE TABLE t1 (
     col1 varchar,
     col2 int,
     col3 varchar
     ...
);

INSERT [OVERWRITE] TABLE t1
AS
SELECT
     (some computing ...)
FROM t1;
```
So, let's forget these SOURCE/SINK keywords in DDL. For validation
purposes, we can find other ways.

4. Time attributes
As Shuyi mentioned before, there exists an
`org.apache.flink.table.sources.tsextractors.TimestampExtractor` for custom
defined time attributes, but this expression-based class is more
friendly for the Table API than for SQL.
```
/**
  * Provides an expression to extract the timestamp for a rowtime attribute.
  */
abstract class TimestampExtractor extends FieldComputer[Long] with Serializable {

  /** Timestamp extractors compute the timestamp as Long. */
  override def getReturnType: TypeInformation[Long] =
    Types.LONG.asInstanceOf[TypeInformation[Long]]
}
```
BTW, I think both the scalar function and the TimestampExtractor express
computing logic; the TimestampExtractor has no additional advantage in
SQL scenarios.


6. Partitioning and keys
Primary key is included in the constraint part, and partitioned table
support can be another topic later.

5. Schema declaration
Agree with you that we can do better schema derivation for user
convenience, but this does not conflict with the syntax.
Table properties can carry any useful information both for the users and
the framework. I like your `contract name` proposal,
e.g., `WITH (format.type = avro)`: the framework can recognize some
`contract name` like `format.type`, `connector.type`, etc.
Also, deriving the table schema from an existing schema file can be handy,
especially for one with many table columns.

Regards,
Lin


On Wed, Dec 5, 2018 at 10:40 PM, Timo Walther <twal...@apache.org> wrote:

Hi Jark and Shuyi,

thanks for pushing the DDL efforts forward. I agree that we should aim
to combine both Shuyi's design and your design.

Here are a couple of concerns that I think we should address in the design:
1. Scope: Let's focus on an MVP DDL for CREATE TABLE statements first.
I think this topic already has enough potential for long discussions and
is very helpful for users. We can discuss CREATE VIEW and CREATE
FUNCTION afterwards as they are not related to each other.

2. Constraints: I think we should consider things like nullability,
VARCHAR length, and decimal scale and precision in the future as they
allow for nice optimizations. However, since neither the translation nor
the runtime operators support those features, I would not introduce an
arbitrary default value but omit those parameters for now. This can be a
follow-up issue once the basic DDL has been merged.

3. Sources/Sinks: We had a discussion about CREATE TABLE vs CREATE
[SOURCE|SINK|] TABLE before. In my opinion we should allow for these
explicit declarations because in most production scenarios, teams have
strict read/write access requirements. For example, a data science team
should only consume from an event Kafka topic but should not accidentally
write back to the single source of truth.

4. Time attributes: In general, I like your computed columns approach
because it makes defining rowtime attributes transparent and simple.
However, there are downsides that we should discuss.

4a. Jark's current design means that timestamps are in the schema twice.
The design that is mentioned in [1] makes this more flexible as it
either allows replacing an existing column or adding a computed column.

4b. We need to consider the zoo of storage systems that is out there
right now. Take Kafka as an example: how can we write out a timestamp
into the message header? We need to think of a reverse operation to a
computed column.

4c. Does defining a watermark really fit into the schema part of a
table? Shouldn't we separate all time attribute concerns into a special
clause next to the regular schema, similar to how PARTITIONED BY does it
in Hive?

4d. How can people come up with a custom watermark strategy? I guess
this cannot be implemented in a scalar function and would require some
new type of UDF?

6. Partitioning and keys: Another question that the DDL design should
answer is how we express primary keys (for upserts) and partitioning
keys (for Hive, Kafka message keys). Are they all part of the table schema?

5. Schema declaration: I find it very annoying that we want to force
people to declare all columns and types again even though this is
usually already defined in some company-wide format. I know that catalog
support will greatly improve this. But if no catalog is used, people
need to manually define a schema with 50+ fields in a Flink DDL. What I
actually promoted is having two ways of reading data:

1. Either the format derives its schema from the table schema.
CREATE TABLE (col INT) WITH (format.type = avro)

2. Or the table schema can be omitted and the format schema defines the
table schema (+ time attributes).
CREATE TABLE WITH (format.type = avro, format.schema-file =
"/my/avrofile.avsc")

Please let me know what you think about each item. I will try to
incorporate your feedback in [1] this week.
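
The second option in item 5 (the format schema defines the table schema)
can be sketched roughly as follows. This is only an illustration in
Python, not Flink code: the type mapping AVRO_TO_SQL and the helper
derive_columns are hypothetical names, and only flat records with
primitive field types are handled.

```python
import json

# Hypothetical mapping from Avro primitive types to SQL column types.
# The thread does not define such a mapping; it is assumed for illustration.
AVRO_TO_SQL = {
    "boolean": "BOOLEAN",
    "int": "INT",
    "long": "BIGINT",
    "float": "FLOAT",
    "double": "DOUBLE",
    "string": "VARCHAR",
    "bytes": "VARBINARY",
}


def derive_columns(avro_schema_json):
    """Return (column name, SQL type) pairs for a flat Avro record schema."""
    schema = json.loads(avro_schema_json)
    if schema.get("type") != "record":
        raise ValueError("expected an Avro record schema")
    columns = []
    for field in schema["fields"]:
        avro_type = field["type"]
        # Unions, nested records, arrays and maps are out of scope for this sketch.
        if not isinstance(avro_type, str) or avro_type not in AVRO_TO_SQL:
            raise ValueError(f"unsupported Avro type: {avro_type!r}")
        columns.append((field["name"], AVRO_TO_SQL[avro_type]))
    return columns


# Example: the user writes no column list; the columns come from the format schema.
example_avsc = json.dumps({
    "type": "record",
    "name": "Event",
    "fields": [
        {"name": "id", "type": "long"},
        {"name": "msg", "type": "string"},
    ],
})
print(derive_columns(example_avsc))
```

Time attributes would still have to be declared separately, since a
format schema such as Avro carries no notion of rowtime or watermarks.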

Regards,
Timo

[1] https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit#heading=h.41fd6rs7b3cf

On 05.12.18 at 13:01, Jark Wu wrote:
Hi Shuyi,

It's exciting to see we can make such great progress here.

Regarding the watermark:

Watermarks can be defined on any column (including computed columns) in
the table schema. A computed column can be computed from existing columns
using built-in functions and *UserDefinedFunctions* (ScalarFunction).
So IMO, it can work for almost all scenarios, not only the common ones.

I don't think using a `TimestampExtractor` to support custom timestamp
extraction in SQL is a good idea, because `TimestampExtractor` is not a
SQL standard function. If we support `TimestampExtractor` in SQL, do we
need to support CREATE FUNCTION for `TimestampExtractor`? I think
`ScalarFunction` can do the same thing as `TimestampExtractor` but is
more powerful and standard.

The core idea of the watermark definition syntax is that the schema part
defines all the columns of the table; it is exactly what the query sees.
The watermark part is something like a primary key definition or
constraint on a SQL table: it has no side effect on the schema, it only
defines what the watermark strategy is and which field becomes the
rowtime attribute field. If the rowtime field is not among the existing
fields, we can use a computed column to generate it from other existing
fields. The Descriptor Pattern API [1] is very useful when writing a
Table API job, but is not contradictory to the Watermark DDL from my
perspective.

[1]: https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#rowtime-attributes

Best,
Jark

On Wed, 5 Dec 2018 at 17:58, Shuyi Chen <suez1...@gmail.com> wrote:
Hi Jark and Shaoxuan,

Thanks a lot for the summary. I think we are making great progress here.
Below are my thoughts.

*(1) watermark definition
IMO, it's better to keep it consistent with the rowtime extractors and
watermark strategies defined in:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connect.html#rowtime-attributes
Using built-in functions seems to be too much for most of the common
scenarios.
*(2) CREATE SOURCE/SINK TABLE or CREATE TABLE
Actually, I think we can put the source/sink type info into the table
properties, so we can use CREATE TABLE.
(3) View DDL with properties
We can remove the view properties section now for the MVP and add it back
later if needed.
(4) Type Definition
I agree we can put the type length or precision into future versions. As
for the grammar difference, currently I am using the grammar in Calcite
type DDL, but since we'll extend the parser in Flink, we can definitely
change it if needed.

Shuyi

On Tue, Dec 4, 2018 at 10:48 PM Jark Wu <imj...@gmail.com> wrote:
Hi Shaoxuan,

Thanks for pointing that out. Yes, the source/sink tag on create table is
another major difference.

To summarize the main differences again:

*(1) watermark definition
*(2) CREATE SOURCE/SINK TABLE or CREATE TABLE
(3) View DDL with properties
(4) Type Definition

Best,
Jark

On Wed, 5 Dec 2018 at 14:08, Shaoxuan Wang <wshaox...@gmail.com> wrote:
Hi Jark,
Thanks for the summary. Your plan for the 1st round implementation of DDL
looks good to me.
Have we reached agreement on simplifying/unifying "create [source/sink]
table" to "create table"? "Watermark definition" and "create table" are
the major obstacles on the way to merging the two design proposals FMPOV.
@Shuyi, it would be great if you can spend time and respond to these two
parts first.
Regards,
Shaoxuan


On Wed, Dec 5, 2018 at 12:20 PM Jark Wu <imj...@gmail.com> wrote:
Hi Shuyi,

It seems that you have reviewed the DDL doc [1] that Lin and I drafted.
This doc covers all the features running in Alibaba.
But some of the features might not be needed in the first version of
Flink SQL DDL.

So my suggestion would be to focus on the MVP DDLs and reach agreement
ASAP based on the DDL draft [1] and the DDL design [2] Shuyi proposed.
And we can discuss the main differences one by one.

The following MVP DDLs should be included in the first version in my
opinion (feedback is welcome):
(1) Table DDL:
        (1.1) Type definition
        (1.2) computed column definition
        (1.3) watermark definition
        (1.4) with properties
        (1.5) table constraint (primary key/unique)
        (1.6) column nullability (nice to have)
(2) View DDL
(3) Function DDL

The main differences between the two DDL docs (something may be missed,
feel free to point it out):
*(1.3) watermark*: this is the main and the most important difference. It
would be great if @Timo Walther <twal...@apache.org> and @Fabian Hueske
<fhue...@gmail.com> could give some feedback.
     (1.1) Type definition:
          (a) Should VARCHAR carry a length, e.g. VARCHAR(128)?
               In most cases, the varchar length is not used because
               values are stored as String in Flink. But it can be used
               for optimization in the future if we know the column is a
               fixed-length VARCHAR.
               So IMO, we can support VARCHAR with length in the future,
               and just VARCHAR in this version.
          (b) Should DECIMAL support custom scale and precision, e.g.
               DECIMAL(12, 5)?
               If we clearly know the scale and precision of the Decimal,
               we can have some optimization on
               serialization/deserialization. IMO, we can support just
               DECIMAL in this version, which means DECIMAL(38, 18) as
               default, and support custom scale and precision in the
               future.
     (2) View DDL: Do we need WITH properties in View DDL (proposed in
         doc [2])? What are the properties on the view used for?
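
As a rough illustration of the type-definition proposal in (1.1), the
first-version behaviour could look like the Python sketch below: bare type
names are accepted, a bare DECIMAL resolves to DECIMAL(38, 18), and any
length or precision argument is rejected for now. The function name
resolve_mvp_type and the MVP_TYPES set are hypothetical and only serve
this example; they are not part of either design doc.

```python
import re

# Default taken from the proposal above: a bare DECIMAL means DECIMAL(38, 18).
DEFAULT_DECIMAL = (38, 18)

# Illustrative subset of type names; not a complete or authoritative list.
MVP_TYPES = {"VARCHAR", "DECIMAL", "INT", "BIGINT", "TIMESTAMP"}

# A type name, optionally followed by a parenthesised argument list.
_TYPE_PATTERN = re.compile(r"^\s*([A-Za-z]+)\s*(\(.*\))?\s*$")


def resolve_mvp_type(type_text):
    """Resolve a column type as the first DDL version might, per the proposal."""
    match = _TYPE_PATTERN.match(type_text)
    if match is None:
        raise ValueError(f"cannot parse type: {type_text!r}")
    name, params = match.group(1).upper(), match.group(2)
    if name not in MVP_TYPES:
        raise ValueError(f"unknown type: {name}")
    if params is not None:
        # VARCHAR(128) or DECIMAL(12, 5) would only be supported in a later version.
        raise ValueError(f"{name}{params}: length/precision is not supported yet")
    if name == "DECIMAL":
        precision, scale = DEFAULT_DECIMAL
        return f"DECIMAL({precision}, {scale})"
    return name


print(resolve_mvp_type("DECIMAL"))
print(resolve_mvp_type("varchar"))
```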


Features that could be supported and discussed in the future:
(1) period definition on table
(2) Type DDL
(3) Index DDL
(4) Library DDL
(5) Drop statement

[1] Flink DDL draft by Lin and Jark:
https://docs.google.com/document/d/1o16jC-AxnZoxMfHQptkKQkSC6ZDDBRhKg6gm8VGnY-k/edit#
[2] Flink SQL DDL design by Shuyi:
https://docs.google.com/document/d/1TTP-GCC8wSsibJaSUyFZ_5NBAHYEB1FVmPpP7RgDGBA/edit#

Cheers,
Jark

On Thu, 29 Nov 2018 at 16:13, Shaoxuan Wang <wshaox...@gmail.com> wrote:
Sure Shuyi,
What I hope is that we can reach an agreement on DDL grammar as soon as
possible. There are a few differences between your proposal and ours.
Once Lin and Jark propose our design, we can quickly discuss those
differences and see how far we are from a unified design.
WRT the external catalog, I think it is an orthogonal topic; we can
design it in parallel. I believe @Xuefu and @Bowen are already working on
it. We should/will definitely involve them to review the final design of
the DDL implementation. I would suggest that we give higher priority to
the DDL implementation, as it is a crucial component for the user
experience of SQL_CLI.

Regards,
Shaoxuan



On Thu, Nov 29, 2018 at 6:56 AM Shuyi Chen <suez1...@gmail.com> wrote:
Thanks a lot, Shaoxuan, Jark and Lin. We should definitely collaborate
here; we also have our own DDL implementation running in production for
almost 2 years at Uber. With the joint experience from both companies, we
can definitely make the Flink SQL DDL better.

As @shaoxuan suggested, Jark can come up with a doc that talks about the
current DDL design in Alibaba, and we can discuss and merge them into
one, make it a FLIP, and plan the tasks for implementation. Also, we
should take into account the new external catalog effort in the design.
What do you guys think?

Shuyi

On Wed, Nov 28, 2018 at 6:45 AM Jark Wu <imj...@gmail.com> wrote:
Hi Shaoxuan,

I think summarizing it into a google doc is a good idea. We will prepare
it in the next few days.

Thanks,
Jark

On Wed, Nov 28, 2018 at 9:17 PM Shaoxuan Wang <wshaox...@gmail.com> wrote:
Hi Lin and Jark,
Thanks for sharing those details. Can you please consider summarizing
your DDL design in a google doc?
We can still continue the discussions on Shuyi's proposal. But having a
separate google doc will make it easy for the DEV to
understand/comment/discuss your proposed DDL implementation.

Regards,
Shaoxuan


On Wed, Nov 28, 2018 at 7:39 PM Jark Wu <imj...@gmail.com> wrote:
Hi Shuyi,

Thanks for bringing up this discussion and the awesome work! I have left
some comments in the doc.

I want to share something more about the watermark definition learned
from Alibaba.

       1. A table should be able to accept multiple watermark
          definitions, because a table may have more than one rowtime
          field. For example, one rowtime field comes from an existing
          field but is missing in some records, while another is the
          ingestion timestamp in Kafka but is not very accurate. In this
          case, the user may define two rowtime fields with watermarks in
          the table and choose one depending on the situation.
       2. A watermark strategy always works together with a rowtime
          field.

Based on the two points mentioned above, I think we should combine the
watermark strategy and rowtime field selection (i.e. which existing field
is used to generate the watermark) in one clause, so that we can define
multiple watermarks in one table.

Here I will share the watermark syntax used in Alibaba (slightly
modified):

watermarkDefinition:
WATERMARK [watermarkName] FOR <rowtime_field> AS wm_strategy

wm_strategy:
      BOUNDED WITH OFFSET 'string' timeUnit
    | ASCENDING

The “WATERMARK” keyword starts a watermark definition. The “FOR” keyword
defines which existing field is used to generate the watermark; this
field should already exist in the schema (we can use a computed column to
derive it from other fields). The “AS” keyword defines the watermark
strategy, such as BOUNDED WITH OFFSET (covers almost all the
requirements) and ASCENDING.

When the expected rowtime field does not exist in the schema, we can use
the computed-column syntax to derive it from other existing fields using
built-in functions or user-defined functions. So the rowtime/watermark
definition doesn’t need to care about a “field-change” strategy
(replace/add/from-field). And the proctime field can also be defined
using a computed column, such as pt AS PROCTIME(), which defines a
proctime field named “pt” in the schema.
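
To make the two strategies a bit more concrete, here is a minimal Python
sketch of how a BOUNDED WITH OFFSET watermark could be tracked per rowtime
field, with one strategy instance per WATERMARK clause. It is not Flink
code. The names BoundedWithOffset and advance are made up for this
example, timestamps are plain milliseconds, and ASCENDING is modelled as
an offset of zero, which is a simplification.

```python
class BoundedWithOffset:
    """Watermark that trails the largest rowtime seen so far by a fixed offset."""

    def __init__(self, offset_ms):
        if offset_ms < 0:
            raise ValueError("offset must not be negative")
        self.offset_ms = offset_ms
        self.max_rowtime_ms = None

    def on_row(self, rowtime_ms):
        """Observe one rowtime value (None if missing); return the current watermark."""
        if rowtime_ms is not None and (
            self.max_rowtime_ms is None or rowtime_ms > self.max_rowtime_ms
        ):
            self.max_rowtime_ms = rowtime_ms
        if self.max_rowtime_ms is None:
            return None  # no rowtime seen yet, so no watermark
        return self.max_rowtime_ms - self.offset_ms


def advance(strategies, row):
    """Feed one row (a dict) to every strategy; return field -> current watermark."""
    return {field: s.on_row(row.get(field)) for field, s in strategies.items()}


# Two rowtime fields on one table, as in point 1 above:
# an event timestamp that may be missing, and a Kafka ingestion timestamp.
strategies = {
    "event_ts": BoundedWithOffset(5000),  # BOUNDED WITH OFFSET '5000' MILLISECOND
    "kafka_ts": BoundedWithOffset(0),     # simplified stand-in for ASCENDING
}
print(advance(strategies, {"event_ts": 10000, "kafka_ts": 10500}))
print(advance(strategies, {"kafka_ts": 11000}))  # event_ts missing in this record
```

Each watermark only moves forward when a larger rowtime arrives, so late
or missing values leave it unchanged.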

Looking forward to working with you guys!

Best,
Jark Wu


On Wed, Nov 28, 2018 at 6:33 PM Lin Li <lincoln.8...@gmail.com> wrote:
@Shuyi
Thanks for the proposal! We have a simple DDL implementation (extends
Calcite's parser) which has been running for almost two years in
production and works well.
I think the most valuable thing we've learned is to keep simplicity and
standard compliance.
Here's the approximate grammar, FYI:
CREATE TABLE

CREATE TABLE tableName(
            columnDefinition [, columnDefinition]*
            [ computedColumnDefinition [, computedColumnDefinition]* ]
            [ tableConstraint [, tableConstraint]* ]
            [ tableIndex [, tableIndex]* ]
            [ PERIOD FOR SYSTEM_TIME ]
            [ WATERMARK watermarkName FOR rowTimeColumn AS withOffset(rowTimeColumn, offset) ]
) [ WITH ( tableOption [ , tableOption]* ) ] [ ; ]

columnDefinition ::=
            columnName dataType [ NOT NULL ]

dataType  ::=
            {
              [ VARCHAR ]
              | [ BOOLEAN ]
              | [ TINYINT ]
              | [ SMALLINT ]
              | [ INT ]
              | [ BIGINT ]
              | [ FLOAT ]
              | [ DECIMAL ]
              | [ DOUBLE ]
              | [ DATE ]
              | [ TIME ]
              | [ TIMESTAMP ]
              | [ VARBINARY ]
            }

computedColumnDefinition ::=
            columnName AS computedColumnExpression

tableConstraint ::=
        { PRIMARY KEY | UNIQUE }
            (columnName [, columnName]* )

tableIndex ::=
            [ UNIQUE ] INDEX indexName
             (columnName [, columnName]* )

rowTimeColumn ::=
            columnName

tableOption ::=
            property=value

offset ::=
            positive integer (unit: ms)

CREATE VIEW

CREATE VIEW viewName
      [
            ( columnName [, columnName]* )
      ]
            AS queryStatement;

CREATE FUNCTION

     CREATE FUNCTION functionName
      AS 'className';

     className ::=
            fully qualified name


On Wed, Nov 28, 2018 at 3:28 AM Shuyi Chen <suez1...@gmail.com> wrote:
Thanks a lot, Timo and Xuefu. Yes, I think we can finalize the design doc
first and start implementation w/o the unified connector API ready by
skipping some features.

Xuefu, I like the idea of making Flink-specific properties into generic
key-value pairs, so that it will make integration with Hive DDL (or
others, e.g. Beam DDL) easier.

I'll run a final pass over the design doc and finalize the design in the
next few days. And we can start creating tasks and collaborate on the
implementation. Thanks a lot for all the comments and inputs.

Cheers!
Shuyi

On Tue, Nov 27, 2018 at 7:02 AM Zhang, Xuefu <xuef...@alibaba-inc.com> wrote:

Yeah! I agree with Timo that DDL can actually proceed w/o being blocked
by the connector API. We can leave the unknowns out while defining the
basic syntax.

@Shuyi

As commented in the doc, I think we can probably stick with a simple
syntax with general properties, without extending the syntax so much that
it mimics the descriptor API.

Part of our effort on Flink-Hive integration is also to make the DDL
syntax compatible with Hive's. The one in the current proposal seems to
make our effort more challenging.

We can help and collaborate. At this moment, I think we can finalize the
proposal and then divide the tasks for better collaboration.

Please let me know if there are any questions or suggestions.

Thanks,
Xuefu





------------------------------------------------------------------
Sender:Timo Walther <twal...@apache.org>
Sent at:2018 Nov 27 (Tue) 16:21
Recipient:dev <dev@flink.apache.org>
Subject:Re: [DISCUSS] Flink SQL DDL Design

Thanks for offering your help here, Xuefu. It would be great to move
these efforts forward. I agree that the DDL is somehow related to the
unified connector API design but we can also start with the basic
functionality now and evolve the DDL during this release and next
releases.

For example, we could identify the MVP DDL syntax that skips defining key
constraints and maybe even time attributes. This DDL could be used for
batch use cases, ETL, and materializing SQL queries (no time operations
like windows).

The unified connector API is high on our priority list for the 1.8
release. I will try to update the document by the middle of next week.

Regards,

Timo


On 27.11.18 at 08:08, Shuyi Chen wrote:
Thanks a lot, Xuefu. I was busy with some other stuff for the last 2
weeks, but we are definitely interested in moving this forward. I think
once the unified connector API design [1] is done, we can finalize the
DDL design as well and start creating concrete subtasks to collaborate on
the implementation with the community.

Shuyi

[1] https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit?usp=sharing

On Mon, Nov 26, 2018 at 7:01 PM Zhang, Xuefu <xuef...@alibaba-inc.com> wrote:

Hi Shuyi,

I'm wondering if you folks still have the bandwidth to work on this.

We have some dedicated resources and would like to move this forward. We
can collaborate.

Thanks,

Xuefu



------------------------------------------------------------------
Sender:wenlong.lwl<wenlong88....@gmail.com>
Date:2018-11-05 11:15:35
Recipient:<dev@flink.apache.org>
Subject:Re: [DISCUSS] Flink SQL DDL Design

Hi, Shuyi, thanks for the proposal.

I have two concerns about the table DDL:

1. How about removing the source/sink mark from the DDL? It is not
necessary, because the framework can determine whether the referenced
table is a source or a sink according to the context of the query using
the table. It will be more convenient for users to define a table which
can be both a source and a sink, and more convenient for the catalog to
persist and manage the meta information.
2. How about just keeping one pure string map as parameters for the
table, like
create table Kafka10SourceTable (
intField INTEGER,
stringField VARCHAR(128),
longField BIGINT,
rowTimeField TIMESTAMP
) with (
connector.type = 'kafka',
connector.property-version = '1',
connector.version = '0.10',
connector.properties.topic = 'test-kafka-topic',
connector.properties.startup-mode = 'latest-offset',
connector.properties.specific-offset = 'offset',
format.type = 'json',
format.properties.version = '1',
format.derive-schema = 'true'
);
Because:
1. In TableFactory, what the user works with is a string map of
properties. Defining parameters as a string map is the closest match to
how users actually use the parameters.
2. The table descriptor can be extended by users, as is done for Kafka
and Json. This means that the parameter keys in a connector or format can
differ between implementations; we cannot restrict the keys to a
specified set, so we need a map in the connector scope and a map in the
connector.properties scope. Why not just give users a single map and let
them put parameters in a format they like? This is also the simplest way
to implement the DDL parser.
3. Whether we can define a format clause or not depends on the
implementation of the connector. Using a separate clause in the DDL may
create the misunderstanding that we can combine connectors with arbitrary
formats, which may not actually work.
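
As a small illustration of the single-map idea, the Python sketch below
keeps all WITH options in one flat string map and lets each connector or
format pick out the keys under its own prefix. It is not Flink code, and
select_scope is a hypothetical helper written only for this example; the
keys are taken from the DDL above.

```python
def select_scope(properties, prefix):
    """Return the entries whose key starts with `prefix.`, with the prefix stripped."""
    head = prefix + "."
    return {
        key[len(head):]: value
        for key, value in properties.items()
        if key.startswith(head)
    }


# The WITH clause as one flat string map; nothing is interpreted by the parser.
table_options = {
    "connector.type": "kafka",
    "connector.property-version": "1",
    "connector.version": "0.10",
    "connector.properties.topic": "test-kafka-topic",
    "connector.properties.startup-mode": "latest-offset",
    "format.type": "json",
    "format.derive-schema": "true",
}

# A Kafka connector would read only its own scope; a JSON format only its own.
print(select_scope(table_options, "connector.properties"))
print(select_scope(table_options, "format"))
```

With this approach the DDL parser only has to collect key=value pairs;
which keys are valid is decided by whichever connector or format ends up
reading them.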

On Sun, 4 Nov 2018 at 18:25, Dominik Wosiński <wos...@gmail.com> wrote:
+1, Thanks for the proposal.

I guess this is a long-awaited change. This can vastly increase the
functionalities of the SQL Client as it will be possible to use complex
extensions like, for example, those provided by Apache Bahir [1].

Best Regards,
Dom.

[1] https://github.com/apache/bahir-flink

On Sat, 3 Nov 2018 at 17:17, Rong Rong <walter...@gmail.com> wrote:
+1. Thanks for putting the proposal together, Shuyi.

DDL has been brought up a couple of times previously [1,2]. Utilizing DDL
will definitely be a great extension to the current Flink SQL to
systematically support some of the previously brought up features such as
[3]. And it will also be beneficial to see the document closely aligned
with the previous discussion for the unified SQL connector API [4].

I also left a few comments on the doc. Looking forward to the alignment
with the other couple of efforts and contributing to them!

Best,
Rong

[1] http://mail-archives.apache.org/mod_mbox/flink-dev/201805.mbox/%3CCAMZk55ZTJA7MkCK1Qu4gLPu1P9neqCfHZtTcgLfrFjfO4Xv5YQ%40mail.gmail.com%3E
[2] http://mail-archives.apache.org/mod_mbox/flink-dev/201810.mbox/%3CDC070534-0782-4AFD-8A85-8A82B384B8F7%40gmail.com%3E
[3] https://issues.apache.org/jira/browse/FLINK-8003
[4] http://mail-archives.apache.org/mod_mbox/flink-dev/201810.mbox/%3c6676cb66-6f31-23e1-eff5-2e9c19f88...@apache.org%3E

On Fri, Nov 2, 2018 at 10:22 AM Bowen Li <bowenl...@gmail.com> wrote:
Thanks Shuyi!

I left some comments there. I think the design of SQL DDL and Flink-Hive
integration/External catalog enhancements will work closely with each
other. Hope we are well aligned on the directions of the two designs, and
I look forward to working with you guys on both!

Bowen


On Thu, Nov 1, 2018 at 10:57 PM Shuyi Chen <suez1...@gmail.com> wrote:
Hi everyone,

SQL DDL support has been a long-time ask from the community. Current
Flink SQL supports only DML (e.g. SELECT and INSERT statements). In its
current form, Flink SQL users still need to define/create table sources
and sinks programmatically in Java/Scala. Also, in SQL Client, without
DDL support, the current implementation does not allow dynamic creation
of tables, types or functions with SQL; this adds friction to its
adoption.

I drafted a design doc [1] with a few other community members that
proposes the design and implementation for adding DDL support in Flink.
The initial design considers DDL for table, view, type, library and
function. It will be great to get feedback on the design from the
community, and align with the latest efforts in the unified SQL connector
API [2] and Flink Hive integration [3].

Any feedback is highly appreciated.

Thanks
Shuyi Chen

[1] https://docs.google.com/document/d/1TTP-GCC8wSsibJaSUyFZ_5NBAHYEB1FVmPpP7RgDGBA/edit?usp=sharing
[2] https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit?usp=sharing
[3] https://docs.google.com/document/d/1SkppRD_rE3uOKSN-LuZCqn4f7dz0zW5aa6T_hBZq5_o/edit?usp=sharing
--
"So you have to trust that the dots will somehow connect in your future."