[ https://issues.apache.org/jira/browse/FLINK-5048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15657655#comment-15657655 ]
ASF GitHub Bot commented on FLINK-5048:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2789#discussion_r87626228

    --- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java ---
    @@ -66,36 +57,15 @@
     	/** The schema to convert between Kafka's byte messages, and Flink's objects */
     	private final KeyedDeserializationSchema<T> deserializer;

    -	/** The configuration for the Kafka consumer */
    -	private final Properties kafkaProperties;
    +	/** The handover of data and exceptions between the consumer thread and the task thread */
    +	private final Handover handover;

    -	/** The maximum number of milliseconds to wait for a fetch batch */
    -	private final long pollTimeout;
    -
    -	/** The next offsets that the main thread should commit */
    -	private final AtomicReference<Map<TopicPartition, OffsetAndMetadata>> nextOffsetsToCommit;
    -
    -	/** The callback invoked by Kafka once an offset commit is complete */
    -	private final OffsetCommitCallback offsetCommitCallback;
    -
    -	/** Reference to the Kafka consumer, once it is created */
    -	private volatile KafkaConsumer<byte[], byte[]> consumer;
    -
    -	/** Reference to the proxy, forwarding exceptions from the fetch thread to the main thread */
    -	private volatile ExceptionProxy errorHandler;
    +	/** The thread that runs the proper KafkaConsumer and hand the record batches to this fetcher */
    --- End diff --

    nit: 'proper' confused me a bit at first. Perhaps 'actual'?

> Kafka Consumer (0.9/0.10) threading model leads to problematic cancellation behavior
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-5048
>                 URL: https://issues.apache.org/jira/browse/FLINK-5048
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.1.3
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>             Fix For: 1.2.0
>
>
> The {{FlinkKafkaConsumer}} (0.9 / 0.10) spawns a separate thread that
> operates the KafkaConsumer. That thread is shielded from interrupts, because
> the Kafka Consumer has not been handling thread interrupts well.
> Since that thread is also the thread that emits records, it may block in the
> network stack (backpressure) or in chained operators. The latter case leads to
> situations where cancellation becomes very slow unless that thread is
> interrupted (which it cannot be).
> I propose to change the thread model as follows:
>   - A spawned consumer thread pulls from the KafkaConsumer and pushes its
> pulled batch of records into a blocking queue (size one)
>   - The main thread of the task will pull the record batches from the
> blocking queue and emit the records.
> This actually allows for some additional I/O overlap while limiting the
> additional memory consumption: only two batches are ever held, one being
> fetched and one being emitted.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
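
For illustration, the thread model proposed in the issue description above can be sketched in plain Java. This is a minimal sketch under stated assumptions, not the code from the pull request: the class name HandoverSketch, the run method, and the END sentinel are hypothetical, and an in-memory list of batches stands in for polling a real KafkaConsumer. It uses a java.util.concurrent.ArrayBlockingQueue of capacity one as the "blocking queue (size one)", whereas the pull request introduces its own Handover class, whose details are not shown here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch: a spawned thread "fetches" batches and hands them over
// through a size-one blocking queue; the calling (task) thread emits them.
class HandoverSketch {

    // Sentinel batch marking the end of input (assumption for this sketch only;
    // a real consumer runs until it is cancelled). Compared by identity.
    private static final List<String> END = new ArrayList<>();

    static List<String> run(List<List<String>> batches) {
        BlockingQueue<List<String>> queue = new ArrayBlockingQueue<>(1);

        // Consumer thread: stands in for the thread that polls the KafkaConsumer.
        Thread consumerThread = new Thread(() -> {
            try {
                for (List<String> batch : batches) {
                    queue.put(batch); // blocks while the previous batch is still queued
                }
                queue.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "consumer-thread");
        consumerThread.setDaemon(true);
        consumerThread.start();

        // Task thread: takes batches and emits the records. Because take() is
        // interruptible, this thread can be interrupted on cancellation even
        // though the consumer thread is not.
        List<String> emitted = new ArrayList<>();
        try {
            while (true) {
                List<String> batch = queue.take();
                if (batch == END) {
                    break;
                }
                emitted.addAll(batch); // "emit" the records of this batch
            }
            consumerThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("task thread interrupted", e);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<List<String>> batches = Arrays.asList(
                Arrays.asList("r1", "r2"),
                Arrays.asList("r3"));
        System.out.println(run(batches));
    }
}
```

The point of the split is that anything that can block for a long time while emitting (backpressure, chained operators) now happens on the task thread, which can safely be interrupted, while the thread that touches the KafkaConsumer only ever blocks on the handover.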