Chenguang He created FLINK-3782:
-----------------------------------

             Summary: ByteArrayOutputStream and ObjectOutputStream should close
                 Key: FLINK-3782
                 URL: https://issues.apache.org/jira/browse/FLINK-3782
             Project: Flink
          Issue Type: Test
          Components: Java API
    Affects Versions: 1.0.1
            Reporter: Chenguang He
            Priority: Minor


@Test
        public void testSerializability() {
                try {
                        Collection<ElementType> inputCollection = new 
ArrayList<ElementType>();
                        ElementType element1 = new ElementType(1);
                        ElementType element2 = new ElementType(2);
                        ElementType element3 = new ElementType(3);
                        inputCollection.add(element1);
                        inputCollection.add(element2);
                        inputCollection.add(element3);
        
                        @SuppressWarnings("unchecked")
                        TypeInformation<ElementType> info = 
(TypeInformation<ElementType>) TypeExtractor.createTypeInfo(ElementType.class);
        
                        CollectionInputFormat<ElementType> inputFormat = new 
CollectionInputFormat<ElementType>(inputCollection,
                                        info.createSerializer(new 
ExecutionConfig()));

                        ByteArrayOutputStream buffer = new 
ByteArrayOutputStream();
                        ObjectOutputStream out = new ObjectOutputStream(buffer);

                        out.writeObject(inputFormat);

                        ObjectInputStream in = new ObjectInputStream(new 
ByteArrayInputStream(buffer.toByteArray()));

                        Object serializationResult = in.readObject();

                        assertNotNull(serializationResult);
                        assertTrue(serializationResult instanceof 
CollectionInputFormat<?>);

                        @SuppressWarnings("unchecked")
                        CollectionInputFormat<ElementType> result = 
(CollectionInputFormat<ElementType>) serializationResult;

                        GenericInputSplit inputSplit = new GenericInputSplit(0, 
1);
                        inputFormat.open(inputSplit);
                        result.open(inputSplit);
                        while(!inputFormat.reachedEnd() && 
!result.reachedEnd()){
                                ElementType expectedElement = 
inputFormat.nextRecord(null);
                                ElementType actualElement = 
result.nextRecord(null);

                                assertEquals(expectedElement, actualElement);
                        }
                }
                catch(Exception e) {
                        e.printStackTrace();
                        fail(e.toString());
                }
        }



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to