[ https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15896252#comment-15896252 ]

ASF GitHub Bot commented on FLINK-3679:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3314#discussion_r104312079
  
    --- Diff: flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java ---
    @@ -419,6 +424,164 @@ public void run() {
                assertFalse("fetcher threads did not properly finish", sourceContext.isStillBlocking());
        }
     
    +   @Test
    +   public void testSkipCorruptedMessage() throws Exception {
    +
    +           // ----- some test data -----
    +
    +           final String topic = "test-topic";
    +           final int partition = 3;
    +           final byte[] payload = new byte[] {1, 2, 3, 4};
    +
    +           final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList(
    +                   new ConsumerRecord<>(topic, partition, 15, payload, payload),
    +                   new ConsumerRecord<>(topic, partition, 16, payload, payload),
    +                   new ConsumerRecord<>(topic, partition, 17, payload, "end".getBytes()));
    +
    +           final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> data = new HashMap<>();
    +           data.put(new TopicPartition(topic, partition), records);
    +
    +           final ConsumerRecords<byte[], byte[]> consumerRecords = new ConsumerRecords<>(data);
    +
    +           // ----- the test consumer -----
    +
    +           final KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
    +           when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
    +                   @Override
    +                   public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) {
    +                           return consumerRecords;
    +                   }
    +           });
    +
    +           whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
    +
    +           // ----- build a fetcher -----
    +
    +           ArrayList<String> results = new ArrayList<>();
    +           SourceContext<String> sourceContext = new CollectingSourceContext<>(results, results);
    +           Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
    +                   Collections.singletonMap(new KafkaTopicPartition(topic, partition), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
    +           KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchema<String>() {
    +
    +                   @Override
    +                   public String deserialize(byte[] messageKey, byte[] message,
    +                                                           String topic, int partition, long offset) throws IOException {
    +                           return offset == 15 ? null : new String(message);
    +                   }
    +
    +                   @Override
    +                   public boolean isEndOfStream(String nextElement) {
    +                           return "end".equals(nextElement);
    +                   }
    +
    +                   @Override
    +                   public TypeInformation<String> getProducedType() {
    +                           return BasicTypeInfo.STRING_TYPE_INFO;
    +                   }
    +           };
    +
    +           final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
    +                   sourceContext,
    +                   partitionsWithInitialOffsets,
    +                   null, /* periodic watermark extractor */
    +                   null, /* punctuated watermark extractor */
    +                   new TestProcessingTimeService(),
    +                   10, /* watermark interval */
    +                   this.getClass().getClassLoader(),
    +                   true, /* checkpointing */
    +                   "task_name",
    +                   new UnregisteredMetricsGroup(),
    +                   schema,
    +                   new Properties(),
    +                   0L,
    +                   false);
    +
    +
    +           // ----- run the fetcher -----
    +
    +           fetcher.runFetchLoop();
    +           assertEquals(1, results.size());
    +   }
    +
    +   @Test
    +   public void testNullAsEOF() throws Exception {
    --- End diff ---
    
    I'm not sure this test is necessary. It's essentially just testing that end-of-stream handling works when `isEndOfStream` returns `true`; whether the end-of-stream condition happens to be `element == null` seems irrelevant to what's actually being tested.
    
    We also already have a `runEndOfStreamTest` in `KafkaConsumerTestBase`.
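For reference, the contract the comment refers to can be sketched as follows. This is a simplified, hypothetical illustration of a per-record deserialize/end-of-stream check, not the actual `Kafka09Fetcher` emit loop; the class name `EmitLoopSketch` and the `emit` callback are made up for the example:

    import java.io.IOException;
    import java.util.List;
    import java.util.function.Consumer;

    import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
    import org.apache.kafka.clients.consumer.ConsumerRecord;

    // Hypothetical helper, not part of the PR: every deserialized value, null or
    // not, is passed to isEndOfStream(), so a schema whose end-of-stream condition
    // happens to be "element == null" exercises the same path as one that checks
    // for a sentinel value such as "end".
    final class EmitLoopSketch {

        static <T> void emitAll(
                List<ConsumerRecord<byte[], byte[]>> records,
                KeyedDeserializationSchema<T> schema,
                Consumer<T> emit) throws IOException {

            for (ConsumerRecord<byte[], byte[]> record : records) {
                T value = schema.deserialize(
                        record.key(), record.value(),
                        record.topic(), record.partition(), record.offset());

                if (schema.isEndOfStream(value)) {
                    return;              // stop consuming once end-of-stream is signalled
                }
                if (value != null) {
                    emit.accept(value);  // null (corrupted) records are skipped
                }
            }
        }
    }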
    



> Allow Kafka consumer to skip corrupted messages
> -----------------------------------------------
>
>                 Key: FLINK-3679
>                 URL: https://issues.apache.org/jira/browse/FLINK-3679
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API, Kafka Connector
>            Reporter: Jamie Grier
>            Assignee: Haohui Mai
>
> There are a couple of issues with the DeserializationSchema API that I think 
> should be improved.  This request has come to me via an existing Flink user.
>
> The main issue is simply that the API assumes that there is a one-to-one 
> mapping between inputs and outputs.  In reality there are scenarios where one 
> input message (say from Kafka) might actually map to zero or more logical 
> elements in the pipeline.
>
> Particularly important here is the case where you receive a message from a 
> source (such as Kafka) and, say, the raw bytes don't deserialize properly.  
> Right now the only recourse is to throw an IOException and therefore fail 
> the job.
>
> This is definitely not good, since bad data is a reality and failing the job 
> is not the right option.  If the job fails, we'll just end up replaying the 
> bad data and the whole thing will start again.  Instead, in this case it 
> would be best if the user could just return an empty set.
>
> The other case is where one input message should logically map to multiple 
> output messages.  This case is probably less important, since there are other 
> ways to do it, but in general it might be good to make the 
> DeserializationSchema.deserialize() method return a collection rather than a 
> single element.
>
> Maybe we need to support a DeserializationSchema variant that has semantics 
> more like that of FlatMap.
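Since the quoted description asks for an API change rather than showing one, here is a minimal sketch of what a corruption-tolerant schema could look like once the consumer skips records for which `deserialize` returns `null`. The class name and the number-parsing payload format are purely illustrative:

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

    // Illustrative only: returns null for undecodable records instead of throwing,
    // so the job keeps running past corrupted messages.
    public class SkipCorruptSchema implements KeyedDeserializationSchema<String> {

        @Override
        public String deserialize(byte[] messageKey, byte[] message,
                                  String topic, int partition, long offset) throws IOException {
            try {
                // stand-in for real decoding (JSON, Avro, ...) that may fail on bad bytes
                return Long.toString(Long.parseLong(new String(message, StandardCharsets.UTF_8)));
            } catch (NumberFormatException e) {
                return null; // corrupted record: skip it rather than fail the job
            }
        }

        @Override
        public boolean isEndOfStream(String nextElement) {
            return false;
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return BasicTypeInfo.STRING_TYPE_INFO;
        }
    }

The FlatMap-style variant mentioned at the end of the description could, hypothetically, take a `Collector` so that one input record yields zero or more elements; the interface below is a sketch of that idea, not an existing Flink API:

    import java.io.IOException;
    import java.io.Serializable;

    import org.apache.flink.util.Collector;

    // Hypothetical interface sketch: zero or more outputs per input record.
    public interface CollectingDeserializationSchema<T> extends Serializable {
        void deserialize(byte[] messageKey, byte[] message,
                         String topic, int partition, long offset,
                         Collector<T> out) throws IOException;
    }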



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
