package streamtest;

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;

public class DynamicGenericRecordTypeInfo extends TypeInformation<GenericRecord> {
	
	/*
	 * private final org.apache.avro.Schema schema; private final
	 * AvroSerializer<GenericRecord> serializer;
	 * 
	 * public DynamicGenericRecordTypeInfo(org.apache.avro.Schema schema) {
	 * this.schema = schema; this.serializer = new
	 * AvroSerializer<>(GenericRecord.class, schema); }
	 */

    @Override
    public boolean isBasicType() {
        return false;
    }

    @Override
    public boolean isTupleType() {
        return false;
    }

    @Override
    public int getArity() {
        return 1;
    }

    @Override
    public int getTotalFields() {
        return 1;
    }

    @Override
    public Class<GenericRecord> getTypeClass() {
        return GenericRecord.class;
    }

    @Override
    public boolean isKeyType() {
        return false;
    }

    @SuppressWarnings("rawtypes")
	@Override
    public TypeSerializer<GenericRecord> createSerializer(ExecutionConfig config) {
    	System.out.println("createSerializer method is called");
        return new DynamicGenericRecordSerializer();
    	
    	//return serializer;
    }

    @Override
    public String toString() {
        return "DynamicGenericRecordTypeInfo";
    }

    @Override
    public boolean equals(Object obj) {
        return obj instanceof DynamicGenericRecordTypeInfo;
    }

    @Override
    public int hashCode() {
        return GenericRecord.class.hashCode();
    }

    @Override
    public boolean canEqual(Object obj) {
        return obj instanceof DynamicGenericRecordTypeInfo;
    }
}

