Hyukjin Kwon created SPARK-50910:
------------------------------------

             Summary: Fix flaky test KafkaMicroBatchV1SourceWithConsumerSuite
                 Key: SPARK-50910
                 URL: https://issues.apache.org/jira/browse/SPARK-50910
             Project: Spark
          Issue Type: Sub-task
          Components: Structured Streaming
    Affects Versions: 4.0.0
            Reporter: Hyukjin Kwon

https://github.com/apache/spark/actions/runs/12878420935/job/35904359608

{code}
[info] KafkaMicroBatchV1SourceWithConsumerSuite:
[info] - cannot stop Kafka stream (4 seconds, 575 milliseconds)
[info] - assign from latest offsets (failOnDataLoss: true) (6 seconds, 515 milliseconds)
[info] - assign from earliest offsets (failOnDataLoss: true) (4 seconds, 442 milliseconds)
[info] - assign from specific offsets (failOnDataLoss: true) (3 seconds, 486 milliseconds)
[info] - assign from specific timestamps (failOnDataLoss: true) (4 seconds, 913 milliseconds)
[info] - assign from global timestamp per topic (failOnDataLoss: true) (5 seconds, 117 milliseconds)
[info] - subscribing topic by name from latest offsets (failOnDataLoss: true) (6 seconds, 335 milliseconds)
[info] - subscribing topic by name from earliest offsets (failOnDataLoss: true) (5 seconds, 964 milliseconds)
[info] - subscribing topic by name from specific offsets (failOnDataLoss: true) (2 seconds, 937 milliseconds)
[info] - subscribing topic by name from specific timestamps (failOnDataLoss: true) (5 seconds, 187 milliseconds)
[info] - subscribing topic by name from global timestamp per topic (failOnDataLoss: true) (5 seconds, 92 milliseconds)
[info] - subscribing topic by pattern from latest offsets (failOnDataLoss: true) (5 seconds, 537 milliseconds)
[info] - subscribing topic by pattern from earliest offsets (failOnDataLoss: true) (5 seconds, 900 milliseconds)
[info] - subscribing topic by pattern from specific offsets (failOnDataLoss: true) (2 seconds, 828 milliseconds)
[info] - subscribing topic by pattern from specific timestamps (failOnDataLoss: true) (5 seconds, 79 milliseconds)
[info] - subscribing topic by pattern from global timestamp per topic (failOnDataLoss: true) (4 seconds, 47 milliseconds)
[info] - assign from latest offsets (failOnDataLoss: false) (4 seconds, 833 milliseconds)
[info] - assign from earliest offsets (failOnDataLoss: false) (5 seconds, 196 milliseconds)
[info] - assign from specific offsets (failOnDataLoss: false) (3 seconds, 211 milliseconds)
[info] - assign from specific timestamps (failOnDataLoss: false) (4 seconds, 935 milliseconds)
[info] - assign from global timestamp per topic (failOnDataLoss: false) (3 seconds, 996 milliseconds)
[info] - subscribing topic by name from latest offsets (failOnDataLoss: false) (6 seconds, 68 milliseconds)
[info] - subscribing topic by name from earliest offsets (failOnDataLoss: false) (4 seconds, 692 milliseconds)
[info] - subscribing topic by name from specific offsets (failOnDataLoss: false) (3 seconds, 233 milliseconds)
[info] - subscribing topic by name from specific timestamps (failOnDataLoss: false) (5 seconds, 35 milliseconds)
[info] - subscribing topic by name from global timestamp per topic (failOnDataLoss: false) (5 seconds, 26 milliseconds)
[info] - subscribing topic by pattern from latest offsets (failOnDataLoss: false) (5 seconds, 841 milliseconds)
[info] - subscribing topic by pattern from earliest offsets (failOnDataLoss: false) (4 seconds, 673 milliseconds)
[info] - subscribing topic by pattern from specific offsets (failOnDataLoss: false) (2 seconds, 633 milliseconds)
[info]   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:875)
[info]   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$executeOneBatch$2(MicroBatchExecution.scala:393)
[info]   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
[info]   at org.apache.spark.sql.execution.streaming.ProgressContext.reportTimeTaken(ProgressReporter.scala:185)
[info]   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.executeOneBatch(MicroBatchExecution.scala:363)
[info]   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:343)
[info]   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1$adapted(MicroBatchExecution.scala:343)
[info]   at org.apache.spark.sql.execution.streaming.TriggerExecutor.runOneBatch(TriggerExecutor.scala:39)
[info]   at org.apache.spark.sql.execution.streaming.TriggerExecutor.runOneBatch$(TriggerExecutor.scala:37)
[info]   at org.apache.spark.sql.execution.streaming.MultiBatchExecutor.runOneBatch(TriggerExecutor.scala:59)
[info]   at org.apache.spark.sql.execution.streaming.MultiBatchExecutor.execute(TriggerExecutor.scala:64)
[info]   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:343)
[info]   at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:337)
[info]   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
[info]   at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:791)
[info]   at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:311)
[info]   at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:226)
[info]   at scala.Predef$.assert(Predef.scala:279)
[info]   at org.apache.spark.TestUtils$.assertExceptionMsg(TestUtils.scala:200)
[info]   at org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase.$anonfun$new$9(KafkaMicroBatchSourceSuite.scala:373)
[info]   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
[info]   at org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
[info]   at org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:282)
[info]   at org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231)
[info]   at org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:230)
[info]   at org.apache.spark.SparkFunSuite.failAfter(SparkFunSuite.scala:69)
[info]   at org.apache.spark.SparkFunSuite.$anonfun$test$2(SparkFunSuite.scala:155)
[info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
[info]   at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:227)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
[info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
[info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:69)
[info]   at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
[info]   at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
[info]   at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:69)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
[info]   at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
[info]   at scala.collection.immutable.List.foreach(List.scala:334)
[info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
[info]   at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
[info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
[info]   at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
[info]   at org.scalatest.Suite.run(Suite.scala:1114)
[info]   at org.scalatest.Suite.run$(Suite.scala:1096)
[info]   at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
[info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
[info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:69)
[info]   at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
[info]   at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
[info]   at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
[info]   at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:69)
[info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
[info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
[info]   at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414)
[info]   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[info]   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
[info]   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
[info]   at java.base/java.lang.Thread.run(Thread.java:840)
{code}