Ah, I suspect I know what's going on. What you're doing won't work unless
DestinationInfoCoder is a deterministic coder (i.e., two equivalent objects
map to the same encoded representation). A JSON encoding often isn't
deterministic (for one thing, it's legal to encode the JSON fields in any
order).

On Fri, Oct 25, 2024 at 2:22 PM Pranjal Pandit <pran...@eka.care> wrote:

> Hi Kenneth / Reuven,
>
> Actually, I did some more digging into what might be the actual root
> cause here.
>
> Here's what I found:
>
>
> The 409s from duplicate BigQuery jobs might be a side effect of something
> else related to the schema.
> What I have come to observe is that the behavior depends on how I define
> DynamicDestinations (see the *Working* / *Not Working* examples below).
>
> Basically, when getDestination() returns a DestinationInfo object, the
> correct schema somehow does not make it downstream in the pipeline when
> writing to BigQuery. However, if I return a plain string from
> getDestination() and construct the actual schema in getSchema(), it works
> correctly.
>
> I have also shared DestinationInfo's coder, in case the problem is there.
>
> Could this be a bug in Beam? I have been trying to get through to the GCP
> product / engineering teams, but not much luck there :(
>
> If this is a bug, where should I report it? Maybe I can contribute a fix,
> if I have understood the actual problem correctly.
>
> Now, as a side effect, the job shows as successful on GCP Dataflow, but
> only a very small portion of the data gets written to BQ. At the very
> least, the pipeline should throw some error about the schema if that is
> the actual problem.
>
> Reuven,
> While working on fixing my writes to BQ, I tried combinations of the
> legacy Dataflow runner / Dataflow Runner v2 and several Beam versions
> (2.52.0, 2.53.0, and the latest, 2.60.0), but the problem persisted.
> It seems to me the root cause is independent of the runner and version.
>
>
>
> *Working*
>
> public class BatchDynamicDestination extends DynamicDestinations<KV<String, TableRow>, String> {
>
>     private static final Logger LOG = LoggerFactory.getLogger(BatchDynamicDestination.class);
>
>     private static final Map<String, TableSchema> SCHEMA_MAP = new HashMap<>();
>
>     private final String datasetName;
>     private final PCollectionView<Map<String, String>> schemaView;
>     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
>
>     public BatchDynamicDestination(String datasetName, PCollectionView<Map<String, String>> schemaView) {
>         this.datasetName = datasetName;
>         this.schemaView = schemaView;
>     }
>
>     @Override
>     public java.util.List<PCollectionView<?>> getSideInputs() {
>         // Register the schemaView as a side input
>         return Collections.singletonList(schemaView);
>     }
>
>     @Override
>     public String getDestination(ValueInSingleWindow<KV<String, TableRow>> element) {
>         String topic = element.getValue().getKey();
>         return String.format("livewel-prod:%s.%s", datasetName, topic);
>     }
>
>     @Override
>     public TableDestination getTable(String destination) {
>         String tableName = destination.toLowerCase();
>         return new TableDestination(tableName, "Table for category");
>     }
>
>     @Override
>     public TableSchema getSchema(String destination) {
>         Map<String, String> schemasMap = sideInput(schemaView);
>         LOG.info("destination in getSchema: {}", destination);
>         StringBuilder sb = new StringBuilder();
>         schemasMap.entrySet().stream().forEach(entry -> {
>             sb.append(entry.getKey()).append(" : ");
>         });
>         LOG.info("destination in getSchema map: {}", sb.toString());
>         String[] parts = destination.split("\\.");
>
>         if (parts.length < 2) {
>             throw new RuntimeException("parts.length in BatchDynamicDestinations is less than 2: " + destination);
>         }
>         String tableSchema = schemasMap.get(parts[1]);
>         TableSchema tableSchemaObj;
>         try {
>             // Deserialize the schema JSON into a TableSchema
>             tableSchemaObj = OBJECT_MAPPER.readValue(tableSchema, TableSchema.class);
>         } catch (IOException e) {
>             throw new RuntimeException("Error parsing schema JSON", e);
>         }
>
>         return tableSchemaObj;
>     }
>
> }
>
>
>
>
> *Not working*
>
> public class StreamDynamicDestination extends DynamicDestinations<BigQueryRow, DestinationInfo> {
>
>     private final String bigQueryDataset;
>
>     public StreamDynamicDestination(String bigQueryDataset){
>         this.bigQueryDataset = bigQueryDataset;
>     }
>
>     @Override
>     public Coder<DestinationInfo> getDestinationCoder() {
>         // Use the custom DestinationInfoCoder for DestinationInfo
>         return DestinationInfoCoder.of();
>     }
>
>     @Override
>     public DestinationInfo getDestination(ValueInSingleWindow<BigQueryRow> element) {
>         String tablePrefix = "testvitalsdb";
>         BigQueryRow bigQueryRow = element.getValue();
>         assert bigQueryRow != null;
>         TableSchema tableSchema = bigQueryRow.generateSchemaFromRow();
>         return new DestinationInfo("livewel-prod", bigQueryDataset, tablePrefix, tableSchema);
>     }
>
>     @Override
>     public TableDestination getTable(DestinationInfo destination) {
>         return new TableDestination(destination.toTableSpec(), "Dynamically generated table");
>     }
>
>     @Override
>     public TableSchema getSchema(DestinationInfo destination) {
>         return destination.getTableSchema();
>     }
>
> }
>
>
>
>
> *My DestinationInfoCoder*
>
> public class DestinationInfoCoder extends AtomicCoder<DestinationInfo> {
>
>     private static final ObjectMapper objectMapper = new ObjectMapper();
>
>     @Override
>     public void encode(DestinationInfo value, OutputStream outStream) throws CoderException, IOException {
>         try {
>             // Encode the String fields
>             StringUtf8Coder.of().encode(value.getProject(), outStream);
>             StringUtf8Coder.of().encode(value.getDataset(), outStream);
>             StringUtf8Coder.of().encode(value.getTable(), outStream);
>
>             // Encode TableSchema as a JSON string
>             String schemaJson = objectMapper.writeValueAsString(value.getTableSchema());
>             StringUtf8Coder.of().encode(schemaJson, outStream);
>         } catch (Exception e) {
>             throw new CoderException("Failed to encode DestinationInfo", e);
>         }
>     }
>
>     @Override
>     public DestinationInfo decode(InputStream inStream) throws CoderException, IOException {
>         String project;
>         String dataset;
>         String table;
>         TableSchema tableSchema;
>         try {
>             // Decode the String fields
>             project = StringUtf8Coder.of().decode(inStream);
>             dataset = StringUtf8Coder.of().decode(inStream);
>             table = StringUtf8Coder.of().decode(inStream);
>
>             // Decode TableSchema from the JSON string
>             String schemaJson = StringUtf8Coder.of().decode(inStream);
>             tableSchema = objectMapper.readValue(schemaJson, TableSchema.class);
>         } catch (Exception e) {
>             throw new CoderException("Failed to decode DestinationInfo", e);
>         }
>         return new DestinationInfo(project, dataset, table, tableSchema);
>     }
>
>     public static DestinationInfoCoder of() {
>         return new DestinationInfoCoder();
>     }
> }
>
>
> On Fri, Oct 25, 2024 at 9:50 PM Reuven Lax <re...@google.com> wrote:
>
>> Are you using runner v2? The issue you linked to implies that this only
>> happened on runner v2.
>>
>> On Fri, Oct 25, 2024 at 8:26 AM Kenneth Knowles <k...@apache.org> wrote:
>>
>>> Hi Pranjal,
>>>
>>> If there is a bug in Beam, this is a good list to contact. If there is a
>>> problem with a GCP service, then GCP support is better.
>>>
>>> I see the code you shared, but what error or difficulty are you
>>> encountering?
>>>
>>> Kenn
>>>
>>> On Mon, Oct 21, 2024 at 2:33 PM Pranjal Pandit <pran...@eka.care> wrote:
>>>
>>>> Hi Kenneth / Yi Hun,
>>>>
>>>> I have been grappling for the past few days with a pipeline that runs
>>>> in batch mode and loads data from GCS to BigQuery.
>>>>
>>>>
>>>> While trying to find a solution, I came across a similar issue and have
>>>> posted what I am observing in this GitHub issue:
>>>> https://github.com/apache/beam/issues/28219
>>>>
>>>> We are using the GCP Dataflow runner, so I have written to GCP support,
>>>> but not much luck there.
>>>>
>>>> I will try to briefly explain the problem I am facing here again.
>>>>
>>>> I would really appreciate any leads on how to resolve this issue.
>>>>
>>>>
>>>> I am trying to use dynamic destinations to load data from GCS into
>>>> multiple BigQuery tables with multiple schemas.
>>>>
>>>>
>>>> I have tried to include all the required classes I am using below.
>>>>
>>>>
>>>> PCollection<KV<String, TableRow>> kvRows = decompressedLines
>>>>         .apply("Convert To BigQueryRow", ParDo.of(new Utility.ConvertToBigQueryRow(ekaUUID)));
>>>>
>>>> // Build a side input mapping each topic to its schema (as JSON),
>>>> // to manage different schemas per table type
>>>> PCollectionView<Map<String, String>> schemaView = Utility.GetSchemaViewFromBigQueryRows.createSchemasView(kvRows);
>>>>
>>>> WriteResult result = kvRows.apply("WriteToBigQuery",
>>>>         BigQueryIO.<KV<String, TableRow>>write()
>>>>                 .to(new BatchDynamicDestination(bigqueryDatasetName, schemaView))
>>>>                 .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>                 .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>>>>                 .withFormatFunction(stringTableRowKV -> stringTableRowKV != null ? stringTableRowKV.getValue() : null)
>>>> );
>>>>
>>>>
>>>>
>>>> public static class ConvertToBigQueryRow extends DoFn<KV<String, String>, KV<String, TableRow>> {
>>>>
>>>>     private static final Logger LOG = LoggerFactory.getLogger(ConvertToBigQueryRow.class);
>>>>
>>>>     String uuidKeyName;
>>>>
>>>>     public ConvertToBigQueryRow(String uuidKeyName){
>>>>         this.uuidKeyName = uuidKeyName;
>>>>     }
>>>>
>>>>     @ProcessElement
>>>>     public void processElement(ProcessContext c) throws JsonProcessingException {
>>>>         ObjectMapper mapper = new ObjectMapper();
>>>>         String value = Objects.requireNonNull(c.element().getValue());
>>>>         String topic = Objects.requireNonNull(c.element().getKey());
>>>>         // Parse the entire value as a JSON tree
>>>>         JsonNode rootNode = mapper.readTree(value);
>>>>
>>>>         // Extract the "after" field as a JSON node (not as a string)
>>>>         JsonNode afterNode = rootNode.get("after");
>>>>
>>>>         // Check that the "after" field exists and is not null
>>>>         if (afterNode != null && !afterNode.isNull()) {
>>>>
>>>>             // Extract the string representation of the "after" node
>>>>             String afterJsonString = afterNode.asText();
>>>>             // Parse the string into a JsonNode
>>>>             JsonNode afterJsonNode = mapper.readTree(afterJsonString);
>>>>
>>>>             Map<String, Object> afterMap;
>>>>             try {
>>>>                 afterMap = mapper.convertValue(afterNode, new TypeReference<Map<String, Object>>() {
>>>>                 });
>>>>             } catch (IllegalArgumentException e) {
>>>>                 afterMap = mapper.convertValue(afterJsonNode, new TypeReference<Map<String, Object>>() {
>>>>                 });
>>>>             }
>>>>
>>>>             if (afterMap != null) {
>>>>                 TableRow row = new TableRow();
>>>>                 for (Map.Entry<String, Object> entry : afterMap.entrySet()) {
>>>>                     row.set(entry.getKey(), entry.getValue());
>>>>                 }
>>>>
>>>>                 // Insert the eka UUID into our table
>>>>                 UUID uuid = UUID.randomUUID();
>>>>                 row.set(this.uuidKeyName, uuid.toString());
>>>>
>>>>                 LOG.info("T: {} D: {}", topic, row.toString());
>>>>                 c.output(KV.of(topic, row));
>>>>             } else {
>>>>                 LOG.error("Data in ProcessElem: afterMap is null");
>>>>             }
>>>>         } else {
>>>>             LOG.error("The 'after' field is missing or null.");
>>>>         }
>>>>     }
>>>> }
>>>>
>>>>
>>>>
>>>>
>>>> public static class GetSchemaViewFromBigQueryRows {
>>>>
>>>>     private static final Logger LOG = LoggerFactory.getLogger(GetSchemaViewFromBigQueryRows.class);
>>>>     private static final Map<String, TableSchema> schemaCache = new ConcurrentHashMap<>();
>>>>     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
>>>>
>>>>     private static String schemaToJson(TableSchema schema) {
>>>>         try {
>>>>             return OBJECT_MAPPER.writeValueAsString(schema);
>>>>         } catch (JsonProcessingException e) {
>>>>             throw new RuntimeException("Error converting schema to JSON", e);
>>>>         }
>>>>     }
>>>>
>>>>     // Method to generate a schema for the given type
>>>>     public static String getSchemaForType(String type, TableRow tableRow) {
>>>>
>>>>         TableSchema schema = schemaCache.get(type);
>>>>
>>>>         if (schema == null) {
>>>>             schema = new TableSchema().setFields(new ArrayList<>());
>>>>             schemaCache.put(type, schema);
>>>>         }
>>>>
>>>>         // Create a set of existing field names to avoid duplicates
>>>>         Set<String> existingFieldNames = new HashSet<>();
>>>>         for (TableFieldSchema field : schema.getFields()) {
>>>>             existingFieldNames.add(field.getName());
>>>>         }
>>>>
>>>>         // Add any new fields from this row to the schema
>>>>         TableSchema finalSchema = schema;
>>>>         tableRow.forEach((fieldName, fieldValue) -> {
>>>>             if (!existingFieldNames.contains(fieldName)) {
>>>>                 TableFieldSchema fieldSchema = new TableFieldSchema()
>>>>                         .setName(fieldName)
>>>>                         .setType(determineFieldType(fieldValue));
>>>>                 finalSchema.getFields().add(fieldSchema);
>>>>             }
>>>>         });
>>>>
>>>>         // TODO: Add cases for handling schema conflicts here as well
>>>>
>>>>         // Update the final schema in the cache
>>>>         schemaCache.put(type, finalSchema);
>>>>         return schemaToJson(finalSchema);
>>>>     }
>>>>
>>>>     // Method to create a view of schemas as a PCollectionView<Map<String, String>>
>>>>     public static PCollectionView<Map<String, String>> createSchemasView(
>>>>             PCollection<KV<String, TableRow>> input) {
>>>>
>>>>         // Map input data to KV<topic, schema JSON>
>>>>         PCollection<KV<String, String>> schemas = input.apply("MapElements for converting KV",
>>>>                 MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
>>>>                         .via(data -> {
>>>>                             assert data != null;
>>>>                             return KV.of(data.getKey(), getSchemaForType(data.getKey(), data.getValue()));
>>>>                         })
>>>>         );
>>>>
>>>>         // Deduplicate by key (topic) using Combine.perKey(), keeping one schema per topic
>>>>         PCollection<KV<String, String>> uniqueSchemas = schemas
>>>>                 .apply("DeduplicateSchemas", Combine.perKey((schema1, schema2) -> schema1));
>>>>
>>>>         // Use View.asMap() to create a side input for schemas
>>>>         return uniqueSchemas.apply("ToSchemaView", View.asMap());
>>>>     }
>>>> }
>>>>
>>>> public class BatchDynamicDestination extends DynamicDestinations<KV<String, TableRow>, DestinationInfo> {
>>>>
>>>>     private static final Logger LOG = LoggerFactory.getLogger(BatchDynamicDestination.class);
>>>>
>>>>     private final String datasetName;
>>>>     private final PCollectionView<Map<String, String>> schemaView;
>>>>     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
>>>>
>>>>     public BatchDynamicDestination(String datasetName, PCollectionView<Map<String, String>> schemaView) {
>>>>         this.datasetName = datasetName;
>>>>         this.schemaView = schemaView;
>>>>     }
>>>>
>>>>     @Override
>>>>     public Coder<DestinationInfo> getDestinationCoder() {
>>>>         // Use the custom DestinationInfoCoder for DestinationInfo
>>>>         return DestinationInfoCoder.of();
>>>>     }
>>>>
>>>>     @Override
>>>>     public java.util.List<PCollectionView<?>> getSideInputs() {
>>>>         // Register the schemaView as a side input
>>>>         return Collections.singletonList(schemaView);
>>>>     }
>>>>
>>>>     @Override
>>>>     public DestinationInfo getDestination(ValueInSingleWindow<KV<String, TableRow>> element) {
>>>>
>>>>         Map<String, String> schemas = sideInput(schemaView);
>>>>         String topic = element.getValue().getKey();
>>>>
>>>>         String tableSchema = schemas.get(topic);
>>>>         if (tableSchema == null) {
>>>>             throw new RuntimeException("Schema not found for topic: " + topic);
>>>>         }
>>>>
>>>>         TableSchema tableSchemaObj;
>>>>         try {
>>>>             // Deserialize the schema JSON into a TableSchema
>>>>             tableSchemaObj = OBJECT_MAPPER.readValue(tableSchema, TableSchema.class);
>>>>         } catch (IOException e) {
>>>>             throw new RuntimeException("Error parsing schema JSON", e);
>>>>         }
>>>>
>>>>         LOG.info("datasetname: {}, topic: {}, tableSchema: {}", datasetName, topic, tableSchema);
>>>>
>>>>         return new DestinationInfo("livewel-prod", datasetName, topic, tableSchemaObj);
>>>>     }
>>>>
>>>>     @Override
>>>>     public TableDestination getTable(DestinationInfo destination) {
>>>>         return new TableDestination(destination.toTableSpec(), "Dynamically generated table");
>>>>     }
>>>>
>>>>     @Override
>>>>     public TableSchema getSchema(DestinationInfo destination) {
>>>>         return destination.getTableSchema();
>>>>     }
>>>>
>>>> }
>>>>
>>>>
