Hi Chet,
you should not see an
org.apache.flink.runtime.query.UnknownKvStateKeyGroupLocation when querying an
existing(!) key.
However, if you query a key that the non-registered TaskManager is responsible
for, I suppose this is the exception you will get. Unfortunately, the queryable
state API still seems to be rough around the edges.

I suspect that the TaskManagers register their queryable state only after
receiving data(?), and this causes the UnknownKvStateKeyGroupLocation instead
of an UnknownKeyOrNamespace.
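
Until that is ironed out, one client-side workaround (a sketch of my own; the
retry count and backoff are arbitrary assumptions, not an official
recommendation) would be to treat UnknownKvStateKeyGroupLocation as potentially
transient and retry, while letting UnknownKeyOrNamespace and other failures
propagate immediately:

import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

public final class QueryRetry {

    // Runs the given state query, retrying while the location of the key group
    // is not (yet) known to the JobManager. Any other exception (e.g.
    // UnknownKeyOrNamespace) is rethrown immediately. maxRetries must be >= 1.
    public static byte[] queryWithRetry(Callable<byte[]> query, int maxRetries) throws Exception {
        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            try {
                return query.call();
            } catch (Exception e) {
                // compare by simple class name to avoid a compile-time dependency
                // on the internal Flink class
                if (attempt < maxRetries
                        && "UnknownKvStateKeyGroupLocation".equals(e.getClass().getSimpleName())) {
                    TimeUnit.MILLISECONDS.sleep(500L * attempt); // simple linear backoff
                } else {
                    throw e; // key really missing, other failure, or retries exhausted
                }
            }
        }
        throw new IllegalStateException("maxRetries must be at least 1");
    }
}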


Nico

On Thursday, 4 May 2017 20:05:29 CEST Chet Masterson wrote:
> I found the issue. When parallelism = 3, my test data set was skewed such
> that data was only going to two of the three task managers (kafka partition
> = 3, number of flink nodes = 3, parallelism = 3). As soon as I created a
> test data set with enough keys that spread across all three task managers,
> queryable state started working as expected. That is why only two KVStates
> were registered with the job manager, instead of three. 
> My FINAL :-) question... should I be getting
> org.apache.flink.runtime.query.UnknownKvStateKeyGroupLocation in the event
> that only N-1 task managers have data in a parallelism = N situation?
> Thanks for all the help!
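> 
> (As a rough, self-contained illustration of why that happens - not Flink's
> actual code - each key is hashed into a key group and key groups are mapped
> onto the parallel instances, so a small or skewed key set can easily leave
> one instance with no data and hence no registered queryable state:)
> 
> public class KeySpreadSketch {
>     // Simplified stand-ins for Flink's key-group assignment (Flink
>     // additionally applies murmur hashing; maxParallelism = 128 is just an
>     // example value here).
>     static int keyGroupFor(Object key, int maxParallelism) {
>         return Math.abs(key.hashCode() % maxParallelism);
>     }
>     static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
>         return keyGroup * parallelism / maxParallelism;
>     }
>     public static void main(String[] args) {
>         int maxParallelism = 128, parallelism = 3;
>         for (String key : new String[]{"key-a", "key-b"}) {   // only two distinct keys
>             int kg = keyGroupFor(key, maxParallelism);
>             System.out.println(key + " -> key group " + kg + " -> instance "
>                     + operatorIndexFor(kg, maxParallelism, parallelism));
>         }
>     }
> }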
>  
>  
> 04.05.2017, 11:24, "Ufuk Celebi" <u...@apache.org>:
> Could you try adding the trace prints in KvStateRegistry#registerKvState, please?
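> 
> (A sketch of such a trace print; the exact method signature differs between
> Flink versions, so 'registrationName' and 'keyGroupRange' below are
> assumptions about what the method receives:)
> 
> // inside org.apache.flink.runtime.query.KvStateRegistry#registerKvState;
> // parameter names are assumed and may differ in your Flink version
> org.slf4j.LoggerFactory.getLogger(KvStateRegistry.class)
>         .info("registerKvState called: name={}, keyGroupRange={}",
>                 registrationName, keyGroupRange);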
> 
> In the JM logs you should see something about the number of connected
> task managers and in the task manager logs that each one connects to a
> JM.
> 
> – Ufuk
> 
> 
> On Tue, May 2, 2017 at 2:53 PM, Chet Masterson
> <chet.master...@yandex.com> wrote:
> 
>  Can do. Any advice on where the trace prints should go in the task manager
>  source code?
> 
>  BTW - How do I know I have a correctly configured cluster? Is there a set
> of messages in the job / task manager logs that indicate all required
> connectivity is present? I know I use the UI to make sure all the task
> managers are present, and that the job is running on all of them, but is
> there some verbiage in the logs that indicates the job manager can talk to
> all the task managers, and vice versa?
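> 
> (One quick sanity check - an assumption on my part, based on the web UI port
> 8081 from your config - is the monitoring REST API, which lists the task
> managers the JobManager currently knows about:)
> 
> # should list all three registered task managers
> curl http://flink01:8081/taskmanagers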
> 
>  Thanks!
> 
> 
>  02.05.2017, 06:03, "Ufuk Celebi" <u...@apache.org>:
> 
>  Hey Chet! I'm wondering why you are only seeing 2 registration
>  messages for 3 task managers. Unfortunately, there is no log message
>  at the task managers when they send out the notification. Is it
>  possible for you to run a remote debugger with the task managers or
>  build a custom Flink version with the appropriate log messages on the
>  task manager side?
>  – Ufuk
> 
> 
>  On Fri, Apr 28, 2017 at 2:20 PM, Chet Masterson
>  <chet.master...@yandex.com> wrote:
> 
> 
> 
>   Any insight here? I've got a situation where a key value state on a task
>   manager is being registered with the job manager, but when I try to query
>   it, the job manager responds that it doesn't know the location of the key
>   value state...
> 
> 
>   26.04.2017, 12:11, "Chet Masterson" <chet.master...@yandex.com>:
> 
>   After setting the logging to DEBUG on the job manager, I learned four
>   things:
> 
>   (Regarding the message formatting below: I have the Flink logs formatted
>   as JSON so I can import them into Kibana.)
> 
>   1. The appropriate key value state is registered in both parallelism = 1
>   and parallelism = 3 environments. In parallelism = 1, I saw one
>   registration message in the log; in parallelism = 3, I saw two
>   registration messages:
> 
>   {"level":"DEBUG","time":"2017-04-26 15:54:55,254","class":"org.apache.flink.runtime.jobmanager.JobManager","ndc":"","msg":"Key value state registered for job <job id> under name <statename>"}
> 
>   2. When I issued the query in both parallelism = 1 and parallelism = 3
>   environments, I saw "Lookup key-value state for job <job id> with
>   registration name <statename>". In parallelism = 1, I saw one log message;
>   in parallelism = 3, I saw two identical messages.
> 
>   3. I saw no other messages in the job manager log that seemed relevant.
> 
>   4. When issuing the query in parallelism = 3, I continued to get the error
>   org.apache.flink.runtime.query.UnknownKvStateKeyGroupLocation with a
>   message of null.
> 
>   Thanks!
> 
> 
> 
> 
> 
>   26.04.2017, 09:52, "Ufuk Celebi" <u...@apache.org>:
> 
>   Thanks! Your config looks good to me.
> 
>   Could you please set the log level for org.apache.flink.runtime.jobmanager
>   to DEBUG?
> 
>   log4j.logger.org.apache.flink.runtime.jobmanager=DEBUG
> 
>   Then we can check whether the JobManager logs the registration of the
>   state instance with the respective name in the case of parallelism >
>   1.
> 
>   Expected output is something like this: "Key value state registered
>   for job ${msg.getJobId} under name ${msg.getRegistrationName}."
> 
>   – Ufuk
> 
>   On Wed, Apr 26, 2017 at 3:06 PM, Chet Masterson
>   <chet.master...@yandex.com> wrote:
> 
>    Ok...more information.
> 
>    1. Built a fresh cluster from the ground up. Started testing queryable
>    state at each step.
>    2. When running under any configuration of task managers and job managers
>    with parallelism = 1, the queries execute as expected.
>    3. As soon as I cross over to parallelism = 3 with 3 task managers (1 job
>    manager) feeding off a Kafka topic partitioned three ways, queries will
>    always fail, returning the error
>    (org.apache.flink.runtime.query.UnknownKvStateKeyGroupLocation) with an
>    error message of null.
>    4. I do know my state is as expected on the cluster. Liberal use of trace
>    prints shows the state managed by the jobs is as I expect. However, I
>    cannot query it externally.
>    5. I am sending the query to jobmanager.rpc.port = 6123, which I confirmed
>    is configured by using the job manager UI.
>    6. My flink-conf.yaml:
> 
>    jobmanager.rpc.address: flink01
>    jobmanager.rpc.port: 6123
>    jobmanager.heap.mb: 256
> 
>    taskmanager.heap.mb: 512
>    taskmanager.data.port: 6121
>    taskmanager.numberOfTaskSlots: 1
>    taskmanager.memory.preallocate: false
> 
>    parallelism.default: 1
>    blob.server.port: 6130
> 
>    jobmanager.web.port: 8081
>    query.server.enable: true
> 
>    7. I do know my job is indeed running in parallel, from trace prints
> going to the task manager logs.
> 
>    Do I need a backend configured when running in parallel for the queryable
> state? Do I need a shared temp directory on the task managers?
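> 
>    (For what it's worth, if a backend does turn out to be needed, a minimal
>    flink-conf.yaml sketch would be the following; the filesystem backend and
>    the checkpoint path are placeholders/assumptions, not something from this
>    thread:)
> 
>    # Placeholder values; in a real cluster the checkpoint directory should be
>    # on shared storage (e.g. HDFS or NFS) visible to all nodes.
>    state.backend: filesystem
>    state.backend.fs.checkpointdir: hdfs:///flink/checkpoints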
> 
>    THANKS!
> 
> 
>    25.04.2017, 04:24, "Ufuk Celebi" <u...@apache.org>:
> 
>    It's strange that the rpc port is set to 30000 when you use a
>    standalone cluster and configure 6123 as the port. I'm pretty sure
>    that the config has not been updated.
> 
>    But everything should work as you say when you point it to the correct
>    jobmanager address and port. Could you please post the complete
>    stacktrace you get instead of the message you log?
> 
> 
>    On Mon, Apr 24, 2017 at 5:31 PM, Chet Masterson
>    <chet.master...@yandex.com> wrote:
> 
> 
> 
>     More information:
> 
>     0. I did remove the query.server.port and query.server.enable settings
>     from all flink-conf.yaml files, and restarted the cluster.
> 
>     1. The Akka error doesn't seem to have anything to do with the problem.
>     If I point my query client at an IP address with no Flink server running
>     at all, I get that error. It seems to be a (side effect?) timeout for "no
>     flink service is listening on the port you told me to check".
> 
>     2. I did notice (using the Flink Web UI) that even with the config file
>     changes in 0, and with no change to the default jobmanager.rpc.port
>     (6123) in flink-conf.yaml, jobmanager.rpc.port is reported as 30000 on my
>     cluster.
> 
>     3. If I do send a query using the jobmanager.rpc.address and the
>     jobmanager.rpc.port as displayed in the Flink Web UI, the connection from
>     the client to Flink will be initiated and completed. When I try to
>     execute the query (code below), it will fail and the exception will get
>     trapped. When I look at the error message returned (e.getMessage()
>     below), it is simply 'null':
> 
>     try {
>         byte[] serializedResult = Await.result(future,
>                 new FiniteDuration(maxQueryTime, TimeUnit.SECONDS));
>         // de-serialize, commented out for testing
>         return null;
>     }
>     catch (Exception e) {
>         logger.error("Queryable State Error: " + key + "-" + flinkJobID + "-"
>                 + stateName + " Error: " + e.getMessage());
>         return null;
>     }
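> 
>     (Side note: since e.getMessage() is null here, logging the exception
>     itself would put the full stack trace - which Ufuk asked for - into the
>     log. A small variation of the catch block, assuming an slf4j/log4j-style
>     logger:)
> 
>     catch (Exception e) {
>         // passing the exception as the last argument logs the full stack
>         // trace instead of just the (null) message
>         logger.error("Queryable State Error: " + key + "-" + flinkJobID + "-" + stateName, e);
>         return null;
>     }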
> 
>     Should I be sending the query to the job manager on the job manager's
>     rpc port when Flink is clustered?
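> 
>     (A sketch of how the client is usually pointed at the JobManager,
>     assuming the Flink 1.2-era org.apache.flink.runtime.query.QueryableStateClient
>     constructed from a Configuration; signatures changed in later releases,
>     and 'serializedKey' stands in for the serialized key-and-namespace bytes
>     used elsewhere in this code:)
> 
>     Configuration config = new Configuration();             // org.apache.flink.configuration.Configuration
>     config.setString("jobmanager.rpc.address", "flink01");  // values from the flink-conf.yaml above
>     config.setInteger("jobmanager.rpc.port", 6123);
>     QueryableStateClient client = new QueryableStateClient(config);
>     // returns a scala.concurrent.Future, matching the Await.result(...) call above
>     Future<byte[]> future = client.getKvState(flinkJobID, stateName, key.hashCode(), serializedKey);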
> 
>     ALSO - I do know the state name I am trying to query exists and is
>     populated, and that the job id exists. I also know the task managers are
>     communicating with the job manager (task manager data port: 6121) and
>     processed the data that resulted in the state variable I am trying to
>     query being populated. All this was logged.
> 
> 
>     24.04.2017, 10:34, "Ufuk Celebi" <u...@apache.org>:
> 
>     Hey Chet! You can remove
> 
>     query.server.port: 6123
>     query.server.enable: true
> 
>     That shouldn't cause the exception we see here, though. I'm actually
>     not sure what is causing the PduCodecException. Could this be related
>     to different Akka versions being used in Flink and your client code [1]?
>     Is it possible for you to check this?
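> 
>     (One way to check which Akka version the client build pulls in, assuming
>     a Maven build, is:)
> 
>     mvn dependency:tree -Dincludes=com.typesafe.akka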
> 
>     – Ufuk
> 
>     [1] https://groups.google.com/forum/#!topic/akka-user/vr1uXsf9gW0
