Re: Does savepoint reset the base for incremental checkpoint

2020-07-06 Thread Congxian Qiu
Hi, the checkpoint base is only used for incremental checkpoints, so the answer to the first question is checkpoint x. After restoring from a savepoint, there is no base for the first checkpoint. You can refer to the code [1][2] for more information. [1] https://github.com/apache/flink/blob/c14f9d2f9f6

Re: HELP ! ! ! When using Flink1.10 to define table with the string type, the query result is null

2020-07-06 Thread Benchao Li
Hi Jim, This is a known issue [1]; could you verify whether it matches your case? [1] https://issues.apache.org/jira/browse/FLINK-18002 Jim Chen wrote on Monday, July 6, 2020 at 1:28 PM: > Hi, everyone! > > When I use Flink 1.10 to define a table, and I want to define the json array > as the string typ

How to ensure that job is restored from savepoint when using Flink SQL

2020-07-06 Thread shadowell
Hello everyone, I have some unclear points about using Flink SQL, and I hope to get an answer or be pointed to where I can find one. When using the DataStream API, in order to ensure that the job can recover its state from a savepoint after an adjustment, it is necessary to specify the

Stateful Functions: Deploying to existing Cluster

2020-07-06 Thread Jan Brusch
Hi, a quick question about deploying a Flink Stateful Functions application to an existing cluster: the documentation says to include "statefun-flink-distribution" as an additional Maven dependency in the fat jar. (https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1/deployment-a

[ANNOUNCE] Weekly Community Update 2020/26-27

2020-07-06 Thread Konstantin Knauf
Dear community, happy to share this (and last) week's community update. Flink 1.11 is finally ready to be released. Besides that, a few design discussions in different areas of Apache Flink, like enhanced fan-out for Flink's Kinesis source or temporal table support in pure Flink SQL, and of course

Re: Timeout when using RocksDB to handle large state in a stream app

2020-07-06 Thread Felipe Gutierrez
Hi all, I tested TPC-H queries 03 [1] and 10 [2] using the DataStream API on the cluster with the RocksDB state backend. One thing I did that improved performance a lot was to replace the List POJO with a List>. Then I could load a 200MB table in memory as my state. However, the original table is 725MB, a

Re: Asynchronous I/O poor performance

2020-07-06 Thread Benchao Li
Hi Mark, According to your data, I think the config of the AsyncOperator is OK. There is one more setting that might affect the throughput of the AsyncOperator: the watermark. Because the unordered async operator still keeps the order between watermarks, did you use event time in your job, and if yes, what's

Logging Flink metrics

2020-07-06 Thread Manish G
Hi, Is it possible to log Flink metrics in the application logs, apart from publishing them to Prometheus? With regards

Re: Logging Flink metrics

2020-07-06 Thread Chesnay Schepler
Have you looked at the SLF4J reporter? https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html#slf4j-orgapacheflinkmetricsslf4jslf4jreporter On 06/07/2020 13:49, Manish G wrote: Hi, Is it possible to log Flink metrics in application logs apart from publishing it t
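For reference, enabling the SLF4J reporter is a small flink-conf.yaml fragment like the one below (the interval value is illustrative; the reporter class name matches the linked documentation):

```yaml
# Log all metrics via SLF4J at a fixed interval
metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter
metrics.reporter.slf4j.interval: 60 SECONDS
```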

Re: Stateful Functions: Deploying to existing Cluster

2020-07-06 Thread Igal Shilman
Hi Jan, Stateful Functions looks for the module.yaml on the Java classpath, so one way would be to include the module.yaml in your src/main/resources/ directory. Good luck, Igal. On Mon, Jul 6, 2020 at 12:39 PM Jan Brusch wrote: > Hi, > > quick question about deploying a Flink Stateful F

Re: Flink Kafka connector in Python

2020-07-06 Thread Manas Kale
I also tried doing this by using a User Defined Function:

    class DataConverter(ScalarFunction):
        def eval(self, str_data):
            data = json.loads(str_data)
            return ??  # I want to return data['0001'] in field 'feature1', data['0002'] in field 'feature2' etc.

    t_env.register_
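The parsing logic in the quoted snippet can be sketched in plain Python (no PyFlink dependency; returning a tuple is one hypothetical way to carry multiple fields, and the keys '0001'/'0002' are taken from the question):

```python
import json

def convert(str_data):
    # Parse the JSON payload and map the numeric keys to positional
    # fields, mimicking what the UDF's eval() would need to produce.
    data = json.loads(str_data)
    return (data.get("0001"), data.get("0002"))

# e.g. convert('{"0001": 1.5, "0002": 2.5}') yields (1.5, 2.5)
```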

Re: Stateful Functions: Routing to remote functions

2020-07-06 Thread Igal Shilman
Hi Jan, Two followup questions: 1. Looking at the stack trace provided in your email, it does seem like the function type is unavailable, and I'd like to follow up on that: can you please share your Dockerfile, so we have the complete picture. If you are not comfortable sharing that, then you can

Re: can't exectue query when table type is datagen

2020-07-06 Thread Danny Chan
Dear xin Destiny ~ It seems that you are using the legacy planner, so the exception is thrown [1] ~ I agree that there should be a prompt here to indicate that it is the legacy planner; I have filed an issue [2]. Actually for the legacy planner it is a regression, because before the change the computed column was supported

Re: Stateful Functions: Routing to remote functions

2020-07-06 Thread Jan Brusch
Hi Igal, thanks for your comprehensive reply! As for 1: I will try to create a minimal reproduction of the case and share the code with you. It might be a few days until I get around to it. As for 2: I will definitely give this a try. From the looks of it, this seems to be the solution an

Re: Stateful Functions: Deploying to existing Cluster

2020-07-06 Thread Jan Brusch
Hi Igal, thanks for the quick reply. That does make sense and I will give it a try. It would probably make sense to add that to the documentation. Best regards and thanks! Jan On 06.07.20 14:02, Igal Shilman wrote: Hi Jan, Stateful functions would look at the java class path for the module

Re: Logging Flink metrics

2020-07-06 Thread Manish G
Hi, Thanks for this. I did the configuration as mentioned at the link (changes in flink-conf.yaml, copying the jar to the lib directory), and registered a Meter with the metrics group and invoked the markEvent() method in the target code. But I don't see any related logs. I am doing this all on my local comp

Re: Logging Flink metrics

2020-07-06 Thread Chesnay Schepler
How long did the job run for, and what is the configured interval? On 06/07/2020 15:51, Manish G wrote: Hi, Thanks for this. I did the configuration as mentioned at the link(changes in flink-conf.yml, copying the jar in lib directory), and registered the Meter with metrics group and invoked

Re: Logging Flink metrics

2020-07-06 Thread Manish G
Job is an infinite streaming one, so it keeps going. Flink configuration is as: metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter metrics.reporter.slf4j.interval: 30 SECONDS On Mon, Jul 6, 2020 at 7:57 PM Chesnay Schepler wrote: > How long did the job run for, and wha

Re: Logging Flink metrics

2020-07-06 Thread Chesnay Schepler
Please enable debug logging and search for warnings from the metric groups/registry/reporter. If you cannot find anything suspicious, you can also send the full log to me directly. On 06/07/2020 16:29, Manish G wrote: Job is an infinite streaming one, so it keeps going. Flink configuration i

Kafka Rate Limit in FlinkConsumer ?

2020-07-06 Thread David Magalhães
I've noticed that FLINK-11501 was implemented in flink-connector-kafka-0.10 [1], but it isn't in the current version of flink-connector-kafka. Is there any reason for this, and what would be the best way to implement rate-limiting functionality in the current Kafka consumer? Thanks,

Re: Logging Flink metrics

2020-07-06 Thread Chesnay Schepler
You have explicitly configured a reporter list, resulting in the slf4j reporter being ignored: 2020-07-06 13:48:22,191 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: metrics.reporters, prom 2020-07-06 13:48:23,203 INFO org.apache.flink.run
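If both reporters are wanted, the likely fix (an assumption, not spelled out in the thread) is to list both in the explicit reporter list in flink-conf.yaml:

```yaml
# An explicit list disables all reporters not named in it
metrics.reporters: prom,slf4j
```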

Re: Logging Flink metrics

2020-07-06 Thread Manish G
Hi, So I have the following in flink-conf.yml:

    metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
    metrics.reporter.prom.host: 127.0.0.1
    metrics.reporter.prom.port:
    metrics.reporter.slf4j.class: org.apache.fli

Re: Logging Flink metrics

2020-07-06 Thread Chesnay Schepler
You've said elsewhere that you do see some metrics in Prometheus; which ones are those? Why are you configuring the host for the Prometheus reporter? That option is only for the PrometheusPushGatewayReporter. On 06/07/2020 18:01, Manish G wrote: Hi, So I have following in flink-conf.yml :

Re: Logging Flink metrics

2020-07-06 Thread Manish G
The metrics I see on prometheus is like: # HELP flink_jobmanager_job_lastCheckpointRestoreTimestamp lastCheckpointRestoreTimestamp (scope: jobmanager_job) # TYPE flink_jobmanager_job_lastCheckpointRestoreTimestamp gauge flink_jobmanager_job_lastCheckpointRestoreTimestamp{job_id="58483036154d7f72ad

Re: Kafka Rate Limit in FlinkConsumer ?

2020-07-06 Thread Chen Qin
My two cents here: - a Flink job already has back pressure, so rate limiting can be achieved by setting the parallelism to a proper number in some use cases. There is an open issue about checkpointing reliability under back pressure; the community seems to be working on it. - a rate limit can be abused easily and cause lots o
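For illustration, a consumer-side rate limit is typically a token bucket (roughly what the Guava RateLimiter used in FLINK-11501 provides). A minimal self-contained sketch in plain Python, not Flink API:

```python
import time

class TokenBucket:
    """Token-bucket rate limiter: allows roughly `rate` permits per second."""

    def __init__(self, rate, capacity=None):
        self.rate = float(rate)
        self.capacity = float(capacity if capacity is not None else rate)
        self.tokens = self.capacity
        self.last = time.monotonic()

    def try_acquire(self, n=1):
        now = time.monotonic()
        # Refill proportionally to elapsed time, capped at capacity.
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= n:
            self.tokens -= n
            return True
        return False
```

A consumer loop would call try_acquire() before emitting each record and back off (or block) when it returns False.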

Re: Logging Flink metrics

2020-07-06 Thread Chesnay Schepler
These are all JobManager metrics; have you configured prometheus to also scrape the task manager processes? On 06/07/2020 18:35, Manish G wrote: The metrics I see on prometheus is like: # HELP flink_jobmanager_job_lastCheckpointRestoreTimestamp lastCheckpointRestoreTimestamp (scope: jobmanager

Flink Parallelism for various type of transformation

2020-07-06 Thread Prasanna kumar
Hi, I used t2.medium machines for the task manager nodes. They have 2 CPUs and 4GB of memory, but the task manager screen shows that there are 4 slots. Generally we should match the number of slots to the number of cores. Our pipeline is Source -> Simple Transform -> Sink. What h

Re: Logging Flink metrics

2020-07-06 Thread Manish G
In flink-conf.yaml: metrics.reporter.prom.port: 9250-9260 This is based on the information provided here: port - (optional) the port the Prometheus expo

Re: Logging Flink metrics

2020-07-06 Thread Chesnay Schepler
Are you running Flink in WSL by chance? On 06/07/2020 19:06, Manish G wrote: In flink-conf.yaml: metrics.reporter.prom.port: 9250-9260 This is based on information provided here

Re: Logging Flink metrics

2020-07-06 Thread Manish G
Yes. On Mon, Jul 6, 2020 at 10:43 PM Chesnay Schepler wrote: > Are you running Flink in WSL by chance? > > On 06/07/2020 19:06, Manish G wrote: > > In flink-conf.yaml: > metrics.reporter.prom.port: 9250-9260 > > This is based on information provided here >

Re: Logging Flink metrics

2020-07-06 Thread Chesnay Schepler
WSL is a bit buggy when it comes to allocating ports; it happily lets 2 processes create sockets on the same port, except that the latter one doesn't do anything. Super annoying, and I haven't found a solution to that myself yet. You'll have to configure the ports explicitly for the JM/TM, which

Re: Logging Flink metrics

2020-07-06 Thread Manish G
Ok, got it. I would try to do it manually. Thanks a lot for your inputs and efforts. With regards On Mon, Jul 6, 2020 at 10:58 PM Chesnay Schepler wrote: > WSL is a bit buggy when it comes to allocating ports; it happily lets 2 > processes create sockets on the same port, except that the latte

Re: Asynchronous I/O poor performance

2020-07-06 Thread Arvid Heise
Hi Mark, could you please check if you can tune Akka? Usually in async I/O, the library used relies on a thread pool that becomes the actual bottleneck. If you configure async I/O with a capacity of 100 and a parallelism of 8 on one node, you also need to have ~800 threads in Akka (500 might be enough
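The sizing above follows from Little's law: the number of requests that must be in flight equals throughput times per-request latency. A back-of-the-envelope helper (plain Python, numbers illustrative):

```python
def required_in_flight(target_rps, latency_ms):
    # Little's law: in-flight requests = throughput * latency.
    # The async capacity (and backing thread pool) must be at least this.
    return target_rps * latency_ms / 1000

# To sustain 2000 requests/s at 50 ms per request,
# ~100 requests must be in flight per subtask.
print(required_in_flight(2000, 50))
```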

Re: Flink Parallelism for various type of transformation

2020-07-06 Thread Arvid Heise
Hi Prasanna, overcommitting cores was actually a recommended technique a while ago to counter-balance I/O, so it's not bad per se. However, with slot sharing each core is already doing the work for source, transform, and sink, so it's not necessary. So I'd go with slots = cores, and I rather strongly
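Assuming the 2-core t2.medium nodes from the question, matching slots to cores would be a one-line change in flink-conf.yaml:

```yaml
# One slot per physical core on a 2-CPU task manager
taskmanager.numberOfTaskSlots: 2
```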

Re: Asynchronous I/O poor performance

2020-07-06 Thread Mark Zitnik
Hi Benchao, I ran this in the code: println(env.getConfig.getAutoWatermarkInterval) and got 200. I do fully understand how watermarks and the AsyncOperator work, but I decided to make a simple test that should evaluate the time it takes to enter the asyncInvoke method, and it l

Re: Flink AskTimeoutException killing the jobs

2020-07-06 Thread M Singh
Thanks Xintong. I will check the logs. On Sunday, July 5, 2020, 09:29:31 PM EDT, Xintong Song wrote: As I already mentioned, I would suggest looking into the jobmanager logs and GC logs to see if there's any problem that prevents the process from handling the RPC messages in a timely manner. Th

Re: Asynchronous I/O poor performance

2020-07-06 Thread Arvid Heise
Hi Mark, Async wait operators cannot be chained to sources, so the messages go through the network stack. Thus, some latency is normal and cannot be avoided. It can be tuned, though, but I don't think this is the issue at hand, as it should mostly impact latency and affect throughput less

SSL for QueryableStateClient

2020-07-06 Thread mail2so...@yahoo.co.in
Hello, I am running Flink on Kubernetes, and from outside, the Ingress to a proxy on Kubernetes is via SSL on port 443 only. Can you please provide guidance on how to set up SSL for the QueryableStateClient, the client used to query state? Please let me know if any other details are needed. Thanks &

Decompressing Tar Files for Batch Processing

2020-07-06 Thread Austin Cawley-Edwards
Hey all, I need to ingest a tar file containing ~1GB of data in around 10 CSVs. The data is fairly connected and needs some cleaning, which I'd like to do with the Batch Table API + SQL (but have never used before). I've got a small prototype loading the uncompressed CSVs and applying the necessar

Re: Kafka Rate Limit in FlinkConsumer ?

2020-07-06 Thread David Magalhães
Thanks for the reply Chen. My use case is a "simple" get from Kafka into S3. The job can read very quickly from Kafka, and S3 is having some issues keeping up. The backpressure doesn't have enough time to act in this case, and when it reaches the checkpoint time, errors like heartbeat timeout

Re: Heartbeat of TaskManager timed out.

2020-07-06 Thread Ori Popowski
Hi, I just wanted to update that the problem is now solved! I suspect that Scala's flatten() method has a memory problem on very large lists (> 2 billion elements). When using Scala Lists, the memory seems to leak but the app keeps running, and when using Scala Vectors, a weird IllegalArgumentExc