Re: ***UNCHECKED*** Re: Checkpointing not working

2018-09-19 Thread Vijay Bhaskar
Can you please check the following document and verify whether you have
enough network bandwidth to support a 30-second checkpoint interval's worth
of streaming data?
https://data-artisans.com/blog/how-to-size-your-apache-flink-cluster-general-guidelines
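
For reference, a minimal sketch (assuming the standard DataStream API and the
RocksDB-on-S3 setup described below; the bucket path is illustrative) of where
the interval and timeout under discussion are set:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // trigger a checkpoint every 30 seconds, as in the setup below
        env.enableCheckpointing(30_000L);
        // a checkpoint that does not finish within this bound is expired
        env.getCheckpointConfig().setCheckpointTimeout(10_000L);
        // incremental RocksDB checkpoints, written to S3
        env.setStateBackend(new RocksDBStateBackend("s3://my-bucket/checkpoints", true));
        // ... build the pipeline, then env.execute(...)
    }
}

Note that with a 10-second timeout, any checkpoint that takes longer than 10
seconds is expired, which matches the error reported below.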

Regards
Bhaskar

On Wed, Sep 19, 2018 at 12:21 PM yuvraj singh <19yuvrajsing...@gmail.com>
wrote:

> log :: Checkpoint 58 of job 0efaa0e6db5c38bec81dfefb159402c0 expired
> before completing.
> I have a use case where I need to do checkpointing frequently.
>
> I am using Kafka to read the stream and building a window of 1 hour, which
> always holds 50 GB of data, and it can be more than that.
>
> I have seen there is no backpressure.
>
> Thanks
> Yubraj Singh
>
>
>
> On Wed, Sep 19, 2018 at 12:07 PM Jörn Franke  wrote:
>
>> What do the logfiles say?
>>
>> What does the source code look like?
>>
>> Is it really needed to do checkpointing every 30 seconds?
>>
>> On 19. Sep 2018, at 08:25, yuvraj singh <19yuvrajsing...@gmail.com>
>> wrote:
>>
>> Hi ,
>>
>> I am doing checkpointing using S3 and RocksDB;
>> I am checkpointing every 30 seconds, and the timeout is 10 seconds.
>>
>> Most of the time it fails, saying: Failure Time: 11:53:17  Cause:
>> Checkpoint expired before completing.
>> I increased the timeout as well, but it is still not working for me.
>>
>> Please suggest.
>>
>> Thanks
>> Yubraj Singh
>>
>>


S3 connector Hadoop class mismatch

2018-09-19 Thread Paul Lam
Hi, 

I’m using the StreamingFileSink of 1.6.0 to write logs to S3 and have encountered a 
classloader problem. It seems that there are conflicts between 
flink-shaded-hadoop2-uber-1.6.0.jar and flink-s3-fs-hadoop-1.6.0.jar, maybe 
related to class loading order. 

Has anyone met this problem? Thanks a lot!

The stack traces are as below:

java.io.IOException: java.lang.RuntimeException: class org.apache.hadoop.security.LdapGroupsMapping not org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.security.GroupMappingServiceProvider
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.flink.runtime.fs.hdfs.AbstractFileSystemFactory.create(AbstractFileSystemFactory.java:62)
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:395)
    at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.<init>(Buckets.java:111)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:242)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:254)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: class org.apache.hadoop.security.LdapGroupsMapping not org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.security.GroupMappingServiceProvider
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2246)
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.security.Groups.<init>(Groups.java:108)
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.security.Groups.<init>(Groups.java:102)
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:450)
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:309)
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:276)
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:832)
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:802)
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:675)
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:177)
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.flink.runtime.fs.hdfs.AbstractFileSystemFactory.create(AbstractFileSystemFactory.java:57)
    ... 13 more
Caused by: java.lang.RuntimeException: class org.apache.hadoop.security.LdapGroupsMapping not org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.security.GroupMappingServiceProvider
    at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2240)
    ... 23 more


Best,
Paul Lam





***UNCHECKED*** Error while confirming Checkpoint

2018-09-19 Thread PedroMrChaves
Hello,

I have a running Flink job that reads data from one Kafka topic, applies
some transformations, and writes data back into another Kafka topic. The job
sometimes restarts due to the following error:

java.lang.RuntimeException: Error while confirming checkpoint
    at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1260)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: checkpoint completed, but no transaction pending
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:258)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyOfCompletedCheckpoint(AbstractUdfStreamOperator.java:130)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:650)
    at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1255)
    ... 5 more
2018-09-18 22:00:10,716 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Could not restart the job Alert_Correlation (3c60b8670c81a629716bb2e42334edea) because the restart strategy prevented it.
java.lang.RuntimeException: Error while confirming checkpoint
    at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1260)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: checkpoint completed, but no transaction pending
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:258)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyOfCompletedCheckpoint(AbstractUdfStreamOperator.java:130)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:650)
    at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1255)
    ... 5 more

My state is very small for this particular job, just a few KBs.


 


Flink Version: 1.4.2
State Backend: hadoop 2.8

Regards,
Pedro Chaves



-
Best Regards,
Pedro Chaves
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: questions about YARN deployment and HDFS integration

2018-09-19 Thread Chang Liu
Thanks for your answers :)

Best regards/祝好,

Chang Liu 刘畅


> On 17 Sep 2018, at 17:25, Kostas Kloudas  wrote:
> 
> Hi Chang,
> 
> Some of the answers you can find inline:
> 
>> On Sep 17, 2018, at 3:47 PM, Chang Liu  wrote:
>> 
>> Dear All,
>> 
>> I am helping my team set up a Flink cluster, and we would like to have high 
>> availability and easy scaling.
>> 
>> We would like to set up a minimal cluster environment that can be easily 
>> scaled in the future. This is my simple proposal: 
>> 2 nodes
>> each node runs a Flink instance, YARN, and HDFS
>> Flink, YARN, and HDFS all run in cluster mode.
>> 
>> 
>> 
>> Based on this, my questions are:
>> By using HDFS as the file system, we can achieve fault tolerance (by 
>> recovering the checkpointed state when a job fails). Question: so Flink itself 
>> is not capable of keeping and maintaining distributed state persistence using 
>> just the local Linux file system, right?
>> Then, my follow-up is: if you have a Flink cluster (multiple nodes) and you 
>> use the local Linux file system to keep the state checkpoints, what will happen 
>> if the Flink job fails and Flink starts to restart the job and recover the state 
>> from checkpoints?
> 
> For both the above:
> When a task fails, the whole job (all the tasks) is restarted, and the tasks 
> are rescheduled on different machines.
> If you use a local FS and you try to fetch state remotely upon recovery, how 
> would the new nodes be able to locate
> the state on a remote machine?
> 
> This is why Flink uses a distributed file system.
> 
>> If Flink is deployed and managed on YARN, does that mean that if YARN is 
>> down, Flink is down?
> 
> Well, it depends on which component fails. And I am not sure about all of 
> them, but you could try it and see.
> 
>> If we have Flink managed by YARN, what is the purpose of using ZooKeeper? I 
>> did not really understand this part: 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/jobmanager_high_availability.html
>> My question is: why can YARN not provide this JobManager HA, so that we have to 
>> add ZooKeeper?
> 
> YARN can make sure that a new job master starts, but that master will have to 
> fetch the state of the previous job master in order to know which jobs are 
> running, their
> progress, etc.
> 
>> How do you think I can keep the different components of the architecture on 
>> different nodes (servers)? Do I keep an instance of Flink/YARN/HDFS on 
>> every single server, or do I put each of them on completely different servers? 
>> Some of my considerations:
>> if we put them on different servers, there will be a lot of latency over the 
>> network between Flink <-> HDFS and YARN <-> HDFS
>> but if I keep all 3 components Flink/YARN/HDFS on every server, they 
>> can also fight each other for resources, right?
> 
> You are right that you have to consider the above before deciding on your 
> setup.
> 
>> Correct me if I am wrong: one thing for sure is that, for every node where 
>> there is a Flink instance running, there should be YARN running too, right?
>> 
>> 
>> Many thanks in advance!
>> 
>> Best regards/祝好,
>> 
>> Chang Liu 刘畅
> 
> I hope this helps,
> Kostas
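
As a minimal sketch of the point above (checkpoints written to a distributed
file system so that tasks rescheduled on other machines can fetch the state on
recovery; the HDFS path is illustrative):

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DistributedCheckpointSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // snapshots go to HDFS, which every TaskManager can reach, so a task
        // restarted on a different node can restore its state from there
        env.setStateBackend(new FsStateBackend("hdfs://namenode:8020/flink/checkpoints"));
        env.enableCheckpointing(60_000L);
    }
}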



Question about Window Trigger

2018-09-19 Thread Chang Liu
Dear All,

I have a question about the Window Trigger: let’s say I would like to use the 
SlidingEventTimeWindow (60-second window size + 1-second window shift) to 
count the number of records per window. And I am using event time with periodic 
watermarking with a certain maxOutOfOrderness time.

Sometimes, what happens is: during a certain time, there is no incoming event, 
and then the watermark for triggering the window fire is not coming.  Then the 
last several records will just stay in the window.  It will fire only when 
the window sees the watermark to trigger.

What I would like to achieve is: if there is just no watermark coming within a 
certain time (maybe this time is system clock time?), I can still trigger the 
window to fire, no matter whether there is a new event coming or not. Then I can 
still get the window count for this window, without waiting for the next event, 
which could be coming after a long time.

Do you have any idea how can I do this? Many Thanks :)

Best regards/祝好,

Chang Liu 刘畅




HA failing for 1.6.0 job cluster with docker-compose

2018-09-19 Thread Tzanko Matev
Dear all,

I am currently experimenting with a Flink 1.6.0 job cluster. The goal is to
run a streaming job on K8s. Right now I am using docker-compose to
experiment with the job cluster.

I am trying to set up HA with ZooKeeper, but I seem to be failing. I have a
docker-compose file which contains the following services:
- Zookeeper
- Flink job manager
- Flink task manager

The containers are set up as per the documentation for docker-compose, and
I have also set up the necessary HA settings in the conf file. However,
when I kill the job manager container and start it again, the job being
processed does not recover but always starts from scratch. Instead I get
the following error:

> ERROR org.apache.flink.runtime.rest.handler.job.JobDetailsHandler  -
Could not retrieve the redirect address.
>
> java.util.concurrent.CompletionException:
org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing
token not set: Ignoring message
LocalFencedMessage(8c4887f5c13f6d907d82a55d97ac428f,
LocalRpcInvocation(requestRestAddress(Time))) sent to
akka.tcp://flink@blockprocessor-job-cluster:5/user/dispatcher because
the fencing token is null.

Am I missing something? Is HA implemented for job clusters at all?

Best wishes,
Tzanko Matev


Re: Migration to Flink 1.6.0, issues with StreamExecutionEnvironment.registerCachedFile

2018-09-19 Thread Dawid Wysakowicz
Hi Subramanya,

I could reproduce this behavior running a job on a YARN cluster; it
works just fine on a standalone cluster. We've changed a little bit how
the cache entries are distributed in 1.6.0. I am investigating this
problem right now. Would you like to create a JIRA bug for it?

Best,
Dawid
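
For readers following along, a minimal sketch of the distributed-cache pattern
under discussion (the class name, the loadCsv helper, and the lookup shape are
illustrative assumptions, not the original job's code):

import java.io.File;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

public class CsvLookupFunction extends ScalarFunction {
    private transient Map<String, String> lookup;

    @Override
    public void open(FunctionContext context) throws Exception {
        // the name must match the one used in registerCachedFile(...)
        File csv = context.getCachedFile("myCSV.csv");
        lookup = loadCsv(csv);
    }

    public String eval(String key) {
        return lookup.get(key);
    }

    private static Map<String, String> loadCsv(File f) {
        // CSV parsing omitted for brevity
        return new HashMap<>();
    }
}

// at job-building time:
//   env.registerCachedFile("hdfs:///myPath/myCsv", "myCSV.csv");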

On 19/09/18 06:41, Subramanya Suresh wrote:
> Yes it works locally for me as well. We are running on YARN where it
> fails on 1.6.0 though (works fine with 1.4.2). 
>
> Regards, 
>
> On Tue, Sep 18, 2018 at 1:47 AM, Fabian Hueske  wrote:
>
> Hi,
>
> The functionality of the SQL ScalarFunction is backed by Flink's
> distributed cache and just passes on the function call.
> I tried it locally on my machine and it works for me.
>
> What is your setup? Are you running on Yarn?
>
> Maybe Chesnay or Dawid (added to CC) can help to track the problem
> down.
>
> Best, Fabian
>
>     2018-09-18 6:10 GMT+02:00 Subramanya Suresh :
>
> Hi, 
> We are running into some trouble with
> StreamExecutionEnvironment.registerCachedFile (works perfectly
> fine in 1.4.2). 
>
>   * We register some CSV files in HDFS with
>     executionEnvironment.registerCachedFile("hdfs:///myPath/myCsv", "myCSV.csv")
>   * In a UDF (ScalarFunction), in the open function, we do a
>     FunctionContext.getCachedFile("myCSV") to load the CSV into a
>     singleton.
>
> We are running into a 
>
> java.lang.IllegalArgumentException: File with name 'myCSV.csv' is not 
> available. Did you forget to register the file?
>
> Sincerely, 





Re: ***UNCHECKED*** Re: Standalone cluster instability

2018-09-19 Thread Piotr Nowojski
Hi,

The JobManager is not responsible for, and has no means of, restarting a TaskManager in 
case of the TaskManager process being killed (it would need to ssh into the machine 
and restart it…). I don’t know, but from your description of the problem I 
presume that Flink’s bash startup scripts do not contain a watchdog that 
restarts the process in case of failure. In that case, just google "bash 
watchdog" for how to do it, for example: https://stackoverflow.com/a/697064/8149051 


Probably a better way would be to use YARN or another resource manager. Flink’s 
JobManager would then redeploy/reschedule a new TaskManager after a failure.

Piotrek

> On 19 Sep 2018, at 09:35, Shailesh Jain  wrote:
> 
> Hi Piotrek,
> 
> We've hit the same issue again; the kernel is repeatedly killing the task manager 
> process (we've hit it 3 times in the past week).
> We suspect we're hitting this bug in the kernel: 
> https://bugs.launchpad.net/ubuntu/+source/linux/+bug/1655842 
> 
> 
> One question I have is: why is the job manager not able to restart the 
> task manager process when it discovers that it has been lost? It reports that 
> there are no active task managers and that available slots are 0. We're running 
> Flink version 1.4.2.
> 
> I've attached the syslog and the jobmanager log; the crash happened at Sep 18 
> 23:31:14.
> 
> Thanks,
> Shailesh
> 
> On Thu, Aug 16, 2018 at 5:40 PM Piotr Nowojski  wrote:
> Hi,
> 
> I’m not aware of such rules of thumb. Memory consumption is highly 
> application- and workload-specific. It depends on how much you allocate 
> in your user code and how much memory you keep in state (in the case of the heap 
> state backend). Basically, just as with most Java applications, you have to 
> use the trial-and-error method.
> 
> One good practice is, before any deployment, to test your Flink application on 
> a testing cluster that is identical to the production cluster, by (re)processing 
> some of the production workload/backlog/data (in parallel to the production 
> cluster).
> 
> Piotrek 
> 
>> On 16 Aug 2018, at 13:23, Shailesh Jain  wrote:
>> Thank you for your help Piotrek.
>> 
>> I think it was a combination of (a) other processes taking up the available 
>> memory and (b) the Flink processes consuming all the memory allocated to them 
>> that resulted in the kernel running out of memory.
>> 
>> Are there any heuristics or best practices which you (or anyone in the 
>> community) recommend for benchmarking the memory requirements of a particular 
>> Flink job?
>> 
>> Thanks,
>> Shailesh
>> 
>> 
>> On Tue, Aug 14, 2018 at 6:08 PM, Piotr Nowojski  wrote:
>> 
>> Good that we are more or less on track with this problem :) But the problem 
>> here is not that the heap size is too small, but that your kernel is running out 
>> of memory and starts killing processes. Either:
>> 
>> 1. some other process is using the available memory, or
>> 2. you need to increase the memory allocation on your machine/virtual 
>> machine/container/cgroup, or
>> 3. you need to decrease the heap size of Flink’s JVM or its non-heap size 
>> (decrease the network memory buffer pool). Of course, for any given job/state 
>> size/configuration/cluster size there is some minimal reasonable memory size 
>> that you have to assign to Flink; otherwise you will have poor performance 
>> and/or constant garbage collections and/or you will start getting OOM errors 
>> from the JVM (don’t confuse those with OS/kernel OOM errors; those two are on 
>> different levels).
>> 
>> Piotrek
>> 
>> 
>>> On 14 Aug 2018, at 07:36, Shailesh Jain  wrote:
>>> Hi Piotrek,
>>> 
>>> Thanks for your reply. I checked through the syslogs for that time, and I 
>>> see this:
>>> 
>>> Aug  8 13:20:52 smoketest kernel: [1786160.856662] Out of memory: Kill 
>>> process 2305 (java) score 468 or sacrifice child
>>> Aug  8 13:20:52 smoketest kernel: [1786160.859091] Killed process 2305 
>>> (java) total-vm:6120624kB, anon-rss:3661216kB, file-rss:16676kB
>>> 
>>> As you pointed out, the kernel killed the task manager process.
>>> 
>>> If I had already set the max heap size for the JVM (to 3 GB in this case), 
>>> and the memory usage stats showed 2329 MB being used 90 seconds earlier, it 
>>> seems a bit unlikely for the operators to consume 700 MB of heap space in 
>>> that short a time, because our event ingestion rate is not that high (close 
>>> to 10 events per minute).
>>> 
>>> 2018-08-08 13:19:23,341 INFO  
>>> org.apache.flink.runtime.taskmanager.TaskManager  - Memory 
>>> usage stats: [HEAP: 2329/3072/3072 MB, NON HEAP: 154/197/-1 MB 
>>> (used/committed/max)]
>>> 
>>> Is it possible to log an individual operator's memory consumption? This would 
>>> help in narrowing down the root cause. There were around 50 operators 
>>> running (~8 kafka source/sink, ~8 Window operators, 

How to use checkpoint in flink1.5.3

2018-09-19 Thread spoon_lz
I used Flink 1.3.2 before and recently upgraded to Flink 1.5.3. I found
that the new version has adjusted checkpointing, and there was a problem after
I modified my code.
My code looks like:

    RocksDBStateBackend rocksDBStateBackend = new
    RocksDBStateBackend("hdfs://hdfs_1/demo/demo-fs-checkpoints/cpk/1_3", true);

The job runs normally when the jar is submitted to the cluster. I then stop it
and try to resume it like this:

/home/flink-1.5.3/bin/flink run -d -m yarn-cluster --yarnname test01 -ytm
4096 -yjm 1024 -s
hdfs://hdfs_1/demo/demo-fs-checkpoints/cpk/1_3/ff31023d23e568290d5596a34aa3cad6/chk-3
/home/flink/jar/test01.jar

The job can be submitted, but then fails with this error:

java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:197)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:730)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:295)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:715)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:230)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:160)
    ... 5 more
2018-09-19 19:44:26,518 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Job test01 (f1960bf42cdb35b7f3ee958e06d9e3cf) switched from state RUNNING to FAILING.
java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:197)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:730)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:295)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:715)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.internalTimeServiceManager(StreamTaskStateInitializerImpl.java:230)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:160)

I have not found the cause of the error in the source code, and the official
documentation does not have many instructions for using checkpoints.
Can anyone help me?
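
One thing worth checking (an assumption on my part, not a confirmed diagnosis of
the NullPointerException above): resuming from a retained checkpoint directory
after `flink cancel` requires externalized checkpoints to be enabled, roughly:

import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RetainedCheckpointSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000L);
        // keep the checkpoint data around on cancellation, so that
        // `flink run -s <checkpoint-dir>` can resume from it later
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    }
}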






--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: How to use checkpoint in flink1.5.3

2018-09-19 Thread spoon_lz
The stop command is:

/home/flink-1.5.3/bin/flink cancel ff31023d23e568290d5596a34aa3cad6 -yid
application_1526888270443_0336



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Flink 1.5.2 process keeps reference to deleted blob files.

2018-09-19 Thread Piotr Szczepanek
Hello,

we are using YarnClusterClient for job submission. After a successful/failed
job execution, it looks like the blob file for that job is deleted, but there is
still a handle from the Flink process to that file. As a result, the file is
not removed from the machine, and we faced a "no space left on device" error.
Restarting the Flink cluster brought the situation back to normal, but we are
submitting quite a huge number of jobs, and frequent cluster restarts are not a
solution.

Results of lsof are:
During job execution:
lsof /flinkDir | grep job_dbafb671b0d60ed8a8ec2651fe59303b
java   11883  yarn  mem    REG  253,2  112384928  109973177
/flinkDir/yarn/../application_1536668870638_/blobStore-a1bcdbd4-5388-4c56-8052-6051f5af38dd/job_dbafb671b0d60ed8a8ec2651fe59303b/blob_p-8771d9ccac35e28d8571ac8957feaaecdebaeadd-7748aec7fe7369ca26181d0f94b1a578
java   11883  yarn  1837r  REG  253,2  112384928  109973177
/flinkDir/yarn/../application_1536668870638_/blobStore-a1bcdbd4-5388-4c56-8052-6051f5af38dd/job_dbafb671b0d60ed8a8ec2651fe59303b/blob_p-8771d9ccac35e28d8571ac8957feaaecdebaeadd-7748aec7fe7369ca26181d0f94b1a578

After job execution:
lsof /flinkDir | grep job_dbafb671b0d60ed8a8ec2651fe59303b
java   11883  yarn  DEL    REG  253,2             109973177
/flinkDir/yarn/../application_1536668870638_/blobStore-a1bcdbd4-5388-4c56-8052-6051f5af38dd/job_dbafb671b0d60ed8a8ec2651fe59303b/blob_p-8771d9ccac35e28d8571ac8957feaaecdebaeadd-7748aec7fe7369ca26181d0f94b1a578
java   11883  yarn  1837r  REG  253,2  112384928  109973177
/flinkDir/yarn/../application_1536668870638_/blobStore-a1bcdbd4-5388-4c56-8052-6051f5af38dd/job_dbafb671b0d60ed8a8ec2651fe59303b/blob_p-8771d9ccac35e28d8571ac8957feaaecdebaeadd-7748aec7fe7369ca26181d0f94b1a578
(deleted)

So the blob file is marked as deleted, but it's still present, as there is
still a handle to it from the Flink container process.
Can you please advise how we can avoid this situation, or whether it is caused
by some bug in Flink?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Accessing Global State when processing KeyedStreams

2018-09-19 Thread Scott Sue
Hi,

In our application, we receive Orders and Prices via two KafkaSources.  What
I want to do is to perform calculations for a given Order against a stream
of Prices for the same securityId, i.e. same identifier between the Order
and stream of Prices.  Naturally this is a perfect fit for a KeyedStream
against the securityId for both Orders and Prices.

I have currently connected these two streams together and process them with
ordersKeyStream.connect(pricesKeyStream).process(MyCoProcessFunction), and
this fits for 95% of the time.  However, part of my requirement is that for
certain Orders, I need to be able to connect prices from a different
securityId (aka a different key) to perform more calculations.  From what I
can see, by the time I get to my CoProcessFunction, I am only able to see
the Orders and Prices for a single securityId; I won't be able to cross over
to another KeyedStream of Prices to perform this extra calculation. 
Being able to cross over to another KeyedStream of Prices is not a hard
requirement for this extra calculation, but it would be ideal.  

Things that I have thought about to get around this, as it would be
acceptable to have a slightly older price for the securityId I require:
1) I could connect to an external source of information to get this Price,
or
2) Periodically broadcast out a price that the ProcessFunction could consume
to perform this extra calculation.

This seems like something Flink should easily be able to handle; I just feel
as though I'm missing something here.

As a more non-functional requirement: the number of prices I receive can
reach tens of thousands per second, so that is also something that I am very
wary of.

Is there anything that could be suggested to help me out on this?


Thanks in advance!
Scott



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Question about Window Trigger

2018-09-19 Thread Rong Rong
Hi Chang,

There have been some previous discussions regarding how to debug watermarks and
window triggers [1].
Basically, if there's no data for some partitions, there's no way to advance the
watermark, as Flink would not be able to determine whether this is due to a
network failure or there actually being no data arriving at the source.
I think your use case is better off using a SlidingProcessingTimeWindow.

Thanks,
Rong

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/watermark-does-not-progress-td22315.html
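
For completeness, if event-time windows with a wall-clock fallback are really
needed, a custom trigger along these lines should also be possible (a minimal,
untested sketch; the timeout knob is an assumption, not an existing Flink
trigger):

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class EventTimeTriggerWithTimeout extends Trigger<Object, TimeWindow> {

    private final long timeoutMillis; // wall-clock fallback interval

    public EventTimeTriggerWithTimeout(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
        // normal event-time behavior: fire when the watermark passes the window end
        ctx.registerEventTimeTimer(window.maxTimestamp());
        // fallback: also fire after timeoutMillis of wall-clock time, watermark or not
        ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime() + timeoutMillis);
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        // the wall-clock timeout elapsed without the watermark arriving
        return TriggerResult.FIRE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }
}

// usage: .window(SlidingEventTimeWindows.of(Time.seconds(60), Time.seconds(1)))
//        .trigger(new EventTimeTriggerWithTimeout(30_000L))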

On Wed, Sep 19, 2018 at 1:48 AM Chang Liu  wrote:

> Dear All,
>
> I have a question about the Window Trigger: let’s say I would like to
> use the SlidingEventTimeWindow (60-second window size + 1-second window
> shift) to count the number of records per window. And I am using event time
> with periodic watermarking with a certain maxOutOfOrderness time.
>
> Sometimes, what happens is: during a certain time, there is no incoming
> event, and then the watermark for triggering the window fire is not
> coming.  Then the last several records will just stay in the window.
> It will fire only when the window sees the watermark to trigger.
>
> What I would like to achieve is: if there is just no watermark coming
> within a certain time (maybe this time is system clock time?), I can still
> trigger the window to fire, no matter whether there is a new event coming or
> not. Then I can still get the window count for this window, without waiting
> for the next event, which could be coming after a long time.
>
> Do you have any idea how can I do this? Many Thanks :)
>
> Best regards/祝好,
>
> Chang Liu 刘畅
>
>
>


Re: Accessing Global State when processing KeyedStreams

2018-09-19 Thread Rong Rong
Hi Scott,

Your use case seems to be a perfect fit for the Broadcast state pattern
[1].

--
Rong

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/broadcast_state.html
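
A minimal sketch of what that could look like here (Order, Price, Result, the
field accessors, and the stream variables are assumptions about your job, not
actual types from it):

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

// broadcast state: securityId -> latest Price, replicated to every parallel instance
final MapStateDescriptor<String, Price> priceDesc = new MapStateDescriptor<>(
        "latest-prices", BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(Price.class));

BroadcastStream<Price> broadcastPrices = pricesStream.broadcast(priceDesc);

ordersKeyStream
    .connect(broadcastPrices)
    .process(new KeyedBroadcastProcessFunction<String, Order, Price, Result>() {
        @Override
        public void processElement(Order order, ReadOnlyContext ctx, Collector<Result> out) throws Exception {
            // read-only lookup of a price for a *different* securityId than the current key
            Price other = ctx.getBroadcastState(priceDesc).get(order.getRelatedSecurityId());
            // ... perform the extra calculation and emit a Result
        }

        @Override
        public void processBroadcastElement(Price price, Context ctx, Collector<Result> out) throws Exception {
            // each parallel instance records the latest price per securityId
            ctx.getBroadcastState(priceDesc).put(price.getSecurityId(), price);
        }
    });

One caveat given the volumes mentioned below: every broadcast element is
replicated to every parallel instance, so with tens of thousands of prices per
second this is worth benchmarking.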


On Wed, Sep 19, 2018 at 7:11 AM Scott Sue  wrote:

> Hi,
>
> In our application, we receive Orders and Prices via two KafkaSources.
> What
> I want to do is to perform calculations for a given Order against a stream
> of Prices for the same securityId, i.e. same identifier between the Order
> and stream of Prices.  Naturally this is a perfect fit for a KeyedStream
> against the securityId for both Orders and Prices.
>
> I have currently connected these two streams together and process them with
> ordersKeyStream.connect(pricesKeyStream).process(MyCoProcessFunction), and
> this fits for 95% of the time.  However, part of my requirement is that for
> certain Orders, I need to be able to connect prices from a different
> securityId (aka a different key) to perform more calculations.  From what I
> can see, by the time I get to my CoProcessFunction, I am only able to see
> the Orders and Prices for a single securityId; I won't be able to cross over
> to another KeyedStream of Prices to perform this extra calculation.
> Being able to cross over to another KeyedStream of Prices is not a hard
> requirement for this extra calculation, but it would be ideal.
>
> Things that I have thought about to get around this, as it would be
> acceptable to have a slightly older price for the securityId I require:
> 1) I could connect to an external source of information to get this Price,
> or
> 2) Periodically broadcast out a price that the ProcessFunction could consume
> to perform this extra calculation.
>
> This seems like something Flink should easily be able to handle; I just feel
> as though I'm missing something here.
>
> As a more non-functional requirement: the number of prices I receive can
> reach tens of thousands per second, so that is also something that I am very
> wary of.
>
> Is there anything that could be suggested to help me out on this?
>
>
> Thanks in advance!
> Scott
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Trying to figure out why a slot takes a long time to checkpoint

2018-09-19 Thread Julio Biason
Hey guys,

So, switching to Ceph/S3 didn't shine any new light on the issue. Although
the times are a bit higher, just a few slots are taking an order of magnitude
longer to save. So I changed the logs to DEBUG.

The problem is: I'm not seeing anything that seems relevant; only pings
from ZooKeeper, heartbeats, and the S3 connection disconnecting from being idle.

Is there anything else that I should change to DEBUG? Akka? Kafka? Hadoop?
ZooKeeper? (Those are, by the default config, bumped to INFO.)

All of those?

On Tue, Sep 18, 2018 at 12:34 PM, Julio Biason 
wrote:

> Hey TIll (and others),
>
> We don't have debug logs yet, but we decided to remove a related
> component: HDFS.
>
> We are moving the storage to our Ceph install (using S3), which has been running
> for longer than our HDFS install, and we know, for sure, it runs without any
> problems (especially because we have more people who understand Ceph than
> people who know HDFS at this point).
>
> If, for some reason, the problem persists, we know it's not the underlying
> storage and may be something with our pipeline itself. I'll enable debug
> logs, then.
>
> On Tue, Sep 18, 2018 at 4:20 AM, Till Rohrmann 
> wrote:
>
>> This behavior seems very odd Julio. Could you indeed share the debug logs
>> of all Flink processes in order to see why things are taking so long?
>>
>> The checkpoint size of task #8 is twice as big as the second biggest
>> checkpoint. But this should not cause an increase in checkpoint time of a
>> factor of 8.
>>
>> Cheers,
>> Till
>>
>> On Mon, Sep 17, 2018 at 5:25 AM Renjie Liu 
>> wrote:
>>
>>> Hi, Julio:
>>> Does this happen frequently? What state backend do you use? The async
>>> checkpoint duration and sync checkpoint duration seem normal compared to the
>>> others; it seems that most of the time is spent acking the checkpoint.
>>>
>>> On Sun, Sep 16, 2018 at 9:24 AM vino yang  wrote:
>>>
 Hi Julio,

 Yes, it seems that fifty-five minutes is really long.
 However, it is linear with the time and size of the adjacent task
 in the diagram.
 I think your real concern is why Flink accesses
 HDFS so slowly.
 You can enable DEBUG logging to see if you can find any clues, or post
 the log to the mailing list so that others can help analyze the problem for you.

 Thanks, vino.

 Julio Biason  wrote on Sat, 15 Sep 2018 at 07:03:

> (Just an addendum: Although it's not a huge problem -- we can always
> increase the checkpoint timeout time -- this anomalous situation makes me
> think there is something wrong in our pipeline or in our cluster, and that
> is what is making the checkpoint creation go crazy.)
>
> On Fri, Sep 14, 2018 at 8:00 PM, Julio Biason 
> wrote:
>
>> Hey guys,
>>
>> In our pipeline, we have a single slot that is taking longer to
>> create the checkpoint compared to the other slots, and we are wondering what
>> could be causing it.
>>
>> The operator in question is the window metric -- the only element in
>> the pipeline that actually uses state. While the other slots take 7
>> mins to create the checkpoint, this one -- and only this one -- takes
>> 55 mins.
>>
>> Is there something I should look at to understand what's going on?
>>
>> (We are storing all checkpoints in HDFS, in case that helps.)
>>
>> --
>> Julio Biason, Software Engineer
>> AZION  |  Deliver. Accelerate. Protect.
>> Office: +55 51 3083 8101  |  Mobile: +55 51 99907 0554
>>
>
>
>
> --
> Julio Biason, Software Engineer
> AZION  |  Deliver. Accelerate. Protect.
> Office: +55 51 3083 8101  |  Mobile: +55 51 99907 0554
>
 --
>>> Liu, Renjie
>>> Software Engineer, MVAD
>>>
>>
>
>
> --
> Julio Biason, Software Engineer
> AZION  |  Deliver. Accelerate. Protect.
> Office: +55 51 3083 8101  |  Mobile: +55 51 99907 0554
>



-- 
Julio Biason, Software Engineer
AZION  |  Deliver. Accelerate. Protect.
Office: +55 51 3083 8101  |  Mobile: +55 51 99907 0554


Errors in QueryableState sample code?

2018-09-19 Thread Ken Krugler
Hi all,

I was looking at the Example section of the Querying State documentation:

QueryableStateClient client = new QueryableStateClient(tmHostname, proxyPort);

// the state descriptor of the state to be fetched.
ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
        new ValueStateDescriptor<>(
          "average",
          TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));

CompletableFuture<ValueState<Tuple2<Long, Long>>> resultFuture =
        client.getKvState(jobId, "query-name", key,
            BasicTypeInfo.LONG_TYPE_INFO, descriptor);

// now handle the returned value
resultFuture.thenAccept(response -> {
    try {
        Tuple2<Long, Long> res = response.get();
    } catch (Exception e) {
        e.printStackTrace();
    }
});
The two issues I ran into while trying out this code snippet were:

1. The query request would fail with an NPE, because the client had a null 
ExecutionConfig.

If I added:

client.setExecutionConfig(new ExecutionConfig());

Then everything seemed OK, but this doesn’t feel like the right way to solve 
that problem :)

2. The call to response.get() returns a ValueState<Tuple2<Long, Long>>, not the 
Tuple2 itself.

So it seems like there’s a missing “.value()”.

Regards,

— Ken

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



Re: Utilising EMR's master node

2018-09-19 Thread Averell
Hi Gary,
Thanks for your help.

Regarding TM configuration, in terms of performance, when my 2 servers have
16 vcores each, should I have 2 TMs with 16 GB of memory and 16 task slots each,
or 8 TMs with 4 GB of memory and 4 task slots each?

Thanks and regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Accessing Global State when processing KeyedStreams

2018-09-19 Thread Scott Sue
Hi Rong,

Thanks for your suggestion, I'll give that a go.  I just found a great
article on this as well that explains the functionality

https://data-artisans.com/blog/a-practical-guide-to-broadcast-state-in-apache-flink


Regards,
Scott



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Which mode to choose flink on Yarn.

2018-09-19 Thread weilongxing
There are two methods to deploy Flink applications on YARN. The first one is 
to use a YARN session, with all Flink applications deployed in that session. The 
second method is to deploy each Flink application on YARN as its own YARN application.

My question is: what's the difference between these two methods? Which one should I 
choose in a production environment?

I can't find any material about this.

I think the first method will save resources, since it needs only one 
JobManager (the YARN application master). But that is also its disadvantage, 
since the single JobManager can become a bottleneck as Flink applications 
get more and more numerous.

Re: WriteTimeoutException in Cassandra sink kill the job

2018-09-19 Thread Jayant Ameta
https://issues.apache.org/jira/browse/FLINK-10310?focusedCommentId=16618447&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16618447


On Tue, Sep 11, 2018 at 10:15 AM Jayant Ameta  wrote:

> Hi Till,
> I've opened a JIRA issue:
> https://issues.apache.org/jira/browse/FLINK-10310.
> Can we discuss it?
>
> Jayant Ameta
>
>
> On Thu, Aug 30, 2018 at 4:35 PM Till Rohrmann 
> wrote:
>
>> Hi Jayant,
>>
>> afaik it is currently not possible to control how failures are handled in
>> the Cassandra Sink. What would be the desired behaviour? The best thing is
>> to open a JIRA issue to discuss potential improvements.
>>
>> Cheers,
>> Till
>>
>> On Thu, Aug 30, 2018 at 12:15 PM Jayant Ameta 
>> wrote:
>>
>>> Hi,
>>> During high volumes, the Cassandra sink fails with the following error:
>>> com.datastax.driver.core.exceptions.WriteTimeoutException: Cassandra
>>> timeout during write query at consistency SERIAL (2 replica were required
>>> but only 1 acknowledged the write)
>>>
>>> Is there a way to configure the sink to ignore/handle this error?
>>>
>>> Jayant
>>>
>>


Re: Which mode to choose flink on Yarn.

2018-09-19 Thread vino yang
Hi weilong,

As you said, there are advantages and disadvantages to each of the two
approaches.
However, I hope you know that the "single job" mode has a huge advantage
over the "YARN Flink session" mode in that it provides job-level isolation
(whether JM or TM).
This allows jobs to be more fine-grained, and the refactoring of
Flink's FLIP-6-based deployment model tends toward the "single job" mode.
But it will start more JMs (application masters) and take up more resources.
In the end, how to choose also requires you to evaluate and weigh the trade-offs.

Thanks, vino.

weilongxing  wrote on Thu, 20 Sep 2018 at 10:27:

> There are two methods to deploy Flink applications on YARN. The first one
> is to use a YARN session, with all Flink applications deployed in that session.
> The second method is to deploy each Flink application on YARN as its own YARN
> application.
>
> My question is: what's the difference between these two methods? Which one
> should I choose in a production environment?
>
> I can't find any material about this.
>
> I think the first method will save resources, since it needs only one
> JobManager (the YARN application master). But that is also its disadvantage,
> since the single JobManager can become a bottleneck as Flink applications
> get more and more numerous.
>


Re: Which mode to choose flink on Yarn.

2018-09-19 Thread 陈梓立
Hi weilong,

As vino said, the main advantage of per-job mode is that it provides
job-level isolation, and that of session mode is that it sets up a
persistent session that accepts jobs, which means the overhead of resource
request/setup is amortized. In addition, per-job mode calculates the resources
that the job requires, while session mode requires you to provide a static
configuration for that persistent session.

As advice from experience: prefer per-job mode for large jobs, and session
mode for a series of small jobs.

Best,
tison.


vino yang  wrote on Thu, 20 Sep 2018 at 14:17:

> Hi weilong,
>
> As you said, there are advantages and disadvantages to each of the two
> approaches.
> However, I hope you know that the "single job" mode has a huge advantage
> over the "YARN Flink session" mode in that it provides job-level isolation
> (whether JM or TM).
> This allows jobs to be more fine-grained, and the refactoring of
> Flink's FLIP-6-based deployment model tends toward the "single job" mode.
> But it will start more JMs (application masters) and take up more resources.
> In the end, how to choose also requires you to evaluate and weigh the trade-offs.
>
> Thanks, vino.
>
> weilongxing  wrote on Thu, 20 Sep 2018 at 10:27:
>
>> There are two methods to deploy Flink applications on YARN. The first one
>> is to use a YARN session, with all Flink applications deployed in that session.
>> The second method is to deploy each Flink application on YARN as its own YARN
>> application.
>>
>> My question is: what's the difference between these two methods? Which one
>> should I choose in a production environment?
>>
>> I can't find any material about this.
>>
>> I think the first method will save resources, since it needs only one
>> JobManager (the YARN application master). But that is also its disadvantage,
>> since the single JobManager can become a bottleneck as Flink applications
>> get more and more numerous.
>>
>


Re: Which mode to choose flink on Yarn.

2018-09-19 Thread weilongxing
Thanks.

I am wondering whether the job manager will be the bottleneck, and how many 
jobs a job manager can support in session mode. I tried to find the 
bottleneck in a test environment but failed.


> On 20 Sep 2018, at 14:16, vino yang  wrote:
> 
> Hi weilong,
> 
> As you said, there are advantages and disadvantages to each of the two 
> approaches.
> However, I hope you know that the "single job" mode has a huge advantage over 
> the "YARN Flink session" mode in that it provides job-level isolation 
> (whether JM or TM). 
> This allows jobs to be more fine-grained, and the refactoring of 
> Flink's FLIP-6-based deployment model tends toward the "single job" mode. 
> But it will start more JMs (application masters) and take up more resources.
> In the end, how to choose also requires you to evaluate and weigh the trade-offs.
> 
> Thanks, vino.
> 
> weilongxing  wrote on Thu, 20 Sep 2018 at 10:27:
> There are two methods to deploy Flink applications on YARN. The first one is 
> to use a YARN session, with all Flink applications deployed in that session. The 
> second method is to deploy each Flink application on YARN as its own YARN 
> application.
> 
> My question is: what's the difference between these two methods? Which one 
> should I choose in a production environment?
> 
> I can't find any material about this.
> 
> I think the first method will save resources, since it needs only one 
> JobManager (the YARN application master). But that is also its disadvantage, 
> since the single JobManager can become a bottleneck as Flink applications 
> get more and more numerous.
> 



Re: Which mode to choose flink on Yarn.

2018-09-19 Thread weilongxing
> In addition, per-job mode calculates the resources that the job requires, while 
> session mode requires you to provide a static configuration for that persistent session.

I tested using Flink 1.5.2 and found that session mode can also support dynamic 
resources. You don’t have to provide a static configuration.


I am wondering about combining the two modes: I could start a session for each 
project, with all jobs in a project submitted to that project's session.


> On 20 Sep 2018, at 14:23, 陈梓立  wrote:
> 
> Hi weilong,
> 
> As vino said, the main advantage of per-job mode is that it provides 
> job-level isolation, and that of session mode is that it sets up a persistent 
> session that accepts jobs, which means the overhead of resource request/setup 
> is amortized. In addition, per-job mode calculates the resources that the job 
> requires, while session mode requires you to provide a static configuration 
> for that persistent session.
> 
> As advice from experience: prefer per-job mode for large jobs, and session 
> mode for a series of small jobs.
> 
> Best,
> tison.
> 
> 
> vino yang  wrote on Thu, 20 Sep 2018 at 14:17:
> Hi weilong,
> 
> As you said, there are advantages and disadvantages to each of the two 
> approaches.
> However, I hope you know that the "single job" mode has a huge advantage over 
> the "YARN Flink session" mode in that it provides job-level isolation 
> (whether JM or TM). 
> This allows jobs to be more fine-grained, and the refactoring of 
> Flink's FLIP-6-based deployment model tends toward the "single job" mode. 
> But it will start more JMs (application masters) and take up more resources.
> In the end, how to choose also requires you to evaluate and weigh the trade-offs.
> 
> Thanks, vino.
> 
> weilongxing  wrote on Thu, 20 Sep 2018 at 10:27:
> There are two methods to deploy Flink applications on YARN. The first one is 
> to use a YARN session, with all Flink applications deployed in that session. The 
> second method is to deploy each Flink application on YARN as its own YARN 
> application.
> 
> My question is: what's the difference between these two methods? Which one 
> should I choose in a production environment?
> 
> I can't find any material about this.
> 
> I think the first method will save resources, since it needs only one 
> JobManager (the YARN application master). But that is also its disadvantage, 
> since the single JobManager can become a bottleneck as Flink applications 
> get more and more numerous.
> 



Re: Which mode to choose flink on Yarn.

2018-09-19 Thread 陈梓立
That mainly depends on how much parallelism your jobs have.

The main bottleneck of the job manager is usually that it is busy handling RPC
requests and GC. Most of the time, you can set a larger JM memory to address
this by passing `-jm 4096` when starting `yarn-session.sh`.

Best,
tison.


weilongxing  wrote on Thu, 20 Sep 2018 at 14:29:

> Thanks.
>
> I am wondering whether the job manager will be the bottleneck, and how many
> jobs a job manager can support in session mode. I tried to find the
> bottleneck in a test environment but failed.
>
>
> On 20 Sep 2018, at 14:16, vino yang  wrote:
>
> Hi weilong,
>
> As you said, there are advantages and disadvantages to each of the two
> approaches.
> However, I hope you know that the "single job" mode has a huge advantage
> over the "YARN Flink session" mode in that it provides job-level isolation
> (whether JM or TM).
> This allows jobs to be more fine-grained, and the refactoring of
> Flink's FLIP-6-based deployment model tends toward the "single job" mode.
> But it will start more JMs (application masters) and take up more resources.
> In the end, how to choose also requires you to evaluate and weigh the trade-offs.
>
> Thanks, vino.
>
> weilongxing  wrote on Thu, 20 Sep 2018 at 10:27:
>
>> There are two methods to deploy Flink applications on YARN. The first one
>> is to use a YARN session, with all Flink applications deployed in that session.
>> The second method is to deploy each Flink application on YARN as its own YARN
>> application.
>>
>> My question is: what's the difference between these two methods? Which one
>> should I choose in a production environment?
>>
>> I can't find any material about this.
>>
>> I think the first method will save resources, since it needs only one
>> JobManager (the YARN application master). But that is also its disadvantage,
>> since the single JobManager can become a bottleneck as Flink applications
>> get more and more numerous.
>>
>
>


Re: Errors in QueryableState sample code?

2018-09-19 Thread vino yang
Hi Ken,

Regarding the first question, that way of fixing it is right.
Regarding the second question, you are right: `response.get()` returns the
ValueState<>, so the example is missing the `.value()` call.
Could you create a JIRA issue, or fix it yourself if you want?

Thanks, vino.



Ken Krugler  wrote on Thu, 20 Sep 2018 at 06:27:

> Hi all,
>
> I was looking at the Example section of the Querying State documentation:
>
> QueryableStateClient client = new QueryableStateClient(tmHostname, proxyPort);
>
> // the state descriptor of the state to be fetched.
> ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
>         new ValueStateDescriptor<>(
>           "average",
>           TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));
>
> CompletableFuture<ValueState<Tuple2<Long, Long>>> resultFuture =
>         client.getKvState(jobId, "query-name", key,
>             BasicTypeInfo.LONG_TYPE_INFO, descriptor);
>
> // now handle the returned value
> resultFuture.thenAccept(response -> {
>     try {
>         Tuple2<Long, Long> res = response.get();
>     } catch (Exception e) {
>         e.printStackTrace();
>     }
> });
>
> The two issues I ran into while trying out this code snippet were:
>
> 1. The query request would fail with an NPE, because the client had a null
> ExecutionConfig.
>
> If I added:
>
> client.setExecutionConfig(new ExecutionConfig());
>
> Then everything seemed OK, but this doesn’t feel like the right way to
> solve that problem :)
>
> 2. The call to response.get() returns a ValueState<Tuple2<Long, Long>>,
> not the Tuple2 itself.
>
> So it seems like there’s a missing “.value()”.
>
> Regards,
>
> — Ken
>
> --
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com
> Custom big data solutions & training
> Flink, Solr, Hadoop, Cascading & Cassandra
>
>