Hi, that would be regular SQL cast syntax:
SELECT a, b, c, CAST(eventTime AS TIMESTAMP) FROM ...

On Tue, 10 Sep 2019 at 18:07, Niels Basjes <ni...@basjes.nl> wrote:

> Hi.
>
> Can you give me an example of the actual syntax of such a cast?
>
> On Tue, 10 Sep 2019, 16:30 Fabian Hueske, <fhue...@gmail.com> wrote:
>
>> Hi Niels,
>>
>> I think (not 100% sure) you could also cast the event time attribute to
>> TIMESTAMP before you emit the table.
>> This should remove the event time property (and thereby the
>> TimeIndicatorTypeInfo) and you wouldn't need to fiddle with the output
>> types.
>>
>> Best, Fabian
>>
>> On Wed, 21 Aug 2019 at 10:51, Niels Basjes <ni...@basjes.nl> wrote:
>>
>>> Hi,
>>>
>>> It has taken me quite a bit of time to figure this out.
>>> This is the solution I have now (works on my machine).
>>>
>>> Please tell me where I can improve this.
>>>
>>> Turns out that the schema you provide for registerDataStream only needs
>>> the 'top level' fields of the Avro datastructure.
>>> With only the top fields there you can still access nested fields with
>>> something like "topfield.x.y.z" in the SQL statement.
>>>
>>> What I found is that the easiest way to make this all work is to ensure
>>> the rowtime field in the structure is at the top level (which makes sense
>>> in general) and generate the fields string where I only need to know the
>>> name of the "rowtime" field.
>>>
>>> So I have
>>>
>>> DataStream<Measurement> inputStream = ...
>>>
>>> then I register the stream with
>>>
>>> TypeInformation<Measurement> typeInformation =
>>>     TypeInformation.of(Measurement.class);
>>> String[] fieldNames = TableEnvironment.getFieldNames(typeInformation);
>>>
>>> List<String> rootSchema = new ArrayList<>();
>>> for (String fieldName : fieldNames) {
>>>     if (rowtimeFieldName.equals(fieldName)) {
>>>         rootSchema.add(fieldName + ".rowtime");
>>>     } else {
>>>         rootSchema.add(fieldName);
>>>     }
>>> }
>>>
>>> tableEnv.registerDataStream("MeasurementStream", inputStream,
>>>     String.join(",", rootSchema));
>>>
>>> Now after the actual SQL has been executed I have a
>>>
>>> Table resultTable = ...
>>>
>>> Now simply feeding this into a DataStream with something like this fails
>>> badly.
>>>
>>> TypeInformation<Row> tupleType = new
>>>     RowTypeInfo(resultTable.getSchema().getFieldTypes());
>>> DataStream<Row> resultSet = tableEnv.toAppendStream(resultTable,
>>>     tupleType);
>>>
>>> will result in
>>>
>>> org.apache.flink.table.api.TableException: The time indicator type is an internal type only.
>>>     at org.apache.flink.table.api.TableEnvironment.org$apache$flink$table$api$TableEnvironment$$validateFieldType$1(TableEnvironment.scala:1172)
>>>
>>> Turns out that the schema of the output contains a field that was
>>> created by TUMBLE_START which is of type TimeIndicatorTypeInfo
>>>
>>> So I have to do it this way (NASTY!):
>>>
>>> final TypeInformation<?>[] fieldTypes =
>>>     resultTable.getSchema().getFieldTypes();
>>> int index;
>>> for (index = 0; index < fieldTypes.length; index++) {
>>>     if (fieldTypes[index] instanceof TimeIndicatorTypeInfo) {
>>>         fieldTypes[index] = SQL_TIMESTAMP;
>>>     }
>>> }
>>> TypeInformation<Row> tupleType = new RowTypeInfo(fieldTypes);
>>> DataStream<Row> resultSet = tableEnv.toAppendStream(resultTable,
>>>     tupleType);
>>>
>>> Which gives me the desired DataStream.
>>>
>>> Niels Basjes
>>>
>>> On Wed, Aug 14, 2019 at 5:13 PM Timo Walther <twal...@apache.org> wrote:
>>>
>>>> Hi Niels,
>>>>
>>>> if you are coming from DataStream API, all you need to do is to write a
>>>> timestamp extractor.
>>>>
>>>> When you call:
>>>>
>>>> tableEnv.registerDataStream("TestStream", letterStream,
>>>>     "EventTime.rowtime, letter, counter");
>>>>
>>>> The ".rowtime" means that the framework will extract the rowtime from
>>>> the stream record timestamp. You don't need to name all fields again but
>>>> could simply construct a string from
>>>> letterStream.getTypeInfo().getFieldNames(). I hope we can improve this
>>>> further in the future as part of FLIP-37.
>>>>
>>>> Regards,
>>>> Timo
>>>>
>>>> On 14 Aug 2019 at 17:00, Niels Basjes wrote:
>>>>
>>>> Hi,
>>>>
>>>> Experimenting with the StreamTableEnvironment I built something like
>>>> this:
>>>>
>>>> DataStream<Tuple3<Long, String, Long>> letterStream = ...
>>>> tableEnv.registerDataStream("TestStream", letterStream,
>>>>     "EventTime.rowtime, letter, counter");
>>>>
>>>> Because the "EventTime" was tagged with ".rowtime" it is now being used
>>>> as the rowtime and has the DATETIME type, so I can do this
>>>>
>>>> TUMBLE_START(eventTime, INTERVAL '1' MINUTE)
>>>>
>>>> So far so good.
>>>>
>>>> Working towards a more realistic scenario I have a source that produces
>>>> a stream of records that have been defined using Apache Avro.
>>>>
>>>> So I have a Measurement.avdl that (among other things) contains
>>>> something like this:
>>>>
>>>> record Measurement {
>>>>     /** The time (epoch in milliseconds since 1970-01-01 UTC) when the
>>>>         event occurred */
>>>>     long timestamp;
>>>>     string letter;
>>>>     long pageviews;
>>>> }
>>>>
>>>> Now because the registerDataStream call can also derive the schema from
>>>> the provided data I can do this:
>>>>
>>>> DataStream<Measurement> inputStream = ...
>>>> tableEnv.registerDataStream("DataStream", inputStream);
>>>>
>>>> This is very nice because any real schema is big (few hundred columns)
>>>> and changes over time.
>>>>
>>>> Now in the SQL the timestamp is a BIGINT and not a DATETIME and as a
>>>> consequence I get this error
>>>>
>>>> Cannot apply 'TUMBLE' to arguments of type 'TUMBLE(<BIGINT>, <INTERVAL
>>>> MINUTE>)'. Supported form(s): 'TUMBLE(<DATETIME>, <DATETIME_INTERVAL>)'
>>>>
>>>> So far I have not yet figured out how to make the system understand that
>>>> the timestamp column should be treated as the rowtime.
>>>> How do I do that?
>>>>
>>>> --
>>>> Best regards / Met vriendelijke groeten,
>>>>
>>>> Niels Basjes
>>>>
>>>
>>> --
>>> Best regards / Met vriendelijke groeten,
>>>
>>> Niels Basjes
>>
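
For anyone reading the thread later: the field-string construction Niels describes (tag only the event-time field with ".rowtime", pass every other top-level field through unchanged) can be pulled out into a small helper. What follows is only a sketch in plain Java with no Flink calls. The class name RowtimeSchema and the method buildFieldExpression are made up for illustration, and the field names are taken from the Measurement record quoted above. In the thread the names come from TableEnvironment.getFieldNames(typeInformation), and the resulting string is what gets passed as the third argument to registerDataStream.

```java
import java.util.ArrayList;
import java.util.List;

public class RowtimeSchema {

    /**
     * Builds the comma-separated field expression string.
     * The field whose name equals rowtimeFieldName gets the ".rowtime"
     * suffix; all other field names are copied unchanged.
     * (Hypothetical helper, not part of any Flink API.)
     */
    public static String buildFieldExpression(String[] fieldNames, String rowtimeFieldName) {
        List<String> rootSchema = new ArrayList<>();
        for (String fieldName : fieldNames) {
            if (rowtimeFieldName.equals(fieldName)) {
                rootSchema.add(fieldName + ".rowtime");
            } else {
                rootSchema.add(fieldName);
            }
        }
        return String.join(",", rootSchema);
    }

    public static void main(String[] args) {
        // Top-level field names of the Measurement record from the thread.
        String[] fieldNames = {"timestamp", "letter", "pageviews"};
        // prints: timestamp.rowtime,letter,pageviews
        System.out.println(buildFieldExpression(fieldNames, "timestamp"));
    }
}
```

If the rowtime field name does not match any top-level field, the string comes back without a ".rowtime" tag, so a caller may want to check for that before registering the stream.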