Re: Will that be a problem if POJO key type didn't override equals?

2018-02-05 Thread Tony Wei
Hi Timo, thanks for your response. I will implement equals for my POJO directly. Is that okay instead of wrapping it into another class? Furthermore, I want to migrate the states from the previous job. Will it lead to state loss? I run my job on Flink 1.4.0. I used RocksDBStateBackend and only Valu

Re: Will that be a problem if POJO key type didn't override equals?

2018-02-05 Thread Timo Walther
Hi Tony, not having a proper equals() method might work for a keyBy() (partitioning operation), but it can lead to unexpected side effects when dealing with state. If not now, then maybe in the future. For example, heap-based state uses hash table data structures, so your key might nev
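Timo's point about hash tables can be reproduced in plain Java, without Flink's actual state backend: a hash-based map locates the bucket with hashCode() but then compares keys with equals(). If a POJO overrides only hashCode(), two logically equal instances still differ under the default identity-based equals(), so a lookup with a fresh "equal" key misses. The class names below are hypothetical, for illustration only.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class PojoKeyDemo {

    // Overrides hashCode() only; equals() falls back to Object identity.
    static final class HashOnlyKey {
        final String id;
        HashOnlyKey(String id) { this.id = id; }
        @Override public int hashCode() { return Objects.hashCode(id); }
    }

    // Overrides both, consistently with each other.
    static final class ProperKey {
        final String id;
        ProperKey(String id) { this.id = id; }
        @Override public int hashCode() { return Objects.hashCode(id); }
        @Override public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof ProperKey)) return false;
            return Objects.equals(id, ((ProperKey) o).id);
        }
    }

    // Stores a value under one key instance, then looks it up with a new, logically equal one.
    static Integer lookupHashOnly() {
        Map<HashOnlyKey, Integer> state = new HashMap<>();
        state.put(new HashOnlyKey("user-1"), 42);
        return state.get(new HashOnlyKey("user-1")); // same bucket, but equals() is false
    }

    static Integer lookupProper() {
        Map<ProperKey, Integer> state = new HashMap<>();
        state.put(new ProperKey("user-1"), 42);
        return state.get(new ProperKey("user-1"));
    }

    public static void main(String[] args) {
        System.out.println("hashCode only: " + lookupHashOnly());   // hashCode only: null
        System.out.println("hashCode + equals: " + lookupProper()); // hashCode + equals: 42
    }
}
```

The "state" for the first key is silently lost rather than causing an exception, which matches the concern that the pipeline looks fine while elements do not see the state written under an equal key.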

Testing flink class loading

2018-02-05 Thread Data Engineer
I am trying to run the ClassLoaderTestProgram on Flink. 1. I started Flink in local mode with the following command: `bin/jobmanager.sh start local` 2. I ran the ClassLoaderTestProgram jar: `bin/flink run ClassLoaderTestProgram.jar --resolve-first child --output out.txt` I get a

Re: Reduce parallelism without network transfer.

2018-02-05 Thread Kien Truong
Thanks Piotr, it works. May I ask why the default behavior when reducing parallelism is rebalance and not rescale? Regards, Kien On Feb 5, 2018, 15:28, at 15:28, Piotr Nowojski wrote: >Hi, > >It should work like this out of the box if you use rescale method: > >https://ci.ap

Will that be a problem if POJO key type didn't override equals?

2018-02-05 Thread Tony Wei
Hi all, I have defined a POJO class that overrides Object#hashCode and used it in keyBy(). The pipeline looks good (i.e. no exception telling me it is an UNSUPPORTED key type), but I'm afraid that it will lead to a problem where elements that I think have the same key will not get the same state be

Re: How does BucketingSink generate a SUCCESS file when a directory is finished

2018-02-05 Thread xiaobin yan
Hi, you've got a point. I saw that method, but how can I make sure that the checkpoints of all subtasks are finished? Only at that point can I write the _SUCCESS file. Best, Ben > On 5 Feb 2018, at 6:34 PM, Fabian Hueske wrote: > > In case of a failure, Flink rolls back the job to th

Re: Joining data in Streaming

2018-02-05 Thread Steven Wu
There is also a discussion of side inputs in FLIP-17: https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API I would load the smaller data set as a static reference data set. Then you can just do single-source streaming of the larger data set. On Wed, Jan 31, 2018 at 1:09 AM, S

Re: Task Manager detached under load

2018-02-05 Thread Ashish Pokharel
Hi Till, thanks for the detailed response. I will try to gather some of this information during the week and follow up. — Ashish > On Feb 5, 2018, at 5:55 AM, Till Rohrmann wrote: > > Hi, > > this sounds like a serious regression wrt Flink 1.3.2 and we should > definitely find out what's causin

Re: ML and Stream

2018-02-05 Thread Fabian Hueske
That's correct. It's not possible to persist data in memory across jobs in Flink's batch API. Best, Fabian 2018-02-05 18:28 GMT+01:00 Christophe Jolif : > Fabian, > > Ok thanks for the update. Meanwhile I was looking at how I could still > leverage current FlinkML API, but as far as I can see, i

how to match external checkpoints with jobs during recovery

2018-02-05 Thread xiatao123
The external checkpoints are named like checkpoint_metadata-0057, so I have no idea which job a given checkpoint metadata file belongs to when multiple jobs are running at the same time. If a job fails unexpectedly, I need to know which checkpoints belong to the failed job. Is there API o

Re: ML and Stream

2018-02-05 Thread Christophe Jolif
Fabian, OK, thanks for the update. Meanwhile I was looking at how I could still leverage the current FlinkML API, but as far as I can see, it lacks the ability to persist its own models? So even for pure batch, it prevents running your (once built) model in several jobs? Or am I missing s

Re: ML and Stream

2018-02-05 Thread Fabian Hueske
Hi Christophe, it is true that FlinkML only targets batch workloads. Also, there has not been any development for a long time. In March last year, a discussion was started on the dev mailing list about different machine learning features for stream processing [1]. One result of this discussion

Re: Kafka and parallelism

2018-02-05 Thread Tzu-Li (Gordon) Tai
Yes, the answer to that would be no. If you do not explicitly set a parallelism for the consumer, the parallelism by default will be whatever the parallelism of the job is, and is independent of how many Kafka partitions there are. Cheers, Gordon On 5 February 2018 at 11:42:21 AM, Christophe J

ML and Stream

2018-02-05 Thread Christophe Jolif
Hi all, Sorry, this is me again with another question. Maybe I did not search deep enough, but it seems the FlinkML API is still pure batch. If I read https://cwiki.apache.org/confluence/display/FLINK/FlinkML%3A+Vision+and+Roadmap it seems there was the intent to "exploit the streaming nature of

Rebalance to subtasks in same TaskManager instance

2018-02-05 Thread johannes.barn...@clarivate.com
Hi, I have a streaming topology with source parallelism of M and a target operator parallelism of N. For optimum performance I have found that I need to choose M and N independently. Also, the source subtasks do not all produce the same number of records and therefore I have to rebalance to the tar

Fwd: Global window keyBy

2018-02-05 Thread miki haiat
Yes. Another question: how can I clear non-triggered events after a period of time? Is there a way to configure some kind of "timeout"? Thanks a lot. On Mon, Feb 5, 2018 at 10:40 AM, Piotr Nowojski wrote: > Hi, > > FIRE_AND_PURGE triggers `org.apache.flink.api.common.state.State#clear()` > call a

Re: Task Manager detached under load

2018-02-05 Thread Till Rohrmann
Hi, this sounds like a serious regression with respect to Flink 1.3.2, and we should definitely find out what's causing this problem. Judging from what I see in the logs, the following happens: For some time the JobManager seems to no longer receive heartbeats from the TaskManager. This could be, for example, d

Re: RocksDB / checkpoint questions

2018-02-05 Thread Christophe Jolif
Thanks a lot for the details, Stefan. -- Christophe On Mon, Feb 5, 2018 at 11:31 AM, Stefan Richter wrote: > Hi, > > you are correct that RocksDB has a „working directory“ on local disk and > checkpoints + savepoints go to a distributed filesystem. > > - if I have 3 TaskManager I should expect

Re: Kafka and parallelism

2018-02-05 Thread Christophe Jolif
Thanks. It helps indeed. I guess the last point it does not explicitly answer is "does simply creating a Kafka consumer that reads from multiple partitions set the parallelism to the number of partitions?" But reading between the lines, I think the answer is clearly no. You have to set your parallelism

Re: How does BucketingSink generate a SUCCESS file when a directory is finished

2018-02-05 Thread Fabian Hueske
In case of a failure, Flink rolls back the job to the last checkpoint and reprocesses all data since that checkpoint. Also the BucketingSink will truncate a file to the position of the last checkpoint if the file system supports truncate. If not, it writes a file with the valid length and starts a
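When the file system cannot truncate, whoever reads the part files has to ignore the bytes past the recorded valid length. A minimal sketch of that reader side, assuming the valid length (in bytes) has already been obtained from the companion file; the class and method names are hypothetical, and how BucketingSink names and encodes that file is not shown here.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;

public class ValidLengthReader {

    // Keeps only the first validLength bytes: the data covered by the last completed
    // checkpoint. Bytes after that offset were written after the checkpoint and will
    // be produced again when the job replays from it.
    static byte[] validPrefix(byte[] contents, long validLength) {
        int n = (int) Math.max(0, Math.min(contents.length, validLength));
        return Arrays.copyOf(contents, n);
    }

    // Hypothetical helper: reads a whole part file and trims it (fine for a sketch;
    // a real reader would stream large files instead of loading them fully).
    static byte[] readValidPrefix(Path partFile, long validLength) {
        try {
            return validPrefix(Files.readAllBytes(partFile), validLength);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path part = Files.createTempFile("part-0-0", ".txt");
        // Suppose "a\nb\n" (4 bytes) was checkpointed and "c\n" was written afterwards.
        Files.write(part, "a\nb\nc\n".getBytes(StandardCharsets.UTF_8));
        System.out.print(new String(readValidPrefix(part, 4), StandardCharsets.UTF_8));
        Files.delete(part);
    }
}
```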

Re: RocksDB / checkpoint questions

2018-02-05 Thread Stefan Richter
Hi, you are correct that RocksDB has a „working directory“ on local disk and checkpoints + savepoints go to a distributed filesystem. - if I have 3 TaskManager I should expect more or less (depending on how the tasks are balanced) to find a third of my overall state stored on disk on each of t

Re: Why does jobmanager running needs slot ?

2018-02-05 Thread Piotr Nowojski
Yes, but this issue is still a part of the FLIP-6 work. Piotrek > On 5 Feb 2018, at 11:01, mingleizhang wrote: > > I found a website: https://issues.apache.org/jira/browse/FLINK-4360 > implemented this before. > > Rice. > > > > > > At 20

Re:Re: Why does jobmanager running needs slot ?

2018-02-05 Thread mingleizhang
I found an issue, https://issues.apache.org/jira/browse/FLINK-4360, that implemented this before. Rice. At 2018-02-05 17:56:49, "Piotr Nowojski" wrote: It seems so, but I’m saying this only based on the annotations showing when this method was added (in the last couple of months). I’m not that much

Re: Why does jobmanager running needs slot ?

2018-02-05 Thread Piotr Nowojski
It seems so, but I’m saying this only based on the annotations showing when this method was added (in the last couple of months). I’m not that familiar with those code parts. Piotrek > On 5 Feb 2018, at 10:51, mingleizhang wrote: > > Makes sense to me now. Is it a new design at FLIP6 ? > > Ric

Re:Re: Why does jobmanager running needs slot ?

2018-02-05 Thread mingleizhang
Makes sense to me now. Is it a new design in FLIP-6? Rice. At 2018-02-05 17:49:05, "Piotr Nowojski" wrote: I might be wrong, but I think it is the other way around and the naming of this method is correct: it does exactly what it says. TaskManager comes with some predefined task slots and i

Re: Why does jobmanager running needs slot ?

2018-02-05 Thread Piotr Nowojski
I might be wrong, but I think it is the other way around and the naming of this method is correct: it does exactly what it says. TaskManager comes with some predefined task slots and it is the one that is offering them to a JobManager. JobManager can use those slot offers to (later!) schedule tasks

Re:Re: Why does jobmanager running needs slot ?

2018-02-05 Thread mingleizhang
Yes. Thanks, Piotrek. Of course. So, TaskExecutor#offerSlotsToJobManager sounds confusing to me. It might be better to rename it to requestSlotsFromJobManager. I don't know whether that sounds okay. I just feel like offerSlotToJobManager sounds strange. What do you think of this? Rice.

Re: Multiple Elasticsearch sinks not working in Flink

2018-02-05 Thread Fabian Hueske
Great, thanks for the feedback! Best, Fabian 2018-02-03 9:37 GMT+01:00 Teena Kappen // BPRISE : > Hi Fabian, > > > > We tried the fix that was merged and the sinks are working correctly now. > Thank you for resolving the issue. > > > > Regards, > > Teena > > > > *From:* Teena Kappen // BPRISE >

Re: Checkpoint is not triggering as per configuration

2018-02-05 Thread Piotr Nowojski
Hi, Did you check the TaskManager and JobManager logs for any problems? Piotrek > On 5 Feb 2018, at 03:19, syed wrote: > > Hi > I am new to the Flink world and trying to understand it. Currently, I am using > Flink 1.3.2 on a small cluster of 4 nodes, > I have configured checkpoint directory at H

Re: Why does jobmanager running needs slot ?

2018-02-05 Thread Piotr Nowojski
org.apache.flink.runtime.jobmaster.JobMaster#offerSlots is a receiver side of an RPC call that is being initiated on the sender side: org.apache.flink.runtime.taskexecutor.TaskExecutor#offerSlotsToJobManager. In other words, JobMasterGateway.offerSlots is called by a TaskManager and it is a way
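The direction of the call can be pictured with a plain-Java sketch. The interface and class names below are hypothetical simplifications, not Flink's actual JobMasterGateway or TaskExecutor signatures (the real RPC is asynchronous and carries more parameters). The TaskExecutor owns the slots and initiates the call; the JobMaster is the receiver that records the accepted offers for later scheduling.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

public class SlotOfferSketch {

    // Receiver side: hypothetical stand-in for the JobMaster's RPC endpoint.
    interface SimpleJobMasterGateway {
        Collection<Integer> offerSlots(String taskManagerId, Collection<Integer> slotIds);
    }

    static final class SimpleJobMaster implements SimpleJobMasterGateway {
        final List<String> acceptedSlots = new ArrayList<>();

        @Override
        public Collection<Integer> offerSlots(String taskManagerId, Collection<Integer> slotIds) {
            // Accepts every offered slot; tasks may be scheduled into them later.
            for (int slot : slotIds) {
                acceptedSlots.add(taskManagerId + "/" + slot);
            }
            return slotIds;
        }
    }

    // Sender side: hypothetical stand-in for the TaskExecutor. It owns the slots
    // and initiates the offer, hence the name "offerSlotsToJobManager".
    static final class SimpleTaskExecutor {
        final String id;
        final List<Integer> freeSlots;

        SimpleTaskExecutor(String id, List<Integer> freeSlots) {
            this.id = id;
            this.freeSlots = freeSlots;
        }

        Collection<Integer> offerSlotsToJobManager(SimpleJobMasterGateway jobMaster) {
            return jobMaster.offerSlots(id, freeSlots);
        }
    }

    public static void main(String[] args) {
        SimpleJobMaster jobMaster = new SimpleJobMaster();
        SimpleTaskExecutor taskExecutor = new SimpleTaskExecutor("tm-1", Arrays.asList(0, 1, 2));
        taskExecutor.offerSlotsToJobManager(jobMaster);
        System.out.println(jobMaster.acceptedSlots); // [tm-1/0, tm-1/1, tm-1/2]
    }
}
```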

Re: Spurious warning in logs about flink-queryable-state-runtime

2018-02-05 Thread Fabian Hueske
Hmm, this seems indeed strange. Thanks for reporting the issue Ken. @Kostas: Do you know what is happening here? Can it be avoided or should we file a JIRA for this? Thanks, Fabian 2018-01-31 23:13 GMT+01:00 Ken Krugler : > Hi all, > > In unit tests that use the LocalFlinkMiniCluster, with Fli

Re: Reading bounded data from Kafka in Flink job

2018-02-05 Thread Fabian Hueske
Hi Hayden, as far as I know, an end offset is not supported by Flink's Kafka consumer. You could extend Flink's consumer. As you said, there is already code to set the starting offset (per partition), so you might be able to just piggyback on that. Gordon (in CC) who has worked a lot on the Kafka

Re: queryable state API

2018-02-05 Thread Fabian Hueske
Hi Maciek, AFAIK, there is some ongoing work to integrate queryable state with the new FLIP-6 mode. Maybe Kostas (in CC) who has worked on the queryable state API can help to answer your questions. Best, Fabian 2018-02-01 9:19 GMT+01:00 Maciek Próchniak : > Hello, > > Currently (1.4) to be able

Re: Getting Key from keyBy() in ProcessFunction

2018-02-05 Thread Piotr Nowojski
I think it’s not easily possible right now; however, it might be a valid suggestion to add an `OnTimerContext#getCurrentKey()` method. Besides using ValueState as you discussed before, as a kind of workaround you could copy and modify KeyedProcessOperator to suit your needs, but this would be m

Re: Kafka and parallelism

2018-02-05 Thread Tzu-Li (Gordon) Tai
Hi Christophe, You can set the parallelism of the FlinkKafkaConsumer independently of the total number of Kafka partitions (across all subscribed streams, including newly created streams that match a subscribed pattern). The consumer deterministically assigns each partition to a single consumer
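To see why the consumer's parallelism is independent of the partition count, here is a sketch of one deterministic assignment scheme: a simple modulo on the partition index, assumed purely for illustration (Flink's own assignment logic is not reproduced here and may, for example, offset partitions per topic). Each partition goes to exactly one subtask, and subtasks that receive no partition simply stay idle.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionAssignmentSketch {

    // Assigns partitions 0..numPartitions-1 to subtasks 0..parallelism-1.
    // Deterministic, so each subtask could compute its own share without coordination.
    static List<List<Integer>> assign(int numPartitions, int parallelism) {
        List<List<Integer>> bySubtask = new ArrayList<>();
        for (int s = 0; s < parallelism; s++) {
            bySubtask.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            bySubtask.get(p % parallelism).add(p);
        }
        return bySubtask;
    }

    public static void main(String[] args) {
        System.out.println(assign(6, 4)); // [[0, 4], [1, 5], [2], [3]]
        System.out.println(assign(2, 4)); // [[0], [1], [], []]  (two idle subtasks)
    }
}
```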

Re: Global window keyBy

2018-02-05 Thread Piotr Nowojski
Hi, FIRE_AND_PURGE triggers a `org.apache.flink.api.common.state.State#clear()` call, which "Removes the value mapped under the current key." So other keys should remain unmodified. I hope this solves your problem/question? Piotrek > On 4 Feb 2018, at 15:39, miki haiat wrote: > > Im using t
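The per-key scoping Piotr describes can be modelled with a small stand-in for keyed state. This is a hypothetical plain-Java sketch, not Flink's state implementation: every access goes through the current key, so clear() only drops the window contents of the key whose trigger fired.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PerKeyClearSketch {

    // Hypothetical keyed list state: every operation is scoped to the current key.
    static final class KeyedListState<K, V> {
        private final Map<K, List<V>> values = new HashMap<>();
        private K currentKey;

        void setCurrentKey(K key) { currentKey = key; }

        void add(V value) {
            values.computeIfAbsent(currentKey, k -> new ArrayList<>()).add(value);
        }

        List<V> get() { return values.getOrDefault(currentKey, Collections.<V>emptyList()); }

        // Mirrors the documented behaviour of State#clear(): removes only the
        // value mapped under the current key.
        void clear() { values.remove(currentKey); }
    }

    public static void main(String[] args) {
        KeyedListState<String, Integer> window = new KeyedListState<>();
        window.setCurrentKey("a"); window.add(1); window.add(2);
        window.setCurrentKey("b"); window.add(3);

        // The trigger fires and purges for key "a" only.
        window.setCurrentKey("a");
        System.out.println("fired a: " + window.get()); // fired a: [1, 2]
        window.clear();

        window.setCurrentKey("b");
        System.out.println("b still holds: " + window.get()); // b still holds: [3]
        System.out.println(Arrays.asList(3).equals(window.get())); // true
    }
}
```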

Re: Reduce parallelism without network transfer.

2018-02-05 Thread Piotr Nowojski
Hi, It should work like this out of the box if you use rescale method: https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/index.html#physical-partitioning
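The linked documentation describes rescale as sending each upstream subtask's output to a subset of downstream subtasks, whereas rebalance distributes round-robin over all of them (and so may cross the network). The sketch below pictures the pointwise wiring when parallelism is reduced, assuming upstream subtask i feeds downstream subtask i * downstream / upstream (integer division); that formula is an illustrative assumption, not Flink's exact scheduling logic.

```java
import java.util.Arrays;

public class RescaleSketch {

    // For upstream >= downstream: each upstream subtask sends to exactly one
    // downstream subtask, and consecutive upstream subtasks share a target.
    static int[] rescaleTargets(int upstream, int downstream) {
        if (downstream <= 0 || downstream > upstream) {
            throw new IllegalArgumentException("sketch covers reducing parallelism only");
        }
        int[] target = new int[upstream];
        for (int i = 0; i < upstream; i++) {
            target[i] = i * downstream / upstream;
        }
        return target;
    }

    public static void main(String[] args) {
        // 4 upstream subtasks feeding 2 downstream subtasks.
        System.out.println(Arrays.toString(rescaleTargets(4, 2))); // [0, 0, 1, 1]
    }
}
```

If the paired upstream and downstream subtasks are deployed in the same TaskManager, such one-to-one or many-to-one channels can stay local, which is the property Kien was after.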

Re: Flink not writing last few elements to disk

2018-02-05 Thread Piotr Nowojski
Hi, FileProcessMode.PROCESS_CONTINUOUSLY processes the file continuously - the stream will not end. Simple `writeAsCsv(…)` on the other hand only flushes the output file on a stream end (see `OutputFormatSinkFunction`). You can either use `PROCESS_ONCE` mode or use more advanced data sink: -
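The effect resembles ordinary buffered output: data sits in a buffer until the buffer fills, is flushed, or is closed. This plain-Java analogy (not Flink's OutputFormatSinkFunction itself) writes a few CSV rows that do not reach the underlying writer while the "stream" is still open and appear only on close, which a PROCESS_CONTINUOUSLY source never triggers because its stream does not end.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.StringWriter;
import java.io.UncheckedIOException;

public class BufferedSinkAnalogy {

    // Writes rows through a buffer and returns what the underlying "file"
    // contains before and after close().
    static String[] writeRows(String... rows) {
        StringWriter file = new StringWriter();
        try {
            BufferedWriter out = new BufferedWriter(file); // default buffer of 8192 chars
            for (String row : rows) {
                out.write(row);
                out.newLine();
            }
            String beforeClose = file.toString(); // a few short rows are still buffered
            out.close();                          // flushes, like the end of a bounded stream
            return new String[] {beforeClose, file.toString()};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String[] snapshots = writeRows("1,a", "2,b");
        System.out.println("before close: '" + snapshots[0] + "'"); // before close: ''
        System.out.print("after close:" + System.lineSeparator() + snapshots[1]);
    }
}
```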