[ https://issues.apache.org/jira/browse/SPARK-51586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jungtaek Lim resolved SPARK-51586.
----------------------------------
    Fix Version/s: 4.0.0
       Resolution: Fixed

Issue resolved by pull request 50348
[https://github.com/apache/spark/pull/50348]

> Kafka continuous stream may go into infinite loop of reconfiguring
> ------------------------------------------------------------------
>
>                 Key: SPARK-51586
>                 URL: https://issues.apache.org/jira/browse/SPARK-51586
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 4.0.0, 4.1.0
>            Reporter: Vlad Rozov
>            Assignee: Vlad Rozov
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 4.0.0
>
>
> After [-SPARK-45080-|https://issues.apache.org/jira/browse/SPARK-45080] and
> [-PR 42823-|https://github.com/apache/spark/pull/42823], a Kafka continuous
> stream may go into an infinite loop of reconfiguring. Because the
> {{KafkaScan}} columnar support mode is hardcoded to {{UNSUPPORTED}},
> {{ContinuousScanExec}} no longer initializes {{inputPartitions}} in
> {{supportsColumnar}} as it did prior to the above PR. As a result,
> {{KafkaContinuousStream.planInputPartitions}} is not called during query
> planning. This leaves {{KafkaContinuousStream.knownPartitions}}
> uninitialized, so the call to {{needsReconfiguration}} returns true.
> {{epochUpdateThread}} in {{ContinuousExecution}} then requests interruption
> of {{queryExecutionThread}}, so that thread cannot initialize
> {{knownPartitions}} either: it checks for interrupts in
> {{KafkaOffsetReaderAdmin.withRetries}} and exits before {{knownPartitions}}
> is assigned.
> The problem can be identified in the logs by the following repeating pattern:
> {noformat}
> 25/03/21 09:38:09.709 stream execution thread for [id = 085613ee-b0b0-4864-8ac9-2bbd7c885e13, runId = 03a5bef3-f5e1-4d23-991d-a19873db9192] INFO CommitLog: BatchIds found from listing:
> 25/03/21 09:38:09.709 stream execution thread for [id = 085613ee-b0b0-4864-8ac9-2bbd7c885e13, runId = 03a5bef3-f5e1-4d23-991d-a19873db9192] INFO ContinuousExecution: Starting new streaming query.
> 25/03/21 09:38:09.710 stream execution thread for [id = 085613ee-b0b0-4864-8ac9-2bbd7c885e13, runId = 03a5bef3-f5e1-4d23-991d-a19873db9192] INFO KafkaContinuousStream: Initial offsets: {"topic-0":{"0":1,"1":1,"2":1,"3":1,"4":1}}
> 25/03/21 09:38:09.710 stream execution thread for [id = 085613ee-b0b0-4864-8ac9-2bbd7c885e13, runId = 03a5bef3-f5e1-4d23-991d-a19873db9192] INFO BaseSessionStateBuilder$$anon$2: Optimization rule 'org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation' is excluded from the optimizer.
> 25/03/21 09:38:09.710 stream execution thread for [id = 085613ee-b0b0-4864-8ac9-2bbd7c885e13, runId = 03a5bef3-f5e1-4d23-991d-a19873db9192] INFO BaseSessionStateBuilder$$anon$2: Optimization rule 'org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation' is excluded from the optimizer.
> 25/03/21 09:38:09.711 stream execution thread for [id = 085613ee-b0b0-4864-8ac9-2bbd7c885e13, runId = 03a5bef3-f5e1-4d23-991d-a19873db9192] INFO EpochCoordinatorRef: Registered EpochCoordinator endpoint
> 25/03/21 09:38:09.711 stream execution thread for [id = 085613ee-b0b0-4864-8ac9-2bbd7c885e13, runId = 03a5bef3-f5e1-4d23-991d-a19873db9192] INFO BaseSessionStateBuilder$$anon$2: Optimization rule 'org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation' is excluded from the optimizer.
> 25/03/21 09:38:09.711 stream execution thread for [id = 085613ee-b0b0-4864-8ac9-2bbd7c885e13, runId = 03a5bef3-f5e1-4d23-991d-a19873db9192] INFO BaseSessionStateBuilder$$anon$2: Optimization rule 'org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation' is excluded from the optimizer.
> 25/03/21 09:38:09.713 stream execution thread for [id = 085613ee-b0b0-4864-8ac9-2bbd7c885e13, runId = 03a5bef3-f5e1-4d23-991d-a19873db9192] INFO ContinuousExecution: Query 085613ee-b0b0-4864-8ac9-2bbd7c885e13 ignoring exception from reconfiguring: java.lang.InterruptedException
> 25/03/21 09:38:09.713 stream execution thread for [id = 085613ee-b0b0-4864-8ac9-2bbd7c885e13, runId = 03a5bef3-f5e1-4d23-991d-a19873db9192] INFO DAGScheduler: Asked to cancel job group 03a5bef3-f5e1-4d23-991d-a19873db9192 with cancelFutureJobs=false
> 25/03/21 09:38:09.713 dag-scheduler-event-loop WARN DAGScheduler: Failed to cancel job group 03a5bef3-f5e1-4d23-991d-a19873db9192. Cannot find active jobs for it.
> {noformat}
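The causal chain in the description can be reproduced in a small, self-contained model. This is a hedged sketch, not Spark code. {{StreamModel}}, {{fetchWithRetries}} and {{simulate}} are hypothetical names. The two threads ({{epochUpdateThread}} and {{queryExecutionThread}}) are collapsed into one, and the Kafka offset fetch is reduced to an interrupt check. The model only shows why an uninitialized {{knownPartitions}} keeps {{needsReconfiguration}} true while every attempt to assign it is aborted by the interrupt.

```scala
// Hypothetical, single-threaded model of the loop described in SPARK-51586.
// StreamModel, fetchWithRetries and simulate are illustrative names, not Spark's real classes or APIs.
object ReconfigurationLoopModel {

  // Stands in for KafkaContinuousStream: knownPartitions is assigned only by planInputPartitions.
  final class StreamModel(currentPartitions: Set[Int]) {
    var knownPartitions: Set[Int] = null // uninitialized until planning completes

    // Stands in for KafkaOffsetReaderAdmin.withRetries: gives up if the thread was interrupted.
    // Thread.interrupted() also clears the interrupt flag.
    private def fetchWithRetries(): Set[Int] = {
      if (Thread.interrupted()) throw new InterruptedException()
      currentPartitions
    }

    def planInputPartitions(): Unit = {
      val partitions = fetchWithRetries() // may throw before the assignment below
      knownPartitions = partitions
    }

    // An uninitialized (null) knownPartitions never equals the fetched partition set.
    def needsReconfiguration(): Boolean = currentPartitions != knownPartitions
  }

  // Returns how many of up to maxAttempts configuration attempts were aborted.
  def simulate(plannedDuringQueryPlanning: Boolean, maxAttempts: Int): Int = {
    val stream = new StreamModel(Set(0, 1, 2, 3, 4))
    // Models the behavior before PR 42823, where query planning called planInputPartitions.
    if (plannedDuringQueryPlanning) stream.planInputPartitions()
    var aborted = 0
    var attempt = 0
    var configured = false
    while (!configured && attempt < maxAttempts) {
      attempt += 1
      // Models epochUpdateThread: interrupt the execution thread when reconfiguration is needed.
      if (stream.needsReconfiguration()) Thread.currentThread().interrupt()
      try {
        stream.planInputPartitions()
        configured = true
      } catch {
        // Models "ignoring exception from reconfiguring: java.lang.InterruptedException".
        case _: InterruptedException => aborted += 1
      }
    }
    aborted
  }

  def main(args: Array[String]): Unit = {
    println(simulate(plannedDuringQueryPlanning = false, maxAttempts = 5)) // every attempt aborted
    println(simulate(plannedDuringQueryPlanning = true, maxAttempts = 5))  // no attempt aborted
  }
}
```

The {{maxAttempts}} bound exists only so the model terminates. In the reported bug nothing bounds the loop.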

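The repeating log pattern quoted in the issue can also be spotted mechanically. Below is a minimal Scala sketch that counts the "ignoring exception from reconfiguring: java.lang.InterruptedException" entries per query id. It assumes one log entry per line in the actual log file (the email wraps them). {{ReconfigurationLoopDetector}}, its methods and the threshold are hypothetical, chosen for illustration; none of them are part of Spark.

```scala
import scala.util.matching.Regex

// Hypothetical helper, not part of Spark: flags queries whose logs repeatedly show
// an InterruptedException being ignored during reconfiguration.
object ReconfigurationLoopDetector {
  // Captures the query id from the ContinuousExecution log message quoted in SPARK-51586.
  private val InterruptedReconfig: Regex =
    """ContinuousExecution: Query (\S+) ignoring exception from reconfiguring: java\.lang\.InterruptedException""".r

  // Counts matching log lines per query id (assumes one log entry per line).
  def countByQuery(lines: Iterator[String]): Map[String, Int] =
    lines
      .flatMap(line => InterruptedReconfig.findFirstMatchIn(line).map(_.group(1)))
      .foldLeft(Map.empty[String, Int]) { (acc, id) => acc.updated(id, acc.getOrElse(id, 0) + 1) }

  // Query ids whose count reaches the (assumed, caller-chosen) threshold.
  def suspectQueries(lines: Iterator[String], threshold: Int): Set[String] =
    countByQuery(lines).filter { case (_, n) => n >= threshold }.keySet
}
```

A fixed threshold is a deliberately crude heuristic. A single ignored interrupt can be benign, while many in quick succession for the same query id match the loop described above.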


--
This message was sent by Atlassian Jira
(v8.20.10#820010)
