Hi Ahmed,
I'm getting the same error, and it's probably caused by the KryoSerializer.
I'm currently testing a patch for this problem, so maybe you could test it
too. Unfortunately I'm using Flink 1.0.2, so I'm not sure whether you can use
my KryoSerializer, but I think so. Basically, I just recreate the Input and
Output on every serialize/deserialize call and then close them.
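To show why a fresh Output per call matters, here is a small stdlib-only sketch. It is an analogy, not Flink's code path: it does not use Kryo, `BufferedOutputStream` stands in for Kryo's `Output`, and `FlakyTarget`/`writeWithRetry` are hypothetical names made up for illustration. `FlakyTarget` rejects its first write with an `EOFException`, roughly like a full memory segment. When one buffer is reused across calls, the bytes of the failed flush stay in it, so retrying the same record writes it twice. A buffer created per call starts out empty.

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class StaleBufferDemo {

    /** Hypothetical target that rejects its first write, like a full buffer. */
    static class FlakyTarget extends OutputStream {
        final ByteArrayOutputStream accepted = new ByteArrayOutputStream();
        private boolean failNext = true;

        @Override
        public void write(int b) throws IOException {
            write(new byte[] {(byte) b}, 0, 1);
        }

        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            if (failNext) {
                failNext = false;
                throw new EOFException("target full");
            }
            accepted.write(b, off, len);
        }
    }

    /** Writes one record, retrying once after EOFException; returns what reached the target. */
    static String writeWithRetry(String record, boolean freshBufferPerCall) {
        FlakyTarget target = new FlakyTarget();
        // Only used when the buffer is shared across calls (the pattern the patch removes).
        BufferedOutputStream shared = freshBufferPerCall ? null : new BufferedOutputStream(target);
        byte[] bytes = record.getBytes(StandardCharsets.US_ASCII);

        for (int attempt = 0; attempt < 2; attempt++) {
            BufferedOutputStream out = freshBufferPerCall ? new BufferedOutputStream(target) : shared;
            try {
                out.write(bytes);
                out.flush(); // if this throws, the unflushed bytes stay inside 'out'
                break;
            } catch (EOFException full) {
                // the caller "makes room" and retries the same record
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        return new String(target.accepted.toByteArray(), StandardCharsets.US_ASCII);
    }

    public static void main(String[] args) {
        System.out.println(writeWithRetry("rec", false)); // prints recrec
        System.out.println(writeWithRetry("rec", true));  // prints rec
    }
}
```

This is the situation the sanity check in serialize() guards against ("data might be written multiple times in case of a previous EOFException").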

This is my attempt to fix the problem (it's the KryoSerializer class in the
flink-core module):


/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.api.java.typeutils.runtime.kryo;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoException;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.factories.ReflectionSerializerFactory;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.JavaSerializer;
import com.google.common.base.Preconditions;

import org.apache.avro.generic.GenericData;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
import org.apache.flink.api.java.typeutils.runtime.NoFetchingInput;
import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers.SpecificInstanceCollectionSerializerForArrayList;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.objenesis.strategy.StdInstantiatorStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Objects;

/**
 * A type serializer that serializes its type using the Kryo serialization
 * framework (https://github.com/EsotericSoftware/kryo).
 *
 * This serializer is intended as a fallback serializer for the cases that are
 * not covered by the basic types, tuples, and POJOs.
 *
 * @param <T> The type to be serialized.
 */
public class KryoSerializer<T> extends TypeSerializer<T> {

	private static final long serialVersionUID = 3L;

	private static final Logger LOG = LoggerFactory.getLogger(KryoSerializer.class);

	// ------------------------------------------------------------------------

	private final LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>> registeredTypesWithSerializers;
	private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> registeredTypesWithSerializerClasses;
	private final LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>> defaultSerializers;
	private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> defaultSerializerClasses;
	private final LinkedHashSet<Class<?>> registeredTypes;

	private final Class<T> type;

	// ------------------------------------------------------------------------
	// The fields below are lazily initialized after duplication or deserialization.

	private transient Kryo kryo;
	private transient T copyInstance;

	// ------------------------------------------------------------------------

	public KryoSerializer(Class<T> type, ExecutionConfig executionConfig) {
		this.type = Preconditions.checkNotNull(type);

		this.defaultSerializers = executionConfig.getDefaultKryoSerializers();
		this.defaultSerializerClasses = executionConfig.getDefaultKryoSerializerClasses();
		this.registeredTypesWithSerializers = executionConfig.getRegisteredTypesWithKryoSerializers();
		this.registeredTypesWithSerializerClasses = executionConfig.getRegisteredTypesWithKryoSerializerClasses();
		this.registeredTypes = executionConfig.getRegisteredKryoTypes();
	}

	/**
	 * Copy-constructor that does not copy transient fields. They will be
	 * initialized once required.
	 */
	protected KryoSerializer(KryoSerializer<T> toCopy) {
		registeredTypesWithSerializers = toCopy.registeredTypesWithSerializers;
		registeredTypesWithSerializerClasses = toCopy.registeredTypesWithSerializerClasses;
		defaultSerializers = toCopy.defaultSerializers;
		defaultSerializerClasses = toCopy.defaultSerializerClasses;
		registeredTypes = toCopy.registeredTypes;

		type = toCopy.type;
		if (type == null) {
			throw new NullPointerException("Type class cannot be null.");
		}
	}

// ------------------------------------------------------------------------

	@Override
	public boolean isImmutableType() {
		return false;
	}

	@Override
	public KryoSerializer<T> duplicate() {
		return new KryoSerializer<T>(this);
	}

	@Override
	public T createInstance() {
		if (Modifier.isAbstract(type.getModifiers()) || Modifier.isInterface(type.getModifiers())) {
			return null;
		} else {
			checkKryoInitialized();
			try {
				return kryo.newInstance(type);
			} catch (Throwable e) {
				return null;
			}
		}
	}

	@SuppressWarnings("unchecked")
	@Override
	public T copy(T from) {
		if (from == null) {
			return null;
		}
		checkKryoInitialized();
		try {
			return kryo.copy(from);
		}
		catch (KryoException ke) {
			// Kryo was unable to copy it, so we do it through serialization:
			ByteArrayOutputStream baout = new ByteArrayOutputStream();
			Output output = new Output(baout);

			kryo.writeObject(output, from);

			output.close();

			ByteArrayInputStream bain = new ByteArrayInputStream(baout.toByteArray());
			Input input = new Input(bain);

			return (T) kryo.readObject(input, from.getClass());
		}
	}

	@Override
	public T copy(T from, T reuse) {
		return copy(from);
	}

	@Override
	public int getLength() {
		return -1;
	}

	@Override
	public void serialize(T record, DataOutputView target) throws IOException {
		checkKryoInitialized();
		// A new Output for every call, so no bytes left over from a failed previous call can be flushed again
		DataOutputViewStream outputStream = new DataOutputViewStream(target);
		Output output = new Output(outputStream);

		try {
			// Sanity check: Make sure that the output is cleared/has been flushed by the last call,
			// otherwise data might be written multiple times in case of a previous EOFException
			if (output.position() != 0) {
				throw new IllegalStateException("The Kryo Output still contains data from a previous " +
					"serialize call. It has to be flushed or cleared at the end of the serialize call.");
			}

			kryo.writeClassAndObject(output, record);
			output.flush();
		}
		catch (KryoException ke) {
			Throwable cause = ke.getCause();
			if (cause instanceof EOFException) {
				throw (EOFException) cause;
			}
			else {
				throw ke;
			}
		} finally {
			try {
				output.close();
			} catch (KryoException ke) {
				Throwable cause = ke.getCause();

				if (cause instanceof EOFException) {
					throw (EOFException) cause;
				} else {
					throw ke;
				}
			}
		}
	}

	@SuppressWarnings("unchecked")
	@Override
	public T deserialize(DataInputView source) throws IOException {
		checkKryoInitialized();
		// A new NoFetchingInput for every call, so no state is carried over between records
		DataInputViewStream inputStream = new DataInputViewStream(source);
		Input input = new NoFetchingInput(inputStream);

		try {
			return (T) kryo.readClassAndObject(input);
		} catch (KryoException ke) {
			Throwable cause = ke.getCause();

			if (cause instanceof EOFException) {
				throw (EOFException) cause;
			} else {
				throw ke;
			}
		} finally {
			try {
				input.close();
			} catch (KryoException ke) {
				Throwable cause = ke.getCause();

				if (cause instanceof EOFException) {
					throw (EOFException) cause;
				} else {
					throw ke;
				}
			}
		}
	}

	@Override
	public T deserialize(T reuse, DataInputView source) throws IOException {
		return deserialize(source);
	}

	@Override
	public void copy(DataInputView source, DataOutputView target) throws IOException {
		checkKryoInitialized();
		if (this.copyInstance == null) {
			this.copyInstance = createInstance();
		}

		T tmp = deserialize(copyInstance, source);
		serialize(tmp, target);
	}

	// --------------------------------------------------------------------------------------------

	@Override
	public int hashCode() {
		return Objects.hash(
			type,
			registeredTypes,
			registeredTypesWithSerializerClasses,
			defaultSerializerClasses);
	}

	@Override
	public boolean equals(Object obj) {
		if (obj instanceof KryoSerializer) {
			KryoSerializer<?> other = (KryoSerializer<?>) obj;

			// we cannot include the Serializers here because they don't implement the equals method
			return other.canEqual(this) &&
				type == other.type &&
				registeredTypes.equals(other.registeredTypes) &&
				registeredTypesWithSerializerClasses.equals(other.registeredTypesWithSerializerClasses) &&
				defaultSerializerClasses.equals(other.defaultSerializerClasses);
		} else {
			return false;
		}
	}

	@Override
	public boolean canEqual(Object obj) {
		return obj instanceof KryoSerializer;
	}

	// --------------------------------------------------------------------------------------------

	/**
	 * Returns the Chill Kryo Serializer which is implicitly added to the classpath via flink-runtime.
	 * Falls back to the default Kryo serializer if it can't be found.
	 * @return The Kryo serializer instance.
	 */
	private Kryo getKryoInstance() {

		try {
			// check if ScalaKryoInstantiator is in class path (coming from Twitter's Chill library).
			// This will be true if Flink's Scala API is used.
			Class<?> chillInstantiatorClazz = Class.forName("com.twitter.chill.ScalaKryoInstantiator");
			Object chillInstantiator = chillInstantiatorClazz.newInstance();

			// obtain a Kryo instance through Twitter Chill
			Method m = chillInstantiatorClazz.getMethod("newKryo");

			return (Kryo) m.invoke(chillInstantiator);
		} catch (ClassNotFoundException | InstantiationException | NoSuchMethodException |
				IllegalAccessException | InvocationTargetException e) {

			LOG.warn("Falling back to default Kryo serializer because Chill serializer couldn't be found.", e);

			Kryo.DefaultInstantiatorStrategy initStrategy = new Kryo.DefaultInstantiatorStrategy();
			initStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());

			Kryo kryo = new Kryo();
			kryo.setInstantiatorStrategy(initStrategy);

			return kryo;
		}
	}

	private void checkKryoInitialized() {
		if (this.kryo == null) {
			this.kryo = getKryoInstance();

			// Enable reference tracking.
			kryo.setReferences(true);

			// Throwable and all subclasses should be serialized via java serialization
			kryo.addDefaultSerializer(Throwable.class, new JavaSerializer());

			// Add default serializers first, so that type registrations without a serializer
			// are registered with a default serializer
			for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> entry : defaultSerializers.entrySet()) {
				kryo.addDefaultSerializer(entry.getKey(), entry.getValue().getSerializer());
			}

			for (Map.Entry<Class<?>, Class<? extends Serializer<?>>> entry : defaultSerializerClasses.entrySet()) {
				kryo.addDefaultSerializer(entry.getKey(), entry.getValue());
			}

			// register the type of our class
			kryo.register(type);

			// register given types. we do this first so that any registration of a
			// more specific serializer overrides this
			for (Class<?> type : registeredTypes) {
				kryo.register(type);
			}

			// register given serializer classes
			for (Map.Entry<Class<?>, Class<? extends Serializer<?>>> e : registeredTypesWithSerializerClasses.entrySet()) {
				Class<?> typeClass = e.getKey();
				Class<? extends Serializer<?>> serializerClass = e.getValue();

				Serializer<?> serializer =
					ReflectionSerializerFactory.makeSerializer(kryo, serializerClass, typeClass);
				kryo.register(typeClass, serializer);
			}

			// register given serializers
			for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> e : registeredTypesWithSerializers.entrySet()) {
				kryo.register(e.getKey(), e.getValue().getSerializer());
			}

			// this is needed for Avro but cannot be added on demand.
			kryo.register(GenericData.Array.class, new SpecificInstanceCollectionSerializerForArrayList());

			kryo.setRegistrationRequired(false);
			kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
		}
	}

	// --------------------------------------------------------------------------------------------
	// For testing
	// --------------------------------------------------------------------------------------------

	public Kryo getKryo() {
		checkKryoInitialized();
		return this.kryo;
	}
}
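Recreating the Input on every deserialize call only works because NoFetchingInput is meant not to read past the current record; a discarded Input that had read ahead would take the following records with it. A minimal stdlib-only sketch of that pitfall (again no Kryo: `BufferedInputStream` plays the role of a prefetching Input and a plain `DataInputStream` that of a non-prefetching one; `FreshInputDemo` and its methods are hypothetical names for illustration):

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

public class FreshInputDemo {

    /** Encodes records back to back, each as a length-prefixed UTF string. */
    static byte[] encode(String... records) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            for (String r : records) {
                out.writeUTF(r);
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reads records one call at a time, wrapping the shared stream anew for every record. */
    static List<String> readAll(byte[] data, boolean prefetching) {
        InputStream shared = new ByteArrayInputStream(data);
        List<String> records = new ArrayList<>();
        while (true) {
            // BufferedInputStream fills its buffer from 'shared'; the plain stream reads only what it needs.
            InputStream perCall = prefetching ? new BufferedInputStream(shared) : shared;
            try {
                records.add(new DataInputStream(perCall).readUTF());
            } catch (EOFException end) {
                return records; // no more complete records in 'shared'
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    public static void main(String[] args) {
        byte[] data = encode("first", "second", "third");
        System.out.println(readAll(data, true));  // prints [first]
        System.out.println(readAll(data, false)); // prints [first, second, third]
    }
}
```

With the prefetching wrapper only the first record survives, because the first wrapper's buffer swallowed the rest of the stream before being thrown away.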

Best,
Flavio

On Wed, Jun 8, 2016 at 3:40 PM, Ahmed Nader <ahmednader...@gmail.com> wrote:

> Hello,
> I have a TwitterSource and I'm applying some transformations, such as filter
> and map, to the resulting stream from Twitter. I'm collecting the output in an
> iterator: iterator = DataStreamUtils.collect(datastream). Then, in a
> parallel thread, I periodically check whether iterator.hasNext() and print
> the next item. I'm using Flink 1.0.3.
> The program works at the beginning and actually prints some items;
> however, when I leave it running for some more time (for example, after
> 40 seconds or 1 minute), I get 2 exceptions:
> com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID
> and java.lang.IndexOutOfBoundsException: Index: 32, Size: 0.
> Both exceptions are thrown from the line where I check whether the iterator
> hasNext().
>
> I would like to know why these exceptions happen in general, and if
> anyone knows a specific solution for my program, that would be great too.
> Thanks,
> Ahmed
>



-- 

Flavio Pompermaier

*Development Department*
*OKKAM Srl - www.okkam.it <http://www.okkam.it/>*

*Phone:* +(39) 0461 283 702
*Fax:* +(39) 0461 186 6433
*Email:* pomperma...@okkam.it
*Headquarters:* Trento (Italy), via G.B. Trener 8
*Registered office:* Trento (Italy), via Segantini 23
