[GitHub] flink pull request #1989: FLINK-3901 - Added CsvRowInputFormat

2016-06-16 Thread fpompermaier
Github user fpompermaier commented on a diff in the pull request:

https://github.com/apache/flink/pull/1989#discussion_r67295348
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
 ---
@@ -0,0 +1,1075 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.types.parser.StringParser;
+import org.junit.Test;
+
+public class RowCsvInputFormatTest {
+
+   private static final Path PATH = new Path("an/ignored/file/");
+
+   // Static variables for testing the replacement of \r\n with \n
+   private static final String FIRST_PART = "That is the first part";
+
+   private static final String SECOND_PART = "That is the second part";
+
+   @Test
+   public void ignoreInvalidLines() {
+   try {
+   String fileContent =
+   "header1|header2|header3|\n"+
+   "this is|1|2.0|\n"+
+   "//a comment\n" +
+   "a test|3|4.0|\n" +
+   "#next|5|6.0|\n";
+
+   FileInputSplit split = createTempFile(fileContent);
+
+   RowTypeInfo typeInfo = new RowTypeInfo(new TypeInformation[] {
+   BasicTypeInfo.STRING_TYPE_INFO,
+   BasicTypeInfo.INT_TYPE_INFO,
+   BasicTypeInfo.DOUBLE_TYPE_INFO
+   });
+   CsvInputFormat format = new 
RowCsvInputFormat(PATH, "\n", "|", typeInfo);
+   format.setLenient(false);
+
+   Configuration parameters = new Configuration();
+   format.configure(parameters);
+   format.open(split);
+
+   Row result = new Row(3);
+
+   try {
+   result = format.nextRecord(result);
+   fail("Parse Exception was not thrown! (Invalid 
int value)");
+   } catch (ParseException ex) {
+   }
+
+   // if format has lenient == false this can be asserted only after FLINK-3908
+// result = format.nextRecord(result);
+// assertNotNull(result);
+// assertEquals("this is", result.productElement(0));
+// assertEquals(new Integer(1), result.productElement(1));
+// assertEquals(new Double(2.0), result.productElement(2));
+//
+// result = format.nextRecord(result);
+// assertNotNull(result);
+// assertEquals("a test", result.productEl

[jira] [Commented] (FLINK-3901) Create a RowCsvInputFormat to use as default CSV IF in Table API

2016-06-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333241#comment-15333241
 ] 

ASF GitHub Bot commented on FLINK-3901:
---

Github user fpompermaier commented on a diff in the pull request:

https://github.com/apache/flink/pull/1989#discussion_r67295348
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
 ---
@@ -0,0 +1,1075 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.types.parser.StringParser;
+import org.junit.Test;
+
+public class RowCsvInputFormatTest {
+
+   private static final Path PATH = new Path("an/ignored/file/");
+
+   // Static variables for testing the replacement of \r\n with \n
+   private static final String FIRST_PART = "That is the first part";
+
+   private static final String SECOND_PART = "That is the second part";
+
+   @Test
+   public void ignoreInvalidLines() {
+   try {
+   String fileContent =
+   "header1|header2|header3|\n"+
+   "this is|1|2.0|\n"+
+   "//a comment\n" +
+   "a test|3|4.0|\n" +
+   "#next|5|6.0|\n";
+
+   FileInputSplit split = createTempFile(fileContent);
+
+   RowTypeInfo typeInfo = new RowTypeInfo(new TypeInformation[] {
+   BasicTypeInfo.STRING_TYPE_INFO,
+   BasicTypeInfo.INT_TYPE_INFO,
+   BasicTypeInfo.DOUBLE_TYPE_INFO
+   });
+   CsvInputFormat format = new 
RowCsvInputFormat(PATH, "\n", "|", typeInfo);
+   format.setLenient(false);
+
+   Configuration parameters = new Configuration();
+   format.configure(parameters);
+   format.open(split);
+
+   Row result = new Row(3);
+
+   try {
+   result = format.nextRecord(result);
+   fail("Parse Exception was not thrown! (Invalid 
int value)");
+   } catch (ParseException ex) {
+   }
+
+   // if format has lenient == false this can be asserted only after FLINK-3908
+// result = format.nextRecord(result);
+// assertNotNull(result);
+// assertEquals("this is", result.productElement(0));
+// assertEquals(new Integer(1), result.productElement(1));
   

[GitHub] flink pull request #1989: FLINK-3901 - Added CsvRowInputFormat

2016-06-16 Thread fpompermaier
Github user fpompermaier commented on a diff in the pull request:

https://github.com/apache/flink/pull/1989#discussion_r67295515
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
 ---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.typeutils.RowSerializer;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.types.parser.FieldParser.ParseErrorState;
+
+@Internal
+public class RowCsvInputFormat extends CsvInputFormat {
+
+   private static final long serialVersionUID = 1L;
+
+   private RowSerializer rowSerializer;
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo) {
+   this(filePath, lineDelimiter, fieldDelimiter, rowTypeInfo, 
createDefaultMask(rowTypeInfo.getArity()));
+   }
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, int[] 
includedFieldsMask) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo, includedFieldsMask);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo,
+   int[] includedFieldsMask) {
+   super(filePath);
+   boolean[] mask = (includedFieldsMask == null) ? 
createDefaultMask(rowTypeInfo.getArity())
+   : toBooleanMask(includedFieldsMask);
+   configure(lineDelimiter, fieldDelimiter, rowTypeInfo, mask);
+   }
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, 
boolean[] includedFieldsMask) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo, includedFieldsMask);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo,
+   boolean[] includedFieldsMask) {
+   super(filePath);
+   configure(lineDelimiter, fieldDelimiter, rowTypeInfo, 
includedFieldsMask);
+   }
+
+   private void configure(String lineDelimiter, String fieldDelimiter,
+   RowTypeInfo rowTypeInfo, boolean[] includedFieldsMask) {
+
+   if (rowTypeInfo.getArity() == 0) {
+   throw new IllegalArgumentException("Row arity must be greater than 0.");
+   }
+
+   if (includedFieldsMask == null) {
+   includedFieldsMask = 
createDefaultMask(rowTypeInfo.getArity());
+   }
+
+   rowSerializer = (RowSerializer) 
rowTypeInfo.createSerializer(new ExecutionConfig());
--- End diff --

ok


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #1989: FLINK-3901 - Added CsvRowInputFormat

2016-06-16 Thread fpompermaier
Github user fpompermaier commented on a diff in the pull request:

https://github.com/apache/flink/pull/1989#discussion_r67295498
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
 ---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.typeutils.RowSerializer;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.types.parser.FieldParser.ParseErrorState;
+
+@Internal
+public class RowCsvInputFormat extends CsvInputFormat {
+
+   private static final long serialVersionUID = 1L;
+
+   private RowSerializer rowSerializer;
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo) {
+   this(filePath, lineDelimiter, fieldDelimiter, rowTypeInfo, 
createDefaultMask(rowTypeInfo.getArity()));
+   }
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, int[] 
includedFieldsMask) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo, includedFieldsMask);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo,
+   int[] includedFieldsMask) {
+   super(filePath);
+   boolean[] mask = (includedFieldsMask == null) ? 
createDefaultMask(rowTypeInfo.getArity())
+   : toBooleanMask(includedFieldsMask);
+   configure(lineDelimiter, fieldDelimiter, rowTypeInfo, mask);
+   }
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, 
boolean[] includedFieldsMask) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo, includedFieldsMask);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo,
+   boolean[] includedFieldsMask) {
+   super(filePath);
+   configure(lineDelimiter, fieldDelimiter, rowTypeInfo, 
includedFieldsMask);
--- End diff --

ok




[jira] [Commented] (FLINK-3901) Create a RowCsvInputFormat to use as default CSV IF in Table API

2016-06-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333244#comment-15333244
 ] 

ASF GitHub Bot commented on FLINK-3901:
---

Github user fpompermaier commented on a diff in the pull request:

https://github.com/apache/flink/pull/1989#discussion_r67295498
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
 ---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.typeutils.RowSerializer;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.types.parser.FieldParser.ParseErrorState;
+
+@Internal
+public class RowCsvInputFormat extends CsvInputFormat {
+
+   private static final long serialVersionUID = 1L;
+
+   private RowSerializer rowSerializer;
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo) {
+   this(filePath, lineDelimiter, fieldDelimiter, rowTypeInfo, 
createDefaultMask(rowTypeInfo.getArity()));
+   }
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, int[] 
includedFieldsMask) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo, includedFieldsMask);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo,
+   int[] includedFieldsMask) {
+   super(filePath);
+   boolean[] mask = (includedFieldsMask == null) ? 
createDefaultMask(rowTypeInfo.getArity())
+   : toBooleanMask(includedFieldsMask);
+   configure(lineDelimiter, fieldDelimiter, rowTypeInfo, mask);
+   }
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, 
boolean[] includedFieldsMask) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo, includedFieldsMask);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo,
+   boolean[] includedFieldsMask) {
+   super(filePath);
+   configure(lineDelimiter, fieldDelimiter, rowTypeInfo, 
includedFieldsMask);
--- End diff --

ok


> Create a RowCsvInputFormat to use as default CSV IF in Table API
> 
>
> Key: FLINK-3901
> URL: https://issues.apache.org/jira/browse/FLINK-3901
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.0.2
>Reporter: Flavio Pompermaier
>Assignee: Flavio Pompermaier
>Priority: Minor
>  Labels: csv, null-values, row, tuple
>
> At the moment the Table API reads CSVs using the TupleCsvInputFormat, which 
> is limited to at most 25 fields and cannot handle null values.
> A new InputFormat producing Row objects is necessary to avoid those limitations.
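
For illustration, a minimal usage sketch of such an input format. The constructor
signature follows the diff quoted in this thread; the input path, delimiters, and
column types below are made up:

```java
// Sketch only: RowCsvInputFormat's constructors are taken from the PR diff;
// the path and the three-column layout are hypothetical.
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.RowCsvInputFormat;
import org.apache.flink.api.table.Row;
import org.apache.flink.api.table.typeutils.RowTypeInfo;
import org.apache.flink.core.fs.Path;

public class RowCsvUsageSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Column types of the CSV; Row has no 25-field limit and permits null fields.
        RowTypeInfo typeInfo = new RowTypeInfo(new TypeInformation[] {
            BasicTypeInfo.STRING_TYPE_INFO,
            BasicTypeInfo.INT_TYPE_INFO,
            BasicTypeInfo.DOUBLE_TYPE_INFO
        });

        RowCsvInputFormat format =
            new RowCsvInputFormat(new Path("/tmp/input.csv"), "\n", "|", typeInfo);

        DataSet<Row> rows = env.createInput(format, typeInfo);
        rows.print();
    }
}
```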



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3901) Create a RowCsvInputFormat to use as default CSV IF in Table API

2016-06-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333246#comment-15333246
 ] 

ASF GitHub Bot commented on FLINK-3901:
---

Github user fpompermaier commented on a diff in the pull request:

https://github.com/apache/flink/pull/1989#discussion_r67295515
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
 ---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.typeutils.RowSerializer;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.types.parser.FieldParser.ParseErrorState;
+
+@Internal
+public class RowCsvInputFormat extends CsvInputFormat {
+
+   private static final long serialVersionUID = 1L;
+
+   private RowSerializer rowSerializer;
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo) {
+   this(filePath, lineDelimiter, fieldDelimiter, rowTypeInfo, 
createDefaultMask(rowTypeInfo.getArity()));
+   }
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, int[] 
includedFieldsMask) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo, includedFieldsMask);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo,
+   int[] includedFieldsMask) {
+   super(filePath);
+   boolean[] mask = (includedFieldsMask == null) ? 
createDefaultMask(rowTypeInfo.getArity())
+   : toBooleanMask(includedFieldsMask);
+   configure(lineDelimiter, fieldDelimiter, rowTypeInfo, mask);
+   }
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, 
boolean[] includedFieldsMask) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo, includedFieldsMask);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo,
+   boolean[] includedFieldsMask) {
+   super(filePath);
+   configure(lineDelimiter, fieldDelimiter, rowTypeInfo, 
includedFieldsMask);
+   }
+
+   private void configure(String lineDelimiter, String fieldDelimiter,
+   RowTypeInfo rowTypeInfo, boolean[] includedFieldsMask) {
+
+   if (rowTypeInfo.getArity() == 0) {
+   throw new IllegalArgumentException("Row arity must be greater than 0.");
+   }
+
+   if (includedFieldsMask == null) {
+   includedFieldsMask = 
createDefaultMask(rowTypeInfo.getArity());
+   }
+
+   rowSerializer = (RowSerializer) 
rowTypeInfo.createSerializer(new ExecutionConfig());
--- End diff --

ok


> Create a RowCsvInputFormat to use as default CSV IF in Table API
> 
>
> Key: FLINK-3901
> URL: https://issues.apache.org/jira/browse/FLINK-3901
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.0.2
>Reporter: Flavio Pompermaier
>Assignee: Flavio Pompermaier
>Priority: Minor
>  Labels: csv, null-value

[jira] [Updated] (FLINK-4027) FlinkKafkaProducer09 sink can lose messages

2016-06-16 Thread Konstantin Knauf (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Knauf updated FLINK-4027:

Priority: Major  (was: Critical)

> FlinkKafkaProducer09 sink can lose messages
> ---
>
> Key: FLINK-4027
> URL: https://issues.apache.org/jira/browse/FLINK-4027
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.0.3
>Reporter: Elias Levy
>Assignee: Robert Metzger
>
> The FlinkKafkaProducer09 sink appears not to offer at-least-once guarantees.
> The producer publishes messages asynchronously. A callback can record 
> publishing errors, which will be raised when detected. But as far as I can 
> tell, there is no barrier to wait for async errors from the sink when 
> checkpointing, or to track the event time of acked messages to inform the 
> checkpointing process.
> If a checkpoint occurs while there are pending publish requests, and the 
> requests return a failure after the checkpoint occurred, those messages will 
> be lost, as the checkpoint will consider them processed by the sink.
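
To make the failure mode above concrete, here is a hedged sketch (a hypothetical
class, not the connector's actual code) of an async sink whose checkpoint does
not wait for in-flight sends:

```java
// Illustration of the reported problem: the send() callback only records an
// error for later, and snapshotState() never waits for pending records.
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

class UnsafeAsyncSinkSketch {
    private final KafkaProducer<byte[], byte[]> producer;
    private volatile Exception asyncException;  // only surfaced on a later write

    UnsafeAsyncSinkSketch(KafkaProducer<byte[], byte[]> producer) {
        this.producer = producer;
    }

    void invoke(ProducerRecord<byte[], byte[]> record) {
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception e) {
                if (e != null) {
                    asyncException = e;  // may arrive after a checkpoint completed
                }
            }
        });
    }

    Object snapshotState() {
        // Returns immediately: a send that fails after this point is lost,
        // because the completed checkpoint already counts it as written.
        return null;
    }
}
```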





[jira] [Updated] (FLINK-4027) FlinkKafkaProducer09 sink can lose messages

2016-06-16 Thread Konstantin Knauf (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Knauf updated FLINK-4027:

Priority: Critical  (was: Major)

> FlinkKafkaProducer09 sink can lose messages
> ---
>
> Key: FLINK-4027
> URL: https://issues.apache.org/jira/browse/FLINK-4027
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.0.3
>Reporter: Elias Levy
>Assignee: Robert Metzger
>Priority: Critical
>
> The FlinkKafkaProducer09 sink appears not to offer at-least-once guarantees.
> The producer publishes messages asynchronously. A callback can record 
> publishing errors, which will be raised when detected. But as far as I can 
> tell, there is no barrier to wait for async errors from the sink when 
> checkpointing, or to track the event time of acked messages to inform the 
> checkpointing process.
> If a checkpoint occurs while there are pending publish requests, and the 
> requests return a failure after the checkpoint occurred, those messages will 
> be lost, as the checkpoint will consider them processed by the sink.





[GitHub] flink pull request #1989: FLINK-3901 - Added CsvRowInputFormat

2016-06-16 Thread fpompermaier
Github user fpompermaier commented on a diff in the pull request:

https://github.com/apache/flink/pull/1989#discussion_r67295853
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
 ---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.typeutils.RowSerializer;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.types.parser.FieldParser.ParseErrorState;
+
+@Internal
+public class RowCsvInputFormat extends CsvInputFormat {
+
+   private static final long serialVersionUID = 1L;
+
+   private RowSerializer rowSerializer;
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo) {
+   this(filePath, lineDelimiter, fieldDelimiter, rowTypeInfo, 
createDefaultMask(rowTypeInfo.getArity()));
+   }
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, int[] 
includedFieldsMask) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo, includedFieldsMask);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo,
+   int[] includedFieldsMask) {
+   super(filePath);
+   boolean[] mask = (includedFieldsMask == null) ? 
createDefaultMask(rowTypeInfo.getArity())
+   : toBooleanMask(includedFieldsMask);
+   configure(lineDelimiter, fieldDelimiter, rowTypeInfo, mask);
+   }
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, 
boolean[] includedFieldsMask) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo, includedFieldsMask);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo,
+   boolean[] includedFieldsMask) {
+   super(filePath);
+   configure(lineDelimiter, fieldDelimiter, rowTypeInfo, 
includedFieldsMask);
+   }
+
+   private void configure(String lineDelimiter, String fieldDelimiter,
+   RowTypeInfo rowTypeInfo, boolean[] includedFieldsMask) {
+
+   if (rowTypeInfo.getArity() == 0) {
+   throw new IllegalArgumentException("Row arity must be greater than 0.");
+   }
+
+   if (includedFieldsMask == null) {
+   includedFieldsMask = 
createDefaultMask(rowTypeInfo.getArity());
+   }
+
+   rowSerializer = (RowSerializer) 
rowTypeInfo.createSerializer(new ExecutionConfig());
+
+   setDelimiter(lineDelimiter);
+   setFieldDelimiter(fieldDelimiter);
+
+   Class[] classes = new Class[rowTypeInfo.getArity()];
+
+   for (int i = 0; i < rowTypeInfo.getArity(); i++) {
+   classes[i] = rowTypeInfo.getTypeAt(i).getTypeClass();
+   }
+
+   setFieldsGeneric(includedFieldsMask, classes);
+   }
+
+   @Override
+   public Row fillRecord(Row reuse, Object[] parsedValues) {
+   if (reuse == null) {
+   reuse = new Row(rowSerializer.getLength());
+   }
+   for (int i = 0; i < parsedValues.length; i++) {
+   reuse.setField(i, parsedValues[i]);
+ 

[jira] [Commented] (FLINK-3901) Create a RowCsvInputFormat to use as default CSV IF in Table API

2016-06-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333253#comment-15333253
 ] 

ASF GitHub Bot commented on FLINK-3901:
---

Github user fpompermaier commented on a diff in the pull request:

https://github.com/apache/flink/pull/1989#discussion_r67295853
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
 ---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.typeutils.RowSerializer;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.types.parser.FieldParser.ParseErrorState;
+
+@Internal
+public class RowCsvInputFormat extends CsvInputFormat {
+
+   private static final long serialVersionUID = 1L;
+
+   private RowSerializer rowSerializer;
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo) {
+   this(filePath, lineDelimiter, fieldDelimiter, rowTypeInfo, 
createDefaultMask(rowTypeInfo.getArity()));
+   }
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, int[] 
includedFieldsMask) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo, includedFieldsMask);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo,
+   int[] includedFieldsMask) {
+   super(filePath);
+   boolean[] mask = (includedFieldsMask == null) ? 
createDefaultMask(rowTypeInfo.getArity())
+   : toBooleanMask(includedFieldsMask);
+   configure(lineDelimiter, fieldDelimiter, rowTypeInfo, mask);
+   }
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, 
boolean[] includedFieldsMask) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo, includedFieldsMask);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo,
+   boolean[] includedFieldsMask) {
+   super(filePath);
+   configure(lineDelimiter, fieldDelimiter, rowTypeInfo, 
includedFieldsMask);
+   }
+
+   private void configure(String lineDelimiter, String fieldDelimiter,
+   RowTypeInfo rowTypeInfo, boolean[] includedFieldsMask) {
+
+   if (rowTypeInfo.getArity() == 0) {
+   throw new IllegalArgumentException("Row arity must be greater than 0.");
+   }
+
+   if (includedFieldsMask == null) {
+   includedFieldsMask = 
createDefaultMask(rowTypeInfo.getArity());
+   }
+
+   rowSerializer = (RowSerializer) 
rowTypeInfo.createSerializer(new ExecutionConfig());
+
+   setDelimiter(lineDelimiter);
+   setFieldDelimiter(fieldDelimiter);
+
+   Class[] classes = new Class[rowTypeInfo.getArity()];
+
+   for (int i = 0; i < rowTypeInfo.getArity(); i++) {
+   classes[i] = rowTypeInfo.getTypeAt(i).getTypeClass();
+   }
+
+   setFieldsGeneric(includedFieldsMask, classes);
+   }
+
+   @Override
+   public Row fillRecord(Row reuse, Object[] parsedValues) {
+

[GitHub] flink pull request #1989: FLINK-3901 - Added CsvRowInputFormat

2016-06-16 Thread fpompermaier
Github user fpompermaier commented on a diff in the pull request:

https://github.com/apache/flink/pull/1989#discussion_r67296019
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
 ---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.typeutils.RowSerializer;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.types.parser.FieldParser.ParseErrorState;
+
+@Internal
+public class RowCsvInputFormat extends CsvInputFormat {
+
+   private static final long serialVersionUID = 1L;
+
+   private RowSerializer rowSerializer;
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo) {
+   this(filePath, lineDelimiter, fieldDelimiter, rowTypeInfo, 
createDefaultMask(rowTypeInfo.getArity()));
+   }
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, int[] 
includedFieldsMask) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo, includedFieldsMask);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo,
+   int[] includedFieldsMask) {
+   super(filePath);
+   boolean[] mask = (includedFieldsMask == null) ? 
createDefaultMask(rowTypeInfo.getArity())
+   : toBooleanMask(includedFieldsMask);
+   configure(lineDelimiter, fieldDelimiter, rowTypeInfo, mask);
+   }
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, 
boolean[] includedFieldsMask) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo, includedFieldsMask);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo,
+   boolean[] includedFieldsMask) {
+   super(filePath);
+   configure(lineDelimiter, fieldDelimiter, rowTypeInfo, 
includedFieldsMask);
+   }
+
+   private void configure(String lineDelimiter, String fieldDelimiter,
+   RowTypeInfo rowTypeInfo, boolean[] includedFieldsMask) {
+
+   if (rowTypeInfo.getArity() == 0) {
+   throw new IllegalArgumentException("Row arity must be greater than 0.");
+   }
+
+   if (includedFieldsMask == null) {
+   includedFieldsMask = 
createDefaultMask(rowTypeInfo.getArity());
+   }
+
+   rowSerializer = (RowSerializer) 
rowTypeInfo.createSerializer(new ExecutionConfig());
+
+   setDelimiter(lineDelimiter);
+   setFieldDelimiter(fieldDelimiter);
+
+   Class[] classes = new Class[rowTypeInfo.getArity()];
+
+   for (int i = 0; i < rowTypeInfo.getArity(); i++) {
+   classes[i] = rowTypeInfo.getTypeAt(i).getTypeClass();
+   }
+
+   setFieldsGeneric(includedFieldsMask, classes);
+   }
+
+   @Override
+   public Row fillRecord(Row reuse, Object[] parsedValues) {
+   if (reuse == null) {
+   reuse = new Row(rowSerializer.getLength());
+   }
+   for (int i = 0; i < parsedValues.length; i++) {
+   reuse.setField(i, parsedValues[i]);
+ 

[jira] [Commented] (FLINK-3901) Create a RowCsvInputFormat to use as default CSV IF in Table API

2016-06-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333255#comment-15333255
 ] 

ASF GitHub Bot commented on FLINK-3901:
---

Github user fpompermaier commented on a diff in the pull request:

https://github.com/apache/flink/pull/1989#discussion_r67296019
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
 ---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.typeutils.RowSerializer;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.types.parser.FieldParser.ParseErrorState;
+
+@Internal
+public class RowCsvInputFormat extends CsvInputFormat {
+
+   private static final long serialVersionUID = 1L;
+
+   private RowSerializer rowSerializer;
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo) {
+   this(filePath, lineDelimiter, fieldDelimiter, rowTypeInfo, 
createDefaultMask(rowTypeInfo.getArity()));
+   }
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, int[] 
includedFieldsMask) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo, includedFieldsMask);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo,
+   int[] includedFieldsMask) {
+   super(filePath);
+   boolean[] mask = (includedFieldsMask == null) ? 
createDefaultMask(rowTypeInfo.getArity())
+   : toBooleanMask(includedFieldsMask);
+   configure(lineDelimiter, fieldDelimiter, rowTypeInfo, mask);
+   }
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, 
boolean[] includedFieldsMask) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo, includedFieldsMask);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo,
+   boolean[] includedFieldsMask) {
+   super(filePath);
+   configure(lineDelimiter, fieldDelimiter, rowTypeInfo, 
includedFieldsMask);
+   }
+
+   private void configure(String lineDelimiter, String fieldDelimiter,
+   RowTypeInfo rowTypeInfo, boolean[] includedFieldsMask) {
+
+   if (rowTypeInfo.getArity() == 0) {
+   throw new IllegalArgumentException("Row arity must be greater than 0.");
+   }
+
+   if (includedFieldsMask == null) {
+   includedFieldsMask = 
createDefaultMask(rowTypeInfo.getArity());
+   }
+
+   rowSerializer = (RowSerializer) 
rowTypeInfo.createSerializer(new ExecutionConfig());
+
+   setDelimiter(lineDelimiter);
+   setFieldDelimiter(fieldDelimiter);
+
+   Class[] classes = new Class[rowTypeInfo.getArity()];
+
+   for (int i = 0; i < rowTypeInfo.getArity(); i++) {
+   classes[i] = rowTypeInfo.getTypeAt(i).getTypeClass();
+   }
+
+   setFieldsGeneric(includedFieldsMask, classes);
+   }
+
+   @Override
+   public Row fillRecord(Row reuse, Object[] parsedValues) {
+

[jira] [Commented] (FLINK-4027) FlinkKafkaProducer09 sink can lose messages

2016-06-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333260#comment-15333260
 ] 

ASF GitHub Bot commented on FLINK-4027:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2108#discussion_r67296678
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
 ---
@@ -276,6 +314,41 @@ public void close() throws Exception {
checkErroneous();
}
 
+   // --- Logic for handling checkpoint flushing --- //
+
+   private void acknowledgeMessage() {
+   if(!flushOnCheckpoint) {
+   // the logic is disabled
+   return;
+   }
+   pendingRecords--;
+   }
+
+   @Override
+   public Serializable snapshotState(long checkpointId, long 
checkpointTimestamp) {
+   if(flushOnCheckpoint) {
+   // flushing is activated: We need to wait until pendingRecords is 0
+   while(pendingRecords > 0) {
+   try {
+   Thread.sleep(10);
--- End diff --

The problem is that the `flush()` method is only implemented by the Kafka 0.9 
producer, not by the 0.8 implementation.
As you can see from the class name, this is the shared base class between the 
two version-specific implementations. I think for the 0.8 producer, there is no 
way around the waiting approach.

I'll update the pull request to call `flush()` on the 0.9 producer.
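
For reference, a hedged sketch of the counting-and-waiting pattern from the diff
above (class and helper names here are illustrative, not the connector's actual
code):

```java
// Every async send increments pendingRecords, the producer callback
// decrements it, and the snapshot blocks until the counter reaches zero.
import java.io.Serializable;

class FlushOnCheckpointSketch {
    private final Object pendingLock = new Object();
    private long pendingRecords;
    private boolean flushOnCheckpoint = true;

    void recordSent() {            // call before handing a record to send()
        synchronized (pendingLock) { pendingRecords++; }
    }

    void acknowledgeMessage() {    // call from the producer's completion callback
        synchronized (pendingLock) {
            pendingRecords--;
            pendingLock.notifyAll();
        }
    }

    Serializable snapshotState(long checkpointId, long ts) throws InterruptedException {
        if (flushOnCheckpoint) {
            // The 0.9 producer can replace this loop with producer.flush();
            // the 0.8 producer has no flush(), so waiting is the only option.
            synchronized (pendingLock) {
                while (pendingRecords > 0) {
                    pendingLock.wait(10);
                }
            }
        }
        return null;
    }
}
```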


> FlinkKafkaProducer09 sink can lose messages
> ---
>
> Key: FLINK-4027
> URL: https://issues.apache.org/jira/browse/FLINK-4027
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.0.3
>Reporter: Elias Levy
>Assignee: Robert Metzger
>Priority: Critical
>
> The FlinkKafkaProducer09 sink appears not to offer at-least-once guarantees.
> The producer publishes messages asynchronously. A callback can record 
> publishing errors, which will be raised when detected. But as far as I can 
> tell, there is no barrier to wait for async errors from the sink when 
> checkpointing, or to track the event time of acked messages to inform the 
> checkpointing process.
> If a checkpoint occurs while there are pending publish requests, and the 
> requests return a failure after the checkpoint occurred, those messages will 
> be lost, as the checkpoint will consider them processed by the sink.





[GitHub] flink pull request #2108: [FLINK-4027] Flush FlinkKafkaProducer on checkpoin...

2016-06-16 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2108#discussion_r67296678
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
 ---
@@ -276,6 +314,41 @@ public void close() throws Exception {
checkErroneous();
}
 
+   // --- Logic for handling checkpoint flushing --- //
+
+   private void acknowledgeMessage() {
+   if(!flushOnCheckpoint) {
+   // the logic is disabled
+   return;
+   }
+   pendingRecords--;
+   }
+
+   @Override
+   public Serializable snapshotState(long checkpointId, long 
checkpointTimestamp) {
+   if(flushOnCheckpoint) {
+   // flushing is activated: We need to wait until pendingRecords is 0
+   while(pendingRecords > 0) {
+   try {
+   Thread.sleep(10);
--- End diff --

The problem is that the `flush()` method is only implemented by the Kafka 0.9 
producer, not by the 0.8 implementation.
As you can see from the class name, this is the shared base class between the 
two version-specific implementations. I think for the 0.8 producer, there is no 
way around the waiting approach.

I'll update the pull request to call `flush()` on the 0.9 producer.




[GitHub] flink pull request #1856: FLINK-3650 Add maxBy/minBy to Scala DataSet API

2016-06-16 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67297138
  
--- Diff: 
flink-scala/src/test/scala/org/apache/flink/api/operator/MaxByOperatorTest.scala
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.operator
+
+import org.apache.flink.api.common.InvalidProgramException
+import org.apache.flink.api.scala._
+import org.junit.Test
+import org.junit.Assert
+
+class MaxByOperatorTest {
+  private val emptyTupleData = List[scala.Tuple5[Int, Long, String, Long, 
Int]]()
--- End diff --

The line should be
```
private val emptyTupleData = List[(Int, Long, String, Long, Int)]()
```
With `List[X]` you define the type to be a list of elements of type `X`, and 
`(Int, Long, String)` is shorthand for the type `scala.Tuple3[Int, Long, 
String]`. The trailing `()` creates an empty list.
You can also remove the `new CustomType()` from the next line.




[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333266#comment-15333266
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67297138
  
--- Diff: 
flink-scala/src/test/scala/org/apache/flink/api/operator/MaxByOperatorTest.scala
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.operator
+
+import org.apache.flink.api.common.InvalidProgramException
+import org.apache.flink.api.scala._
+import org.junit.Test
+import org.junit.Assert
+
+class MaxByOperatorTest {
+  private val emptyTupleData = List[scala.Tuple5[Int, Long, String, Long, 
Int]]()
--- End diff --

The line should be
```
private val emptyTupleData = List[(Int, Long, String, Long, Int)]()
```
With `List[X]` you define the type to be a list of elements of type `X`, and 
`(Int, Long, String)` is shorthand for the type `scala.Tuple3[Int, Long, 
String]`. The trailing `()` creates an empty list.
You can also remove the `new CustomType()` from the next line.


> Add maxBy/minBy to Scala DataSet API
> 
>
> Key: FLINK-3650
> URL: https://issues.apache.org/jira/browse/FLINK-3650
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API, Scala API
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: ramkrishna.s.vasudevan
>
> The stable Java DataSet API contains the API calls {{maxBy}} and {{minBy}}. 
> These methods are not supported by the Scala DataSet API. These methods 
> should be added in order to have a consistent API.
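
For reference, a short sketch of the existing Java DataSet calls (the sample
data below is made up):

```java
// maxBy/minBy return the tuple whose given field is largest/smallest.
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class MaxByMinBySketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<String, Integer>> counts = env.fromElements(
            new Tuple2<>("flink", 3),
            new Tuple2<>("kafka", 7),
            new Tuple2<>("table", 5));

        counts.maxBy(1).print();  // prints (kafka,7)
        counts.minBy(1).print();  // prints (flink,3)
    }
}
```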





[GitHub] flink pull request #1856: FLINK-3650 Add maxBy/minBy to Scala DataSet API

2016-06-16 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67297990
  
--- Diff: 
flink-scala/src/test/scala/org/apache/flink/api/operator/MaxByOperatorTest.scala
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.operator
+
+import org.apache.flink.api.common.InvalidProgramException
+import org.apache.flink.api.scala._
+import org.junit.Test
+import org.junit.Assert
+
+class MaxByOperatorTest {
+  private val emptyTupleData = List[scala.Tuple5[Int, Long, String, Long, 
Int]]()
--- End diff --

Yes, I just updated it after reading that somewhere. One more test is failing; 
I am checking that and will then push the PR. I followed the steps that you 
described, so now all my squashed commits are in a new branch and I will 
force-push it.




[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333278#comment-15333278
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/1856#discussion_r67297990
  
--- Diff: 
flink-scala/src/test/scala/org/apache/flink/api/operator/MaxByOperatorTest.scala
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.operator
+
+import org.apache.flink.api.common.InvalidProgramException
+import org.apache.flink.api.scala._
+import org.junit.Test
+import org.junit.Assert
+
+class MaxByOperatorTest {
+  private val emptyTupleData = List[scala.Tuple5[Int, Long, String, Long, 
Int]]()
--- End diff --

Yes, I just updated it after reading up on it. One more test is failing; I am 
checking that and will then push the PR. I followed the steps you described, so 
all my squashed commits are now on a new branch and I will force-push it.


> Add maxBy/minBy to Scala DataSet API
> 
>
> Key: FLINK-3650
> URL: https://issues.apache.org/jira/browse/FLINK-3650
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API, Scala API
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: ramkrishna.s.vasudevan
>
> The stable Java DataSet API contains the API calls {{maxBy}} and {{minBy}}. 
> These methods are not supported by the Scala DataSet API. These methods 
> should be added in order to have a consistent API.
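For context, a minimal sketch of the Java-side semantics that the Scala API should mirror; the tuple type, data, and field indices are illustrative, not taken from the PR:

{code:java}
// Assumes: ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<String, Integer>> sales = env.fromElements(
    new Tuple2<>("a", 3), new Tuple2<>("a", 7), new Tuple2<>("b", 5));

// maxBy(1): per group, keep the element whose field 1 is maximal -> ("a",7), ("b",5)
DataSet<Tuple2<String, Integer>> maxPerKey = sales.groupBy(0).maxBy(1);

// minBy(1) on the whole DataSet: the single element with minimal field 1 -> ("a",3)
DataSet<Tuple2<String, Integer>> globalMin = sales.minBy(1);
{code}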



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #1989: FLINK-3901 - Added CsvRowInputFormat

2016-06-16 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1989#discussion_r67298280
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
 ---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.typeutils.RowSerializer;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.types.parser.FieldParser.ParseErrorState;
+
+@Internal
+public class RowCsvInputFormat extends CsvInputFormat {
+
+   private static final long serialVersionUID = 1L;
+
+   private RowSerializer rowSerializer;
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo) {
+   this(filePath, lineDelimiter, fieldDelimiter, rowTypeInfo, 
createDefaultMask(rowTypeInfo.getArity()));
+   }
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, int[] 
includedFieldsMask) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo, includedFieldsMask);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo,
+   int[] includedFieldsMask) {
+   super(filePath);
+   boolean[] mask = (includedFieldsMask == null) ? 
createDefaultMask(rowTypeInfo.getArity())
+   : toBooleanMask(includedFieldsMask);
+   configure(lineDelimiter, fieldDelimiter, rowTypeInfo, mask);
+   }
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, 
boolean[] includedFieldsMask) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo, includedFieldsMask);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo,
+   boolean[] includedFieldsMask) {
+   super(filePath);
+   configure(lineDelimiter, fieldDelimiter, rowTypeInfo, 
includedFieldsMask);
+   }
+
+   private void configure(String lineDelimiter, String fieldDelimiter,
+   RowTypeInfo rowTypeInfo, boolean[] includedFieldsMask) {
+
+   if (rowTypeInfo.getArity() == 0) {
+   throw new IllegalArgumentException("Row arity must be 
greater than 0.");
+   }
+
+   if (includedFieldsMask == null) {
+   includedFieldsMask = 
createDefaultMask(rowTypeInfo.getArity());
+   }
+
+   rowSerializer = (RowSerializer) 
rowTypeInfo.createSerializer(new ExecutionConfig());
+
+   setDelimiter(lineDelimiter);
+   setFieldDelimiter(fieldDelimiter);
+
+   Class[] classes = new Class[rowTypeInfo.getArity()];
+
+   for (int i = 0; i < rowTypeInfo.getArity(); i++) {
+   classes[i] = rowTypeInfo.getTypeAt(i).getTypeClass();
+   }
+
+   setFieldsGeneric(includedFieldsMask, classes);
+   }
+
+   @Override
+   public Row fillRecord(Row reuse, Object[] parsedValues) {
+   if (reuse == null) {
+   reuse = new Row(rowSerializer.getLength());
+   }
+   for (int i = 0; i < parsedValues.length; i++) {
+   reuse.setField(i, parsedValues[i]);
+   }
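For reference, a hedged usage sketch of the format defined above; the file path is illustrative, and the schema and delimiters follow the PR's tests:

{code:java}
RowTypeInfo typeInfo = new RowTypeInfo(new TypeInformation[] {
    BasicTypeInfo.STRING_TYPE_INFO,
    BasicTypeInfo.INT_TYPE_INFO,
    BasicTypeInfo.DOUBLE_TYPE_INFO
});
// "\n" and "|" are the line and field delimiters.
RowCsvInputFormat format =
    new RowCsvInputFormat(new Path("/path/to/input.csv"), "\n", "|", typeInfo);
{code}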
 

[jira] [Commented] (FLINK-3901) Create a RowCsvInputFormat to use as default CSV IF in Table API

2016-06-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333284#comment-15333284
 ] 

ASF GitHub Bot commented on FLINK-3901:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1989#discussion_r67298280
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
 ---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.typeutils.RowSerializer;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.types.parser.FieldParser.ParseErrorState;
+
+@Internal
+public class RowCsvInputFormat extends CsvInputFormat {
+
+   private static final long serialVersionUID = 1L;
+
+   private RowSerializer rowSerializer;
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo) {
+   this(filePath, lineDelimiter, fieldDelimiter, rowTypeInfo, 
createDefaultMask(rowTypeInfo.getArity()));
+   }
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, int[] 
includedFieldsMask) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo, includedFieldsMask);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo,
+   int[] includedFieldsMask) {
+   super(filePath);
+   boolean[] mask = (includedFieldsMask == null) ? 
createDefaultMask(rowTypeInfo.getArity())
+   : toBooleanMask(includedFieldsMask);
+   configure(lineDelimiter, fieldDelimiter, rowTypeInfo, mask);
+   }
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, 
boolean[] includedFieldsMask) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo, includedFieldsMask);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo,
+   boolean[] includedFieldsMask) {
+   super(filePath);
+   configure(lineDelimiter, fieldDelimiter, rowTypeInfo, 
includedFieldsMask);
+   }
+
+   private void configure(String lineDelimiter, String fieldDelimiter,
+   RowTypeInfo rowTypeInfo, boolean[] includedFieldsMask) {
+
+   if (rowTypeInfo.getArity() == 0) {
+   throw new IllegalArgumentException("Row arity must be 
greater than 0.");
+   }
+
+   if (includedFieldsMask == null) {
+   includedFieldsMask = 
createDefaultMask(rowTypeInfo.getArity());
+   }
+
+   rowSerializer = (RowSerializer) 
rowTypeInfo.createSerializer(new ExecutionConfig());
+
+   setDelimiter(lineDelimiter);
+   setFieldDelimiter(fieldDelimiter);
+
+   Class[] classes = new Class[rowTypeInfo.getArity()];
+
+   for (int i = 0; i < rowTypeInfo.getArity(); i++) {
+   classes[i] = rowTypeInfo.getTypeAt(i).getTypeClass();
+   }
+
+   setFieldsGeneric(includedFieldsMask, classes);
+   }
+
+   @Override
+   public Row fillRecord(Row reuse, Object[] parsedValues) {
+ 

[jira] [Commented] (FLINK-3901) Create a RowCsvInputFormat to use as default CSV IF in Table API

2016-06-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333290#comment-15333290
 ] 

ASF GitHub Bot commented on FLINK-3901:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1989#discussion_r67299243
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
 ---
@@ -0,0 +1,1075 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.types.parser.StringParser;
+import org.junit.Test;
+
+public class RowCsvInputFormatTest {
+
+   private static final Path PATH = new Path("an/ignored/file/");
+
+   // Static variables for testing the conversion of \r\n to \n
+   private static final String FIRST_PART = "That is the first part";
+
+   private static final String SECOND_PART = "That is the second part";
+
+   @Test
+   public void ignoreInvalidLines() {
+   try {
+   String fileContent =
+   "header1|header2|header3|\n"+
+   "this is|1|2.0|\n"+
+   "//a comment\n" +
+   "a test|3|4.0|\n" +
+   "#next|5|6.0|\n";
+
+   FileInputSplit split = createTempFile(fileContent);
+
+   RowTypeInfo typeInfo = new RowTypeInfo(new 
TypeInformation[] {
+   BasicTypeInfo.STRING_TYPE_INFO,
+   BasicTypeInfo.INT_TYPE_INFO,
+   BasicTypeInfo.DOUBLE_TYPE_INFO
+   });
+   CsvInputFormat format = new 
RowCsvInputFormat(PATH, "\n", "|", typeInfo);
+   format.setLenient(false);
+
+   Configuration parameters = new Configuration();
+   format.configure(parameters);
+   format.open(split);
+
+   Row result = new Row(3);
+
+   try {
+   result = format.nextRecord(result);
+   fail("Parse Exception was not thrown! (Invalid 
int value)");
+   } catch (ParseException ex) {
+   // expected: the header line's second field ("header2") is not a valid int
+   }
+
+   // if format has lenient == false this can be asserted 
only after FLINK-3908
+// result = format.nextRecord(result);
+// assertNotNull(result);
+// assertEquals("this is", result.productElement(0));
+// assertEquals(new Integer(1), result.productElement(1));
+// 

[GitHub] flink pull request #1989: FLINK-3901 - Added CsvRowInputFormat

2016-06-16 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1989#discussion_r67299243
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
 ---
@@ -0,0 +1,1075 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.types.parser.StringParser;
+import org.junit.Test;
+
+public class RowCsvInputFormatTest {
+
+   private static final Path PATH = new Path("an/ignored/file/");
+
+   // Static variables for testing the conversion of \r\n to \n
+   private static final String FIRST_PART = "That is the first part";
+
+   private static final String SECOND_PART = "That is the second part";
+
+   @Test
+   public void ignoreInvalidLines() {
+   try {
+   String fileContent =
+   "header1|header2|header3|\n"+
+   "this is|1|2.0|\n"+
+   "//a comment\n" +
+   "a test|3|4.0|\n" +
+   "#next|5|6.0|\n";
+
+   FileInputSplit split = createTempFile(fileContent);
+
+   RowTypeInfo typeInfo = new RowTypeInfo(new 
TypeInformation[] {
+   BasicTypeInfo.STRING_TYPE_INFO,
+   BasicTypeInfo.INT_TYPE_INFO,
+   BasicTypeInfo.DOUBLE_TYPE_INFO
+   });
+   CsvInputFormat format = new 
RowCsvInputFormat(PATH, "\n", "|", typeInfo);
+   format.setLenient(false);
+
+   Configuration parameters = new Configuration();
+   format.configure(parameters);
+   format.open(split);
+
+   Row result = new Row(3);
+
+   try {
+   result = format.nextRecord(result);
+   fail("Parse Exception was not thrown! (Invalid 
int value)");
+   } catch (ParseException ex) {
+   }
+
+   // if format has lenient == false this can be asserted 
only after FLINK-3908
+// result = format.nextRecord(result);
+// assertNotNull(result);
+// assertEquals("this is", result.productElement(0));
+// assertEquals(new Integer(1), result.productElement(1));
+// assertEquals(new Double(2.0), result.productElement(2));
+//
+// result = format.nextRecord(result);
+// assertNotNull(result);
+// assertEquals("a test", result.productElement

[GitHub] flink pull request #1989: FLINK-3901 - Added CsvRowInputFormat

2016-06-16 Thread fpompermaier
Github user fpompermaier commented on a diff in the pull request:

https://github.com/apache/flink/pull/1989#discussion_r67299396
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
 ---
@@ -0,0 +1,1075 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.types.parser.StringParser;
+import org.junit.Test;
+
+public class RowCsvInputFormatTest {
+
+   private static final Path PATH = new Path("an/ignored/file/");
+
+   // Static variables for testing the conversion of \r\n to \n
+   private static final String FIRST_PART = "That is the first part";
+
+   private static final String SECOND_PART = "That is the second part";
+
+   @Test
+   public void ignoreInvalidLines() {
+   try {
+   String fileContent =
+   "header1|header2|header3|\n"+
+   "this is|1|2.0|\n"+
+   "//a comment\n" +
+   "a test|3|4.0|\n" +
+   "#next|5|6.0|\n";
+
+   FileInputSplit split = createTempFile(fileContent);
+
+   RowTypeInfo typeInfo = new RowTypeInfo(new 
TypeInformation[] {
+   BasicTypeInfo.STRING_TYPE_INFO,
+   BasicTypeInfo.INT_TYPE_INFO,
+   BasicTypeInfo.DOUBLE_TYPE_INFO
+   });
+   CsvInputFormat format = new 
RowCsvInputFormat(PATH, "\n", "|", typeInfo);
+   format.setLenient(false);
+
+   Configuration parameters = new Configuration();
+   format.configure(parameters);
+   format.open(split);
+
+   Row result = new Row(3);
+
+   try {
+   result = format.nextRecord(result);
+   fail("Parse Exception was not thrown! (Invalid 
int value)");
+   } catch (ParseException ex) {
+   }
+
+   // if format has lenient == false this can be asserted 
only after FLINK-3908
+// result = format.nextRecord(result);
+// assertNotNull(result);
+// assertEquals("this is", result.productElement(0));
+// assertEquals(new Integer(1), result.productElement(1));
+// assertEquals(new Double(2.0), result.productElement(2));
+//
+// result = format.nextRecord(result);
+// assertNotNull(result);
+// assertEquals("a test", result.productEl

[GitHub] flink pull request #1989: FLINK-3901 - Added CsvRowInputFormat

2016-06-16 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1989#discussion_r67299477
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
 ---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.typeutils.RowSerializer;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.types.parser.FieldParser.ParseErrorState;
+
+@Internal
+public class RowCsvInputFormat extends CsvInputFormat {
+
+   private static final long serialVersionUID = 1L;
+
+   private RowSerializer rowSerializer;
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo) {
+   this(filePath, lineDelimiter, fieldDelimiter, rowTypeInfo, 
createDefaultMask(rowTypeInfo.getArity()));
+   }
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, int[] 
includedFieldsMask) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo, includedFieldsMask);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo,
+   int[] includedFieldsMask) {
+   super(filePath);
+   boolean[] mask = (includedFieldsMask == null) ? 
createDefaultMask(rowTypeInfo.getArity())
+   : toBooleanMask(includedFieldsMask);
+   configure(lineDelimiter, fieldDelimiter, rowTypeInfo, mask);
+   }
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, 
boolean[] includedFieldsMask) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo, includedFieldsMask);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo,
+   boolean[] includedFieldsMask) {
+   super(filePath);
+   configure(lineDelimiter, fieldDelimiter, rowTypeInfo, 
includedFieldsMask);
+   }
+
+   private void configure(String lineDelimiter, String fieldDelimiter,
+   RowTypeInfo rowTypeInfo, boolean[] includedFieldsMask) {
+
+   if (rowTypeInfo.getArity() == 0) {
+   throw new IllegalArgumentException("Row arity must be 
greater than 0.");
+   }
+
+   if (includedFieldsMask == null) {
+   includedFieldsMask = 
createDefaultMask(rowTypeInfo.getArity());
+   }
+
+   rowSerializer = (RowSerializer) 
rowTypeInfo.createSerializer(new ExecutionConfig());
+
+   setDelimiter(lineDelimiter);
+   setFieldDelimiter(fieldDelimiter);
+
+   Class[] classes = new Class[rowTypeInfo.getArity()];
+
+   for (int i = 0; i < rowTypeInfo.getArity(); i++) {
+   classes[i] = rowTypeInfo.getTypeAt(i).getTypeClass();
+   }
+
+   setFieldsGeneric(includedFieldsMask, classes);
+   }
+
+   @Override
+   public Row fillRecord(Row reuse, Object[] parsedValues) {
+   if (reuse == null) {
+   reuse = new Row(rowSerializer.getLength());
+   }
+   for (int i = 0; i < parsedValues.length; i++) {
+   reuse.setField(i, parsedValues[i]);
+   }
 

[jira] [Commented] (FLINK-3901) Create a RowCsvInputFormat to use as default CSV IF in Table API

2016-06-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333292#comment-15333292
 ] 

ASF GitHub Bot commented on FLINK-3901:
---

Github user fpompermaier commented on a diff in the pull request:

https://github.com/apache/flink/pull/1989#discussion_r67299396
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
 ---
@@ -0,0 +1,1075 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.types.parser.StringParser;
+import org.junit.Test;
+
+public class RowCsvInputFormatTest {
+
+   private static final Path PATH = new Path("an/ignored/file/");
+
+   // Static variables for testing the conversion of \r\n to \n
+   private static final String FIRST_PART = "That is the first part";
+
+   private static final String SECOND_PART = "That is the second part";
+
+   @Test
+   public void ignoreInvalidLines() {
+   try {
+   String fileContent =
+   "header1|header2|header3|\n"+
+   "this is|1|2.0|\n"+
+   "//a comment\n" +
+   "a test|3|4.0|\n" +
+   "#next|5|6.0|\n";
+
+   FileInputSplit split = createTempFile(fileContent);
+
+   RowTypeInfo typeInfo = new RowTypeInfo(new 
TypeInformation[] {
+   BasicTypeInfo.STRING_TYPE_INFO,
+   BasicTypeInfo.INT_TYPE_INFO,
+   BasicTypeInfo.DOUBLE_TYPE_INFO
+   });
+   CsvInputFormat format = new 
RowCsvInputFormat(PATH, "\n", "|", typeInfo);
+   format.setLenient(false);
+
+   Configuration parameters = new Configuration();
+   format.configure(parameters);
+   format.open(split);
+
+   Row result = new Row(3);
+
+   try {
+   result = format.nextRecord(result);
+   fail("Parse Exception was not thrown! (Invalid 
int value)");
+   } catch (ParseException ex) {
+   }
+
+   // if format has lenient == false this can be asserted 
only after FLINK-3908
+// result = format.nextRecord(result);
+// assertNotNull(result);
+// assertEquals("this is", result.productElement(0));
+// assertEquals(new Integer(1), result.productElement(1));
   

[jira] [Commented] (FLINK-3901) Create a RowCsvInputFormat to use as default CSV IF in Table API

2016-06-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333293#comment-15333293
 ] 

ASF GitHub Bot commented on FLINK-3901:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1989#discussion_r67299477
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
 ---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.typeutils.RowSerializer;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.types.parser.FieldParser.ParseErrorState;
+
+@Internal
+public class RowCsvInputFormat extends CsvInputFormat {
+
+   private static final long serialVersionUID = 1L;
+
+   private RowSerializer rowSerializer;
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo) {
+   this(filePath, lineDelimiter, fieldDelimiter, rowTypeInfo, 
createDefaultMask(rowTypeInfo.getArity()));
+   }
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, int[] 
includedFieldsMask) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo, includedFieldsMask);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo,
+   int[] includedFieldsMask) {
+   super(filePath);
+   boolean[] mask = (includedFieldsMask == null) ? 
createDefaultMask(rowTypeInfo.getArity())
+   : toBooleanMask(includedFieldsMask);
+   configure(lineDelimiter, fieldDelimiter, rowTypeInfo, mask);
+   }
+
+   public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, 
boolean[] includedFieldsMask) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo, includedFieldsMask);
+   }
+
+   public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo,
+   boolean[] includedFieldsMask) {
+   super(filePath);
+   configure(lineDelimiter, fieldDelimiter, rowTypeInfo, 
includedFieldsMask);
+   }
+
+   private void configure(String lineDelimiter, String fieldDelimiter,
+   RowTypeInfo rowTypeInfo, boolean[] includedFieldsMask) {
+
+   if (rowTypeInfo.getArity() == 0) {
+   throw new IllegalArgumentException("Row arity must be 
greater than 0.");
+   }
+
+   if (includedFieldsMask == null) {
+   includedFieldsMask = 
createDefaultMask(rowTypeInfo.getArity());
+   }
+
+   rowSerializer = (RowSerializer) 
rowTypeInfo.createSerializer(new ExecutionConfig());
+
+   setDelimiter(lineDelimiter);
+   setFieldDelimiter(fieldDelimiter);
+
+   Class[] classes = new Class[rowTypeInfo.getArity()];
+
+   for (int i = 0; i < rowTypeInfo.getArity(); i++) {
+   classes[i] = rowTypeInfo.getTypeAt(i).getTypeClass();
+   }
+
+   setFieldsGeneric(includedFieldsMask, classes);
+   }
+
+   @Override
+   public Row fillRecord(Row reuse, Object[] parsedValues) {
+ 

[GitHub] flink pull request #2100: [FLINK-4024] FileSourceFunction not adjusted to ne...

2016-06-16 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2100


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2007: [FLINK-3908] Fixed Parser's error state reset

2016-06-16 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2007


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4024) FileSourceFunction not adjusted to new IF lifecycle

2016-06-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333298#comment-15333298
 ] 

ASF GitHub Bot commented on FLINK-4024:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2100


> FileSourceFunction not adjusted to new IF lifecycle
> ---
>
> Key: FLINK-4024
> URL: https://issues.apache.org/jira/browse/FLINK-4024
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Kostas Kloudas
>Priority: Critical
> Fix For: 1.1.0
>
>
> The InputFormat lifecycle was extended in 
> ac2137cfa5e63bd4f53a4b7669dc591ab210093f, adding additional 
> open-/closeInputFormat() methods.
> The streaming FileSourceFunction was not adjusted for this change, and thus 
> will fail for every InputFormat that leverages these new methods.
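For context, a hedged sketch of the call order a task must now drive for a RichInputFormat (hook names per commit ac2137cf; `format`, `parameters`, `splits`, and `record` are assumed to be in scope, and exception handling is elided):

{code:java}
format.configure(parameters);
format.openInputFormat();            // new hook: once per task, before any split
for (FileInputSplit split : splits) {
    format.open(split);
    while (!format.reachedEnd()) {
        record = format.nextRecord(record);
    }
    format.close();
}
format.closeInputFormat();           // new hook: once per task, after all splits
{code}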



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3908) FieldParsers error state is not reset correctly to NONE

2016-06-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333299#comment-15333299
 ] 

ASF GitHub Bot commented on FLINK-3908:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2007


> FieldParsers error state is not reset correctly to NONE
> ---
>
> Key: FLINK-3908
> URL: https://issues.apache.org/jira/browse/FLINK-3908
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.0.2
>Reporter: Flavio Pompermaier
>Assignee: Flavio Pompermaier
>  Labels: parser
>
> If a parse error occurs while parsing a CSV file (for example, when an 
> integer column contains non-int values), the errorState is not reset 
> correctly for the next parseField call. A simple fix would be to call 
> {{setErrorState(ParseErrorState.NONE)}} as the first statement of the 
> {{parseField()}} function, but this should be handled better (by default) 
> for every subclass of {{FieldParser}}
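A hedged sketch of that suggested fix inside a {{FieldParser}} subclass (signature as in {{IntParser}}; the actual parsing logic is elided, with a placeholder return value):

{code:java}
@Override
public int parseField(byte[] bytes, int startPos, int limit,
        byte[] delimiter, Integer reusable) {
    // Reset any error state left over from a previous record's failed parse.
    setErrorState(ParseErrorState.NONE);
    // ... existing parsing logic unchanged ...
    return startPos; // placeholder for the real "position after the field" result
}
{code}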



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-4024) FileSourceFunction not adjusted to new IF lifecycle

2016-06-16 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske resolved FLINK-4024.
--
Resolution: Fixed

Fixed with fe0eb602d2e908d116d71018bb141e5c748c780b

> FileSourceFunction not adjusted to new IF lifecycle
> ---
>
> Key: FLINK-4024
> URL: https://issues.apache.org/jira/browse/FLINK-4024
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Kostas Kloudas
>Priority: Critical
> Fix For: 1.1.0
>
>
> The InputFormat lifecycle was extended in 
> ac2137cfa5e63bd4f53a4b7669dc591ab210093f, adding additional 
> open-/closeInputFormat() methods.
> The streaming FileSourceFunction was not adjusted for this change, and thus 
> will fail for every InputFormat that leverages these new methods.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-3908) FieldParsers error state is not reset correctly to NONE

2016-06-16 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-3908.

   Resolution: Fixed
Fix Version/s: 1.1.0

Fixed with d8747978386c0aacce1dc2631e1c5602ec9b2a00

> FieldParsers error state is not reset correctly to NONE
> ---
>
> Key: FLINK-3908
> URL: https://issues.apache.org/jira/browse/FLINK-3908
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.0.2
>Reporter: Flavio Pompermaier
>Assignee: Flavio Pompermaier
>  Labels: parser
> Fix For: 1.1.0
>
>
> If a parse error occurs while parsing a CSV file (for example, when an 
> integer column contains non-int values), the errorState is not reset 
> correctly for the next parseField call. A simple fix would be to call 
> {{setErrorState(ParseErrorState.NONE)}} as the first statement of the 
> {{parseField()}} function, but this should be handled better (by default) 
> for every subclass of {{FieldParser}}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...

2016-06-16 Thread mushketyk
GitHub user mushketyk opened a pull request:

https://github.com/apache/flink/pull/2109

[FLINK-3677] FileInputFormat: Allow to specify include/exclude file name 
patterns

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [x] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mushketyk/flink file-include

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2109.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2109


commit f45dadf90322cc180b1afe1dc91d83a3aced7e22
Author: Ivan Mushketyk 
Date:   2016-06-14T21:44:19Z

[FLINK-3677] Add FileMatcher class

commit 8d4a2a72020ee19fe70d06ded953af67b6ed487d
Author: Ivan Mushketyk 
Date:   2016-06-15T20:47:43Z

[FLINK-3677] FileInputFormat: Allow to specify include/exclude file name 
patterns

commit 350527bb6c1bf7a6e9d252bb921125249103635c
Author: Ivan Mushketyk 
Date:   2016-06-15T20:48:13Z

[FLINK-3677] Rename test

commit 6045fe79e192aca6e97f568351927a17e0c64d09
Author: Ivan Mushketyk 
Date:   2016-06-15T21:07:55Z

[FLINK-3677] Tests refactoring




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3677) FileInputFormat: Allow to specify include/exclude file name patterns

2016-06-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1507#comment-1507
 ] 

ASF GitHub Bot commented on FLINK-3677:
---

GitHub user mushketyk opened a pull request:

https://github.com/apache/flink/pull/2109

[FLINK-3677] FileInputFormat: Allow to specify include/exclude file name 
patterns

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [x] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mushketyk/flink file-include

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2109.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2109


commit f45dadf90322cc180b1afe1dc91d83a3aced7e22
Author: Ivan Mushketyk 
Date:   2016-06-14T21:44:19Z

[FLINK-3677] Add FileMatcher class

commit 8d4a2a72020ee19fe70d06ded953af67b6ed487d
Author: Ivan Mushketyk 
Date:   2016-06-15T20:47:43Z

[FLINK-3677] FileInputFormat: Allow to specify include/exclude file name 
patterns

commit 350527bb6c1bf7a6e9d252bb921125249103635c
Author: Ivan Mushketyk 
Date:   2016-06-15T20:48:13Z

[FLINK-3677] Rename test

commit 6045fe79e192aca6e97f568351927a17e0c64d09
Author: Ivan Mushketyk 
Date:   2016-06-15T21:07:55Z

[FLINK-3677] Tests refactoring




> FileInputFormat: Allow to specify include/exclude file name patterns
> 
>
> Key: FLINK-3677
> URL: https://issues.apache.org/jira/browse/FLINK-3677
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.0
>Reporter: Maximilian Michels
>Assignee: Ivan Mushketyk
>Priority: Minor
>  Labels: starter
>
> It would be nice to be able to specify a regular expression to filter files.
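A hedged, hypothetical sketch of include/exclude filtering with plain JDK regexes; the PR's actual FileMatcher API may differ:

{code:java}
import java.util.regex.Pattern;

class FileNameFilter {
    private final Pattern include = Pattern.compile(".*\\.csv");
    private final Pattern exclude = Pattern.compile(".*_tmp.*");

    // Accept a file only if it matches the include pattern
    // and does not match the exclude pattern.
    boolean accept(String fileName) {
        return include.matcher(fileName).matches()
            && !exclude.matcher(fileName).matches();
    }
}
{code}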



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4053) Return value from Connection should be checked against null

2016-06-16 Thread Ivan Mushketyk (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1523#comment-1523
 ] 

Ivan Mushketyk commented on FLINK-4053:
---

I'll fix this.

> Return value from Connection should be checked against null
> ---
>
> Key: FLINK-4053
> URL: https://issues.apache.org/jira/browse/FLINK-4053
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: Ivan Mushketyk
>Priority: Minor
>
> In RMQSource.java and RMQSink.java, there is code in the following pattern:
> {code}
>   connection = factory.newConnection();
>   channel = connection.createChannel();
> {code}
> According to 
> https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/Connection.html#createChannel()
>  :
> {code}
> Returns:
> a new channel descriptor, or null if none is available
> {code}
> The return value should be checked against null.
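A hedged sketch of the requested check; the variable names follow the snippet above, and the exception type is illustrative:

{code:java}
connection = factory.newConnection();
channel = connection.createChannel();
if (channel == null) {
    // createChannel() returns null when no channel is available.
    throw new RuntimeException("No RabbitMQ channel available");
}
{code}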



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-4053) Return value from Connection should be checked against null

2016-06-16 Thread Ivan Mushketyk (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Mushketyk reassigned FLINK-4053:
-

Assignee: Ivan Mushketyk

> Return value from Connection should be checked against null
> ---
>
> Key: FLINK-4053
> URL: https://issues.apache.org/jira/browse/FLINK-4053
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: Ivan Mushketyk
>Priority: Minor
>
> In RMQSource.java and RMQSink.java, there is code in the following pattern:
> {code}
>   connection = factory.newConnection();
>   channel = connection.createChannel();
> {code}
> According to 
> https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/Connection.html#createChannel()
>  :
> {code}
> Returns:
> a new channel descriptor, or null if none is available
> {code}
> The return value should be checked against null.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3943) Add support for EXCEPT (set minus)

2016-06-16 Thread Ivan Mushketyk (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1526#comment-1526
 ] 

Ivan Mushketyk commented on FLINK-3943:
---

I'll work on this.

> Add support for EXCEPT (set minus)
> --
>
> Key: FLINK-3943
> URL: https://issues.apache.org/jira/browse/FLINK-3943
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Affects Versions: 1.1.0
>Reporter: Fabian Hueske
>Assignee: Ivan Mushketyk
>Priority: Minor
>
> Currently, the Table API and SQL do not support EXCEPT.
> EXCEPT can be executed as a coGroup on all fields that forwards records of 
> the first input only if the matching group of the second input is empty.
> In order to add support for EXCEPT to the Table API and SQL we need to:
> - Implement a {{DataSetMinus}} class that translates an EXCEPT into a DataSet 
> API program using a coGroup on all fields.
> - Implement a {{DataSetMinusRule}} that translates a Calcite {{LogicalMinus}} 
> into a {{DataSetMinus}}.
> - Extend the Table API (and validation phase) to provide an except() method.
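A hedged sketch of the coGroup idea, shown on Tuple2 for brevity (the real {{DataSetMinus}} would key on every field of the row type; CoGroupFunction and Collector imports are assumed):

{code:java}
DataSet<Tuple2<String, Integer>> except = left.coGroup(right)
    .where(0, 1).equalTo(0, 1)   // key on all fields
    .with(new CoGroupFunction<Tuple2<String, Integer>,
            Tuple2<String, Integer>, Tuple2<String, Integer>>() {
        @Override
        public void coGroup(Iterable<Tuple2<String, Integer>> l,
                Iterable<Tuple2<String, Integer>> r,
                Collector<Tuple2<String, Integer>> out) {
            // Forward left records only when no identical right record exists.
            if (!r.iterator().hasNext()) {
                for (Tuple2<String, Integer> rec : l) {
                    out.collect(rec);
                }
            }
        }
    });
{code}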



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-3943) Add support for EXCEPT (set minus)

2016-06-16 Thread Ivan Mushketyk (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Mushketyk reassigned FLINK-3943:
-

Assignee: Ivan Mushketyk

> Add support for EXCEPT (set minus)
> --
>
> Key: FLINK-3943
> URL: https://issues.apache.org/jira/browse/FLINK-3943
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API
>Affects Versions: 1.1.0
>Reporter: Fabian Hueske
>Assignee: Ivan Mushketyk
>Priority: Minor
>
> Currently, the Table API and SQL do not support EXCEPT.
> EXCEPT can be executed as a coGroup on all fields that forwards records of 
> the first input only if the matching group of the second input is empty.
> In order to add support for EXCEPT to the Table API and SQL we need to:
> - Implement a {{DataSetMinus}} class that translates an EXCEPT into a DataSet 
> API program using a coGroup on all fields.
> - Implement a {{DataSetMinusRule}} that translates a Calcite {{LogicalMinus}} 
> into a {{DataSetMinus}}.
> - Extend the Table API (and validation phase) to provide an except() method.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3650) Add maxBy/minBy to Scala DataSet API

2016-06-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1586#comment-1586
 ] 

ASF GitHub Bot commented on FLINK-3650:
---

Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/1856
  
Force-pushed as per your advice, @fhueske. Ran mvn clean verify and no 
warnings were generated.


> Add maxBy/minBy to Scala DataSet API
> 
>
> Key: FLINK-3650
> URL: https://issues.apache.org/jira/browse/FLINK-3650
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API, Scala API
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: ramkrishna.s.vasudevan
>
> The stable Java DataSet API contains the API calls {{maxBy}} and {{minBy}}. 
> These methods are not supported by the Scala DataSet API. These methods 
> should be added in order to have a consistent API.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #1856: FLINK-3650 Add maxBy/minBy to Scala DataSet API

2016-06-16 Thread ramkrish86
Github user ramkrish86 commented on the issue:

https://github.com/apache/flink/pull/1856
  
Force-pushed as per your advice, @fhueske. Ran mvn clean verify and no 
warnings were generated.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3344) EventTimeWindowCheckpointingITCase.testPreAggregatedTumblingTimeWindow

2016-06-16 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333416#comment-15333416
 ] 

Till Rohrmann commented on FLINK-3344:
--

Another instance, but this time the error was: {{Caused by: 
java.lang.AssertionError: Window start: 600 end: 700 expected:<64950> but 
was:<21872>}}

https://s3.amazonaws.com/archive.travis-ci.org/jobs/137748030/log.txt

> EventTimeWindowCheckpointingITCase.testPreAggregatedTumblingTimeWindow
> --
>
> Key: FLINK-3344
> URL: https://issues.apache.org/jira/browse/FLINK-3344
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Reporter: Matthias J. Sax
>Assignee: Aljoscha Krettek
>Priority: Critical
>  Labels: test-stability
>
> https://travis-ci.org/apache/flink/jobs/107198388
> https://travis-ci.org/mjsax/flink/jobs/107198383
> {noformat}
> Maven produced no output for 300 seconds.
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3974) enableObjectReuse fails when an operator chains to multiple downstream operators

2016-06-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333422#comment-15333422
 ] 

ASF GitHub Bot commented on FLINK-3974:
---

GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/2110

[FLINK-3974] Fix object reuse with multi-chaining

Before, a job would fail if object reuse was enabled and multiple
operators were chained to one upstream operator. Now, we always create a
shallow copy of the StreamRecord in OperatorChain.ChainingOutput because
downstream operations change/reuse the StreamRecord.

This fix was contributed by @wanderingbort (if this is the right GitHub 
handle) as a patch on the Flink JIRA. I can change the commit to attribute it 
to him, but so far he hasn't responded to my question about this on JIRA.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink chaining/fix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2110.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2110


commit 092f350cccbda32331f527c4eaf7ad3304fa1811
Author: Aljoscha Krettek 
Date:   2016-06-14T10:18:35Z

[FLINK-3974] Fix object reuse with multi-chaining

Before, a job would fail if object reuse was enabled and multiple
operators were chained to one upstream operator. Now, we always create a
shallow copy of the StreamRecord in OperatorChain.ChainingOutput because
downstream operations change/reuse the StreamRecord.




> enableObjectReuse fails when an operator chains to multiple downstream 
> operators
> 
>
> Key: FLINK-3974
> URL: https://issues.apache.org/jira/browse/FLINK-3974
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.0.3
>Reporter: B Wyatt
> Attachments: ReproFLINK3974.java, bwyatt-FLINK3974.1.patch
>
>
> Given a topology that looks like this:
> {code:java}
> DataStream<A> input = ...
> input
> .map(MapFunction<A, B>...)
> .addSink(...);
> input
> .map(MapFunction<A, B>...)
> .addSink(...);
> {code}
> enableObjectReuse() will cause an exception in the form of 
> {{"java.lang.ClassCastException: B cannot be cast to A"}} to be thrown.
> It looks like the input operator calls {{Output<StreamRecord<A>>.collect}} 
> which attempts to loop over the downstream operators and process them.
> However, the first map operation will call {{StreamRecord<>.replace}} which 
> mutates the value stored in the StreamRecord<>.  
> As a result, when the {{Output<StreamRecord<A>>.collect}} call passes the 
> {{StreamRecord<A>}} to the second map operation it is actually a 
> {{StreamRecord<B>}} and behaves as if the two map operations were serial 
> instead of parallel.
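A hedged, schematic sketch of the fix described in the pull request above, in OperatorChain.ChainingOutput (not the literal patch; {{copy()}} is assumed to duplicate only the StreamRecord wrapper, not the value):

{code:java}
@Override
public void collect(StreamRecord<T> record) {
    try {
        // Shallow copy: each chained operator gets its own record wrapper,
        // so a downstream replace() cannot corrupt a sibling's input.
        operator.processElement(record.copy(record.getValue()));
    } catch (Exception e) {
        throw new RuntimeException("Could not forward element to operator.", e);
    }
}
{code}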



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4075) ContinuousFileProcessingCheckpointITCase failed on Travis

2016-06-16 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333420#comment-15333420
 ] 

Till Rohrmann commented on FLINK-4075:
--

Another instance: 
https://s3.amazonaws.com/archive.travis-ci.org/jobs/137748033/log.txt

> ContinuousFileProcessingCheckpointITCase failed on Travis
> -
>
> Key: FLINK-4075
> URL: https://issues.apache.org/jira/browse/FLINK-4075
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
>
> The test case {{ContinuousFileProcessingCheckpointITCase}} failed on Travis.
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/137748004/log.txt



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2110: [FLINK-3974] Fix object reuse with multi-chaining

2016-06-16 Thread aljoscha
GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/2110

[FLINK-3974] Fix object reuse with multi-chaining

Before, a job would fail if object reuse was enabled and multiple
operators were chained to one upstream operator. Now, we always create a
shallow copy of the StreamRecord in OperatorChain.ChainingOutput because
downstream operations change/reuse the StreamRecord.

This fix was contributed by @wanderingbort (if that is the right GitHub 
handle) as a patch on the Flink Jira. I can change the commit to attribute it 
to him, but so far he hasn't responded to my question about this on Jira.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink chaining/fix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2110.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2110


commit 092f350cccbda32331f527c4eaf7ad3304fa1811
Author: Aljoscha Krettek 
Date:   2016-06-14T10:18:35Z

[FLINK-3974] Fix object reuse with multi-chaining

Before, a job would fail if object reuse was enabled and multiple
operators were chained to one upstream operator. Now, we always create a
shallow copy of the StreamRecord in OperatorChain.ChainingOutput because
downstream operations change/reuse the StreamRecord.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3813) YARNSessionFIFOITCase.testDetachedMode failed on Travis

2016-06-16 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333424#comment-15333424
 ] 

Till Rohrmann commented on FLINK-3813:
--

Another instance: 
https://s3.amazonaws.com/archive.travis-ci.org/jobs/137748041/log.txt

> YARNSessionFIFOITCase.testDetachedMode failed on Travis
> ---
>
> Key: FLINK-3813
> URL: https://issues.apache.org/jira/browse/FLINK-3813
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
>
> The {{YARNSessionFIFOITCase.testDetachedMode}} failed on Travis.
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/125560038/log.txt



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2392) Instable test in flink-yarn-tests

2016-06-16 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333425#comment-15333425
 ] 

Till Rohrmann commented on FLINK-2392:
--

More instances of failing yarn tests:

https://s3.amazonaws.com/archive.travis-ci.org/jobs/137801198/log.txt
https://s3.amazonaws.com/archive.travis-ci.org/jobs/137801197/log.txt

> Instable test in flink-yarn-tests
> -
>
> Key: FLINK-2392
> URL: https://issues.apache.org/jira/browse/FLINK-2392
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 0.10.0
>Reporter: Matthias J. Sax
>Assignee: Robert Metzger
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.0.0
>
>
> The test YARNSessionFIFOITCase fails from time to time on an irregular basis. 
> For example see: https://travis-ci.org/apache/flink/jobs/72019690
> {noformat}
> Tests run: 12, Failures: 1, Errors: 0, Skipped: 2, Time elapsed: 205.163 sec 
> <<< FAILURE! - in org.apache.flink.yarn.YARNSessionFIFOITCase
> perJobYarnClusterWithParallelism(org.apache.flink.yarn.YARNSessionFIFOITCase) 
>  Time elapsed: 60.651 sec  <<< FAILURE!
> java.lang.AssertionError: During the timeout period of 60 seconds the 
> expected string did not show up
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.assertTrue(Assert.java:41)
>   at org.apache.flink.yarn.YarnTestBase.runWithArgs(YarnTestBase.java:478)
>   at 
> org.apache.flink.yarn.YARNSessionFIFOITCase.perJobYarnClusterWithParallelism(YARNSessionFIFOITCase.java:435)
> Results :
> Failed tests: 
>   
> YARNSessionFIFOITCase.perJobYarnClusterWithParallelism:435->YarnTestBase.runWithArgs:478
>  During the timeout period of 60 seconds the expected string did not show up
> {noformat}
> Another error case is this (see 
> https://travis-ci.org/mjsax/flink/jobs/77313444)
> {noformat}
> Tests run: 12, Failures: 3, Errors: 0, Skipped: 2, Time elapsed: 182.008 sec 
> <<< FAILURE! - in org.apache.flink.yarn.YARNSessionFIFOITCase
> testTaskManagerFailure(org.apache.flink.yarn.YARNSessionFIFOITCase)  Time 
> elapsed: 27.356 sec  <<< FAILURE!
> java.lang.AssertionError: Found a file 
> /home/travis/build/mjsax/flink/flink-yarn-tests/target/flink-yarn-tests-fifo/flink-yarn-tests-fifo-logDir-nm-0_0/application_1440595422559_0007/container_1440595422559_0007_01_03/taskmanager.log
>  with a prohibited string: [Exception, Started 
> SelectChannelConnector@0.0.0.0:8081]
>   at org.junit.Assert.fail(Assert.java:88)
>   at 
> org.apache.flink.yarn.YarnTestBase.ensureNoProhibitedStringInLogFiles(YarnTestBase.java:294)
>   at 
> org.apache.flink.yarn.YARNSessionFIFOITCase.checkForProhibitedLogContents(YARNSessionFIFOITCase.java:94)
> testNonexistingQueue(org.apache.flink.yarn.YARNSessionFIFOITCase)  Time 
> elapsed: 17.421 sec  <<< FAILURE!
> java.lang.AssertionError: Found a file 
> /home/travis/build/mjsax/flink/flink-yarn-tests/target/flink-yarn-tests-fifo/flink-yarn-tests-fifo-logDir-nm-0_0/application_1440595422559_0007/container_1440595422559_0007_01_03/taskmanager.log
>  with a prohibited string: [Exception, Started 
> SelectChannelConnector@0.0.0.0:8081]
>   at org.junit.Assert.fail(Assert.java:88)
>   at 
> org.apache.flink.yarn.YarnTestBase.ensureNoProhibitedStringInLogFiles(YarnTestBase.java:294)
>   at 
> org.apache.flink.yarn.YARNSessionFIFOITCase.checkForProhibitedLogContents(YARNSessionFIFOITCase.java:94)
> testJavaAPI(org.apache.flink.yarn.YARNSessionFIFOITCase)  Time elapsed: 
> 11.984 sec  <<< FAILURE!
> java.lang.AssertionError: Found a file 
> /home/travis/build/mjsax/flink/flink-yarn-tests/target/flink-yarn-tests-fifo/flink-yarn-tests-fifo-logDir-nm-0_0/application_1440595422559_0007/container_1440595422559_0007_01_03/taskmanager.log
>  with a prohibited string: [Exception, Started 
> SelectChannelConnector@0.0.0.0:8081]
>   at org.junit.Assert.fail(Assert.java:88)
>   at 
> org.apache.flink.yarn.YarnTestBase.ensureNoProhibitedStringInLogFiles(YarnTestBase.java:294)
>   at 
> org.apache.flink.yarn.YARNSessionFIFOITCase.checkForProhibitedLogContents(YARNSessionFIFOITCase.java:94)
> {noformat}
> Furthermore, this build failed too: 
> https://travis-ci.org/apache/flink/jobs/77313450
> (no error, but Travis terminated due to no progress for 300 seconds -> 
> deadlock?)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2111: [FLINK-4076] BoltWrapper#dispose() should call Abs...

2016-06-16 Thread f7753
GitHub user f7753 opened a pull request:

https://github.com/apache/flink/pull/2111

[FLINK-4076] BoltWrapper#dispose() should call 
AbstractStreamOperator#dispose()

Add the missing `super.dispose()` call to the `dispose()` override in 
`org.apache.flink.storm.wrappers.BoltWrapper`.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/f7753/flink flink4076

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2111.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2111


commit 0c97618dd8f80ae45a2bb9dce3c85e2014c0b481
Author: MaBiao 
Date:   2016-06-16T09:17:41Z

[FLINK-4076] add super.dispose() in class BoltWrapper




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4076) BoltWrapper#dispose() should call AbstractStreamOperator#dispose()

2016-06-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333432#comment-15333432
 ] 

ASF GitHub Bot commented on FLINK-4076:
---

GitHub user f7753 opened a pull request:

https://github.com/apache/flink/pull/2111

[FLINK-4076] BoltWrapper#dispose() should call 
AbstractStreamOperator#dispose()

Add the missing `super.dispose()` call to the `dispose()` override in 
`org.apache.flink.storm.wrappers.BoltWrapper`.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/f7753/flink flink4076

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2111.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2111


commit 0c97618dd8f80ae45a2bb9dce3c85e2014c0b481
Author: MaBiao 
Date:   2016-06-16T09:17:41Z

[FLINK-4076] add super.dispose() in class BoltWrapper




> BoltWrapper#dispose() should call AbstractStreamOperator#dispose()
> --
>
> Key: FLINK-4076
> URL: https://issues.apache.org/jira/browse/FLINK-4076
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
>   @Override
>   public void dispose() {
> this.bolt.cleanup();
>   }
> {code}
> AbstractStreamOperator#dispose() should be called first.
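
A minimal sketch of the requested call order (assuming the surrounding
BoltWrapper class; exception handling elided):

{code:java}
@Override
public void dispose() {
    super.dispose();     // let AbstractStreamOperator run its cleanup first
    this.bolt.cleanup(); // then release the wrapped Storm bolt's resources
}
{code}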



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2392) Instable test in flink-yarn-tests

2016-06-16 Thread Maximilian Michels (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333437#comment-15333437
 ] 

Maximilian Michels commented on FLINK-2392:
---

The logs fail because the following Exception is logged:

{noformat}
2016-06-16 04:58:37,218 ERROR org.apache.flink.metrics.reporter.JMXReporter 
- Metric did not comply with JMX MBean naming rules.
javax.management.NotCompliantMBeanException: Interface is not public: 
org.apache.flink.metrics.reporter.JMXReporter$JmxGaugeMBean
at com.sun.jmx.mbeanserver.MBeanAnalyzer.<init>(MBeanAnalyzer.java:114)
at 
com.sun.jmx.mbeanserver.MBeanAnalyzer.analyzer(MBeanAnalyzer.java:102)
at 
com.sun.jmx.mbeanserver.StandardMBeanIntrospector.getAnalyzer(StandardMBeanIntrospector.java:67)
at 
com.sun.jmx.mbeanserver.MBeanIntrospector.getPerInterface(MBeanIntrospector.java:192)
at com.sun.jmx.mbeanserver.MBeanSupport.<init>(MBeanSupport.java:138)
at 
com.sun.jmx.mbeanserver.StandardMBeanSupport.<init>(StandardMBeanSupport.java:60)
at 
com.sun.jmx.mbeanserver.Introspector.makeDynamicMBean(Introspector.java:192)
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:898)
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
at 
com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
at 
org.apache.flink.metrics.reporter.JMXReporter.notifyOfAddedMetric(JMXReporter.java:113)
at 
org.apache.flink.metrics.MetricRegistry.register(MetricRegistry.java:174)
at 
org.apache.flink.metrics.groups.AbstractMetricGroup.addMetric(AbstractMetricGroup.java:206)
at 
org.apache.flink.metrics.groups.AbstractMetricGroup.gauge(AbstractMetricGroup.java:162)
at 
org.apache.flink.runtime.taskmanager.TaskManager$.instantiateClassLoaderMetrics(TaskManager.scala:2298)
at 
org.apache.flink.runtime.taskmanager.TaskManager$.org$apache$flink$runtime$taskmanager$TaskManager$$instantiateStatusMetrics(TaskManager.scala:2287)
at 
org.apache.flink.runtime.taskmanager.TaskManager.associateWithJobManager(TaskManager.scala:951)
at 
org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleRegistrationMessage(TaskManager.scala:631)
at 
org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:297)
at 
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at 
org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
at 
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
at 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at 
org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at 
org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:124)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
at akka.dispatch.Mailbox.run(Mailbox.scala:221)
at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2016-06-16 04:58:37,221 ERROR org.apache.flink.metrics.reporter.JMXReporter 
- Metric did not comply with JMX MBean naming rules.
javax.management.NotCompliantMBeanException: Interface is not public: 
org.apache.flink.metrics.reporter.JMXReporter$JmxGaugeMBean
at com.sun.jmx.mbeanserver.MBeanAnalyzer.<init>(MBeanAnalyzer.java:114)
at 
com.sun.jmx.mbeanserver.MBeanAnalyzer.analyzer(MBeanAnalyzer.java:102)
at 
com.sun.jmx.mbeanserver.StandardMBeanIntrospector.getAnalyzer(StandardMBeanIntrospector.java:67)
at 
com.sun.jmx.mbeanserver.MBeanIntrospector.getPerInterface(MBeanIntrospector.java:192)
at com.sun.jmx.mbeanserver.MBeanSupport.<init>(MBeanSupport.java:138)
at 
com.sun.jmx.mbeanserver.StandardMBeanSupport.<init>(StandardMBeanSupport.java:60)
at 
com.sun.jmx.mbeanserver.Introspector.makeDynamicMBean(Introspector.java:192)
at 
com.sun.jmx.i

[jira] [Comment Edited] (FLINK-2392) Instable test in flink-yarn-tests

2016-06-16 Thread Maximilian Michels (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333437#comment-15333437
 ] 

Maximilian Michels edited comment on FLINK-2392 at 6/16/16 9:26 AM:


The builds fail because the following Exception is logged:

{noformat}
2016-06-16 04:58:37,218 ERROR org.apache.flink.metrics.reporter.JMXReporter 
- Metric did not comply with JMX MBean naming rules.
javax.management.NotCompliantMBeanException: Interface is not public: 
org.apache.flink.metrics.reporter.JMXReporter$JmxGaugeMBean
at com.sun.jmx.mbeanserver.MBeanAnalyzer.<init>(MBeanAnalyzer.java:114)
at 
com.sun.jmx.mbeanserver.MBeanAnalyzer.analyzer(MBeanAnalyzer.java:102)
at 
com.sun.jmx.mbeanserver.StandardMBeanIntrospector.getAnalyzer(StandardMBeanIntrospector.java:67)
at 
com.sun.jmx.mbeanserver.MBeanIntrospector.getPerInterface(MBeanIntrospector.java:192)
at com.sun.jmx.mbeanserver.MBeanSupport.<init>(MBeanSupport.java:138)
at 
com.sun.jmx.mbeanserver.StandardMBeanSupport.<init>(StandardMBeanSupport.java:60)
at 
com.sun.jmx.mbeanserver.Introspector.makeDynamicMBean(Introspector.java:192)
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:898)
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
at 
com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
at 
org.apache.flink.metrics.reporter.JMXReporter.notifyOfAddedMetric(JMXReporter.java:113)
at 
org.apache.flink.metrics.MetricRegistry.register(MetricRegistry.java:174)
at 
org.apache.flink.metrics.groups.AbstractMetricGroup.addMetric(AbstractMetricGroup.java:206)
at 
org.apache.flink.metrics.groups.AbstractMetricGroup.gauge(AbstractMetricGroup.java:162)
at 
org.apache.flink.runtime.taskmanager.TaskManager$.instantiateClassLoaderMetrics(TaskManager.scala:2298)
at 
org.apache.flink.runtime.taskmanager.TaskManager$.org$apache$flink$runtime$taskmanager$TaskManager$$instantiateStatusMetrics(TaskManager.scala:2287)
at 
org.apache.flink.runtime.taskmanager.TaskManager.associateWithJobManager(TaskManager.scala:951)
at 
org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleRegistrationMessage(TaskManager.scala:631)
at 
org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:297)
at 
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at 
org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
at 
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
at 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at 
org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at 
org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:124)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
at akka.dispatch.Mailbox.run(Mailbox.scala:221)
at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2016-06-16 04:58:37,221 ERROR org.apache.flink.metrics.reporter.JMXReporter 
- Metric did not comply with JMX MBean naming rules.
javax.management.NotCompliantMBeanException: Interface is not public: 
org.apache.flink.metrics.reporter.JMXReporter$JmxGaugeMBean
at com.sun.jmx.mbeanserver.MBeanAnalyzer.<init>(MBeanAnalyzer.java:114)
at 
com.sun.jmx.mbeanserver.MBeanAnalyzer.analyzer(MBeanAnalyzer.java:102)
at 
com.sun.jmx.mbeanserver.StandardMBeanIntrospector.getAnalyzer(StandardMBeanIntrospector.java:67)
at 
com.sun.jmx.mbeanserver.MBeanIntrospector.getPerInterface(MBeanIntrospector.java:192)
at com.sun.jmx.mbeanserver.MBeanSupport.<init>(MBeanSupport.java:138)
at 
com.sun.jmx.mbeanserver.StandardMBeanSupport.<init>(StandardMBeanSupport.java:60)
at 
com.sun.jmx.mbeanserver.Introspector.makeDynamicMBe

[GitHub] flink pull request #2093: [FLINK-3714] Add Support for "Allowed Lateness"

2016-06-16 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2093#discussion_r67313976
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 ---
@@ -132,9 +132,9 @@
 
/**
 * The allowed lateness for elements. This is used for:
-* <li> Deciding if an element should be dropped from a window due to lateness. </li>
-* <li> Clearing the state of a window if the system time passes
-* the {@code window.maxTimestamp + allowedLateness} landmark. </li>
+* <li> Deciding if an element should be dropped from a window due to lateness.
--- End diff --

Ah, I meant something like this:
```
<ul>
    <li> item 1
    <li> item2
</ul>
```

the javadoc syntax is almost the same as HTML, just the closing `</li>` tags 
are missing.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2392) Instable test in flink-yarn-tests

2016-06-16 Thread Maximilian Michels (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333442#comment-15333442
 ] 

Maximilian Michels commented on FLINK-2392:
---

Should be fixed by the resolution of FLINK-3962.

> Instable test in flink-yarn-tests
> -
>
> Key: FLINK-2392
> URL: https://issues.apache.org/jira/browse/FLINK-2392
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 0.10.0
>Reporter: Matthias J. Sax
>Assignee: Robert Metzger
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.0.0
>
>
> The test YARNSessionFIFOITCase fails from time to time on an irregular basis. 
> For example see: https://travis-ci.org/apache/flink/jobs/72019690
> {noformat}
> Tests run: 12, Failures: 1, Errors: 0, Skipped: 2, Time elapsed: 205.163 sec 
> <<< FAILURE! - in org.apache.flink.yarn.YARNSessionFIFOITCase
> perJobYarnClusterWithParallelism(org.apache.flink.yarn.YARNSessionFIFOITCase) 
>  Time elapsed: 60.651 sec  <<< FAILURE!
> java.lang.AssertionError: During the timeout period of 60 seconds the 
> expected string did not show up
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.assertTrue(Assert.java:41)
>   at org.apache.flink.yarn.YarnTestBase.runWithArgs(YarnTestBase.java:478)
>   at 
> org.apache.flink.yarn.YARNSessionFIFOITCase.perJobYarnClusterWithParallelism(YARNSessionFIFOITCase.java:435)
> Results :
> Failed tests: 
>   
> YARNSessionFIFOITCase.perJobYarnClusterWithParallelism:435->YarnTestBase.runWithArgs:478
>  During the timeout period of 60 seconds the expected string did not show up
> {noformat}
> Another error case is this (see 
> https://travis-ci.org/mjsax/flink/jobs/77313444)
> {noformat}
> Tests run: 12, Failures: 3, Errors: 0, Skipped: 2, Time elapsed: 182.008 sec 
> <<< FAILURE! - in org.apache.flink.yarn.YARNSessionFIFOITCase
> testTaskManagerFailure(org.apache.flink.yarn.YARNSessionFIFOITCase)  Time 
> elapsed: 27.356 sec  <<< FAILURE!
> java.lang.AssertionError: Found a file 
> /home/travis/build/mjsax/flink/flink-yarn-tests/target/flink-yarn-tests-fifo/flink-yarn-tests-fifo-logDir-nm-0_0/application_1440595422559_0007/container_1440595422559_0007_01_03/taskmanager.log
>  with a prohibited string: [Exception, Started 
> SelectChannelConnector@0.0.0.0:8081]
>   at org.junit.Assert.fail(Assert.java:88)
>   at 
> org.apache.flink.yarn.YarnTestBase.ensureNoProhibitedStringInLogFiles(YarnTestBase.java:294)
>   at 
> org.apache.flink.yarn.YARNSessionFIFOITCase.checkForProhibitedLogContents(YARNSessionFIFOITCase.java:94)
> testNonexistingQueue(org.apache.flink.yarn.YARNSessionFIFOITCase)  Time 
> elapsed: 17.421 sec  <<< FAILURE!
> java.lang.AssertionError: Found a file 
> /home/travis/build/mjsax/flink/flink-yarn-tests/target/flink-yarn-tests-fifo/flink-yarn-tests-fifo-logDir-nm-0_0/application_1440595422559_0007/container_1440595422559_0007_01_03/taskmanager.log
>  with a prohibited string: [Exception, Started 
> SelectChannelConnector@0.0.0.0:8081]
>   at org.junit.Assert.fail(Assert.java:88)
>   at 
> org.apache.flink.yarn.YarnTestBase.ensureNoProhibitedStringInLogFiles(YarnTestBase.java:294)
>   at 
> org.apache.flink.yarn.YARNSessionFIFOITCase.checkForProhibitedLogContents(YARNSessionFIFOITCase.java:94)
> testJavaAPI(org.apache.flink.yarn.YARNSessionFIFOITCase)  Time elapsed: 
> 11.984 sec  <<< FAILURE!
> java.lang.AssertionError: Found a file 
> /home/travis/build/mjsax/flink/flink-yarn-tests/target/flink-yarn-tests-fifo/flink-yarn-tests-fifo-logDir-nm-0_0/application_1440595422559_0007/container_1440595422559_0007_01_03/taskmanager.log
>  with a prohibited string: [Exception, Started 
> SelectChannelConnector@0.0.0.0:8081]
>   at org.junit.Assert.fail(Assert.java:88)
>   at 
> org.apache.flink.yarn.YarnTestBase.ensureNoProhibitedStringInLogFiles(YarnTestBase.java:294)
>   at 
> org.apache.flink.yarn.YARNSessionFIFOITCase.checkForProhibitedLogContents(YARNSessionFIFOITCase.java:94)
> {noformat}
> Furthermore, this build failed too: 
> https://travis-ci.org/apache/flink/jobs/77313450
> (no error, but Travis terminated due to no progress for 300 seconds -> 
> deadlock?)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3962) JMXReporter doesn't properly register/deregister metrics

2016-06-16 Thread Maximilian Michels (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333440#comment-15333440
 ] 

Maximilian Michels commented on FLINK-3962:
---

Another exception from the Yarn logs:

{noformat}
2016-06-16 04:58:37,218 ERROR org.apache.flink.metrics.reporter.JMXReporter 
- Metric did not comply with JMX MBean naming rules.
javax.management.NotCompliantMBeanException: Interface is not public: 
org.apache.flink.metrics.reporter.JMXReporter$JmxGaugeMBean
at com.sun.jmx.mbeanserver.MBeanAnalyzer.<init>(MBeanAnalyzer.java:114)
at 
com.sun.jmx.mbeanserver.MBeanAnalyzer.analyzer(MBeanAnalyzer.java:102)
at 
com.sun.jmx.mbeanserver.StandardMBeanIntrospector.getAnalyzer(StandardMBeanIntrospector.java:67)
at 
com.sun.jmx.mbeanserver.MBeanIntrospector.getPerInterface(MBeanIntrospector.java:192)
at com.sun.jmx.mbeanserver.MBeanSupport.<init>(MBeanSupport.java:138)
at 
com.sun.jmx.mbeanserver.StandardMBeanSupport.<init>(StandardMBeanSupport.java:60)
at 
com.sun.jmx.mbeanserver.Introspector.makeDynamicMBean(Introspector.java:192)
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:898)
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
at 
com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
at 
org.apache.flink.metrics.reporter.JMXReporter.notifyOfAddedMetric(JMXReporter.java:113)
at 
org.apache.flink.metrics.MetricRegistry.register(MetricRegistry.java:174)
at 
org.apache.flink.metrics.groups.AbstractMetricGroup.addMetric(AbstractMetricGroup.java:206)
at 
org.apache.flink.metrics.groups.AbstractMetricGroup.gauge(AbstractMetricGroup.java:162)
at 
org.apache.flink.runtime.taskmanager.TaskManager$.instantiateClassLoaderMetrics(TaskManager.scala:2298)
at 
org.apache.flink.runtime.taskmanager.TaskManager$.org$apache$flink$runtime$taskmanager$TaskManager$$instantiateStatusMetrics(TaskManager.scala:2287)
at 
org.apache.flink.runtime.taskmanager.TaskManager.associateWithJobManager(TaskManager.scala:951)
at 
org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleRegistrationMessage(TaskManager.scala:631)
at 
org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:297)
at 
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at 
org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
at 
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
at 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at 
org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at 
org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:124)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
at akka.dispatch.Mailbox.run(Mailbox.scala:221)
at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2016-06-16 04:58:37,221 ERROR org.apache.flink.metrics.reporter.JMXReporter 
- Metric did not comply with JMX MBean naming rules.
javax.management.NotCompliantMBeanException: Interface is not public: 
org.apache.flink.metrics.reporter.JMXReporter$JmxGaugeMBean
at com.sun.jmx.mbeanserver.MBeanAnalyzer.<init>(MBeanAnalyzer.java:114)
at 
com.sun.jmx.mbeanserver.MBeanAnalyzer.analyzer(MBeanAnalyzer.java:102)
at 
com.sun.jmx.mbeanserver.StandardMBeanIntrospector.getAnalyzer(StandardMBeanIntrospector.java:67)
at 
com.sun.jmx.mbeanserver.MBeanIntrospector.getPerInterface(MBeanIntrospector.java:192)
at com.sun.jmx.mbeanserver.MBeanSupport.<init>(MBeanSupport.java:138)
at 
com.sun.jmx.mbeanserver.StandardMBeanSupport.<init>(StandardMBeanSupport.java:60)
at 
com.sun.jmx.mbeanserver.Introspector.makeDynamicMBean(Introspector.java:192)
at 
com.sun.jmx.interceptor.DefaultM

[jira] [Commented] (FLINK-3714) Add Support for "Allowed Lateness"

2016-06-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333441#comment-15333441
 ] 

ASF GitHub Bot commented on FLINK-3714:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2093#discussion_r67313976
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 ---
@@ -132,9 +132,9 @@
 
/**
 * The allowed lateness for elements. This is used for:
-* <li> Deciding if an element should be dropped from a window due to lateness. </li>
-* <li> Clearing the state of a window if the system time passes
-* the {@code window.maxTimestamp + allowedLateness} landmark. </li>
+* <li> Deciding if an element should be dropped from a window due to lateness.
--- End diff --

Ah, I meant something like this:
```
<ul>
    <li> item 1
    <li> item2
</ul>
```

the javadoc syntax is almost the same as HTML, just the closing `</li>` tags 
are missing.


> Add Support for "Allowed Lateness"
> --
>
> Key: FLINK-3714
> URL: https://issues.apache.org/jira/browse/FLINK-3714
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: Aljoscha Krettek
>Assignee: Kostas Kloudas
>
> As mentioned in 
> https://docs.google.com/document/d/1Xp-YBf87vLTduYSivgqWVEMjYUmkA-hyb4muX3KRl08/edit#
>  we should add support for an allowed lateness setting.
> This includes several things:
>  - API for setting allowed lateness
>  - Dropping of late elements 
>  - Garbage collection of windows state/timers
> Depending on whether the {{WindowAssigner}} assigns windows based on event 
> time or processing time we have to adjust the GC behavior. For event-time 
> windows "allowed lateness" makes sense and we should garbage collect after 
> this expires. For processing-time windows "allowed lateness" does not make 
> sense and we should always GC window state/timers at the end timestamp of a 
> processing-time window. I think that we need a method for this on 
> {{WindowAssigner}} that allows to differentiate between event-time windows 
> and processing-time windows: {{boolean WindowAssigner.isEventTime()}}.
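
To ground the API part of the list above, a hedged sketch of what a user
program could look like once this lands (the method name
{{allowedLateness(Time)}} and its placement on the windowed stream are
assumptions drawn from this description and the design doc, not a confirmed
API):

{code:java}
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

public class AllowedLatenessSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        env.fromElements(Tuple2.of("a", 1), Tuple2.of("a", 2))
           .keyBy(0)
           .timeWindow(Time.minutes(1))
           // event-time windows: window state is kept, and late elements are
           // still accepted, until the watermark passes
           // window.maxTimestamp + allowedLateness; later elements are dropped
           .allowedLateness(Time.seconds(30))
           .sum(1)
           .print();

        env.execute("allowed-lateness sketch");
    }
}
{code}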



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #1989: FLINK-3901 - Added CsvRowInputFormat

2016-06-16 Thread fpompermaier
Github user fpompermaier commented on a diff in the pull request:

https://github.com/apache/flink/pull/1989#discussion_r67314234
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
 ---
@@ -0,0 +1,1075 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.types.parser.StringParser;
+import org.junit.Test;
+
+public class RowCsvInputFormatTest {
+
+   private static final Path PATH = new Path("an/ignored/file/");
+
+   //Static variables for testing the removal of \r\n to \n
+   private static final String FIRST_PART = "That is the first part";
+
+   private static final String SECOND_PART = "That is the second part";
+
+   @Test
+   public void ignoreInvalidLines() {
+   try {
+   String fileContent =
+   "header1|header2|header3|\n"+
+   "this is|1|2.0|\n"+
+   "//a comment\n" +
+   "a test|3|4.0|\n" +
+   "#next|5|6.0|\n";
+
+   FileInputSplit split = createTempFile(fileContent);
+
+   RowTypeInfo typeInfo = new RowTypeInfo(new 
TypeInformation[] {
+   BasicTypeInfo.STRING_TYPE_INFO,
+   BasicTypeInfo.INT_TYPE_INFO,
+   BasicTypeInfo.DOUBLE_TYPE_INFO
+   });
+   CsvInputFormat format = new 
RowCsvInputFormat(PATH, "\n", "|", typeInfo);
+   format.setLenient(false);
+
+   Configuration parameters = new Configuration();
+   format.configure(parameters);
+   format.open(split);
+
+   Row result = new Row(3);
+
+   try {
+   result = format.nextRecord(result);
+   fail("Parse Exception was not thrown! (Invalid 
int value)");
+   } catch (ParseException ex) {
+   }
+
+   // if format has lenient == false this can be asserted 
only after FLINK-3908
+// result = format.nextRecord(result);
+// assertNotNull(result);
+// assertEquals("this is", result.productElement(0));
+// assertEquals(new Integer(1), result.productElement(1));
+// assertEquals(new Double(2.0), result.productElement(2));
+//
+// result = format.nextRecord(result);
+// assertNotNull(result);
+// assertEquals("a test", result.productEl

[jira] [Created] (FLINK-4081) FieldParsers should support empty strings

2016-06-16 Thread Flavio Pompermaier (JIRA)
Flavio Pompermaier created FLINK-4081:
-

 Summary: FieldParsers should support empty strings
 Key: FLINK-4081
 URL: https://issues.apache.org/jira/browse/FLINK-4081
 Project: Flink
  Issue Type: Bug
  Components: Core
Reporter: Flavio Pompermaier


In order to parse CSV files with the new Table API, which converts rows to Row 
objects (and Row supports null values), FieldParser implementations should 
support empty strings by setting the parser state to 
ParseErrorState.EMPTY_STRING (for example, FloatParser and DoubleParser 
currently don't respect this constraint).
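
To make the requested contract concrete, a minimal sketch of a parser that
honors it (this is deliberately not Flink's DoubleParser: the delimiter is
simplified to a single byte and malformed-number handling is omitted):

{code:java}
import java.nio.charset.StandardCharsets;

import org.apache.flink.types.parser.FieldParser;

public class NullableDoubleParser extends FieldParser<Double> {

    private Double result;

    @Override
    public int parseField(byte[] bytes, int startPos, int limit, byte[] delimiter, Double reuse) {
        // locate the end of this field (simplified: single-byte delimiter)
        int end = startPos;
        while (end < limit && bytes[end] != delimiter[0]) {
            end++;
        }
        if (end == startPos) {
            // empty field between two delimiters: flag it instead of failing,
            // so a Row-based input format can store null for this column
            setErrorState(ParseErrorState.EMPTY_STRING);
            return -1;
        }
        result = Double.parseDouble(
                new String(bytes, startPos, end - startPos, StandardCharsets.US_ASCII));
        return end == limit ? limit : end + 1; // position after the delimiter
    }

    @Override
    public Double getLastResult() {
        return result;
    }

    @Override
    public Double createValue() {
        return Double.valueOf(0.0);
    }
}
{code}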



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3901) Create a RowCsvInputFormat to use as default CSV IF in Table API

2016-06-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333444#comment-15333444
 ] 

ASF GitHub Bot commented on FLINK-3901:
---

Github user fpompermaier commented on a diff in the pull request:

https://github.com/apache/flink/pull/1989#discussion_r67314234
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
 ---
@@ -0,0 +1,1075 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.types.parser.StringParser;
+import org.junit.Test;
+
+public class RowCsvInputFormatTest {
+
+   private static final Path PATH = new Path("an/ignored/file/");
+
+   //Static variables for testing the removal of \r\n to \n
+   private static final String FIRST_PART = "That is the first part";
+
+   private static final String SECOND_PART = "That is the second part";
+
+   @Test
+   public void ignoreInvalidLines() {
+   try {
+   String fileContent =
+   "header1|header2|header3|\n"+
+   "this is|1|2.0|\n"+
+   "//a comment\n" +
+   "a test|3|4.0|\n" +
+   "#next|5|6.0|\n";
+
+   FileInputSplit split = createTempFile(fileContent);
+
+   RowTypeInfo typeInfo = new RowTypeInfo(new 
TypeInformation[] {
+   BasicTypeInfo.STRING_TYPE_INFO,
+   BasicTypeInfo.INT_TYPE_INFO,
+   BasicTypeInfo.DOUBLE_TYPE_INFO
+   });
+   CsvInputFormat format = new 
RowCsvInputFormat(PATH, "\n", "|", typeInfo);
+   format.setLenient(false);
+
+   Configuration parameters = new Configuration();
+   format.configure(parameters);
+   format.open(split);
+
+   Row result = new Row(3);
+
+   try {
+   result = format.nextRecord(result);
+   fail("Parse Exception was not thrown! (Invalid 
int value)");
+   } catch (ParseException ex) {
+   }
+
+   // if format has lenient == false this can be asserted 
only after FLINK-3908
+// result = format.nextRecord(result);
+// assertNotNull(result);
+// assertEquals("this is", result.productElement(0));
+// assertEquals(new Integer(1), result.productElement(1));
   

[jira] [Commented] (FLINK-3962) JMXReporter doesn't properly register/deregister metrics

2016-06-16 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333458#comment-15333458
 ] 

Chesnay Schepler commented on FLINK-3962:
-

That new exception makes no sense at all. Both the JMXReporter and 
JmxGaugeMBean classes are clearly marked as public. If this were not the 
case, we would have run into it weeks ago.

There is something odd going on there.
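
For reference, a self-contained illustration of the JMX rule behind that
message (nothing Flink-specific; all names are invented): a Standard MBean's
management interface must be public and follow the <Class>MBean naming
convention, otherwise registerMBean throws exactly this
NotCompliantMBeanException.

{code:java}
import java.lang.management.ManagementFactory;

import javax.management.MBeanServer;
import javax.management.ObjectName;

public class MBeanVisibilityDemo {

    // Dropping "public" here (making the interface package-private) reproduces
    // javax.management.NotCompliantMBeanException: Interface is not public
    public interface CounterMBean {
        int getCount();
    }

    public static class Counter implements CounterMBean {
        @Override
        public int getCount() {
            return 42;
        }
    }

    public static void main(String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        server.registerMBean(new Counter(), new ObjectName("demo:name=counter"));
        System.out.println("registered without NotCompliantMBeanException");
    }
}
{code}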

> JMXReporter doesn't properly register/deregister metrics
> 
>
> Key: FLINK-3962
> URL: https://issues.apache.org/jira/browse/FLINK-3962
> Project: Flink
>  Issue Type: Bug
>  Components: TaskManager
>Affects Versions: 1.1.0
>Reporter: Maximilian Michels
>Assignee: Stephan Ewen
> Fix For: 1.1.0
>
>
> The following fails our Yarn tests because it checks for errors in the 
> jobmanager/taskmanager logs:
> {noformat}
> 2016-05-23 19:20:02,349 ERROR org.apache.flink.metrics.reporter.JMXReporter   
>   - A metric with the name 
> org.apache.flink.metrics:key0=testing-worker-linux-docker-05a6b382-3386-linux-4,key1=taskmanager,key2=9398ca9392af615e9d1896d0bd7ff52a,key3=Flink_Java_Job_at_Mon_May_23_19-20-00_UTC_2016,key4=,name=numBytesIn
>  was already registered.
> javax.management.InstanceAlreadyExistsException: 
> org.apache.flink.metrics:key0=testing-worker-linux-docker-05a6b382-3386-linux-4,key1=taskmanager,key2=9398ca9392af615e9d1896d0bd7ff52a,key3=Flink_Java_Job_at_Mon_May_23_19-20-00_UTC_2016,key4=,name=numBytesIn
>   at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
>   at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
>   at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
>   at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
>   at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
>   at 
> com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
>   at 
> org.apache.flink.metrics.reporter.JMXReporter.notifyOfAddedMetric(JMXReporter.java:76)
>   at 
> org.apache.flink.metrics.MetricRegistry.register(MetricRegistry.java:177)
>   at 
> org.apache.flink.metrics.groups.AbstractMetricGroup.addMetric(AbstractMetricGroup.java:191)
>   at 
> org.apache.flink.metrics.groups.AbstractMetricGroup.counter(AbstractMetricGroup.java:144)
>   at 
> org.apache.flink.metrics.groups.IOMetricGroup.<init>(IOMetricGroup.java:40)
>   at 
> org.apache.flink.metrics.groups.TaskMetricGroup.<init>(TaskMetricGroup.java:68)
>   at 
> org.apache.flink.metrics.groups.JobMetricGroup.addTask(JobMetricGroup.java:74)
>   at 
> org.apache.flink.metrics.groups.TaskManagerMetricGroup.addTaskForJob(TaskManagerMetricGroup.java:86)
>   at 
> org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.scala:1092)
>   at 
> org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleTaskMessage(TaskManager.scala:441)
>   at 
> org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:283)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>   at 
> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>   at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>   at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>   at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>   at 
> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>   at 
> org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:124)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>   at akka.dispatch

[jira] [Commented] (FLINK-2392) Instable test in flink-yarn-tests

2016-06-16 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333456#comment-15333456
 ] 

Chesnay Schepler commented on FLINK-2392:
-

Except that it is entirely unrelated to metric names.

> Instable test in flink-yarn-tests
> -
>
> Key: FLINK-2392
> URL: https://issues.apache.org/jira/browse/FLINK-2392
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 0.10.0
>Reporter: Matthias J. Sax
>Assignee: Robert Metzger
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.0.0
>
>
> The test YARNSessionFIFOITCase fails from time to time on an irregular basis. 
> For example see: https://travis-ci.org/apache/flink/jobs/72019690
> {noformat}
> Tests run: 12, Failures: 1, Errors: 0, Skipped: 2, Time elapsed: 205.163 sec 
> <<< FAILURE! - in org.apache.flink.yarn.YARNSessionFIFOITCase
> perJobYarnClusterWithParallelism(org.apache.flink.yarn.YARNSessionFIFOITCase) 
>  Time elapsed: 60.651 sec  <<< FAILURE!
> java.lang.AssertionError: During the timeout period of 60 seconds the 
> expected string did not show up
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.assertTrue(Assert.java:41)
>   at org.apache.flink.yarn.YarnTestBase.runWithArgs(YarnTestBase.java:478)
>   at 
> org.apache.flink.yarn.YARNSessionFIFOITCase.perJobYarnClusterWithParallelism(YARNSessionFIFOITCase.java:435)
> Results :
> Failed tests: 
>   
> YARNSessionFIFOITCase.perJobYarnClusterWithParallelism:435->YarnTestBase.runWithArgs:478
>  During the timeout period of 60 seconds the expected string did not show up
> {noformat}
> Another error case is this (see 
> https://travis-ci.org/mjsax/flink/jobs/77313444)
> {noformat}
> Tests run: 12, Failures: 3, Errors: 0, Skipped: 2, Time elapsed: 182.008 sec 
> <<< FAILURE! - in org.apache.flink.yarn.YARNSessionFIFOITCase
> testTaskManagerFailure(org.apache.flink.yarn.YARNSessionFIFOITCase)  Time 
> elapsed: 27.356 sec  <<< FAILURE!
> java.lang.AssertionError: Found a file 
> /home/travis/build/mjsax/flink/flink-yarn-tests/target/flink-yarn-tests-fifo/flink-yarn-tests-fifo-logDir-nm-0_0/application_1440595422559_0007/container_1440595422559_0007_01_03/taskmanager.log
>  with a prohibited string: [Exception, Started 
> SelectChannelConnector@0.0.0.0:8081]
>   at org.junit.Assert.fail(Assert.java:88)
>   at 
> org.apache.flink.yarn.YarnTestBase.ensureNoProhibitedStringInLogFiles(YarnTestBase.java:294)
>   at 
> org.apache.flink.yarn.YARNSessionFIFOITCase.checkForProhibitedLogContents(YARNSessionFIFOITCase.java:94)
> testNonexistingQueue(org.apache.flink.yarn.YARNSessionFIFOITCase)  Time 
> elapsed: 17.421 sec  <<< FAILURE!
> java.lang.AssertionError: Found a file 
> /home/travis/build/mjsax/flink/flink-yarn-tests/target/flink-yarn-tests-fifo/flink-yarn-tests-fifo-logDir-nm-0_0/application_1440595422559_0007/container_1440595422559_0007_01_03/taskmanager.log
>  with a prohibited string: [Exception, Started 
> SelectChannelConnector@0.0.0.0:8081]
>   at org.junit.Assert.fail(Assert.java:88)
>   at 
> org.apache.flink.yarn.YarnTestBase.ensureNoProhibitedStringInLogFiles(YarnTestBase.java:294)
>   at 
> org.apache.flink.yarn.YARNSessionFIFOITCase.checkForProhibitedLogContents(YARNSessionFIFOITCase.java:94)
> testJavaAPI(org.apache.flink.yarn.YARNSessionFIFOITCase)  Time elapsed: 
> 11.984 sec  <<< FAILURE!
> java.lang.AssertionError: Found a file 
> /home/travis/build/mjsax/flink/flink-yarn-tests/target/flink-yarn-tests-fifo/flink-yarn-tests-fifo-logDir-nm-0_0/application_1440595422559_0007/container_1440595422559_0007_01_03/taskmanager.log
>  with a prohibited string: [Exception, Started 
> SelectChannelConnector@0.0.0.0:8081]
>   at org.junit.Assert.fail(Assert.java:88)
>   at 
> org.apache.flink.yarn.YarnTestBase.ensureNoProhibitedStringInLogFiles(YarnTestBase.java:294)
>   at 
> org.apache.flink.yarn.YARNSessionFIFOITCase.checkForProhibitedLogContents(YARNSessionFIFOITCase.java:94)
> {noformat}
> Furthermore, this build failed too: 
> https://travis-ci.org/apache/flink/jobs/77313450
> (no error, but Travis terminated due to no progress for 300 seconds -> 
> deadlock?)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2392) Instable test in flink-yarn-tests

2016-06-16 Thread Maximilian Michels (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333461#comment-15333461
 ] 

Maximilian Michels commented on FLINK-2392:
---

What do you mean? Is the issue unrelated, or the exception?

> Instable test in flink-yarn-tests
> -
>
> Key: FLINK-2392
> URL: https://issues.apache.org/jira/browse/FLINK-2392
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 0.10.0
>Reporter: Matthias J. Sax
>Assignee: Robert Metzger
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.0.0
>
>
> The test YARNSessionFIFOITCase fails from time to time on an irregular basis. 
> For example see: https://travis-ci.org/apache/flink/jobs/72019690
> {noformat}
> Tests run: 12, Failures: 1, Errors: 0, Skipped: 2, Time elapsed: 205.163 sec 
> <<< FAILURE! - in org.apache.flink.yarn.YARNSessionFIFOITCase
> perJobYarnClusterWithParallelism(org.apache.flink.yarn.YARNSessionFIFOITCase) 
>  Time elapsed: 60.651 sec  <<< FAILURE!
> java.lang.AssertionError: During the timeout period of 60 seconds the 
> expected string did not show up
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.assertTrue(Assert.java:41)
>   at org.apache.flink.yarn.YarnTestBase.runWithArgs(YarnTestBase.java:478)
>   at 
> org.apache.flink.yarn.YARNSessionFIFOITCase.perJobYarnClusterWithParallelism(YARNSessionFIFOITCase.java:435)
> Results :
> Failed tests: 
>   
> YARNSessionFIFOITCase.perJobYarnClusterWithParallelism:435->YarnTestBase.runWithArgs:478
>  During the timeout period of 60 seconds the expected string did not show up
> {noformat}
> Another error case is this (see 
> https://travis-ci.org/mjsax/flink/jobs/77313444)
> {noformat}
> Tests run: 12, Failures: 3, Errors: 0, Skipped: 2, Time elapsed: 182.008 sec 
> <<< FAILURE! - in org.apache.flink.yarn.YARNSessionFIFOITCase
> testTaskManagerFailure(org.apache.flink.yarn.YARNSessionFIFOITCase)  Time 
> elapsed: 27.356 sec  <<< FAILURE!
> java.lang.AssertionError: Found a file 
> /home/travis/build/mjsax/flink/flink-yarn-tests/target/flink-yarn-tests-fifo/flink-yarn-tests-fifo-logDir-nm-0_0/application_1440595422559_0007/container_1440595422559_0007_01_03/taskmanager.log
>  with a prohibited string: [Exception, Started 
> SelectChannelConnector@0.0.0.0:8081]
>   at org.junit.Assert.fail(Assert.java:88)
>   at 
> org.apache.flink.yarn.YarnTestBase.ensureNoProhibitedStringInLogFiles(YarnTestBase.java:294)
>   at 
> org.apache.flink.yarn.YARNSessionFIFOITCase.checkForProhibitedLogContents(YARNSessionFIFOITCase.java:94)
> testNonexistingQueue(org.apache.flink.yarn.YARNSessionFIFOITCase)  Time 
> elapsed: 17.421 sec  <<< FAILURE!
> java.lang.AssertionError: Found a file 
> /home/travis/build/mjsax/flink/flink-yarn-tests/target/flink-yarn-tests-fifo/flink-yarn-tests-fifo-logDir-nm-0_0/application_1440595422559_0007/container_1440595422559_0007_01_03/taskmanager.log
>  with a prohibited string: [Exception, Started 
> SelectChannelConnector@0.0.0.0:8081]
>   at org.junit.Assert.fail(Assert.java:88)
>   at 
> org.apache.flink.yarn.YarnTestBase.ensureNoProhibitedStringInLogFiles(YarnTestBase.java:294)
>   at 
> org.apache.flink.yarn.YARNSessionFIFOITCase.checkForProhibitedLogContents(YARNSessionFIFOITCase.java:94)
> testJavaAPI(org.apache.flink.yarn.YARNSessionFIFOITCase)  Time elapsed: 
> 11.984 sec  <<< FAILURE!
> java.lang.AssertionError: Found a file 
> /home/travis/build/mjsax/flink/flink-yarn-tests/target/flink-yarn-tests-fifo/flink-yarn-tests-fifo-logDir-nm-0_0/application_1440595422559_0007/container_1440595422559_0007_01_03/taskmanager.log
>  with a prohibited string: [Exception, Started 
> SelectChannelConnector@0.0.0.0:8081]
>   at org.junit.Assert.fail(Assert.java:88)
>   at 
> org.apache.flink.yarn.YarnTestBase.ensureNoProhibitedStringInLogFiles(YarnTestBase.java:294)
>   at 
> org.apache.flink.yarn.YARNSessionFIFOITCase.checkForProhibitedLogContents(YARNSessionFIFOITCase.java:94)
> {noformat}
> Furthermore, this build failed too: 
> https://travis-ci.org/apache/flink/jobs/77313450
> (no error, but Travis terminated due to no progress for 300 seconds -> 
> deadlock?)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2392) Instable test in flink-yarn-tests

2016-06-16 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333477#comment-15333477
 ] 

Chesnay Schepler commented on FLINK-2392:
-

The exception is unrelated; that's why I replied directly to the comment 
regarding the exception and not in the main thread.

Yes, the error message indicates a naming conflict. But that line is generated 
by us, and it is generally accurate, since we exclude the possibility of the 
other case: a non-public interface.

> Instable test in flink-yarn-tests
> -
>
> Key: FLINK-2392
> URL: https://issues.apache.org/jira/browse/FLINK-2392
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 0.10.0
>Reporter: Matthias J. Sax
>Assignee: Robert Metzger
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.0.0
>
>
> The test YARNSessionFIFOITCase fails from time to time on an irregular basis. 
> For example see: https://travis-ci.org/apache/flink/jobs/72019690
> {noformat}
> Tests run: 12, Failures: 1, Errors: 0, Skipped: 2, Time elapsed: 205.163 sec 
> <<< FAILURE! - in org.apache.flink.yarn.YARNSessionFIFOITCase
> perJobYarnClusterWithParallelism(org.apache.flink.yarn.YARNSessionFIFOITCase) 
>  Time elapsed: 60.651 sec  <<< FAILURE!
> java.lang.AssertionError: During the timeout period of 60 seconds the 
> expected string did not show up
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.assertTrue(Assert.java:41)
>   at org.apache.flink.yarn.YarnTestBase.runWithArgs(YarnTestBase.java:478)
>   at 
> org.apache.flink.yarn.YARNSessionFIFOITCase.perJobYarnClusterWithParallelism(YARNSessionFIFOITCase.java:435)
> Results :
> Failed tests: 
>   
> YARNSessionFIFOITCase.perJobYarnClusterWithParallelism:435->YarnTestBase.runWithArgs:478
>  During the timeout period of 60 seconds the expected string did not show up
> {noformat}
> Another error case is this (see 
> https://travis-ci.org/mjsax/flink/jobs/77313444)
> {noformat}
> Tests run: 12, Failures: 3, Errors: 0, Skipped: 2, Time elapsed: 182.008 sec 
> <<< FAILURE! - in org.apache.flink.yarn.YARNSessionFIFOITCase
> testTaskManagerFailure(org.apache.flink.yarn.YARNSessionFIFOITCase)  Time 
> elapsed: 27.356 sec  <<< FAILURE!
> java.lang.AssertionError: Found a file 
> /home/travis/build/mjsax/flink/flink-yarn-tests/target/flink-yarn-tests-fifo/flink-yarn-tests-fifo-logDir-nm-0_0/application_1440595422559_0007/container_1440595422559_0007_01_03/taskmanager.log
>  with a prohibited string: [Exception, Started 
> SelectChannelConnector@0.0.0.0:8081]
>   at org.junit.Assert.fail(Assert.java:88)
>   at 
> org.apache.flink.yarn.YarnTestBase.ensureNoProhibitedStringInLogFiles(YarnTestBase.java:294)
>   at 
> org.apache.flink.yarn.YARNSessionFIFOITCase.checkForProhibitedLogContents(YARNSessionFIFOITCase.java:94)
> testNonexistingQueue(org.apache.flink.yarn.YARNSessionFIFOITCase)  Time 
> elapsed: 17.421 sec  <<< FAILURE!
> java.lang.AssertionError: Found a file 
> /home/travis/build/mjsax/flink/flink-yarn-tests/target/flink-yarn-tests-fifo/flink-yarn-tests-fifo-logDir-nm-0_0/application_1440595422559_0007/container_1440595422559_0007_01_03/taskmanager.log
>  with a prohibited string: [Exception, Started 
> SelectChannelConnector@0.0.0.0:8081]
>   at org.junit.Assert.fail(Assert.java:88)
>   at 
> org.apache.flink.yarn.YarnTestBase.ensureNoProhibitedStringInLogFiles(YarnTestBase.java:294)
>   at 
> org.apache.flink.yarn.YARNSessionFIFOITCase.checkForProhibitedLogContents(YARNSessionFIFOITCase.java:94)
> testJavaAPI(org.apache.flink.yarn.YARNSessionFIFOITCase)  Time elapsed: 
> 11.984 sec  <<< FAILURE!
> java.lang.AssertionError: Found a file 
> /home/travis/build/mjsax/flink/flink-yarn-tests/target/flink-yarn-tests-fifo/flink-yarn-tests-fifo-logDir-nm-0_0/application_1440595422559_0007/container_1440595422559_0007_01_03/taskmanager.log
>  with a prohibited string: [Exception, Started 
> SelectChannelConnector@0.0.0.0:8081]
>   at org.junit.Assert.fail(Assert.java:88)
>   at 
> org.apache.flink.yarn.YarnTestBase.ensureNoProhibitedStringInLogFiles(YarnTestBase.java:294)
>   at 
> org.apache.flink.yarn.YARNSessionFIFOITCase.checkForProhibitedLogContents(YARNSessionFIFOITCase.java:94)
> {noformat}
> Furthermore, this build failed too: 
> https://travis-ci.org/apache/flink/jobs/77313450
> (no error, but Travis terminated due to no progress for 300 seconds -> 
> deadlock?)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2392) Instable test in flink-yarn-tests

2016-06-16 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333483#comment-15333483
 ] 

Chesnay Schepler commented on FLINK-2392:
-

Are these jobs from different profiles?

> Instable test in flink-yarn-tests
> -
>
> Key: FLINK-2392
> URL: https://issues.apache.org/jira/browse/FLINK-2392
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 0.10.0
>Reporter: Matthias J. Sax
>Assignee: Robert Metzger
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.0.0
>
>
> The test YARNSessionFIFOITCase fails from time to time on an irregular basis. 
> For example see: https://travis-ci.org/apache/flink/jobs/72019690
> {noformat}
> Tests run: 12, Failures: 1, Errors: 0, Skipped: 2, Time elapsed: 205.163 sec 
> <<< FAILURE! - in org.apache.flink.yarn.YARNSessionFIFOITCase
> perJobYarnClusterWithParallelism(org.apache.flink.yarn.YARNSessionFIFOITCase) 
>  Time elapsed: 60.651 sec  <<< FAILURE!
> java.lang.AssertionError: During the timeout period of 60 seconds the 
> expected string did not show up
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.assertTrue(Assert.java:41)
>   at org.apache.flink.yarn.YarnTestBase.runWithArgs(YarnTestBase.java:478)
>   at 
> org.apache.flink.yarn.YARNSessionFIFOITCase.perJobYarnClusterWithParallelism(YARNSessionFIFOITCase.java:435)
> Results :
> Failed tests: 
>   
> YARNSessionFIFOITCase.perJobYarnClusterWithParallelism:435->YarnTestBase.runWithArgs:478
>  During the timeout period of 60 seconds the expected string did not show up
> {noformat}
> Another error case is this (see 
> https://travis-ci.org/mjsax/flink/jobs/77313444)
> {noformat}
> Tests run: 12, Failures: 3, Errors: 0, Skipped: 2, Time elapsed: 182.008 sec 
> <<< FAILURE! - in org.apache.flink.yarn.YARNSessionFIFOITCase
> testTaskManagerFailure(org.apache.flink.yarn.YARNSessionFIFOITCase)  Time 
> elapsed: 27.356 sec  <<< FAILURE!
> java.lang.AssertionError: Found a file 
> /home/travis/build/mjsax/flink/flink-yarn-tests/target/flink-yarn-tests-fifo/flink-yarn-tests-fifo-logDir-nm-0_0/application_1440595422559_0007/container_1440595422559_0007_01_03/taskmanager.log
>  with a prohibited string: [Exception, Started 
> SelectChannelConnector@0.0.0.0:8081]
>   at org.junit.Assert.fail(Assert.java:88)
>   at 
> org.apache.flink.yarn.YarnTestBase.ensureNoProhibitedStringInLogFiles(YarnTestBase.java:294)
>   at 
> org.apache.flink.yarn.YARNSessionFIFOITCase.checkForProhibitedLogContents(YARNSessionFIFOITCase.java:94)
> testNonexistingQueue(org.apache.flink.yarn.YARNSessionFIFOITCase)  Time 
> elapsed: 17.421 sec  <<< FAILURE!
> java.lang.AssertionError: Found a file 
> /home/travis/build/mjsax/flink/flink-yarn-tests/target/flink-yarn-tests-fifo/flink-yarn-tests-fifo-logDir-nm-0_0/application_1440595422559_0007/container_1440595422559_0007_01_03/taskmanager.log
>  with a prohibited string: [Exception, Started 
> SelectChannelConnector@0.0.0.0:8081]
>   at org.junit.Assert.fail(Assert.java:88)
>   at 
> org.apache.flink.yarn.YarnTestBase.ensureNoProhibitedStringInLogFiles(YarnTestBase.java:294)
>   at 
> org.apache.flink.yarn.YARNSessionFIFOITCase.checkForProhibitedLogContents(YARNSessionFIFOITCase.java:94)
> testJavaAPI(org.apache.flink.yarn.YARNSessionFIFOITCase)  Time elapsed: 
> 11.984 sec  <<< FAILURE!
> java.lang.AssertionError: Found a file 
> /home/travis/build/mjsax/flink/flink-yarn-tests/target/flink-yarn-tests-fifo/flink-yarn-tests-fifo-logDir-nm-0_0/application_1440595422559_0007/container_1440595422559_0007_01_03/taskmanager.log
>  with a prohibited string: [Exception, Started 
> SelectChannelConnector@0.0.0.0:8081]
>   at org.junit.Assert.fail(Assert.java:88)
>   at 
> org.apache.flink.yarn.YarnTestBase.ensureNoProhibitedStringInLogFiles(YarnTestBase.java:294)
>   at 
> org.apache.flink.yarn.YARNSessionFIFOITCase.checkForProhibitedLogContents(YARNSessionFIFOITCase.java:94)
> {noformat}
> Furthermore, this build failed too: 
> https://travis-ci.org/apache/flink/jobs/77313450
> (no error, but Travis terminated due to no progress for 300 seconds -> 
> deadlock?)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2111: [FLINK-4076] BoltWrapper#dispose() should call AbstractSt...

2016-06-16 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2111
  
This looks good, let's see what Travis says.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4076) BoltWrapper#dispose() should call AbstractStreamOperator#dispose()

2016-06-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333487#comment-15333487
 ] 

ASF GitHub Bot commented on FLINK-4076:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2111
  
This looks good, let's see what Travis says.


> BoltWrapper#dispose() should call AbstractStreamOperator#dispose()
> --
>
> Key: FLINK-4076
> URL: https://issues.apache.org/jira/browse/FLINK-4076
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
>   @Override
>   public void dispose() {
> this.bolt.cleanup();
>   }
> {code}
> AbstractStreamOperator#dispose() should be called first.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
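
For context, a minimal sketch of the fix Ted Yu is requesting (illustrative
only, not the code from PR #2111) would chain to the superclass before tearing
down the bolt:

{code}
@Override
public void dispose() {
    // Let the wrapping stream operator release its own resources first,
    // then clean up the user's Storm bolt.
    super.dispose();
    this.bolt.cleanup();
}
{code}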


[GitHub] flink issue #2108: [FLINK-4027] Flush FlinkKafkaProducer on checkpoints

2016-06-16 Thread rmetzger
Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2108
  
Thank you for the review @eliaslevy!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4027) FlinkKafkaProducer09 sink can lose messages

2016-06-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333492#comment-15333492
 ] 

ASF GitHub Bot commented on FLINK-4027:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2108
  
Thank you for the review @eliaslevy!


> FlinkKafkaProducer09 sink can lose messages
> ---
>
> Key: FLINK-4027
> URL: https://issues.apache.org/jira/browse/FLINK-4027
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.0.3
>Reporter: Elias Levy
>Assignee: Robert Metzger
>Priority: Critical
>
> The FlinkKafkaProducer09 sink appears to not offer at-least-once guarantees.
> The producer is publishing messages asynchronously.  A callback can record 
> publishing errors, which will be raised when detected.  But as far as I can 
> tell, there is no barrier to wait for async errors from the sink when 
> checkpointing or to track the event time of acked messages to inform the 
> checkpointing process.
> If a checkpoint occurs while there are pending publish requests, and the 
> requests return a failure after the checkpoint occurred, those messages will 
> be lost as the checkpoint will consider them processed by the sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
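
To make the failure mode concrete, here is a simplified sketch (hypothetical
class and field names, not the connector's actual code) of why an asynchronous
send can cross a checkpoint:

{code}
import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Sketch: the send is asynchronous and the callback only records the error
// for later. A checkpoint completing between send() and the callback counts
// the record as processed, so a failure arriving afterwards silently loses it.
public class LossySinkSketch {
    private final KafkaProducer<byte[], byte[]> producer =
        new KafkaProducer<>(new Properties()); // config omitted in this sketch
    private volatile Exception asyncException;

    public void invoke(byte[] value) {
        producer.send(new ProducerRecord<byte[], byte[]>("someTopic", value),
            new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        asyncException = exception; // surfaced only on a later call
                    }
                }
            });
        // <-- a checkpoint barrier processed here does not wait for the
        //     pending send above: the record can still fail and be lost
    }
}
{code}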


[jira] [Commented] (FLINK-4027) FlinkKafkaProducer09 sink can lose messages

2016-06-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333498#comment-15333498
 ] 

ASF GitHub Bot commented on FLINK-4027:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2108#discussion_r67318672
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
 ---
@@ -127,4 +127,11 @@ public FlinkKafkaProducer09(String topicId, 
KeyedSerializationSchema seriali
public FlinkKafkaProducer09(String topicId, 
KeyedSerializationSchema serializationSchema, Properties producerConfig, 
KafkaPartitioner customPartitioner) {
super(topicId, serializationSchema, producerConfig, 
customPartitioner);
}
+
+   @Override
+   protected void flush() {
+   if(this.producer != null) {
--- End diff --

missing space after if


> FlinkKafkaProducer09 sink can lose messages
> ---
>
> Key: FLINK-4027
> URL: https://issues.apache.org/jira/browse/FLINK-4027
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.0.3
>Reporter: Elias Levy
>Assignee: Robert Metzger
>Priority: Critical
>
> The FlinkKafkaProducer09 sink appears to not offer at-least-once guarantees.
> The producer is publishing messages asynchronously.  A callback can record 
> publishing errors, which will be raised when detected.  But as far as I can 
> tell, there is no barrier to wait for async errors from the sink when 
> checkpointing or to track the event time of acked messages to inform the 
> checkpointing process.
> If a checkpoint occurs while there are pending publish requests, and the 
> requests return a failure after the checkpoint occurred, those messages will 
> be lost as the checkpoint will consider them processed by the sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
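
For reference, the Kafka 0.9 client already offers a blocking
`KafkaProducer#flush()`, so the 0.9 override presumably reduces to the guarded
call below (a sketch completing the truncated hunk, with the spacing fix
applied; the method body beyond the `if` is my assumption):

{code}
@Override
protected void flush() {
    // KafkaProducer#flush() blocks until all buffered sends have completed.
    if (this.producer != null) {
        this.producer.flush();
    }
}
{code}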


[jira] [Commented] (FLINK-4027) FlinkKafkaProducer09 sink can lose messages

2016-06-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333497#comment-15333497
 ] 

ASF GitHub Bot commented on FLINK-4027:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2108#discussion_r67318603
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
 ---
@@ -125,4 +125,17 @@ public FlinkKafkaProducer08(String topicId, 
KeyedSerializationSchema seriali
super(topicId, serializationSchema, producerConfig, 
customPartitioner);
}
 
+   @Override
+   protected void flush() {
+   // The Kafka 0.8 producer doesn't support flushing, therefore, 
we are using an inefficient
+   // busy wait approach
+   while(pendingRecords > 0) {
--- End diff --

missing space after while


> FlinkKafkaProducer09 sink can lose messages
> ---
>
> Key: FLINK-4027
> URL: https://issues.apache.org/jira/browse/FLINK-4027
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.0.3
>Reporter: Elias Levy
>Assignee: Robert Metzger
>Priority: Critical
>
> The FlinkKafkaProducer09 sink appears to not offer at-least-once guarantees.
> The producer is publishing messages asynchronously.  A callback can record 
> publishing errors, which will be raised when detected.  But as far as I can 
> tell, there is no barrier to wait for async errors from the sink when 
> checkpointing or to track the event time of acked messages to inform the 
> checkpointing process.
> If a checkpoint occurs while there are pending publish requests, and the 
> requests return a failure after the checkpoint occurred, those messages will 
> be lost as the checkpoint will consider them processed by the sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
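
The 0.8 client has no `flush()`, hence the busy wait on the pending-record
counter. A sketch of how the truncated hunk might continue (the sleep interval
and interrupt handling are my assumptions):

{code}
@Override
protected void flush() {
    // The Kafka 0.8 producer doesn't support flushing, therefore
    // we busy-wait until every in-flight record has been acknowledged.
    while (pendingRecords > 0) {
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while flushing pending records", e);
        }
    }
}
{code}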


[GitHub] flink pull request #2108: [FLINK-4027] Flush FlinkKafkaProducer on checkpoin...

2016-06-16 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2108#discussion_r67318603
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
 ---
@@ -125,4 +125,17 @@ public FlinkKafkaProducer08(String topicId, 
KeyedSerializationSchema seriali
super(topicId, serializationSchema, producerConfig, 
customPartitioner);
}
 
+   @Override
+   protected void flush() {
+   // The Kafka 0.8 producer doesn't support flushing, therefore, 
we are using an inefficient
+   // busy wait approach
+   while(pendingRecords > 0) {
--- End diff --

missing space after while


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2108: [FLINK-4027] Flush FlinkKafkaProducer on checkpoin...

2016-06-16 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2108#discussion_r67318672
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
 ---
@@ -127,4 +127,11 @@ public FlinkKafkaProducer09(String topicId, 
KeyedSerializationSchema seriali
public FlinkKafkaProducer09(String topicId, 
KeyedSerializationSchema serializationSchema, Properties producerConfig, 
KafkaPartitioner customPartitioner) {
super(topicId, serializationSchema, producerConfig, 
customPartitioner);
}
+
+   @Override
+   protected void flush() {
+   if(this.producer != null) {
--- End diff --

missing space after if


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4027) FlinkKafkaProducer09 sink can lose messages

2016-06-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333502#comment-15333502
 ] 

ASF GitHub Bot commented on FLINK-4027:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2108#discussion_r67318807
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
 ---
@@ -276,6 +315,38 @@ public void close() throws Exception {
checkErroneous();
}
 
+   // --- Logic for handling checkpoint flushing 
-- //
+
+   private void acknowledgeMessage() {
+   if(!flushOnCheckpoint) {
+   // the logic is disabled
+   return;
+   }
+   pendingRecords--;
+   }
+
+   protected abstract void flush();
+
+   @Override
+   public Serializable snapshotState(long checkpointId, long 
checkpointTimestamp) {
+   if(flushOnCheckpoint) {
+   // flushing is activated: We need to wait until 
pendingRecords is 0
+   flush();
+
+   if(pendingRecords != 0) {
--- End diff --

space


> FlinkKafkaProducer09 sink can lose messages
> ---
>
> Key: FLINK-4027
> URL: https://issues.apache.org/jira/browse/FLINK-4027
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.0.3
>Reporter: Elias Levy
>Assignee: Robert Metzger
>Priority: Critical
>
> The FlinkKafkaProducer09 sink appears to not offer at-least-once guarantees.
> The producer is publishing messages asynchronously.  A callback can record 
> publishing errors, which will be raised when detected.  But as far as I can 
> tell, there is no barrier to wait for async errors from the sink when 
> checkpointing or to track the event time of acked messages to inform the 
> checkpointing process.
> If a checkpoint occurs while there are pending publish requests, and the 
> requests return a failure after the checkpoint occurred, those messages will 
> be lost as the checkpoint will consider them processed by the sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
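
Putting the hunks above together, the base-class bookkeeping follows this
pattern (a condensed sketch with the spacing fixes applied; simplified and
single-threaded, whereas the real code must synchronize the producer callback
against the checkpoint thread):

{code}
// Sketch of the flush-on-checkpoint pattern from PR #2108 (not the exact code).
public abstract class FlushingSinkSketch {
    protected boolean flushOnCheckpoint = true;
    protected long pendingRecords;          // records sent but not yet acked

    protected void recordSent() {           // called from invoke() before the async send
        if (flushOnCheckpoint) {
            pendingRecords++;
        }
    }

    protected void acknowledgeMessage() {   // called from the producer callback
        if (flushOnCheckpoint) {
            pendingRecords--;
        }
    }

    protected abstract void flush();        // block until pendingRecords == 0

    public java.io.Serializable snapshotState(long checkpointId, long timestamp) {
        if (flushOnCheckpoint) {
            flush();                        // barrier: no in-flight record may cross a checkpoint
            if (pendingRecords != 0) {      // sanity check, as in the diff
                throw new IllegalStateException(
                    "Pending records after flush: " + pendingRecords);
            }
        }
        return null;                        // the sink itself keeps no checkpointed state
    }
}
{code}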


[GitHub] flink pull request #2108: [FLINK-4027] Flush FlinkKafkaProducer on checkpoin...

2016-06-16 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2108#discussion_r67318807
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
 ---
@@ -276,6 +315,38 @@ public void close() throws Exception {
checkErroneous();
}
 
+   // --- Logic for handling checkpoint flushing 
-- //
+
+   private void acknowledgeMessage() {
+   if(!flushOnCheckpoint) {
+   // the logic is disabled
+   return;
+   }
+   pendingRecords--;
+   }
+
+   protected abstract void flush();
+
+   @Override
+   public Serializable snapshotState(long checkpointId, long 
checkpointTimestamp) {
+   if(flushOnCheckpoint) {
+   // flushing is activated: We need to wait until 
pendingRecords is 0
+   flush();
+
+   if(pendingRecords != 0) {
--- End diff --

space


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4027) FlinkKafkaProducer09 sink can lose messages

2016-06-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333501#comment-15333501
 ] 

ASF GitHub Bot commented on FLINK-4027:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2108#discussion_r67318778
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
 ---
@@ -276,6 +315,38 @@ public void close() throws Exception {
checkErroneous();
}
 
+   // --- Logic for handling checkpoint flushing 
-- //
+
+   private void acknowledgeMessage() {
+   if(!flushOnCheckpoint) {
+   // the logic is disabled
+   return;
+   }
+   pendingRecords--;
+   }
+
+   protected abstract void flush();
+
+   @Override
+   public Serializable snapshotState(long checkpointId, long 
checkpointTimestamp) {
+   if(flushOnCheckpoint) {
--- End diff --

space


> FlinkKafkaProducer09 sink can lose messages
> ---
>
> Key: FLINK-4027
> URL: https://issues.apache.org/jira/browse/FLINK-4027
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.0.3
>Reporter: Elias Levy
>Assignee: Robert Metzger
>Priority: Critical
>
> The FlinkKafkaProducer09 sink appears to not offer at-least-once guarantees.
> The producer is publishing messages asynchronously.  A callback can record 
> publishing errors, which will be raised when detected.  But as far as I can 
> tell, there is no barrier to wait for async errors from the sink when 
> checkpointing or to track the event time of acked messages to inform the 
> checkpointing process.
> If a checkpoint occurs while there are pending publish requests, and the 
> requests return a failure after the checkpoint occurred, those messages will 
> be lost as the checkpoint will consider them processed by the sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4027) FlinkKafkaProducer09 sink can lose messages

2016-06-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333500#comment-15333500
 ] 

ASF GitHub Bot commented on FLINK-4027:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2108#discussion_r67318738
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
 ---
@@ -261,7 +298,9 @@ public void invoke(IN next) throws Exception {
} else {
record = new ProducerRecord<>(targetTopic, 
partitioner.partition(next, serializedKey, serializedValue, partitions.length), 
serializedKey, serializedValue);
}
-
+   if(flushOnCheckpoint) {
--- End diff --

missing space after if


> FlinkKafkaProducer09 sink can lose messages
> ---
>
> Key: FLINK-4027
> URL: https://issues.apache.org/jira/browse/FLINK-4027
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.0.3
>Reporter: Elias Levy
>Assignee: Robert Metzger
>Priority: Critical
>
> The FlinkKafkaProducer09 sink appears to not offer at-least-once guarantees.
> The producer is publishing messages asynchronously.  A callback can record 
> publishing errors, which will be raised when detected.  But as far as I can 
> tell, there is no barrier to wait for async errors from the sink when 
> checkpointing or to track the event time of acked messages to inform the 
> checkpointing process.
> If a checkpoint occurs while there are pending publish requests, and the 
> requests return a failure after the checkpoint occurred, those messages will 
> be lost as the checkpoint will consider them processed by the sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2108: [FLINK-4027] Flush FlinkKafkaProducer on checkpoin...

2016-06-16 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2108#discussion_r67318738
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
 ---
@@ -261,7 +298,9 @@ public void invoke(IN next) throws Exception {
} else {
record = new ProducerRecord<>(targetTopic, 
partitioner.partition(next, serializedKey, serializedValue, partitions.length), 
serializedKey, serializedValue);
}
-
+   if(flushOnCheckpoint) {
--- End diff --

missing space after if


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-4037) Introduce ArchivedExecutionGraph without any user classes

2016-06-16 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter updated FLINK-4037:
--
Assignee: (was: Stefan Richter)

> Introduce ArchivedExecutionGraph without any user classes
> -
>
> Key: FLINK-4037
> URL: https://issues.apache.org/jira/browse/FLINK-4037
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Reporter: Robert Metzger
>
> As a follow-up to FLINK-4011: in order to allow the JobManager to unload all 
> classes from a finished job, we need to convert the ExecutionGraph (and some 
> attached objects like the ExecutionConfig) into a stringified version, not 
> containing any user classes.
> The web frontend can only show strings anyway.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
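
The idea can be sketched as a plain value class that keeps only strings, so the
JobManager holds no references into the user-code classloader (a hypothetical
shape, not the eventual implementation):

{code}
// Hypothetical sketch: everything user-defined is captured as a String.
public class ArchivedExecutionGraphSketch implements java.io.Serializable {
    private final String jobName;
    private final String jobState;
    private final String executionConfigDescription; // e.g. ExecutionConfig#toString()

    public ArchivedExecutionGraphSketch(String jobName, String jobState,
                                        String executionConfigDescription) {
        this.jobName = jobName;
        this.jobState = jobState;
        this.executionConfigDescription = executionConfigDescription;
    }

    public String getExecutionConfigDescription() {
        return executionConfigDescription; // the web frontend only renders strings
    }
}
{code}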


[GitHub] flink pull request #2108: [FLINK-4027] Flush FlinkKafkaProducer on checkpoin...

2016-06-16 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2108#discussion_r67318778
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
 ---
@@ -276,6 +315,38 @@ public void close() throws Exception {
checkErroneous();
}
 
+   // --- Logic for handling checkpoint flushing 
-- //
+
+   private void acknowledgeMessage() {
+   if(!flushOnCheckpoint) {
+   // the logic is disabled
+   return;
+   }
+   pendingRecords--;
+   }
+
+   protected abstract void flush();
+
+   @Override
+   public Serializable snapshotState(long checkpointId, long 
checkpointTimestamp) {
+   if(flushOnCheckpoint) {
--- End diff --

space


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2108: [FLINK-4027] Flush FlinkKafkaProducer on checkpoin...

2016-06-16 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2108#discussion_r67318851
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
 ---
@@ -276,6 +315,38 @@ public void close() throws Exception {
checkErroneous();
}
 
+   // --- Logic for handling checkpoint flushing 
-- //
+
+   private void acknowledgeMessage() {
+   if(!flushOnCheckpoint) {
--- End diff --

space


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2108: [FLINK-4027] Flush FlinkKafkaProducer on checkpoin...

2016-06-16 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2108#discussion_r67318905
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java
 ---
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Test ensuring that the producer is not dropping buffered records
+ */
+@SuppressWarnings("unchecked")
+public class AtLeastOnceProducerTest {
+
+   @Test
+   public void testAtLeastOnceProducer() throws Exception {
+   runTest(true);
+   }
+
+   // This test ensures that the actual test fails if the flushing is 
disabled
+   @Test(expected = AssertionError.class)
+   public void ensureTestFails() throws Exception {
+   runTest(false);
+   }
+
+   private void runTest(boolean flushOnCheckpoint) throws Exception {
+   Properties props = new Properties();
+   final TestingKafkaProducer producer = new 
TestingKafkaProducer<>("someTopic", new KeyedSerializationSchemaWrapper<>(new 
SimpleStringSchema()), props);
+   producer.setFlushOnCheckpoint(flushOnCheckpoint);
+   producer.setRuntimeContext(new MockRuntimeContext(0, 1));
+
+   producer.open(new Configuration());
+
+   for(int i = 0; i < 100; i++) {
--- End diff --

missing space after for


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4027) FlinkKafkaProducer09 sink can lose messages

2016-06-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333503#comment-15333503
 ] 

ASF GitHub Bot commented on FLINK-4027:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2108#discussion_r67318851
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
 ---
@@ -276,6 +315,38 @@ public void close() throws Exception {
checkErroneous();
}
 
+   // --- Logic for handling checkpoint flushing 
-- //
+
+   private void acknowledgeMessage() {
+   if(!flushOnCheckpoint) {
--- End diff --

space


> FlinkKafkaProducer09 sink can lose messages
> ---
>
> Key: FLINK-4027
> URL: https://issues.apache.org/jira/browse/FLINK-4027
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.0.3
>Reporter: Elias Levy
>Assignee: Robert Metzger
>Priority: Critical
>
> The FlinkKafkaProducer09 sink appears to not offer at-least-once guarantees.
> The producer is publishing messages asynchronously.  A callback can record 
> publishing errors, which will be raised when detected.  But as far as I can 
> tell, there is no barrier to wait for async errors from the sink when 
> checkpointing or to track the event time of acked messages to inform the 
> checkpointing process.
> If a checkpoint occurs while there are pending publish requests, and the 
> requests return a failure after the checkpoint occurred, those messages will 
> be lost as the checkpoint will consider them processed by the sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4027) FlinkKafkaProducer09 sink can lose messages

2016-06-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333505#comment-15333505
 ] 

ASF GitHub Bot commented on FLINK-4027:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2108#discussion_r67318905
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java
 ---
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Test ensuring that the producer is not dropping buffered records
+ */
+@SuppressWarnings("unchecked")
+public class AtLeastOnceProducerTest {
+
+   @Test
+   public void testAtLeastOnceProducer() throws Exception {
+   runTest(true);
+   }
+
+   // This test ensures that the actual test fails if the flushing is 
disabled
+   @Test(expected = AssertionError.class)
+   public void ensureTestFails() throws Exception {
+   runTest(false);
+   }
+
+   private void runTest(boolean flushOnCheckpoint) throws Exception {
+   Properties props = new Properties();
+   final TestingKafkaProducer producer = new 
TestingKafkaProducer<>("someTopic", new KeyedSerializationSchemaWrapper<>(new 
SimpleStringSchema()), props);
+   producer.setFlushOnCheckpoint(flushOnCheckpoint);
+   producer.setRuntimeContext(new MockRuntimeContext(0, 1));
+
+   producer.open(new Configuration());
+
+   for(int i = 0; i < 100; i++) {
--- End diff --

missing space after for


> FlinkKafkaProducer09 sink can lose messages
> ---
>
> Key: FLINK-4027
> URL: https://issues.apache.org/jira/browse/FLINK-4027
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.0.3
>Reporter: Elias Levy
>Assignee: Robert Metzger
>Priority: Critical
>
> The FlinkKafkaProducer09 sink appears to not offer at-least-once guarantees.
> The producer is publishing messages asynchronously.  A callback can record 
> publishing errors, which will be raised when detected.  But as far as I can 
> tell, there is no barrier to wait for async errors from the sink when 
> checkpointing or to track the event time of acked messages to inform the 
> checkpointing process.
> If a checkpoint occurs while there are pending publish requests, and the 
> requests return a failure after the checkpoint occurred, those messages will 
> be lost as the checkpoint will consider them processed by the sink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

[jira] [Commented] (FLINK-3677) FileInputFormat: Allow to specify include/exclude file name patterns

2016-06-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333509#comment-15333509
 ] 

ASF GitHub Bot commented on FLINK-3677:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2109#discussion_r67319428
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/common/io/FilesFilterTest.java ---
@@ -0,0 +1,33 @@
+package org.apache.flink.api.common.io;
--- End diff --

missing license


> FileInputFormat: Allow to specify include/exclude file name patterns
> 
>
> Key: FLINK-3677
> URL: https://issues.apache.org/jira/browse/FLINK-3677
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.0
>Reporter: Maximilian Michels
>Assignee: Ivan Mushketyk
>Priority: Minor
>  Labels: starter
>
> It would be nice to be able to specify a regular expression to filter files.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
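
A hypothetical usage sketch of the proposed configuration keys: the include key
"fileInputFormat.include" appears in a diff later in this thread, while the
exclude key name and the exact filtering semantics are my assumptions, since
the API is still under review.

{code}
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;

public class IncludeExcludeExample {
    public static void main(String[] args) {
        TextInputFormat format = new TextInputFormat(new Path("hdfs:///logs"));

        Configuration conf = new Configuration();
        conf.setString("fileInputFormat.include", ".*\\.csv");     // only read CSV files
        conf.setString("fileInputFormat.exclude", ".*_backup.*");  // hypothetical exclude key

        format.configure(conf); // the format would filter file names against the patterns
    }
}
{code}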


[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...

2016-06-16 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2109#discussion_r67319317
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java ---
@@ -70,7 +70,17 @@
 * The fraction that the last split may be larger than the others.
 */
private static final float MAX_SPLIT_SIZE_DISCREPANCY = 1.1f;
-   
+
+   /**
+* Patterns for file names to include
+*/
+   private static final String INCLUDE_PATTERNS = 
"fileInputFormat.include";
--- End diff --

these should be prefixed with KEY_


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3677) FileInputFormat: Allow to specify include/exclude file name patterns

2016-06-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333508#comment-15333508
 ] 

ASF GitHub Bot commented on FLINK-3677:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2109#discussion_r67319317
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java ---
@@ -70,7 +70,17 @@
 * The fraction that the last split may be larger than the others.
 */
private static final float MAX_SPLIT_SIZE_DISCREPANCY = 1.1f;
-   
+
+   /**
+* Patterns for file names to include
+*/
+   private static final String INCLUDE_PATTERNS = 
"fileInputFormat.include";
--- End diff --

these should be prefixed with KEY_


> FileInputFormat: Allow to specify include/exclude file name patterns
> 
>
> Key: FLINK-3677
> URL: https://issues.apache.org/jira/browse/FLINK-3677
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.0
>Reporter: Maximilian Michels
>Assignee: Ivan Mushketyk
>Priority: Minor
>  Labels: starter
>
> It would be nice to be able to specify a regular expression to filter files.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...

2016-06-16 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2109#discussion_r67319428
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/common/io/FilesFilterTest.java ---
@@ -0,0 +1,33 @@
+package org.apache.flink.api.common.io;
--- End diff --

missing license


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3677) FileInputFormat: Allow to specify include/exclude file name patterns

2016-06-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333510#comment-15333510
 ] 

ASF GitHub Bot commented on FLINK-3677:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2109#discussion_r67319452
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/io/FilesFilter.java ---
@@ -0,0 +1,79 @@
+package org.apache.flink.api.common.io;
--- End diff --

missing license


> FileInputFormat: Allow to specify include/exclude file name patterns
> 
>
> Key: FLINK-3677
> URL: https://issues.apache.org/jira/browse/FLINK-3677
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.0
>Reporter: Maximilian Michels
>Assignee: Ivan Mushketyk
>Priority: Minor
>  Labels: starter
>
> It would be nice to be able to specify a regular expression to filter files.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...

2016-06-16 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2109#discussion_r67319452
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/io/FilesFilter.java ---
@@ -0,0 +1,79 @@
+package org.apache.flink.api.common.io;
--- End diff --

missing license


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2109: [FLINK-3677] FileInputFormat: Allow to specify include/ex...

2016-06-16 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2109
  
You marked this PR as containing documentation for new functionality, but I 
can't find any changes to the docs regarding the newly added configuration 
keys. Did you forget to include them in this PR?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3677) FileInputFormat: Allow to specify include/exclude file name patterns

2016-06-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333514#comment-15333514
 ] 

ASF GitHub Bot commented on FLINK-3677:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2109
  
You marked this PR as containing documentation for new functionality, but I 
can't find any changes to the docs regarding the newly added configuration 
keys. Did you forget to include them in this PR?


> FileInputFormat: Allow to specify include/exclude file name patterns
> 
>
> Key: FLINK-3677
> URL: https://issues.apache.org/jira/browse/FLINK-3677
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.0
>Reporter: Maximilian Michels
>Assignee: Ivan Mushketyk
>Priority: Minor
>  Labels: starter
>
> It would be nice to be able to specify a regular expression to filter files.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-3951) Add Histogram Metric Type

2016-06-16 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3951?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reassigned FLINK-3951:


Assignee: Till Rohrmann

> Add Histogram Metric Type
> -
>
> Key: FLINK-3951
> URL: https://issues.apache.org/jira/browse/FLINK-3951
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: Till Rohrmann
> Fix For: 1.1.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2106: [FLINK-4063] Add Metrics Support for Triggers

2016-06-16 Thread aljoscha
Github user aljoscha closed the pull request at:

https://github.com/apache/flink/pull/2106


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2392) Instable test in flink-yarn-tests

2016-06-16 Thread Maximilian Michels (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333516#comment-15333516
 ] 

Maximilian Michels commented on FLINK-2392:
---

It doesn't matter if it is a naming conflict or a malformed bean. It still 
seems to be an instance of incorrect registration, no? :)

> Instable test in flink-yarn-tests
> -
>
> Key: FLINK-2392
> URL: https://issues.apache.org/jira/browse/FLINK-2392
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 0.10.0
>Reporter: Matthias J. Sax
>Assignee: Robert Metzger
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.0.0
>
>
> The test YARNSessionFIFOITCase fails from time to time on an irregular basis. 
> For example see: https://travis-ci.org/apache/flink/jobs/72019690
> {noformat}
> Tests run: 12, Failures: 1, Errors: 0, Skipped: 2, Time elapsed: 205.163 sec 
> <<< FAILURE! - in org.apache.flink.yarn.YARNSessionFIFOITCase
> perJobYarnClusterWithParallelism(org.apache.flink.yarn.YARNSessionFIFOITCase) 
>  Time elapsed: 60.651 sec  <<< FAILURE!
> java.lang.AssertionError: During the timeout period of 60 seconds the 
> expected string did not show up
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.assertTrue(Assert.java:41)
>   at org.apache.flink.yarn.YarnTestBase.runWithArgs(YarnTestBase.java:478)
>   at 
> org.apache.flink.yarn.YARNSessionFIFOITCase.perJobYarnClusterWithParallelism(YARNSessionFIFOITCase.java:435)
> Results :
> Failed tests: 
>   
> YARNSessionFIFOITCase.perJobYarnClusterWithParallelism:435->YarnTestBase.runWithArgs:478
>  During the timeout period of 60 seconds the expected string did not show up
> {noformat}
> Another error case is this (see 
> https://travis-ci.org/mjsax/flink/jobs/77313444)
> {noformat}
> Tests run: 12, Failures: 3, Errors: 0, Skipped: 2, Time elapsed: 182.008 sec 
> <<< FAILURE! - in org.apache.flink.yarn.YARNSessionFIFOITCase
> testTaskManagerFailure(org.apache.flink.yarn.YARNSessionFIFOITCase)  Time 
> elapsed: 27.356 sec  <<< FAILURE!
> java.lang.AssertionError: Found a file 
> /home/travis/build/mjsax/flink/flink-yarn-tests/target/flink-yarn-tests-fifo/flink-yarn-tests-fifo-logDir-nm-0_0/application_1440595422559_0007/container_1440595422559_0007_01_03/taskmanager.log
>  with a prohibited string: [Exception, Started 
> SelectChannelConnector@0.0.0.0:8081]
>   at org.junit.Assert.fail(Assert.java:88)
>   at 
> org.apache.flink.yarn.YarnTestBase.ensureNoProhibitedStringInLogFiles(YarnTestBase.java:294)
>   at 
> org.apache.flink.yarn.YARNSessionFIFOITCase.checkForProhibitedLogContents(YARNSessionFIFOITCase.java:94)
> testNonexistingQueue(org.apache.flink.yarn.YARNSessionFIFOITCase)  Time 
> elapsed: 17.421 sec  <<< FAILURE!
> java.lang.AssertionError: Found a file 
> /home/travis/build/mjsax/flink/flink-yarn-tests/target/flink-yarn-tests-fifo/flink-yarn-tests-fifo-logDir-nm-0_0/application_1440595422559_0007/container_1440595422559_0007_01_03/taskmanager.log
>  with a prohibited string: [Exception, Started 
> SelectChannelConnector@0.0.0.0:8081]
>   at org.junit.Assert.fail(Assert.java:88)
>   at 
> org.apache.flink.yarn.YarnTestBase.ensureNoProhibitedStringInLogFiles(YarnTestBase.java:294)
>   at 
> org.apache.flink.yarn.YARNSessionFIFOITCase.checkForProhibitedLogContents(YARNSessionFIFOITCase.java:94)
> testJavaAPI(org.apache.flink.yarn.YARNSessionFIFOITCase)  Time elapsed: 
> 11.984 sec  <<< FAILURE!
> java.lang.AssertionError: Found a file 
> /home/travis/build/mjsax/flink/flink-yarn-tests/target/flink-yarn-tests-fifo/flink-yarn-tests-fifo-logDir-nm-0_0/application_1440595422559_0007/container_1440595422559_0007_01_03/taskmanager.log
>  with a prohibited string: [Exception, Started 
> SelectChannelConnector@0.0.0.0:8081]
>   at org.junit.Assert.fail(Assert.java:88)
>   at 
> org.apache.flink.yarn.YarnTestBase.ensureNoProhibitedStringInLogFiles(YarnTestBase.java:294)
>   at 
> org.apache.flink.yarn.YARNSessionFIFOITCase.checkForProhibitedLogContents(YARNSessionFIFOITCase.java:94)
> {noformat}
> Furthermore, this build failed too: 
> https://travis-ci.org/apache/flink/jobs/77313450
> (no error, but Travis terminated due to no progress for 300 seconds -> 
> deadlock?)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4063) Add Metrics Support for Triggers

2016-06-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333517#comment-15333517
 ] 

ASF GitHub Bot commented on FLINK-4063:
---

Github user aljoscha closed the pull request at:

https://github.com/apache/flink/pull/2106


> Add Metrics Support for Triggers
> 
>
> Key: FLINK-4063
> URL: https://issues.apache.org/jira/browse/FLINK-4063
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.1.0
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>
> Now that we have proper support for metrics we should also add a hook that 
> allows triggers to report metrics.
> This supersedes FLINK-3758 which was about using accumulators for metrics in 
> triggers.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-4063) Add Metrics Support for Triggers

2016-06-16 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek resolved FLINK-4063.
-
   Resolution: Fixed
Fix Version/s: 1.1.0

Implemented here: 
https://github.com/apache/flink/commit/10495852370fed3b7683d428f6c2cc7470b3d8ed

> Add Metrics Support for Triggers
> 
>
> Key: FLINK-4063
> URL: https://issues.apache.org/jira/browse/FLINK-4063
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.1.0
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 1.1.0
>
>
> Now that we have proper support for metrics we should also add a hook that 
> allows triggers to report metrics.
> This supersedes FLINK-3758 which was about using accumulators for metrics in 
> triggers.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-3949) Collect Metrics in Runtime Operators

2016-06-16 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler reassigned FLINK-3949:
---

Assignee: Chesnay Schepler

> Collect Metrics in Runtime Operators
> 
>
> Key: FLINK-3949
> URL: https://issues.apache.org/jira/browse/FLINK-3949
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Stephan Ewen
>Assignee: Chesnay Schepler
> Fix For: 1.1.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3951) Add Histogram Metric Type

2016-06-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333533#comment-15333533
 ] 

ASF GitHub Bot commented on FLINK-3951:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/2112

[FLINK-3951] Add histogram metric type

This PR introduces an interface for Flink's histogram metric type 
`Histogram` and the corresponding statistics `HistogramStatistics`. The 
`Histogram` interface allows updating the histogram with new values, retrieving 
the number of collected values, and obtaining a `HistogramStatistics` object 
which represents the current statistics of the histogram. The values must be of 
type long.

The `HistogramStatistics` allows computing the mean, min, max, and quantile 
values based on the element distribution calculated by the histogram.

An implementation of the `Histogram` interface is contained in the 
flink-metrics-dropwizard module. The implementation wraps a Dropwizard 
histogram via the `DropwizardHistogramWrapper` class. This is currently the 
only implementation of `Histogram`. If needed, we can add a histogram 
implementation to flink-core later. But for the moment, histograms are intended 
to be used as user metrics. Histograms add computational overhead which should 
not be imposed on the runtime by default.

The histogram's statistics are reported by all currently supported 
`MetricReporters`. The `JMXReporter` introduces a `JmxHistogramMBean` 
interface for this purpose. The `StatsDReporter` reports the individual 
values of the `HistogramStatistics` object as gauge values. The 
`ScheduledDropwizardReporter` wraps the `Histogram` in a `HistogramWrapper` 
and the `HistogramStatistics` in a `HistogramStatisticsWrapper` so that they 
can be passed to Dropwizard's own reporters. As an optimization, the 
`ScheduledDropwizardReporter` detects a `DropwizardHistogramWrapper` and 
unwraps the Dropwizard histogram, which is then used for reporting.
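
To make the gauge-style reporting concrete, a StatsD-like reporter can 
flatten the statistics into one gauge per value along these lines 
(illustrative only, not the PR's actual reporter code; the class and `emit` 
helper are invented):

```java
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.HistogramStatistics;

// Illustrative only: flattening a histogram's statistics into
// gauge-style values, as a StatsD-like reporter might emit them.
public class HistogramFlattener {

    public static void report(String name, Histogram histogram) {
        HistogramStatistics stats = histogram.getStatistics();
        emit(name + ".count", histogram.getCount());
        emit(name + ".min", stats.getMin());
        emit(name + ".max", stats.getMax());
        emit(name + ".mean", stats.getMean());
        emit(name + ".p50", stats.getQuantile(0.5));
        emit(name + ".p99", stats.getQuantile(0.99));
    }

    // Stand-in for the actual transport; "|g" is the StatsD gauge suffix.
    private static void emit(String key, double value) {
        System.out.println(key + ":" + value + "|g");
    }
}
```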

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink addHistogram

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2112.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2112


commit c3e8c315a56d5a25493f56d9713a968708e29b93
Author: Till Rohrmann 
Date:   2016-06-14T17:04:43Z

Add histogram interface

commit fd9fba8b00eef645def43c9b76d4dd606a9303a5
Author: Till Rohrmann 
Date:   2016-06-14T17:11:24Z

Rename flink-metric-reporters into flink-metrics

commit 796bf6964800b891413f11c2c366bb94758de4e8
Author: Till Rohrmann 
Date:   2016-06-15T12:02:20Z

Add dropwizard histogram wrapper

commit 9b7d9c324246a4a99a6de5ad18571a68564bfe9c
Author: Till Rohrmann 
Date:   2016-06-15T12:16:26Z

Add histogram to MetricGroup

commit a782277c63065c297ef1a7a7b8e20e4260791945
Author: Till Rohrmann 
Date:   2016-06-15T13:37:34Z

Add histogram support to existing reporters

commit 38f3d82d056921bb54e71a0bf4aa11a5ed879fe8
Author: Till Rohrmann 
Date:   2016-06-15T15:47:48Z

Add DropwizardHistogramWrapperTest

commit 35f907ce977b2eaa09e78905e40fae820aad1ba7
Author: Till Rohrmann 
Date:   2016-06-15T16:17:19Z

Add JMXReporter histogram test

commit 832b83c90a03d9f358ed234ebfe730c83a45a41b
Author: Till Rohrmann 
Date:   2016-06-16T10:09:06Z

Add statsD reporter histogram test




> Add Histogram Metric Type
> -------------------------
>
> Key: FLINK-3951
> URL: https://issues.apache.org/jira/browse/FLINK-3951
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: Till Rohrmann
> Fix For: 1.1.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)



[jira] [Commented] (FLINK-2392) Instable test in flink-yarn-tests

2016-06-16 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15333536#comment-15333536
 ] 

Chesnay Schepler commented on FLINK-2392:
-----------------------------------------

Yes, it is indeed related to registering metrics. But the issues mentioned so 
far in FLINK-3962 were related to naming.

I would prefer creating a new issue, since the chances of FLINK-3962 randomly 
fixing this issue as well are slim. The two are also only tangentially 
related, based on where they occur; the actual cause (and thus the fix) is 
most likely completely different.



> Instable test in flink-yarn-tests
> ---------------------------------
>
> Key: FLINK-2392
> URL: https://issues.apache.org/jira/browse/FLINK-2392
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 0.10.0
>Reporter: Matthias J. Sax
>Assignee: Robert Metzger
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.0.0
>
>
> The test YARNSessionFIFOITCase fails intermittently.
> For example see: https://travis-ci.org/apache/flink/jobs/72019690
> {noformat}
> Tests run: 12, Failures: 1, Errors: 0, Skipped: 2, Time elapsed: 205.163 sec 
> <<< FAILURE! - in org.apache.flink.yarn.YARNSessionFIFOITCase
> perJobYarnClusterWithParallelism(org.apache.flink.yarn.YARNSessionFIFOITCase) 
>  Time elapsed: 60.651 sec  <<< FAILURE!
> java.lang.AssertionError: During the timeout period of 60 seconds the 
> expected string did not show up
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.assertTrue(Assert.java:41)
>   at org.apache.flink.yarn.YarnTestBase.runWithArgs(YarnTestBase.java:478)
>   at 
> org.apache.flink.yarn.YARNSessionFIFOITCase.perJobYarnClusterWithParallelism(YARNSessionFIFOITCase.java:435)
> Results :
> Failed tests: 
>   
> YARNSessionFIFOITCase.perJobYarnClusterWithParallelism:435->YarnTestBase.runWithArgs:478
>  During the timeout period of 60 seconds the expected string did not show up
> {noformat}
> Another error case is this (see 
> https://travis-ci.org/mjsax/flink/jobs/77313444)
> {noformat}
> Tests run: 12, Failures: 3, Errors: 0, Skipped: 2, Time elapsed: 182.008 sec 
> <<< FAILURE! - in org.apache.flink.yarn.YARNSessionFIFOITCase
> testTaskManagerFailure(org.apache.flink.yarn.YARNSessionFIFOITCase)  Time 
> elapsed: 27.356 sec  <<< FAILURE!
> java.lang.AssertionError: Found a file 
> /home/travis/build/mjsax/flink/flink-yarn-tests/target/flink-yarn-tests-fifo/flink-yarn-tests-fifo-logDir-nm-0_0/application_1440595422559_0007/container_1440595422559_0007_01_03/taskmanager.log
>  with a prohibited string: [Exception, Started 
> SelectChannelConnector@0.0.0.0:8081]
>   at org.junit.Assert.fail(Assert.java:88)
>   at 
> org.apache.flink.yarn.YarnTestBase.ensureNoProhibitedStringInLogFiles(YarnTestBase.java:294)
>   at 
> org.apache.flink.yarn.YARNSessionFIFOITCase.checkForProhibitedLogContents(YARNSessionFIFOITCase.java:94)
> testNonexistingQueue(org.apache.flink.yarn.YARNSessionFIFOITCase)  Time 
> elapsed: 17.421 sec  <<< FAILURE!
> java.lang.AssertionError: Found a file 
> /home/travis/build/mjsax/flink/flink-yarn-tests/target/flink-yarn-tests-fifo/flink-yarn-tests-fifo-logDir-nm-0_0/application_1440595422559_0007/container_1440595422559_0007_01_03/taskmanager.log
>  with a prohibited string: [Exception, Started 
> SelectChannelConnector@0.0.0.0:8081]
>   at org.junit.Assert.fail(Assert.java:88)
>   at 
> org.apache.flink.yarn.YarnTestBase.ensureNoProhibitedStringInLogFiles(YarnTestBase.java:294)
>   at 
> org.apache.flink.yarn.YARNSessionFIFOITCase.checkForProhibitedLogContents(YARNSessionFIFOITCase.java:94)
> testJavaAPI(org.apache.flink.yarn.YARNSessionFIFOITCase)  Time elapsed: 
> 11.984 sec  <<< FAILURE!
> java.lang.AssertionError: Found a file 
> /home/travis/build/mjsax/flink/flink-yarn-tests/target/flink-yarn-tests-fifo/flink-yarn-tests-fifo-logDir-nm-0_0/application_1440595422559_0007/container_1440595422559_0007_01_03/taskmanager.log
>  with a prohibited string: [Exception, Started 
> SelectChannelConnector@0.0.0.0:8081]
>   at org.junit.Assert.fail(Assert.java:88)
>   at 
> org.apache.flink.yarn.YarnTestBase.ensureNoProhibitedStringInLogFiles(YarnTestBase.java:294)
>   at 
> org.apache.flink.yarn.YARNSessionFIFOITCase.checkForProhibitedLogContents(YARNSessionFIFOITCase.java:94)
> {noformat}
> Furthermore, this build failed too: 
> https://travis-ci.org/apache/flink/jobs/77313450
> (no error, but Travis terminated due to no progress for 300 seconds -> 
> deadlock?)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2090: Expose several new Metrics

2016-06-16 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2090


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Closed] (FLINK-3949) Collect Metrics in Runtime Operators

2016-06-16 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-3949.
-----------------------------------
Resolution: Fixed

Implemented in c78b3c49e0e82874cbfa71e88bf28b99ed395610.

If additional metrics are to be collected, a new, specific issue should be 
created.
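
To illustrate the kind of instrumentation this issue covers, a hypothetical 
sketch (not the committed code): wrapping an operator's output collector so 
that every emitted record increments a metric counter.

{code:java}
import org.apache.flink.metrics.Counter;
import org.apache.flink.util.Collector;

// Hypothetical sketch: a delegating collector that counts emitted records.
public class CountingCollector<T> implements Collector<T> {

    private final Collector<T> delegate;
    private final Counter numRecordsOut;

    public CountingCollector(Collector<T> delegate, Counter numRecordsOut) {
        this.delegate = delegate;
        this.numRecordsOut = numRecordsOut;
    }

    @Override
    public void collect(T record) {
        numRecordsOut.inc(); // count every emitted record
        delegate.collect(record);
    }

    @Override
    public void close() {
        delegate.close();
    }
}
{code}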

> Collect Metrics in Runtime Operators
> ------------------------------------
>
> Key: FLINK-3949
> URL: https://issues.apache.org/jira/browse/FLINK-3949
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Stephan Ewen
>Assignee: Chesnay Schepler
> Fix For: 1.1.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

