
import static org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils.toBeamSchema;

import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
import org.apache.beam.sdk.io.iceberg.DynamicDestinations;
import org.apache.beam.sdk.io.iceberg.IcebergDestination;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.catalog.TableIdentifier;

/**
 * {@link DynamicDestinations} implementation that routes each {@link Row} to an Iceberg table
 * chosen from the row's {@code eventType} field.
 *
 * <p>The event type is translated to a table name through the event-to-table map exposed by the
 * pipeline configuration ({@code config.getConfig().getProto().getEventToTableMap()}); the table
 * is then addressed inside the database name supplied at construction time. All destinations are
 * written as Parquet.
 */
public class DynamicIcebergDestinations implements DynamicDestinations {
    /** Name of the row field that carries the event type used for routing. */
    private static final String EVENT_TYPE_FIELD = "eventType";

    // Fully qualified (like the Schema types below) so the file's import block stays untouched.
    // Static, so it does not take part in serialization of this object.
    private static final java.util.logging.Logger LOG =
        java.util.logging.Logger.getLogger(DynamicIcebergDestinations.class.getName());

    private final String databaseName;
    private final DataflowPipelineConfig config;

    /**
     * Creates a destination router.
     *
     * @param databaseName database (namespace) that every resolved table name is placed in
     * @param config pipeline configuration providing the event-to-table map and the Avro schemas
     */
    public DynamicIcebergDestinations(String databaseName, DataflowPipelineConfig config) {
        this.databaseName = databaseName;
        this.config = config;
    }

    /**
     * Returns the raw {@code eventType} value of the given row.
     *
     * <p>No logging happens here: this may be invoked once per element, and printing on that path
     * floods worker output.
     *
     * @param element row to inspect
     * @return the value of the row's {@code eventType} field
     */
    public String getDestination(Row element) {
        return element.getString(EVENT_TYPE_FIELD);
    }

    /**
     * Builds the Parquet-backed Iceberg destination for {@code destination}.
     *
     * <p>{@code destination} may be either an event type (a key of the event-to-table map) or a
     * table name that was already resolved through that map. The second form is what
     * {@link #getTableStringIdentifier(ValueInSingleWindow)} returns; the previous implementation
     * mapped it a second time, which produced a {@code null} table name whenever the mapping was
     * not the identity.
     *
     * @param destination event type or already-resolved table name
     * @return destination pointing at {@code databaseName.tableName}
     * @throws IllegalArgumentException if {@code destination} is {@code null} or is neither a
     *     known event type nor a known table name
     */
    @Override
    public IcebergDestination instantiateDestination(String destination) {
        String tableName = resolveTableName(destination);
        LOG.fine(() -> "Instantiating Iceberg destination " + databaseName + "." + tableName
            + " for destination '" + destination + "'");

        return IcebergDestination.builder()
            .setTableIdentifier(TableIdentifier.of(databaseName, tableName))
            .setFileFormat(FileFormat.PARQUET)
            .build();
    }

    /**
     * Returns a minimal schema containing only the {@code id} (string) and {@code ts} (date-time)
     * fields.
     *
     * <p>NOTE(review): {@link #getData(Row)} returns the incoming row unchanged, so the rows
     * actually emitted may carry more fields than this schema declares — confirm this is what the
     * sink expects.
     *
     * @return the two-field schema described above
     */
    @Override
    public org.apache.beam.sdk.schemas.Schema getDataSchema() {
        LOG.fine("Returning minimal schema for validation.");
        return org.apache.beam.sdk.schemas.Schema.builder()
            .addStringField("id")
            .addDateTimeField("ts")
            .build();
    }

