Understood.

Let me make the changes and I will get back on this thread.

On Sat, Oct 26, 2024 at 9:19 PM Reuven Lax <re...@google.com> wrote:

> One more thing - in addition to a deterministic coder, DestinationInfo
> needs to have proper equals and hashCode methods defined.
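
To make the equals/hashCode point concrete, here is a minimal sketch. DestinationInfo itself was not shared in this thread, so the class name DestinationInfoSketch, its schemaJson field (the schema held as a JSON string so that it compares by value), and the distinctCount helper are all assumptions for illustration only, not the real class.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for DestinationInfo (the real class is not shown in this thread).
final class DestinationInfoSketch {
    private final String project;
    private final String dataset;
    private final String table;
    // Assumption: the schema is held as a JSON string so that it compares by value.
    private final String schemaJson;

    DestinationInfoSketch(String project, String dataset, String table, String schemaJson) {
        this.project = project;
        this.dataset = dataset;
        this.table = table;
        this.schemaJson = schemaJson;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof DestinationInfoSketch)) {
            return false;
        }
        DestinationInfoSketch other = (DestinationInfoSketch) o;
        return Objects.equals(project, other.project)
                && Objects.equals(dataset, other.dataset)
                && Objects.equals(table, other.table)
                && Objects.equals(schemaJson, other.schemaJson);
    }

    @Override
    public int hashCode() {
        // Uses the same fields as equals, so equal objects share a hash code.
        return Objects.hash(project, dataset, table, schemaJson);
    }

    // Counts distinct destinations the way a hash-based grouping would see them.
    static int distinctCount(DestinationInfoSketch... infos) {
        Map<DestinationInfoSketch, Integer> seen = new HashMap<>();
        for (DestinationInfoSketch info : infos) {
            seen.merge(info, 1, Integer::sum);
        }
        return seen.size();
    }

    public static void main(String[] args) {
        DestinationInfoSketch a = new DestinationInfoSketch("p", "d", "t", "{}");
        DestinationInfoSketch b = new DestinationInfoSketch("p", "d", "t", "{}");
        System.out.println(distinctCount(a, b)); // prints 1
    }
}
```

Without the two overrides, a and b would fall back to identity comparison and count as two different destinations even though every field matches.
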
>
> On Fri, Oct 25, 2024 at 10:30 PM Reuven Lax <re...@google.com> wrote:
>
>> Ah, I suspect I know what's going on. What you're doing won't work unless
>> the DestinationInfoCoder is a deterministic coder (i.e. two equivalent
>> objects map to the same encoded representation). A json encoding often
>> isn't deterministic (for one reason, it's legal to encode the json fields
>> in any order).
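
A small sketch of the determinism issue described above, using only the JDK. It models a JSON object as a key-to-value map (an assumption for illustration; it is neither Beam's Coder API nor Jackson) and shows that writing keys in iteration order gives different bytes for equal objects, while sorting the keys first gives identical bytes. The class and method names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration: a JSON-like object modelled as key -> value.
final class DeterministicEncodingSketch {

    // Order-sensitive: writes entries in whatever order the map iterates.
    static byte[] encodeInIterationOrder(Map<String, String> object) {
        return write(object);
    }

    // Canonical: sorts by key first, so equal maps always yield equal bytes.
    static byte[] encodeCanonical(Map<String, String> object) {
        return write(new TreeMap<>(object));
    }

    private static byte[] write(Map<String, String> object) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(object.size());
            for (Map.Entry<String, String> entry : object.entrySet()) {
                out.writeUTF(entry.getKey());
                out.writeUTF(entry.getValue());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        Map<String, String> a = new LinkedHashMap<>();
        a.put("name", "id");
        a.put("type", "INTEGER");

        Map<String, String> b = new LinkedHashMap<>();
        b.put("type", "INTEGER");
        b.put("name", "id");

        // The two maps are equal, but their insertion order differs.
        System.out.println(a.equals(b)); // prints true
        System.out.println(Arrays.equals(
                encodeInIterationOrder(a), encodeInIterationOrder(b))); // prints false
        System.out.println(Arrays.equals(
                encodeCanonical(a), encodeCanonical(b))); // prints true
    }
}
```

The same idea applies to a coder that embeds JSON: either serialize with a fixed key order, or encode the individual fields with deterministic coders instead of a JSON string. I believe Beam's AtomicCoder presumes determinism unless verifyDeterministic() is overridden, so nothing flags this at pipeline construction time; please check that against the Coder Javadoc.
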
>>
>> On Fri, Oct 25, 2024 at 2:22 PM Pranjal Pandit <pran...@eka.care> wrote:
>>
>>> Hi Kenneth / Reuven,
>>>
>>> I looked more closely into what the root cause might actually be.
>>>
>>> Here's what I found:
>>>
>>>
>>> The 409s on duplicate BigQuery jobs might be a side effect of something
>>> else related to the schema.
>>> What I have observed is that the write works or fails depending on which
>>> DynamicDestinations I supply (see *Working* / *Not working* below).
>>>
>>> Basically, when getDestination() returns a DestinationInfo object, the
>>> correct schema somehow does not reach the downstream steps that write to
>>> BigQuery.
>>> However, if I return a plain String from getDestination() and construct
>>> the actual schema in getSchema(), it works correctly.
>>>
>>> I have also shared DestinationInfo's coder, in case something is wrong
>>> there.
>>>
>>> It seems this may be a bug in Beam? I have been trying to get through to
>>> the GCP product / engineering teams, but not much luck there :(
>>>
>>> If this is a bug, where should I report it?
>>> Maybe I can contribute a fix, if I have understood the actual problem
>>> correctly.
>>>
>>> As a side effect, the job shows as successful on GCP Dataflow, but only
>>> a very small portion of the data gets written to BQ.
>>> At the very least, the pipeline should throw an error about the schema
>>> if that is the actual problem.
>>>
>>> Reuven,
>>> While working on fixing my writes to BQ, I tried combinations of the
>>> legacy Dataflow runner / Runner v2 and Beam 2.52.0, 2.53.0, and the
>>> latest (2.60.0), but the problem persisted.
>>> It seems to me the root cause is independent of the runner and version.
>>>
>>>
>>>
>>> *Working*
>>>
>>> public class BatchDynamicDestination extends DynamicDestinations<KV<String, TableRow>, String> {
>>>
>>>     private static final Logger LOG = LoggerFactory.getLogger(BatchDynamicDestination.class);
>>>
>>>     private static final Map<String, TableSchema> SCHEMA_MAP = new HashMap<>();
>>>
>>>     private final String datasetName;
>>>     private final PCollectionView<Map<String, String>> schemaView;
>>>     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
>>>
>>>     public BatchDynamicDestination(String datasetName, PCollectionView<Map<String, String>> schemaView) {
>>>         this.datasetName = datasetName;
>>>         this.schemaView = schemaView;
>>>     }
>>>
>>>     @Override
>>>     public java.util.List<PCollectionView<?>> getSideInputs() {
>>>         // Register the schemaView as a side input
>>>         return Collections.singletonList(schemaView);
>>>     }
>>>
>>>
>>>     @Override
>>>     public String getDestination(ValueInSingleWindow<KV<String, TableRow>> element) {
>>>         String topic = element.getValue().getKey();
>>>         return String.format("livewel-prod:%s.%s", datasetName, topic);
>>>     }
>>>
>>>     @Override
>>>     public TableDestination getTable(String destination) {
>>>         String tableName = destination.toLowerCase();
>>>         return new TableDestination(tableName, "Table for category ");
>>>     }
>>>
>>>     @Override
>>>     public TableSchema getSchema(String destination) {
>>>         Map<String, String> schemasMap = sideInput(schemaView);
>>>         LOG.info("destination in getSchema: {}", destination);
>>>         StringBuilder sb = new StringBuilder();
>>>         schemasMap.entrySet().stream().forEach(entry -> {
>>>             sb.append(entry.getKey()).append(" : ");
>>>         });
>>>         LOG.info("destination in getSchema map: {}", sb.toString());
>>>         String[] parts = destination.split("\\.");
>>>
>>>         //assert parts.length > 1;
>>>
>>>         if (parts.length < 2) {
>>>             throw new RuntimeException(
>>>                     "parts.length in BatchDynamicDestinations is less than 2: " + destination);
>>>         }
>>>         String tableSchema = schemasMap.get(parts[1]);
>>>         TableSchema tableSchemaObj;
>>>         try {
>>>             tableSchemaObj = OBJECT_MAPPER.readValue(tableSchema, 
>>> TableSchema.class);  // Deserialize JSON to TableSchema
>>>         } catch (IOException e) {
>>>             throw new RuntimeException("Error parsing schema JSON", e);
>>>         }
>>>
>>>         return tableSchemaObj;
>>>     }
>>>
>>> }
>>>
>>>
>>>
>>> *Not working*
>>>
>>> public class StreamDynamicDestination extends DynamicDestinations<BigQueryRow, DestinationInfo> {
>>>
>>>     private final String bigQueryDataset;
>>>
>>>     public StreamDynamicDestination(String bigQueryDataset){
>>>         this.bigQueryDataset = bigQueryDataset;
>>>     }
>>>
>>>     @Override
>>>     public Coder<DestinationInfo> getDestinationCoder() {
>>>         // Custom coder for DestinationInfo (see DestinationInfoCoder below)
>>>         return DestinationInfoCoder.of();
>>>     }
>>>
>>>     @Override
>>>     public DestinationInfo getDestination(ValueInSingleWindow<BigQueryRow> element) {
>>>         String tablePrefix = "testvitalsdb";
>>>         BigQueryRow bigQueryRow = element.getValue();
>>>         assert bigQueryRow != null;
>>>         TableSchema tableSchema = bigQueryRow.generateSchemaFromRow();
>>>         // return new DestinationInfo("livewel-prod", dataset,
>>>         //         String.format("%s_%d", tablePrefix, BigQueryRow.getTimeStampMod()), tableSchema);
>>>         return new DestinationInfo("livewel-prod", bigQueryDataset, tablePrefix, tableSchema);
>>>     }
>>>
>>>     @Override
>>>     public TableDestination getTable(DestinationInfo destination) {
>>>         return new TableDestination(destination.toTableSpec(), "Dynamically generated table");
>>>     }
>>>
>>>     @Override
>>>     public TableSchema getSchema(DestinationInfo destination) {
>>>         return destination.getTableSchema();
>>>     }
>>>
>>> }
>>>
>>>
>>>
>>> *My DestinationInfoCoder*
>>>
>>> public class DestinationInfoCoder extends AtomicCoder<DestinationInfo> {
>>>
>>>     private static final ObjectMapper objectMapper = new ObjectMapper();
>>>
>>>     @Override
>>>     public void encode(DestinationInfo value, OutputStream outStream) throws CoderException, IOException {
>>>         // Encode the String fields
>>>         try {
>>>             StringUtf8Coder.of().encode(value.getProject(), outStream);
>>>             StringUtf8Coder.of().encode(value.getDataset(), outStream);
>>>             StringUtf8Coder.of().encode(value.getTable(), outStream);
>>>
>>>             // Encode TableSchema as JSON string
>>>             String schemaJson = objectMapper.writeValueAsString(value.getTableSchema());
>>>             StringUtf8Coder.of().encode(schemaJson, outStream);
>>>         } catch (Exception e) {
>>>             throw new CoderException("Failed to encode DestinationInfo", e);
>>>         }
>>>     }
>>>
>>>     @Override
>>>     public DestinationInfo decode(InputStream inStream) throws CoderException, IOException {
>>>         // Decode the String fields
>>>         String project;
>>>         String dataset;
>>>         String table;
>>>         TableSchema tableSchema;
>>>         try {
>>>             project = StringUtf8Coder.of().decode(inStream);
>>>             dataset = StringUtf8Coder.of().decode(inStream);
>>>             table = StringUtf8Coder.of().decode(inStream);
>>>
>>>             // Decode TableSchema from JSON string
>>>             String schemaJson = StringUtf8Coder.of().decode(inStream);
>>>             tableSchema = objectMapper.readValue(schemaJson, TableSchema.class);
>>>         } catch (Exception e) {
>>>             throw new CoderException("Failed to decode DestinationInfo", e);
>>>         }
>>>         return new DestinationInfo(project, dataset, table, tableSchema);
>>>     }
>>>
>>>     public static DestinationInfoCoder of() {
>>>         return new DestinationInfoCoder();
>>>     }
>>> }
>>>
>>>
>>> On Fri, Oct 25, 2024 at 9:50 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> Are you using runner v2? The issue you linked to implies that this only
>>>> happened on runner v2.
>>>>
>>>> On Fri, Oct 25, 2024 at 8:26 AM Kenneth Knowles <k...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi Pranjal,
>>>>>
>>>>> If there is a bug in Beam, this is a good list to contact. If there is
>>>>> a problem with a GCP service, then GCP support is better.
>>>>>
>>>>> I see the code you shared, but what error or difficulty are you
>>>>> encountering?
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Mon, Oct 21, 2024 at 2:33 PM Pranjal Pandit <pran...@eka.care>
>>>>> wrote:
>>>>>
>>>>>> Hi Kenneth / Yi Hun,
>>>>>>
>>>>>> I have been grappling for the past few days with a batch pipeline
>>>>>> that loads data from GCS into BigQuery.
>>>>>>
>>>>>> While looking for a solution I found a similar issue and posted what
>>>>>> I am observing there, in this GitHub issue (
>>>>>> https://github.com/apache/beam/issues/28219).
>>>>>>
>>>>>> We are using the GCP Dataflow runner, so I have written to GCP
>>>>>> support, but not much luck there.
>>>>>>
>>>>>> I will try to briefly explain the problem I am facing here again.
>>>>>>
>>>>>> I would really appreciate any leads on how I can resolve this issue.
>>>>>>
>>>>>> I am trying to use dynamic destinations to load data from GCS into
>>>>>> BigQuery, into multiple tables with multiple schemas.
>>>>>>
>>>>>> I have tried to put down all the required classes I am using here:
>>>>>>
>>>>>>
>>>>>> PCollection<KV<String, TableRow>> kvRows = decompressedLines
>>>>>>         .apply("Convert To BigQueryRow", ParDo.of(new 
>>>>>> Utility.ConvertToBigQueryRow(ekaUUID)));
>>>>>>
>>>>>> // Group by TableId to manage different schemas per table type
>>>>>>
>>>>>> PCollectionView<Map<String, String>> schemaView = 
>>>>>> Utility.GetSchemaViewFromBigQueryRows.createSchemasView(kvRows);
>>>>>>
>>>>>> WriteResult result = kvRows.apply("WriteToBigQuery",
>>>>>>         BigQueryIO.<KV<String, TableRow>>write()
>>>>>>                 .to(new BatchDynamicDestination(bigqueryDatasetName, schemaView))
>>>>>>                 .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>>>                 .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>>>>>>                 .withFormatFunction(stringTableRowKV ->
>>>>>>                         stringTableRowKV != null ? stringTableRowKV.getValue() : null)
>>>>>> );
>>>>>>
>>>>>>
>>>>>> public static class ConvertToBigQueryRow extends DoFn<KV<String, 
>>>>>> String>, KV<String, TableRow>> {
>>>>>>
>>>>>>     private static final Logger LOG = 
>>>>>> LoggerFactory.getLogger(ConvertToBigQueryRow.class);
>>>>>>
>>>>>>     String uuidKeyName;
>>>>>>
>>>>>>     public ConvertToBigQueryRow(String uuidKeyName){
>>>>>>         this.uuidKeyName = uuidKeyName;
>>>>>>     }
>>>>>>
>>>>>>     @ProcessElement
>>>>>>     public void processElement(ProcessContext c) throws 
>>>>>> JsonProcessingException {
>>>>>>         ObjectMapper mapper = new ObjectMapper();
>>>>>>         String value = Objects.requireNonNull(c.element().getValue());
>>>>>>         String topic = Objects.requireNonNull(c.element().getKey());
>>>>>>         // System.out.println("Data in ProcessElem   " + value);
>>>>>>         // Parse the entire value as a JSON tree
>>>>>>         JsonNode rootNode = mapper.readTree(value);
>>>>>>
>>>>>>         // Extract the "after" field as a JSON node (not as a string)
>>>>>>         JsonNode afterNode = rootNode.get("after");
>>>>>>
>>>>>>         // System.out.println("Data in ProcessElem  after  " + afterNode.asText());
>>>>>>         // Check that the "after" field exists and is not JSON null
>>>>>>         if (afterNode != null && !afterNode.isNull()) {
>>>>>>
>>>>>>             // Extract the string representation of the "after" node
>>>>>>             String afterJsonString = afterNode.asText();
>>>>>>             // Parse that string into a JsonNode
>>>>>>             JsonNode afterJsonNode = mapper.readTree(afterJsonString);
>>>>>>
>>>>>>             Map<String, Object> afterMap = new HashMap<>();
>>>>>>             try {
>>>>>>                 afterMap = mapper.convertValue(afterNode, new 
>>>>>> TypeReference<Map<String, Object>>() {
>>>>>>                 });
>>>>>>             } catch (IllegalArgumentException e) {
>>>>>>                 afterMap = mapper.convertValue(afterJsonNode, new 
>>>>>> TypeReference<Map<String, Object>>() {
>>>>>>                 });
>>>>>>             }
>>>>>>
>>>>>>             if (afterMap != null) {
>>>>>>                 // System.out.println("Data in ProcessElem  afterMap  " + afterMap);
>>>>>>                 TableRow row = new TableRow();
>>>>>>                 for (Map.Entry<String, Object> entry : 
>>>>>> afterMap.entrySet()) {
>>>>>>                     row.set(entry.getKey(), entry.getValue());
>>>>>>                 }
>>>>>>
>>>>>>                 // Insert eka UUID to our table
>>>>>>                 UUID uuid = UUID.randomUUID();
>>>>>>                 row.set(this.uuidKeyName, uuid.toString());
>>>>>>
>>>>>>
>>>>>>                 LOG.info("T: {}D: {}", topic, row.toString());
>>>>>>                 c.output(KV.of(topic, row));
>>>>>>             }else{
>>>>>>                 LOG.error("Data in ProcessElem  afterMap mostly null ");
>>>>>>             }
>>>>>>         } else {
>>>>>>             LOG.error("The 'after' field is null.");
>>>>>>         }
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>>
>>>>>>
>>>>>> public static class GetSchemaViewFromBigQueryRows {
>>>>>>
>>>>>>     private static final Logger LOG = 
>>>>>> LoggerFactory.getLogger(GetSchemaViewFromBigQueryRows.class);
>>>>>>     private static final Map<String, TableSchema> schemaCache = new 
>>>>>> ConcurrentHashMap<>();
>>>>>>     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
>>>>>>
>>>>>>     private static String schemaToJson(TableSchema schema) {
>>>>>>         try {
>>>>>>             return OBJECT_MAPPER.writeValueAsString(schema);
>>>>>>         } catch (JsonProcessingException e) {
>>>>>>             throw new RuntimeException("Error converting schema to JSON", e);
>>>>>>         }
>>>>>>     }
>>>>>>
>>>>>>     // Method to generate a schema for the given type
>>>>>>     public static String getSchemaForType(String type, TableRow 
>>>>>> tableRow) {
>>>>>>
>>>>>>         TableSchema schema = schemaCache.get(type);
>>>>>>
>>>>>>         if (schema == null) {
>>>>>>             schema = new TableSchema().setFields(new ArrayList<>());
>>>>>>             schemaCache.put(type, schema);
>>>>>>         }
>>>>>>
>>>>>>         // old fields
>>>>>>         // Create a set of existing field names to avoid duplicates
>>>>>>         Set<String> existingFieldNames = new HashSet<>();
>>>>>>         for (TableFieldSchema field : schema.getFields()) {
>>>>>>             existingFieldNames.add(field.getName());
>>>>>>         }
>>>>>>
>>>>>>         // set to new fields
>>>>>>         TableSchema finalSchema = schema;
>>>>>>         tableRow.forEach((fieldName, fieldValue) -> {
>>>>>>             if (!existingFieldNames.contains(fieldName)) {
>>>>>>                 TableFieldSchema fieldSchema = new TableFieldSchema()
>>>>>>                         .setName(fieldName)
>>>>>>                         .setType(determineFieldType(fieldValue));
>>>>>>                 finalSchema.getFields().add(fieldSchema);
>>>>>>             }
>>>>>>         });
>>>>>>
>>>>>>         // TODO: Add cases for handling schema conflicts also here
>>>>>>
>>>>>>         // update final schema in map
>>>>>>         schemaCache.put(type, finalSchema);
>>>>>>         return schemaToJson(finalSchema);
>>>>>>     }
>>>>>>
>>>>>>     // Method to create a view of schemas as a
>>>>>>     // PCollectionView<Map<String, String>> (topic -> schema JSON)
>>>>>>     public static PCollectionView<Map<String, String>> createSchemasView(
>>>>>>             PCollection<KV<String, TableRow>> input) {
>>>>>>
>>>>>>         // Map each input element to KV<topic, schema JSON string>
>>>>>>         PCollection<KV<String, String>> schemas = 
>>>>>> input.apply("MapElements for converting KV",
>>>>>>                 
>>>>>> MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), 
>>>>>> TypeDescriptors.strings()))
>>>>>>                         .via(data -> {
>>>>>>                             assert data != null;
>>>>>>                             return KV.of(data.getKey(), 
>>>>>> getSchemaForType(data.getKey(), data.getValue()));
>>>>>>                         })
>>>>>>         );
>>>>>>
>>>>>>         // Deduplicate by key (topic) using Combine.perKey() and select
>>>>>>         // any one schema for each key (keeps one schema per topic)
>>>>>>         PCollection<KV<String, String>> uniqueSchemas = schemas
>>>>>>                 .apply("DeduplicateSchemas", Combine.perKey((schema1, schema2) -> schema1));
>>>>>>
>>>>>>         // Use View.asMap() to create a side input for schemas
>>>>>>         return uniqueSchemas.apply("ToSchemaView", View.asMap());
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> public class BatchDynamicDestination extends 
>>>>>> DynamicDestinations<KV<String, TableRow>, DestinationInfo> {
>>>>>>
>>>>>>     private static final Logger LOG = 
>>>>>> LoggerFactory.getLogger(BatchDynamicDestination.class);
>>>>>>
>>>>>>     private final String datasetName;
>>>>>>     private final PCollectionView<Map<String, String>> schemaView;
>>>>>>     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
>>>>>>
>>>>>>     public BatchDynamicDestination(String datasetName, 
>>>>>> PCollectionView<Map<String, String>> schemaView) {
>>>>>>         this.datasetName = datasetName;
>>>>>>         this.schemaView = schemaView;
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public Coder<DestinationInfo> getDestinationCoder() {
>>>>>>         // Custom coder for DestinationInfo
>>>>>>         return DestinationInfoCoder.of();
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public java.util.List<PCollectionView<?>> getSideInputs() {
>>>>>>         // Register the schemaView as a side input
>>>>>>         return Collections.singletonList(schemaView);
>>>>>>     }
>>>>>>
>>>>>>
>>>>>>     @Override
>>>>>>     public DestinationInfo getDestination(ValueInSingleWindow<KV<String, 
>>>>>> TableRow>> element) {
>>>>>>
>>>>>>
>>>>>>         Map<String, String> schemas = sideInput(schemaView);
>>>>>>         String topic = element.getValue().getKey();
>>>>>>
>>>>>>         String tableSchema = schemas.get(topic);
>>>>>>         if (tableSchema == null) {
>>>>>>             throw new RuntimeException("Schema not found for topic: " + topic);
>>>>>>         }
>>>>>>
>>>>>>         TableSchema tableSchemaObj;
>>>>>>         try {
>>>>>>             tableSchemaObj = OBJECT_MAPPER.readValue(tableSchema, 
>>>>>> TableSchema.class);  // Deserialize JSON to TableSchema
>>>>>>         } catch (IOException e) {
>>>>>>             throw new RuntimeException("Error parsing schema JSON", e);
>>>>>>         }
>>>>>>
>>>>>>         LOG.info("datasetname: {}, topic: {}, tableSchema: {}" , 
>>>>>> datasetName, topic, tableSchema);
>>>>>>
>>>>>>         return new DestinationInfo("livewel-prod", datasetName, topic, 
>>>>>> tableSchemaObj);
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public TableDestination getTable(DestinationInfo destination) {
>>>>>>         return new TableDestination(destination.toTableSpec(), 
>>>>>> "Dynamically generated table");
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public TableSchema getSchema(DestinationInfo destination) {
>>>>>>         return destination.getTableSchema();
>>>>>>     }
>>>>>>
>>>>>>
>>>>>> }
>>>>>>
>>>>>>
