Re: Questions regarding to Flink 1.5.0 REST API change

2018-06-21 Thread Chesnay Schepler
It's quite weird that it tries to use the local file system. Maybe it derives the FS from state.backend.fs.checkpointdir, but uses it for savepoints.dir? What happens if you also set state.backend.fs.checkpointdir to HDFS? On 21.06.2018 08:07, Siew Wai Yow wrote: Thanks Chesnay, the appli

Re: Breakage in Flink CLI in 1.5.0

2018-06-21 Thread Till Rohrmann
Hi, if the rest.address is different from the jobmanager.rpc.address, then you should specify that in the flink-conf.yaml and Flink will connect to rest.address. Only if rest.address is not specified, the system will fall back to use the jobmanager.rpc.address. Currently, the rest server endpoint

Re: Questions regarding to Flink 1.5.0 REST API change

2018-06-21 Thread sihua zhou
Hi Yow, I had a look at the related code, and I think this is a bug. Flink uses the checkpoint path's FileSystem to create the output stream for the savepoint, but in your case the checkpoint and savepoint are not using the same file system. A workaround is to use the same file system for both ch

Re: Strictly use TLSv1.2

2018-06-21 Thread Vinay Patil
Hi, I have deployed Flink 1.3.2 and enabled SSL settings. From the ssl debug logs it shows that Flink is using TLSv1.2. However based on the security scans we have observed that it also allows TLSv1.0 and TLSv1.1. In order to strictly use TLSv1.2 we have updated the following property of java.sec

Re: Cleaning of state snapshot in state backend(HDFS)

2018-06-21 Thread sihua zhou
Hi Garvit, > Now, let's say, we clear the state. Would the state data be removed from HDFS > too? The state data would not be removed from HDFS immediately if you clear the state in your job. But after you clear the state in your job, later completed checkpoints won't contain the stat

Re: Questions regarding to Flink 1.5.0 REST API change

2018-06-21 Thread sihua zhou
I just created a JIRA for this: https://issues.apache.org/jira/browse/FLINK-9633 On 06/21/2018 15:10, Chesnay Schepler wrote: It's quite weird that it tries to use the local file system. Maybe it derives the FS from state.backend.fs.checkpointdir, but uses it for savepoints.dir? What happens

Re: Questions regarding to Flink 1.5.0 REST API change

2018-06-21 Thread Chesnay Schepler
If you could open a JIRA this would be great. On 21.06.2018 09:07, sihua zhou wrote: Hi Yow, I had a look at the related code, and I think this is a bug. Flink uses the checkpoint path's FileSystem to create the output stream for the savepoint, but in your case the checkpoint and savepoint are n

Re: Cleaning of state snapshot in state backend(HDFS)

2018-06-21 Thread Garvit Sharma
So, would it delete all the files in HDFS associated with the cleared state? On Thu, Jun 21, 2018 at 12:58 PM sihua zhou wrote: > Hi Garvit, > > > Now, let's say, we clear the state. Would the state data be removed from > HDFS too? > > The state data would not be removed from HDFS immediately, i

Re: Cleaning of state snapshot in state backend(HDFS)

2018-06-21 Thread sihua zhou
Hi Garvit, Let's say you clear the state at timestamp t1; then the checkpoints completed before t1 will still contain the data you cleared, but future checkpoints won't contain the cleared data. But I'm not sure what you mean by the cleared state; you can only clear a key-valu

Re: Cleaning of state snapshot in state backend(HDFS)

2018-06-21 Thread Garvit Sharma
I am maintaining state data for a key in ValueState. As per [0] I can clear() state for that key. [0] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/state/state.html Please let me know. Thanks, On Thu, Jun 21, 2018 at 1:19 PM sihua zhou wrote: > Hi Garvit, > > Let's s

Re: Cleaning of state snapshot in state backend(HDFS)

2018-06-21 Thread Garvit Sharma
Now, after clearing state for a key, I don't want that redundant data in the state backend. This is my concern. Please let me know if there are any gaps. Thanks, On Thu, Jun 21, 2018 at 1:31 PM Garvit Sharma wrote: > I am maintaining state data for a key in ValueState. As per [0] I can > clear

Re: Cleaning of state snapshot in state backend(HDFS)

2018-06-21 Thread sihua zhou
Yes, you can clear the state for a key (the currently active key). If you clear it, you have also removed it from the state backend, and future checkpoints won't contain the key anymore unless you add it again. Best, Sihua On 06/21/2018 16:04, Garvit Sharma wrote: Now, after
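
The behaviour described in this reply can be pictured with a toy model. This is only a sketch in plain Java, not Flink's API or implementation: the class `ClearedStateSketch` and its methods are hypothetical names, and a "checkpoint" is modelled simply as a copy of the live state at the moment it is taken.

```java
import java.util.HashMap;
import java.util.Map;

public class ClearedStateSketch {

    // Hypothetical stand-in for per-key state held by a running job.
    private final Map<String, Long> liveState = new HashMap<>();

    /** Stores a value for the given key. */
    public void update(String key, long value) {
        liveState.put(key, value);
    }

    /** Removes the value for the given key, analogous to clearing state for that key. */
    public void clear(String key) {
        liveState.remove(key);
    }

    /** Takes a snapshot: an independent copy of whatever the live state holds right now. */
    public Map<String, Long> checkpoint() {
        return new HashMap<>(liveState);
    }

    public static void main(String[] args) {
        ClearedStateSketch job = new ClearedStateSketch();
        job.update("user-1", 42L);

        Map<String, Long> beforeClear = job.checkpoint(); // completed before t1
        job.clear("user-1");                              // state cleared at t1
        Map<String, Long> afterClear = job.checkpoint();  // completed after t1

        // The earlier snapshot still holds the key; the later one does not.
        System.out.println(beforeClear.containsKey("user-1"));
        System.out.println(afterClear.containsKey("user-1"));
    }
}
```

In this model the earlier snapshot keeps the cleared entry until the snapshot itself is discarded, which matches the point made in the thread that clearing state does not immediately remove data from already completed checkpoints.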

Re: Cleaning of state snapshot in state backend(HDFS)

2018-06-21 Thread Garvit Sharma
Thank you for the clarification. On Thu, Jun 21, 2018 at 1:36 PM sihua zhou wrote: > Yes, you can clear the state for a key (the currently active key), if you > clear it, it means that you have also cleaned it from the state backend, > and the future checkpoints won't contain the key anymore unle

Re: A question about Kryo and Window State

2018-06-21 Thread Fabian Hueske
Hi Vishal, In general, Kryo serializers are not very upgrade friendly. Serializer compatibility [1] might be the right approach here, but Gordon (in CC) might know more about this. Best, Fabian [1] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/state/custom_serialization.html

Re: Control insert database with dataset

2018-06-21 Thread Fabian Hueske
Hi Dulce, This functionality is not supported by the JDBCOutputFormat. Some database systems (AFAIK, MySQL) support Upsert writes, i.e., writes that insert if the primary key is not present or update the row if the PK exists. Not sure if that would meet your requirements. If you don't want to go

Re: Flink 1.5 Yarn Connection unexpectedly closed

2018-06-21 Thread Fabian Hueske
Hi Garrett, I agree, there seems to be an issue and increasing the timeout should not be the right approach to solve it. Are you running streaming or batch jobs, i.e., do some of the tasks finish much earlier than others? I'm adding Till to this thread who's very familiar with scheduling and proc

Re: Debug job execution from savepoint

2018-06-21 Thread Fabian Hueske
Hi Manuel, I had a look and couldn't find a way to do it. However, this sounds like a very useful feature to me. Would you mind creating a Jira issue [1] for that? Thanks, Fabian [1] https://issues.apache.org/jira/projects/FLINK 2018-06-18 16:23 GMT+02:00 Haddadi Manuel : > Hi all, > > > I wo

Re: A question about Kryo and Window State

2018-06-21 Thread Tzu-Li (Gordon) Tai
Hi Vishal, Kryo has a serializer called `CompatibleFieldSerializer` that allows for simple backward compatibility changes, such as adding non-optional fields / removing fields. If using the KryoSerializer is a must, then a good thing to do is to register Kryo's `CompatibleFieldSerializer` as

Re: Flink 1.5 TooLongFrameException in cluster mode?

2018-06-21 Thread chrisr123
This looks related to using the -m option on CLI: This works: $FLINK_HOME/bin/flink run -c $CLASS $JARFILE but this causes the error: $FLINK_HOME/bin/flink run -m jobmanagermachine:6123 -c $CLASS $JARFILE I found this thread here from April 27 http://mail-archives.apache.org/mod_mbox/flink-

Re: Questions regarding to Flink 1.5.0 REST API change

2018-06-21 Thread Siew Wai Yow
Thanks @SihuaZhou, you are right that this is a bug. I just checked the source code too. @Chesnay Schepler, I tested with both checkpoint and savepoint on the same file system and it is working as expected. Thanks guys! -Yow From: Chesnay S

Re: Flink 1.5 Yarn Connection unexpectedly closed

2018-06-21 Thread Till Rohrmann
Hi Garrett, killing an idle TaskManager should not affect the execution of the job. By definition a TaskManager only idles if it does not execute any tasks. Could you maybe share the complete logs (of the cluster entrypoint and all TaskManagers) with us? Cheers, Till On Thu, Jun 21, 2018 at 10:2

Re: Backpressure from producer with flink connector kinesis 1.4.2

2018-06-21 Thread Tzu-Li (Gordon) Tai
Hi, According to the description in [1], yes, I think it is expected that eventually the YARN containers running TMs that execute the producer sink subtasks will be killed due to memory problems. It seems that the KPL client is only a wrapper around a C++ daemon process, so it actually wouldn

Re: # of active session windows of a streaming job

2018-06-21 Thread Fabian Hueske
Hi Dongwon, Yes, the counter state should be stored in operator state which is not available on Triggers. Chesnay: Can a window function (like ProcessWindowFunction) access (read, write) the counter of its associated Trigger to checkpoint and restore it? Best, Fabian 2018-06-20 16:59 GMT+02:00 D

Re: How to use broadcast variables in data stream

2018-06-21 Thread Fabian Hueske
Hi, if the list is static and not too large, you can pass it as a parameter to the function. Function objects are serialized (using Java's default serialization) and shipped to the workers for execution. If the data is dynamic, you might want to have a look at Broadcast state [1]. Best, Fabian
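
A minimal sketch of the first suggestion, assuming a static list passed to a function object through its constructor. It uses only the JDK: `AllowListFilter` and `shipToWorker` are hypothetical names, and the serialization round trip merely stands in for shipping the function to a worker; it is not how Flink itself is invoked.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

public class StaticListParameterSketch {

    /** Hypothetical filter function that carries a static list as a constructor parameter. */
    public static class AllowListFilter implements Predicate<String>, Serializable {
        private static final long serialVersionUID = 1L;

        // ArrayList is Serializable, so the list travels with the function object.
        private final ArrayList<String> allowed;

        public AllowListFilter(List<String> allowed) {
            this.allowed = new ArrayList<>(allowed);
        }

        @Override
        public boolean test(String value) {
            return allowed.contains(value);
        }
    }

    /** Round-trips an object through Java's default serialization, standing in for shipping it to a worker. */
    @SuppressWarnings("unchecked")
    public static <T extends Serializable> T shipToWorker(T function) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(function);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("function object is not serializable", e);
        }
    }

    public static void main(String[] args) {
        AllowListFilter original = new AllowListFilter(Arrays.asList("a", "b"));
        AllowListFilter copy = shipToWorker(original);

        // The deserialized copy still has access to the list it was constructed with.
        System.out.println(copy.test("a"));
        System.out.println(copy.test("z"));
    }
}
```

Because the whole list is serialized along with the function, this approach only suits data that is fixed when the job is built and small enough to copy to every worker.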

Re: Breakage in Flink CLI in 1.5.0

2018-06-21 Thread Sampath Bhat
Hi, yes, I've specified rest.address for the Flink client to connect to, and the rest.address is valid and working fine. But my question is: why am I supposed to give jobmanager.rpc.address for the Flink client to connect to the Flink cluster if the client depends only on rest.address? O

Re: Breakage in Flink CLI in 1.5.0

2018-06-21 Thread Till Rohrmann
The reason why you still have to do it is because we still have to support the legacy mode where the client needs to know the JobManager RPC address. Once we remove the legacy mode, we could change the HighAvailabilityServices such that we have client facing HA services which only retrieve the rest

Re: How to use broadcast variables in data stream

2018-06-21 Thread zhen li
Thanks for your reply. But broadcast state does not seem to be supported in Flink 1.3. I found this in Flink 1.3: Broadcasting DataStream → DataStream Broadcasts elements to every partition. dataStream.broadcast(); But I don’t know how to convert it to a list and access it in the stream context. On June 21, 2018,

Re: Flink 1.5 TooLongFrameException in cluster mode?

2018-06-21 Thread Chesnay Schepler
Due to the job-submission changes in 1.5 you attempted to send REST requests to akka, which causes the exception you received. Instead you want to specify the REST port, which by default is 8081 (configurable via rest.port). On 21.06.2018 10:44, chrisr123 wrote: This looks related to

Re: How to use broadcast variables in data stream

2018-06-21 Thread Fabian Hueske
That's correct. Broadcast state was added with Flink 1.5. You can use DataStream.broadcast() in Flink 1.3 but it has a few limitations. For example, you cannot connect a keyed and a broadcast stream. 2018-06-21 11:58 GMT+02:00 zhen li : > Thanks for your reply. > But broadcast state seems not

Re: Memory Leak in ProcessingTimeSessionWindow

2018-06-21 Thread ashish pok
Hi Stefan, Thanks for outlining the steps; they are similar to what we have been doing for OOM issues. However, I was looking for something more high level on whether state / key handling needs some sort of cleanup specifically that is not done by default. I am about 99% (nothing is certain:)) su

Re: # of active session windows of a streaming job

2018-06-21 Thread Chesnay Schepler
Without modifications to Flink? No. By design nothing can intercept or retrieve metrics with the metrics API. For this pattern the usual recommendation is to explicitly pass the metric to components that require it. If modifications are an option, what you could do is * define a counter in the

Re: Backpressure from producer with flink connector kinesis 1.4.2

2018-06-21 Thread Liu, Gavin (CAI - Atlanta)
Thanks, Gordon. Glad to hear you confirm this. I learned a lot from the open PR, btw. I wonder whether, apart from adding back pressure support in the producer, there is any other way to protect YARN from crashing, e.g., through configuration? From: "Tzu-Li (Gordon) Tai" Date: Thursday, June 21, 2018 at

Re: # of active session windows of a streaming job

2018-06-21 Thread Dongwon Kim
Hi Fabian and Chesnay, Thank you guys. Fabian : Unfortunately, as Chesnay said, MetricGroup doesn't allow a ProcessWindowFunction to access a counter defined in a Trigger. Chesnay : I'm going to follow your advice on how to modify Flink. Thank you very much! Best, - Dongwon On Thu, Jun 21,

How to set log level using Flink Docker Image

2018-06-21 Thread Guilherme Nobre
Hi everyone, I have a Flink Cluster created from Flink's Docker Image. I'm trying to set log level to DEBUG but couldn't find how. I know where logback configuration files are as per documentation: "The conf directory contains a logback.xml file which can be modified and is used if Flink is start

Re: How to set log level using Flink Docker Image

2018-06-21 Thread Dominik Wosiński
You can for example mount the *conf* directory using docker volumes. You would need to have *logback.xml* and then mount it as *conf/logback.xml* inside the image. Locally you could do this using *docker-compose.yml*; for mounting volumes in Kubernetes refer to this page: https://kubernetes.io/doc

Re: How to set log level using Flink Docker Image

2018-06-21 Thread Guilherme Nobre
Thanks, that sounds reasonable. I'll try it out tomorrow :) Cheers, G On Thu, Jun 21, 2018 at 6:56 PM Dominik Wosiński wrote: > You can for example mount the *conf* directory using docker volumes. > > You would need to have *logback.xml* and then mount it as *conf/logback.xml > *inside the imag

Re: Flink 1.5 Yarn Connection unexpectedly closed

2018-06-21 Thread Garrett Barton
Thank you all for the reply! I am running batch jobs, I read in a handful of files from HDFS and output to HBase, HDFS, and Kafka. I run into this when I have partial usage of the cluster as the job runs. So right now I spin up 20 nodes with 3 slots, my job at peak uses all 60 slots, but by the

Re: Flink 1.5 Yarn Connection unexpectedly closed

2018-06-21 Thread Garrett Barton
Actually, random thought, could yarn preemption be causing this? What is the failure scenario should a working task manager go down in yarn that is doing real work? The docs make it sound like it should fire up another TM and get back to work out of the box, but I'm not seeing that. On Thu, Jun

Need a map-like state in an operator state

2018-06-21 Thread xsheng
Hi All, I'm sorry if I'm double posting, but I posted it before subscribing and I don't see it in my post lists. So I'm posting it again. The Flink app we are trying to build is as such: read from kafka, sort the messages according to some dependency rules, and only send messages that have satis

Re: Need a map-like state in an operator state

2018-06-21 Thread xsheng
Solved it by using a key selector that returns a constant, so that creates a "pseudo" keyedStream with only one partition. -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
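
The idea behind this workaround can be sketched in plain Java. This is an illustration only, not Flink code: `CONSTANT_KEY` and `partitionByKey` are hypothetical names, and grouping with the JDK stream API stands in for how a keyed stream routes records with the same key to the same place. In Flink itself the equivalent would presumably be a `KeySelector` whose `getKey` returns a constant.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class ConstantKeySketch {

    /** Key selector that ignores its input and always returns the same key. */
    public static final Function<String, Integer> CONSTANT_KEY = message -> 0;

    /** Groups messages by key, mimicking how records with equal keys end up together. */
    public static Map<Integer, List<String>> partitionByKey(
            List<String> messages, Function<String, Integer> keySelector) {
        return messages.stream().collect(Collectors.groupingBy(keySelector));
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("m1", "m2", "m3");
        Map<Integer, List<String>> partitions = partitionByKey(messages, CONSTANT_KEY);

        // With a constant key every message lands in the single group for key 0.
        System.out.println(partitions.size());
        System.out.println(partitions.get(0));
    }
}
```

The trade-off is that a single key means a single partition, so the keyed part of the pipeline handles every record in one place rather than in parallel.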

SQL Do Not Support Custom Trigger

2018-06-21 Thread YennieChen88
I found that Flink SQL uses a specific default trigger, which will not fire until the window closes. But sometimes we need to trigger before the window closes. As the class *WindowAssigner* provides the method *getDefaultTrigger* with parameter *StreamExecutionEnvironment*, how about passing a c