    /**
     * Returns the Beam schema for {@code destination}: the destination's Avro schema converted to
     * Beam, with a string {@code eventType} field appended.
     *
     * <p>If the converted schema already contains an {@code eventType} field it is returned as
     * is, because appending a second field with the same name would make schema construction
     * fail.
     *
     * @param destination key passed to {@code config.getAvroSchema}
     * @return Beam schema that includes an {@code eventType} field
     * @throws IllegalArgumentException if no Avro schema is registered for {@code destination}
     */
    public org.apache.beam.sdk.schemas.Schema getDataSchema(String destination) {
        org.apache.beam.sdk.schemas.Schema converted =
            AvroUtils.toBeamSchema(requireAvroSchema(destination));

        if (converted.hasField(EVENT_TYPE_FIELD)) {
            return converted;
        }

        org.apache.beam.sdk.schemas.Schema withEventType =
            org.apache.beam.sdk.schemas.Schema.builder()
                .addFields(converted.getFields())
                .addField(
                    org.apache.beam.sdk.schemas.Schema.Field.of(
                            EVENT_TYPE_FIELD,
                            org.apache.beam.sdk.schemas.Schema.FieldType.STRING)
                        .withDescription("Event type identifier"))
                .build();

        LOG.fine(() -> "getDataSchema with destination/eventType: " + destination
            + ", beamSchema: " + withEventType);
        return withEventType;
    }

    /**
     * Unwraps an Avro union to its first non-null branch; non-union schemas are returned as is.
     *
     * <p>Currently not referenced anywhere in this class.
     *
     * @param schema Avro schema to unwrap
     * @return the first non-null member of the union, {@code null} if the union has no such
     *     member, or {@code schema} itself when it is not a union
     */
    private static org.apache.avro.Schema getNonNullType(org.apache.avro.Schema schema) {
        if (schema.getType() != org.apache.avro.Schema.Type.UNION) {
            return schema;
        }
        return schema.getTypes().stream()
            .filter(type -> type.getType() != org.apache.avro.Schema.Type.NULL)
            .findFirst()
            .orElse(null);
    }

    /**
     * Returns the row itself as the data to write.
     *
     * <p>Deliberately silent: dumping every row to the worker log is expensive and exposes the
     * full payload.
     *
     * @param element incoming row
     * @return {@code element}, unchanged
     */
    @Override
    public Row getData(Row element) {
        return element;
    }

    /**
     * Resolves the table name for an element from its {@code eventType} field.
     *
     * @param element windowed row whose {@code eventType} selects the table
     * @return the table name mapped to the row's event type (without the database name)
     * @throws IllegalArgumentException if the row has no event type value or the event type has
     *     no entry in the event-to-table map (previously {@code null} was returned in that case)
     */
    @Override
    public String getTableStringIdentifier(ValueInSingleWindow<Row> element) {
        String eventType = element.getValue().getString(EVENT_TYPE_FIELD);
        if (eventType == null) {
            throw new IllegalArgumentException(
                "Row has no " + EVENT_TYPE_FIELD + " value; cannot determine destination table");
        }

        String tableName = eventToTableMap().get(eventType);
        if (tableName == null) {
            throw new IllegalArgumentException("No table mapping found for eventType: " + eventType);
        }
        return tableName;
    }

    /**
     * Returns the destination's Avro schema converted to a Beam schema, without any extra fields.
     *
     * @param destination key passed to {@code config.getAvroSchema}
     * @return the converted Beam schema
     * @throws IllegalArgumentException if no Avro schema is registered for {@code destination}
     */
    public org.apache.beam.sdk.schemas.Schema getDynamicSchema(String destination) {
        return toBeamSchema(requireAvroSchema(destination));
    }

    /** Returns the event-type to table-name map from the pipeline configuration. */
    private java.util.Map<String, String> eventToTableMap() {
        return config.getConfig().getProto().getEventToTableMap();
    }

    /**
     * Maps {@code destination} to a table name, accepting an event type or a known table name.
     * Event-type keys take precedence so inputs that resolved before still resolve identically.
     */
    private String resolveTableName(String destination) {
        if (destination == null) {
            throw new IllegalArgumentException("Destination must not be null");
        }

        java.util.Map<String, String> eventToTable = eventToTableMap();
        String mapped = eventToTable.get(destination);
        if (mapped != null) {
            return mapped;
        }
        if (eventToTable.containsValue(destination)) {
            // Already a table name, e.g. the value produced by getTableStringIdentifier.
            return destination;
        }
        throw new IllegalArgumentException("No table mapping found for destination: " + destination);
    }

    /** Looks up the Avro schema for {@code destination}, failing if none is registered. */
    private org.apache.avro.Schema requireAvroSchema(String destination) {
        org.apache.avro.Schema avroSchema = config.getAvroSchema(destination);
        if (avroSchema == null) {
            throw new IllegalArgumentException("No schema found for destination: " + destination);
        }
        return avroSchema;
    }
}
