Re: Help needed to increase throughput of simple flink app

2020-11-10 Thread ashwinkonale
Hey, I am reading messages with a schema id and using the Confluent schema registry to deserialize to GenericRecord. After this point, the pipeline will have these objects moving across. Can you give me some examples of the `special handling of avro messages` you mentioned? -- Sent from: http://apache-fli

Table API, accessing nested fields

2020-11-10 Thread Ori Popowski
How can I access nested fields e.g. in select statements? For example, this won't work: val table = tenv .fromDataStream(stream) .select($"context.url", $"name") What is the correct way? Thanks.

Changing the topology while upgrading a job submitted by SQL

2020-11-10 Thread Yi Tang
Hi folks, A question about changing the topology while upgrading a job submitted by SQL. Is it possible for now? Looks like if we want to recover a job from a savepoint, it requires the uid of the operator matches the corresponding one in the state. The automatically generated uid depends largely

Re: Stream aggregation using Flink Table API (Blink plan)

2020-11-10 Thread Felipe Gutierrez
Hi Jark, thanks for your reply. Indeed, I forgot to write DISTINCT in the query, and now the query plan is split into two hash-partition phases. What do you mean by deterministic time? Why is only the window aggregate deterministic? If I implement the ProcessingTimeCallback [1] interface, is it

Re: Table API, accessing nested fields

2020-11-10 Thread Dawid Wysakowicz
You should use the get method: val table = tenv.fromDataStream(stream).select($"context".get("url"), $"name") Best, Dawid On 10/11/2020 10:15, Ori Popowski wrote: > How can I access nested fields e.g. in select statements? > For example, this won't work: > val tabl
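
For reference, the same nested-field access in the Java Table API; a minimal sketch, assuming a stream whose row type has a context row with a url field (tEnv and stream stand in for your own environment and input):

    import static org.apache.flink.table.api.Expressions.$;

    import org.apache.flink.table.api.Table;

    // `tEnv` is a StreamTableEnvironment, `stream` a DataStream whose row type
    // contains a nested `context` row with a `url` field (both assumed here).
    Table table = tEnv.fromDataStream(stream)
        .select($("context").get("url"), $("name"));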

Re: Stream aggregation using Flink Table API (Blink plan)

2020-11-10 Thread Timo Walther
Hi Felipe, by non-deterministic Jark meant that you never know whether the mini-batch timer (every 3 s) or the mini-batch threshold (e.g. 3 rows) fires the execution. This depends on how fast records arrive at the operator. In general, processing time can be considered non-deterministic, because 1
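
For context, the timer and threshold Timo describes are the Blink planner's mini-batch options; a minimal sketch of setting them, assuming a Blink streaming TableEnvironment named tEnv:

    // Mini-batching fires on whichever comes first (the latency timer or the
    // row-count threshold), which is why the firing order is non-deterministic.
    org.apache.flink.configuration.Configuration conf =
        tEnv.getConfig().getConfiguration();
    conf.setString("table.exec.mini-batch.enabled", "true");
    conf.setString("table.exec.mini-batch.allow-latency", "3 s"); // the timer
    conf.setString("table.exec.mini-batch.size", "5000");         // the threshold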

Job crash in job cluster mode

2020-11-10 Thread Tim Eckhardt
Hi there, I have a problem running a Flink job in job cluster mode using Flink 1.11.1 (also tried 1.11.2). The same job runs fine in session cluster mode, as well as with Flink 1.10.0 in job cluster mode. The job starts running and keeps running for quite some time, but it

Re: Changing the topology while upgrading a job submitted by SQL

2020-11-10 Thread Timo Walther
Hi, unfortunately, we currently don't provide any upgrade guarantees for SQL. In theory we could add a possibility to set operator uids; however, this would not help much, because the underlying SQL operators, or rather the optimization rules that create a smarter pipeline, could change the entire

Re: Flink Kafka Table API for python with JAAS

2020-11-10 Thread Timo Walther
Hi, are you using the SQL jars or do you build the dependency jar file yourself? It might be the case that the SQL jar for Kafka does not include this module as the exception indicates. You might need to build a custom Kafka jar with Maven and all dependencies you need. (including correct MET
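
A sketch of the Maven setup Timo describes: shading the connector into the job jar while merging META-INF/services entries, which is what usually goes missing (the plugin version is an assumption):

    <!-- pom.xml fragment: shade dependencies and merge META-INF/services files -->
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>3.2.4</version>
      <executions>
        <execution>
          <phase>package</phase>
          <goals><goal>shade</goal></goals>
          <configuration>
            <transformers>
              <!-- merges service-loader entries from all shaded dependencies -->
              <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
            </transformers>
          </configuration>
        </execution>
      </executions>
    </plugin>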

Re: CLI help, documentation is confusing...

2020-11-10 Thread Kostas Kloudas
Hi Marco, I agree with you that the -m help message is misleading but I do not think it has changed between releases. You can specify the address of the jobmanager or, for example, you can put "-m yarn-cluster" and depending on your environment setup Flink will pick up a session cluster or will cr
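
A quick sketch of both usages (host, port, and jar path are placeholders):

    # point the client at a specific JobManager
    ./bin/flink run -m jobmanager-host:8081 ./path/to/job.jar

    # or target/spin up a YARN cluster, depending on the environment
    ./bin/flink run -m yarn-cluster ./path/to/job.jar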

FlinkSQL kafka->dedup->kafka

2020-11-10 Thread Laurent Exsteens
Hello, I'm getting an error in Flink SQL when reading from Kafka, deduplicating records and sending them back to Kafka. The behavior I want is the following: *input:* | client_number | address | | --- | --- | | 1 | addr1 | | 1
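
For reference, a sketch of the deduplication pattern from the Flink SQL docs that this pipeline builds on (table and column names are assumptions; proctime is a processing-time attribute). Keeping the first row per key stays append-only, while ORDER BY ... DESC (keep last) produces the updating stream discussed in the reply below:

    Table deduped = tEnv.sqlQuery(
        "SELECT client_number, address FROM (" +
        "  SELECT *, ROW_NUMBER() OVER (" +
        "    PARTITION BY client_number ORDER BY proctime ASC) AS row_num" +
        "  FROM input_table)" +
        " WHERE row_num = 1");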

Re: Flink-Kafka exactly once errors even with transaction.timeout.ms set

2020-11-10 Thread Aljoscha Krettek
On 10.11.20 11:53, Tim Josefsson wrote: Also when checking my logs I see the following message: 11:41:56,345 INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values: acks = 1 [omitted for brevity] transaction.timeout.ms = 90 transactional.id = Sour

Re: debug statefun

2020-11-10 Thread Tzu-Li (Gordon) Tai
Hi, StateFun provides a Harness utility exactly for that, allowing you to test a StateFun application in the IDE, set breakpoints, etc. You can take a look at this example on how to use the harness: https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-flink-harness-
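
A minimal sketch of driving it from a main() method, assuming the statefun-flink-harness dependency is on the classpath and the modules are discoverable via SPI:

    import org.apache.flink.statefun.flink.harness.Harness;

    public class RunnerTest {
      public static void main(String[] args) throws Exception {
        // Runs the discovered StateFun modules in a local MiniCluster, so
        // breakpoints inside your functions are hit when debugging in the IDE.
        new Harness().start();
      }
    }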

Re: Flink-Kafka exactly once errors even with transaction.timeout.ms set

2020-11-10 Thread Tim Josefsson
Hey Aljoscha, I'm setting the transaction.timeout.ms when I create the FlinkKafkaProducer: I create a Properties object and then set the property and finally add those properties when creating the producer. Properties producerProps = new Properties(); producerProps.setProperty("transaction.timeo
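
A sketch of that setup with Flink 1.11's FlinkKafkaProducer (topic name, serializer, and timeout value are placeholders):

    import java.nio.charset.StandardCharsets;
    import java.util.Properties;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
    import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
    import org.apache.kafka.clients.producer.ProducerRecord;

    Properties producerProps = new Properties();
    producerProps.setProperty("bootstrap.servers", "localhost:9092");
    // must not exceed transaction.max.timeout.ms configured on the brokers
    producerProps.setProperty("transaction.timeout.ms", "900000");

    KafkaSerializationSchema<String> schema = (element, timestamp) ->
        new ProducerRecord<>("events", element.getBytes(StandardCharsets.UTF_8));

    FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
        "events", schema, producerProps, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

If the ProducerConfig dump in the logs still shows the default, the Properties object being logged is not the one passed here, which is worth ruling out first.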

Re: Job crash in job cluster mode

2020-11-10 Thread Matthias Pohl
Hi Tim, I'm not aware of any memory-related issues being related to the deployment mode used. Have you checked the logs for hints? Additionally, you could try to extract a heap dump. That might help you in analyzing the cause of the memory consumption. The TaskManager and JobManager are logging th
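
For the heap dump, a sketch with standard JDK tooling (the pid is whatever jps reports for the TaskManager):

    # find the TaskManager JVM's pid, then dump its live heap to a file
    jps -l
    jmap -dump:live,format=b,file=taskmanager-heap.hprof <pid>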

error in using package SubnetUtils

2020-11-10 Thread Diwakar Jha
Hello, I'm migrating from Flink 1.8 to Flink 1.11 on an EMR cluster and I get this error message for the SubnetUtils package. It's working fine with Flink 1.8. [javac] import org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hadoop.$internal.org.apache.commons.net.util.SubnetUtils; [ja
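
The presto-shaded path is an internal relocation that moved between releases, so a common fix (an assumption here, not confirmed in the thread) is to depend on commons-net directly and import the class from its canonical package:

    // requires a direct dependency on commons-net (e.g. commons-net:commons-net:3.6)
    import org.apache.commons.net.util.SubnetUtils;

    SubnetUtils subnet = new SubnetUtils("10.0.0.0/24");
    boolean inRange = subnet.getInfo().isInRange("10.0.0.42"); // true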

Re: debug statefun

2020-11-10 Thread Igal Shilman
Hi Lian, If you are using the statefun-sdk directly (embedded mode), then most likely you are missing a META-INF/services/org.apache.flink.statefun.sdk.spi.StatefulFunctionModule file that points to your module's class. We are using Java SPI [1] to load all the stateful functions mo
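
That SPI entry is a plain text file whose content is the fully-qualified module class name; a sketch with a hypothetical class:

    # src/main/resources/META-INF/services/org.apache.flink.statefun.sdk.spi.StatefulFunctionModule
    com.example.statefun.MyModule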

Re: Stream aggregation using Flink Table API (Blink plan)

2020-11-10 Thread Felipe Gutierrez
I see, thanks Timo -- -- Felipe Gutierrez -- skype: felipe.o.gutierrez -- https://felipeogutierrez.blogspot.com On Tue, Nov 10, 2020 at 3:22 PM Timo Walther wrote: > > Hi Felipe, > > with non-deterministic Jark meant that you never know if the mini batch > timer (every 3 s) or the mini batch thr

Re: debug statefun

2020-11-10 Thread Lian Jiang
Igal, I am using AutoService and I don't need to add auto-service-annotations since it is provided by statefun-flink-core. Otherwise, my project cannot even build. I did exactly the same as https://github.com/apache/flink-statefun/blob/master/statefun-examples/statefun-greeter-example/src/main/ja

Re: Stateful Functions: java.lang.IllegalStateException: There are no routers defined

2020-11-10 Thread Puneet Kinra
Hi All, I am facing a problem while running the Harness runner test. I have defined the AutoService annotation in the function module. java.lang.IllegalStateException: There are no routers defined. at org.apache.flink.statefun.flink.core.StatefulFunctionsUniverseValidator.validate(StatefulFuncti

Re: debug statefun

2020-11-10 Thread Puneet Kinra
Hi Gordon, I have tried the Harness utility, but I am getting the error below even though the @AutoService annotation is there in the function module. java.lang.IllegalStateException: There are no routers defined. at org.apache.flink.statefun.flink.core.StatefulFunctionsUniverseValidator.validate(StatefulF
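
The validator throws this when no bound module registers a router. A minimal sketch of a module that does (namespace, names, and routing logic are assumptions; a matching ingress spec and function provider must be bound as well):

    import com.google.auto.service.AutoService;
    import java.util.Map;
    import org.apache.flink.statefun.sdk.FunctionType;
    import org.apache.flink.statefun.sdk.io.IngressIdentifier;
    import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;

    @AutoService(StatefulFunctionModule.class)
    public final class GreeterModule implements StatefulFunctionModule {
      static final FunctionType GREETER = new FunctionType("example", "greeter");
      static final IngressIdentifier<String> NAMES =
          new IngressIdentifier<>(String.class, "example", "names");

      @Override
      public void configure(Map<String, String> globalConfiguration, Binder binder) {
        // Without at least one bindIngressRouter(...) across all discovered
        // modules, validation fails with "There are no routers defined."
        binder.bindIngressRouter(NAMES, (message, downstream) ->
            downstream.forward(GREETER, message, message));
      }
    }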

BoundedOutOfOrderness Watermark Generator is NOT making the event time to advance

2020-11-10 Thread fuyao . li
Hi Experts, I am trying to implement a KeyedProcessFunction with an onTimer() callback. I need to use event time, and I am running into problems making the watermark available to my operator; I see some strange behaviors. I have a joined retracted stream without watermark or timestamp inf
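
If the joined stream never gets timestamps and watermarks assigned, a sketch of attaching them with Flink 1.11's WatermarkStrategy (the Event type and its timestamp accessor are stand-ins):

    import java.time.Duration;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.datastream.DataStream;

    DataStream<Event> withWatermarks = joined.assignTimestampsAndWatermarks(
        WatermarkStrategy
            .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
            // without a timestamp assigner the watermark cannot advance
            .withTimestampAssigner((event, previousTs) -> event.getEventTime()));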

Re: Changing the topology while upgrading a job submitted by SQL

2020-11-10 Thread Yi Tang
Hi Timo, Got it, thanks. Timo Walther wrote on Tue, Nov 10, 2020 at 22:39: > Hi, > > unfortunately, we currently don't provide any upgrading guarantees for > SQL. In theory we could add a possibility to add operator uids, however, > this will not help much because the underlying SQL operators or better > op

Re: Flink Kafka Table API for python with JAAS

2020-11-10 Thread Xingbo Huang
Hi, You can use the following API to add all the dependent jar packages you need: table_env.get_config().get_configuration().set_string("pipeline.jars", "file:///my/jar/path/connector.jar;file:///my/jar/path/json.jar") For more related content, you can refer to the pyflink doc[1] [1] https://ci.

timestamp parsing in create table statement

2020-11-10 Thread Fanbin Bu
Hi, I have source json data like: {"ts": "2020-11-09T20:26:10.368123Z", "user_id": "user1", "action": "click"} ... my sql is: create table t ( user_id string, action string, ts timestamp, watermark for ts as ts - interval '5' second ) with ( 'connector' = 'kafka', 'topic' = 'test', 'json.timestam

Re: timestamp parsing in create table statement

2020-11-10 Thread Fanbin Bu
i also tried: ts TIMESTAMP WITH LOCAL TIME ZONE but it failed with Rowtime attribute 'ts' must be of type TIMESTAMP but is of type 'TIMESTAMP(6) WITH LOCAL TIME ZONE'. On Tue, Nov 10, 2020 at 5:42 PM Fanbin Bu wrote: > Hi, > > I have source json data like: > {"ts": "2020-11-09T20:26:10.368123Z"

Re: FlinkSQL kafka->dedup->kafka

2020-11-10 Thread Jark Wu
Hi Laurent, This is because the deduplicate node generates an updating stream, however Kafka currently only supports append-only stream. This can be addressed in release-1.12, because we introduce a new connector "upsert-kafka" which supports writing updating streams into Kafka compacted topics.

Re: timestamp parsing in create table statement

2020-11-10 Thread Fanbin Bu
In the `computed column` section of [1], i saw some related doc: ``` On the other hand, computed column can be used to derive event time column because an event time column may need to be derived from existing fields, e.g. the original field is not TIMESTAMP(3) type or is nested in a JSON string.

Re: timestamp parsing in create table statement

2020-11-10 Thread Jark Wu
Hi Fanbin, The example you gave is correct: create table t ( user_id string, action string, ts string, transform_ts_format(ts) as new_ts, watermark for new_ts as new_ts - interval '5' second ) with ( ... ) You can use the "TO_TIMESTAMP" built-in function instead of the UDF, e.g. TO_TIMEST

Re: timestamp parsing in create table statement

2020-11-10 Thread Fanbin Bu
Jark, Thanks for the quick response. I tried to_timestamp(ts, ...), but got the following error: Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "(" at line looks like it complains about the second `(` in create table t (... to_timestamp(..

Re: timestamp parsing in create table statement

2020-11-10 Thread Jark Wu
Oh, sorry, the example above is wrong. The column name should come first. So the full example should be: create table t ( user_id string, action string, ts string, new_ts AS TO_TIMESTAMP(ts, 'yyyy-MM-dd''T''HH:mm:ss.SSS''Z'''), watermark for new_ts as new_ts - interval '5' second ) with

Re: timestamp parsing in create table statement

2020-11-10 Thread Fanbin Bu
Thanks Jark, I confused it with the normal sql syntax. now it works (after changing it to HH:mm:ss.SS...) Fanbin On Tue, Nov 10, 2020 at 7:24 PM Jark Wu wrote: > Oh, sorry, the example above is wrong. The column name should come first. > So the full example should be: > > create table t (
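
Putting the thread together, a sketch of the final DDL via executeSql (connector options are abridged assumptions; the fractional-second pattern is widened to the six digits in the input, per Fanbin's last note):

    tEnv.executeSql(
        "CREATE TABLE t (" +
        "  user_id STRING," +
        "  action STRING," +
        "  ts STRING," +
        "  new_ts AS TO_TIMESTAMP(ts, 'yyyy-MM-dd''T''HH:mm:ss.SSSSSS''Z''')," +
        "  WATERMARK FOR new_ts AS new_ts - INTERVAL '5' SECOND" +
        ") WITH (" +
        "  'connector' = 'kafka'," +
        "  'topic' = 'test'," +
        "  'properties.bootstrap.servers' = 'localhost:9092'," +
        "  'format' = 'json')");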

Re: SSL setup for YARN deployment when hostnames are unknown.

2020-11-10 Thread Jiahui Jiang
Ping on this 🙂 Is there any way I can run a script or implement some interface that runs before the Dispatcher service starts up, to dynamically generate the keystore? Thank you! From: Jiahui Jiang Sent: Monday, November 9, 2020 3:19 PM To: user@flink.apache.org Su

Re: debug statefun

2020-11-10 Thread Tzu-Li (Gordon) Tai
Hi Lian, Sorry, I didn't realize that the issue you were bumping into was caused by the module not being discovered. You're right, the harness utility would not help here. As for the module discovery problem: - Have you looked at the contents of your jar and seen that a META-INF/services/o

Re: debug statefun

2020-11-10 Thread Lian Jiang
Thanks Gordon. After better understanding how AutoService works, I resolved the issue by adding the line below to my build.gradle file: annotationProcessor 'com.google.auto.service:auto-service:1.0-rc6' Without this, the project compiles but the AutoService class is not generated appropriately. So
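
A sketch of the resulting build.gradle fragment (versions are assumptions; the annotations artifact arrives transitively via statefun-flink-core in this setup):

    dependencies {
        // runs the AutoService annotation processor so the
        // META-INF/services entry is generated at compile time
        annotationProcessor 'com.google.auto.service:auto-service:1.0-rc6'
        // the @AutoService annotation itself (transitive here; add explicitly otherwise)
        compileOnly 'com.google.auto.service:auto-service-annotations:1.0-rc6'
    }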

Re: debug statefun

2020-11-10 Thread Tzu-Li (Gordon) Tai
On Wed, Nov 11, 2020 at 1:44 PM Tzu-Li (Gordon) Tai wrote: > Hi Lian, > > Sorry, I didn't realize that the issue you were bumping into was caused by > the module not being discovered. > You're right, the harness utility would not help here. > Actually, scratch this comment. The Harness utility a

Re: Rules of Thumb for Setting Parallelism

2020-11-10 Thread Rex Fenley
Awesome, thanks! On Sat, Nov 7, 2020 at 6:43 AM Till Rohrmann wrote: > Hi Rex, > > You should configure the number of slots per TaskManager to be the number > of cores of a machine/node. In total you will then have a cluster with > #slots = #cores per machine x #machines. > > If you have a clust
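
In flink-conf.yaml terms, a sketch of Till's formula (the counts are placeholders for an 8-core, 4-machine cluster):

    # one slot per core on each TaskManager machine
    taskmanager.numberOfTaskSlots: 8
    # total slots = slots per TaskManager x number of machines (8 x 4 here)
    parallelism.default: 32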

Re: Join Bottleneck

2020-11-10 Thread Rex Fenley
Thank you for the clarification. On Sat, Nov 7, 2020 at 7:37 AM Till Rohrmann wrote: > Hi Rex, > > "HasUniqueKey" means that the left input has a unique key. > "JoinKeyContainsUniqueKey" means that the join key of the right side > contains the unique key of this relation. Hence, it looks normal

Re: Upsert UDFs

2020-11-10 Thread Rex Fenley
Thanks! We did give that a shot and ran into the bug that I reported here https://issues.apache.org/jira/browse/FLINK-20036 . I'm also seeing this function public void emitUpdateWithRetract(ACC accumulator, RetractableCollector out); // OPTIONAL and it says it's more performant in some cases v

CREATE TABLE LIKE clause from different catalog or database

2020-11-10 Thread Dongwon Kim
Hi, Is it disallowed to refer to a table from different databases or catalogs when someone creates a table? According to [1], there's no way to refer to tables belonging to different databases or catalogs. [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html#creat

Checkpoint growth

2020-11-10 Thread Rex Fenley
Hello, I'm reading the docs/blog on incremental checkpoints and it says: >You can also no longer delete old checkpoints as newer checkpoints need them, and the history of differences between checkpoints can grow indefinitely over time. You need to plan for larger distributed storage to maintain t

Re: Checkpoint growth

2020-11-10 Thread Akshay Aggarwal
Hi Rex, As per my understanding, there are multiple levels of compaction (with RocksDB); files that were not compacted recently remain in older checkpoint directories, and the current checkpoint holds references to those files. There is no clear way of identifying these refere