Re: Powered by Flink

2015-10-19 Thread Timo Walther
+1 for adding it to the website instead of the wiki. "Who is using Flink?" is always a difficult question to answer for interested users. On 19.10.2015 15:08, Suneel Marthi wrote: +1 to this. On Mon, Oct 19, 2015 at 3:00 PM, Fabian Hueske wrote: Sounds good +1

Re: Powered by Flink

2015-10-19 Thread Timo Walther
Ah ok, sorry. I think linking to the wiki is also ok. On 19.10.2015 15:18, Fabian Hueske wrote: @Timo: The proposal was to keep the list in the wiki (can be easily extended) but link from the main website to the wiki page. 2015-10-19 15:16 GMT+02:00 Timo Walther : +1 for adding it to the

Re: Type of TypeVariable could not be determined

2016-03-08 Thread Timo Walther
Hi Radu, the exception can have multiple causes. It would be great if you could share some example code. In most cases the problem is the following: public class MyMapper<WhatEverType> implements MapFunction<WhatEverType, WhatEverType> { } new MyMapper<WhatEverType>(); The type WhatEverType is erased by Java. The type must not be declared in the "new"
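The erasure effect Timo describes can be reproduced with plain Java reflection, without any Flink dependency. The class name `MyFunction` below is a hypothetical stand-in for a generic interface like Flink's `MapFunction`; a minimal sketch:

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical stand-in for a generic function interface like Flink's MapFunction.
class MyFunction<T> { }

public class ErasureDemo {
    public static void main(String[] args) {
        // Declaring the type argument only at the "new" call: erased at runtime.
        MyFunction<String> erased = new MyFunction<String>();
        Type t1 = erased.getClass().getGenericSuperclass();
        System.out.println(t1 instanceof ParameterizedType); // false: no type info left

        // Anonymous subclass (note the trailing {}): the type argument is recorded
        // in the class file and can be recovered, which is what type extraction relies on.
        MyFunction<String> kept = new MyFunction<String>() { };
        ParameterizedType t2 = (ParameterizedType) kept.getClass().getGenericSuperclass();
        System.out.println(t2.getActualTypeArguments()[0]); // class java.lang.String
    }
}
```

This is why declaring the concrete type in a (possibly anonymous) subclass, rather than only at the `new` call, lets the type be determined.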

Re: Type of TypeVariable could not be determined

2016-03-09 Thread Timo Walther
I think your problem is that you declared "TupleEvent2" as a TypeVariable in your code but I think you want to use a class that you defined, right? If so, this is the correct declaration: MySourceFunction implements SourceFunction<TupleEvent2> On 09.03.2016 09:28, Wang Yangjun wrote: Hello, I think in th

Re: Java 8 and keyBy in 1.0.0

2016-03-30 Thread Timo Walther
I will assign this issue to me and fix it soon, if that's ok? Regards, Timo On 30.03.2016 11:30, Stephan Ewen wrote: Looks like something we should fix though. Probably just needs a case distinction in the TypeExtractor. @Andrew, can you post the stack trace into the issue I linked? We'll

Re: [SQL DDL] How to extract timestamps from Kafka message's metadata

2020-08-11 Thread Timo Walther
Hi Dongwon, another possibility is to use DataStream API before. There you can extract the metadata and use DataStream.assignTimestampsAndWatermarks before converting the stream to a table. Regards, Timo On 11.08.20 09:41, Dongwon Kim wrote: Hi Dawid, I'll try your suggestion [2] and wait

Re: Event time based disconnection detection logic

2020-08-11 Thread Timo Walther
Hi Manas, at the first glance your code looks correct to me. I would investigate if your keys and watermarks are correct. Esp. the watermark frequency could be an issue. If watermarks are generated at the same time as the heartbeats itself, it might be the case that the timers fire first befo

Re: Batch version of StreamingFileSink.forRowFormat(...)

2020-08-11 Thread Timo Walther
Hi Dan, InputFormats are the connectors of the DataSet API. Yes, you can use either readFile, readCsvFile, readFileOfPrimitives etc. However, I would recommend to also give Table API a try. The unified TableEnvironment is able to perform batch processing and is integrated with a bunch of conn

Re: Proper way to do Integration Testing ?

2020-08-11 Thread Timo Walther
Hi Faye, Flink does not officially provide testing tools at the moment. However, you can use internal Flink tools if they solve your problem. The `flink-end-to-end-tests` module [1] shows some examples how we test Flink together with other systems. Many tests are still using plain bash scrip

Re: Event time based disconnection detection logic

2020-08-11 Thread Timo Walther
is produces the expected output. Also, I will assume that this is the best way to solve my problem - I can't use Flink's session windows. Let me know if anyone has any other ideas though! Thank you for your time and quick response! On Tue, Aug 11, 2020 at 1:45 PM Timo Walther <ma

Re: how to add a new runtime operator

2020-08-12 Thread Timo Walther
Hi Vincent, we don't have a step by step guide for adding new operators. Most of the important operations are exposed via DataStream API. Esp. ProcessFunction [1] fits for most complex use cases with access to the primitives such as time and state. What kind of operator is missing for your u

Re: Using Event Timestamp sink get's back with machine timezone

2020-08-12 Thread Timo Walther
Hi Faye, the problem lies in the wrong design of JDK's java.sql.Timestamp. You can also find a nice summary in the answer here [1]. java.sql.Timestamp is timezone dependent. Internally, we subtract/normalize the timezone and work with the UNIX timestamp. Beginning from Flink 1.9 we are using
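The timezone dependence of `java.sql.Timestamp` that Timo refers to can be demonstrated with plain Java (a sketch, independent of Flink; the chosen zones are just examples):

```java
import java.sql.Timestamp;
import java.util.TimeZone;

public class TimestampDemo {
    public static void main(String[] args) {
        // Timestamp.valueOf interprets the string in the JVM's current default zone.
        TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
        Timestamp utc = Timestamp.valueOf("2020-01-01 00:00:00");

        TimeZone.setDefault(TimeZone.getTimeZone("America/New_York"));
        Timestamp ny = Timestamp.valueOf("2020-01-01 00:00:00");

        // Same wall-clock string, two different UNIX timestamps: 5 hours apart.
        System.out.println(ny.getTime() - utc.getTime()); // 18000000
    }
}
```

Because the internal `long` value depends on the machine's default zone, normalizing to the UNIX timestamp internally (as Flink does) and converting only at the edges avoids machine-dependent results.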

Re: Is there a way to start a timer without ever receiving an event?

2020-08-13 Thread Timo Walther
What you can do is create an initial control stream e.g. using `StreamExecutionEnvironment.fromElements()` and either use `union(controlStream, actualStream)` or use `actualStream.connect(controlStream)`. Regards, Timo On 12.08.20 18:15, Andrey Zagrebin wrote: I do not think so. Each timer
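A sketch of the suggested pattern, assuming Flink's DataStream API is on the classpath (the element values and the socket source are invented for illustration):

```java
// Sketch only: requires the Flink DataStream API dependencies.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// A one-element control stream that exists from the start, before any real event arrives.
DataStream<String> control = env.fromElements("init-timer");
DataStream<String> actual = env.socketTextStream("localhost", 9999);

// Option 1: both streams have the same type.
DataStream<String> both = actual.union(control);

// Option 2: different types; a downstream CoProcessFunction can register
// the initial timer when the control element arrives.
actual.connect(control);
```

The control element guarantees that at least one record flows through the keyed operator, so a timer can be registered even if the actual stream stays empty.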

Re: Flink 1.11 SQL error: No operators defined in streaming topology. Cannot execute.

2020-08-13 Thread Timo Walther
Hi Lu, `env.execute("table api");` is not necessary after FLIP-84 [1]. Every method that has `execute` in its name will immediately execute a job. Therefore your `env.execute` has an empty pipeline. Regards, Timo [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=134745878

Re: How to write a customer sink partitioner when using flinksql kafka-connector

2020-08-18 Thread Timo Walther
Hi Lei, you can check how the FlinkFixedPartitioner [1] or Tuple2FlinkPartitioner [2] are implemented. Since you are using SQL connectors of the newest generation, you should receive an instance of org.apache.flink.table.data.RowData in your partitioner. You can create a Maven project with a

Re: Flink SQL UDAF com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID

2020-08-18 Thread Timo Walther
Hi Forideal, luckily these problems will belong to the past in Flink 1.12 when UDAF are updated to the new type system [1]. Lists will be natively supported and registering custom KryoSerializers consistently as well. Until then, another workaround is to override getAccumulatorType() and def

Re: UT/IT Table API/SQL streaming job

2020-08-19 Thread Timo Walther
Hi, this might be helpful as well: https://lists.apache.org/thread.html/rfe3b45a10fc58cf19d2f71c6841515eb7175ba731d5055b06f236b3f%40%3Cuser.flink.apache.org%3E First of all, it is important to know if you are interested in end-to-end tests (incl. connectors) or excluding connectors. If you jus

Re: Editing Rowtime for SQL Table

2020-09-02 Thread Timo Walther
m On Tue, Sep 1, 2020 at 4:46 AM Timo Walther <mailto:t...@ververica.com>> wrote: Hi Satyam, Matthias is right. A rowtime attribute cannot be modified and needs to be passed "as is" through the pipeline. The only exceptions are if a newer rowtime is offered

Re:

2020-09-08 Thread Timo Walther
Hi Violeta, can you share your connector code with us? The plan looks quite complicated given the relatively simple query. Maybe there is some optimization potential. But before we dive deeper, I see a `Map(to: Row)` which indicates that we might work with a legacy sink connector. Did you tr

Re: FLINK DATASTREAM Processing Question

2020-09-08 Thread Timo Walther
Hi Vijay, one comment to add is that the performance might suffer with multiple map() calls. For safety reasons, records between chained operators are serialized and deserialized so that they strictly cannot influence each other. If all functions of a pipeline are guaranteed to not modify incomi

Re: Flink alert after database lookUp

2020-09-08 Thread Timo Walther
Hi Sunitha, what you are describing is a typical streaming enrichment. We need to enrich the stream with some data from a database. There are different strategies to handle this: 1) You are querying the database for every record. This is usually not what you want because it would slow down y

Re:

2020-09-08 Thread Timo Walther
You are using the old connectors. The new connectors are available via SQL DDL (and execute_sql() API) like documented here: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html Maybe this will give you some performance boost, but certainly not eno

Re: Flink alert after database lookUp

2020-09-09 Thread Timo Walther
ossible sample source code for reference to stream database. Please help me badly stuck. In the mail, I see you asked me to register. Are you referring to any training here or any other registration. Regards, Sunitha. On Tuesday, September 8, 2020, 08:19:49 PM GMT+5:30, Timo Walther wrote

Re: Slow Performance inquiry

2020-09-09 Thread Timo Walther
Hi Hazem, I guess your performance is mostly driven by the serialization overhead in this case. How do you declare your state type? Flink comes with different serializers. Not all of them are extracted automatically when using reflective extraction methods: - Note that `Serializable` decla

Re: Slow Performance inquiry

2020-09-10 Thread Timo Walther
r way? Regards, Heidy ---- *From:* Timo Walther mailto:twal...@apache.org>> *Sent:* Wednesday, September 9, 2020 1:58 PM *To:* user@flink.apache.org <mailto:user@flink.apache.org>

Re:

2020-09-10 Thread Timo Walther
Hi Violeta, I just noticed that the plan might be generated from Flink's old planner instead of the new, more performant Blink planner. Which planner are you currently using? Regards, Timo On 08.09.20 17:51, Timo Walther wrote: You are using the old connectors. The new connector

Re: Backquote in SQL dialect

2020-09-17 Thread Timo Walther
Hi Satyam, this has historical reasons. In the beginning all SQL queries were embedded in Java programs and thus Java strings. So single quote was handy for declaring SQL strings in a Java string and backticks for escaping keywords. But I agree that we should make this configurable. Feel free

Re: Watermark advancement in late side output

2020-09-21 Thread Timo Walther
Hi Ori, first of all, watermarks are sent to all side outputs (this is tested here [1]). Thus, operators in the side output branch of the pipeline will work similar to operators in the main branch. When calling `assignTimestampsAndWatermarks`, the inserted operator will erase incoming waterm

Re: Problem with zookeeper and flink config

2020-09-21 Thread Timo Walther
Hi Saksham, if I understand you correctly, you are running Zookeeper and Flink locally on your machine? Are you using Docker or is this a bare metal setup? The exception indicates that your paths contain `hdfs:` as URL scheme. Are you sure you want to use HDFS? If yes, you need to add an HDFS

Re: How to disconnect taskmanager via rest api?

2020-09-21 Thread Timo Walther
Hi Luan, this sounds more like a new feature request to me. Maybe you can already open an issue for it. I will loop in Chesnay (in CC) to see if there is some possibility to achieve this already. Regards, Timo On 21.09.20 06:37, Luan Cooper wrote: Hi We're running flink standalone cluster on k8s whe

Re: Zookeeper connection loss causing checkpoint corruption

2020-09-21 Thread Timo Walther
Hi Arpith, is there a JIRA ticket for this issue already? If not, it would be great if you can report it. This sounds like a critical priority issue to me. Thanks, Timo On 22.09.20 06:25, Arpith P wrote: Hi Peter, I have recently had a similar issue where I could not load from the checkpoi

Re: hourly counter

2020-09-22 Thread Timo Walther
Hi Lian, you are right that timers are not available in a ProcessWindowFunction, but the state store can be accessed. So given that your window width is 1 min, you could maintain an additional state value for counting the minutes and updating your counter once this value reaches 60. Otherwise
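Timo's workaround could look roughly like this (a sketch against Flink's windowing API; `Event` and the state names are made up for the illustration):

```java
// Sketch only: assumes the Flink streaming API. Counts 60 one-minute windows
// per key and emits an hourly total from the per-window (global) state store.
public class HourlyCounter extends ProcessWindowFunction<Event, Long, String, TimeWindow> {

    @Override
    public void process(String key, Context ctx, Iterable<Event> events, Collector<Long> out)
            throws Exception {
        ValueState<Integer> minutes = ctx.globalState().getState(
                new ValueStateDescriptor<>("minutes", Integer.class));
        ValueState<Long> hourlySum = ctx.globalState().getState(
                new ValueStateDescriptor<>("hourlySum", Long.class));

        long count = 0;
        for (Event e : events) { count++; }

        int m = minutes.value() == null ? 0 : minutes.value();
        long sum = (hourlySum.value() == null ? 0L : hourlySum.value()) + count;

        if (m + 1 >= 60) {           // one hour of 1-minute windows seen
            out.collect(sum);
            minutes.update(0);
            hourlySum.update(0L);
        } else {
            minutes.update(m + 1);
            hourlySum.update(sum);
        }
    }
}
```

Note that `globalState()` is keyed but not scoped to a window, which is what allows the counter to survive across the 60 window firings.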

Re: Is there a way to avoid submit hive-udf's resources when we submit a job?

2020-09-22 Thread Timo Walther
Hi Husky, I guess https://issues.apache.org/jira/browse/FLINK-14055 is what is needed to make this feature possible. @Rui: Do you know more about this issue and current limitations? Regards, Timo On 18.09.20 09:11, Husky Zeng wrote: When we submit a job which use udf of hive , the job will

Re: RichFunctions in Flink's Table / SQL API

2020-09-23 Thread Timo Walther
Hi Piyush, unfortunately, UDFs have no direct access to Flink's state. Aggregate functions are the only type of functions that can be stateful at the moment. Aggregate functions store their state in an accumulator that is serialized/deserialized on access, but an accumulator field can be back

Re: Back pressure with multiple joins

2020-09-25 Thread Timo Walther
Hi Dan, could you share the plan with us using `TableEnvironment.explainSql()` for both queries? In general, views should not have an impact on the performance. They are a logical concept that gives a bunch of operations a name. The contained operations are inlined into the bigger query duri

Re: Flink SQL - can I force the planner to reuse a temporary view to be reused across multiple queries that access different fields?

2020-09-28 Thread Timo Walther
Hi Dan, unfortunately, it is very difficult to read your plan. Maybe you can share a higher resolution and highlight which part of the pipeline is A, B etc. In general, the planner should be smart enough to reuse subplans where appropriate. Maybe this is a bug or shortcoming in the optimizer r

Re: Flink Batch Processing

2020-09-29 Thread Timo Walther
Hi Sunitha, currently, not every connector can be mixed with every API. I agree that it is confusing from time to time. The HBase connector is an InputFormat. DataSet, DataStream and Table API can work with InputFormats. The current Hbase input format might work best with Table API. If you li

Re: Streaming SQL Job Switches to FINISHED before all records processed

2020-10-05 Thread Timo Walther
es, it should also work for ingestion time. I am not entirely sure whether event time is preserved when converting a Table into a retract stream. It should be possible and if it is not working, then I guess it is a missing feature. But I am sure that @Timo Walther

Re: Streaming SQL Job Switches to FINISHED before all records processed

2020-10-05 Thread Timo Walther
Btw which planner are you using? Regards, Timo On 05.10.20 10:23, Timo Walther wrote: Hi Austin, could you share some details of your SQL query with us? The reason why I'm asking is because I guess that the rowtime field is not inserted into the `StreamRecord` of DataStream API. The ro

Re: Flink SQL - can I force the planner to reuse a temporary view to be reused across multiple queries that access different fields?

2020-10-05 Thread Timo Walther
28, 2020 at 6:41 AM Timo Walther <mailto:twal...@apache.org>> wrote: Hi Dan, unfortunately, it is very difficult to read your plan. Maybe you can share a higher resolution and highlight which part of the pipeline is A, B etc. In general, the planner should be smart

Re: Streaming SQL Job Switches to FINISHED before all records processed

2020-10-09 Thread Timo Walther
ub.com/austince/flink-1.10-sql-windowing-error On Mon, Oct 5, 2020 at 4:24 AM Timo Walther <mailto:twal...@apache.org>> wrote: Btw which planner are you using? Regards, Timo On 05.10.20 10:23, Timo Walther wrote: > Hi Austin, > > could you share

Re: sql/table configuration naming guide/style/spec

2020-10-09 Thread Timo Walther
Hi Luan, we haven't updated all config parameters to string-based options. This is still on going. The idle state retention will be configurable in 1.12: https://issues.apache.org/jira/browse/FLINK-18555 I hope this helps. Regards, Timo On 09.10.20 15:33, Luan Cooper wrote: Hi I've read

Re: Best way to test Table API and SQL

2020-10-09 Thread Timo Walther
Hi Rex, let me copy paste my answer from a similar thread 2 months ago: Hi, this might be helpful as well: https://lists.apache.org/thread.html/rfe3b45a10fc58cf19d2f71c6841515eb7175ba731d5055b06f236b3f%40%3Cuser.flink.apache.org%3E First of all, it is important to know if you are interested i

Re: Streaming SQL Job Switches to FINISHED before all records processed

2020-10-12 Thread Timo Walther
uplicates, but for records with no duplicates, I'd have to wait until no more records are coming -- am I missing something? Thanks so much, Austin On Fri, Oct 9, 2020 at 10:44 AM Timo Walther <mailto:twal...@apache.org>> wrote: Hi Austin, if you don't want to worry a

Re: In 1.11.2/flinkSql/batch, tableEnv.getConfig.setNullCheck(false) seems to break group by-s

2020-10-16 Thread Timo Walther
Hi Jon, I would not recommend to use the configuration parameter. It is not deprecated yet but can be considered legacy code from before we reworked the type system. Regards, Timo On 16.10.20 13:23, Kurt Young wrote: Yes, I think this is a bug, feel free to open a jira and a pull request.

Re: Flink - Kafka topic null error; happens only when running on cluster

2020-10-22 Thread Timo Walther
Hi Manas, you need to make sure to differentiate between what Flink calls "pre-flight phase" and "cluster phase". The pre-flight phase is where the pipeline is constructed and all functions are instantiated. They are then later serialized and sent to the cluster. If you are reading your pro
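The pitfall can be illustrated with plain Java serialization, which is essentially what happens between the pre-flight phase and the cluster phase (the class and field names are invented for the illustration):

```java
import java.io.*;

// Stand-in for a user function that gets shipped to the cluster.
class MyMapper implements Serializable {
    static String topic;        // initialized only during the pre-flight phase
    final String topicField;    // instance field: travels with the serialized object
    MyMapper(String t) { this.topicField = t; }
}

public class PreflightDemo {
    public static void main(String[] args) throws Exception {
        MyMapper.topic = "events";                    // pre-flight initialization
        MyMapper mapper = new MyMapper(MyMapper.topic);

        // Simulate shipping the function to a task manager.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new ObjectOutputStream(bos).writeObject(mapper);

        MyMapper.topic = null;                        // a fresh JVM never ran main()
        MyMapper onCluster = (MyMapper) new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray())).readObject();

        System.out.println(MyMapper.topic);           // null: statics are not shipped
        System.out.println(onCluster.topicField);     // events: instance state is
    }
}
```

Anything read in `main()` into static state exists only on the client JVM; configuration must be stored in (non-transient) instance fields to reach the task managers.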

Re: flink job will restart over and over again if a taskmanager's disk damages

2020-10-22 Thread Timo Walther
Hi, thanks for letting us know about this shortcoming. I will link someone from the runtime team in the JIRA issue. Let's continue the discussion there. Regards, Timo On 22.10.20 05:36, chenkaibit wrote: Hi everyone:  I met this Exception when a hard disk was damaged: https://issues.apache

Re: Configurable Parser

2020-10-22 Thread Timo Walther
Hi Theo, this is indeed a difficult use case. The KafkaDeserializationSchema is actually meant mostly for deserialization and should not contain more complex logic such as joining with a different topic. You would make KafkaDeserializationSchema stateful. But in your usecase, I see no better

Re: Flink - Kafka topic null error; happens only when running on cluster

2020-10-22 Thread Timo Walther
CONFIG_TOPIC = s.get("CONFIG_TOPIC"); CONFIG_KAFKA = s.get("CONFIG_KAFKA"); } } This produces the same issue. With the easier solution that you listed, are you implying I use multiple instances or a singleton pattern of some sort? On Thu, Oct 22, 2020 at 1:2

Re: 回复: rename error in flink sql

2020-10-22 Thread Timo Walther
Hi, sorry for the late reply. I think the problem was in the `tEnv.toAppendStream(result, Order.class).print();` right? You can also find a new example here: https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/Gett

Re: Flink - Kafka topic null error; happens only when running on cluster

2020-10-23 Thread Timo Walther
-flight and cluster phases? I couldn't find anything on ci.apache.org/projects/flink <http://ci.apache.org/projects/flink> and I think this behaviour should be documented as a warning/note. On Thu, Oct 22, 2020 at 6:44 PM Timo Walther <mailto:twal...@apache.org>> wrote:

Re: LEGACY('STRUCTURED_TYPE' to pojo

2020-11-04 Thread Timo Walther
Hi Rex, sorry for the late reply. POJOs will have much better support in the upcoming Flink versions because they have been fully integrated with the new table type system mentioned in FLIP-37 [1] (e.g. support for immutable POJOs and nested DataTypeHints etc). For queries, scalar, and table

Re: Error parsing annotations in flink-table-planner_blink_2.12-1.11.2

2020-11-05 Thread Timo Walther
Hi Yuval, this error is indeed weird. @Aljoscha: I think Calcite uses apiguardian. When I saw the initial error, it looked like there are different Apache Calcite versions in the classpath. I'm wondering if this is a pure SBT issue because I'm sure that other users would have reported this er

Re: LEGACY('STRUCTURED_TYPE' to pojo

2020-11-05 Thread Timo Walther
n the next version with this change might release? On Wed, Nov 4, 2020 at 2:44 AM Timo Walther <mailto:twal...@apache.org>> wrote: Hi Rex, sorry for the late reply. POJOs will have much better support in the upcoming Flink versions because they have been fully integrate

Re: Best way to test Table API and SQL

2020-11-05 Thread Timo Walther
gular one? Thanks On Fri, Oct 9, 2020 at 7:55 AM Timo Walther <mailto:twal...@apache.org>> wrote: Hi Rex, let me copy paste my answer from a similar thread 2 months ago: Hi, this might be helpful as well: https://lists.apache.o

Re: Filter By Value in List

2020-11-05 Thread Timo Walther
Hi Rex, as far as I know, the IN operator only works on tables or a list of literals where the latter one is just a shortcut for multiple OR operations. I would just go with a UDF for this case. In SQL you could do an UNNEST to convert the array into a table and then use the IN operator. But

Re: SQL aggregation functions inside the Table API

2020-11-09 Thread Timo Walther
Hi Ori, we might support SQL expressions soon in Table API. However, we might not support aggregate functions immediately. I would recommend to use `sqlQuery` for now. The following is supported: val table = tenv.fromDataStream(stream) val newTable = tenv.sqlQuery(s"SELECT ... FROM $table")

Re: ValidationException using DataTypeHint in Scalar Function

2020-11-09 Thread Timo Walther
Sorry for jumping in so late. I think Dawid gave a nice summary. As he said, integration of the DataStream <> Table integration is still under development. Until then I would suggest to option 3) which means don't upgrade the functions and use the old registration function `registerFunction`.

Re: Table SQL Filesystem CSV recursive directory traversal

2020-11-09 Thread Timo Walther
Hi Ruben, by looking at the code, it seems you should be able to do that. At least for batch workloads we are using org.apache.flink.formats.csv.CsvFileSystemFormatFactory.CsvInputFormat which is a FileInputFormat that supports the mentioned configuration option. The problem is that this mig

Re: Stream aggregation using Flink Table API (Blink plan)

2020-11-10 Thread Timo Walther
Hi Felipe, with non-deterministic Jark meant that you never know if the mini batch timer (every 3 s) or the mini batch threshold (e.g. 3 rows) fires the execution. This depends how fast records arrive at the operator. In general, processing time can be considered non-deterministic, because 1

Re: Changing the topology while upgrading a job submitted by SQL

2020-11-10 Thread Timo Walther
Hi, unfortunately, we currently don't provide any upgrading guarantees for SQL. In theory we could add a possibility to add operator uids, however, this will not help much because the underlying SQL operators or better optimization rules that create a smarter pipeline could change the entire

Re: Flink Kafka Table API for python with JAAS

2020-11-10 Thread Timo Walther
Hi, are you using the SQL jars or do you build the dependency jar file yourself? It might be the case that the SQL jar for Kafka does not include this module as the exception indicates. You might need to build a custom Kafka jar with Maven and all dependencies you need. (including correct MET

Re: How to convert Int to Date

2020-11-17 Thread Timo Walther
Hi Rex, the classes mentioned in the documentation such as `int` and `java.lang.Integer` are only used when you leave the SQL world to a UDF or to a Java implementation in a sink. But as a SQL user you only need to pay attention to the logical data type. Those must match entirely or be a sup

Re: Questions on Table to Datastream conversion and onTimer method in KeyedProcessFunction

2020-11-20 Thread Timo Walther
Hi Fuyao, sorry for not replying earlier. You posted a lot of questions. I scanned the thread quickly, let me try to answer some of them and feel free to ask further questions afterwards. "is it possible to configure the parallelism for Table operation at operator level" No this is not pos

Re: Print on screen DataStream content

2020-11-24 Thread Timo Walther
Hi Simone, if you are just executing DataStream pipelines locally in your IDE while prototyping, you should be able to use `DataStream#print()`, which just prints to standard out [1] (it might be hidden between the log messages). For debugging locally, you can also just set breakpoints in your

Re: Question: How to avoid local execution being terminated before session window closes

2020-11-24 Thread Timo Walther
Hi Klemens, what you are observing are reasons why event-time should be preferred over processing-time. Event-time uses the timestamp of your data while processing-time is too basic for many use cases. Esp. when you want to reprocess historic data, you want to do that at full speed instead of

Re: Job Manager logs

2020-11-24 Thread Timo Walther
Hi Saksham, could you tell us a bit more about your deployement where you run Flink. This seems to be the root exception: 2020-11-24 11:11:16,296 ERROR org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerStdoutFileHandler [] - Failed to transfer file from TaskExecutor f0dc0ae680e65a

Re: Question: How to avoid local execution being terminated before session window closes

2020-11-24 Thread Timo Walther
integrate it with an Apache Kafka Service. Output is written to a Postgres-Database-System. I'll have a look at your proposal and let you know if it worked, after having finished a few prerequisite parts. Regards     Klemens Am 24.11.20 um 12:59 schrieb Timo Walther: Hi Klemens, wha

Re: Dynamic ad hoc query deployment strategy

2020-11-24 Thread Timo Walther
I agree with Dawid. Maybe one thing to add is that reusing parts of the pipeline is possible via StatementSets in TableEnvironment. They allow you to add multiple queries that consume from a common part of the pipeline (for example a common source). But all of that is compiled into one big job

Re: Learn flink source code book recommendation

2020-11-24 Thread Timo Walther
Hi, one advice I can give you is to checkout the code and execute some of the examples in debugging mode. Esp. within Flink's functions e.g. MapFunction or ProcessFunction you can set a breakpoint and look at the stack trace. This gives you a good overview about the Flink stack in general.

Re: Questions on Table to Datastream conversion and onTimer method in KeyedProcessFunction

2020-11-24 Thread Timo Walther
ic-watermarkgenerator Best, Fuyao On 11/20/20 08:55, Timo Walther wrote: Hi Fuyao, sorry for not replying earlier. You posted a lot of questions. I scanned the thread quickly, let me try to answer some of them and feel free to ask further questions afterwards. "is it possible to con

Re: Questions on Table to Datastream conversion and onTimer method in KeyedProcessFunction

2020-11-26 Thread Timo Walther
/dev/table/sql/queries.html#joins <https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#joins>) Thanks! Fuyao On Tue, Nov 24, 2020 at 9:06 AM Timo Walther <mailto:twal...@apache.org>> wrote: Hi Fuyao, great that you could make progre

Re: queryLogicalType != sinkLogicalType when UDAF returns List

2020-11-27 Thread Timo Walther
Hi, first of all we don't support ListTypeInfo in Table API. Therefore, it is treated as a RAW type. The exception during exception creation is a bug that should be fixed in a future version. But the mismatch is valid: ARRAY is not a list type info but `Types.OBJECT_ARRAY(Types.INT)`. Can you

Re: How do I pass the aggregated results of an SQL TUMBLING Window as one set of data to a Process Function?

2020-12-14 Thread Timo Walther
Hi Marco, sorry for the late reply. Have you looked into user-defined aggregate functions for SQL? I think your requirements can be easily implemented there. You can declare multiple aggregate functions per window. There is also the built-in function LISTAGG that might help for your use case.

Re: Is there any way to directly convert org.apache.flink.table.api.Table into my POJO.

2020-12-14 Thread Timo Walther
Hi, first, we should clarify "continue to be put into the Flink table": A Flink Table object does not physically store the data. It is basically a view that contains a transformation pipeline. When you are calling `collect()` the pipeline is executed and all results from the cluster are stre

Re: How does Flink cache values that also do not exist in the database?

2020-12-14 Thread Timo Walther
Hi Marco, when you say "database" are you refering to the JDBC connector or would you like to perform a JDBC query within some UDF? In the latter case, I would recommend to use Flink's ProcessFunction because you can store the cache hits in state (and thus keep them forever). SQL/Table API doe

Re: state inside functions

2020-12-17 Thread Timo Walther
Hi, if you would like to dynamically adjust configuration of your streaming job, it might be a good approach to consider the configuration as a stream itself. The connect() API can be used to connect a main stream with a control stream. In any case the configuration should be persisted in st

Re: Changing application configuration when restoring from checkpoint/savepoint

2020-12-17 Thread Timo Walther
Hi, I gave some answers in the other mail thread. Some additional comment: In general I think even configuration can be considered as state in this case. If state is not set, the job can be considered as a fresh start. Once the state is set, it would basically be just a configuration update.

Re: Set TimeZone of Flink Streaming job

2020-12-17 Thread Timo Walther
Hi, Flink does not support time zones currently. However, all time operations work on Java `long` values. It can be up to the user what this long value represents. It need not be UTC but can also be adjusted for another time zone. Since the DataStream API supports arbitrary Java objects, users can
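One way to follow that advice, sketched with plain `java.time` (the chosen zone and timestamps are examples, not part of the original answer):

```java
import java.time.Instant;
import java.time.ZoneId;

public class ZoneShiftDemo {
    public static void main(String[] args) {
        // Flink's time operations only see a long; shift it so the long
        // represents local wall-clock time instead of UTC.
        long utcMillis = Instant.parse("2020-12-17T10:00:00Z").toEpochMilli();
        ZoneId zone = ZoneId.of("Europe/Berlin");

        long offsetMillis = zone.getRules()
                .getOffset(Instant.ofEpochMilli(utcMillis))
                .getTotalSeconds() * 1000L;

        long localMillis = utcMillis + offsetMillis;  // 11:00 "local" as a long
        System.out.println(localMillis - utcMillis);  // 3600000 (CET is UTC+1)
    }
}
```

The same shift would then be applied consistently in timestamp assigners and when formatting output, so that all `long` values in the job mean the same thing.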

Re: Flink - sending clicks+impressions to AWS Personalize

2020-12-17 Thread Timo Walther
Hi Dan, the exception that you get is a very frequent limitation in Flink SQL at the moment. I tried to summarize the issue recently here: https://stackoverflow.com/questions/64445207/rowtime-attributes-must-not-be-in-the-input-rows-of-a-regular-join-despite-usi/64500296#64500296 The query i

Re: Flink - Create Temporary View and "Rowtime attributes must not be in the input rows of a regular join"

2020-12-17 Thread Timo Walther
Hi Dan, are you intending to use interval joins, regular joins, or a mixture of both? For regular joins you must ensure to cast a rowtime attribute to timestamp as early as possible. For interval joins, you need to make sure that the rowtime attribute is unmodified. Currently, I see COALE
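A sketch of the early cast Timo describes, in Flink SQL (the table and column names are invented):

```sql
-- For a regular join: strip the time attribute as early as possible,
-- so the join input no longer carries a rowtime attribute.
CREATE TEMPORARY VIEW clicks_plain AS
SELECT user_id, url, CAST(click_time AS TIMESTAMP(3)) AS click_ts
FROM clicks;

-- For an interval join: keep click_time untouched as the rowtime attribute
-- and bound the join explicitly with a time condition.
SELECT c.user_id, i.campaign
FROM clicks c, impressions i
WHERE c.user_id = i.user_id
  AND c.click_time BETWEEN i.impression_time
                       AND i.impression_time + INTERVAL '1' HOUR;
```

The cast turns the attribute into a plain timestamp column, which is what a regular join requires, while the interval join relies on the unmodified attribute for watermark-driven state cleanup.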

Re: Registering RawType with LegacyTypeInfo via Flink CREATE TABLE DDL

2020-12-28 Thread Timo Walther
Hi Yuval, the legacy type has no string representation that can be used in a SQL DDL statement. The current string representation LEGACY(...) is only a temporary work around to persist the old types in catalogs. Until FLIP-136 is fully implemented, toAppendStream/toRetractStream support only

Re: Registering RawType with LegacyTypeInfo via Flink CREATE TABLE DDL

2020-12-28 Thread Timo Walther
id type of the table. Hope that clarifies a bit, since the pipeline is rather complex I can't really share a MVCE of it. On Mon, Dec 28, 2020 at 11:08 AM Timo Walther mailto:twal...@apache.org>> wrote: Hi Yuval, the legacy type has no string representa

Re: Registering RawType with LegacyTypeInfo via Flink CREATE TABLE DDL

2020-12-28 Thread Timo Walther
P-136 to resolve the issue around legacy types? Will it's implementation allow to register LEGACY types? or a new variation of them? On Mon, Dec 28, 2020 at 12:45 PM Timo Walther <mailto:twal...@apache.org>> wrote: I would recommend to use the old UDF stack for now.

Re: Flink SQL, temporal joins and backfilling data

2021-01-05 Thread Timo Walther
Hi Dan, are you sure that your watermarks are still correct during reprocessing? As far as I know, idle state retention is not used for temporal joins. The watermark indicates when state can be removed in this case. Maybe you can give us some more details about which kind of temporal join yo

Re: UDTAGG and SQL

2021-01-05 Thread Timo Walther
Hi Marco, nesting aggregated functions is not allowed in SQL. The exception could be improved though. I guess the planner searches for a scalar function called `MyUDTAGG` in your example and cannot find one. Maybe the built-in function `COLLECT` or `LISTAGG`is what you are looking for? htt

Re: UDTAGG and SQL

2021-01-05 Thread Timo Walther
, i.e., SELECT FROM (SELECT FROM)? On Jan 5, 2021, at 6:10 AM, Timo Walther wrote: Hi Marco, nesting aggregate functions is not allowed in SQL. The exception could be improved though. I guess the planner searches for a scalar function called `MyUDTAGG` in your example and cannot find one

Re: Using key.fields in 1.12

2021-01-07 Thread Timo Walther
Hi Aeden, we updated the connector property design in 1.11 [1]. The old translation layer exists for backwards compatibility and is indicated by `connector.type=kafka`. However, `connector = kafka` indicates the new property design and `key.fields` is only available there. Please check all p
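A sketch of a DDL using the new property design; the topic, server, and column names here are placeholders:

```sql
CREATE TABLE kafka_source (
  user_id BIGINT,
  item_id BIGINT,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',       -- new design; 'connector.type' = 'kafka' is the legacy one
  'topic' = 'my-topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'json',
  'key.fields' = 'user_id',    -- only available with the new property design
  'value.format' = 'json'
);
```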

Re: Using key.fields in 1.12

2021-01-07 Thread Timo Walther
tor-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java#L160 Does format.avro-schema need to be specified differently? Thank you, Aeden On Thu, Jan 7, 2021 at 12:15 AM Timo Walther wrote: Hi Aeden, we updated the connector property design in 1.11 [1]

Re: Using key.fields in 1.12

2021-01-11 Thread Timo Walther
me in handy. I was looking through docs hoping there was a way to still specify the schema with no luck. Does such an option exist? On Thu, Jan 7, 2021 at 2:33 AM Timo Walther <twal...@apache.org> wrote: Hi Aeden, `format.avro-schema` is not required anymore in the n

Re: FlinkSQL Filter Error With Float Column on flink-1.12.0

2021-01-12 Thread Timo Walther
Hi, it seems this is a bug located in the Apache Calcite code. I will open an issue for it. Thanks for reporting this. Regards, Timo On 12.01.21 11:08, jy l wrote: Hi: Flink SQL filter data throws an exception, code: def main(args: Array[String]): Unit = {     val env = StreamExecut

Re: FlinkSQL Filter Error With Float Column on flink-1.12.0

2021-01-12 Thread Timo Walther
See here: https://issues.apache.org/jira/browse/FLINK-20942 On 12.01.21 16:04, Timo Walther wrote: Hi, it seems this is a bug that is located in the Apache Calcite code. I will open an issue for it. Thanks for reporting this. Regards, Timo On 12.01.21 11:08, jy l wrote: Hi: Flink SQL

Re: Flink 1.12 Kryo Serialization Error

2021-01-13 Thread Timo Walther
Hi Yuval, could you share a reproducible example with us? I see you are using SQL / Table API with a RAW type. I could imagine that the KryoSerializer is configured differently when serializing and when deserializing. This might be due to `ExecutionConfig` not being shipped (or copied) through the

Re: How to Iterate over results in Table API version 1.12.0

2021-01-15 Thread Timo Walther
Hi Robert, could you send us the error/stacktrace that is printed? An example how it should work is shown here: https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/UpdatingTopCityExample.java https://github.c

Re: How to Iterate over results in Table API version 1.12.0

2021-01-15 Thread Timo Walther
ping SlotPool. 2021-01-15 16:52:08,468 INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Disconnect job manager 0...@akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_68 for job 84c9f12fe943bc7f32ee637666ed3bc1 from the resource m

Re: How to Iterate over results in Table API version 1.12.0

2021-01-15 Thread Timo Walther
maybe Godfrey in CC knows more? On 15.01.21 18:10, Timo Walther wrote: How are you running the Flink cluster? What is your deployment? The exception clearly indicates that you found a bug. Could you open a ticket in Flink's JIRA? We need details how to reproduce it. Thanks, Timo

Re: Why use ListView?

2021-01-18 Thread Timo Walther
Hi Rex, ListView and MapView have been part of Flink for years. However, they were considered as an internal feature and therefore not well documented. MapView is used internally to make distinct aggregates work. Because we reworked the type inference of aggregate functions, we also added ba

Re: Computed Columns In Stream to Table Conversion

2021-01-18 Thread Timo Walther
Hi Aeden, computed columns on a DataStream input are currently not supported. I am working on making this possible. Have a look at FLIP-136 for more information [1]. However, you can simply add a projection before you register a view: tEnv.createTemporaryView("myTable", dataStream);
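One way to sketch that workaround, assuming the registered view `myTable` has columns `user_id` and `price` (hypothetical names), is to derive a second view that adds the "computed column" as a plain projection:

```sql
-- The derived column is just an ordinary expression in a projection
-- on top of the view registered from the DataStream.
CREATE TEMPORARY VIEW myTableWithComputed AS
SELECT user_id, price, price * 1.19 AS price_with_tax
FROM myTable;
```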

Re: Flink SQL and checkpoints and savepoints

2021-01-18 Thread Timo Walther
Hi Dan, currently, we cannot provide any savepoint guarantees between releases. Because of the nature of SQL that abstracts away runtime operators, it might be that a future execution plan will look completely different and thus we cannot map state anymore. This is not avoidable because the o

Re: How to Iterate over results in Table API version 1.12.0

2021-01-18 Thread Timo Walther
Mode). What's the link to Flink's JIRA? On Fri, Jan 15, 2021 at 12:19 PM Timo Walther <twal...@apache.org> wrote: maybe Godfrey in CC knows more? On 15.01.21 18:10, Timo Walther wrote: > How are you running the Flink cluster? What is your deployment?
