Hi Kenneth / Reuven,

I looked a little further into what the root cause of the actual problem
might be.

Here's what I found:


The 409s on BigQuery duplicate jobs might be a side effect of something else
related to the schema.
What I have observed is that, of the two DynamicDestinations implementations
below (*Working* / *Not working*), the first works and the second does not.

Basically, when getDestination() returns a DestinationInfo object, the
correct schema somehow does not reach the downstream steps of the pipeline
that write to BigQuery.
However, if I return a plain String from getDestination() and construct the
actual schema in getSchema(), it works correctly.
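
One thing that may be worth ruling out here, since the DestinationInfo class
itself is not included in this mail: as far as I understand from the
DynamicDestinations Javadoc, the destination type is expected to provide
proper equals() and hashCode(), because it may be used as a grouping key.
A minimal sketch of such a key follows. DestinationKey and its fields are
hypothetical stand-ins, and the schema is held as a JSON string only so that
the sketch runs without Beam on the classpath.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for DestinationInfo (the real class is not shown in this mail).
final class DestinationKey {
    private final String project;
    private final String dataset;
    private final String table;
    private final String schemaJson; // assumption: schema kept as JSON text

    DestinationKey(String project, String dataset, String table, String schemaJson) {
        this.project = project;
        this.dataset = dataset;
        this.table = table;
        this.schemaJson = schemaJson;
    }

    String toTableSpec() {
        return project + ":" + dataset + "." + table;
    }

    // Value-based equality: two keys built from the same fields are the same destination.
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof DestinationKey)) {
            return false;
        }
        DestinationKey other = (DestinationKey) o;
        return Objects.equals(project, other.project)
                && Objects.equals(dataset, other.dataset)
                && Objects.equals(table, other.table)
                && Objects.equals(schemaJson, other.schemaJson);
    }

    @Override
    public int hashCode() {
        return Objects.hash(project, dataset, table, schemaJson);
    }
}

class DestinationKeyDemo {
    public static void main(String[] args) {
        Map<DestinationKey, Integer> rowsPerDestination = new HashMap<>();
        for (int i = 0; i < 3; i++) {
            // A fresh object per row, as getDestination() would create.
            DestinationKey key =
                    new DestinationKey("my-project", "my_dataset", "events", "{\"fields\":[]}");
            rowsPerDestination.merge(key, 1, Integer::sum);
        }
        // With value-based equals/hashCode the three rows share one destination.
        System.out.println(rowsPerDestination.size()); // prints 1
    }
}
```

Whether the schema belongs in the key is a design choice. In this sketch it
does, so two rows bound for the same table but carrying different generated
schemas would count as different destinations.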

I have also shared the coder for DestinationInfo, in case the problem is
there.
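
A round-trip check outside the pipeline is one way to rule the coder in or
out. The sketch below is only an illustration under assumptions: it uses
DataOutputStream / DataInputStream in place of StringUtf8Coder so that it
runs with the JDK alone, and the CoderRoundTrip class and the sample values
are made up. It writes four string fields in the same order as the coder
shared in this mail, then checks that encoding the same value twice gives
identical bytes and that the schema JSON comes back unchanged.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;

// Hypothetical JDK-only stand-in for a coder that writes four string fields in order.
class CoderRoundTrip {

    // Writes project, dataset, table and schema JSON in a fixed order.
    // Note: writeUTF is limited to 65535 encoded bytes per string.
    static byte[] encode(String project, String dataset, String table, String schemaJson) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(project);
            out.writeUTF(dataset);
            out.writeUTF(table);
            out.writeUTF(schemaJson);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads the four fields back in the same order they were written.
    static String[] decode(byte[] encoded) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded))) {
            String project = in.readUTF();
            String dataset = in.readUTF();
            String table = in.readUTF();
            String schemaJson = in.readUTF();
            return new String[] {project, dataset, table, schemaJson};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String schemaJson = "{\"fields\":[{\"name\":\"id\",\"type\":\"STRING\"}]}";
        byte[] first = encode("my-project", "my_dataset", "events", schemaJson);
        byte[] second = encode("my-project", "my_dataset", "events", schemaJson);
        String[] decoded = decode(first);

        System.out.println("same bytes for same value: " + Arrays.equals(first, second));
        System.out.println("schema survives round trip: " + schemaJson.equals(decoded[3]));
    }
}
```

With the real classes, the same two checks would go through
DestinationInfoCoder.of().encode(...) and decode(...) instead of the
stand-in methods.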

Seems like this may be a bug in Beam? I have been trying to get through to
the GCP product / engineering teams, but not much luck there :(

If this is a bug, where should I report it?
Maybe I can contribute a fix, if I have understood the actual problem
correctly.

As a side effect, the job shows as successful on GCP Dataflow, but only a
very small portion of the data gets written to BQ.
At the very least, the pipeline should throw an error about the schema if
that is the actual problem.
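
To make a missing schema fail loudly rather than silently, a guard along the
following lines could sit in front of the JSON parsing in getSchema(). This
is a sketch under assumptions: SchemaLookup and requireSchemaJson are
hypothetical names, and the map stands in for the side-input map of table
name to schema JSON.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: look up a schema by table name and fail fast if it is absent.
class SchemaLookup {

    // tableSpec is expected in the form "project:dataset.table".
    static String requireSchemaJson(Map<String, String> schemasByTable, String tableSpec) {
        String[] parts = tableSpec.split("\\.");
        if (parts.length < 2) {
            throw new IllegalArgumentException(
                    "Expected project:dataset.table but got: " + tableSpec);
        }
        String table = parts[1];
        String schemaJson = schemasByTable.get(table);
        if (schemaJson == null || schemaJson.trim().isEmpty()) {
            // Stop here instead of handing a null or empty schema downstream.
            throw new IllegalStateException(
                    "No schema found for table '" + table + "' (destination " + tableSpec + ")");
        }
        return schemaJson;
    }

    public static void main(String[] args) {
        Map<String, String> schemas = new HashMap<>();
        schemas.put("orders", "{\"fields\":[{\"name\":\"id\",\"type\":\"STRING\"}]}");

        System.out.println(requireSchemaJson(schemas, "my-project:my_dataset.orders"));
        try {
            requireSchemaJson(schemas, "my-project:my_dataset.users");
        } catch (IllegalStateException e) {
            System.out.println("failed fast: " + e.getMessage());
        }
    }
}
```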

Reuven,
While trying to fix my writes to BQ, I tried combinations of the legacy
Dataflow runner / runner v2 and older Beam versions (2.52.0, 2.53.0) as well
as the latest (2.60.0), but the problem still persisted.
It seems to me that the root cause is independent of the runner and the
Beam version.



*Working*

public class BatchDynamicDestination extends
DynamicDestinations<KV<String, TableRow>, String> {

    private static final Logger LOG =
LoggerFactory.getLogger(BatchDynamicDestination.class);

    private static final Map<String, TableSchema> SCHEMA_MAP = new HashMap<>();

    private final String datasetName;
    private final PCollectionView<Map<String, String>> schemaView;
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    public BatchDynamicDestination(String datasetName,
PCollectionView<Map<String, String>> schemaView) {
        this.datasetName = datasetName;
        this.schemaView = schemaView;
    }

    @Override
    public java.util.List<PCollectionView<?>> getSideInputs() {
        // Register the schemaView as a side input
        return Collections.singletonList(schemaView);
    }


    @Override
    public String getDestination(ValueInSingleWindow<KV<String,
TableRow>> element) {
        String topic = element.getValue().getKey();
        return String.format("livewel-prod:%s.%s", datasetName, topic);
    }

    @Override
    public TableDestination getTable(String destination) {
        String tableName = destination.toLowerCase();
        return new TableDestination(tableName, "Table for category ");
    }

    @Override
    public TableSchema getSchema(String destination) {
        Map<String, String> schemasMap = sideInput(schemaView);
        LOG.info("destination in getSchema: {}", destination);
        StringBuilder sb = new StringBuilder();
        schemasMap.entrySet().stream().forEach(entry -> {
            sb.append(entry.getKey()).append(" : ");
        });
        LOG.info("destination in getSchema map: {}", sb.toString());
        String[] parts = destination.split("\\.");

        //assert parts.length > 1;

        if (parts.length < 2) {
            throw new RuntimeException(
                    "parts.length in BatchDynamicDestinations is less than 2: " + destination);
        }
        String tableSchema = schemasMap.get(parts[1]);
        TableSchema tableSchemaObj;
        try {
            tableSchemaObj = OBJECT_MAPPER.readValue(tableSchema,
TableSchema.class);  // Deserialize JSON to TableSchema
        } catch (IOException e) {
            throw new RuntimeException("Error parsing schema JSON", e);
        }

        return tableSchemaObj;
    }

}



*Not working*

public class StreamDynamicDestination extends
DynamicDestinations<BigQueryRow, DestinationInfo> {

    private final String bigQueryDataset;

    public StreamDynamicDestination(String bigQueryDataset){
        this.bigQueryDataset = bigQueryDataset;
    }

    @Override
    public Coder<DestinationInfo> getDestinationCoder() {
        // Use the custom DestinationInfoCoder (shared further down in this mail)
        return DestinationInfoCoder.of();
    }

    @Override
    public DestinationInfo
getDestination(ValueInSingleWindow<BigQueryRow> element) {
        String tablePrefix = "testvitalsdb";
        BigQueryRow bigQueryRow = element.getValue();
        assert bigQueryRow != null;
        TableSchema tableSchema = bigQueryRow.generateSchemaFromRow();
        // return new DestinationInfo("livewel-prod", dataset,
        //         String.format("%s_%d", tablePrefix, BigQueryRow.getTimeStampMod()),
        //         tableSchema);
        return new DestinationInfo("livewel-prod", bigQueryDataset,
tablePrefix, tableSchema);
    }

    @Override
    public TableDestination getTable(DestinationInfo destination) {
        return new TableDestination(destination.toTableSpec(),
"Dynamically generated table");
    }

    @Override
    public TableSchema getSchema(DestinationInfo destination) {
        return destination.getTableSchema();
    }

}



*My DestinationInfoCoder*

public class DestinationInfoCoder extends AtomicCoder<DestinationInfo> {

    private static final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void encode(DestinationInfo value, OutputStream outStream)
throws CoderException, IOException {
        // Encode the String fields
        try {
            StringUtf8Coder.of().encode(value.getProject(), outStream);
            StringUtf8Coder.of().encode(value.getDataset(), outStream);
            StringUtf8Coder.of().encode(value.getTable(), outStream);

            // Encode TableSchema as JSON string
            String schemaJson =
objectMapper.writeValueAsString(value.getTableSchema());
            StringUtf8Coder.of().encode(schemaJson, outStream);
        } catch (Exception e) {
            throw new CoderException("Failed to encode DestinationInfo", e);
        }
    }

    @Override
    public DestinationInfo decode(InputStream inStream) throws
CoderException, IOException {
        // Decode the String fields
        String project;
        String dataset;
        String table;
        TableSchema tableSchema;
        try{
            project = StringUtf8Coder.of().decode(inStream);
            dataset = StringUtf8Coder.of().decode(inStream);
            table = StringUtf8Coder.of().decode(inStream);

            // Decode TableSchema from JSON string
            String schemaJson = StringUtf8Coder.of().decode(inStream);
            tableSchema = objectMapper.readValue(schemaJson, TableSchema.class);
        } catch (Exception e) {
            throw new CoderException("Failed to decode DestinationInfo", e);
        }
        return new DestinationInfo(project, dataset, table, tableSchema);
    }

    public static DestinationInfoCoder of() {
        return new DestinationInfoCoder();
    }
}


On Fri, Oct 25, 2024 at 9:50 PM Reuven Lax <re...@google.com> wrote:

> Are you using runner v2? The issue you linked to implies that this only
> happened on runner v2.
>
> On Fri, Oct 25, 2024 at 8:26 AM Kenneth Knowles <k...@apache.org> wrote:
>
>> Hi Pranjal,
>>
>> If there is a bug in Beam, this is a good list to contact. If there is a
>> problem with a GCP service, then GCP support is better.
>>
>> I see the code you shared, but what error or difficulty are you
>> encountering?
>>
>> Kenn
>>
>> On Mon, Oct 21, 2024 at 2:33 PM Pranjal Pandit <pran...@eka.care> wrote:
>>
>>> Hi Kenneth / Yi Hun,
>>>
>>> I have been grappling for the past few days with a pipeline that
>>> runs as a batch pipeline and loads data from GCS to BigQuery.
>>>
>>>
>>> While trying to find a solution I found a similar issue and have
>>> posted what I am observing here in the GitHub issue (
>>> https://github.com/apache/beam/issues/28219),
>>>
>>> We are using GCP Dataflow runner  as such I have written to GCP
>>> support but not much luck there,
>>>
>>> I will try to briefly explain my problem I am facing here again ,
>>>
>>> Really appreciate any leads on how I can resolve this issue,
>>>
>>>
>>> Trying to use dynamic destinations to load data from GCS to Bigquery to
>>> multiple tables / multiple schemas ,
>>>
>>>
>>> Tried to put down all required classes I am using here,
>>>
>>>
>>> PCollection<KV<String, TableRow>> kvRows = decompressedLines
>>>         .apply("Convert To BigQueryRow", ParDo.of(new 
>>> Utility.ConvertToBigQueryRow(ekaUUID)));
>>>
>>> // Group by TableId to manage different schemas per table type
>>>
>>> PCollectionView<Map<String, String>> schemaView = 
>>> Utility.GetSchemaViewFromBigQueryRows.createSchemasView(kvRows);
>>>
>>> WriteResult result = kvRows.apply("WriteToBigQuery",
>>>         BigQueryIO.<KV<String, TableRow>>write()
>>>                 .to(new BatchDynamicDestination(bigqueryDatasetName, 
>>> schemaView))
>>>                 
>>> .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>                 
>>> .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>>>                 .withFormatFunction(stringTableRowKV -> stringTableRowKV != 
>>> null ? stringTableRowKV.getValue() : null)
>>> );
>>>
>>>
>>> public static class ConvertToBigQueryRow extends DoFn<KV<String, String>, 
>>> KV<String, TableRow>> {
>>>
>>>     private static final Logger LOG = 
>>> LoggerFactory.getLogger(ConvertToBigQueryRow.class);
>>>
>>>     String uuidKeyName;
>>>
>>>     public ConvertToBigQueryRow(String uuidKeyName){
>>>         this.uuidKeyName = uuidKeyName;
>>>     }
>>>
>>>     @ProcessElement
>>>     public void processElement(ProcessContext c) throws 
>>> JsonProcessingException {
>>>         ObjectMapper mapper = new ObjectMapper();
>>>         String value = Objects.requireNonNull(c.element().getValue());
>>>         String topic = Objects.requireNonNull(c.element().getKey());
>>>         // System.out.println("Data in ProcessElem   " + value);
>>>         // Parse the entire value as a JSON tree
>>>         JsonNode rootNode = mapper.readTree(value);
>>>
>>>         // Extract the "after" field as a JSON node (not as a string)
>>>         JsonNode afterNode = rootNode.get("after");
>>>
>>>         // System.out.println("Data in ProcessElem  after  " + 
>>> afterNode.asText());
>>>         // Check if "after" field exists and is not null
>>>         if (!afterNode.isNull()) {
>>>
>>>             String afterJsonString = afterNode.asText(); // Extract the 
>>> string
>>>             // representation for json node for after
>>>             JsonNode afterJsonNode = mapper.readTree(afterJsonString); // 
>>> Parse the string into a JsonNode
>>>
>>>             Map<String, Object> afterMap = new HashMap<>();
>>>             try {
>>>                 afterMap = mapper.convertValue(afterNode, new 
>>> TypeReference<Map<String, Object>>() {
>>>                 });
>>>             } catch (IllegalArgumentException e) {
>>>                 afterMap = mapper.convertValue(afterJsonNode, new 
>>> TypeReference<Map<String, Object>>() {
>>>                 });
>>>             }
>>>
>>>             if (afterMap != null) {
>>>                 // System.out.println("Data in ProcessElem  afterMap  " + 
>>> afterMap);
>>>                 TableRow row = new TableRow();
>>>                 for (Map.Entry<String, Object> entry : afterMap.entrySet()) 
>>> {
>>>                     row.set(entry.getKey(), entry.getValue());
>>>                 }
>>>
>>>                 // Insert eka UUID to our table
>>>                 UUID uuid = UUID.randomUUID();
>>>                 row.set(this.uuidKeyName, uuid.toString());
>>>
>>>
>>>                 LOG.info("T: {}D: {}", topic, row.toString());
>>>                 c.output(KV.of(topic, row));
>>>             }else{
>>>                 LOG.error("Data in ProcessElem  afterMap mostly null ");
>>>             }
>>>         } else {
>>>             LOG.error("The 'after' field is null.");
>>>         }
>>>     }
>>> }
>>>
>>>
>>>
>>> public static class GetSchemaViewFromBigQueryRows {
>>>
>>>     private static final Logger LOG = 
>>> LoggerFactory.getLogger(GetSchemaViewFromBigQueryRows.class);
>>>     private static final Map<String, TableSchema> schemaCache = new 
>>> ConcurrentHashMap<>();
>>>     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
>>>
>>>     private static String schemaToJson(TableSchema schema) {
>>>         try {
>>>             return OBJECT_MAPPER.writeValueAsString(schema);
>>>         } catch (JsonProcessingException e) {
>>>             throw new RuntimeException("Error converting schema to JSON", 
>>> e);
>>>         }
>>>     }
>>>
>>>     // Method to generate a schema for the given type
>>>     public static String getSchemaForType(String type, TableRow tableRow) {
>>>
>>>         TableSchema schema = schemaCache.get(type);
>>>
>>>         if (schema == null) {
>>>             schema = new TableSchema().setFields(new ArrayList<>());
>>>             schemaCache.put(type, schema);
>>>         }
>>>
>>>         // old fields
>>>         // Create a set of existing field names to avoid duplicates
>>>         Set<String> existingFieldNames = new HashSet<>();
>>>         for (TableFieldSchema field : schema.getFields()) {
>>>             existingFieldNames.add(field.getName());
>>>         }
>>>
>>>         // set to new fields
>>>         TableSchema finalSchema = schema;
>>>         tableRow.forEach((fieldName, fieldValue) -> {
>>>             if (!existingFieldNames.contains(fieldName)) {
>>>                 TableFieldSchema fieldSchema = new TableFieldSchema()
>>>                         .setName(fieldName)
>>>                         .setType(determineFieldType(fieldValue));
>>>                 finalSchema.getFields().add(fieldSchema);
>>>             }
>>>         });
>>>
>>>         // TODO: Add cases for handling schema conflicts also here
>>>
>>>         // update final schema in map
>>>         schemaCache.put(type, finalSchema);
>>>         return schemaToJson(finalSchema);
>>>     }
>>>
>>>     // Method to create a view of schemas as a PCollectionView<Map<String, 
>>> TableSchema>>
>>>     public static PCollectionView<Map<String, String>> createSchemasView(
>>>             PCollection<KV<String, TableRow>> input) {
>>>
>>>         // Map InputData to KV<String, TableSchema>
>>>         PCollection<KV<String, String>> schemas = input.apply("MapElements 
>>> for converting KV",
>>>                 
>>> MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), 
>>> TypeDescriptors.strings()))
>>>                         .via(data -> {
>>>                             assert data != null;
>>>                             return KV.of(data.getKey(), 
>>> getSchemaForType(data.getKey(), data.getValue()));
>>>                         })
>>>         );
>>>
>>>         // Deduplicate by key (topic) using Combine.perKey() and select any 
>>> one schema for each key
>>>         PCollection<KV<String, String>> uniqueSchemas = schemas
>>>                 .apply("DeduplicateSchemas", Combine.perKey((schema1, 
>>> schema2) -> schema1));  // Keep one schema per topic
>>>
>>>         // Use View.asMap() to create a side input for schemas
>>>         return uniqueSchemas.apply("ToSchemaView", View.asMap());
>>>     }
>>> }
>>>
>>> public class BatchDynamicDestination extends DynamicDestinations<KV<String, 
>>> TableRow>, DestinationInfo> {
>>>
>>>     private static final Logger LOG = 
>>> LoggerFactory.getLogger(BatchDynamicDestination.class);
>>>
>>>     private final String datasetName;
>>>     private final PCollectionView<Map<String, String>> schemaView;
>>>     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
>>>
>>>     public BatchDynamicDestination(String datasetName, 
>>> PCollectionView<Map<String, String>> schemaView) {
>>>         this.datasetName = datasetName;
>>>         this.schemaView = schemaView;
>>>     }
>>>
>>>     @Override
>>>     public Coder<DestinationInfo> getDestinationCoder() {
>>>         // Using SerializableCoder because DestinationInfo implements 
>>> Serializable
>>>         return DestinationInfoCoder.of();
>>>     }
>>>
>>>     @Override
>>>     public java.util.List<PCollectionView<?>> getSideInputs() {
>>>         // Register the schemaView as a side input
>>>         return Collections.singletonList(schemaView);
>>>     }
>>>
>>>
>>>     @Override
>>>     public DestinationInfo getDestination(ValueInSingleWindow<KV<String, 
>>> TableRow>> element) {
>>>
>>>
>>>         Map<String, String> schemas = sideInput(schemaView);
>>>         String topic = element.getValue().getKey();
>>>
>>>         String tableSchema = schemas.get(topic);
>>>         if (tableSchema == null) {
>>>             throw new RuntimeException("Schema not found for topic: " + 
>>> topic);
>>>         }
>>>
>>>         TableSchema tableSchemaObj;
>>>         try {
>>>             tableSchemaObj = OBJECT_MAPPER.readValue(tableSchema, 
>>> TableSchema.class);  // Deserialize JSON to TableSchema
>>>         } catch (IOException e) {
>>>             throw new RuntimeException("Error parsing schema JSON", e);
>>>         }
>>>
>>>         LOG.info("datasetname: {}, topic: {}, tableSchema: {}" , 
>>> datasetName, topic, tableSchema);
>>>
>>>         return new DestinationInfo("livewel-prod", datasetName, topic, 
>>> tableSchemaObj);
>>>     }
>>>
>>>     @Override
>>>     public TableDestination getTable(DestinationInfo destination) {
>>>         return new TableDestination(destination.toTableSpec(), "Dynamically 
>>> generated table");
>>>     }
>>>
>>>     @Override
>>>     public TableSchema getSchema(DestinationInfo destination) {
>>>         return destination.getTableSchema();
>>>     }
>>>
>>>
>>> }
>>>
>>>
