One more thing: in addition to a deterministic coder, DestinationInfo
needs to have proper equals and hashCode methods defined.
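
To make that concrete, here is a minimal sketch of value-based equals and
hashCode. It is only an illustration, not your actual class: the field names
are guessed from the constructor and getters in your code, and the schema is
held as a String here (an assumption, so the snippet compiles without Beam or
the BigQuery client on the classpath) rather than as a TableSchema.

```java
import java.io.Serializable;
import java.util.Objects;

// Hypothetical stand-in for the DestinationInfo class discussed in this thread.
// Assumption: the schema is kept as a String instead of a TableSchema so that
// the sketch has no external dependencies.
final class DestinationInfo implements Serializable {
    private final String project;
    private final String dataset;
    private final String table;
    private final String schemaJson;

    DestinationInfo(String project, String dataset, String table, String schemaJson) {
        this.project = project;
        this.dataset = dataset;
        this.table = table;
        this.schemaJson = schemaJson;
    }

    // Same "project:dataset.table" shape as the table names built in the thread.
    String toTableSpec() {
        return project + ":" + dataset + "." + table;
    }

    // Two destinations are equal when every field is equal (value semantics),
    // not only when they are the same object instance.
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof DestinationInfo)) {
            return false;
        }
        DestinationInfo other = (DestinationInfo) o;
        return Objects.equals(project, other.project)
                && Objects.equals(dataset, other.dataset)
                && Objects.equals(table, other.table)
                && Objects.equals(schemaJson, other.schemaJson);
    }

    // Must be computed from the same fields as equals so that equal objects
    // always produce the same hash code.
    @Override
    public int hashCode() {
        return Objects.hash(project, dataset, table, schemaJson);
    }
}
```

With something like this, two DestinationInfo objects built from the same
project, dataset, table and schema compare equal and hash identically, which
matters wherever destinations end up being compared or used as map keys.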

On Fri, Oct 25, 2024 at 10:30 PM Reuven Lax <re...@google.com> wrote:

> Ah, I suspect I know what's going on. What you're doing won't work unless
> the DestinationInfoCoder is a deterministic coder (i.e. two equivalent
> objects map to the same encoded representation). A json encoding often
> isn't deterministic (for one reason, it's legal to encode the json fields
> in any order).
>
> On Fri, Oct 25, 2024 at 2:22 PM Pranjal Pandit <pran...@eka.care> wrote:
>
>> Hi Kenneth / Reuven,
>>
>> I dug a little further into what might be the actual root cause of the
>> problem here.
>>
>> Here's what I found:
>>
>>
>> The 409s on BigQuery duplicate jobs might be a side effect of something
>> else related to the schema.
>> What I have come to observe is that of the two DynamicDestinations
>> implementations below, one works and the other does not (*Working* / *Not Working*).
>>
>> Basically, when getDestination() returns a DestinationInfo object, the
>> correct schema somehow does not make it downstream in the pipeline when
>> writing to BigQuery.
>> However, if I return a plain String from getDestination() and construct the
>> actual schema in getSchema(), then it works correctly.
>>
>> I have also shared DestinationInfo's coder, in case the problem lies
>> there.
>>
>> Seems like this may be a bug in Beam? I am trying to get through to the GCP
>> product / engineering teams, but not much luck there :(
>>
>> If this is a bug, where should I report it?
>> Maybe I can contribute a fix if I have understood the actual problem
>> correctly.
>>
>> Now, as a side effect, the job shows as successful on GCP Dataflow, but
>> only a very small portion of the data gets written to BQ.
>> At the very least the pipeline should throw some error about the schema if
>> that's the actual problem.
>>
>> Reuven,
>> While working on fixing my writes to BQ I tried combinations of the legacy
>> GCP runner / GCP runner v2 and Beam versions 2.52.0, 2.53.0, and the latest
>> (2.60.0), but the problem still persisted.
>> It seems to me the root cause is independent of the runner and the version.
>>
>>
>>
>> *Working*
>>
>> public class BatchDynamicDestination extends DynamicDestinations<KV<String, 
>> TableRow>, String> {
>>
>>     private static final Logger LOG = 
>> LoggerFactory.getLogger(BatchDynamicDestination.class);
>>
>>     private static final Map<String, TableSchema> SCHEMA_MAP = new 
>> HashMap<>();
>>
>>     private final String datasetName;
>>     private final PCollectionView<Map<String, String>> schemaView;
>>     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
>>
>>     public BatchDynamicDestination(String datasetName, 
>> PCollectionView<Map<String, String>> schemaView) {
>>         this.datasetName = datasetName;
>>         this.schemaView = schemaView;
>>     }
>>
>>     @Override
>>     public java.util.List<PCollectionView<?>> getSideInputs() {
>>         // Register the schemaView as a side input
>>         return Collections.singletonList(schemaView);
>>     }
>>
>>
>>     @Override
>>     public String getDestination(ValueInSingleWindow<KV<String, TableRow>> 
>> element) {
>>         String topic = element.getValue().getKey();
>>         return String.format("livewel-prod:%s.%s", datasetName, topic);
>>     }
>>
>>     @Override
>>     public TableDestination getTable(String destination) {
>>         String tableName = destination.toLowerCase();
>>         return new TableDestination(tableName, "Table for category ");
>>     }
>>
>>     @Override
>>     public TableSchema getSchema(String destination) {
>>         Map<String, String> schemasMap = sideInput(schemaView);
>>         LOG.info("destination in getSchema: {}", destination);
>>         StringBuilder sb = new StringBuilder();
>>         schemasMap.entrySet().stream().forEach(entry -> {
>>             sb.append(entry.getKey()).append(" : ");
>>         });
>>         LOG.info("destination in getSchema map: {}", sb.toString());
>>         String[] parts = destination.split("\\.");
>>
>>         //assert parts.length > 1;
>>
>>         if (parts.length < 2) {
>>             throw new RuntimeException(
>>                 "parts.length in BatchDynamicDestinations is less than 2: " + destination);
>>         }
>>         String tableSchema = schemasMap.get(parts[1]);
>>         TableSchema tableSchemaObj;
>>         try {
>>             tableSchemaObj = OBJECT_MAPPER.readValue(tableSchema, 
>> TableSchema.class);  // Deserialize JSON to TableSchema
>>         } catch (IOException e) {
>>             throw new RuntimeException("Error parsing schema JSON", e);
>>         }
>>
>>         return tableSchemaObj;
>>     }
>>
>> }
>>
>>
>>
>> *Not working*
>>
>> public class StreamDynamicDestination extends 
>> DynamicDestinations<BigQueryRow, DestinationInfo> {
>>
>>     private final String bigQueryDataset;
>>
>>     public StreamDynamicDestination(String bigQueryDataset){
>>         this.bigQueryDataset = bigQueryDataset;
>>     }
>>
>>     @Override
>>     public Coder<DestinationInfo> getDestinationCoder() {
>>         // Using the custom DestinationInfoCoder (shown below), not SerializableCoder
>>         return DestinationInfoCoder.of();
>>     }
>>
>>     @Override
>>     public DestinationInfo getDestination(ValueInSingleWindow<BigQueryRow> 
>> element) {
>>         String tablePrefix = "testvitalsdb";
>>         BigQueryRow bigQueryRow = element.getValue();
>>         assert bigQueryRow != null;
>>         TableSchema tableSchema = bigQueryRow.generateSchemaFromRow();
>>         //return new DestinationInfo("livewel-prod", dataset,
>>         //    String.format("%s_%d", tablePrefix, BigQueryRow.getTimeStampMod()),
>>         //    tableSchema);
>>         return new DestinationInfo("livewel-prod", bigQueryDataset, 
>> tablePrefix, tableSchema);
>>     }
>>
>>     @Override
>>     public TableDestination getTable(DestinationInfo destination) {
>>         return new TableDestination(destination.toTableSpec(),
>>                 "Dynamically generated table");
>>     }
>>
>>     @Override
>>     public TableSchema getSchema(DestinationInfo destination) {
>>         return destination.getTableSchema();
>>     }
>>
>> }
>>
>>
>>
>> *My DestinationInfoCoder*
>>
>> public class DestinationInfoCoder extends AtomicCoder<DestinationInfo> {
>>
>>     private static final ObjectMapper objectMapper = new ObjectMapper();
>>
>>     @Override
>>     public void encode(DestinationInfo value, OutputStream outStream) throws 
>> CoderException, IOException {
>>         // Encode the String fields
>>         try {
>>             StringUtf8Coder.of().encode(value.getProject(), outStream);
>>             StringUtf8Coder.of().encode(value.getDataset(), outStream);
>>             StringUtf8Coder.of().encode(value.getTable(), outStream);
>>
>>             // Encode TableSchema as JSON string
>>             String schemaJson = 
>> objectMapper.writeValueAsString(value.getTableSchema());
>>             StringUtf8Coder.of().encode(schemaJson, outStream);
>>         } catch (Exception e) {
>>             throw new CoderException("Failed to encode DestinationInfo", e);
>>         }
>>     }
>>
>>     @Override
>>     public DestinationInfo decode(InputStream inStream) throws 
>> CoderException, IOException {
>>         // Decode the String fields
>>         String project;
>>         String dataset;
>>         String table;
>>         TableSchema tableSchema;
>>         try{
>>             project = StringUtf8Coder.of().decode(inStream);
>>             dataset = StringUtf8Coder.of().decode(inStream);
>>             table = StringUtf8Coder.of().decode(inStream);
>>
>>             // Decode TableSchema from JSON string
>>             String schemaJson = StringUtf8Coder.of().decode(inStream);
>>             tableSchema = objectMapper.readValue(schemaJson, 
>> TableSchema.class);
>>         } catch (Exception e) {
>>             throw new CoderException("Failed to decode DestinationInfo", e);
>>         }
>>         return new DestinationInfo(project, dataset, table, tableSchema);
>>     }
>>
>>     public static DestinationInfoCoder of() {
>>         return new DestinationInfoCoder();
>>     }
>> }
>>
>>
>> On Fri, Oct 25, 2024 at 9:50 PM Reuven Lax <re...@google.com> wrote:
>>
>>> Are you using runner v2? The issue you linked to implies that this only
>>> happened on runner v2.
>>>
>>> On Fri, Oct 25, 2024 at 8:26 AM Kenneth Knowles <k...@apache.org> wrote:
>>>
>>>> Hi Pranjal,
>>>>
>>>> If there is a bug in Beam, this is a good list to contact. If there is
>>>> a problem with a GCP service, then GCP support is better.
>>>>
>>>> I see the code you shared, but what error or difficulty are you
>>>> encountering?
>>>>
>>>> Kenn
>>>>
>>>> On Mon, Oct 21, 2024 at 2:33 PM Pranjal Pandit <pran...@eka.care>
>>>> wrote:
>>>>
>>>>> Hi Kenneth / Yi Hun,
>>>>>
>>>>> I have been grappling for the past few days with a pipeline that runs
>>>>> as a batch pipeline and loads data from GCS to BigQuery.
>>>>>
>>>>>
>>>>> While trying to find a solution I found a similar issue and have
>>>>> posted what I am observing in the GitHub issue (
>>>>> https://github.com/apache/beam/issues/28219).
>>>>>
>>>>> We are using the GCP Dataflow runner, so I have written to GCP
>>>>> support, but without much luck.
>>>>>
>>>>> I will try to briefly explain the problem I am facing here again.
>>>>>
>>>>> I would really appreciate any leads on how I can resolve this issue.
>>>>>
>>>>>
>>>>> I am trying to use dynamic destinations to load data from GCS to BigQuery
>>>>> into multiple tables with multiple schemas.
>>>>>
>>>>>
>>>>> I have tried to put down all the required classes I am using here:
>>>>>
>>>>>
>>>>> PCollection<KV<String, TableRow>> kvRows = decompressedLines
>>>>>         .apply("Convert To BigQueryRow", ParDo.of(new 
>>>>> Utility.ConvertToBigQueryRow(ekaUUID)));
>>>>>
>>>>> // Group by TableId to manage different schemas per table type
>>>>>
>>>>> PCollectionView<Map<String, String>> schemaView = 
>>>>> Utility.GetSchemaViewFromBigQueryRows.createSchemasView(kvRows);
>>>>>
>>>>> WriteResult result = kvRows.apply("WriteToBigQuery",
>>>>>         BigQueryIO.<KV<String, TableRow>>write()
>>>>>                 .to(new BatchDynamicDestination(bigqueryDatasetName, 
>>>>> schemaView))
>>>>>                 
>>>>> .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>>                 
>>>>> .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>>>>>                 .withFormatFunction(stringTableRowKV -> stringTableRowKV 
>>>>> != null ? stringTableRowKV.getValue() : null)
>>>>> );
>>>>>
>>>>>
>>>>> public static class ConvertToBigQueryRow extends DoFn<KV<String, String>, 
>>>>> KV<String, TableRow>> {
>>>>>
>>>>>     private static final Logger LOG = 
>>>>> LoggerFactory.getLogger(ConvertToBigQueryRow.class);
>>>>>
>>>>>     String uuidKeyName;
>>>>>
>>>>>     public ConvertToBigQueryRow(String uuidKeyName){
>>>>>         this.uuidKeyName = uuidKeyName;
>>>>>     }
>>>>>
>>>>>     @ProcessElement
>>>>>     public void processElement(ProcessContext c) throws 
>>>>> JsonProcessingException {
>>>>>         ObjectMapper mapper = new ObjectMapper();
>>>>>         String value = Objects.requireNonNull(c.element().getValue());
>>>>>         String topic = Objects.requireNonNull(c.element().getKey());
>>>>>         // System.out.println("Data in ProcessElem   " + value);
>>>>>         // Parse the entire value as a JSON tree
>>>>>         JsonNode rootNode = mapper.readTree(value);
>>>>>
>>>>>         // Extract the "after" field as a JSON node (not as a string)
>>>>>         JsonNode afterNode = rootNode.get("after");
>>>>>
>>>>>         // System.out.println("Data in ProcessElem  after  " + afterNode.asText());
>>>>>         // Check if "after" field exists and is not null
>>>>>         if (afterNode != null && !afterNode.isNull()) {
>>>>>
>>>>>             // Extract the string representation of the "after" JSON node
>>>>>             String afterJsonString = afterNode.asText();
>>>>>             // Parse the string into a JsonNode
>>>>>             JsonNode afterJsonNode = mapper.readTree(afterJsonString);
>>>>>
>>>>>             Map<String, Object> afterMap = new HashMap<>();
>>>>>             try {
>>>>>                 afterMap = mapper.convertValue(afterNode, new 
>>>>> TypeReference<Map<String, Object>>() {
>>>>>                 });
>>>>>             } catch (IllegalArgumentException e) {
>>>>>                 afterMap = mapper.convertValue(afterJsonNode, new 
>>>>> TypeReference<Map<String, Object>>() {
>>>>>                 });
>>>>>             }
>>>>>
>>>>>             if (afterMap != null) {
>>>>>                 // System.out.println("Data in ProcessElem  afterMap  " + afterMap);
>>>>>                 TableRow row = new TableRow();
>>>>>                 for (Map.Entry<String, Object> entry : 
>>>>> afterMap.entrySet()) {
>>>>>                     row.set(entry.getKey(), entry.getValue());
>>>>>                 }
>>>>>
>>>>>                 // Insert eka UUID to our table
>>>>>                 UUID uuid = UUID.randomUUID();
>>>>>                 row.set(this.uuidKeyName, uuid.toString());
>>>>>
>>>>>
>>>>>                 LOG.info("T: {}D: {}", topic, row.toString());
>>>>>                 c.output(KV.of(topic, row));
>>>>>             }else{
>>>>>                 LOG.error("Data in ProcessElem  afterMap mostly null ");
>>>>>             }
>>>>>         } else {
>>>>>             LOG.error("The 'after' field is null.");
>>>>>         }
>>>>>     }
>>>>> }
>>>>>
>>>>>
>>>>>
>>>>> public static class GetSchemaViewFromBigQueryRows {
>>>>>
>>>>>     private static final Logger LOG = 
>>>>> LoggerFactory.getLogger(GetSchemaViewFromBigQueryRows.class);
>>>>>     private static final Map<String, TableSchema> schemaCache = new 
>>>>> ConcurrentHashMap<>();
>>>>>     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
>>>>>
>>>>>     private static String schemaToJson(TableSchema schema) {
>>>>>         try {
>>>>>             return OBJECT_MAPPER.writeValueAsString(schema);
>>>>>         } catch (JsonProcessingException e) {
>>>>>             throw new RuntimeException("Error converting schema to JSON", 
>>>>> e);
>>>>>         }
>>>>>     }
>>>>>
>>>>>     // Method to generate a schema for the given type
>>>>>     public static String getSchemaForType(String type, TableRow tableRow) 
>>>>> {
>>>>>
>>>>>         TableSchema schema = schemaCache.get(type);
>>>>>
>>>>>         if (schema == null) {
>>>>>             schema = new TableSchema().setFields(new ArrayList<>());
>>>>>             schemaCache.put(type, schema);
>>>>>         }
>>>>>
>>>>>         // old fields
>>>>>         // Create a set of existing field names to avoid duplicates
>>>>>         Set<String> existingFieldNames = new HashSet<>();
>>>>>         for (TableFieldSchema field : schema.getFields()) {
>>>>>             existingFieldNames.add(field.getName());
>>>>>         }
>>>>>
>>>>>         // set to new fields
>>>>>         TableSchema finalSchema = schema;
>>>>>         tableRow.forEach((fieldName, fieldValue) -> {
>>>>>             if (!existingFieldNames.contains(fieldName)) {
>>>>>                 TableFieldSchema fieldSchema = new TableFieldSchema()
>>>>>                         .setName(fieldName)
>>>>>                         .setType(determineFieldType(fieldValue));
>>>>>                 finalSchema.getFields().add(fieldSchema);
>>>>>             }
>>>>>         });
>>>>>
>>>>>         // TODO: Add cases for handling schema conflicts also here
>>>>>
>>>>>         // update final schema in map
>>>>>         schemaCache.put(type, finalSchema);
>>>>>         return schemaToJson(finalSchema);
>>>>>     }
>>>>>
>>>>>     // Method to create a view of schemas as a
>>>>>     // PCollectionView<Map<String, String>>
>>>>>     public static PCollectionView<Map<String, String>> createSchemasView(
>>>>>             PCollection<KV<String, TableRow>> input) {
>>>>>
>>>>>         // Map each input element to KV<String, String> (topic, schema JSON)
>>>>>         PCollection<KV<String, String>> schemas = 
>>>>> input.apply("MapElements for converting KV",
>>>>>                 
>>>>> MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), 
>>>>> TypeDescriptors.strings()))
>>>>>                         .via(data -> {
>>>>>                             assert data != null;
>>>>>                             return KV.of(data.getKey(), 
>>>>> getSchemaForType(data.getKey(), data.getValue()));
>>>>>                         })
>>>>>         );
>>>>>
>>>>>         // Deduplicate by key (topic) using Combine.perKey() and select
>>>>>         // any one schema for each key
>>>>>         PCollection<KV<String, String>> uniqueSchemas = schemas
>>>>>                 .apply("DeduplicateSchemas", Combine.perKey((schema1, 
>>>>> schema2) -> schema1));  // Keep one schema per topic
>>>>>
>>>>>         // Use View.asMap() to create a side input for schemas
>>>>>         return uniqueSchemas.apply("ToSchemaView", View.asMap());
>>>>>     }
>>>>> }
>>>>>
>>>>> public class BatchDynamicDestination extends 
>>>>> DynamicDestinations<KV<String, TableRow>, DestinationInfo> {
>>>>>
>>>>>     private static final Logger LOG = 
>>>>> LoggerFactory.getLogger(BatchDynamicDestination.class);
>>>>>
>>>>>     private final String datasetName;
>>>>>     private final PCollectionView<Map<String, String>> schemaView;
>>>>>     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
>>>>>
>>>>>     public BatchDynamicDestination(String datasetName, 
>>>>> PCollectionView<Map<String, String>> schemaView) {
>>>>>         this.datasetName = datasetName;
>>>>>         this.schemaView = schemaView;
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public Coder<DestinationInfo> getDestinationCoder() {
>>>>>         // Using the custom DestinationInfoCoder, not SerializableCoder
>>>>>         return DestinationInfoCoder.of();
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public java.util.List<PCollectionView<?>> getSideInputs() {
>>>>>         // Register the schemaView as a side input
>>>>>         return Collections.singletonList(schemaView);
>>>>>     }
>>>>>
>>>>>
>>>>>     @Override
>>>>>     public DestinationInfo getDestination(ValueInSingleWindow<KV<String, 
>>>>> TableRow>> element) {
>>>>>
>>>>>
>>>>>         Map<String, String> schemas = sideInput(schemaView);
>>>>>         String topic = element.getValue().getKey();
>>>>>
>>>>>         String tableSchema = schemas.get(topic);
>>>>>         if (tableSchema == null) {
>>>>>             throw new RuntimeException("Schema not found for topic: " + 
>>>>> topic);
>>>>>         }
>>>>>
>>>>>         TableSchema tableSchemaObj;
>>>>>         try {
>>>>>             tableSchemaObj = OBJECT_MAPPER.readValue(tableSchema, 
>>>>> TableSchema.class);  // Deserialize JSON to TableSchema
>>>>>         } catch (IOException e) {
>>>>>             throw new RuntimeException("Error parsing schema JSON", e);
>>>>>         }
>>>>>
>>>>>         LOG.info("datasetname: {}, topic: {}, tableSchema: {}" , 
>>>>> datasetName, topic, tableSchema);
>>>>>
>>>>>         return new DestinationInfo("livewel-prod", datasetName, topic, 
>>>>> tableSchemaObj);
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public TableDestination getTable(DestinationInfo destination) {
>>>>>         return new TableDestination(destination.toTableSpec(), 
>>>>> "Dynamically generated table");
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public TableSchema getSchema(DestinationInfo destination) {
>>>>>         return destination.getTableSchema();
>>>>>     }
>>>>>
>>>>>
>>>>> }
>>>>>
>>>>>
