Hi.

Can you give me an example of the actual syntax of such a cast?
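
A minimal sketch of what such a cast might look like, assuming the TestStream table from further down this thread (columns EventTime, letter, counter). The class name, the helper names and the wStart/total aliases are made up for illustration, and whether the CAST really strips the time indicator type is the part that has not been verified here. The code only builds the SQL string, so it runs without Flink on the classpath:

```java
final class CastRowtimeExample {

    // Hypothetical helper: wraps any SQL expression in a CAST to TIMESTAMP.
    public static String castToTimestamp(String expression) {
        return "CAST(" + expression + " AS TIMESTAMP)";
    }

    // Builds a windowed query in which the window start is cast to a plain
    // TIMESTAMP before it is emitted. Table and column names follow the
    // TestStream example in this thread; the aliases are assumptions.
    public static String buildQuery() {
        String window = "TUMBLE(EventTime, INTERVAL '1' MINUTE)";
        String windowStart = "TUMBLE_START(EventTime, INTERVAL '1' MINUTE)";
        return "SELECT " + castToTimestamp(windowStart) + " AS wStart, "
             + "letter, SUM(counter) AS total "
             + "FROM TestStream "
             + "GROUP BY " + window + ", letter";
    }

    public static void main(String[] args) {
        // The resulting string would be passed to tableEnv.sqlQuery(...).
        System.out.println(buildQuery());
    }
}
```

If the cast behaves as Fabian expects, the schema of the resulting Table should no longer contain a TimeIndicatorTypeInfo, and the field types could be passed to RowTypeInfo unchanged.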

On Tue, 10 Sep 2019, 16:30 Fabian Hueske, <fhue...@gmail.com> wrote:

> Hi Niels,
>
> I think (not 100% sure) you could also cast the event time attribute to
> TIMESTAMP before you emit the table.
> This should remove the event time property (and thereby the
> TimeIndicatorTypeInfo) and you wouldn't need to fiddle with the output
> types.
>
> Best, Fabian
>
> On Wed, 21 Aug 2019 at 10:51, Niels Basjes <ni...@basjes.nl> wrote:
>
>> Hi,
>>
>> It has taken me quite a bit of time to figure this out.
>> This is the solution I have now (works on my machine).
>>
>> Please tell me where I can improve this.
>>
>> Turns out that the schema you provide for registerDataStream only needs
>> the 'top level' fields of the Avro data structure.
>> With only the top fields there you can still access nested fields with
>> something like "topfield.x.y.z" in the SQL statement.
>>
>> What I found is that the easiest way to make this all work is to ensure
>> the rowtime field in the structure is at the top level (which makes sense
>> in general) and generate the fields string where I only need to know the
>> name of the "rowtime" field.
>>
>> So I have
>>
>>     DataStream<Measurement> inputStream = ...
>>
>>
>> then I register the stream with
>>
>>
>>     TypeInformation<Measurement> typeInformation = TypeInformation.of(Measurement.class);
>>     String[] fieldNames = TableEnvironment.getFieldNames(typeInformation);
>>
>>     List<String> rootSchema = new ArrayList<>();
>>     for (String fieldName: fieldNames) {
>>         if (rowtimeFieldName.equals(fieldName)) {
>>             rootSchema.add(fieldName + ".rowtime");
>>         } else {
>>             rootSchema.add(fieldName);
>>         }
>>     }
>>
>>     tableEnv.registerDataStream("MeasurementStream", inputStream, String.join(",", rootSchema));
>>
>>
>> Now after the actual SQL has been executed I have a
>>
>>     Table resultTable = ...
>>
>> Now simply feeding this into a DataStream with something like this fails
>> badly.
>>
>>     TypeInformation<Row> tupleType = new RowTypeInfo(resultTable.getSchema().getFieldTypes());
>>     DataStream<Row>      resultSet = tableEnv.toAppendStream(resultTable, tupleType);
>>
>> will result in
>>
>>     org.apache.flink.table.api.TableException: The time indicator type is an internal type only.
>>        at org.apache.flink.table.api.TableEnvironment.org$apache$flink$table$api$TableEnvironment$$validateFieldType$1(TableEnvironment.scala:1172)
>>
>> Turns out that the schema of the output contains a field that was created
>> by TUMBLE_START, which is of type TimeIndicatorTypeInfo.
>>
>> So I have to do it this way (NASTY!):
>>
>>     final TypeInformation<?>[] fieldTypes = resultTable.getSchema().getFieldTypes();
>>     // Replace the internal time indicator type with a plain SQL timestamp
>>     for (int index = 0; index < fieldTypes.length; index++) {
>>         if (fieldTypes[index] instanceof TimeIndicatorTypeInfo) {
>>             fieldTypes[index] = SQL_TIMESTAMP;
>>         }
>>     }
>>     TypeInformation<Row> tupleType = new RowTypeInfo(fieldTypes);
>>     DataStream<Row>      resultSet = tableEnv.toAppendStream(resultTable, tupleType);
>>
>> Which gives me the desired DataStream.
>>
>>
>> Niels Basjes
>>
>>
>>
>>
>>
>> On Wed, Aug 14, 2019 at 5:13 PM Timo Walther <twal...@apache.org> wrote:
>>
>>> Hi Niels,
>>>
>>> if you are coming from DataStream API, all you need to do is to write a
>>> timestamp extractor.
>>>
>>> When you call:
>>>
>>> tableEnv.registerDataStream("TestStream", letterStream,
>>> "EventTime.rowtime, letter, counter");
>>>
>>> The ".rowtime" means that the framework will extract the rowtime from
>>> the stream record timestamp. You don't need to name all fields again but
>>> could simply construct a string from
>>> letterStream.getTypeInfo().getFieldNames(). I hope we can improve this
>>> further in the future as part of FLIP-37.
>>>
>>> Regards,
>>> Timo
>>>
>>> On 14.08.19 at 17:00, Niels Basjes wrote:
>>>
>>> Hi,
>>>
>>> Experimenting with the StreamTableEnvironment I built something like
>>> this:
>>>
>>> DataStream<Tuple3<Long, String, Long>> letterStream = ...
>>> tableEnv.registerDataStream("TestStream", letterStream,
>>> "EventTime.rowtime, letter, counter");
>>>
>>>
>>> Because the "EventTime" was tagged with ".rowtime" it is now being used
>>> as the rowtime and has the DATETIME type, so I can do this:
>>>
>>> TUMBLE_START(eventTime, INTERVAL '1' MINUTE)
>>>
>>>
>>> So far so good.
>>>
>>> Working towards a more realistic scenario I have a source that produces
>>> a stream of records that have been defined using Apache Avro.
>>>
>>> So I have a Measurement.avdl that (among other things) contains
>>> something like this:
>>>
>>> record Measurement {
>>>     /** The time (epoch in milliseconds since 1970-01-01 UTC) when the event occurred */
>>>     long                        timestamp;
>>>     string                      letter;
>>>     long                        pageviews;
>>> }
>>>
>>>
>>> Now because the registerDataStream call can also derive the schema from
>>> the provided data I can do this:
>>>
>>> DataStream<Measurement> inputStream = ...
>>> tableEnv.registerDataStream("DataStream", inputStream);
>>>
>>>
>>> This is very nice because any real schema is big (few hundred columns)
>>> and changes over time.
>>>
>>> Now in the SQL the timestamp is a BIGINT and not a DATETIME, and as a
>>> consequence I get this error:
>>>
>>> Cannot apply 'TUMBLE' to arguments of type 'TUMBLE(<BIGINT>, <INTERVAL
>>> MINUTE>)'. Supported form(s): 'TUMBLE(<DATETIME>, <DATETIME_INTERVAL>)'
>>>
>>>
>>> So far I have not yet figured out how to make the system understand that
>>> the timestamp column should be treated as the rowtime.
>>> How do I do that?
>>>
>>> --
>>> Best regards / Met vriendelijke groeten,
>>>
>>> Niels Basjes
>>>
>>>
>>>
>>
>> --
>> Best regards / Met vriendelijke groeten,
>>
>> Niels Basjes
>>
>
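
The schema-string construction described in the quoted message (mark one top-level field as the rowtime, pass the rest through unchanged) can be isolated as a small helper. This is only a sketch: the class and method names are hypothetical, and the field names would in practice come from TableEnvironment.getFieldNames(typeInformation) as shown in the thread. Failing fast when the rowtime field is missing is an added design choice, not something the original code does:

```java
import java.util.ArrayList;
import java.util.List;

final class RowtimeSchema {

    // Hypothetical helper: returns a field expression string such as
    // "timestamp.rowtime,letter,pageviews" for use with registerDataStream.
    public static String buildFieldsExpression(String[] fieldNames, String rowtimeFieldName) {
        List<String> rootSchema = new ArrayList<>();
        boolean found = false;
        for (String fieldName : fieldNames) {
            if (rowtimeFieldName.equals(fieldName)) {
                // Tag the event time field so it is treated as the rowtime.
                rootSchema.add(fieldName + ".rowtime");
                found = true;
            } else {
                rootSchema.add(fieldName);
            }
        }
        if (!found) {
            // Assumption: a missing rowtime field is a configuration error.
            throw new IllegalArgumentException(
                "No top-level field named '" + rowtimeFieldName + "'");
        }
        return String.join(",", rootSchema);
    }

    public static void main(String[] args) {
        // Field names taken from the Measurement record in this thread.
        String[] fieldNames = {"timestamp", "letter", "pageviews"};
        System.out.println(buildFieldsExpression(fieldNames, "timestamp"));
    }
}
```

The returned string would be passed as the third argument of tableEnv.registerDataStream("MeasurementStream", inputStream, ...).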
