Hi,

that would be regular SQL cast syntax:

SELECT a, b, c, CAST(eventTime AS TIMESTAMP) FROM ...
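
For a wide schema, where the select list is generated rather than typed by hand (as with the field string built further down in this thread), the same cast can be applied while building the query text. Below is a minimal sketch in plain Java with no Flink dependency. The class `CastSelectList`, the method `selectList`, and the `AS <column>` alias are hypothetical choices made for illustration; the table name is the one registered further down in this thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CastSelectList {

    // Hypothetical helper (not part of Flink): builds a SELECT list in which the
    // event-time column is wrapped in CAST(... AS TIMESTAMP) and every other
    // column is passed through unchanged.
    public static String selectList(List<String> columns, String timeColumn) {
        List<String> parts = new ArrayList<>();
        for (String column : columns) {
            if (column.equals(timeColumn)) {
                // The alias keeps the original column name in the result schema.
                parts.add("CAST(" + column + " AS TIMESTAMP) AS " + column);
            } else {
                parts.add(column);
            }
        }
        return String.join(", ", parts);
    }

    public static void main(String[] args) {
        List<String> columns = Arrays.asList("a", "b", "c", "eventTime");
        String sql = "SELECT " + selectList(columns, "eventTime") + " FROM MeasurementStream";
        // prints: SELECT a, b, c, CAST(eventTime AS TIMESTAMP) AS eventTime FROM MeasurementStream
        System.out.println(sql);
    }
}
```

The resulting string would then be handed to `tableEnv.sqlQuery(...)` as usual.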


On Tue, 10 Sep 2019 at 18:07, Niels Basjes <ni...@basjes.nl> wrote:

> Hi.
>
> Can you give me an example of the actual syntax of such a cast?
>
> On Tue, 10 Sep 2019, 16:30 Fabian Hueske, <fhue...@gmail.com> wrote:
>
>> Hi Niels,
>>
>> I think (not 100% sure) you could also cast the event time attribute to
>> TIMESTAMP before you emit the table.
>> This should remove the event time property (and thereby the
>> TimeIndicatorTypeInfo), and you wouldn't need to fiddle with the output
>> types.
>>
>> Best, Fabian
>>
>> On Wed, 21 Aug 2019 at 10:51, Niels Basjes <ni...@basjes.nl> wrote:
>>
>>> Hi,
>>>
>>> It has taken me quite a bit of time to figure this out.
>>> This is the solution I have now (works on my machine).
>>>
>>> Please tell me where I can improve this.
>>>
>>> Turns out that the schema you provide for registerDataStream only needs
>>> the 'top level' fields of the Avro data structure.
>>> With only the top fields there you can still access nested fields with
>>> something like "topfield.x.y.z" in the SQL statement.
>>>
>>> What I found is that the easiest way to make this all work is to ensure
>>> the rowtime field in the structure is at the top level (which makes sense
>>> in general) and generate the fields string where I only need to know the
>>> name of the "rowtime" field.
>>>
>>> So I have
>>>
>>>     DataStream<Measurement> inputStream = ...
>>>
>>>
>>> then I register the stream with
>>>
>>>
>>>     TypeInformation<Measurement> typeInformation = TypeInformation.of(Measurement.class);
>>>     String[] fieldNames = TableEnvironment.getFieldNames(typeInformation);
>>>
>>>     List<String> rootSchema = new ArrayList<>();
>>>     for (String fieldName: fieldNames) {
>>>         if (rowtimeFieldName.equals(fieldName)) {
>>>             rootSchema.add(fieldName + ".rowtime");
>>>         } else {
>>>             rootSchema.add(fieldName);
>>>         }
>>>     }
>>>
>>>     tableEnv.registerDataStream("MeasurementStream", inputStream, String.join(",", rootSchema));
>>>
>>>
>>> Now after the actual SQL has been executed I have a
>>>
>>>     Table resultTable = ...
>>>
>>> Now simply feeding this into a DataStream with something like this fails
>>> badly.
>>>
>>>     TypeInformation<Row> tupleType = new RowTypeInfo(resultTable.getSchema().getFieldTypes());
>>>     DataStream<Row> resultSet = tableEnv.toAppendStream(resultTable, tupleType);
>>>
>>> will result in
>>>
>>>     org.apache.flink.table.api.TableException: The time indicator type is an internal type only.
>>>        at org.apache.flink.table.api.TableEnvironment.org$apache$flink$table$api$TableEnvironment$$validateFieldType$1(TableEnvironment.scala:1172)
>>>
>>> Turns out that the schema of the output contains a field that was
>>> created by TUMBLE_START, which is of type TimeIndicatorTypeInfo.
>>>
>>> So I have to do it this way (NASTY!):
>>>
>>>     final TypeInformation<?>[] fieldTypes = resultTable.getSchema().getFieldTypes();
>>>     // Replace every internal time indicator type with a plain SQL timestamp.
>>>     for (int index = 0; index < fieldTypes.length; index++) {
>>>         if (fieldTypes[index] instanceof TimeIndicatorTypeInfo) {
>>>             fieldTypes[index] = SQL_TIMESTAMP;
>>>         }
>>>     }
>>>     TypeInformation<Row> tupleType = new RowTypeInfo(fieldTypes);
>>>     DataStream<Row> resultSet = tableEnv.toAppendStream(resultTable, tupleType);
>>>
>>> Which gives me the desired DataStream.
>>>
>>>
>>> Niels Basjes
>>>
>>>
>>>
>>>
>>>
>>> On Wed, Aug 14, 2019 at 5:13 PM Timo Walther <twal...@apache.org> wrote:
>>>
>>>> Hi Niels,
>>>>
>>>> if you are coming from DataStream API, all you need to do is to write a
>>>> timestamp extractor.
>>>>
>>>> When you call:
>>>>
>>>> tableEnv.registerDataStream("TestStream", letterStream,
>>>> "EventTime.rowtime, letter, counter");
>>>>
>>>> The ".rowtime" means that the framework will extract the rowtime from
>>>> the stream record timestamp. You don't need to name all fields again but
>>>> could simply construct a string from
>>>> letterStream.getTypeInfo().getFieldNames(). I hope we can improve this
>>>> further in the future as part of FLIP-37.
>>>>
>>>> Regards,
>>>> Timo
>>>>
>>>> On 14 Aug 2019 at 17:00, Niels Basjes wrote:
>>>>
>>>> Hi,
>>>>
>>>> Experimenting with the StreamTableEnvironment I built something like
>>>> this:
>>>>
>>>> DataStream<Tuple3<Long, String, Long>> letterStream = ...
>>>> tableEnv.registerDataStream("TestStream", letterStream,
>>>> "EventTime.rowtime, letter, counter");
>>>>
>>>>
>>>> Because the "EventTime" was tagged with ".rowtime" it is now being used
>>>> as the rowtime and has the DATETIME type, so I can do this:
>>>>
>>>> TUMBLE_START(eventTime, INTERVAL '1' MINUTE)
>>>>
>>>>
>>>> So far so good.
>>>>
>>>> Working towards a more realistic scenario I have a source that produces
>>>> a stream of records that have been defined using Apache Avro.
>>>>
>>>> So I have a Measurement.avdl that (among other things) contains
>>>> something like this:
>>>>
>>>> record Measurement {
>>>>     /** The time (epoch in milliseconds since 1970-01-01 UTC) when the event occurred */
>>>>     long                        timestamp;
>>>>     string                      letter;
>>>>     long                        pageviews;
>>>> }
>>>>
>>>>
>>>> Now because the registerDataStream call can also derive the schema from
>>>> the provided data I can do this:
>>>>
>>>> DataStream<Measurement> inputStream = ...
>>>> tableEnv.registerDataStream("DataStream", inputStream);
>>>>
>>>>
>>>> This is very nice because any real schema is big (a few hundred columns)
>>>> and changes over time.
>>>>
>>>> Now in the SQL the timestamp is a BIGINT and not a DATETIME, and as a
>>>> consequence I get this error:
>>>>
>>>> Cannot apply 'TUMBLE' to arguments of type 'TUMBLE(<BIGINT>, <INTERVAL MINUTE>)'. Supported form(s): 'TUMBLE(<DATETIME>, <DATETIME_INTERVAL>)'
>>>>
>>>>
>>>> So far I have not yet figured out how to make the system understand that
>>>> the timestamp column should be treated as the rowtime.
>>>> How do I do that?
>>>>
>>>> --
>>>> Best regards / Met vriendelijke groeten,
>>>>
>>>> Niels Basjes
>>>>
>>>>
>>>>
>>>
>>> --
>>> Best regards / Met vriendelijke groeten,
>>>
>>> Niels Basjes
>>>
>>
