[ 
https://issues.apache.org/jira/browse/FLINK-22416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17329266#comment-17329266
 ] 

Stephan Ewen edited comment on FLINK-22416 at 4/22/21, 4:44 PM:
----------------------------------------------------------------

At first glance, I could not correlate this to any Operator Coordinator 
changes. The failures might be either legitimate timeouts in the test or an 
issue with result fetching.

Everything switches properly to RUNNING and then the test times out.
{code}
11:15:25,353 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - ChangelogNormalize(key=[k_event_id, k_user_id]) -> Calc(select=[k_user_id, name, CAST(timestamp) AS timestamp, k_event_id, user_id, payload]) -> NotNullEnforcer(fields=[k_user_id, k_event_id]) (2/4) (e76de9f0b5f935f2830579334640a391) switched from INITIALIZING to RUNNING.
11:15:25,355 INFO  org.apache.kafka.clients.consumer.internals.SubscriptionState [] - [Consumer clientId=consumer-64, groupId=null] Resetting offset for partition key_partial_value_topic_csv-0 to offset 4.
11:15:25,355 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - ChangelogNormalize(key=[k_event_id, k_user_id]) -> Calc(select=[k_user_id, name, CAST(timestamp) AS timestamp, k_event_id, user_id, payload]) -> NotNullEnforcer(fields=[k_user_id, k_event_id]) (1/4)#0 (fd516fced016ff6800bf67393a8f1caf) switched from INITIALIZING to RUNNING.
11:15:25,357 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - ChangelogNormalize(key=[k_event_id, k_user_id]) -> Calc(select=[k_user_id, name, CAST(timestamp) AS timestamp, k_event_id, user_id, payload]) -> NotNullEnforcer(fields=[k_user_id, k_event_id]) (1/4) (fd516fced016ff6800bf67393a8f1caf) switched from INITIALIZING to RUNNING.
11:15:54,449 [                main] ERROR org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaTableITCase [] - Test testSourceSinkWithKeyAndPartialValue[format = csv](org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaTableITCase) failed with:
org.junit.runners.model.TestTimedOutException: test timed out after 30 seconds
{code}

The only striking thing relating to the result fetcher is the line below, but 
it looks like this regularly happens on startup of the fetcher and is not 
indicative of an error.

{code}
11:15:25,296 INFO  org.apache.flink.streaming.api.operators.collect.CollectSinkFunction [] - Invalid request. Received version = , offset = 0, while expected version = bde5cdc9-94e3-403c-8f10-827a65bd5c74, offset = 0
{code}
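
To illustrate why such a line can be harmless on startup, here is a minimal 
Java sketch of a version/offset handshake. It is a simplified model under 
assumptions, not Flink's actual implementation: the classes 
CollectHandshakeSketch, Sink, Fetcher and Response are hypothetical. The idea is 
that the client cannot know the sink's version before its first request, so the 
first request carries an empty version, gets rejected once, and the client then 
adopts the version from the reply.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class CollectHandshakeSketch {

    /** Reply from the sink: its current version, the offset to continue from, and any rows. */
    static final class Response {
        final String version;
        final long offset;
        final List<String> rows;

        Response(String version, long offset, List<String> rows) {
            this.version = version;
            this.offset = offset;
            this.rows = rows;
        }
    }

    /** Hypothetical stand-in for the sink side of the protocol. */
    static final class Sink {
        final String version;
        final List<String> buffer = new ArrayList<>();
        int invalidRequests = 0;

        Sink(String version) {
            this.version = version;
        }

        Response handle(String requestVersion, long requestOffset) {
            if (!version.equals(requestVersion)) {
                // Models the "Invalid request" INFO line: no rows are returned,
                // only the version and offset the client should use next.
                invalidRequests++;
                return new Response(version, 0L, Collections.emptyList());
            }
            // Clamp the offset so that subList never goes out of range.
            int from = (int) Math.max(0L, Math.min(requestOffset, buffer.size()));
            return new Response(version, from, new ArrayList<>(buffer.subList(from, buffer.size())));
        }
    }

    /** Hypothetical stand-in for the client-side result fetcher. */
    static final class Fetcher {
        String version = ""; // empty until the sink has told us its version
        long offset = 0L;
        final List<String> received = new ArrayList<>();

        void poll(Sink sink) {
            Response response = sink.handle(version, offset);
            if (!response.version.equals(version)) {
                // First contact: adopt the sink's version and offset, fetch on the next poll.
                version = response.version;
                offset = response.offset;
                return;
            }
            received.addAll(response.rows);
            offset += response.rows.size();
        }
    }

    public static void main(String[] args) {
        Sink sink = new Sink("bde5cdc9");
        sink.buffer.add("row-1");
        sink.buffer.add("row-2");

        Fetcher fetcher = new Fetcher();
        fetcher.poll(sink); // rejected once: the fetcher's version is still empty
        fetcher.poll(sink); // accepted: rows are delivered

        System.out.println("invalid requests: " + sink.invalidRequests);
        System.out.println("received: " + fetcher.received);
    }
}
```

In this model exactly one rejected request per fetcher startup is expected, so a 
single such log line would not by itself point to a problem.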

The second test failure looks like it is timing out while the job is 
waiting for slots. For some reason, the "after test cancel pending jobs" action 
doesn't seem to work properly after the first test timeout (maybe related to 
the execution with timeout?).
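
On the "execution with timeout" point: the stack traces in the issue 
description end in FailOnTimeout$CallableStatement.call, FutureTask.run and 
Thread.run, which suggests that the test body runs on its own thread. The 
following Java sketch shows that general mechanism using only JDK classes. It is 
a simplified illustration, not JUnit's code; TimeoutWatchdogSketch, 
runWithTimeout and demo are hypothetical names.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimeoutWatchdogSketch {

    /** Runs the body on its own thread; returns null on success, or an exception describing the failure. */
    static Exception runWithTimeout(Callable<Void> body, long timeoutMillis) {
        FutureTask<Void> task = new FutureTask<>(body);
        Thread worker = new Thread(task, "time-limited-test");
        worker.setDaemon(true);
        worker.start();
        try {
            task.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return null;
        } catch (TimeoutException e) {
            Exception timedOut = new Exception("test timed out after " + timeoutMillis + " ms");
            // Only the stuck worker's frames are recorded; other threads stay invisible.
            timedOut.setStackTrace(worker.getStackTrace());
            // The worker is interrupted, so any cleanup it runs afterwards starts from an interrupted state.
            worker.interrupt();
            return timedOut;
        } catch (InterruptedException | ExecutionException e) {
            return e;
        }
    }

    /** A body that blocks the way a polling loop with sleeps would. */
    static Exception demo() {
        return runWithTimeout(() -> {
            Thread.sleep(10_000);
            return null;
        }, 200);
    }

    public static void main(String[] args) {
        Exception result = demo();
        System.out.println(result.getMessage());
        for (StackTraceElement frame : result.getStackTrace()) {
            System.out.println("\tat " + frame);
        }
    }
}
```

With a mechanism like this, the body is interrupted from another thread when 
the timeout fires. That is one conceivable reason why follow-up cleanup could 
behave differently, though this remains a guess.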

*Action:* I would suggest removing the timeout rule from that test to make 
sure we get a thread dump on the next failure.
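
For context on why a thread dump helps: a timeout exception like the ones in 
this issue shows the stack of a single thread, while a dump shows every thread 
in the JVM. The sketch below captures such a dump programmatically with the 
JDK's ThreadMXBean. It only illustrates what a dump contains and is not 
necessarily how the CI setup produces one; ThreadDumpSketch and its 
dumpAllThreads helper are hypothetical names.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

public class ThreadDumpSketch {

    /** Formats the name, state and full stack of every live thread in this JVM. */
    static String dumpAllThreads() {
        ThreadMXBean bean = ManagementFactory.getThreadMXBean();
        StringBuilder out = new StringBuilder();
        // Lock details are not printed here, so monitor and synchronizer info is not requested.
        for (ThreadInfo info : bean.dumpAllThreads(false, false)) {
            out.append('"').append(info.getThreadName()).append("\" ")
               .append(info.getThreadState()).append('\n');
            for (StackTraceElement frame : info.getStackTrace()) {
                out.append("\tat ").append(frame).append('\n');
            }
            out.append('\n');
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.print(dumpAllThreads());
    }
}
```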


> UpsertKafkaTableITCase hangs when collecting results
> ----------------------------------------------------
>
>                 Key: FLINK-22416
>                 URL: https://issues.apache.org/jira/browse/FLINK-22416
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka, Table SQL / Ecosystem
>    Affects Versions: 1.13.0
>            Reporter: Dawid Wysakowicz
>            Priority: Critical
>              Labels: test-stability
>             Fix For: 1.13.0
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=17037&view=logs&j=c5f0071e-1851-543e-9a45-9ac140befc32&t=1fb1a56f-e8b5-5a82-00a0-a2db7757b4f5&l=7002
> {code}
> 2021-04-22T11:16:35.6812919Z Apr 22 11:16:35 [ERROR] testSourceSinkWithKeyAndPartialValue[format = csv](org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaTableITCase)  Time elapsed: 30.01 s  <<< ERROR!
> 2021-04-22T11:16:35.6814151Z Apr 22 11:16:35 org.junit.runners.model.TestTimedOutException: test timed out after 30 seconds
> 2021-04-22T11:16:35.6814781Z Apr 22 11:16:35  at java.lang.Thread.sleep(Native Method)
> 2021-04-22T11:16:35.6815444Z Apr 22 11:16:35  at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.sleepBeforeRetry(CollectResultFetcher.java:237)
> 2021-04-22T11:16:35.6816250Z Apr 22 11:16:35  at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:113)
> 2021-04-22T11:16:35.6817033Z Apr 22 11:16:35  at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
> 2021-04-22T11:16:35.6817719Z Apr 22 11:16:35  at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
> 2021-04-22T11:16:35.6818351Z Apr 22 11:16:35  at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:370)
> 2021-04-22T11:16:35.6818980Z Apr 22 11:16:35  at org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestUtils.collectRows(KafkaTableTestUtils.java:52)
> 2021-04-22T11:16:35.6819978Z Apr 22 11:16:35  at org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaTableITCase.testSourceSinkWithKeyAndPartialValue(UpsertKafkaTableITCase.java:147)
> 2021-04-22T11:16:35.6820803Z Apr 22 11:16:35  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2021-04-22T11:16:35.6821365Z Apr 22 11:16:35  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2021-04-22T11:16:35.6822072Z Apr 22 11:16:35  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2021-04-22T11:16:35.6822656Z Apr 22 11:16:35  at java.lang.reflect.Method.invoke(Method.java:498)
> 2021-04-22T11:16:35.6823124Z Apr 22 11:16:35  at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> 2021-04-22T11:16:35.6823672Z Apr 22 11:16:35  at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2021-04-22T11:16:35.6824202Z Apr 22 11:16:35  at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> 2021-04-22T11:16:35.6824709Z Apr 22 11:16:35  at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2021-04-22T11:16:35.6825230Z Apr 22 11:16:35  at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> 2021-04-22T11:16:35.6825716Z Apr 22 11:16:35  at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> 2021-04-22T11:16:35.6826204Z Apr 22 11:16:35  at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> 2021-04-22T11:16:35.6826807Z Apr 22 11:16:35  at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
> 2021-04-22T11:16:35.6827378Z Apr 22 11:16:35  at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
> 2021-04-22T11:16:35.6827926Z Apr 22 11:16:35  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 2021-04-22T11:16:35.6828331Z Apr 22 11:16:35  at java.lang.Thread.run(Thread.java:748)
> 2021-04-22T11:16:35.6828600Z Apr 22 11:16:35 
> 2021-04-22T11:16:35.6829073Z Apr 22 11:16:35 [ERROR] testAggregate[format = csv](org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaTableITCase)  Time elapsed: 30.001 s  <<< ERROR!
> 2021-04-22T11:16:35.6829689Z Apr 22 11:16:35 org.junit.runners.model.TestTimedOutException: test timed out after 30 seconds
> 2021-04-22T11:16:35.6830073Z Apr 22 11:16:35  at sun.misc.Unsafe.park(Native Method)
> 2021-04-22T11:16:35.6830468Z Apr 22 11:16:35  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> 2021-04-22T11:16:35.6831165Z Apr 22 11:16:35  at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
> 2021-04-22T11:16:35.6832071Z Apr 22 11:16:35  at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
> 2021-04-22T11:16:35.6832927Z Apr 22 11:16:35  at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
> 2021-04-22T11:16:35.6833427Z Apr 22 11:16:35  at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> 2021-04-22T11:16:35.6833930Z Apr 22 11:16:35  at org.apache.flink.table.api.internal.TableResultImpl.awaitInternal(TableResultImpl.java:129)
> 2021-04-22T11:16:35.6834497Z Apr 22 11:16:35  at org.apache.flink.table.api.internal.TableResultImpl.await(TableResultImpl.java:92)
> 2021-04-22T11:16:35.6835331Z Apr 22 11:16:35  at org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaTableITCase.wordCountToUpsertKafka(UpsertKafkaTableITCase.java:340)
> 2021-04-22T11:16:35.6836104Z Apr 22 11:16:35  at org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaTableITCase.testAggregate(UpsertKafkaTableITCase.java:72)
> 2021-04-22T11:16:35.6836728Z Apr 22 11:16:35  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2021-04-22T11:16:35.6837269Z Apr 22 11:16:35  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2021-04-22T11:16:35.6837837Z Apr 22 11:16:35  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2021-04-22T11:16:35.6838311Z Apr 22 11:16:35  at java.lang.reflect.Method.invoke(Method.java:498)
> 2021-04-22T11:16:35.6838945Z Apr 22 11:16:35  at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> 2021-04-22T11:16:35.6839507Z Apr 22 11:16:35  at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2021-04-22T11:16:35.6840092Z Apr 22 11:16:35  at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> 2021-04-22T11:16:35.6840595Z Apr 22 11:16:35  at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2021-04-22T11:16:35.6841105Z Apr 22 11:16:35  at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> 2021-04-22T11:16:35.6841738Z Apr 22 11:16:35  at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> 2021-04-22T11:16:35.6842236Z Apr 22 11:16:35  at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> 2021-04-22T11:16:35.6842861Z Apr 22 11:16:35  at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
> 2021-04-22T11:16:35.6843436Z Apr 22 11:16:35  at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
> 2021-04-22T11:16:35.6843939Z Apr 22 11:16:35  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 2021-04-22T11:16:35.6844335Z Apr 22 11:16:35  at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
