Hi Chet,

you should not see an org.apache.flink.runtime.query.UnknownKvStateKeyGroupLocation when querying an existing(!) key. However, if you query a key that the non-registered TaskManager is responsible for, I suppose this is the exception you will get. Unfortunately, the queryable state API still seems to be rough around the edges.
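To make "the TaskManager responsible for a key" a bit more concrete, here is a minimal sketch of how a key can end up in a key group and a key group on a parallel subtask. The integer formula in subtaskFor matches, as far as I know, what Flink's KeyGroupRangeAssignment does. The hash step is a deliberate simplification (plain hashCode() instead of Flink's murmur hash), and the class name, the max parallelism of 128 and the sample keys are assumptions for illustration only.

```java
import java.util.Set;
import java.util.TreeSet;

public class KeyGroupSketch {

    // Simplified stand-in for Flink's key-group hashing.
    // Assumption: Flink applies an additional murmur hash, which is omitted here.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Maps a key group to the index of the parallel subtask that owns it.
    static int subtaskFor(int keyGroup, int parallelism, int maxParallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Collects the subtasks that would receive at least one of the given keys.
    static Set<Integer> subtasksWithData(String[] keys, int parallelism, int maxParallelism) {
        Set<Integer> used = new TreeSet<>();
        for (String key : keys) {
            used.add(subtaskFor(keyGroupFor(key, maxParallelism), parallelism, maxParallelism));
        }
        return used;
    }

    public static void main(String[] args) {
        // Hypothetical skewed key set: both keys land on the same subtask.
        String[] skewedKeys = {"a", "b"};
        System.out.println(subtasksWithData(skewedKeys, 3, 128)); // prints [2]
    }
}
```

With such a key set, subtasks 0 and 1 never see data, so a query for a key that hashes into one of their key groups has no registered location to go to.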
I suspect that the TaskManagers register their queryable state only after receiving data(?) and this causes the UnknownKvStateKeyGroupLocation instead of an UnknownKeyOrNamespace.

Nico

On Thursday, 4 May 2017 20:05:29 CEST Chet Masterson wrote:
> I found the issue. When parallelism = 3, my test data set was skewed such
> that data was only going to two of the three task managers (kafka partition
> = 3, number of flink nodes = 3, parallelism = 3). As soon as I created a
> test data set with enough keys that spread across all three task managers,
> queryable state started working as expected. That is why only two KVStates
> were registered with the job manager, instead of three.
>
> my FINAL :-) question....should I be getting
> org.apache.flink.runtime.query.UnknownKvStateKeyGroupLocation in the event
> only N-1 task managers have data in a parallelism of N situation?
>
> Thanks for all the help!
>
>
> 04.05.2017, 11:24, "Ufuk Celebi" <u...@apache.org>:
>
> Could you try KvStateRegistry#registerKvState please?
>
> In the JM logs you should see something about the number of connected
> task managers and in the task manager logs that each one connects to a
> JM.
>
> – Ufuk
>
>
> On Tue, May 2, 2017 at 2:53 PM, Chet Masterson
> <chet.master...@yandex.com> wrote:
>
> Can do. Any advice on where the trace prints should go in the task manager
> source code?
>
> BTW - How do I know I have a correctly configured cluster? Is there a set
> of messages in the job / task manager logs that indicate all required
> connectivity is present? I know I use the UI to make sure all the task
> managers are present, and that the job is running on all of them, but is
> there some verbiage in the logs that indicates the job manager can talk to
> all the task managers, and vice versa?
>
> Thanks!
>
>
> 02.05.2017, 06:03, "Ufuk Celebi" <u...@apache.org>:
>
> Hey Chet! I'm wondering why you are only seeing 2 registration
> messages for 3 task managers.
> Unfortunately, there is no log message
> at the task managers when they send out the notification. Is it
> possible for you to run a remote debugger with the task managers or
> build a custom Flink version with the appropriate log messages on the
> task manager side?
>
> – Ufuk
>
>
> On Fri, Apr 28, 2017 at 2:20 PM, Chet Masterson
> <chet.master...@yandex.com> wrote:
>
> Any insight here? I've got a situation where a key value state on a task
> manager is being registered with the job manager, but when I try to query
> it, the job manager responds it doesn't know the location of the key value
> state...
>
>
> 26.04.2017, 12:11, "Chet Masterson" <chet.master...@yandex.com>:
>
> After setting the logging to DEBUG on the job manager, I learned four
> things:
>
> (On the message formatting below, I have the Flink logs formatted into
> JSON so I can import them into Kibana)
>
> 1. The appropriate key value state is registered in both parallelism = 1
> and parallelism = 3 environments. In parallelism = 1, I saw one
> registration message in the log, in the parallelism = 3, I saw two
> registration messages:
>
> {"level":"DEBUG","time":"2017-04-26 15:54:55,254","class":"org.apache.flink.runtime.jobmanager.JobManager","ndc":"","msg":"Key value state registered for job <job id> under name <statename>"}
>
> 2. When I issued the query in both parallelism = 1 and parallelism = 3
> environments, I saw "Lookup key-value state for job <job id> with
> registration name <statename>". In parallelism = 1, I saw 1 log message,
> in parallelism = 3, I saw two identical messages.
>
> 3. I saw no other messages in the job manager log that seemed relevant.
>
> 4. When issuing the query in parallelism = 3, I continued to get the
> error: org.apache.flink.runtime.query.UnknownKvStateKeyGroupLocation with a
> message of null.
>
> Thanks!
>
>
> 26.04.2017, 09:52, "Ufuk Celebi" <u...@apache.org>:
>
> Thanks! Your config looks good to me.
>
> Could you please set the log level org.apache.flink.runtime.jobmanager to
> DEBUG?
>
> log4j.logger.org.apache.flink.runtime.jobmanager=DEBUG
>
> Then we can check whether the JobManager logs the registration of the
> state instance with the respective name in the case of parallelism > 1?
>
> Expected output is something like this: "Key value state registered
> for job ${msg.getJobId} under name ${msg.getRegistrationName}."
>
> – Ufuk
>
> On Wed, Apr 26, 2017 at 3:06 PM, Chet Masterson
> <chet.master...@yandex.com> wrote:
>
> Ok...more information.
>
> 1. Built a fresh cluster from the ground up. Started testing queryable
> state at each step.
> 2. When running under any configuration of task managers and job managers
> where parallelism = 1, the queries execute as expected.
> 3. As soon as I cross over to parallelism = 3 with 3 task managers (1 job
> manager) feeding off a kafka topic partitioned three ways, queries will
> always fail, returning error
> (org.apache.flink.runtime.query.UnknownKvStateKeyGroupLocation) with an
> error message of null.
> 4. I do know my state is as expected on the cluster. Liberal use of trace
> prints show my state managed on the jobs is as I expect. However, I cannot
> query them externally.
> 5. I am sending the query to jobmanager.rpc.port = 6123, which I
> confirmed is configured by using the job manager UI.
> 6. My flink-conf.yaml:
>
> jobmanager.rpc.address: flink01
> jobmanager.rpc.port: 6123
> jobmanager.heap.mb: 256
>
> taskmanager.heap.mb: 512
> taskmanager.data.port: 6121
> taskmanager.numberOfTaskSlots: 1
> taskmanager.memory.preallocate: false
>
> parallelism.default: 1
> blob.server.port: 6130
>
> jobmanager.web.port: 8081
> query.server.enable: true
>
> 7. I do know my job is indeed running in parallel, from trace prints
> going to the task manager logs.
>
> Do I need a backend configured when running in parallel for the queryable
> state? Do I need a shared temp directory on the task managers?
>
> THANKS!
>
>
> 25.04.2017, 04:24, "Ufuk Celebi" <u...@apache.org>:
>
> It's strange that the rpc port is set to 30000 when you use a
> standalone cluster and configure 6123 as the port. I'm pretty sure
> that the config has not been updated.
>
> But everything should work as you say when you point it to the correct
> jobmanager address and port. Could you please post the complete
> stacktrace you get instead of the message you log?
>
>
> On Mon, Apr 24, 2017 at 5:31 PM, Chet Masterson
> <chet.master...@yandex.com> wrote:
>
> More information:
>
> 0. I did remove the query.server.port and query.server.enabled from all
> flink-conf.yaml files, and restarted the cluster.
>
> 1. The Akka error doesn't seem to have anything to do with the problem.
> If I point my query client at an IP address with no Flink server running
> at all, I get that error. It seems to be a (side effect?) timeout for
> "no flink service is listening on the port you told me to check"
>
> 2. I did notice (using the Flink Web UI) that even with the config file
> changes in 0, and no changes to the default jobmanager.rpc.port (6123)
> in flink-conf.yaml, jobmanager.rpc.port is set to 30000 on my cluster.
>
> 3. If I do send a query using the jobmanager.rpc.address and the
> jobmanager.rpc.port as displayed in the Flink Web UI, the connection
> from the client to Flink will be initiated and completed. When I try to
> execute the query (code below), it will fail, and will get trapped.
> When I look at the error message returned (e.getMessage() below), it is
> simply 'null':
>
> try {
>     byte[] serializedResult = Await.result(future, new
>         FiniteDuration(maxQueryTime, TimeUnit.SECONDS));
>     // de-serialize, commented out for testing
>     return null;
> }
> catch (Exception e) {
>     logger.error("Queryable State Error: "+key+"-"+flinkJobID+"-"+stateName+" Error: "+e.getMessage());
>     return null;
> }
>
> Should I be sending the query to the job manager on the job
> manager's rpc port when flink is clustered?
>
> ALSO - I do know the state name I am trying to query exists, is
> populated, and the job id exists. I also know the task managers are
> communicating with the job managers (task managers data port: 6121) and
> processed the data that resulted in the state variable I am trying to
> query being populated. All this was logged.
>
>
> 24.04.2017, 10:34, "Ufuk Celebi" <u...@apache.org>:
>
> Hey Chet! You can remove
>
> query.server.port: 6123
> query.server.enable: true
>
> That shouldn't cause the Exception we see here though. I'm actually
> not sure what is causing the PduCodecException. Could this be related
> to different Akka versions being used in Flink and your client code?
> [1] Is it possible for you to check this?
>
> – Ufuk
>
> [1] https://groups.google.com/forum/#!topic/akka-user/vr1uXsf9gW0
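Regarding the quoted try/catch, where e.getMessage() comes back as null: an exception without a message still carries its class name and stack trace, which is what the request for the complete stacktrace is after. A minimal sketch using only the JDK follows. The names StackTraceSketch and describe are hypothetical, and the IllegalStateException only stands in for whatever the query future actually throws.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

public class StackTraceSketch {

    // Renders the exception class, message (if any), stack frames and causes as one string.
    static String describe(Throwable t) {
        StringWriter buffer = new StringWriter();
        t.printStackTrace(new PrintWriter(buffer, true));
        return buffer.toString();
    }

    public static void main(String[] args) {
        try {
            // Stand-in failure created without a message, so getMessage() returns null.
            throw new IllegalStateException();
        } catch (Exception e) {
            System.out.println("getMessage(): " + e.getMessage());
            System.out.println(describe(e));
        }
    }
}
```

Most logging frameworks also accept the exception itself as an extra argument to error(...), which has the same effect without the helper method.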