[ https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15896252#comment-15896252 ]
ASF GitHub Bot commented on FLINK-3679:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3314#discussion_r104312079

--- Diff: flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java ---
@@ -419,6 +424,164 @@ public void run() {
 		assertFalse("fetcher threads did not properly finish", sourceContext.isStillBlocking());
 	}
 
+	@Test
+	public void testSkipCorruptedMessage() throws Exception {
+
+		// ----- some test data -----
+
+		final String topic = "test-topic";
+		final int partition = 3;
+		final byte[] payload = new byte[] {1, 2, 3, 4};
+
+		final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList(
+			new ConsumerRecord<>(topic, partition, 15, payload, payload),
+			new ConsumerRecord<>(topic, partition, 16, payload, payload),
+			new ConsumerRecord<>(topic, partition, 17, payload, "end".getBytes()));
+
+		final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> data = new HashMap<>();
+		data.put(new TopicPartition(topic, partition), records);
+
+		final ConsumerRecords<byte[], byte[]> consumerRecords = new ConsumerRecords<>(data);
+
+		// ----- the test consumer -----
+
+		final KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
+		when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
+			@Override
+			public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) {
+				return consumerRecords;
+			}
+		});
+
+		whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
+
+		// ----- build a fetcher -----
+
+		ArrayList<String> results = new ArrayList<>();
+		SourceContext<String> sourceContext = new CollectingSourceContext<>(results, results);
+		Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
+			Collections.singletonMap(new KafkaTopicPartition(topic, partition), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
+		KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchema<String>() {
+
+			@Override
+			public String deserialize(byte[] messageKey, byte[] message,
+					String topic, int partition, long offset) throws IOException {
+				// a null return marks the record at offset 15 as corrupted
+				return offset == 15 ? null : new String(message);
+			}
+
+			@Override
+			public boolean isEndOfStream(String nextElement) {
+				return "end".equals(nextElement);
+			}
+
+			@Override
+			public TypeInformation<String> getProducedType() {
+				return BasicTypeInfo.STRING_TYPE_INFO;
+			}
+		};
+
+		final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
+			sourceContext,
+			partitionsWithInitialOffsets,
+			null, /* periodic watermark extractor */
+			null, /* punctuated watermark extractor */
+			new TestProcessingTimeService(),
+			10, /* watermark interval */
+			this.getClass().getClassLoader(),
+			true, /* checkpointing */
+			"task_name",
+			new UnregisteredMetricsGroup(),
+			schema,
+			new Properties(),
+			0L,
+			false);
+
+		// ----- run the fetcher -----
+
+		// offset 15 is skipped (null), offset 17 ends the stream, so only
+		// the record at offset 16 is emitted
+		fetcher.runFetchLoop();
+		assertEquals(1, results.size());
+	}
+
+	@Test
+	public void testNullAsEOF() throws Exception {
--- End diff ---

I'm not sure this test is necessary. It essentially just tests that `isEndOfStream` works when `isEndOfStream` returns `true`; whether or not the condition is `element == null` is irrelevant to what is being tested. We also already have a `runEndOfStreamTest` in `KafkaConsumerTestBase`.
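(For illustration only — this is not code from the PR. A schema that signals end-of-stream via a null element would presumably look like the sketch below, assuming the same imports and context as the test above. Since the fetcher only ever consults isEndOfStream(), the sketch exercises the same code path as the "end"-marker schema in testSkipCorruptedMessage, which is the reviewer's point.)

    // Hypothetical null-as-EOF schema. The fetcher stops as soon as
    // isEndOfStream() returns true; that the triggering element happens
    // to be null is incidental to the code path being exercised.
    KeyedDeserializationSchema<String> nullAsEofSchema = new KeyedDeserializationSchema<String>() {

        @Override
        public String deserialize(byte[] messageKey, byte[] message,
                String topic, int partition, long offset) throws IOException {
            // a null message body is mapped to a null element
            return message == null ? null : new String(message);
        }

        @Override
        public boolean isEndOfStream(String nextElement) {
            // end-of-stream condition: the element is null
            return nextElement == null;
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return BasicTypeInfo.STRING_TYPE_INFO;
        }
    };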
> Allow Kafka consumer to skip corrupted messages
> ------------------------------------------------
>
>                 Key: FLINK-3679
>                 URL: https://issues.apache.org/jira/browse/FLINK-3679
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API, Kafka Connector
>            Reporter: Jamie Grier
>            Assignee: Haohui Mai
>
> There are a couple of issues with the DeserializationSchema API that I think should be improved. This request has come to me via an existing Flink user.
>
> The main issue is simply that the API assumes a one-to-one mapping between inputs and outputs. In reality there are scenarios where one input message (say, from Kafka) might actually map to zero or more logical elements in the pipeline.
>
> Particularly important here is the case where you receive a message from a source (such as Kafka) and the raw bytes don't deserialize properly. Right now the only recourse is to throw an IOException and therefore fail the job.
>
> This is definitely not good, since bad data is a reality and failing the job is not the right option. If the job fails, we'll just end up replaying the bad data and the whole thing will start again. Instead, in this case it would be best if the user could just return the empty set.
>
> The other case is where one input message should logically be multiple output messages. This case is probably less important, since there are other ways to do it, but in general it might be good to make the DeserializationSchema.deserialize() method return a collection rather than a single element.
>
> Maybe we need to support a DeserializationSchema variant that has semantics more like those of FlatMap.
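(A minimal sketch of the flat-map-style variant the description suggests — the interface name and shape are hypothetical, assuming only Flink's existing Collector pattern; it is not the API this ticket actually introduced.)

    import java.io.IOException;
    import java.io.Serializable;

    import org.apache.flink.util.Collector;

    // Hypothetical flat-map-style deserialization schema, sketched from
    // the description above. One raw message may map to zero or more
    // elements: a corrupted message simply emits nothing instead of
    // throwing, so the job keeps running.
    public interface FlatMapDeserializationSchema<T> extends Serializable {

        /**
         * Deserializes one raw message, emitting zero or more elements
         * to the collector. Emitting nothing skips the message.
         */
        void deserialize(byte[] message, Collector<T> out) throws IOException;
    }

The change under review in PR #3314 takes the narrower route exercised by testSkipCorruptedMessage above: a null return from deserialize() is treated as a corrupted record, and the fetcher skips it rather than failing the job.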