Re: Partitioning based on key flink kafka sink

2019-11-06 Thread vino yang
Hi Vishwas, You should pay attention to the other args. The constructor you provided has a `KeyedSerializationSchema` arg, while the constructor whose comments confused you has only a `SerializationSchema` arg. That's the difference. Best, Vino Vishwas Siravara wrote on Wed, Nov 6, 2019

Re: Limit max cpu usage per TaskManager

2019-11-06 Thread vino yang
Hi Lu, When using Flink on YARN, Flink relies on YARN's resource management capabilities and cannot currently limit CPU usage. Also, what version of Flink do you use? As far as I know, since Flink 1.8 the -yn parameter no longer works. Best, Vino Lu Niu wrote on Wed, Nov 6, 2019 at 1:29 PM: > Hi, > >

Re: Limit max cpu usage per TaskManager

2019-11-06 Thread Victor Wong
Hi Lu, You can check out which operator thread causes the high CPU usage, and set a unique slot sharing group name [1] to it to prevent too many operator threads running in the same TM. Hope this will be helpful😊 [1]. https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/

Flink savepoint(checkpoint) recovery dev debug

2019-11-06 Thread qq
Hi all. I want to simulate the shell command “flink -s savepoint”. This command can only be run from the shell; I want to debug it in a local development environment. Could anyone help me? Thanks very much. So far I can only use Savepoint.load to read the savepoint metadata and data.

Re: Limit max cpu usage per TaskManager

2019-11-06 Thread Yang Wang
If you want to limit the TaskManager container's CPU usage, it depends on your YARN cluster configuration. By default, YARN only uses CPU shares. You need to set `yarn.nodemanager.linux-container-executor.cgroups.strict-resource-usage=true` in the yarn-site.xml of all YARN NodeManagers. Best, Yang Vi
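A minimal sketch of the yarn-site.xml change Yang describes, assuming a standard Hadoop YARN deployment with the Linux container executor enabled (only the property name comes from the message above; the surrounding file layout is illustrative):

```xml
<configuration>
  <!-- Enforce hard CPU limits via cgroups instead of best-effort CPU shares. -->
  <property>
    <name>yarn.nodemanager.linux-container-executor.cgroups.strict-resource-usage</name>
    <value>true</value>
  </property>
</configuration>
```

This must be applied to the yarn-site.xml on every NodeManager, followed by a NodeManager restart.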

Side outputs in Async I/O

2019-11-06 Thread Romain Gilles
Hi Flink, I would like to know whether you think providing access to side outputs from Async I/O could be a good idea. Thanks in advance, Romain

need some advice comparing sliding window to a single unit

2019-11-06 Thread Avi Levi
Hi, I want to get the average of the last x hours and compare it to the sum of the current hour. I thought of using a ProcessWindowFunction over 8 hours and doing the calculation, i.e. consuming 8 hours of data, grouping it by the hour, and doing the math, but it seems very inefficient, especially considering t

ctx.timestamp() returning null when using Processing Time

2019-11-06 Thread Komal Mariam
Dear all, I want to clear some of my variables in KeyedBroadcastProcessFunction after a certain time. I implemented the onTimer() function, but even though I am using processing time, i.e. env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime), I am getting null when ctx.timestamp() i

Re: How to write type information for a Java Set and List inside a Tuple?

2019-11-06 Thread Komal Mariam
Thank you. I went with the first one and have not encountered any problems at run time. On Fri, 1 Nov 2019 at 12:50, Jingsong Lee wrote: > Hi Komal: > I think snippet 1 is better, because it carries more information, like > ListTypeInfo. > Consider snippet 2: now our type inference in TypeInformat

HadoopInputFormat

2019-11-06 Thread Dominik Wosiński
Hey, I wanted to ask whether *HadoopInputFormat* currently supports a custom partitioning scheme. Say I have 200 files in HDFS, each with the partitioning key in its name; can we currently use HadoopInputFormat to distribute reading across multiple TaskManagers using the key? Best Regards, Dom.

What metrics can I see the root cause of "Buffer pool is destroyed" message?

2019-11-06 Thread Felipe Gutierrez
Hi community, Looking at the code [1], it seems related to not having availableMemorySegments anymore. I am looking at several metrics, but they haven't helped me understand where I can measure the root cause of this error message. - flink_taskmanager_Status_Shuffle_Netty_AvailableMem

Re: How can I get the backpressure signals inside my function or operator?

2019-11-06 Thread Felipe Gutierrez
Does anyone know which metric I can rely on to know whether a given operator is triggering backpressure? Or how can I call the same Java object that the Flink UI calls to get the backpressure ratio? Thanks, Felipe *--* *-- Felipe Gutierrez* *-- skype: felipe.o.gutierrez* *--* *https://f

Re: Flink savepoint(checkpoint) recovery dev debug

2019-11-06 Thread Yun Tang
Hi The entrance of restoring savepoint is CheckpointCoordinator#restoreSavepoint [1], hope this could help you. [1] https://github.com/apache/flink/blob/9b43f13a50848382fbd634081b82509f464e62ca/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L1173 Be

Re: ctx.timestamp() returning null when using Processing Time

2019-11-06 Thread Yun Tang
Hi Komal, Please read the Javadoc of BaseContext#timeStamp [1] carefully: it will be null if your program is set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}. If you want to fetch the current processing timestamp, please use `ctx#currentProcessingTime()`. [1]

What is the slot vs cpu ratio?

2019-11-06 Thread srikanth flink
Hi there, I have a 3-node cluster with 16 cores each. How many slots could I utilize at most, and how do I do the calculation? Thanks Srikanth

Job Distribution Strategy On Cluster.

2019-11-06 Thread srikanth flink
Hi there, I'm running Flink on a 3-node cluster. When running my jobs (both SQL Client and jar submission), the jobs are assigned to a single machine instead of being distributed across the cluster. How can I distribute jobs so as to make use of the full computation power? Thanks Srikanth

Re: Issue with writeAsText() to S3 bucket

2019-11-06 Thread Fabian Hueske
Hi Michael, I'm not super familiar with S3, but from my understanding, files might not be visible to other services (such as a directory browser) immediately after they've been created. Did you wait for some time after you cancelled the job before checking for the files? Best, Fabian On Mon., 28.

Flink vs Spark deployment modes on multi-node Cluster

2019-11-06 Thread Sheel Pancholi
In Spark, the three *cluster* (not local) deployment options that I am familiar with are:
- Standalone
- Mesos
- YARN
There might be more *cluster deployment* options, but I am concerned with these three. All three support *client* and *cluster* modes of deployment. The *client* mod

RocksDB and local file system

2019-11-06 Thread Jaqie Chan
Hello, I am using the Flink RocksDB state backend. The documentation seems to imply I can use a regular file system path such as file:///data/flink/checkpoints, but the code Javadoc only mentions HDFS or S3 options here. I am wondering whether it's possible to use the local file system with the Flink RocksDB backend.

Re: What metrics can I see the root cause of "Buffer pool is destroyed" message?

2019-11-06 Thread Zhijiang
Hi Felipe, "Buffer pool is destroyed" is mainly caused by task cancellation. That means some other task failed, which triggers the job master to cancel all the topology's tasks. So if you want to find the root cause, it is best to check the job master log to find the first failure, which

Re: What metrics can I see the root cause of "Buffer pool is destroyed" message?

2019-11-06 Thread Felipe Gutierrez
I guess it was happening because I canceled the old job and started it again. When I restarted my cluster, it stopped throwing the error. But I am still not sure from which metric I can infer whether backpressure is happening. Felipe

Re: How can I get the backpressure signals inside my function or operator?

2019-11-06 Thread Chesnay Schepler
I don't think there is a truly sane way to do this. I could envision a separate application triggering samples via the REST API and writing the results into Kafka, which your operator can read. This is probably the most reasonable solution I can come up with. Any attempt at accessing the TaskExec

Re: How can I get the backpressure signals inside my function or operator?

2019-11-06 Thread Felipe Gutierrez
If I can trigger the sample via the REST API, that is good enough for a PoC. Then I can read it from any in-memory storage using a separate thread within the operator. But which REST API gives me the backpressure ratio value? Thanks

flink's hard dependency on zookeeper for HA

2019-11-06 Thread Vishwas Siravara
Hi all, I am using Flink 1.7.2 as a standalone cluster in high availability mode with ZooKeeper. I have noticed that all Flink processes go down once ZooKeeper goes down. Is this expected behavior, given that leader election has already happened and the job has been running for several hours? Best

Documentation issue maybe

2019-11-06 Thread Romain Gilles
Hi all, I think the code example in the following section has a thread-safety issue: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/testing.html#junit-rule-miniclusterwithclientresource The class CollectSink is not thread-safe, as only the writes to the values collection are sync

Re: Limit max cpu usage per TaskManager

2019-11-06 Thread Lu Niu
Hi, Thanks for replying! Basically I want to limit CPU usage so that different applications don't affect each other. What's the current best practice? It looks like `yarn.nodemanager.linux-container-executor.cgroups.strict-resource-usage=true` is one way. How do I set how many CPU resources to use? Is it "yarn.c

StreamingFileSink to S3 failure to complete multipart upload

2019-11-06 Thread Harrison Xu
Hello, I'm seeing the following behavior in StreamingFileSink (1.9.1) uploading to S3. 2019-11-06 15:50:58,081 INFO com.quora.dataInfra.s3connector.flink.filesystem.Buckets - Subtask 1 checkpointing for checkpoint with id=5025 (max part counter=3406). 2019-11-06 15:50:58,448 INFO org.apac

Re: StreamingFileSink to S3 failure to complete multipart upload

2019-11-06 Thread Harrison Xu
To add to this, attempting to restore from the most recent manually triggered *savepoint* results in a similar, yet slightly different error: java.io.FileNotFoundException: upload part on tmp/kafka/meta/ads_action_log_kafka_uncounted/dt=2019-11-06T00/partition_6/part-4-2158: org.apache.flink.fs.

Stateful functions

2019-11-06 Thread Dan Pettersson
Hello, I've started to play around with Stateful Functions and I like it a lot :-) Also, thanks for the comprehensive documentation and your very good talk, Igal. I would appreciate it if you could give some hints/ideas on how to structure an application with the following criteria: one Kafka ingr

Re: ctx.timestamp() returning null when using Processing Time

2019-11-06 Thread Komal Mariam
The ctx.currentProcessingTime() escaped my notice. Thank you for pointing it out, Yun Tang. I now set my processing timer using ctx.timerService().registerProcessingTimeTimer(ctx.currentProcessingTime() + 2000); and it works. On Wed, 6 Nov 2019 at 21:57, Yun Tang wrote: > Hi Komal > > > > Please

Does Flink merge streams deterministically?

2019-11-06 Thread amran dean
The Flink Kafka consumer's watermark extractor claims: "The watermark extractor will run per Kafka partition, watermarks will be merged across partitions in the same way as in the Flink runtime, when streams are merged." Suppose we have a simple situation where two streams merge into one. Is the o

Re: RocksDB and local file system

2019-11-06 Thread vino yang
Hi Jaqie, For testing, you can use the local file system scheme (e.g. "file:///"). Technically speaking, it's OK to specify the path string you provided. However, in a production environment, we do not recommend using the local file system, because it does not provide high availability. Be
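For local testing, the file:/// scheme vino describes can be sketched in flink-conf.yaml; this is an illustrative fragment only (the directory path is taken from Jaqie's question, and the exact key names should be checked against the Flink version in use):

```yaml
# Illustrative sketch: RocksDB state backend writing checkpoints
# to a local directory. Suitable for testing, not for production,
# since a local path offers no high availability.
state.backend: rocksdb
state.checkpoints.dir: file:///data/flink/checkpoints
```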

Flink SQL dynamic configuration

2019-11-06 Thread Jaqie Chan
Hello, I use the Flink SQL API to process a data stream from Kafka. To process these data, I use some configuration loaded from an HTTP endpoint once at initialization. The configuration is loaded only once, at job initialization. So it works well with a static configuration, but does not handle dynami

Re: How can I get the backpressure signals inside my function or operator?

2019-11-06 Thread Zhijiang
You can refer to this document [1] for the REST API details. The backpressure URI is "/jobs/:jobid/vertices/:vertexid/backpressure". But I am not sure whether it is easy to get the jobid and vertexid. [1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/res
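The URI Zhijiang quotes is parameterized by a job ID and a vertex ID. A tiny sketch of assembling the monitoring path (only the path template comes from the message; the helper name and the sample IDs are hypothetical):

```java
public class BackpressurePath {
    // Fills the /jobs/:jobid/vertices/:vertexid/backpressure template
    // with concrete IDs for a GET against the Flink REST endpoint.
    static String backpressureUri(String jobId, String vertexId) {
        return "/jobs/" + jobId + "/vertices/" + vertexId + "/backpressure";
    }

    public static void main(String[] args) {
        // Hypothetical IDs, just to show the resulting shape of the path.
        System.out.println(backpressureUri("ab12", "cd34"));
        // prints "/jobs/ab12/vertices/cd34/backpressure"
    }
}
```

In practice the job ID can be listed via the /jobs endpoint and the vertex IDs via /jobs/:jobid, before querying this path.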

Till Rohrmann - Can you please share your code for FF - SF - Flink as a lib

2019-11-06 Thread arpit8622
https://www.youtube.com/watch?v=WeHuTRwicSw Basically I wanted to check the JobManager/TaskManager k8s YAML you used for the above demo. It would be very helpful for the community. If it's already committed somewhere, can you please direct me to the link. --

Re: What is the slot vs cpu ratio?

2019-11-06 Thread vino yang
Hi srikanth, Referred from the official document: "Each Flink TaskManager provides processing slots in the cluster. The number of slots is typically proportional to the number of available CPU cores of each TaskManager. As a general recommendation, the number of available CPU cores is a good defa
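Following the rule of thumb vino quotes (slots per TaskManager roughly equal to available CPU cores), Srikanth's 3-node, 16-core cluster works out as a simple multiplication; the class and method names below are just for illustration:

```java
public class SlotEstimate {
    // Rule of thumb from the docs: one slot per CPU core per TaskManager,
    // so total slots = TaskManagers x cores per TaskManager.
    static int maxSlots(int taskManagers, int coresPerTaskManager) {
        return taskManagers * coresPerTaskManager;
    }

    public static void main(String[] args) {
        System.out.println(maxSlots(3, 16)); // 3 nodes x 16 cores = 48 slots
    }
}
```

This is only a starting default; the right slot count also depends on the workload's memory footprint per slot.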

When using udaf, the startup job has a “Cannot determine simple type name 'com' ” exception(Flink version 1.7.2)

2019-11-06 Thread mailtolrl
My Flink streaming job uses a UDAF and is set to 60 parallelism, submitted in YARN cluster mode; the exception then happens every time I start it.

Re: Documentation issue maybe

2019-11-06 Thread Zhenghua Gao
You are right that it's not thread-safe. I think we can use Collections.synchronizedList() to get a thread-safe list [1] and remove the synchronized keyword from the invoke method. I have created a ticket to track this [2]; please feel free to fix it by making a pull request. [1] https://doc
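A standalone sketch of the fix Zhenghua suggests: back the sink's collection with Collections.synchronizedList so that individual add() calls are thread-safe, instead of synchronizing the invoke method. This illustrates the concurrency idea only; it is not the actual Flink CollectSink test class:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class CollectSinkSketch {
    // The synchronized wrapper makes each add() call atomic, so
    // invoke() itself no longer needs the synchronized keyword.
    static final List<Long> values =
            Collections.synchronizedList(new ArrayList<>());

    static void invoke(long value) {
        values.add(value);
    }

    public static void main(String[] args) throws InterruptedException {
        // Hammer the sink from several threads to show no writes are lost.
        Thread[] writers = new Thread[4];
        for (int t = 0; t < writers.length; t++) {
            writers[t] = new Thread(() -> {
                for (long i = 0; i < 1000; i++) invoke(i);
            });
            writers[t].start();
        }
        for (Thread w : writers) w.join();
        System.out.println(values.size()); // 4 threads x 1000 adds = 4000
    }
}
```

Note that compound operations (e.g. iterating over the list) still need external synchronization on the wrapper, which is why this fix suits a sink that only appends.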

Re: How can I get the backpressure signals inside my function or operator?

2019-11-06 Thread Felipe Gutierrez
Cool, I'll use it. Now I have to get the job ID and vertex ID inside the operator. I forgot to mention: I am using Flink 1.9.1. Thanks! On Thu, Nov

Re: When using udaf, the startup job has a “Cannot determine simple type name 'com' ” exception(Flink version 1.7.2)

2019-11-06 Thread vino yang
Hi mailtolrl, Can you share more context about your program and UDAF? Best, Vino mailtolrl wrote on Thu, Nov 7, 2019 at 3:05 PM: > My flink streaming job use a udaf, set 60 parallelisms,submit job in yarn > cluster mode,and then happens every time I start. > > > >

How long is the flink sql task state default ttl?

2019-11-06 Thread LakeShen
Hi community, as far as I know, I can use idle state retention time to clear Flink SQL task state. My question is: how long is the default TTL of Flink SQL task state? Thanks

Re:Re: When using udaf, the startup job has a “Cannot determine simple type name 'com' ” exception(Flink version 1.7.2)

2019-11-06 Thread mailtolrl
Hi, vino: Thanks for your answer. When I set parallelism to 20, the job runs successfully every time. With 40 parallelism, sometimes I can submit it successfully and sometimes it throws the exception. With 60 parallelism, it has never been submitted successfully; it is always this exce