package com.xyz.topology.netflow.beam;

import java.util.Properties;

import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.beam.runners.flink.FlinkPipelineRunner;
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSource;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
import org.joda.time.Duration;

import com.xyz.schemas.Test;

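/**
 * Reads Avro {@link Test} records from a Kafka topic through the Flink runner
 * for Apache Beam, applies a fixed window, and counts the elements globally.
 */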
public class BeamKafkaFlinkAvroConsumerTest {

	private static final String TOPIC = "topic3";
	private static BeamKafkaOptions options;

	private static Properties props = new Properties();

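	/**
	 * Registers and parses {@link BeamKafkaOptions} and configures the Flink
	 * runner. Beam's PipelineOptionsFactory maps option getters to
	 * command-line flags, so a hypothetical invocation could pass:
	 *
	 * <pre>
	 * --zookeeper=localhost:2181 --broker=localhost:9092 --group=myGroup --windowSize=10
	 * </pre>
	 *
	 * (Example values only; the exact flag set depends on BeamKafkaOptions,
	 * which is not shown here.)
	 */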
	public static void setup(String[] args) {
		PipelineOptionsFactory.register(BeamKafkaOptions.class);
		options = PipelineOptionsFactory.fromArgs(args).as(BeamKafkaOptions.class);
		options.setJobName("KafkaExample - WindowSize: " + options.getWindowSize() + " seconds");

		// options.setZookeeper(EMBEDDED_ZOOKEEPER.getConnection());
		// options.setBroker(EMBEDDED_KAFKA_CLUSTER.getBrokerList());
		options.setKafkaTopic(TOPIC);
		options.setStreaming(true);
		options.setCheckpointingInterval(1000L);
		options.setNumberOfExecutionRetries(5);
		options.setExecutionRetryDelay(3000L);
		options.setRunner(FlinkPipelineRunner.class);
		System.out.println(options.getKafkaTopic() + " " + options.getZookeeper() + " " + options.getBroker() + " "
				+ options.getGroup());

		props.setProperty("zookeeper.connect", options.getZookeeper());
		props.setProperty("bootstrap.servers", options.getBroker());
		props.setProperty("group.id", options.getGroup());
	}

	public static UnboundedSource<Test, CheckpointMark> consumeMessages() {
		// Avro-based alternative (see consumeAvroMessages() and the
		// AvroDeserializationSchema class at the bottom of this file):
		// AvroDeserializationSchema<Test> schema = new AvroDeserializationSchema<>(Test.class);

		// Note: TypeInformationSerializationSchema deserializes records written
		// with Flink's own serialization stack, not Avro-encoded bytes.
		TypeInformation<Test> info = TypeExtractor.getForClass(Test.class);
		TypeInformationSerializationSchema<Test> schema = new TypeInformationSerializationSchema<Test>(info,
				new ExecutionConfig());

		FlinkKafkaConsumer08<Test> kafkaConsumer = new FlinkKafkaConsumer08<>(TOPIC, schema, props);

		return UnboundedFlinkSource.of(kafkaConsumer);
	}
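
	/**
	 * Alternative source wiring that decodes Avro binary payloads directly,
	 * using the AvroDeserializationSchema defined at the bottom of this file
	 * (the approach sketched in the commented-out lines above). A minimal
	 * sketch, assuming the topic carries Avro-encoded Test records rather
	 * than Flink-serialized ones.
	 */
	public static UnboundedSource<Test, CheckpointMark> consumeAvroMessages() {
		AvroDeserializationSchema<Test> schema = new AvroDeserializationSchema<>(Test.class);
		FlinkKafkaConsumer08<Test> kafkaConsumer = new FlinkKafkaConsumer08<>(TOPIC, schema, props);
		return UnboundedFlinkSource.of(kafkaConsumer);
	}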

	public static void main(String[] args) {

		setup(args);

		Pipeline pipeline = Pipeline.create(options);

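		// Read from the unbounded Kafka source, then window into fixed
		// intervals of windowSize seconds: fire when the watermark passes the
		// end of the window, allow no late data, and discard fired panes.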
		PCollection<Test> users = pipeline.apply(Read.named("StreamingWordCount").from(consumeMessages()))
				.apply(Window.<Test> into(FixedWindows.of(Duration.standardSeconds(options.getWindowSize())))
						.triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
						.discardingFiredPanes());

		// users.apply(ConsoleIO.Write.create());

		PCollection<Long> counts = users.apply(Count.globally());
		// .apply(ConsoleIO.Write.create());
		// .apply(TextIO.Write.to("outputKafka.txt"));

		// Note: this prints the PCollection's toString(), not the computed
		// counts; the pipeline has not yet run at this point.
		System.out.println("*****************  " + counts);

		PipelineResult result = pipeline.run();

		System.out.println("*****************  " + result.toString());

	}

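	/**
	 * Formats a {@link Test} element as its uname string. This DoFn is not
	 * applied in main(); a minimal (hypothetical) wiring would be
	 * {@code users.apply(ParDo.of(new FormatAsStringFn()))}.
	 */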
	public static class FormatAsStringFn extends DoFn<Test, String> {
		@Override
		public void processElement(ProcessContext c) {
			CharSequence row = c.element().getUname();
			System.out.println("$$$$$$$$$$$$$$$$");
			System.out.println(row);
			c.output(row.toString());
		}
	}

}

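/**
 * Flink {@link DeserializationSchema} that decodes Avro binary payloads into
 * instances of the given type: a SpecificDatumReader for generated Avro
 * classes, a ReflectDatumReader otherwise. The decoder is reused across calls,
 * and both it and the reader are transient so the schema itself remains
 * serializable.
 */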
class AvroDeserializationSchema<T> implements DeserializationSchema<T> {

	private final Class<T> avroType;

	private transient DatumReader<T> reader;
	private transient BinaryDecoder decoder;

	public AvroDeserializationSchema(Class<T> avroType) {
		this.avroType = avroType;
	}

	@Override
	public T deserialize(byte[] message) {
		ensureInitialized();

		try {
			decoder = DecoderFactory.get().binaryDecoder(message, decoder);
			return reader.read(null, decoder);
		} catch (Exception e) {
			throw new RuntimeException(e);
		}
	}

	@Override
	public boolean isEndOfStream(T nextElement) {
		return false;
	}

	@Override
	public TypeInformation<T> getProducedType() {
		return TypeExtractor.getForClass(avroType);
	}

	private void ensureInitialized() {
		if (reader == null) {
			if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroType)) {
				reader = new SpecificDatumReader<T>(avroType);
			} else {
				reader = new ReflectDatumReader<T>(avroType);
			}
		}
	}
}
