Hi Ahmed,

I tried setting up your use case and for me it all seems to work. However,
I didn't use the Spring framework and executed the program in a local Flink
cluster.

Maybe you can compile a self-contained example (including example data) to
reproduce your problem and send it to us.
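
As a starting point, here is a minimal sketch of the failure mode that
Flavio's patch quoted below seems to guard against: a buffered output that is
reused across calls keeps its bytes when a flush fails, so the next record
goes out with stale data in front of it. This is plain java.io, not Flink or
Kryo code. BufferedOutputStream merely stands in for Kryo's Output, and
StaleBufferDemo and FailingOnceSink are made-up names for illustration.
Whether this is what actually happens in your job is an assumption.

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

final class StaleBufferDemo {

    /** Hypothetical target: rejects the first write with an EOFException, accepts later ones. */
    static final class FailingOnceSink extends OutputStream {
        final ByteArrayOutputStream received = new ByteArrayOutputStream();
        private boolean failNext = true;

        @Override
        public void write(int b) throws IOException {
            write(new byte[] {(byte) b}, 0, 1);
        }

        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            if (failNext) {
                failNext = false;
                throw new EOFException("target is full");
            }
            received.write(b, off, len);
        }
    }

    /** Writes two records; returns what the sink actually received. */
    static String run(boolean freshBufferPerCall) {
        FailingOnceSink sink = new FailingOnceSink();
        BufferedOutputStream out = new BufferedOutputStream(sink, 64);
        try {
            try {
                out.write("first".getBytes(StandardCharsets.UTF_8));
                out.flush(); // fails; the bytes of "first" stay in the buffer
            } catch (EOFException e) {
                // a caller would typically react here, e.g. by switching targets
            }
            if (freshBufferPerCall) {
                // the idea of the patch: never carry a buffer over to the next call
                out = new BufferedOutputStream(sink, 64);
            }
            out.write("second".getBytes(StandardCharsets.UTF_8));
            out.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(sink.received.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println("shared buffer: " + run(false)); // prints "shared buffer: firstsecond"
        System.out.println("fresh buffer:  " + run(true));  // prints "fresh buffer:  second"
    }
}
```

With the shared buffer the sink receives "firstsecond"; with a fresh buffer
per call it receives only "second", which is the effect of recreating Input
and Output on every serialize/deserialize call.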

Cheers,
Till

On Wed, Jun 8, 2016 at 5:48 PM, Ahmed Nader <ahmednader...@gmail.com> wrote:

> Hello Flavio,
> Thank you so much for replying. However, I didn't download Flink locally; I
> only added the dependencies in a Maven project, so I don't think I'll be able
> to modify the KryoSerializer class. But yes, I also think that's the
> problem.
> Thanks,
> Ahmed
>
> On 8 June 2016 at 16:07, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>
>> Hi Ahmed,
>> I also have the same error, which is probably caused by the KryoSerializer.
>> Right now I'm testing a patch for this problem, so maybe you could also
>> test it. Unfortunately I'm using Flink 1.0.2, so I don't know whether you
>> can use my KryoSerializer, but I think so. Actually I just recreate Input
>> and Output every time in serialize/deserialize and then I close them.
>>
>> This is my attempt to fix the problem (actually the KryoSerializer class
>> in the flink-core module):
>>
>>
>> /*
>>  * Licensed to the Apache Software Foundation (ASF) under one
>>  * or more contributor license agreements.  See the NOTICE file
>>  * distributed with this work for additional information
>>  * regarding copyright ownership.  The ASF licenses this file
>>  * to you under the Apache License, Version 2.0 (the
>>  * "License"); you may not use this file except in compliance
>>  * with the License.  You may obtain a copy of the License at
>>  *
>>  *     http://www.apache.org/licenses/LICENSE-2.0
>>  *
>>  * Unless required by applicable law or agreed to in writing, software
>>  * distributed under the License is distributed on an "AS IS" BASIS,
>>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>>  * See the License for the specific language governing permissions and
>>  * limitations under the License.
>>  */
>>
>> package org.apache.flink.api.java.typeutils.runtime.kryo;
>>
>> import com.esotericsoftware.kryo.Kryo;
>> import com.esotericsoftware.kryo.KryoException;
>> import com.esotericsoftware.kryo.Serializer;
>> import com.esotericsoftware.kryo.factories.ReflectionSerializerFactory;
>> import com.esotericsoftware.kryo.io.Input;
>> import com.esotericsoftware.kryo.io.Output;
>> import com.esotericsoftware.kryo.serializers.JavaSerializer;
>> import com.google.common.base.Preconditions;
>>
>> import org.apache.avro.generic.GenericData;
>> import org.apache.flink.api.common.ExecutionConfig;
>> import org.apache.flink.api.common.typeutils.TypeSerializer;
>> import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
>> import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
>> import org.apache.flink.api.java.typeutils.runtime.NoFetchingInput;
>> import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers.SpecificInstanceCollectionSerializerForArrayList;
>> import org.apache.flink.core.memory.DataInputView;
>> import org.apache.flink.core.memory.DataOutputView;
>> import org.objenesis.strategy.StdInstantiatorStrategy;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>>
>> import java.io.ByteArrayInputStream;
>> import java.io.ByteArrayOutputStream;
>> import java.io.EOFException;
>> import java.io.IOException;
>> import java.lang.reflect.InvocationTargetException;
>> import java.lang.reflect.Method;
>> import java.lang.reflect.Modifier;
>> import java.util.LinkedHashMap;
>> import java.util.LinkedHashSet;
>> import java.util.Map;
>> import java.util.Objects;
>>
>> /**
>>  * A type serializer that serializes its type using the Kryo serialization
>>  * framework (https://github.com/EsotericSoftware/kryo).
>>  *
>>  * This serializer is intended as a fallback serializer for the cases that are
>>  * not covered by the basic types, tuples, and POJOs.
>>  *
>>  * @param <T> The type to be serialized.
>>  */
>> public class KryoSerializer<T> extends TypeSerializer<T> {
>>
>> private static final long serialVersionUID = 3L;
>>
>> private static final Logger LOG =
>> LoggerFactory.getLogger(KryoSerializer.class);
>>
>> // ------------------------------------------------------------------------
>>
>> private final LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>> registeredTypesWithSerializers;
>> private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> registeredTypesWithSerializerClasses;
>> private final LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>> defaultSerializers;
>> private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> defaultSerializerClasses;
>> private final LinkedHashSet<Class<?>> registeredTypes;
>>
>> private final Class<T> type;
>> // ------------------------------------------------------------------------
>> // The fields below are lazily initialized after duplication or deserialization.
>>
>> private transient Kryo kryo;
>> private transient T copyInstance;
>> // ------------------------------------------------------------------------
>>
>> public KryoSerializer(Class<T> type, ExecutionConfig executionConfig){
>> this.type = Preconditions.checkNotNull(type);
>>
>> this.defaultSerializers = executionConfig.getDefaultKryoSerializers();
>> this.defaultSerializerClasses =
>> executionConfig.getDefaultKryoSerializerClasses();
>> this.registeredTypesWithSerializers =
>> executionConfig.getRegisteredTypesWithKryoSerializers();
>> this.registeredTypesWithSerializerClasses =
>> executionConfig.getRegisteredTypesWithKryoSerializerClasses();
>> this.registeredTypes = executionConfig.getRegisteredKryoTypes();
>> }
>>
>> /**
>> * Copy-constructor that does not copy transient fields. They will be initialized once required.
>> */
>> protected KryoSerializer(KryoSerializer<T> toCopy) {
>> registeredTypesWithSerializers = toCopy.registeredTypesWithSerializers;
>> registeredTypesWithSerializerClasses =
>> toCopy.registeredTypesWithSerializerClasses;
>> defaultSerializers = toCopy.defaultSerializers;
>> defaultSerializerClasses = toCopy.defaultSerializerClasses;
>> registeredTypes = toCopy.registeredTypes;
>>
>> type = toCopy.type;
>> if(type == null){
>> throw new NullPointerException("Type class cannot be null.");
>> }
>> }
>>
>> // ------------------------------------------------------------------------
>>
>> @Override
>> public boolean isImmutableType() {
>> return false;
>> }
>>
>> @Override
>> public KryoSerializer<T> duplicate() {
>> return new KryoSerializer<T>(this);
>> }
>>
>> @Override
>> public T createInstance() {
>> if(Modifier.isAbstract(type.getModifiers()) ||
>> Modifier.isInterface(type.getModifiers()) ) {
>> return null;
>> } else {
>> checkKryoInitialized();
>> try {
>> return kryo.newInstance(type);
>> } catch(Throwable e) {
>> return null;
>> }
>> }
>> }
>>
>> @SuppressWarnings("unchecked")
>> @Override
>> public T copy(T from) {
>> if (from == null) {
>> return null;
>> }
>> checkKryoInitialized();
>> try {
>> return kryo.copy(from);
>> }
>> catch(KryoException ke) {
>> // kryo was unable to copy it, so we do it through serialization:
>> ByteArrayOutputStream baout = new ByteArrayOutputStream();
>> Output output = new Output(baout);
>>
>> kryo.writeObject(output, from);
>>
>> output.close();
>>
>> ByteArrayInputStream bain = new ByteArrayInputStream(baout.toByteArray());
>> Input input = new Input(bain);
>>
>> return (T)kryo.readObject(input, from.getClass());
>> }
>> }
>> @Override
>> public T copy(T from, T reuse) {
>> return copy(from);
>> }
>>
>> @Override
>> public int getLength() {
>> return -1;
>> }
>>
>> @Override
>> public void serialize(T record, DataOutputView target) throws IOException
>> {
>> checkKryoInitialized();
>>   DataOutputViewStream outputStream = new DataOutputViewStream(target);
>>   Output output = new Output(outputStream);
>>
>> try {
>>        // Sanity check: Make sure that the output is cleared/has been flushed by the last call
>>        // otherwise data might be written multiple times in case of a previous EOFException
>>        if (output.position() != 0) {
>>            throw new IllegalStateException("The Kryo Output still contains data from a previous " +
>>                "serialize call. It has to be flushed or cleared at the end of the serialize call.");
>>        }
>>
>> kryo.writeClassAndObject(output, record);
>> output.flush();
>> }
>> catch (KryoException ke) {
>> Throwable cause = ke.getCause();
>> if (cause instanceof EOFException) {
>> throw (EOFException) cause;
>> }
>> else {
>> throw ke;
>> }
>> } finally {
>>      try {
>>               output.close();
>>             } catch (KryoException ke) {
>>               Throwable cause = ke.getCause();
>>
>>               if (cause instanceof EOFException) {
>>                   throw (EOFException) cause;
>>               } else {
>>                   throw ke;
>>               }
>>           }
>> }
>> }
>>
>> @SuppressWarnings("unchecked")
>> @Override
>> public T deserialize(DataInputView source) throws IOException {
>> checkKryoInitialized();
>> DataInputViewStream inputStream = new DataInputViewStream(source);
>> Input input = new NoFetchingInput(inputStream);
>>
>> try {
>> return (T) kryo.readClassAndObject(input);
>> } catch (KryoException ke) {
>> Throwable cause = ke.getCause();
>>
>> if (cause instanceof EOFException) {
>> throw (EOFException) cause;
>> } else {
>> throw ke;
>> }
>> } finally {
>>  try {
>>    input.close();
>>  } catch (KryoException ke) {
>>             Throwable cause = ke.getCause();
>>
>>             if (cause instanceof EOFException) {
>>                 throw (EOFException) cause;
>>             } else {
>>                 throw ke;
>>             }
>>           }
>> }
>> }
>> @Override
>> public T deserialize(T reuse, DataInputView source) throws IOException {
>> return deserialize(source);
>> }
>>
>> @Override
>> public void copy(DataInputView source, DataOutputView target) throws
>> IOException {
>> checkKryoInitialized();
>> if(this.copyInstance == null){
>> this.copyInstance = createInstance();
>> }
>>
>> T tmp = deserialize(copyInstance, source);
>> serialize(tmp, target);
>> }
>> // --------------------------------------------------------------------------------------------
>> @Override
>> public int hashCode() {
>> return Objects.hash(
>> type,
>> registeredTypes,
>> registeredTypesWithSerializerClasses,
>> defaultSerializerClasses);
>> }
>> @Override
>> public boolean equals(Object obj) {
>> if (obj instanceof KryoSerializer) {
>> KryoSerializer<?> other = (KryoSerializer<?>) obj;
>>
>> // we cannot include the Serializers here because they don't implement the equals method
>> return other.canEqual(this) &&
>> type == other.type &&
>> registeredTypes.equals(other.registeredTypes) &&
>> registeredTypesWithSerializerClasses.equals(other.registeredTypesWithSerializerClasses)
>> &&
>> defaultSerializerClasses.equals(other.defaultSerializerClasses);
>> } else {
>> return false;
>> }
>> }
>>
>> @Override
>> public boolean canEqual(Object obj) {
>> return obj instanceof KryoSerializer;
>> }
>>
>> // --------------------------------------------------------------------------------------------
>>
>> /**
>> * Returns the Chill Kryo Serializer which is implicitly added to the classpath via flink-runtime.
>> * Falls back to the default Kryo serializer if it can't be found.
>> * @return The Kryo serializer instance.
>> */
>> private Kryo getKryoInstance() {
>>
>> try {
>> // check if ScalaKryoInstantiator is in class path (coming from Twitter's Chill library).
>> // This will be true if Flink's Scala API is used.
>> Class<?> chillInstantiatorClazz =
>> Class.forName("com.twitter.chill.ScalaKryoInstantiator");
>> Object chillInstantiator = chillInstantiatorClazz.newInstance();
>>
>> // obtain a Kryo instance through Twitter Chill
>> Method m = chillInstantiatorClazz.getMethod("newKryo");
>>
>> return (Kryo) m.invoke(chillInstantiator);
>> } catch (ClassNotFoundException | InstantiationException |
>> NoSuchMethodException |
>> IllegalAccessException | InvocationTargetException e) {
>>
>> LOG.warn("Falling back to default Kryo serializer because Chill serializer couldn't be found.", e);
>>
>> Kryo.DefaultInstantiatorStrategy initStrategy = new
>> Kryo.DefaultInstantiatorStrategy();
>> initStrategy.setFallbackInstantiatorStrategy(new
>> StdInstantiatorStrategy());
>>
>> Kryo kryo = new Kryo();
>> kryo.setInstantiatorStrategy(initStrategy);
>>
>> return kryo;
>> }
>> }
>>
>> private void checkKryoInitialized() {
>> if (this.kryo == null) {
>> this.kryo = getKryoInstance();
>>
>> // Enable reference tracking.
>> kryo.setReferences(true);
>> // Throwable and all subclasses should be serialized via java serialization
>> kryo.addDefaultSerializer(Throwable.class, new JavaSerializer());
>>
>> // Add default serializers first, so that the type registrations without a serializer
>> // are registered with a default serializer
>> for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>>
>> entry: defaultSerializers.entrySet()) {
>> kryo.addDefaultSerializer(entry.getKey(),
>> entry.getValue().getSerializer());
>> }
>>
>> for (Map.Entry<Class<?>, Class<? extends Serializer<?>>> entry:
>> defaultSerializerClasses.entrySet()) {
>> kryo.addDefaultSerializer(entry.getKey(), entry.getValue());
>> }
>>
>> // register the type of our class
>> kryo.register(type);
>>
>> // register given types. we do this first so that any registration of a
>> // more specific serializer overrides this
>> for (Class<?> type : registeredTypes) {
>> kryo.register(type);
>> }
>>
>> // register given serializer classes
>> for (Map.Entry<Class<?>, Class<? extends Serializer<?>>> e :
>> registeredTypesWithSerializerClasses.entrySet()) {
>> Class<?> typeClass = e.getKey();
>> Class<? extends Serializer<?>> serializerClass = e.getValue();
>>
>> Serializer<?> serializer =
>> ReflectionSerializerFactory.makeSerializer(kryo, serializerClass,
>> typeClass);
>> kryo.register(typeClass, serializer);
>> }
>>
>> // register given serializers
>> for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> e :
>> registeredTypesWithSerializers.entrySet()) {
>> kryo.register(e.getKey(), e.getValue().getSerializer());
>> }
>> // this is needed for Avro but can not be added on demand.
>> kryo.register(GenericData.Array.class, new
>> SpecificInstanceCollectionSerializerForArrayList());
>>
>> kryo.setRegistrationRequired(false);
>> kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
>> }
>> }
>>
>> // --------------------------------------------------------------------------------------------
>> // For testing
>> // --------------------------------------------------------------------------------------------
>> public Kryo getKryo() {
>> checkKryoInitialized();
>> return this.kryo;
>> }
>> }
>>
>> Best,
>> Flavio
>>
>> On Wed, Jun 8, 2016 at 3:40 PM, Ahmed Nader <ahmednader...@gmail.com>
>> wrote:
>>
>>> Hello,
>>> I have a TwitterSource and I'm applying some transformations, such as filter
>>> and map, to the resulting stream from Twitter. I'm collecting the output in
>>> an iterator: iterator = DataStreamUtils.collect(datastream). Then, in a
>>> parallel thread, I periodically check iterator.hasNext() and print
>>> the next item. I'm using Flink 1.0.3.
>>> The program works at the beginning and actually prints some items;
>>> however, when I leave it running for some more time (for example, after
>>> 40 seconds or 1 minute), I get 2 exceptions:
>>> com.esotericsoftware.kryo.KryoException: Encountered unregistered class
>>> ID and java.lang.IndexOutOfBoundsException: Index: 32, Size: 0.
>>> These 2 exceptions result from the line where I'm checking whether the
>>> iterator hasNext().
>>>
>>> I wanted to know why these exceptions happen in general, and if
>>> anyone knows a specific solution for my program, that would be great too.
>>> Thanks,
>>> Ahmed
>>>
>>
>>
>>
>> --
>>
>> Flavio Pompermaier
>>
>> *Development Department*_______________________________________________
>> *OKKAM**Srl **- www.okkam.it <http://www.okkam.it/>*
>>
>> *Phone:* +(39) 0461 283 702
>> *Fax:* + (39) 0461 186 6433
>> *Email:* pomperma...@okkam.it
>> *Headquarters:* Trento (Italy), via G.B. Trener 8
>> *Registered office:* Trento (Italy), via Segantini 23
>>
>>
>>
>
