Fabian, Should we instead add this as a registered TypeInfoFactory?
Greg On Thu, Oct 27, 2016 at 3:55 AM, Fabian Hueske <fhue...@gmail.com> wrote: > Yes, I think you are right. > TypeInfoParser needs to be extended to parse the java.sql.* types into the > corresponding TypeInfos. > > Can you open a JIRA for that? > > Thanks, Fabian > > 2016-10-27 9:31 GMT+02:00 Radu Tudoran <radu.tudo...@huawei.com>: > >> Hi, >> >> >> >> I dig meanwhile more through this and I think I found a bug actually. >> >> >> >> The scenario that I was trying to describe was something like >> >> 1. You have a generic datastream with Tuple (alternatively I could >> move to row I guess) and you get the data from whatever stream (e.g., >> kafka, network socket…) >> >> 2. In the map/flat map function you parse and instantiate the >> tuple generically >> >> 3. In the “returns()” function of the map you enforce the types >> >> >> >> DataStream<Tuple> = env.socketTextStream(…) >> >> .map(new mapFunction(){ >> >> Public Tuple map(String value){ >> >> Tuple out = >> Tuple.getTupleClass(#) >> >> … >> >> out.setField(SqlTimeTypeInfo.TIMESTAMP,0) >> >> … >> >> >> >> }}) .returns(“Tuple#<java.sql.TIMESTAMP,…>”); >> >> >> >> >> >> The problem is that if you rely on the type extraction mechanism called >> after the returns to recognize TIMESTAMP of type SqlTimeTypeInfo it will >> not happen but instead a GenericType<TIMESTAMP> will be created. >> >> It looks like the type parsers were not extended to consider this types >> >> >> >> Dr. Radu Tudoran >> >> Senior Research Engineer - Big Data Expert >> >> IT R&D Division >> >> >> >> [image: cid:image007.jpg@01CD52EB.AD060EE0] >> >> HUAWEI TECHNOLOGIES Duesseldorf GmbH >> >> European Research Center >> >> Riesstrasse 25, 80992 München >> >> >> >> E-mail: *radu.tudo...@huawei.com <radu.tudo...@huawei.com>* >> >> Mobile: +49 15209084330 >> >> Telephone: +49 891588344173 >> >> >> >> HUAWEI TECHNOLOGIES Duesseldorf GmbH >> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com >> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063, >> Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN >> Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063, >> Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN >> >> This e-mail and its attachments contain confidential information from >> HUAWEI, which is intended only for the person or entity whose address is >> listed above. Any use of the information contained herein in any way >> (including, but not limited to, total or partial disclosure, reproduction, >> or dissemination) by persons other than the intended recipient(s) is >> prohibited. If you receive this e-mail in error, please notify the sender >> by phone or email immediately and delete it! >> >> >> >> *From:* Fabian Hueske [mailto:fhue...@gmail.com] >> *Sent:* Wednesday, October 26, 2016 10:11 PM >> *To:* user@flink.apache.org >> *Subject:* Re: TIMESTAMP TypeInformation >> >> >> >> Hi Radu, >> >> I might not have complete understood your problem, but if you do >> >> val env = StreamExecutionEnvironment.getExecutionEnvironment >> val tEnv = TableEnvironment.getTableEnvironment(env) >> >> val ds = env.fromElements( (1, 1L, new Time(1,2,3)) ) >> val t = ds.toTable(tEnv, 'a, 'b, 'c) >> >> val results = t >> .select('c + 10.seconds) >> >> then field 'c will be of type SqlTimeTypeInfo and handled as such. >> >> Hope this helps, >> >> Fabian >> >> >> >> 2016-10-25 17:32 GMT+02:00 Radu Tudoran <radu.tudo...@huawei.com>: >> >> Re-hi, >> >> >> >> I actually realized that the problem comes from the fact that the >> datastream that I am registering does not create properly the types. >> >> >> >> I am using something like >> >> >> >> DataStream<Tuple> … .returns(“TupleX<,….,java.sql.Timestamp, >> java.sql.Time>”)…and I was expecting that these will be converted to >> SqlTimeTypeInfo…but it is converted to GenericType. Anythoughts how I could >> force the type to be recognize as a SqlTimeType? >> >> >> >> >> >> *From:* Radu Tudoran >> *Sent:* Tuesday, October 25, 2016 4:46 PM >> *To:* 'user@flink.apache.org' >> *Subject:* TIMESTAMP TypeInformation >> >> >> >> Hi, >> >> >> >> I would like to create a TIMESTAMP type from the data schema. I would >> need this to match against the FlinkTypeFactory (toTypeInfo()) >> >> >> >> *def* toTypeInfo(relDataType: RelDataType): TypeInformation[_] = >> relDataType.getSqlTypeName *match* { >> >> *case* BOOLEAN => BOOLEAN_TYPE_INFO >> >> *case* TINYINT => BYTE_TYPE_INFO >> >> *case* SMALLINT => SHORT_TYPE_INFO >> >> *case* INTEGER => INT_TYPE_INFO >> >> *case* BIGINT => LONG_TYPE_INFO >> >> *case* FLOAT => FLOAT_TYPE_INFO >> >> *case* DOUBLE => DOUBLE_TYPE_INFO >> >> *case* VARCHAR | CHAR => STRING_TYPE_INFO >> >> *case* DECIMAL => BIG_DEC_TYPE_INFO >> >> >> >> // date/time types >> >> *case* DATE => SqlTimeTypeInfo.DATE >> >> *case* TIME => SqlTimeTypeInfo.TIME >> >> *case* *TIMESTAMP** => SqlTimeTypeInfo.**TIMESTAMP* >> >> >> >> I tried to use create the TypeInformation by calling directly >> SqlTimeTypeInfo.TIMESTAMP . However, it seems that >> relDataType.getSqlTypeName match is of type ANY instead of being of type >> TIMESTAMP. >> >> >> >> Any thoughts of how to create the proper TIMESTAMP typeinformation? >> >> >> >> >> >> >> >> >> > >