[ https://issues.apache.org/jira/browse/SPARK-51586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jungtaek Lim resolved SPARK-51586.
----------------------------------
    Fix Version/s: 4.0.0
       Resolution: Fixed

Issue resolved by pull request 50348
[https://github.com/apache/spark/pull/50348]

> Kafka continuous stream may go into infinite loop of reconfiguring
> ------------------------------------------------------------------
>
>                 Key: SPARK-51586
>                 URL: https://issues.apache.org/jira/browse/SPARK-51586
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 4.0.0, 4.1.0
>            Reporter: Vlad Rozov
>            Assignee: Vlad Rozov
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 4.0.0
>
>
> After [-SPARK-45080-|https://issues.apache.org/jira/browse/SPARK-45080] and
> [-PR 42823-|https://github.com/apache/spark/pull/42823], a Kafka continuous
> stream may go into an infinite loop of reconfiguring. Because the
> {{KafkaScan}} columnar support mode is hardcoded to {{UNSUPPORTED}},
> {{ContinuousScanExec}} no longer initializes {{inputPartitions}} in
> {{supportsColumnar}} as it did prior to the above PR, so
> {{KafkaContinuousStream.planInputPartitions}} is not called during query
> planning. This leaves {{KafkaContinuousStream.knownPartitions}}
> uninitialized, so a call to {{needsReconfiguration}} returns true.
> {{epochUpdateThread}} in {{ContinuousExecution}} then requests interruption
> of {{queryExecutionThread}}. That thread cannot initialize
> {{knownPartitions}} either, because it checks for interrupts in
> {{KafkaOffsetReaderAdmin.withRetries}} and exits before {{knownPartitions}}
> is assigned.
>
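
The sequence described above can be illustrated with a small single-threaded toy model. This is only a sketch of the control flow as the description states it, not Spark code: `ToyContinuousStream`, `count_restarts`, the partition set, and the `max_restarts` cap are all hypothetical names and values introduced for illustration, and the real threads are reduced to a boolean flag.

```python
class ToyContinuousStream:
    """Hypothetical stand-in for a continuous stream; not Spark code."""

    def __init__(self):
        # Mirrors the uninitialized knownPartitions from the description.
        self.known_partitions = None

    def plan_input_partitions(self, interrupted):
        # Per the description, the interrupt check runs before the assignment.
        if interrupted:
            raise InterruptedError("interrupted before known_partitions was assigned")
        self.known_partitions = {0, 1, 2, 3, 4}

    def needs_reconfiguration(self):
        # Assumption taken from the description: an unset partition set
        # is reported as "reconfiguration needed".
        return self.known_partitions is None


def count_restarts(plan_during_query_planning, max_restarts=5):
    """Return how many times the toy query restarts, capped at max_restarts."""
    stream = ToyContinuousStream()
    restarts = 0
    while restarts < max_restarts:
        interrupted = False
        # Query planning: initializes the partitions only in the old behavior.
        if plan_during_query_planning:
            stream.plan_input_partitions(interrupted)
        # Stand-in for the epoch update thread: request an interrupt
        # whenever the stream reports that it needs reconfiguration.
        if stream.needs_reconfiguration():
            interrupted = True
        try:
            # Stand-in for the query execution thread planning lazily.
            if stream.known_partitions is None:
                stream.plan_input_partitions(interrupted)
        except InterruptedError:
            restarts += 1  # the exception is ignored and the query starts over
            continue
        return restarts
    return restarts
```

In this model, `count_restarts(False)` never gets past the interrupt check and runs until the cap, while `count_restarts(True)` settles without a restart. The cap exists only so the sketch terminates.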
> The problem can be identified in the logs by the following repeating pattern:
> {noformat}
> 25/03/21 09:38:09.709 stream execution thread for [id = 085613ee-b0b0-4864-8ac9-2bbd7c885e13, runId = 03a5bef3-f5e1-4d23-991d-a19873db9192] INFO CommitLog: BatchIds found from listing:
> 25/03/21 09:38:09.709 stream execution thread for [id = 085613ee-b0b0-4864-8ac9-2bbd7c885e13, runId = 03a5bef3-f5e1-4d23-991d-a19873db9192] INFO ContinuousExecution: Starting new streaming query.
> 25/03/21 09:38:09.710 stream execution thread for [id = 085613ee-b0b0-4864-8ac9-2bbd7c885e13, runId = 03a5bef3-f5e1-4d23-991d-a19873db9192] INFO KafkaContinuousStream: Initial offsets: {"topic-0":{"0":1,"1":1,"2":1,"3":1,"4":1}}
> 25/03/21 09:38:09.710 stream execution thread for [id = 085613ee-b0b0-4864-8ac9-2bbd7c885e13, runId = 03a5bef3-f5e1-4d23-991d-a19873db9192] INFO BaseSessionStateBuilder$$anon$2: Optimization rule 'org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation' is excluded from the optimizer.
> 25/03/21 09:38:09.710 stream execution thread for [id = 085613ee-b0b0-4864-8ac9-2bbd7c885e13, runId = 03a5bef3-f5e1-4d23-991d-a19873db9192] INFO BaseSessionStateBuilder$$anon$2: Optimization rule 'org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation' is excluded from the optimizer.
> 25/03/21 09:38:09.711 stream execution thread for [id = 085613ee-b0b0-4864-8ac9-2bbd7c885e13, runId = 03a5bef3-f5e1-4d23-991d-a19873db9192] INFO EpochCoordinatorRef: Registered EpochCoordinator endpoint
> 25/03/21 09:38:09.711 stream execution thread for [id = 085613ee-b0b0-4864-8ac9-2bbd7c885e13, runId = 03a5bef3-f5e1-4d23-991d-a19873db9192] INFO BaseSessionStateBuilder$$anon$2: Optimization rule 'org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation' is excluded from the optimizer.
> 25/03/21 09:38:09.711 stream execution thread for [id = 085613ee-b0b0-4864-8ac9-2bbd7c885e13, runId = 03a5bef3-f5e1-4d23-991d-a19873db9192] INFO BaseSessionStateBuilder$$anon$2: Optimization rule 'org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation' is excluded from the optimizer.
> 25/03/21 09:38:09.713 stream execution thread for [id = 085613ee-b0b0-4864-8ac9-2bbd7c885e13, runId = 03a5bef3-f5e1-4d23-991d-a19873db9192] INFO ContinuousExecution: Query 085613ee-b0b0-4864-8ac9-2bbd7c885e13 ignoring exception from reconfiguring: java.lang.InterruptedException
> 25/03/21 09:38:09.713 stream execution thread for [id = 085613ee-b0b0-4864-8ac9-2bbd7c885e13, runId = 03a5bef3-f5e1-4d23-991d-a19873db9192] INFO DAGScheduler: Asked to cancel job group 03a5bef3-f5e1-4d23-991d-a19873db9192 with cancelFutureJobs=false
> 25/03/21 09:38:09.713 dag-scheduler-event-loop WARN DAGScheduler: Failed to cancel job group 03a5bef3-f5e1-4d23-991d-a19873db9192. Cannot find active jobs for it.
> {noformat}
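
One way to check a driver log for this pattern is to count the "ignoring exception from reconfiguring" entries. The sketch below assumes each log entry sits on a single line, as Spark normally writes it; the function names and the threshold of 3 are hypothetical choices for illustration, and the message text is taken from the excerpt above.

```python
import re

# Message text copied from the log excerpt in this issue.
RECONFIGURE_INTERRUPT = re.compile(
    r"ContinuousExecution: Query \S+ ignoring exception from reconfiguring: "
    r"java\.lang\.InterruptedException"
)


def count_reconfigure_interrupts(lines):
    """Count log lines that report an ignored InterruptedException."""
    return sum(1 for line in lines if RECONFIGURE_INTERRUPT.search(line))


def looks_like_reconfigure_loop(lines, threshold=3):
    """Heuristic only: the threshold is an arbitrary, hypothetical cutoff."""
    return count_reconfigure_interrupts(lines) >= threshold
```

A single occurrence can also appear during a legitimate reconfiguration, so the count is a hint rather than proof. It could be applied to a file with `looks_like_reconfigure_loop(open(path))`, where `path` points at the driver log.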