[ 
https://issues.apache.org/jira/browse/FLINK-14370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16955667#comment-16955667
 ] 

Jiangjie Qin commented on FLINK-14370:
--------------------------------------

Thanks [~arvid.he...@gmail.com] for finding the problem. The cause is that,
when all the records are processed before a snapshot is taken, the records
that could not be sent out cause the snapshot to fail. That snapshot failure
does not cause the job to exit. However, all the records in the KafkaProducer
are already expired after the snapshot failure, so when the producer closes,
no further exception is thrown. As a result, the job finishes successfully
instead of failing.

It looks like the expected behavior of the connector is to report an exception
in the {{close()}} method as long as there was a record that could not be sent.
On the other hand, an exception thrown from
{{CheckpointedFunction.snapshotState()}} might be ignored. I am not sure
whether this is reasonable, but this expectation is not very clear from the
connector implementation's perspective.

As an immediate fix, [~arvid.he...@gmail.com] proposes to always throw an
exception as long as there has been a record sending failure. I agree that
this is the right fix given the currently expected behavior of the connectors.
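
To illustrate the proposed behavior, here is a minimal sketch. It is not the
actual connector code: {{StickyErrorSink}}, {{onSendCompleted}} and
{{checkErroneous}} are hypothetical names, and no Kafka or Flink classes are
used. The assumption is only that a send failure is remembered and never
cleared, so that {{close()}} still reports it even if an earlier failure from
{{snapshotState()}} was ignored by the caller.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for a sink that sends records asynchronously. */
final class StickyErrorSink implements AutoCloseable {

    // First asynchronous send failure. Never reset once set, so every
    // later check (snapshot or close) keeps reporting it.
    private final AtomicReference<Exception> asyncError = new AtomicReference<>();

    /** Would be invoked from the producer's send callback (assumption). */
    void onSendCompleted(Exception error) {
        if (error != null) {
            asyncError.compareAndSet(null, error);
        }
    }

    /** Called on checkpoint; the caller might ignore a failure here. */
    void snapshotState() throws Exception {
        checkErroneous();
    }

    /** Called on shutdown; must still fail if any record was not sent. */
    @Override
    public void close() throws Exception {
        checkErroneous();
    }

    private void checkErroneous() throws Exception {
        Exception e = asyncError.get();
        if (e != null) {
            // The stored error is deliberately NOT cleared before throwing.
            throw new Exception("Failed to send data: " + e.getMessage(), e);
        }
    }
}
```

With this shape, a failed send followed by an ignored snapshot failure still
makes {{close()}} throw, which is what the test expects.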

> KafkaProducerAtLeastOnceITCase>KafkaProducerTestBase.testOneToOneAtLeastOnceRegularSink fails on Travis
> -------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-14370
>                 URL: https://issues.apache.org/jira/browse/FLINK-14370
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.10.0
>            Reporter: Till Rohrmann
>            Assignee: Jiangjie Qin
>            Priority: Critical
>              Labels: pull-request-available, test-stability
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> The {{KafkaProducerAtLeastOnceITCase>KafkaProducerTestBase.testOneToOneAtLeastOnceRegularSink}} fails on Travis with
> {code}
> Test testOneToOneAtLeastOnceRegularSink(org.apache.flink.streaming.connectors.kafka.KafkaProducerAtLeastOnceITCase) failed with:
> java.lang.AssertionError: Job should fail!
>       at org.junit.Assert.fail(Assert.java:88)
>       at org.apache.flink.streaming.connectors.kafka.KafkaProducerTestBase.testOneToOneAtLeastOnce(KafkaProducerTestBase.java:280)
>       at org.apache.flink.streaming.connectors.kafka.KafkaProducerTestBase.testOneToOneAtLeastOnceRegularSink(KafkaProducerTestBase.java:206)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>       at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>       at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>       at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>       at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>       at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>       at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>       at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>       at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>       at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>       at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>       at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>       at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>       at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>       at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>       at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>       at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>       at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>       at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>       at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>       at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
>       at org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
>       at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
>       at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159)
>       at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
>       at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
>       at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
>       at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)
> {code}
> https://api.travis-ci.com/v3/job/244297223/log.txt



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
