[GitHub] flink pull request #1989: FLINK-3901 - Added CsvRowInputFormat
Github user fpompermaier commented on a diff in the pull request: https://github.com/apache/flink/pull/1989#discussion_r67295348

--- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java ---
@@ -0,0 +1,1075 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.types.parser.StringParser;
+import org.junit.Test;
+
+public class RowCsvInputFormatTest {
+
+	private static final Path PATH = new Path("an/ignored/file/");
+
+	// Static variables for testing the replacement of \r\n by \n
+	private static final String FIRST_PART = "That is the first part";
+
+	private static final String SECOND_PART = "That is the second part";
+
+	@Test
+	public void ignoreInvalidLines() {
+		try {
+			String fileContent =
+				"header1|header2|header3|\n" +
+				"this is|1|2.0|\n" +
+				"//a comment\n" +
+				"a test|3|4.0|\n" +
+				"#next|5|6.0|\n";
+
+			FileInputSplit split = createTempFile(fileContent);
+
+			RowTypeInfo typeInfo = new RowTypeInfo(new TypeInformation[] {
+				BasicTypeInfo.STRING_TYPE_INFO,
+				BasicTypeInfo.INT_TYPE_INFO,
+				BasicTypeInfo.DOUBLE_TYPE_INFO
+			});
+			CsvInputFormat format = new RowCsvInputFormat(PATH, "\n", "|", typeInfo);
+			format.setLenient(false);
+
+			Configuration parameters = new Configuration();
+			format.configure(parameters);
+			format.open(split);
+
+			Row result = new Row(3);
+
+			try {
+				result = format.nextRecord(result);
+				fail("Parse Exception was not thrown! (Invalid int value)");
+			} catch (ParseException ex) {
+			}
+
+			// if format has lenient == false this can be asserted only after FLINK-3908
+//			result = format.nextRecord(result);
+//			assertNotNull(result);
+//			assertEquals("this is", result.productElement(0));
+//			assertEquals(new Integer(1), result.productElement(1));
+//			assertEquals(new Double(2.0), result.productElement(2));
+//
+//			result = format.nextRecord(result);
+//			assertNotNull(result);
+//			assertEquals("a test", result.productElement(0));
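For context, the snippet below sketches how the format exercised by this test could be driven end to end. The `RowCsvInputFormat`, `RowTypeInfo`, and `Row` calls mirror the quoted diff; the file path and the split loop are illustrative assumptions, not part of the pull request.

```java
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.io.RowCsvInputFormat;
import org.apache.flink.api.table.Row;
import org.apache.flink.api.table.typeutils.RowTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;

public class RowCsvReadSketch {
	public static void main(String[] args) throws Exception {
		RowTypeInfo typeInfo = new RowTypeInfo(new TypeInformation[] {
			BasicTypeInfo.STRING_TYPE_INFO,
			BasicTypeInfo.INT_TYPE_INFO,
			BasicTypeInfo.DOUBLE_TYPE_INFO
		});
		RowCsvInputFormat format =
			new RowCsvInputFormat(new Path("file:///tmp/input.csv"), "\n", "|", typeInfo);
		// The test above sets lenient to false and expects a ParseException;
		// with lenient == true, malformed lines are skipped instead.
		format.setLenient(true);
		format.configure(new Configuration());
		for (FileInputSplit split : format.createInputSplits(1)) {
			format.open(split);
			Row reuse = new Row(typeInfo.getArity());
			while (!format.reachedEnd()) {
				Row row = format.nextRecord(reuse);
				if (row != null) {
					System.out.println(row);
				}
			}
			format.close();
		}
	}
}
```

With `setLenient(true)`, the malformed header line in the test's input would be skipped rather than raising the `ParseException` the test asserts.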
[jira] [Commented] (FLINK-3901) Create a RowCsvInputFormat to use as default CSV IF in Table API
[ https://issues.apache.org/jira/browse/FLINK-3901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333241#comment-15333241 ]

ASF GitHub Bot commented on FLINK-3901:
---------------------------------------

Github user fpompermaier commented on a diff in the pull request: https://github.com/apache/flink/pull/1989#discussion_r67295348

--- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java ---
[GitHub] flink pull request #1989: FLINK-3901 - Added CsvRowInputFormat
Github user fpompermaier commented on a diff in the pull request: https://github.com/apache/flink/pull/1989#discussion_r67295515

--- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java ---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.typeutils.RowSerializer;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.types.parser.FieldParser.ParseErrorState;
+
+@Internal
+public class RowCsvInputFormat extends CsvInputFormat {
+
+	private static final long serialVersionUID = 1L;
+
+	private RowSerializer rowSerializer;
+
+	public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo) {
+		this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, rowTypeInfo);
+	}
+
+	public RowCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, RowTypeInfo rowTypeInfo) {
+		this(filePath, lineDelimiter, fieldDelimiter, rowTypeInfo, createDefaultMask(rowTypeInfo.getArity()));
+	}
+
+	public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, int[] includedFieldsMask) {
+		this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, rowTypeInfo, includedFieldsMask);
+	}
+
+	public RowCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, RowTypeInfo rowTypeInfo,
+			int[] includedFieldsMask) {
+		super(filePath);
+		boolean[] mask = (includedFieldsMask == null) ? createDefaultMask(rowTypeInfo.getArity())
+				: toBooleanMask(includedFieldsMask);
+		configure(lineDelimiter, fieldDelimiter, rowTypeInfo, mask);
+	}
+
+	public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, boolean[] includedFieldsMask) {
+		this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, rowTypeInfo, includedFieldsMask);
+	}
+
+	public RowCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, RowTypeInfo rowTypeInfo,
+			boolean[] includedFieldsMask) {
+		super(filePath);
+		configure(lineDelimiter, fieldDelimiter, rowTypeInfo, includedFieldsMask);
+	}
+
+	private void configure(String lineDelimiter, String fieldDelimiter,
+			RowTypeInfo rowTypeInfo, boolean[] includedFieldsMask) {
+
+		if (rowTypeInfo.getArity() == 0) {
+			throw new IllegalArgumentException("Row arity must be greater than 0.");
+		}
+
+		if (includedFieldsMask == null) {
+			includedFieldsMask = createDefaultMask(rowTypeInfo.getArity());
+		}
+
+		rowSerializer = (RowSerializer) rowTypeInfo.createSerializer(new ExecutionConfig());
--- End diff --

ok
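The two masked constructors above differ only in how the projection is expressed: an `int[]` of column positions is widened by `toBooleanMask()` into a `boolean[]` with one flag per CSV column. A hedged sketch, assuming the signatures quoted in the diff (the paths are placeholders):

```java
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.io.RowCsvInputFormat;
import org.apache.flink.api.table.typeutils.RowTypeInfo;
import org.apache.flink.core.fs.Path;

public class FieldMaskSketch {
	public static void main(String[] args) {
		RowTypeInfo typeInfo = new RowTypeInfo(new TypeInformation[] {
			BasicTypeInfo.STRING_TYPE_INFO,
			BasicTypeInfo.DOUBLE_TYPE_INFO
		});
		// Project CSV columns 0 and 2 onto the two declared Row fields, in order...
		RowCsvInputFormat byPosition =
			new RowCsvInputFormat(new Path("file:///tmp/in.csv"), typeInfo, new int[] {0, 2});
		// ...or express the same projection with an explicit per-column flag array.
		RowCsvInputFormat byFlags =
			new RowCsvInputFormat(new Path("file:///tmp/in.csv"), typeInfo, new boolean[] {true, false, true});
	}
}
```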
[GitHub] flink pull request #1989: FLINK-3901 - Added CsvRowInputFormat
Github user fpompermaier commented on a diff in the pull request: https://github.com/apache/flink/pull/1989#discussion_r67295498

--- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java ---

+	public RowCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, RowTypeInfo rowTypeInfo,
+			boolean[] includedFieldsMask) {
+		super(filePath);
+		configure(lineDelimiter, fieldDelimiter, rowTypeInfo, includedFieldsMask);
--- End diff --

ok
[jira] [Commented] (FLINK-3901) Create a RowCsvInputFormat to use as default CSV IF in Table API
[ https://issues.apache.org/jira/browse/FLINK-3901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333244#comment-15333244 ]

ASF GitHub Bot commented on FLINK-3901:
---------------------------------------

Github user fpompermaier commented on a diff in the pull request: https://github.com/apache/flink/pull/1989#discussion_r67295498

--- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java ---

ok

> Create a RowCsvInputFormat to use as default CSV IF in Table API
> -----------------------------------------------------------------
>
>                 Key: FLINK-3901
>                 URL: https://issues.apache.org/jira/browse/FLINK-3901
>             Project: Flink
>          Issue Type: Improvement
>    Affects Versions: 1.0.2
>            Reporter: Flavio Pompermaier
>            Assignee: Flavio Pompermaier
>            Priority: Minor
>              Labels: csv, null-values, row, tuple
>
> At the moment the Table API reads CSVs using the TupleCsvInputFormat, which has the hard limits of at most 25 fields and no null handling. A new input format producing Row objects is needed to avoid those limitations.
[jira] [Commented] (FLINK-3901) Create a RowCsvInputFormat to use as default CSV IF in Table API
[ https://issues.apache.org/jira/browse/FLINK-3901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333246#comment-15333246 ]

ASF GitHub Bot commented on FLINK-3901:
---------------------------------------

Github user fpompermaier commented on a diff in the pull request: https://github.com/apache/flink/pull/1989#discussion_r67295515

--- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java ---

ok
[jira] [Updated] (FLINK-4027) FlinkKafkaProducer09 sink can lose messages
[ https://issues.apache.org/jira/browse/FLINK-4027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Konstantin Knauf updated FLINK-4027:
------------------------------------
    Priority: Major  (was: Critical)

> FlinkKafkaProducer09 sink can lose messages
> -------------------------------------------
>
>                 Key: FLINK-4027
>                 URL: https://issues.apache.org/jira/browse/FLINK-4027
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.0.3
>            Reporter: Elias Levy
>            Assignee: Robert Metzger
>
> The FlinkKafkaProducer09 sink appears to not offer at-least-once guarantees. The producer publishes messages asynchronously. A callback can record publishing errors, which will be raised when detected. But as far as I can tell, there is no barrier to wait for async errors from the sink when checkpointing, or to track the event time of acked messages to inform the checkpointing process. If a checkpoint occurs while there are pending publish requests, and the requests return a failure after the checkpoint occurred, those messages will be lost because the checkpoint will consider them processed by the sink.
[jira] [Updated] (FLINK-4027) FlinkKafkaProducer09 sink can lose messages
[ https://issues.apache.org/jira/browse/FLINK-4027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Konstantin Knauf updated FLINK-4027:
------------------------------------
    Priority: Critical  (was: Major)
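The loss scenario in the issue description comes from the asynchronous send path. The sketch below illustrates that pattern in isolation; the class and field names are illustrative, not Flink's actual implementation.

```java
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class AsyncSendSketch {
	// recorded by the callback, but only raised on a *later* send
	private volatile Exception asyncError;

	public void send(KafkaProducer<byte[], byte[]> producer, ProducerRecord<byte[], byte[]> record) {
		if (asyncError != null) {
			throw new RuntimeException("a previous send failed", asyncError);
		}
		producer.send(record, new Callback() {
			@Override
			public void onCompletion(RecordMetadata metadata, Exception exception) {
				if (exception != null) {
					asyncError = exception;  // not visible until the next send() checks it
				}
			}
		});
	}
}
```

Because the callback only records the exception, a checkpoint that completes between `send()` and `onCompletion()` counts the record as done even if the broker later rejects it, which is exactly the gap the issue describes.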
[GitHub] flink pull request #1989: FLINK-3901 - Added CsvRowInputFormat
Github user fpompermaier commented on a diff in the pull request: https://github.com/apache/flink/pull/1989#discussion_r67295853

--- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java ---

+	private void configure(String lineDelimiter, String fieldDelimiter,
+			RowTypeInfo rowTypeInfo, boolean[] includedFieldsMask) {
+
+		if (rowTypeInfo.getArity() == 0) {
+			throw new IllegalArgumentException("Row arity must be greater than 0.");
+		}
+
+		if (includedFieldsMask == null) {
+			includedFieldsMask = createDefaultMask(rowTypeInfo.getArity());
+		}
+
+		rowSerializer = (RowSerializer) rowTypeInfo.createSerializer(new ExecutionConfig());
+
+		setDelimiter(lineDelimiter);
+		setFieldDelimiter(fieldDelimiter);
+
+		Class[] classes = new Class[rowTypeInfo.getArity()];
+
+		for (int i = 0; i < rowTypeInfo.getArity(); i++) {
+			classes[i] = rowTypeInfo.getTypeAt(i).getTypeClass();
+		}
+
+		setFieldsGeneric(includedFieldsMask, classes);
+	}
+
+	@Override
+	public Row fillRecord(Row reuse, Object[] parsedValues) {
+		if (reuse == null) {
+			reuse = new Row(rowSerializer.getLength());
+		}
+		for (int i = 0; i < parsedValues.length; i++) {
+			reuse.setField(i, parsedValues[i]);
[jira] [Commented] (FLINK-3901) Create a RowCsvInputFormat to use as default CSV IF in Table API
[ https://issues.apache.org/jira/browse/FLINK-3901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333253#comment-15333253 ]

ASF GitHub Bot commented on FLINK-3901:
---------------------------------------

Github user fpompermaier commented on a diff in the pull request: https://github.com/apache/flink/pull/1989#discussion_r67295853

--- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java ---
[GitHub] flink pull request #1989: FLINK-3901 - Added CsvRowInputFormat
Github user fpompermaier commented on a diff in the pull request: https://github.com/apache/flink/pull/1989#discussion_r67296019

--- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java ---
[jira] [Commented] (FLINK-3901) Create a RowCsvInputFormat to use as default CSV IF in Table API
[ https://issues.apache.org/jira/browse/FLINK-3901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333255#comment-15333255 ]

ASF GitHub Bot commented on FLINK-3901:
---------------------------------------

Github user fpompermaier commented on a diff in the pull request: https://github.com/apache/flink/pull/1989#discussion_r67296019

--- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java ---
[jira] [Commented] (FLINK-4027) FlinkKafkaProducer09 sink can lose messages
[ https://issues.apache.org/jira/browse/FLINK-4027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333260#comment-15333260 ]

ASF GitHub Bot commented on FLINK-4027:
---------------------------------------

Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2108#discussion_r67296678

--- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java ---
[GitHub] flink pull request #2108: [FLINK-4027] Flush FlinkKafkaProducer on checkpoin...
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2108#discussion_r67296678

--- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java ---
@@ -276,6 +314,41 @@ public void close() throws Exception {
 		checkErroneous();
 	}
 
+	// ---------------- Logic for handling checkpoint flushing ---------------- //
+
+	private void acknowledgeMessage() {
+		if (!flushOnCheckpoint) {
+			// the logic is disabled
+			return;
+		}
+		pendingRecords--;
+	}
+
+	@Override
+	public Serializable snapshotState(long checkpointId, long checkpointTimestamp) {
+		if (flushOnCheckpoint) {
+			// flushing is activated: we need to wait until pendingRecords is 0
+			while (pendingRecords > 0) {
+				try {
+					Thread.sleep(10);
--- End diff --

The problem is that the `flush()` method is only implemented by the Kafka 0.9 producer, not by the 0.8 implementation. As you can see from the class name, this is the shared base class between the two version-specific implementations. I think for the 0.8 producer there is no way around the waiting approach. I'll update the pull request to call `flush()` on the 0.9 producer.
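Putting the two halves of the diff together, here is a simplified, self-contained sketch of the flush-on-checkpoint idea. The polling loop mirrors the pull request; the `AtomicLong` and the interrupt handling are my assumptions for a correct concurrent counter, since the actual PR tracks a plain field.

```java
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;

public class FlushOnCheckpointSketch {
	private final boolean flushOnCheckpoint = true;
	private final AtomicLong pendingRecords = new AtomicLong();

	void beforeSend() {                   // call right before producer.send(...)
		if (flushOnCheckpoint) {
			pendingRecords.incrementAndGet();
		}
	}

	void acknowledgeMessage() {           // call from the send callback on success
		if (flushOnCheckpoint) {
			pendingRecords.decrementAndGet();
		}
	}

	public Serializable snapshotState(long checkpointId, long checkpointTimestamp) {
		if (flushOnCheckpoint) {
			// Kafka 0.9+ could call producer.flush() here instead; 0.8 has no
			// flush(), hence this polling loop (as discussed in the comment above).
			while (pendingRecords.get() > 0) {
				try {
					Thread.sleep(10);
				} catch (InterruptedException e) {
					Thread.currentThread().interrupt();
					break;
				}
			}
		}
		return null;  // a real sink would also surface async send errors here
	}
}
```

The design point: the checkpoint barrier is only allowed to pass once every in-flight record has been acked, which restores the at-least-once guarantee the issue says is missing.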
[GitHub] flink pull request #1856: FLINK-3650 Add maxBy/minBy to Scala DataSet API
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1856#discussion_r67297138

--- Diff: flink-scala/src/test/scala/org/apache/flink/api/operator/MaxByOperatorTest.scala ---

+class MaxByOperatorTest {
+  private val emptyTupleData = List[scala.Tuple5[Int, Long, String, Long, Int]]()
--- End diff --

The line should be

```
private val emptyTupleData = List[(Int, Long, String, Long, Int)]()
```

By `List[X]` you define the type to be a list of type `X`, and `(Int, Long, String)` is a shortcut for the type `scala.Tuple3[Int, Long, String]`. The final `()` creates an empty list. You can also remove the `new CustomType()` from the next line.
[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API
[ https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333266#comment-15333266 ]

ASF GitHub Bot commented on FLINK-3650:
---------------------------------------

Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1856#discussion_r67297138

--- Diff: flink-scala/src/test/scala/org/apache/flink/api/operator/MaxByOperatorTest.scala ---

> Add maxBy/minBy to Scala DataSet API
> ------------------------------------
>
>                 Key: FLINK-3650
>                 URL: https://issues.apache.org/jira/browse/FLINK-3650
>             Project: Flink
>          Issue Type: Improvement
>          Components: Java API, Scala API
>    Affects Versions: 1.1.0
>            Reporter: Till Rohrmann
>            Assignee: ramkrishna.s.vasudevan
>
> The stable Java DataSet API contains the API calls {{maxBy}} and {{minBy}}. These methods are not supported by the Scala DataSet API. They should be added in order to have a consistent API.
[GitHub] flink pull request #1856: FLINK-3650 Add maxBy/minBy to Scala DataSet API
Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/1856#discussion_r67297990

--- Diff: flink-scala/src/test/scala/org/apache/flink/api/operator/MaxByOperatorTest.scala ---

+  private val emptyTupleData = List[scala.Tuple5[Int, Long, String, Long, Int]]()
--- End diff --

Yes, I just updated it after reading up on this. One more test is failing; I am checking that and will then push the PR. I followed the steps you described, so all my squashed commits are now in a new branch, which I will force-push.
[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API
[ https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333278#comment-15333278 ]

ASF GitHub Bot commented on FLINK-3650:
---------------------------------------

Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/1856#discussion_r67297990

--- Diff: flink-scala/src/test/scala/org/apache/flink/api/operator/MaxByOperatorTest.scala ---
[GitHub] flink pull request #1989: FLINK-3901 - Added CsvRowInputFormat
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1989#discussion_r67298280 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java --- @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.java.io; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.io.ParseException; +import org.apache.flink.api.table.Row; +import org.apache.flink.api.table.typeutils.RowSerializer; +import org.apache.flink.api.table.typeutils.RowTypeInfo; +import org.apache.flink.core.fs.Path; +import org.apache.flink.types.parser.FieldParser; +import org.apache.flink.types.parser.FieldParser.ParseErrorState; + +@Internal +public class RowCsvInputFormat extends CsvInputFormat { + + private static final long serialVersionUID = 1L; + + private RowSerializer rowSerializer; + + public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo) { + this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, rowTypeInfo); + } + + public RowCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, RowTypeInfo rowTypeInfo) { + this(filePath, lineDelimiter, fieldDelimiter, rowTypeInfo, createDefaultMask(rowTypeInfo.getArity())); + } + + public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, int[] includedFieldsMask) { + this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, rowTypeInfo, includedFieldsMask); + } + + public RowCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, RowTypeInfo rowTypeInfo, + int[] includedFieldsMask) { + super(filePath); + boolean[] mask = (includedFieldsMask == null) ? createDefaultMask(rowTypeInfo.getArity()) + : toBooleanMask(includedFieldsMask); + configure(lineDelimiter, fieldDelimiter, rowTypeInfo, mask); + } + + public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, boolean[] includedFieldsMask) { + this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, rowTypeInfo, includedFieldsMask); + } + + public RowCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, RowTypeInfo rowTypeInfo, + boolean[] includedFieldsMask) { + super(filePath); + configure(lineDelimiter, fieldDelimiter, rowTypeInfo, includedFieldsMask); + } + + private void configure(String lineDelimiter, String fieldDelimiter, + RowTypeInfo rowTypeInfo, boolean[] includedFieldsMask) { + + if (rowTypeInfo.getArity() == 0) { + throw new IllegalArgumentException("Row arity must be greater than 0."); + } + + if (includedFieldsMask == null) { + includedFieldsMask = createDefaultMask(rowTypeInfo.getArity()); + } + + rowSerializer = (RowSerializer) rowTypeInfo.createSerializer(new ExecutionConfig()); + + setDelimiter(lineDelimiter); + setFieldDelimiter(fieldDelimiter); + + Class[] classes = new Class[rowTypeInfo.getArity()]; + + for (int i = 0; i < rowTypeInfo.getArity(); i++) { + classes[i] = rowTypeInfo.getTypeAt(i).getTypeClass(); + } + + setFieldsGeneric(includedFieldsMask, classes); + } + + @Override + public Row fillRecord(Row reuse, Object[] parsedValues) { + if (reuse == null) { + reuse = new Row(rowSerializer.getLength()); + } + for (int i = 0; i < parsedValues.length; i++) { + reuse.setField(i, parsedValues[i]); + }
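To make the API under review concrete, a minimal construction sketch (the input path is illustrative; the constructors and {{setLenient()}} are the ones quoted in the diff and the tests):

{code}
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.io.RowCsvInputFormat;
import org.apache.flink.api.table.typeutils.RowTypeInfo;
import org.apache.flink.core.fs.Path;

public class RowCsvUsage {
    public static void main(String[] args) {
        // Target schema of the CSV file: (String, Integer, Double).
        RowTypeInfo typeInfo = new RowTypeInfo(new TypeInformation[] {
                BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.INT_TYPE_INFO,
                BasicTypeInfo.DOUBLE_TYPE_INFO
        });

        // Line delimiter "\n" and field delimiter "|", as in the tests.
        RowCsvInputFormat format =
                new RowCsvInputFormat(new Path("/tmp/input.csv"), "\n", "|", typeInfo);

        // Skip malformed lines instead of failing the job.
        format.setLenient(true);
    }
}
{code}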
[GitHub] flink pull request #1989: FLINK-3901 - Added CsvRowInputFormat
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1989#discussion_r67299243 --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java --- @@ -0,0 +1,1075 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.io; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; + +import org.apache.flink.api.common.io.ParseException; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.api.table.Row; +import org.apache.flink.api.table.typeutils.RowTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.types.parser.FieldParser; +import org.apache.flink.types.parser.StringParser; +import org.junit.Test; + +public class RowCsvInputFormatTest { + + private static final Path PATH = new Path("an/ignored/file/"); + + //Static variables for testing the removal of \r\n to \n + private static final String FIRST_PART = "That is the first part"; + + private static final String SECOND_PART = "That is the second part"; + + @Test + public void ignoreInvalidLines() { + try { + String fileContent = + "header1|header2|header3|\n"+ + "this is|1|2.0|\n"+ + "//a comment\n" + + "a test|3|4.0|\n" + + "#next|5|6.0|\n"; + + FileInputSplit split = createTempFile(fileContent); + + RowTypeInfo typeInfo = new RowTypeInfo(new TypeInformation[] { + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.DOUBLE_TYPE_INFO + }); + CsvInputFormat format = new RowCsvInputFormat(PATH, "\n", "|", typeInfo); + format.setLenient(false); + + Configuration parameters = new Configuration(); + format.configure(parameters); + format.open(split); + + Row result = new Row(3); + + try { + result = format.nextRecord(result); + fail("Parse Exception was not thrown! (Invalid int value)"); + } catch (ParseException ex) { + } + + // if format has lenient == false this can be asserted only after FLINK-3908 +// result = format.nextRecord(result); +// assertNotNull(result); +// assertEquals("this is", result.productElement(0)); +// assertEquals(new Integer(1), result.productElement(1)); +// assertEquals(new Double(2.0), result.productElement(2)); +// +// result = format.nextRecord(result); +// assertNotNull(result); +// assertEquals("a test", result.productElement
[GitHub] flink pull request #1989: FLINK-3901 - Added CsvRowInputFormat
Github user fpompermaier commented on a diff in the pull request: https://github.com/apache/flink/pull/1989#discussion_r67299396 --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java --- @@ -0,0 +1,1075 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.io; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; + +import org.apache.flink.api.common.io.ParseException; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.api.table.Row; +import org.apache.flink.api.table.typeutils.RowTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.types.parser.FieldParser; +import org.apache.flink.types.parser.StringParser; +import org.junit.Test; + +public class RowCsvInputFormatTest { + + private static final Path PATH = new Path("an/ignored/file/"); + + //Static variables for testing the removal of \r\n to \n + private static final String FIRST_PART = "That is the first part"; + + private static final String SECOND_PART = "That is the second part"; + + @Test + public void ignoreInvalidLines() { + try { + String fileContent = + "header1|header2|header3|\n"+ + "this is|1|2.0|\n"+ + "//a comment\n" + + "a test|3|4.0|\n" + + "#next|5|6.0|\n"; + + FileInputSplit split = createTempFile(fileContent); + + RowTypeInfo typeInfo = new RowTypeInfo(new TypeInformation[] { + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.DOUBLE_TYPE_INFO + }); + CsvInputFormat format = new RowCsvInputFormat(PATH, "\n", "|", typeInfo); + format.setLenient(false); + + Configuration parameters = new Configuration(); + format.configure(parameters); + format.open(split); + + Row result = new Row(3); + + try { + result = format.nextRecord(result); + fail("Parse Exception was not thrown! (Invalid int value)"); + } catch (ParseException ex) { + } + + // if format has lenient == false this can be asserted only after FLINK-3908 +// result = format.nextRecord(result); +// assertNotNull(result); +// assertEquals("this is", result.productElement(0)); +// assertEquals(new Integer(1), result.productElement(1)); +// assertEquals(new Double(2.0), result.productElement(2)); +// +// result = format.nextRecord(result); +// assertNotNull(result); +// assertEquals("a test", result.productEl
[GitHub] flink pull request #1989: FLINK-3901 - Added CsvRowInputFormat
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1989#discussion_r67299477 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java --- @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.java.io; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.io.ParseException; +import org.apache.flink.api.table.Row; +import org.apache.flink.api.table.typeutils.RowSerializer; +import org.apache.flink.api.table.typeutils.RowTypeInfo; +import org.apache.flink.core.fs.Path; +import org.apache.flink.types.parser.FieldParser; +import org.apache.flink.types.parser.FieldParser.ParseErrorState; + +@Internal +public class RowCsvInputFormat extends CsvInputFormat { + + private static final long serialVersionUID = 1L; + + private RowSerializer rowSerializer; + + public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo) { + this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, rowTypeInfo); + } + + public RowCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, RowTypeInfo rowTypeInfo) { + this(filePath, lineDelimiter, fieldDelimiter, rowTypeInfo, createDefaultMask(rowTypeInfo.getArity())); + } + + public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, int[] includedFieldsMask) { + this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, rowTypeInfo, includedFieldsMask); + } + + public RowCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, RowTypeInfo rowTypeInfo, + int[] includedFieldsMask) { + super(filePath); + boolean[] mask = (includedFieldsMask == null) ? createDefaultMask(rowTypeInfo.getArity()) + : toBooleanMask(includedFieldsMask); + configure(lineDelimiter, fieldDelimiter, rowTypeInfo, mask); + } + + public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, boolean[] includedFieldsMask) { + this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, rowTypeInfo, includedFieldsMask); + } + + public RowCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, RowTypeInfo rowTypeInfo, + boolean[] includedFieldsMask) { + super(filePath); + configure(lineDelimiter, fieldDelimiter, rowTypeInfo, includedFieldsMask); + } + + private void configure(String lineDelimiter, String fieldDelimiter, + RowTypeInfo rowTypeInfo, boolean[] includedFieldsMask) { + + if (rowTypeInfo.getArity() == 0) { + throw new IllegalArgumentException("Row arity must be greater than 0."); + } + + if (includedFieldsMask == null) { + includedFieldsMask = createDefaultMask(rowTypeInfo.getArity()); + } + + rowSerializer = (RowSerializer) rowTypeInfo.createSerializer(new ExecutionConfig()); + + setDelimiter(lineDelimiter); + setFieldDelimiter(fieldDelimiter); + + Class[] classes = new Class[rowTypeInfo.getArity()]; + + for (int i = 0; i < rowTypeInfo.getArity(); i++) { + classes[i] = rowTypeInfo.getTypeAt(i).getTypeClass(); + } + + setFieldsGeneric(includedFieldsMask, classes); + } + + @Override + public Row fillRecord(Row reuse, Object[] parsedValues) { + if (reuse == null) { + reuse = new Row(rowSerializer.getLength()); + } + for (int i = 0; i < parsedValues.length; i++) { + reuse.setField(i, parsedValues[i]); + }
[GitHub] flink pull request #2100: [FLINK-4024] FileSourceFunction not adjusted to ne...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2100 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2007: [FLINK-3908] Fixed Parser's error state reset
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2007 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4024) FileSourceFunction not adjusted to new IF lifecycle
[ https://issues.apache.org/jira/browse/FLINK-4024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333298#comment-15333298 ] ASF GitHub Bot commented on FLINK-4024: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2100 > FileSourceFunction not adjusted to new IF lifecycle > --- > > Key: FLINK-4024 > URL: https://issues.apache.org/jira/browse/FLINK-4024 > Project: Flink > Issue Type: Bug > Components: Streaming >Affects Versions: 1.1.0 >Reporter: Chesnay Schepler >Assignee: Kostas Kloudas >Priority: Critical > Fix For: 1.1.0 > > > The InputFormat lifecycle was extended in > ac2137cfa5e63bd4f53a4b7669dc591ab210093f, adding additional > open-/closeInputFormat() methods. > The streaming FileSourceFunction was not adjusted for this change, and thus > will fail for every InputFormat that leverages these new methods. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3908) FieldParsers error state is not reset correctly to NONE
[ https://issues.apache.org/jira/browse/FLINK-3908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333299#comment-15333299 ] ASF GitHub Bot commented on FLINK-3908: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2007 > FieldParsers error state is not reset correctly to NONE > --- > > Key: FLINK-3908 > URL: https://issues.apache.org/jira/browse/FLINK-3908 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.0.2 >Reporter: Flavio Pompermaier >Assignee: Flavio Pompermaier > Labels: parser > > If during the parse of a csv there's a parse error (for example when in a > integer column there are non-int values) the errorState is not reset > correctly in the next parseField call. A simple fix would be to add as a > first statement of the {{parseField()}} function a call to > {{setErrorState(ParseErrorState.NONE)}} but it is something that should be > handled better (by default) for every subclass of {{FieldParser}} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-4024) FileSourceFunction not adjusted to new IF lifecycle
[ https://issues.apache.org/jira/browse/FLINK-4024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske resolved FLINK-4024. -- Resolution: Fixed Fixed with fe0eb602d2e908d116d71018bb141e5c748c780b > FileSourceFunction not adjusted to new IF lifecycle > --- > > Key: FLINK-4024 > URL: https://issues.apache.org/jira/browse/FLINK-4024 > Project: Flink > Issue Type: Bug > Components: Streaming >Affects Versions: 1.1.0 >Reporter: Chesnay Schepler >Assignee: Kostas Kloudas >Priority: Critical > Fix For: 1.1.0 > > > The InputFormat lifecycle was extended in > ac2137cfa5e63bd4f53a4b7669dc591ab210093f, adding additional > open-/closeInputFormat() methods. > The streaming FileSourceFunction was not adjusted for this change, and thus > will fail for every InputFormat that leverages these new methods. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
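The lifecycle change behind this issue is easy to state: {{openInputFormat()}} and {{closeInputFormat()}} bracket the whole task, while {{open(split)}} and {{close()}} bracket each split. A sketch of the call order a source has to drive (driver loop only, error handling and record emission omitted; this is not the FileSourceFunction code itself):

{code}
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.core.io.InputSplit;

public class InputFormatLifecycleSketch {
    static <T, S extends InputSplit> void drive(RichInputFormat<T, S> format, S[] splits)
            throws Exception {
        format.openInputFormat();                     // new: once per task, before any split
        try {
            for (S split : splits) {
                format.open(split);                   // once per split
                T reuse = null;
                while (!format.reachedEnd()) {
                    reuse = format.nextRecord(reuse); // read (and emit) records
                }
                format.close();                       // once per split
            }
        } finally {
            format.closeInputFormat();                // new: once per task, after all splits
        }
    }
}
{code}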
[jira] [Closed] (FLINK-3908) FieldParsers error state is not reset correctly to NONE
[ https://issues.apache.org/jira/browse/FLINK-3908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske closed FLINK-3908. Resolution: Fixed Fix Version/s: 1.1.0 Fixed with d8747978386c0aacce1dc2631e1c5602ec9b2a00 > FieldParsers error state is not reset correctly to NONE > --- > > Key: FLINK-3908 > URL: https://issues.apache.org/jira/browse/FLINK-3908 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.0.2 >Reporter: Flavio Pompermaier >Assignee: Flavio Pompermaier > Labels: parser > Fix For: 1.1.0 > > > If during the parse of a csv there's a parse error (for example when in a > integer column there are non-int values) the errorState is not reset > correctly in the next parseField call. A simple fix would be to add as a > first statement of the {{parseField()}} function a call to > {{setErrorState(ParseErrorState.NONE)}} but it is something that should be > handled better (by default) for every subclass of {{FieldParser}} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
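The fix is mechanical: clear the error state at the top of every {{parseField()}} call. A sketch against a hypothetical parser subclass (the actual parsing body is elided; only the reset matters here):

{code}
import org.apache.flink.types.parser.FieldParser;

public class ResettingIntParser extends FieldParser<Integer> {

    private Integer result;

    @Override
    public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Integer reuse) {
        // Clear any error state left over from a previous malformed record (FLINK-3908).
        setErrorState(ParseErrorState.NONE);
        // ... actual integer parsing would go here, setting 'result' ...
        return limit;
    }

    @Override
    public Integer getLastResult() {
        return result;
    }

    @Override
    public Integer createValue() {
        return 0;
    }
}
{code}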
[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...
GitHub user mushketyk opened a pull request: https://github.com/apache/flink/pull/2109 [FLINK-3677] FileInputFormat: Allow to specify include/exclude file name patterns Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [x] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [x] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [x] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/mushketyk/flink file-include Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2109.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2109 commit f45dadf90322cc180b1afe1dc91d83a3aced7e22 Author: Ivan Mushketyk Date: 2016-06-14T21:44:19Z [FLINK-3677] Add FileMatcher class commit 8d4a2a72020ee19fe70d06ded953af67b6ed487d Author: Ivan Mushketyk Date: 2016-06-15T20:47:43Z [FLINK-3677] FileInputFormat: Allow to specify include/exclude file name patterns commit 350527bb6c1bf7a6e9d252bb921125249103635c Author: Ivan Mushketyk Date: 2016-06-15T20:48:13Z [FLINK-3677] Rename test commit 6045fe79e192aca6e97f568351927a17e0c64d09 Author: Ivan Mushketyk Date: 2016-06-15T21:07:55Z [FLINK-3677] Tests refactoring --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3677) FileInputFormat: Allow to specify include/exclude file name patterns
[ https://issues.apache.org/jira/browse/FLINK-3677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1507#comment-1507 ] ASF GitHub Bot commented on FLINK-3677: --- GitHub user mushketyk opened a pull request: https://github.com/apache/flink/pull/2109 [FLINK-3677] FileInputFormat: Allow to specify include/exclude file name patterns Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [x] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [x] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [x] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/mushketyk/flink file-include Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2109.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2109 commit f45dadf90322cc180b1afe1dc91d83a3aced7e22 Author: Ivan Mushketyk Date: 2016-06-14T21:44:19Z [FLINK-3677] Add FileMatcher class commit 8d4a2a72020ee19fe70d06ded953af67b6ed487d Author: Ivan Mushketyk Date: 2016-06-15T20:47:43Z [FLINK-3677] FileInputFormat: Allow to specify include/exclude file name patterns commit 350527bb6c1bf7a6e9d252bb921125249103635c Author: Ivan Mushketyk Date: 2016-06-15T20:48:13Z [FLINK-3677] Rename test commit 6045fe79e192aca6e97f568351927a17e0c64d09 Author: Ivan Mushketyk Date: 2016-06-15T21:07:55Z [FLINK-3677] Tests refactoring > FileInputFormat: Allow to specify include/exclude file name patterns > > > Key: FLINK-3677 > URL: https://issues.apache.org/jira/browse/FLINK-3677 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.0.0 >Reporter: Maximilian Michels >Assignee: Ivan Mushketyk >Priority: Minor > Labels: starter > > It would be nice to be able to specify a regular expression to filter files. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
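Independent of the PR's exact API (only the {{FileMatcher}} class name is visible from the commit titles), the intended mechanics can be sketched with plain regular expressions: enumerate a file only if it matches the include pattern and not the exclude pattern. The helper and pattern strings below are illustrative:

{code}
import java.util.regex.Pattern;

public class FileNameFilterSketch {
    private final Pattern include;
    private final Pattern exclude;

    public FileNameFilterSketch(String includeRegex, String excludeRegex) {
        this.include = Pattern.compile(includeRegex);
        this.exclude = Pattern.compile(excludeRegex);
    }

    /** Accept a file name only if it matches include and does not match exclude. */
    public boolean accept(String fileName) {
        return include.matcher(fileName).matches()
                && !exclude.matcher(fileName).matches();
    }

    public static void main(String[] args) {
        FileNameFilterSketch filter = new FileNameFilterSketch(".*\\.csv", "_.*");
        System.out.println(filter.accept("data.csv"));  // true
        System.out.println(filter.accept("_tmp.csv"));  // false: excluded
        System.out.println(filter.accept("data.json")); // false: not included
    }
}
{code}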
[jira] [Commented] (FLINK-4053) Return value from Connection should be checked against null
[ https://issues.apache.org/jira/browse/FLINK-4053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1523#comment-1523 ] Ivan Mushketyk commented on FLINK-4053: --- I'll fix this. > Return value from Connection should be checked against null > --- > > Key: FLINK-4053 > URL: https://issues.apache.org/jira/browse/FLINK-4053 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Assignee: Ivan Mushketyk >Priority: Minor > > In RMQSource.java and RMQSink.java, there is code in the following pattern: > {code} > connection = factory.newConnection(); > channel = connection.createChannel(); > {code} > According to > https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/Connection.html#createChannel() > : > {code} > Returns: > a new channel descriptor, or null if none is available > {code} > The return value should be checked against null. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-4053) Return value from Connection should be checked against null
[ https://issues.apache.org/jira/browse/FLINK-4053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ivan Mushketyk reassigned FLINK-4053: - Assignee: Ivan Mushketyk > Return value from Connection should be checked against null > --- > > Key: FLINK-4053 > URL: https://issues.apache.org/jira/browse/FLINK-4053 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Assignee: Ivan Mushketyk >Priority: Minor > > In RMQSource.java and RMQSink.java, there is code in the following pattern: > {code} > connection = factory.newConnection(); > channel = connection.createChannel(); > {code} > According to > https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/Connection.html#createChannel() > : > {code} > Returns: > a new channel descriptor, or null if none is available > {code} > The return value should be checked against null. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
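The requested guard is a one-liner; a sketch of the pattern (assuming the 3.x amqp-client, where {{newConnection()}} declares only {{IOException}}):

{code}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;

public class RmqChannelSketch {
    static Channel openChannel(ConnectionFactory factory) throws IOException {
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        if (channel == null) {
            // createChannel() returns null if no channel is available.
            throw new IOException("No RabbitMQ channel available");
        }
        return channel;
    }
}
{code}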
[jira] [Commented] (FLINK-3943) Add support for EXCEPT (set minus)
[ https://issues.apache.org/jira/browse/FLINK-3943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1526#comment-1526 ] Ivan Mushketyk commented on FLINK-3943: --- I'll work on this. > Add support for EXCEPT (set minus) > -- > > Key: FLINK-3943 > URL: https://issues.apache.org/jira/browse/FLINK-3943 > Project: Flink > Issue Type: New Feature > Components: Table API >Affects Versions: 1.1.0 >Reporter: Fabian Hueske >Assignee: Ivan Mushketyk >Priority: Minor > > Currently, the Table API and SQL do not support EXCEPT. > EXCEPT can be executed as a coGroup on all fields that forwards records of > the first input if the second input is empty. > In order to add support for EXCEPT to the Table API and SQL we need to: > - Implement a {{DataSetMinus}} class that translates an EXCEPT into a DataSet > API program using a coGroup on all fields. > - Implement a {{DataSetMinusRule}} that translates a Calcite {{LogicalMinus}} > into a {{DataSetMinus}}. > - Extend the Table API (and validation phase) to provide an except() method. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-3943) Add support for EXCEPT (set minus)
[ https://issues.apache.org/jira/browse/FLINK-3943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ivan Mushketyk reassigned FLINK-3943: - Assignee: Ivan Mushketyk > Add support for EXCEPT (set minus) > -- > > Key: FLINK-3943 > URL: https://issues.apache.org/jira/browse/FLINK-3943 > Project: Flink > Issue Type: New Feature > Components: Table API >Affects Versions: 1.1.0 >Reporter: Fabian Hueske >Assignee: Ivan Mushketyk >Priority: Minor > > Currently, the Table API and SQL do not support EXCEPT. > EXCEPT can be executed as a coGroup on all fields that forwards records of > the first input if the second input is empty. > In order to add support for EXCEPT to the Table API and SQL we need to: > - Implement a {{DataSetMinus}} class that translates an EXCEPT into a DataSet > API program using a coGroup on all fields. > - Implement a {{DataSetMinusRule}} that translates a Calcite {{LogicalMinus}} > into a {{DataSetMinus}}. > - Extend the Table API (and validation phase) to provide an except() method. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
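The coGroup translation described in the issue can be sketched directly on the Java DataSet API ({{"*"}} keys on all fields; the {{String}} element type is illustrative):

{code}
import java.util.Iterator;

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.util.Collector;

public class ExceptSketch {
    // EXCEPT as a coGroup on all fields: forward a left value only when the
    // right group is empty; emit once per group for set (not EXCEPT ALL) semantics.
    public static DataSet<String> except(DataSet<String> left, DataSet<String> right) {
        return left.coGroup(right)
                .where("*")
                .equalTo("*")
                .with(new CoGroupFunction<String, String, String>() {
                    @Override
                    public void coGroup(Iterable<String> l, Iterable<String> r,
                            Collector<String> out) {
                        Iterator<String> it = l.iterator();
                        if (it.hasNext() && !r.iterator().hasNext()) {
                            out.collect(it.next());
                        }
                    }
                });
    }
}
{code}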
[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API
[ https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1586#comment-1586 ] ASF GitHub Bot commented on FLINK-3650: --- Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/1856 Force pushed as per your advice @fhueske. Ran mvn clean verify and no warnings were generated. > Add maxBy/minBy to Scala DataSet API > > > Key: FLINK-3650 > URL: https://issues.apache.org/jira/browse/FLINK-3650 > Project: Flink > Issue Type: Improvement > Components: Java API, Scala API >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Assignee: ramkrishna.s.vasudevan > > The stable Java DataSet API contains the API calls {{maxBy}} and {{minBy}}. > These methods are not supported by the Scala DataSet API. These methods > should be added in order to have a consistent API. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #1856: FLINK-3650 Add maxBy/minBy to Scala DataSet API
Github user ramkrish86 commented on the issue: https://github.com/apache/flink/pull/1856 Force pushed as per your advice @fhueske. Ran mvn clean verify and no warnings were generated. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3344) EventTimeWindowCheckpointingITCase.testPreAggregatedTumblingTimeWindow
[ https://issues.apache.org/jira/browse/FLINK-3344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333416#comment-15333416 ] Till Rohrmann commented on FLINK-3344: -- Another instance, but this time the error was: {{Caused by: java.lang.AssertionError: Window start: 600 end: 700 expected:<64950> but was:<21872>}} https://s3.amazonaws.com/archive.travis-ci.org/jobs/137748030/log.txt > EventTimeWindowCheckpointingITCase.testPreAggregatedTumblingTimeWindow > -- > > Key: FLINK-3344 > URL: https://issues.apache.org/jira/browse/FLINK-3344 > Project: Flink > Issue Type: Bug > Components: Tests >Reporter: Matthias J. Sax >Assignee: Aljoscha Krettek >Priority: Critical > Labels: test-stability > > https://travis-ci.org/apache/flink/jobs/107198388 > https://travis-ci.org/mjsax/flink/jobs/107198383 > {noformat} > Maven produced no output for 300 seconds. > {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators
[ https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333422#comment-15333422 ] ASF GitHub Bot commented on FLINK-3974: --- GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/2110 [FLINK-3974] Fix object reuse with multi-chaining Before, a job would fail if object reuse was enabled and multiple operators were chained to one upstream operator. Now, we always create a shallow copy of the StreamRecord in OperatorChain.ChainingOutput because downstream operations change/reuse the StreamRecord. This fix was contributed by @wanderingbort (if this is the right github handle) as a patch on the Flink Jira. I can change the commit to attribute it to him but so far he didn't respond to my question about this on Jira. You can merge this pull request into a Git repository by running: $ git pull https://github.com/aljoscha/flink chaining/fix Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2110.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2110 commit 092f350cccbda32331f527c4eaf7ad3304fa1811 Author: Aljoscha Krettek Date: 2016-06-14T10:18:35Z [FLINK-3974] Fix object reuse with multi-chaining Before, a job would fail if object reuse was enabled and multiple operators were chained to one upstream operator. Now, we always create a shallow copy of the StreamRecord in OperatorChain.ChainingOutput because downstream operations change/reuse the StreamRecord. > enableObjectReuse fails when an operator chains to multiple downstream > operators > > > Key: FLINK-3974 > URL: https://issues.apache.org/jira/browse/FLINK-3974 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.0.3 >Reporter: B Wyatt > Attachments: ReproFLINK3974.java, bwyatt-FLINK3974.1.patch > > > Given a topology that looks like this: > {code:java} > DataStream input = ... > input > .map(MapFunction...) > .addSink(...); > input > .map(MapFunction...) > .addSink(...); > {code} > enableObjectReuse() will cause an exception in the form of > {{"java.lang.ClassCastException: B cannot be cast to A"}} to be thrown. > It looks like the input operator calls {{Output.collect}}, > which attempts to loop over the downstream operators and process them. > However, the first map operation will call {{StreamRecord.replace}}, which > mutates the value stored in the {{StreamRecord}}. > As a result, when the {{Output.collect}} call passes the > {{StreamRecord}} to the second map operation, the record actually holds the > first map's output value, and the job behaves as if the two map operations > were serial instead of parallel. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
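The fix itself is small. A sketch of the idea (illustrative shape, not the actual {{OperatorChain.ChainingOutput}} code): hand each chained operator a fresh {{StreamRecord}} wrapper around the same value, so a downstream {{replace()}} mutates only its own copy:

{code}
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class ShallowCopySketch {
    // Same value object, new StreamRecord wrapper carrying the same timestamp.
    public static <T> StreamRecord<T> shallowCopy(StreamRecord<T> record) {
        return record.copy(record.getValue());
    }
}
{code}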
[jira] [Commented] (FLINK-4075) ContinuousFileProcessingCheckpointITCase failed on Travis
[ https://issues.apache.org/jira/browse/FLINK-4075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333420#comment-15333420 ] Till Rohrmann commented on FLINK-4075: -- Another instance: https://s3.amazonaws.com/archive.travis-ci.org/jobs/137748033/log.txt > ContinuousFileProcessingCheckpointITCase failed on Travis > - > > Key: FLINK-4075 > URL: https://issues.apache.org/jira/browse/FLINK-4075 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Priority: Critical > Labels: test-stability > > The test case {{ContinuousFileProcessingCheckpointITCase}} failed on Travis. > https://s3.amazonaws.com/archive.travis-ci.org/jobs/137748004/log.txt -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2110: [FLINK-3974] Fix object reuse with multi-chaining
GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/2110 [FLINK-3974] Fix object reuse with multi-chaining Before, a job would fail if object reuse was enabled and multiple operators were chained to one upstream operator. Now, we always create a shallow copy of the StreamRecord in OperatorChain.ChainingOutput because downstream operations change/reuse the StreamRecord. This fix was contributed by @wanderingbort (if this is the right github handle) as a patch on the Flink Jira. I can change the commit to attribute it to him but so far he didn't respond to my question about this on Jira. You can merge this pull request into a Git repository by running: $ git pull https://github.com/aljoscha/flink chaining/fix Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2110.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2110 commit 092f350cccbda32331f527c4eaf7ad3304fa1811 Author: Aljoscha Krettek Date: 2016-06-14T10:18:35Z [FLINK-3974] Fix object reuse with multi-chaining Before, a job would fail if object reuse was enabled and multiple operators were chained to one upstream operator. Now, we always create a shallow copy of the StreamRecord in OperatorChain.ChainingOutput because downstream operations change/reuse the StreamRecord. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3813) YARNSessionFIFOITCase.testDetachedMode failed on Travis
[ https://issues.apache.org/jira/browse/FLINK-3813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333424#comment-15333424 ] Till Rohrmann commented on FLINK-3813: -- Another instance: https://s3.amazonaws.com/archive.travis-ci.org/jobs/137748041/log.txt > YARNSessionFIFOITCase.testDetachedMode failed on Travis > --- > > Key: FLINK-3813 > URL: https://issues.apache.org/jira/browse/FLINK-3813 > Project: Flink > Issue Type: Bug >Affects Versions: 1.1.0 >Reporter: Till Rohrmann >Priority: Critical > Labels: test-stability > > The {{YARNSessionFIFOITCase.testDetachedMode}} failed on Travis. > https://s3.amazonaws.com/archive.travis-ci.org/jobs/125560038/log.txt -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2392) Instable test in flink-yarn-tests
[ https://issues.apache.org/jira/browse/FLINK-2392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333425#comment-15333425 ] Till Rohrmann commented on FLINK-2392: -- More instances of failing yarn tests: https://s3.amazonaws.com/archive.travis-ci.org/jobs/137801198/log.txt https://s3.amazonaws.com/archive.travis-ci.org/jobs/137801197/log.txt > Instable test in flink-yarn-tests > - > > Key: FLINK-2392 > URL: https://issues.apache.org/jira/browse/FLINK-2392 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 0.10.0 >Reporter: Matthias J. Sax >Assignee: Robert Metzger >Priority: Critical > Labels: test-stability > Fix For: 1.0.0 > > > The test YARNSessionFIFOITCase fails from time to time on an irregular basis. > For example see: https://travis-ci.org/apache/flink/jobs/72019690 > {noformat} > Tests run: 12, Failures: 1, Errors: 0, Skipped: 2, Time elapsed: 205.163 sec > <<< FAILURE! - in org.apache.flink.yarn.YARNSessionFIFOITCase > perJobYarnClusterWithParallelism(org.apache.flink.yarn.YARNSessionFIFOITCase) > Time elapsed: 60.651 sec <<< FAILURE! > java.lang.AssertionError: During the timeout period of 60 seconds the > expected string did not show up > at org.junit.Assert.fail(Assert.java:88) > at org.junit.Assert.assertTrue(Assert.java:41) > at org.apache.flink.yarn.YarnTestBase.runWithArgs(YarnTestBase.java:478) > at > org.apache.flink.yarn.YARNSessionFIFOITCase.perJobYarnClusterWithParallelism(YARNSessionFIFOITCase.java:435) > Results : > Failed tests: > > YARNSessionFIFOITCase.perJobYarnClusterWithParallelism:435->YarnTestBase.runWithArgs:478 > During the timeout period of 60 seconds the expected string did not show up > {noformat} > Another error case is this (see > https://travis-ci.org/mjsax/flink/jobs/77313444) > {noformat} > Tests run: 12, Failures: 3, Errors: 0, Skipped: 2, Time elapsed: 182.008 sec > <<< FAILURE! - in org.apache.flink.yarn.YARNSessionFIFOITCase > testTaskManagerFailure(org.apache.flink.yarn.YARNSessionFIFOITCase) Time > elapsed: 27.356 sec <<< FAILURE! > java.lang.AssertionError: Found a file > /home/travis/build/mjsax/flink/flink-yarn-tests/target/flink-yarn-tests-fifo/flink-yarn-tests-fifo-logDir-nm-0_0/application_1440595422559_0007/container_1440595422559_0007_01_03/taskmanager.log > with a prohibited string: [Exception, Started > SelectChannelConnector@0.0.0.0:8081] > at org.junit.Assert.fail(Assert.java:88) > at > org.apache.flink.yarn.YarnTestBase.ensureNoProhibitedStringInLogFiles(YarnTestBase.java:294) > at > org.apache.flink.yarn.YARNSessionFIFOITCase.checkForProhibitedLogContents(YARNSessionFIFOITCase.java:94) > testNonexistingQueue(org.apache.flink.yarn.YARNSessionFIFOITCase) Time > elapsed: 17.421 sec <<< FAILURE! > java.lang.AssertionError: Found a file > /home/travis/build/mjsax/flink/flink-yarn-tests/target/flink-yarn-tests-fifo/flink-yarn-tests-fifo-logDir-nm-0_0/application_1440595422559_0007/container_1440595422559_0007_01_03/taskmanager.log > with a prohibited string: [Exception, Started > SelectChannelConnector@0.0.0.0:8081] > at org.junit.Assert.fail(Assert.java:88) > at > org.apache.flink.yarn.YarnTestBase.ensureNoProhibitedStringInLogFiles(YarnTestBase.java:294) > at > org.apache.flink.yarn.YARNSessionFIFOITCase.checkForProhibitedLogContents(YARNSessionFIFOITCase.java:94) > testJavaAPI(org.apache.flink.yarn.YARNSessionFIFOITCase) Time elapsed: > 11.984 sec <<< FAILURE! 
> java.lang.AssertionError: Found a file > /home/travis/build/mjsax/flink/flink-yarn-tests/target/flink-yarn-tests-fifo/flink-yarn-tests-fifo-logDir-nm-0_0/application_1440595422559_0007/container_1440595422559_0007_01_03/taskmanager.log > with a prohibited string: [Exception, Started > SelectChannelConnector@0.0.0.0:8081] > at org.junit.Assert.fail(Assert.java:88) > at > org.apache.flink.yarn.YarnTestBase.ensureNoProhibitedStringInLogFiles(YarnTestBase.java:294) > at > org.apache.flink.yarn.YARNSessionFIFOITCase.checkForProhibitedLogContents(YARNSessionFIFOITCase.java:94) > {noformat} > Furthermore, this build failed too: > https://travis-ci.org/apache/flink/jobs/77313450 > (no error, but Travis terminated due to no progress for 300 seconds -> > deadlock?) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2111: [FLINK-4076] BoltWrapper#dispose() should call Abs...
GitHub user f7753 opened a pull request: https://github.com/apache/flink/pull/2111 [FLINK-4076] BoltWrapper#dispose() should call AbstractStreamOperator#dispose() Add the missing `super.dispose()` call to the overridden `dispose()` method of `org.apache.flink.storm.wrappers.BoltWrapper`. You can merge this pull request into a Git repository by running: $ git pull https://github.com/f7753/flink flink4076 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2111.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2111 commit 0c97618dd8f80ae45a2bb9dce3c85e2014c0b481 Author: MaBiao Date: 2016-06-16T09:17:41Z [FLINK-4076] add super.dispose() in class BoltWrapper --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4076) BoltWrapper#dispose() should call AbstractStreamOperator#dispose()
[ https://issues.apache.org/jira/browse/FLINK-4076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333432#comment-15333432 ] ASF GitHub Bot commented on FLINK-4076: --- GitHub user f7753 opened a pull request: https://github.com/apache/flink/pull/2111 [FLINK-4076] BoltWrapper#dispose() should call AbstractStreamOperator#dispose() Add the missing `super.dispose()` call to the overridden `dispose()` method of `org.apache.flink.storm.wrappers.BoltWrapper`. You can merge this pull request into a Git repository by running: $ git pull https://github.com/f7753/flink flink4076 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2111.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2111 commit 0c97618dd8f80ae45a2bb9dce3c85e2014c0b481 Author: MaBiao Date: 2016-06-16T09:17:41Z [FLINK-4076] add super.dispose() in class BoltWrapper > BoltWrapper#dispose() should call AbstractStreamOperator#dispose() > -- > > Key: FLINK-4076 > URL: https://issues.apache.org/jira/browse/FLINK-4076 > Project: Flink > Issue Type: Bug >Reporter: Ted Yu >Priority: Minor > > {code} > @Override > public void dispose() { > this.bolt.cleanup(); > } > {code} > AbstractStreamOperator#dispose() should be called first. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
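The fix both messages describe amounts to a single added line: invoke the superclass hook before the bolt's own cleanup. A minimal sketch of the corrected method, based on the snippet quoted above (not the verbatim patch):

```java
// Sketch of the corrected BoltWrapper#dispose(), per FLINK-4076:
// AbstractStreamOperator's resources are released first, and only
// then is the wrapped Storm bolt cleaned up.
@Override
public void dispose() {
    super.dispose();     // AbstractStreamOperator#dispose()
    this.bolt.cleanup(); // the Storm bolt's own cleanup hook
}
```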
[jira] [Commented] (FLINK-2392) Instable test in flink-yarn-tests
[ https://issues.apache.org/jira/browse/FLINK-2392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333437#comment-15333437 ] Maximilian Michels commented on FLINK-2392: --- The builds fail because the following Exception is logged: {noformat} 2016-06-16 04:58:37,218 ERROR org.apache.flink.metrics.reporter.JMXReporter - Metric did not comply with JMX MBean naming rules. javax.management.NotCompliantMBeanException: Interface is not public: org.apache.flink.metrics.reporter.JMXReporter$JmxGaugeMBean at com.sun.jmx.mbeanserver.MBeanAnalyzer.<init>(MBeanAnalyzer.java:114) at com.sun.jmx.mbeanserver.MBeanAnalyzer.analyzer(MBeanAnalyzer.java:102) at com.sun.jmx.mbeanserver.StandardMBeanIntrospector.getAnalyzer(StandardMBeanIntrospector.java:67) at com.sun.jmx.mbeanserver.MBeanIntrospector.getPerInterface(MBeanIntrospector.java:192) at com.sun.jmx.mbeanserver.MBeanSupport.<init>(MBeanSupport.java:138) at com.sun.jmx.mbeanserver.StandardMBeanSupport.<init>(StandardMBeanSupport.java:60) at com.sun.jmx.mbeanserver.Introspector.makeDynamicMBean(Introspector.java:192) at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:898) at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324) at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) at org.apache.flink.metrics.reporter.JMXReporter.notifyOfAddedMetric(JMXReporter.java:113) at org.apache.flink.metrics.MetricRegistry.register(MetricRegistry.java:174) at org.apache.flink.metrics.groups.AbstractMetricGroup.addMetric(AbstractMetricGroup.java:206) at org.apache.flink.metrics.groups.AbstractMetricGroup.gauge(AbstractMetricGroup.java:162) at org.apache.flink.runtime.taskmanager.TaskManager$.instantiateClassLoaderMetrics(TaskManager.scala:2298) at org.apache.flink.runtime.taskmanager.TaskManager$.org$apache$flink$runtime$taskmanager$TaskManager$$instantiateStatusMetrics(TaskManager.scala:2287) at org.apache.flink.runtime.taskmanager.TaskManager.associateWithJobManager(TaskManager.scala:951) at org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleRegistrationMessage(TaskManager.scala:631) at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:297) at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36) at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33) at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28) at akka.actor.Actor$class.aroundReceive(Actor.scala:465) at org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:124) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) at akka.dispatch.Mailbox.run(Mailbox.scala:221) at akka.dispatch.Mailbox.exec(Mailbox.scala:231) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 2016-06-16 04:58:37,221 ERROR org.apache.flink.metrics.reporter.JMXReporter - Metric did not comply with JMX MBean naming rules. javax.management.NotCompliantMBeanException: Interface is not public: org.apache.flink.metrics.reporter.JMXReporter$JmxGaugeMBean at com.sun.jmx.mbeanserver.MBeanAnalyzer.<init>(MBeanAnalyzer.java:114) at com.sun.jmx.mbeanserver.MBeanAnalyzer.analyzer(MBeanAnalyzer.java:102) at com.sun.jmx.mbeanserver.StandardMBeanIntrospector.getAnalyzer(StandardMBeanIntrospector.java:67) at com.sun.jmx.mbeanserver.MBeanIntrospector.getPerInterface(MBeanIntrospector.java:192) at com.sun.jmx.mbeanserver.MBeanSupport.<init>(MBeanSupport.java:138) at com.sun.jmx.mbeanserver.StandardMBeanSupport.<init>(StandardMBeanSupport.java:60) at com.sun.jmx.mbeanserver.Introspector.makeDynamicMBean(Introspector.java:192) at com.sun.jmx.i
[GitHub] flink pull request #2093: [FLINK-3714] Add Support for "Allowed Lateness"
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2093#discussion_r67313976 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java --- @@ -132,9 +132,9 @@ /** * The allowed lateness for elements. This is used for: -* Deciding if an element should be dropped from a window due to lateness. -* Clearing the state of a window if the system time passes -* the {@code window.maxTimestamp + allowedLateness} landmark. +* Deciding if an element should be dropped from a window due to lateness. --- End diff -- Ah, I meant something like this: ``` <ul> <li> item 1 <li> item2 </ul> ``` the javadoc syntax is almost similar to HTML, just the closing `</li>` tags are missing. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
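Applied to the two bullets visible in the diff, the suggested markup would look roughly like this (the second bullet's wording is taken from the removed lines; the surrounding field declaration is illustrative):

```java
/**
 * The allowed lateness for elements. This is used for:
 * <ul>
 *   <li>Deciding if an element should be dropped from a window due to lateness.
 *   <li>Clearing the state of a window if the system time passes
 *       the {@code window.maxTimestamp + allowedLateness} landmark.
 * </ul>
 */
private final long allowedLateness;
```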
[jira] [Commented] (FLINK-2392) Instable test in flink-yarn-tests
[ https://issues.apache.org/jira/browse/FLINK-2392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333442#comment-15333442 ] Maximilian Michels commented on FLINK-2392: --- Should be fixed by the resolution of FLINK-3962. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3962) JMXReporter doesn't properly register/deregister metrics
[ https://issues.apache.org/jira/browse/FLINK-3962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333440#comment-15333440 ] Maximilian Michels commented on FLINK-3962: --- Another exception from the Yarn logs: {noformat} 2016-06-16 04:58:37,218 ERROR org.apache.flink.metrics.reporter.JMXReporter - Metric did not comply with JMX MBean naming rules. javax.management.NotCompliantMBeanException: Interface is not public: org.apache.flink.metrics.reporter.JMXReporter$JmxGaugeMBean at com.sun.jmx.mbeanserver.MBeanAnalyzer.<init>(MBeanAnalyzer.java:114) at com.sun.jmx.mbeanserver.MBeanAnalyzer.analyzer(MBeanAnalyzer.java:102) at com.sun.jmx.mbeanserver.StandardMBeanIntrospector.getAnalyzer(StandardMBeanIntrospector.java:67) at com.sun.jmx.mbeanserver.MBeanIntrospector.getPerInterface(MBeanIntrospector.java:192) at com.sun.jmx.mbeanserver.MBeanSupport.<init>(MBeanSupport.java:138) at com.sun.jmx.mbeanserver.StandardMBeanSupport.<init>(StandardMBeanSupport.java:60) at com.sun.jmx.mbeanserver.Introspector.makeDynamicMBean(Introspector.java:192) at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:898) at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324) at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) at org.apache.flink.metrics.reporter.JMXReporter.notifyOfAddedMetric(JMXReporter.java:113) at org.apache.flink.metrics.MetricRegistry.register(MetricRegistry.java:174) at org.apache.flink.metrics.groups.AbstractMetricGroup.addMetric(AbstractMetricGroup.java:206) at org.apache.flink.metrics.groups.AbstractMetricGroup.gauge(AbstractMetricGroup.java:162) at org.apache.flink.runtime.taskmanager.TaskManager$.instantiateClassLoaderMetrics(TaskManager.scala:2298) at org.apache.flink.runtime.taskmanager.TaskManager$.org$apache$flink$runtime$taskmanager$TaskManager$$instantiateStatusMetrics(TaskManager.scala:2287) at org.apache.flink.runtime.taskmanager.TaskManager.associateWithJobManager(TaskManager.scala:951) at org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleRegistrationMessage(TaskManager.scala:631) at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:297) at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36) at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33) at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28) at akka.actor.Actor$class.aroundReceive(Actor.scala:465) at org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:124) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) at akka.dispatch.Mailbox.run(Mailbox.scala:221) at akka.dispatch.Mailbox.exec(Mailbox.scala:231) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 2016-06-16 04:58:37,221 ERROR org.apache.flink.metrics.reporter.JMXReporter - Metric did not comply with JMX MBean naming rules. javax.management.NotCompliantMBeanException: Interface is not public: org.apache.flink.metrics.reporter.JMXReporter$JmxGaugeMBean at com.sun.jmx.mbeanserver.MBeanAnalyzer.<init>(MBeanAnalyzer.java:114) at com.sun.jmx.mbeanserver.MBeanAnalyzer.analyzer(MBeanAnalyzer.java:102) at com.sun.jmx.mbeanserver.StandardMBeanIntrospector.getAnalyzer(StandardMBeanIntrospector.java:67) at com.sun.jmx.mbeanserver.MBeanIntrospector.getPerInterface(MBeanIntrospector.java:192) at com.sun.jmx.mbeanserver.MBeanSupport.<init>(MBeanSupport.java:138) at com.sun.jmx.mbeanserver.StandardMBeanSupport.<init>(StandardMBeanSupport.java:60) at com.sun.jmx.mbeanserver.Introspector.makeDynamicMBean(Introspector.java:192) at com.sun.jmx.interceptor.DefaultM
[jira] [Commented] (FLINK-3714) Add Support for "Allowed Lateness"
[ https://issues.apache.org/jira/browse/FLINK-3714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333441#comment-15333441 ] ASF GitHub Bot commented on FLINK-3714: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2093#discussion_r67313976 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java --- @@ -132,9 +132,9 @@ /** * The allowed lateness for elements. This is used for: -* Deciding if an element should be dropped from a window due to lateness. -* Clearing the state of a window if the system time passes -* the {@code window.maxTimestamp + allowedLateness} landmark. +* Deciding if an element should be dropped from a window due to lateness. --- End diff -- Ah, I meant something like this: ``` <ul> <li> item 1 <li> item2 </ul> ``` the javadoc syntax is almost similar to HTML, just the closing `</li>` tags are missing. > Add Support for "Allowed Lateness" > -- > > Key: FLINK-3714 > URL: https://issues.apache.org/jira/browse/FLINK-3714 > Project: Flink > Issue Type: Sub-task > Components: Streaming >Reporter: Aljoscha Krettek >Assignee: Kostas Kloudas > > As mentioned in > https://docs.google.com/document/d/1Xp-YBf87vLTduYSivgqWVEMjYUmkA-hyb4muX3KRl08/edit# > we should add support for an allowed lateness setting. > This includes several things: > - API for setting allowed lateness > - Dropping of late elements > - Garbage collection of windows state/timers > Depending on whether the {{WindowAssigner}} assigns windows based on event > time or processing time we have to adjust the GC behavior. For event-time > windows "allowed lateness" makes sense and we should garbage collect after > this expires. For processing-time windows "allowed lateness" does not make > sense and we should always GC window state/timers at the end timestamp of a > processing-time window. I think that we need a method for this on > {{WindowAssigner}} that allows to differentiate between event-time windows > and processing-time windows: {{boolean WindowAssigner.isEventTime()}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #1989: FLINK-3901 - Added CsvRowInputFormat
Github user fpompermaier commented on a diff in the pull request: https://github.com/apache/flink/pull/1989#discussion_r67314234 --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java --- @@ -0,0 +1,1075 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.io; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; + +import org.apache.flink.api.common.io.ParseException; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.api.table.Row; +import org.apache.flink.api.table.typeutils.RowTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.types.parser.FieldParser; +import org.apache.flink.types.parser.StringParser; +import org.junit.Test; + +public class RowCsvInputFormatTest { + + private static final Path PATH = new Path("an/ignored/file/"); + + //Static variables for testing the removal of \r\n to \n + private static final String FIRST_PART = "That is the first part"; + + private static final String SECOND_PART = "That is the second part"; + + @Test + public void ignoreInvalidLines() { + try { + String fileContent = + "header1|header2|header3|\n"+ + "this is|1|2.0|\n"+ + "//a comment\n" + + "a test|3|4.0|\n" + + "#next|5|6.0|\n"; + + FileInputSplit split = createTempFile(fileContent); + + RowTypeInfo typeInfo = new RowTypeInfo(new TypeInformation[] { + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.DOUBLE_TYPE_INFO + }); + CsvInputFormat format = new RowCsvInputFormat(PATH, "\n", "|", typeInfo); + format.setLenient(false); + + Configuration parameters = new Configuration(); + format.configure(parameters); + format.open(split); + + Row result = new Row(3); + + try { + result = format.nextRecord(result); + fail("Parse Exception was not thrown!
(Invalid int value)"); + } catch (ParseException ex) { + } + + // if format has lenient == false this can be asserted only after FLINK-3908 +// result = format.nextRecord(result); +// assertNotNull(result); +// assertEquals("this is", result.productElement(0)); +// assertEquals(new Integer(1), result.productElement(1)); +// assertEquals(new Double(2.0), result.productElement(2)); +// +// result = format.nextRecord(result); +// assertNotNull(result); +// assertEquals("a test", result.productEl
[jira] [Created] (FLINK-4081) FieldParsers should support empty strings
Flavio Pompermaier created FLINK-4081: - Summary: FieldParsers should support empty strings Key: FLINK-4081 URL: https://issues.apache.org/jira/browse/FLINK-4081 Project: Flink Issue Type: Bug Components: Core Reporter: Flavio Pompermaier In order to parse CSV files with the new Table API, which converts lines to Row objects (and Row supports null values), FieldParser implementations should support empty strings by setting the parser state to ParseErrorState.EMPTY_STRING (for example, FloatParser and DoubleParser don't respect this constraint). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
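In practice this means each numeric parser has to detect a zero-length field before attempting conversion and record ParseErrorState.EMPTY_STRING rather than a numeric format error, so that a Row-producing input format can map the field to null. A hedged sketch of such a parser (the class name NullableFloatParser and the simplified parsing loop are illustrative, not Flink's actual FloatParser):

```java
import java.nio.charset.StandardCharsets;

import org.apache.flink.types.parser.FieldParser;

// Illustrative only: shows where the EMPTY_STRING state would be set (FLINK-4081).
public class NullableFloatParser extends FieldParser<Float> {

    private float result;

    @Override
    protected int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Float reuse) {
        final int delimLimit = limit - delimiter.length + 1;

        // Empty field: record it via the parser state instead of raising a
        // numeric format error, so the caller can turn it into a null value.
        if (startPos == limit || (startPos < delimLimit && delimiterNext(bytes, startPos, delimiter))) {
            setErrorState(ParseErrorState.EMPTY_STRING);
            return -1;
        }

        // Find the end of the field (next delimiter or end of input).
        int i = startPos;
        while (i < limit && (i >= delimLimit || !delimiterNext(bytes, i, delimiter))) {
            i++;
        }

        String s = new String(bytes, startPos, i - startPos, StandardCharsets.US_ASCII);
        try {
            this.result = Float.parseFloat(s);
            return (i == limit) ? limit : i + delimiter.length;
        } catch (NumberFormatException e) {
            setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR);
            return -1;
        }
    }

    @Override
    public Float getLastResult() {
        return result;
    }

    @Override
    public Float createValue() {
        return 0.0f;
    }
}
```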
[jira] [Commented] (FLINK-3901) Create a RowCsvInputFormat to use as default CSV IF in Table API
[ https://issues.apache.org/jira/browse/FLINK-3901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333444#comment-15333444 ] ASF GitHub Bot commented on FLINK-3901: --- Github user fpompermaier commented on a diff in the pull request: https://github.com/apache/flink/pull/1989#discussion_r67314234 --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java --- @@ -0,0 +1,1075 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.io; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; + +import org.apache.flink.api.common.io.ParseException; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.api.table.Row; +import org.apache.flink.api.table.typeutils.RowTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.types.parser.FieldParser; +import org.apache.flink.types.parser.StringParser; +import org.junit.Test; + +public class RowCsvInputFormatTest { + + private static final Path PATH = new Path("an/ignored/file/"); + + //Static variables for testing the removal of \r\n to \n + private static final String FIRST_PART = "That is the first part"; + + private static final String SECOND_PART = "That is the second part"; + + @Test + public void ignoreInvalidLines() { + try { + String fileContent = + "header1|header2|header3|\n"+ + "this is|1|2.0|\n"+ + "//a comment\n" + + "a test|3|4.0|\n" + + "#next|5|6.0|\n"; + + FileInputSplit split = createTempFile(fileContent); + + RowTypeInfo typeInfo = new RowTypeInfo(new TypeInformation[] { + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.DOUBLE_TYPE_INFO + }); + CsvInputFormat format = new RowCsvInputFormat(PATH, "\n", "|", typeInfo); + format.setLenient(false); + + Configuration parameters = new Configuration(); + format.configure(parameters); + format.open(split); + + Row result = new Row(3); + + try { + result = format.nextRecord(result); + fail("Parse
Exception was not thrown! (Invalid int value)"); + } catch (ParseException ex) { + } + + // if format has lenient == false this can be asserted only after FLINK-3908 +// result = format.nextRecord(result); +// assertNotNull(result); +// assertEquals("this is", result.productElement(0)); +// assertEquals(new Integer(1), result.productElement(1));
[jira] [Commented] (FLINK-3962) JMXReporter doesn't properly register/deregister metrics
[ https://issues.apache.org/jira/browse/FLINK-3962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333458#comment-15333458 ] Chesnay Schepler commented on FLINK-3962: --- That new exception makes no sense at all. Both the JMXReporter and JMXGaugeMBean classes are clearly marked as public. If this were not the case we would have run into that weeks ago. There is something odd going on there. > JMXReporter doesn't properly register/deregister metrics > > > Key: FLINK-3962 > URL: https://issues.apache.org/jira/browse/FLINK-3962 > Project: Flink > Issue Type: Bug > Components: TaskManager >Affects Versions: 1.1.0 >Reporter: Maximilian Michels >Assignee: Stephan Ewen > Fix For: 1.1.0 > > > The following fails our Yarn tests because it checks for errors in the > jobmanager/taskmanager logs: > {noformat} > 2016-05-23 19:20:02,349 ERROR org.apache.flink.metrics.reporter.JMXReporter > - A metric with the name > org.apache.flink.metrics:key0=testing-worker-linux-docker-05a6b382-3386-linux-4,key1=taskmanager,key2=9398ca9392af615e9d1896d0bd7ff52a,key3=Flink_Java_Job_at_Mon_May_23_19-20-00_UTC_2016,key4=,name=numBytesIn > was already registered. > javax.management.InstanceAlreadyExistsException: > org.apache.flink.metrics:key0=testing-worker-linux-docker-05a6b382-3386-linux-4,key1=taskmanager,key2=9398ca9392af615e9d1896d0bd7ff52a,key3=Flink_Java_Job_at_Mon_May_23_19-20-00_UTC_2016,key4=,name=numBytesIn > at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) > at > com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898) > at > com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966) > at > com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900) > at > com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324) > at > com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) > at > org.apache.flink.metrics.reporter.JMXReporter.notifyOfAddedMetric(JMXReporter.java:76) > at > org.apache.flink.metrics.MetricRegistry.register(MetricRegistry.java:177) > at > org.apache.flink.metrics.groups.AbstractMetricGroup.addMetric(AbstractMetricGroup.java:191) > at > org.apache.flink.metrics.groups.AbstractMetricGroup.counter(AbstractMetricGroup.java:144) > at > org.apache.flink.metrics.groups.IOMetricGroup.<init>(IOMetricGroup.java:40) > at > org.apache.flink.metrics.groups.TaskMetricGroup.<init>(TaskMetricGroup.java:68) > at > org.apache.flink.metrics.groups.JobMetricGroup.addTask(JobMetricGroup.java:74) > at > org.apache.flink.metrics.groups.TaskManagerMetricGroup.addTaskForJob(TaskManagerMetricGroup.java:86) > at > org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.scala:1092) > at > org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleTaskMessage(TaskManager.scala:441) > at > org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:283) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) > at > org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) > at > scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) > at > org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33) > at > org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28) > at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) > at > org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28) > at akka.actor.Actor$class.aroundReceive(Actor.scala:465) > at > org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:124) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) > at akka.actor.ActorCell.invoke(ActorCell.scala:487) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) > at akka.dispatch
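The JMX rule at issue here: for a standard MBean, the management interface itself must be declared public, and the check happens in MBeanAnalyzer's constructor, which is exactly the first frame of the traces above. A self-contained reproduction with hypothetical names (not Flink code):

```java
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class NonPublicMBeanDemo {

    // Package-private management interface: JMX introspection rejects it,
    // no matter how public the implementing class is.
    interface HiddenGaugeMBean {
        int getValue();
    }

    public static class HiddenGauge implements HiddenGaugeMBean {
        @Override
        public int getValue() {
            return 42;
        }
    }

    public static void main(String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // Throws javax.management.NotCompliantMBeanException:
        // "Interface is not public: NonPublicMBeanDemo$HiddenGaugeMBean"
        server.registerMBean(new HiddenGauge(), new ObjectName("demo:name=hiddenGauge"));
    }
}
```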
[jira] [Commented] (FLINK-2392) Instable test in flink-yarn-tests
[ https://issues.apache.org/jira/browse/FLINK-2392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333456#comment-15333456 ] Chesnay Schepler commented on FLINK-2392: --- except it is entirely unrelated to metric names. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2392) Instable test in flink-yarn-tests
[ https://issues.apache.org/jira/browse/FLINK-2392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333461#comment-15333461 ] Maximilian Michels commented on FLINK-2392: --- What do you mean? The issue is unrelated or the Exception? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2392) Instable test in flink-yarn-tests
[ https://issues.apache.org/jira/browse/FLINK-2392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333477#comment-15333477 ] Chesnay Schepler commented on FLINK-2392: --- The exception is unrelated, that's why I replied directly to the comment regarding the exception and not in the main thread. Yes, the error message indicates a naming conflict. But this line is generated by us, and is generally accurate since we exclude the possibility of the other case: a non-public interface. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
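For context on "this line is generated by us": the reporter wraps MBean registration and logs that fixed message whenever JMX rejects the bean, which is why a non-public-interface failure surfaces under a naming-rules headline. A rough sketch of that registration path (shape assumed from the log output and stack trace above, not the verbatim JMXReporter source):

```java
// Assumed shape of the registration in JMXReporter#notifyOfAddedMetric.
// The catch block's message presumes the common failure (an invalid name),
// which makes the "Interface is not public" case above read misleadingly.
try {
    mBeanServer.registerMBean(gaugeBean, objectName);
} catch (NotCompliantMBeanException e) {
    LOG.error("Metric did not comply with JMX MBean naming rules.", e);
} catch (Exception e) {
    LOG.error("Could not register metric.", e);
}
```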
[jira] [Commented] (FLINK-2392) Instable test in flink-yarn-tests
[ https://issues.apache.org/jira/browse/FLINK-2392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333483#comment-15333483 ] Chesnay Schepler commented on FLINK-2392: --- are these jobs from different profiles? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2111: [FLINK-4076] BoltWrapper#dispose() should call AbstractSt...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/2111 This looks good, let's see what Travis says. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4076) BoltWrapper#dispose() should call AbstractStreamOperator#dispose()
[ https://issues.apache.org/jira/browse/FLINK-4076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333487#comment-15333487 ] ASF GitHub Bot commented on FLINK-4076: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/2111 This looks good, let's see what Travis says. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2108: [FLINK-4027] Flush FlinkKafkaProducer on checkpoints
Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/2108 Thank you for the review @eliaslevy! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4027) FlinkKafkaProducer09 sink can lose messages
[ https://issues.apache.org/jira/browse/FLINK-4027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333492#comment-15333492 ] ASF GitHub Bot commented on FLINK-4027: --- Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/2108 Thank you for the review @eliaslevy! > FlinkKafkaProducer09 sink can lose messages > --- > > Key: FLINK-4027 > URL: https://issues.apache.org/jira/browse/FLINK-4027 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.0.3 >Reporter: Elias Levy >Assignee: Robert Metzger >Priority: Critical > > The FlinkKafkaProducer09 sink appears to not offer at-least-once guarantees. > The producer is publishing messages asynchronously. A callback can record > publishing errors, which will be raised when detected. But as far as I can > tell, there is no barrier to wait for async errors from the sink when > checkpointing or to track the event time of acked messages to inform the > checkpointing process. > If a checkpoint occurs while there are pending publish requests, and the > requests return a failure after the checkpoint occurred, those message will > be lost as the checkpoint will consider them processed by the sink. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
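The fix under review in PR #2108 (excerpted in the review comments that follow) addresses this by counting in-flight records and blocking the checkpoint until the count drains to zero. Below is a minimal, self-contained sketch of that pattern: pendingRecords, flushOnCheckpoint, acknowledgeMessage(), flush() and the snapshotState() check come from the quoted diffs, while the class scaffolding, the recordSent() helper and the failure handling are assumptions for illustration.

{code}
// Condensed illustration of flush-on-checkpoint; not the actual connector code.
public class FlushOnCheckpointSketch {
    private final boolean flushOnCheckpoint = true;
    private long pendingRecords; // records handed to the async producer but not yet acked

    // called once per record passed to the asynchronous Kafka producer
    void recordSent() {
        if (flushOnCheckpoint) {
            pendingRecords++;
        }
    }

    // called from the producer's completion callback when a record is acked
    void acknowledgeMessage() {
        if (!flushOnCheckpoint) {
            return; // tracking disabled
        }
        pendingRecords--;
    }

    // called when a checkpoint is taken; it must not return while sends are
    // pending, otherwise a late async failure would be hidden by the checkpoint
    void snapshotState() {
        if (flushOnCheckpoint) {
            flush();
            if (pendingRecords != 0) {
                throw new IllegalStateException(
                        pendingRecords + " records still pending after flush");
            }
        }
    }

    void flush() {
        // producer-specific; see the 0.8 and 0.9 variants in the diffs below.
        // A production version must also make pendingRecords updates from the
        // callback thread visible here (synchronization omitted in this sketch).
    }
}
{code}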
[jira] [Commented] (FLINK-4027) FlinkKafkaProducer09 sink can lose messages
[ https://issues.apache.org/jira/browse/FLINK-4027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333498#comment-15333498 ] ASF GitHub Bot commented on FLINK-4027: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2108#discussion_r67318672 --- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java --- @@ -127,4 +127,11 @@ public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema seriali public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema serializationSchema, Properties producerConfig, KafkaPartitioner customPartitioner) { super(topicId, serializationSchema, producerConfig, customPartitioner); } + + @Override + protected void flush() { + if(this.producer != null) { --- End diff -- missing space after if > FlinkKafkaProducer09 sink can lose messages > --- > > Key: FLINK-4027 > URL: https://issues.apache.org/jira/browse/FLINK-4027 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.0.3 >Reporter: Elias Levy >Assignee: Robert Metzger >Priority: Critical > > The FlinkKafkaProducer09 sink appears to not offer at-least-once guarantees. > The producer is publishing messages asynchronously. A callback can record > publishing errors, which will be raised when detected. But as far as I can > tell, there is no barrier to wait for async errors from the sink when > checkpointing or to track the event time of acked messages to inform the > checkpointing process. > If a checkpoint occurs while there are pending publish requests, and the > requests return a failure after the checkpoint occurred, those message will > be lost as the checkpoint will consider them processed by the sink. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4027) FlinkKafkaProducer09 sink can lose messages
[ https://issues.apache.org/jira/browse/FLINK-4027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333497#comment-15333497 ] ASF GitHub Bot commented on FLINK-4027: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2108#discussion_r67318603 --- Diff: flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java --- @@ -125,4 +125,17 @@ public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema seriali super(topicId, serializationSchema, producerConfig, customPartitioner); } + @Override + protected void flush() { + // The Kafka 0.8 producer doesn't support flushing, therefore, we are using an inefficient + // busy wait approach + while(pendingRecords > 0) { --- End diff -- missing space after while > FlinkKafkaProducer09 sink can lose messages > --- > > Key: FLINK-4027 > URL: https://issues.apache.org/jira/browse/FLINK-4027 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.0.3 >Reporter: Elias Levy >Assignee: Robert Metzger >Priority: Critical > > The FlinkKafkaProducer09 sink appears to not offer at-least-once guarantees. > The producer is publishing messages asynchronously. A callback can record > publishing errors, which will be raised when detected. But as far as I can > tell, there is no barrier to wait for async errors from the sink when > checkpointing or to track the event time of acked messages to inform the > checkpointing process. > If a checkpoint occurs while there are pending publish requests, and the > requests return a failure after the checkpoint occurred, those message will > be lost as the checkpoint will consider them processed by the sink. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2108: [FLINK-4027] Flush FlinkKafkaProducer on checkpoin...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2108#discussion_r67318603 --- Diff: flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java --- @@ -125,4 +125,17 @@ public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema seriali super(topicId, serializationSchema, producerConfig, customPartitioner); } + @Override + protected void flush() { + // The Kafka 0.8 producer doesn't support flushing, therefore, we are using an inefficient + // busy wait approach + while(pendingRecords > 0) { --- End diff -- missing space after while --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
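With the reviewer's whitespace nit applied and the truncated loop completed, the 0.8 flush() would look roughly as follows; the sleep-based back-off is an assumption, since the quoted diff cuts off right after the while line:

{code}
@Override
protected void flush() {
    // The Kafka 0.8 producer doesn't support flushing, therefore we fall back
    // to an inefficient busy wait until the callbacks have drained pendingRecords.
    while (pendingRecords > 0) { // space after "while", per the review comment
        try {
            Thread.sleep(10); // assumed back-off interval
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while flushing pending records", e);
        }
    }
}
{code}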
[GitHub] flink pull request #2108: [FLINK-4027] Flush FlinkKafkaProducer on checkpoin...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2108#discussion_r67318672 --- Diff: flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java --- @@ -127,4 +127,11 @@ public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema seriali public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema serializationSchema, Properties producerConfig, KafkaPartitioner customPartitioner) { super(topicId, serializationSchema, producerConfig, customPartitioner); } + + @Override + protected void flush() { + if(this.producer != null) { --- End diff -- missing space after if --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
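The 0.9 variant can delegate directly, since the Kafka 0.9 producer exposes a native flush() method; the body below the truncated if line is an assumption consistent with that API:

{code}
@Override
protected void flush() {
    if (this.producer != null) { // space after "if", per the review comment
        this.producer.flush();   // assumed completion: 0.9 producers flush natively
    }
}
{code}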
[jira] [Commented] (FLINK-4027) FlinkKafkaProducer09 sink can lose messages
[ https://issues.apache.org/jira/browse/FLINK-4027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333502#comment-15333502 ] ASF GitHub Bot commented on FLINK-4027: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2108#discussion_r67318807 --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java --- @@ -276,6 +315,38 @@ public void close() throws Exception { checkErroneous(); } + // --- Logic for handling checkpoint flushing -- // + + private void acknowledgeMessage() { + if(!flushOnCheckpoint) { + // the logic is disabled + return; + } + pendingRecords--; + } + + protected abstract void flush(); + + @Override + public Serializable snapshotState(long checkpointId, long checkpointTimestamp) { + if(flushOnCheckpoint) { + // flushing is activated: We need to wait until pendingRecords is 0 + flush(); + + if(pendingRecords != 0) { --- End diff -- space > FlinkKafkaProducer09 sink can lose messages > --- > > Key: FLINK-4027 > URL: https://issues.apache.org/jira/browse/FLINK-4027 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.0.3 >Reporter: Elias Levy >Assignee: Robert Metzger >Priority: Critical > > The FlinkKafkaProducer09 sink appears to not offer at-least-once guarantees. > The producer is publishing messages asynchronously. A callback can record > publishing errors, which will be raised when detected. But as far as I can > tell, there is no barrier to wait for async errors from the sink when > checkpointing or to track the event time of acked messages to inform the > checkpointing process. > If a checkpoint occurs while there are pending publish requests, and the > requests return a failure after the checkpoint occurred, those message will > be lost as the checkpoint will consider them processed by the sink. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2108: [FLINK-4027] Flush FlinkKafkaProducer on checkpoin...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2108#discussion_r67318807 --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java --- @@ -276,6 +315,38 @@ public void close() throws Exception { checkErroneous(); } + // --- Logic for handling checkpoint flushing -- // + + private void acknowledgeMessage() { + if(!flushOnCheckpoint) { + // the logic is disabled + return; + } + pendingRecords--; + } + + protected abstract void flush(); + + @Override + public Serializable snapshotState(long checkpointId, long checkpointTimestamp) { + if(flushOnCheckpoint) { + // flushing is activated: We need to wait until pendingRecords is 0 + flush(); + + if(pendingRecords != 0) { --- End diff -- space --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4027) FlinkKafkaProducer09 sink can lose messages
[ https://issues.apache.org/jira/browse/FLINK-4027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333501#comment-15333501 ] ASF GitHub Bot commented on FLINK-4027: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2108#discussion_r67318778 --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java --- @@ -276,6 +315,38 @@ public void close() throws Exception { checkErroneous(); } + // --- Logic for handling checkpoint flushing -- // + + private void acknowledgeMessage() { + if(!flushOnCheckpoint) { + // the logic is disabled + return; + } + pendingRecords--; + } + + protected abstract void flush(); + + @Override + public Serializable snapshotState(long checkpointId, long checkpointTimestamp) { + if(flushOnCheckpoint) { --- End diff -- space > FlinkKafkaProducer09 sink can lose messages > --- > > Key: FLINK-4027 > URL: https://issues.apache.org/jira/browse/FLINK-4027 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.0.3 >Reporter: Elias Levy >Assignee: Robert Metzger >Priority: Critical > > The FlinkKafkaProducer09 sink appears to not offer at-least-once guarantees. > The producer is publishing messages asynchronously. A callback can record > publishing errors, which will be raised when detected. But as far as I can > tell, there is no barrier to wait for async errors from the sink when > checkpointing or to track the event time of acked messages to inform the > checkpointing process. > If a checkpoint occurs while there are pending publish requests, and the > requests return a failure after the checkpoint occurred, those message will > be lost as the checkpoint will consider them processed by the sink. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4027) FlinkKafkaProducer09 sink can lose messages
[ https://issues.apache.org/jira/browse/FLINK-4027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333500#comment-15333500 ] ASF GitHub Bot commented on FLINK-4027: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2108#discussion_r67318738 --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java --- @@ -261,7 +298,9 @@ public void invoke(IN next) throws Exception { } else { record = new ProducerRecord<>(targetTopic, partitioner.partition(next, serializedKey, serializedValue, partitions.length), serializedKey, serializedValue); } - + if(flushOnCheckpoint) { --- End diff -- missing space after if > FlinkKafkaProducer09 sink can lose messages > --- > > Key: FLINK-4027 > URL: https://issues.apache.org/jira/browse/FLINK-4027 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.0.3 >Reporter: Elias Levy >Assignee: Robert Metzger >Priority: Critical > > The FlinkKafkaProducer09 sink appears to not offer at-least-once guarantees. > The producer is publishing messages asynchronously. A callback can record > publishing errors, which will be raised when detected. But as far as I can > tell, there is no barrier to wait for async errors from the sink when > checkpointing or to track the event time of acked messages to inform the > checkpointing process. > If a checkpoint occurs while there are pending publish requests, and the > requests return a failure after the checkpoint occurred, those message will > be lost as the checkpoint will consider them processed by the sink. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2108: [FLINK-4027] Flush FlinkKafkaProducer on checkpoin...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2108#discussion_r67318738 --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java --- @@ -261,7 +298,9 @@ public void invoke(IN next) throws Exception { } else { record = new ProducerRecord<>(targetTopic, partitioner.partition(next, serializedKey, serializedValue, partitions.length), serializedKey, serializedValue); } - + if(flushOnCheckpoint) { --- End diff -- missing space after if --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Updated] (FLINK-4037) Introduce ArchivedExecutionGraph without any user classes
[ https://issues.apache.org/jira/browse/FLINK-4037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter updated FLINK-4037: -- Assignee: (was: Stefan Richter) > Introduce ArchivedExecutionGraph without any user classes > - > > Key: FLINK-4037 > URL: https://issues.apache.org/jira/browse/FLINK-4037 > Project: Flink > Issue Type: Improvement > Components: Webfrontend >Reporter: Robert Metzger > > As a follow up to FLINK-4011: In order to allow the JobManager to unload all > classes from a finished job, we need to convert the ExecutionGraph (and some > attached objects like the ExecutionConfig) into a stringified version, not > containing any user classes. > The web frontend can show strings only anyways. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2108: [FLINK-4027] Flush FlinkKafkaProducer on checkpoin...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2108#discussion_r67318778 --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java --- @@ -276,6 +315,38 @@ public void close() throws Exception { checkErroneous(); } + // --- Logic for handling checkpoint flushing -- // + + private void acknowledgeMessage() { + if(!flushOnCheckpoint) { + // the logic is disabled + return; + } + pendingRecords--; + } + + protected abstract void flush(); + + @Override + public Serializable snapshotState(long checkpointId, long checkpointTimestamp) { + if(flushOnCheckpoint) { --- End diff -- space --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2108: [FLINK-4027] Flush FlinkKafkaProducer on checkpoin...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2108#discussion_r67318851 --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java --- @@ -276,6 +315,38 @@ public void close() throws Exception { checkErroneous(); } + // --- Logic for handling checkpoint flushing -- // + + private void acknowledgeMessage() { + if(!flushOnCheckpoint) { --- End diff -- space --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2108: [FLINK-4027] Flush FlinkKafkaProducer on checkpoin...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2108#discussion_r67318905 --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java --- @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext; +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; +import org.apache.flink.streaming.util.serialization.SimpleStringSchema; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Test ensuring that the producer is not dropping buffered records + */ +@SuppressWarnings("unchecked") +public class AtLeastOnceProducerTest { + + @Test + public void testAtLeastOnceProducer() throws Exception { + runTest(true); + } + + // This test ensures that the actual test fails if the flushing is disabled + @Test(expected = AssertionError.class) + public void ensureTestFails() throws Exception { + runTest(false); + } + + private void runTest(boolean flushOnCheckpoint) throws Exception { + Properties props = new Properties(); + final TestingKafkaProducer producer = new TestingKafkaProducer<>("someTopic", new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), props); + producer.setFlushOnCheckpoint(flushOnCheckpoint); + producer.setRuntimeContext(new MockRuntimeContext(0, 1)); + + producer.open(new Configuration()); + + for(int i = 0; i < 100; i++) { --- End diff -- missing space after for --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4027) FlinkKafkaProducer09 sink can lose messages
[ https://issues.apache.org/jira/browse/FLINK-4027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333503#comment-15333503 ] ASF GitHub Bot commented on FLINK-4027: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2108#discussion_r67318851 --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java --- @@ -276,6 +315,38 @@ public void close() throws Exception { checkErroneous(); } + // --- Logic for handling checkpoint flushing -- // + + private void acknowledgeMessage() { + if(!flushOnCheckpoint) { --- End diff -- space > FlinkKafkaProducer09 sink can lose messages > --- > > Key: FLINK-4027 > URL: https://issues.apache.org/jira/browse/FLINK-4027 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.0.3 >Reporter: Elias Levy >Assignee: Robert Metzger >Priority: Critical > > The FlinkKafkaProducer09 sink appears to not offer at-least-once guarantees. > The producer is publishing messages asynchronously. A callback can record > publishing errors, which will be raised when detected. But as far as I can > tell, there is no barrier to wait for async errors from the sink when > checkpointing or to track the event time of acked messages to inform the > checkpointing process. > If a checkpoint occurs while there are pending publish requests, and the > requests return a failure after the checkpoint occurred, those message will > be lost as the checkpoint will consider them processed by the sink. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4027) FlinkKafkaProducer09 sink can lose messages
[ https://issues.apache.org/jira/browse/FLINK-4027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333505#comment-15333505 ] ASF GitHub Bot commented on FLINK-4027: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2108#discussion_r67318905 --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java --- @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext; +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; +import org.apache.flink.streaming.util.serialization.SimpleStringSchema; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Test ensuring that the producer is not dropping buffered records + */ +@SuppressWarnings("unchecked") +public class AtLeastOnceProducerTest { + + @Test + public void testAtLeastOnceProducer() throws Exception { + runTest(true); + } + + // This test ensures that the actual test fails if the flushing is disabled + @Test(expected = AssertionError.class) + public void ensureTestFails() throws Exception { + runTest(false); + } + + private void runTest(boolean flushOnCheckpoint) throws Exception { + Properties props = new Properties(); + final TestingKafkaProducer producer = new TestingKafkaProducer<>("someTopic", new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), props); + producer.setFlushOnCheckpoint(flushOnCheckpoint); + producer.setRuntimeContext(new MockRuntimeContext(0, 1)); + + producer.open(new Configuration()); + + for(int i = 0; i < 100; i++) { --- End diff -- missing space after for > FlinkKafkaProducer09 sink can lose messages > --- > > Key: FLINK-4027 > URL: 
https://issues.apache.org/jira/browse/FLINK-4027 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.0.3 >Reporter: Elias Levy >Assignee: Robert Metzger >Priority: Critical > > The FlinkKafkaProducer09 sink appears to not offer at-least-once guarantees. > The producer is publishing messages asynchronously. A callback can record > publishing errors, which will be raised when detected. But as far as I can > tell, there is no barrier to wait for async errors from the sink when > checkpointing or to track the event time of acked messages to inform the > checkpointing process. > If a checkpoint occurs while there are pending publish requests, and the > requests return a failure after the checkpoint occurred, those message will > be lost as the checkpoint will consider them processed by the sink. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3677) FileInputFormat: Allow to specify include/exclude file name patterns
[ https://issues.apache.org/jira/browse/FLINK-3677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333509#comment-15333509 ] ASF GitHub Bot commented on FLINK-3677: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2109#discussion_r67319428 --- Diff: flink-core/src/test/java/org/apache/flink/api/common/io/FilesFilterTest.java --- @@ -0,0 +1,33 @@ +package org.apache.flink.api.common.io; --- End diff -- missing license > FileInputFormat: Allow to specify include/exclude file name patterns > > > Key: FLINK-3677 > URL: https://issues.apache.org/jira/browse/FLINK-3677 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.0.0 >Reporter: Maximilian Michels >Assignee: Ivan Mushketyk >Priority: Minor > Labels: starter > > It would be nice to be able to specify a regular expression to filter files. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2109#discussion_r67319317 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java --- @@ -70,7 +70,17 @@ * The fraction that the last split may be larger than the others. */ private static final float MAX_SPLIT_SIZE_DISCREPANCY = 1.1f; - + + /** +* Patterns for file names to include +*/ + private static final String INCLUDE_PATTERNS = "fileInputFormat.include"; --- End diff -- these should be prefixed with KEY_ --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3677) FileInputFormat: Allow to specify include/exclude file name patterns
[ https://issues.apache.org/jira/browse/FLINK-3677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333508#comment-15333508 ] ASF GitHub Bot commented on FLINK-3677: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2109#discussion_r67319317 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java --- @@ -70,7 +70,17 @@ * The fraction that the last split may be larger than the others. */ private static final float MAX_SPLIT_SIZE_DISCREPANCY = 1.1f; - + + /** +* Patterns for file names to include +*/ + private static final String INCLUDE_PATTERNS = "fileInputFormat.include"; --- End diff -- these should be prefixed with KEY_ > FileInputFormat: Allow to specify include/exclude file name patterns > > > Key: FLINK-3677 > URL: https://issues.apache.org/jira/browse/FLINK-3677 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.0.0 >Reporter: Maximilian Michels >Assignee: Ivan Mushketyk >Priority: Minor > Labels: starter > > It would be nice to be able to specify a regular expression to filter files. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
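Applied to the quoted diff, the suggested rename would read as follows; the exclude constant is an assumption mirroring the include key, since only the include key is visible in the excerpt:

{code}
/**
 * Config key for the file name patterns to include.
 */
private static final String KEY_INCLUDE_PATTERNS = "fileInputFormat.include";

/**
 * Config key for the file name patterns to exclude (name assumed by symmetry).
 */
private static final String KEY_EXCLUDE_PATTERNS = "fileInputFormat.exclude";
{code}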
[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2109#discussion_r67319428 --- Diff: flink-core/src/test/java/org/apache/flink/api/common/io/FilesFilterTest.java --- @@ -0,0 +1,33 @@ +package org.apache.flink.api.common.io; --- End diff -- missing license --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3677) FileInputFormat: Allow to specify include/exclude file name patterns
[ https://issues.apache.org/jira/browse/FLINK-3677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333510#comment-15333510 ] ASF GitHub Bot commented on FLINK-3677: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2109#discussion_r67319452 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/FilesFilter.java --- @@ -0,0 +1,79 @@ +package org.apache.flink.api.common.io; --- End diff -- missing license > FileInputFormat: Allow to specify include/exclude file name patterns > > > Key: FLINK-3677 > URL: https://issues.apache.org/jira/browse/FLINK-3677 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.0.0 >Reporter: Maximilian Michels >Assignee: Ivan Mushketyk >Priority: Minor > Labels: starter > > It would be nice to be able to specify a regular expression to filter files. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2109#discussion_r67319452 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/FilesFilter.java --- @@ -0,0 +1,79 @@ +package org.apache.flink.api.common.io; --- End diff -- missing license --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #2109: [FLINK-3677] FileInputFormat: Allow to specify include/ex...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/2109 You marked this PR as containing documentation for new functionality, but I can't find any changes to the docs regarding the newly added configuration keys. Did you forget to include them in this PR? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3677) FileInputFormat: Allow to specify include/exclude file name patterns
[ https://issues.apache.org/jira/browse/FLINK-3677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333514#comment-15333514 ] ASF GitHub Bot commented on FLINK-3677: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/2109 You marked this PR as containing documentation for new functionality, but I can't find any changes to the docs regarding the newly added configuration keys. Did you forget to include them in this PR? > FileInputFormat: Allow to specify include/exclude file name patterns > > > Key: FLINK-3677 > URL: https://issues.apache.org/jira/browse/FLINK-3677 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.0.0 >Reporter: Maximilian Michels >Assignee: Ivan Mushketyk >Priority: Minor > Labels: starter > > It would be nice to be able to specify a regular expression to filter files. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-3951) Add Histogram Metric Type
[ https://issues.apache.org/jira/browse/FLINK-3951?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann reassigned FLINK-3951: Assignee: Till Rohrmann > Add Histogram Metric Type > - > > Key: FLINK-3951 > URL: https://issues.apache.org/jira/browse/FLINK-3951 > Project: Flink > Issue Type: Sub-task > Components: Core >Reporter: Stephan Ewen >Assignee: Till Rohrmann > Fix For: 1.1.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2106: [FLINK-4063] Add Metrics Support for Triggers
Github user aljoscha closed the pull request at: https://github.com/apache/flink/pull/2106 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-2392) Instable test in flink-yarn-tests
[ https://issues.apache.org/jira/browse/FLINK-2392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333516#comment-15333516 ] Maximilian Michels commented on FLINK-2392: --- It doesn't matter if it is a naming conflict or a malformed bean. It still seems to be an instance of incorrect registration, no? :) > Instable test in flink-yarn-tests > - > > Key: FLINK-2392 > URL: https://issues.apache.org/jira/browse/FLINK-2392 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 0.10.0 >Reporter: Matthias J. Sax >Assignee: Robert Metzger >Priority: Critical > Labels: test-stability > Fix For: 1.0.0 > > > The test YARNSessionFIFOITCase fails from time to time on an irregular basis. > For example see: https://travis-ci.org/apache/flink/jobs/72019690 > {noformat} > Tests run: 12, Failures: 1, Errors: 0, Skipped: 2, Time elapsed: 205.163 sec > <<< FAILURE! - in org.apache.flink.yarn.YARNSessionFIFOITCase > perJobYarnClusterWithParallelism(org.apache.flink.yarn.YARNSessionFIFOITCase) > Time elapsed: 60.651 sec <<< FAILURE! > java.lang.AssertionError: During the timeout period of 60 seconds the > expected string did not show up > at org.junit.Assert.fail(Assert.java:88) > at org.junit.Assert.assertTrue(Assert.java:41) > at org.apache.flink.yarn.YarnTestBase.runWithArgs(YarnTestBase.java:478) > at > org.apache.flink.yarn.YARNSessionFIFOITCase.perJobYarnClusterWithParallelism(YARNSessionFIFOITCase.java:435) > Results : > Failed tests: > > YARNSessionFIFOITCase.perJobYarnClusterWithParallelism:435->YarnTestBase.runWithArgs:478 > During the timeout period of 60 seconds the expected string did not show up > {noformat} > Another error case is this (see > https://travis-ci.org/mjsax/flink/jobs/77313444) > {noformat} > Tests run: 12, Failures: 3, Errors: 0, Skipped: 2, Time elapsed: 182.008 sec > <<< FAILURE! - in org.apache.flink.yarn.YARNSessionFIFOITCase > testTaskManagerFailure(org.apache.flink.yarn.YARNSessionFIFOITCase) Time > elapsed: 27.356 sec <<< FAILURE! > java.lang.AssertionError: Found a file > /home/travis/build/mjsax/flink/flink-yarn-tests/target/flink-yarn-tests-fifo/flink-yarn-tests-fifo-logDir-nm-0_0/application_1440595422559_0007/container_1440595422559_0007_01_03/taskmanager.log > with a prohibited string: [Exception, Started > SelectChannelConnector@0.0.0.0:8081] > at org.junit.Assert.fail(Assert.java:88) > at > org.apache.flink.yarn.YarnTestBase.ensureNoProhibitedStringInLogFiles(YarnTestBase.java:294) > at > org.apache.flink.yarn.YARNSessionFIFOITCase.checkForProhibitedLogContents(YARNSessionFIFOITCase.java:94) > testNonexistingQueue(org.apache.flink.yarn.YARNSessionFIFOITCase) Time > elapsed: 17.421 sec <<< FAILURE! > java.lang.AssertionError: Found a file > /home/travis/build/mjsax/flink/flink-yarn-tests/target/flink-yarn-tests-fifo/flink-yarn-tests-fifo-logDir-nm-0_0/application_1440595422559_0007/container_1440595422559_0007_01_03/taskmanager.log > with a prohibited string: [Exception, Started > SelectChannelConnector@0.0.0.0:8081] > at org.junit.Assert.fail(Assert.java:88) > at > org.apache.flink.yarn.YarnTestBase.ensureNoProhibitedStringInLogFiles(YarnTestBase.java:294) > at > org.apache.flink.yarn.YARNSessionFIFOITCase.checkForProhibitedLogContents(YARNSessionFIFOITCase.java:94) > testJavaAPI(org.apache.flink.yarn.YARNSessionFIFOITCase) Time elapsed: > 11.984 sec <<< FAILURE! 
> java.lang.AssertionError: Found a file > /home/travis/build/mjsax/flink/flink-yarn-tests/target/flink-yarn-tests-fifo/flink-yarn-tests-fifo-logDir-nm-0_0/application_1440595422559_0007/container_1440595422559_0007_01_03/taskmanager.log > with a prohibited string: [Exception, Started > SelectChannelConnector@0.0.0.0:8081] > at org.junit.Assert.fail(Assert.java:88) > at > org.apache.flink.yarn.YarnTestBase.ensureNoProhibitedStringInLogFiles(YarnTestBase.java:294) > at > org.apache.flink.yarn.YARNSessionFIFOITCase.checkForProhibitedLogContents(YARNSessionFIFOITCase.java:94) > {noformat} > Furthermore, this build failed too: > https://travis-ci.org/apache/flink/jobs/77313450 > (no error, but Travis terminated to due no progress for 300 seconds -> > deadlock?) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4063) Add Metrics Support for Triggers
[ https://issues.apache.org/jira/browse/FLINK-4063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333517#comment-15333517 ] ASF GitHub Bot commented on FLINK-4063: --- Github user aljoscha closed the pull request at: https://github.com/apache/flink/pull/2106 > Add Metrics Support for Triggers > > > Key: FLINK-4063 > URL: https://issues.apache.org/jira/browse/FLINK-4063 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.1.0 >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek > > Now that we have proper support for metrics we should also add a hook that > allows triggers to report metrics. > This supersedes FLINK-3758 which was about using accumulators for metrics in > triggers. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-4063) Add Metrics Support for Triggers
[ https://issues.apache.org/jira/browse/FLINK-4063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek resolved FLINK-4063. - Resolution: Fixed Fix Version/s: 1.1.0 Implemented here: https://github.com/apache/flink/commit/10495852370fed3b7683d428f6c2cc7470b3d8ed > Add Metrics Support for Triggers > > > Key: FLINK-4063 > URL: https://issues.apache.org/jira/browse/FLINK-4063 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.1.0 >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek > Fix For: 1.1.0 > > > Now that we have proper support for metrics we should also add a hook that > allows triggers to report metrics. > This supersedes FLINK-3758 which was about using accumulators for metrics in > triggers. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-3949) Collect Metrics in Runtime Operators
[ https://issues.apache.org/jira/browse/FLINK-3949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler reassigned FLINK-3949: --- Assignee: Chesnay Schepler > Collect Metrics in Runtime Operators > > > Key: FLINK-3949 > URL: https://issues.apache.org/jira/browse/FLINK-3949 > Project: Flink > Issue Type: Sub-task > Components: Local Runtime >Reporter: Stephan Ewen >Assignee: Chesnay Schepler > Fix For: 1.1.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3951) Add Histogram Metric Type
[ https://issues.apache.org/jira/browse/FLINK-3951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333533#comment-15333533 ] ASF GitHub Bot commented on FLINK-3951: --- GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/2112 [FLINK-3951] Add histogram metric type This PR introduces an interface for Flink's histogram metric type `Histogram` and the corresponding statistics `HistogramStatistics`. The `Histogram` interface allows updating the histogram with new values, retrieving the number of collected values, and obtaining a `HistogramStatistics` object which represents the current statistics of the histogram. The values must be of type long. The `HistogramStatistics` allows computing the mean, min, max and quantile values based on the element distribution calculated by the histogram. An implementation of the `Histogram` interface is contained in the flink-metrics-dropwizard module. The implementation wraps a Dropwizard histogram via the `DropwizardHistogramWrapper` class. This is currently the only implementation of `Histogram`. If needed, we can add a histogram implementation to flink-core later. But for the moment histograms are intended to be used as user metrics. Histograms add computational overhead which should not be imposed on the runtime by default. The histogram's statistics are reported by all currently supported `MetricReporters`. The `JMXReporter` introduces a `JmxHistogramMBean` interface for this purpose. The `StatsDReporter` reports the individual values of the `HistogramStatistics` object as gauge values. The `ScheduledDropwizardReporter` wraps the `Histogram` in a `HistogramWrapper` and the `HistogramStatistics` in a `HistogramStatisticsWrapper` so that they can be given to Dropwizard's own reporters. As an optimization, the `ScheduledDropwizardReporter` detects a `DropwizardHistogramWrapper` and unwraps the Dropwizard histogram, which is then used for reporting. 
You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink addHistogram Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2112.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2112 commit c3e8c315a56d5a25493f56d9713a968708e29b93 Author: Till Rohrmann Date: 2016-06-14T17:04:43Z Add histogram interface commit fd9fba8b00eef645def43c9b76d4dd606a9303a5 Author: Till Rohrmann Date: 2016-06-14T17:11:24Z Rename flink-metric-reporters into flink-metrics commit 796bf6964800b891413f11c2c366bb94758de4e8 Author: Till Rohrmann Date: 2016-06-15T12:02:20Z Add dropwizard histogram wrapper commit 9b7d9c324246a4a99a6de5ad18571a68564bfe9c Author: Till Rohrmann Date: 2016-06-15T12:16:26Z Add histogram to MetricGroup commit a782277c63065c297ef1a7a7b8e20e4260791945 Author: Till Rohrmann Date: 2016-06-15T13:37:34Z Add histogram support to existing reporters commit 38f3d82d056921bb54e71a0bf4aa11a5ed879fe8 Author: Till Rohrmann Date: 2016-06-15T15:47:48Z Add DropwizardHistogramWrapperTest commit 35f907ce977b2eaa09e78905e40fae820aad1ba7 Author: Till Rohrmann Date: 2016-06-15T16:17:19Z Add JMXReporter histogram test commit 832b83c90a03d9f358ed234ebfe730c83a45a41b Author: Till Rohrmann Date: 2016-06-16T10:09:06Z Add statsD reporter histogram test > Add Histogram Metric Type > - > > Key: FLINK-3951 > URL: https://issues.apache.org/jira/browse/FLINK-3951 > Project: Flink > Issue Type: Sub-task > Components: Core >Reporter: Stephan Ewen >Assignee: Till Rohrmann > Fix For: 1.1.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2112: [FLINK-3951] Add histogram metric type
GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/2112 [FLINK-3951] Add histogram metric type This PR introduces an interface for Flink's histogram metric type `Histogram` and the corresponding statistics `HistogramStatistics`. The `Histogram` interface allows updating the histogram with new values, retrieving the number of collected values, and obtaining a `HistogramStatistics` object which represents the current statistics of the histogram. The values must be of type long. The `HistogramStatistics` allows computing the mean, min, max and quantile values based on the element distribution calculated by the histogram. An implementation of the `Histogram` interface is contained in the flink-metrics-dropwizard module. The implementation wraps a Dropwizard histogram via the `DropwizardHistogramWrapper` class. This is currently the only implementation of `Histogram`. If needed, we can add a histogram implementation to flink-core later. But for the moment histograms are intended to be used as user metrics. Histograms add computational overhead which should not be imposed on the runtime by default. The histogram's statistics are reported by all currently supported `MetricReporters`. The `JMXReporter` introduces a `JmxHistogramMBean` interface for this purpose. The `StatsDReporter` reports the individual values of the `HistogramStatistics` object as gauge values. The `ScheduledDropwizardReporter` wraps the `Histogram` in a `HistogramWrapper` and the `HistogramStatistics` in a `HistogramStatisticsWrapper` so that they can be given to Dropwizard's own reporters. As an optimization, the `ScheduledDropwizardReporter` detects a `DropwizardHistogramWrapper` and unwraps the Dropwizard histogram, which is then used for reporting. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink addHistogram Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2112.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2112 commit c3e8c315a56d5a25493f56d9713a968708e29b93 Author: Till Rohrmann Date: 2016-06-14T17:04:43Z Add histogram interface commit fd9fba8b00eef645def43c9b76d4dd606a9303a5 Author: Till Rohrmann Date: 2016-06-14T17:11:24Z Rename flink-metric-reporters into flink-metrics commit 796bf6964800b891413f11c2c366bb94758de4e8 Author: Till Rohrmann Date: 2016-06-15T12:02:20Z Add dropwizard histogram wrapper commit 9b7d9c324246a4a99a6de5ad18571a68564bfe9c Author: Till Rohrmann Date: 2016-06-15T12:16:26Z Add histogram to MetricGroup commit a782277c63065c297ef1a7a7b8e20e4260791945 Author: Till Rohrmann Date: 2016-06-15T13:37:34Z Add histogram support to existing reporters commit 38f3d82d056921bb54e71a0bf4aa11a5ed879fe8 Author: Till Rohrmann Date: 2016-06-15T15:47:48Z Add DropwizardHistogramWrapperTest commit 35f907ce977b2eaa09e78905e40fae820aad1ba7 Author: Till Rohrmann Date: 2016-06-15T16:17:19Z Add JMXReporter histogram test commit 832b83c90a03d9f358ed234ebfe730c83a45a41b Author: Till Rohrmann Date: 2016-06-16T10:09:06Z Add statsD reporter histogram test --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
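A minimal sketch of the two types as the pull request text describes them; the method names are inferred from the description above, not copied from the patch:

{code}
// Inferred from the PR description; the actual signatures in the patch may differ.
public interface Histogram {
    void update(long value);              // record a new long value
    long getCount();                      // number of values collected so far
    HistogramStatistics getStatistics();  // snapshot of the current distribution
}

public interface HistogramStatistics {
    double getMean();
    long getMin();
    long getMax();
    double getQuantile(double quantile);  // e.g. 0.99 for the 99th percentile
}
{code}

With the described DropwizardHistogramWrapper, user code would then register something like new DropwizardHistogramWrapper(new com.codahale.metrics.Histogram(new com.codahale.metrics.ExponentiallyDecayingReservoir())) on its metric group.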
[jira] [Commented] (FLINK-2392) Instable test in flink-yarn-tests
[ https://issues.apache.org/jira/browse/FLINK-2392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333536#comment-15333536 ] Chesnay Schepler commented on FLINK-2392: - Yes, it is indeed related to registering metrics. But the issues mentioned so far in FLINK-3962 were related to naming. I would prefer creating a new issue, since the chances of FLINK-3962 randomly fixing this issue as well are slim. They are also only tangentially related based on where they occur; the actual cause (and thus the fix) is most likely completely different. > Instable test in flink-yarn-tests > - > > Key: FLINK-2392 > URL: https://issues.apache.org/jira/browse/FLINK-2392 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 0.10.0 >Reporter: Matthias J. Sax >Assignee: Robert Metzger >Priority: Critical > Labels: test-stability > Fix For: 1.0.0 > > > The test YARNSessionFIFOITCase fails from time to time on an irregular basis. > For example see: https://travis-ci.org/apache/flink/jobs/72019690 > {noformat} > Tests run: 12, Failures: 1, Errors: 0, Skipped: 2, Time elapsed: 205.163 sec > <<< FAILURE! - in org.apache.flink.yarn.YARNSessionFIFOITCase > perJobYarnClusterWithParallelism(org.apache.flink.yarn.YARNSessionFIFOITCase) > Time elapsed: 60.651 sec <<< FAILURE! > java.lang.AssertionError: During the timeout period of 60 seconds the > expected string did not show up > at org.junit.Assert.fail(Assert.java:88) > at org.junit.Assert.assertTrue(Assert.java:41) > at org.apache.flink.yarn.YarnTestBase.runWithArgs(YarnTestBase.java:478) > at > org.apache.flink.yarn.YARNSessionFIFOITCase.perJobYarnClusterWithParallelism(YARNSessionFIFOITCase.java:435) > Results : > Failed tests: > > YARNSessionFIFOITCase.perJobYarnClusterWithParallelism:435->YarnTestBase.runWithArgs:478 > During the timeout period of 60 seconds the expected string did not show up > {noformat} > Another error case is this (see > https://travis-ci.org/mjsax/flink/jobs/77313444) > {noformat} > Tests run: 12, Failures: 3, Errors: 0, Skipped: 2, Time elapsed: 182.008 sec > <<< FAILURE! - in org.apache.flink.yarn.YARNSessionFIFOITCase > testTaskManagerFailure(org.apache.flink.yarn.YARNSessionFIFOITCase) Time > elapsed: 27.356 sec <<< FAILURE! > java.lang.AssertionError: Found a file > /home/travis/build/mjsax/flink/flink-yarn-tests/target/flink-yarn-tests-fifo/flink-yarn-tests-fifo-logDir-nm-0_0/application_1440595422559_0007/container_1440595422559_0007_01_03/taskmanager.log > with a prohibited string: [Exception, Started > SelectChannelConnector@0.0.0.0:8081] > at org.junit.Assert.fail(Assert.java:88) > at > org.apache.flink.yarn.YarnTestBase.ensureNoProhibitedStringInLogFiles(YarnTestBase.java:294) > at > org.apache.flink.yarn.YARNSessionFIFOITCase.checkForProhibitedLogContents(YARNSessionFIFOITCase.java:94) > testNonexistingQueue(org.apache.flink.yarn.YARNSessionFIFOITCase) Time > elapsed: 17.421 sec <<< FAILURE! 
> java.lang.AssertionError: Found a file > /home/travis/build/mjsax/flink/flink-yarn-tests/target/flink-yarn-tests-fifo/flink-yarn-tests-fifo-logDir-nm-0_0/application_1440595422559_0007/container_1440595422559_0007_01_03/taskmanager.log > with a prohibited string: [Exception, Started > SelectChannelConnector@0.0.0.0:8081] > at org.junit.Assert.fail(Assert.java:88) > at > org.apache.flink.yarn.YarnTestBase.ensureNoProhibitedStringInLogFiles(YarnTestBase.java:294) > at > org.apache.flink.yarn.YARNSessionFIFOITCase.checkForProhibitedLogContents(YARNSessionFIFOITCase.java:94) > testJavaAPI(org.apache.flink.yarn.YARNSessionFIFOITCase) Time elapsed: > 11.984 sec <<< FAILURE! > java.lang.AssertionError: Found a file > /home/travis/build/mjsax/flink/flink-yarn-tests/target/flink-yarn-tests-fifo/flink-yarn-tests-fifo-logDir-nm-0_0/application_1440595422559_0007/container_1440595422559_0007_01_03/taskmanager.log > with a prohibited string: [Exception, Started > SelectChannelConnector@0.0.0.0:8081] > at org.junit.Assert.fail(Assert.java:88) > at > org.apache.flink.yarn.YarnTestBase.ensureNoProhibitedStringInLogFiles(YarnTestBase.java:294) > at > org.apache.flink.yarn.YARNSessionFIFOITCase.checkForProhibitedLogContents(YARNSessionFIFOITCase.java:94) > {noformat} > Furthermore, this build failed too: > https://travis-ci.org/apache/flink/jobs/77313450 > (no error, but Travis terminated to due no progress for 300 seconds -> > deadlock?) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2090: Expose several new Metrics
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2090 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Closed] (FLINK-3949) Collect Metrics in Runtime Operators
[ https://issues.apache.org/jira/browse/FLINK-3949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-3949. --- Resolution: Fixed implemented in c78b3c49e0e82874cbfa71e88bf28b99ed395610. If additional metrics are to be collected a new specific issue should be created. > Collect Metrics in Runtime Operators > > > Key: FLINK-3949 > URL: https://issues.apache.org/jira/browse/FLINK-3949 > Project: Flink > Issue Type: Sub-task > Components: Local Runtime >Reporter: Stephan Ewen >Assignee: Chesnay Schepler > Fix For: 1.1.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)