package com.xyz.topology.netflow.beam;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Properties;

import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.beam.runners.flink.FlinkPipelineRunner;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSink;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.Write;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;

import com.xyz.schemas.Test;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
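 * Experimental producer that writes Avro-encoded records to a Kafka topic, either through a
 * Beam pipeline executed on the Flink runner or directly through the Kafka 0.8 producer API.
 *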
 * @author kaniska
 *
 */
public class BeamKafkaFlinkAvroProducerTest {
	
	private static final String TOPIC = "topic3";
	private static BeamKafkaOptions options;

	private static Properties props = new Properties();

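	/**
	 * Parses the command-line arguments into {@link BeamKafkaOptions}, configures the Flink runner
	 * (streaming mode, checkpointing, execution retries) and sets up the Kafka producer properties.
	 */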
	public static void setup(String[] args) {
		PipelineOptionsFactory.register(BeamKafkaOptions.class);
		options = PipelineOptionsFactory.fromArgs(args).as(BeamKafkaOptions.class);
		options.setJobName("KafkaExample - WindowSize: " + options.getWindowSize() + " seconds");

		// options.setZookeeper(EMBEDDED_ZOOKEEPER.getConnection());
		// options.setBroker(EMBEDDED_KAFKA_CLUSTER.getBrokerList());
		options.setKafkaTopic(TOPIC);
		options.setStreaming(true);
		options.setCheckpointingInterval(1000L);
		options.setNumberOfExecutionRetries(5);
		options.setExecutionRetryDelay(3000L);
		options.setRunner(FlinkPipelineRunner.class);
		System.out.println(options.getKafkaTopic() + " " + options.getZookeeper() + " " + options.getBroker() + " "
				+ options.getGroup());

		props.setProperty("zookeeper.connect", options.getZookeeper());
		props.setProperty("bootstrap.servers", options.getBroker());
		props.setProperty("group.id", options.getGroup());
		props.setProperty("serializer.class", "kafka.serializer.DefaultEncoder");
	}
	
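	/**
	 * Publishes a few {@code User} records (assumed to be an Avro-compatible POJO available in this
	 * project) to Kafka through a Beam pipeline, serialized by {@link AvroSerializationSchema}.
	 * Not currently invoked from {@link #main(String[])}.
	 */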
	private static void produceData1() {
		FlinkKafkaProducer08<User> kafkaSink =
				new FlinkKafkaProducer08<>(TOPIC, new AvroSerializationSchema<>(User.class), props);

		Pipeline pipeline = Pipeline.create(options);
		pipeline
				.apply(Create.of(
						new User("Joe", 3, "red"),
						new User("Mary", 4, "blue"),
						new User("Mark", 1, "green"),
						new User("Julia", 5, "purple")))
						//.withCoder(AvroCoder.of(User.class)))
				.apply(Write.to(UnboundedFlinkSink.of(kafkaSink)));

		pipeline.run();
	}
	
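	/**
	 * Publishes a single {@code Test} record to Kafka through a Beam pipeline, using {@link AvroCoder}
	 * inside the pipeline and Flink's {@link TypeInformationSerializationSchema} at the Kafka sink.
	 */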
	private static void produceAvroData2() {
		TypeInformation<Test> info = TypeExtractor.getForClass(Test.class);

		TypeInformationSerializationSchema<Test> schema =
				new TypeInformationSerializationSchema<Test>(info, new ExecutionConfig());

		// AvroSerializationSchema<Test> schema = new AvroSerializationSchema<>(Test.class);

		FlinkKafkaProducer08<Test> kafkaSink =
				new FlinkKafkaProducer08<>(TOPIC, schema, props);

		Pipeline pipeline = Pipeline.create(options);
		pipeline
				.apply(Create.of(new Test("Joe", 6))
						.withCoder(AvroCoder.of(Test.class)))
				.apply(Write.to(UnboundedFlinkSink.of(kafkaSink)));

		pipeline.run();
	}
	
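	/**
	 * Serializes one {@code Test} record with Avro's {@link SpecificDatumWriter} and sends the raw
	 * bytes to Kafka via the legacy Kafka 0.8 producer API, bypassing Beam and Flink entirely.
	 */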
	private static void produceSimpleData() throws IOException {
		Properties producerProps = new Properties();
		producerProps.put("metadata.broker.list", "localhost:9092");
		producerProps.put("serializer.class", "kafka.serializer.DefaultEncoder");
		producerProps.put("request.required.acks", "1");
		ProducerConfig config = new ProducerConfig(producerProps);
		Producer<String, byte[]> kafkaProducer = new Producer<String, byte[]>(config);

		Test test = new Test("Don", 32);

		// Serialize the record in Avro binary format.
		// DatumWriter<NetFlowPkt> datumWriter = new SpecificDatumWriter<NetFlowPkt>(NetFlowPkt.class);
		DatumWriter<Test> datumWriter = new SpecificDatumWriter<Test>(Test.class);
		ByteArrayOutputStream out = new ByteArrayOutputStream();
		BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
		datumWriter.write(test, encoder);
		encoder.flush();
		byte[] serializedBytes = out.toByteArray();
		out.close();

		KeyedMessage<String, byte[]> message = new KeyedMessage<String, byte[]>(TOPIC, serializedBytes);
		kafkaProducer.send(message);
		kafkaProducer.close();
	}
	
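	/**
	 * Entry point: parses the pipeline options, then runs the Beam-on-Flink producer variant.
	 */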
	public static void main(String[] args) {
		setup(args);

		try {
			// produceSimpleData();
			produceAvroData2();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
	
	/**
	public void testDeSerialization() {
		try {
			TypeInformation<User> info = TypeExtractor.getForClass(User.class);
			
			TypeInformationSerializationSchema<User> schema =
					new TypeInformationSerializationSchema<User>(info, new ExecutionConfig());
			
			User[] types = {
					new User(72, new Date(763784523L), new Date(88234L)),
					new User(-1, new Date(11111111111111L)),
					new User(42),
					new User(17, new Date(222763784523L))
			};
			
			for (User val : types) {
				byte[] serialized = schema.serialize(val);
				User deser = schema.deserialize(serialized);
				assertEquals(val, deser);
			}
		}
		catch (Exception e) {
			e.printStackTrace();
			fail(e.getMessage());
		}
	}
	**/
	
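	/**
	 * Minimal Flink {@link SerializationSchema} that Avro-encodes each element: a
	 * {@link SpecificDatumWriter} is used for Avro-generated classes and a
	 * {@link ReflectDatumWriter} for plain POJOs.
	 */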
	private static class AvroSerializationSchema<T> implements SerializationSchema<T> {

		private final Class<T> avroType;

		private transient DatumWriter<T> writer;
		private transient BinaryEncoder encoder;

		public AvroSerializationSchema(Class<T> avroType) {
			this.avroType = avroType;
		}

		@Override
		public byte[] serialize(T element) {
			ensureInitialized();

			// Write the element as Avro binary into an in-memory buffer, reusing the encoder when possible.
			ByteArrayOutputStream out = new ByteArrayOutputStream();
			encoder = EncoderFactory.get().binaryEncoder(out, encoder);
			byte[] serializedBytes = null;
			try {
				writer.write(element, encoder);
				encoder.flush();
				serializedBytes = out.toByteArray();
				out.close();
			} catch (IOException e) {
				e.printStackTrace();
			}

			return serializedBytes;
		}

		private void ensureInitialized() {
			// The writer is created lazily because it is transient and must be rebuilt after deserialization.
			if (writer == null) {
				if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroType)) {
					// Avro-generated classes carry their own schema, so a SpecificDatumWriter can be used.
					writer = new SpecificDatumWriter<T>(avroType);

					//if (obj instanceof GenericRecord) {
					//	writer = new GenericDatumWriter(((GenericRecord) obj).getSchema());
					//} else {
					//	writer = new SpecificDatumWriter<T>(avroType);
					//}
				} else {
					// Plain POJOs fall back to reflection-based Avro serialization.
					writer = new ReflectDatumWriter<T>(avroType);
				}
			}
		}

	}
	
	

}
