Hi. Can you give me an example of the actual syntax of such a cast?
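Something like this, perhaps? Just guessing at the syntax here, reusing the
table and field names from my examples further down in this thread (and
assuming the rowtime column is called eventTime); untested:

SELECT
    CAST(TUMBLE_START(eventTime, INTERVAL '1' MINUTE) AS TIMESTAMP) AS windowStart,
    letter,
    SUM(pageviews) AS pageviews
FROM MeasurementStream
GROUP BY TUMBLE(eventTime, INTERVAL '1' MINUTE), letter

If that is right, the windowStart column would come out as a plain TIMESTAMP
instead of a TimeIndicatorTypeInfo, so the toAppendStream call should no
longer need the type-rewriting workaround?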
On Tue, 10 Sep 2019, 16:30 Fabian Hueske, <fhue...@gmail.com> wrote:

> Hi Niels,
>
> I think (not 100% sure) you could also cast the event time attribute to
> TIMESTAMP before you emit the table.
> This should remove the event time property (and thereby the
> TimeIndicatorTypeInfo) and you wouldn't need to fiddle with the output
> types.
>
> Best,
> Fabian
>
> On Wed, Aug 21, 2019 at 10:51 AM Niels Basjes <ni...@basjes.nl> wrote:
>
>> Hi,
>>
>> It has taken me quite a bit of time to figure this out.
>> This is the solution I have now (works on my machine).
>>
>> Please tell me where I can improve this.
>>
>> It turns out that the schema you provide for registerDataStream only
>> needs the 'top level' fields of the Avro data structure.
>> With only the top fields there you can still access nested fields with
>> something like "topfield.x.y.z" in the SQL statement.
>>
>> What I found is that the easiest way to make this all work is to ensure
>> the rowtime field in the structure is at the top level (which makes
>> sense in general) and generate the fields string, for which I only need
>> to know the name of the "rowtime" field.
>>
>> So I have
>>
>> DataStream<Measurement> inputStream = ...
>>
>> and then I register the stream with
>>
>> TypeInformation<Measurement> typeInformation =
>>     TypeInformation.of(Measurement.class);
>> String[] fieldNames = TableEnvironment.getFieldNames(typeInformation);
>>
>> List<String> rootSchema = new ArrayList<>();
>> for (String fieldName : fieldNames) {
>>     if (rowtimeFieldName.equals(fieldName)) {
>>         rootSchema.add(fieldName + ".rowtime");
>>     } else {
>>         rootSchema.add(fieldName);
>>     }
>> }
>>
>> tableEnv.registerDataStream("MeasurementStream", inputStream,
>>     String.join(",", rootSchema));
>>
>> After the actual SQL has been executed I have a
>>
>> Table resultTable = ...
>>
>> Simply feeding this into a DataStream with something like the following
>> fails badly:
>>
>> TypeInformation<Row> tupleType =
>>     new RowTypeInfo(resultTable.getSchema().getFieldTypes());
>> DataStream<Row> resultSet =
>>     tableEnv.toAppendStream(resultTable, tupleType);
>>
>> It results in
>>
>> org.apache.flink.table.api.TableException: The time indicator type is
>> an internal type only.
>>     at org.apache.flink.table.api.TableEnvironment.org$apache$flink$table$api$TableEnvironment$$validateFieldType$1(TableEnvironment.scala:1172)
>>
>> It turns out that the schema of the output contains a field that was
>> created by TUMBLE_START, which is of type TimeIndicatorTypeInfo.
>>
>> So I have to do it this way (NASTY!):
>>
>> final TypeInformation<?>[] fieldTypes =
>>     resultTable.getSchema().getFieldTypes();
>> for (int index = 0; index < fieldTypes.length; index++) {
>>     if (fieldTypes[index] instanceof TimeIndicatorTypeInfo) {
>>         fieldTypes[index] = SQL_TIMESTAMP;
>>     }
>> }
>> TypeInformation<Row> tupleType = new RowTypeInfo(fieldTypes);
>> DataStream<Row> resultSet =
>>     tableEnv.toAppendStream(resultTable, tupleType);
>>
>> which gives me the desired DataStream.
>>
>> Niels Basjes
>>
>> On Wed, Aug 14, 2019 at 5:13 PM Timo Walther <twal...@apache.org> wrote:
>>
>>> Hi Niels,
>>>
>>> If you are coming from the DataStream API, all you need to do is write
>>> a timestamp extractor.
>>>
>>> When you call:
>>>
>>> tableEnv.registerDataStream("TestStream", letterStream,
>>>     "EventTime.rowtime, letter, counter");
>>>
>>> the ".rowtime" means that the framework will extract the rowtime from
>>> the stream record timestamp.
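>>> For example, a rough sketch of such an extractor (assuming the
>>> Measurement record from further down in this thread, with its
>>> epoch-millis timestamp field; untested):
>>>
>>> import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
>>> import org.apache.flink.streaming.api.windowing.time.Time;
>>>
>>> // rawMeasurementStream is a placeholder for however the source is created.
>>> DataStream<Measurement> inputStream = rawMeasurementStream
>>>     .assignTimestampsAndWatermarks(
>>>         // Tolerate events arriving up to 5 seconds out of order.
>>>         new BoundedOutOfOrdernessTimestampExtractor<Measurement>(Time.seconds(5)) {
>>>             @Override
>>>             public long extractTimestamp(Measurement m) {
>>>                 // The Avro-generated field already holds epoch milliseconds.
>>>                 return m.getTimestamp();
>>>             }
>>>         });
>>>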
>>> You don't need to name all fields again; you could simply construct
>>> the string from letterStream.getTypeInfo().getFieldNames(). I hope we
>>> can improve this further in the future as part of FLIP-37.
>>>
>>> Regards,
>>> Timo
>>>
>>> On 14.08.19 at 17:00, Niels Basjes wrote:
>>>
>>> Hi,
>>>
>>> Experimenting with the StreamTableEnvironment I built something like
>>> this:
>>>
>>> DataStream<Tuple3<Long, String, Long>> letterStream = ...
>>> tableEnv.registerDataStream("TestStream", letterStream,
>>>     "EventTime.rowtime, letter, counter");
>>>
>>> Because "EventTime" was tagged with ".rowtime" it is now being used as
>>> the rowtime and has the DATETIME type, so I can do this:
>>>
>>> TUMBLE_START(eventTime, INTERVAL '1' MINUTE)
>>>
>>> So far so good.
>>>
>>> Working towards a more realistic scenario, I have a source that
>>> produces a stream of records that have been defined using Apache Avro.
>>>
>>> So I have a Measurement.avdl that (among other things) contains
>>> something like this:
>>>
>>> record Measurement {
>>>     /** The time (epoch in milliseconds since 1970-01-01 UTC) when
>>>         the event occurred */
>>>     long timestamp;
>>>     string letter;
>>>     long pageviews;
>>> }
>>>
>>> Because the registerDataStream call can also derive the schema from
>>> the provided data, I can do this:
>>>
>>> DataStream<Measurement> inputStream = ...
>>> tableEnv.registerDataStream("DataStream", inputStream);
>>>
>>> This is very nice because any real schema is big (a few hundred
>>> columns) and changes over time.
>>>
>>> However, in the SQL the timestamp is a BIGINT and not a DATETIME, and
>>> as a consequence I get this error:
>>>
>>> Cannot apply 'TUMBLE' to arguments of type 'TUMBLE(<BIGINT>, <INTERVAL
>>> MINUTE>)'. Supported form(s): 'TUMBLE(<DATETIME>, <DATETIME_INTERVAL>)'
>>>
>>> So far I have not yet figured out how to make the system understand
>>> that the timestamp column should be treated as the rowtime.
>>> How do I do that?
>>>
>>> --
>>> Best regards / Met vriendelijke groeten,
>>>
>>> Niels Basjes
>>
>> --
>> Best regards / Met vriendelijke groeten,
>>
>> Niels Basjes