Understood. Let me make the changes and I will get back on this thread.
On Sat, Oct 26, 2024 at 9:19 PM Reuven Lax <re...@google.com> wrote:

One more thing - in addition to a deterministic coder, DestinationInfo needs to have proper equals and hashCode methods defined.

On Fri, Oct 25, 2024 at 10:30 PM Reuven Lax <re...@google.com> wrote:

Ah, I suspect I know what's going on. What you're doing won't work unless the DestinationInfoCoder is a deterministic coder (i.e. two equivalent objects map to the same encoded representation). A JSON encoding often isn't deterministic (for one reason, it's legal to encode the JSON fields in any order).
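[Note: a minimal sketch of what Reuven is describing, assuming DestinationInfo carries the four fields read by the DestinationInfoCoder quoted below; the actual class was not shared, so the field names here are guesses. It also assumes that TableSchema's map-based equals() (it extends AbstractMap via GenericJson) is acceptable for comparing schemas.]

import com.google.api.services.bigquery.model.TableSchema;
import java.io.Serializable;
import java.util.Objects;

public class DestinationInfo implements Serializable {

    private final String project;
    private final String dataset;
    private final String table;
    private final TableSchema tableSchema;

    public DestinationInfo(String project, String dataset, String table, TableSchema tableSchema) {
        this.project = project;
        this.dataset = dataset;
        this.table = table;
        this.tableSchema = tableSchema;
    }

    public String getProject() { return project; }
    public String getDataset() { return dataset; }
    public String getTable() { return table; }
    public TableSchema getTableSchema() { return tableSchema; }

    // toTableSpec() and any other helpers from the original class are omitted here.

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof DestinationInfo)) {
            return false;
        }
        DestinationInfo that = (DestinationInfo) o;
        // TableSchema is map-backed, so Objects.equals compares its entries.
        return Objects.equals(project, that.project)
                && Objects.equals(dataset, that.dataset)
                && Objects.equals(table, that.table)
                && Objects.equals(tableSchema, that.tableSchema);
    }

    @Override
    public int hashCode() {
        return Objects.hash(project, dataset, table, tableSchema);
    }
}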
On Fri, Oct 25, 2024 at 2:22 PM Pranjal Pandit <pran...@eka.care> wrote:

Hi Kenneth / Reuven,

Actually, I did some more digging into the root cause of what might be the actual problem here. Here's what I found:

The 409s on duplicate BigQuery jobs might be a side effect of something else related to the schema. What I have come to observe is that it depends on how I define my DynamicDestinations (see *Working* / *Not working* below).

Basically, when getDestination() returns a DestinationInfo object, the pipeline somehow does not get the correct schema downstream while writing to BigQuery. However, if I return a plain string from getDestination() and construct the actual schema in getSchema(), then it works correctly.

I have also shared DestinationInfo's coder, in case there is something there.

It seems like this may be a bug in Beam? I have been trying to get it past the GCP product / engineering teams, but not much luck there :(

If this is a bug, where should I report it? Maybe I can contribute a fix if I have understood the actual problem correctly.

As a side effect, what's happening is that the job shows as successful on GCP Dataflow, but only a very small portion of the data gets written to BQ. At the very least the pipeline should throw some error about the schema, if that is the actual problem.

Reuven, while working on fixing my writes to BQ I tried combinations of the legacy GCP runner / GCP runner v2 and older versions of Beam (2.52.0, 2.53.0) as well as the latest (2.60.0), but the problem persisted. It seems to me the root cause is independent of the runner / version.

*Working*

public class BatchDynamicDestination extends DynamicDestinations<KV<String, TableRow>, String> {

    private static final Logger LOG = LoggerFactory.getLogger(BatchDynamicDestination.class);

    private static final Map<String, TableSchema> SCHEMA_MAP = new HashMap<>();

    private final String datasetName;
    private final PCollectionView<Map<String, String>> schemaView;
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    public BatchDynamicDestination(String datasetName, PCollectionView<Map<String, String>> schemaView) {
        this.datasetName = datasetName;
        this.schemaView = schemaView;
    }

    @Override
    public java.util.List<PCollectionView<?>> getSideInputs() {
        // Register the schemaView as a side input
        return Collections.singletonList(schemaView);
    }

    @Override
    public String getDestination(ValueInSingleWindow<KV<String, TableRow>> element) {
        String topic = element.getValue().getKey();
        return String.format("livewel-prod:%s.%s", datasetName, topic);
    }

    @Override
    public TableDestination getTable(String destination) {
        String tableName = destination.toLowerCase();
        return new TableDestination(tableName, "Table for category");
    }

    @Override
    public TableSchema getSchema(String destination) {
        Map<String, String> schemasMap = sideInput(schemaView);
        LOG.info("destination in getSchema: {}", destination);
        StringBuilder sb = new StringBuilder();
        schemasMap.entrySet().stream().forEach(entry -> {
            sb.append(entry.getKey()).append(" : ");
        });
        LOG.info("destination in getSchema map: {}", sb.toString());
        String[] parts = destination.split("\\.");

        if (parts.length < 2) {
            throw new RuntimeException("parts.length in BatchDynamicDestinations is less than 2: " + destination);
        }
        String tableSchema = schemasMap.get(parts[1]);
        TableSchema tableSchemaObj;
        try {
            // Deserialize JSON to TableSchema
            tableSchemaObj = OBJECT_MAPPER.readValue(tableSchema, TableSchema.class);
        } catch (IOException e) {
            throw new RuntimeException("Error parsing schema JSON", e);
        }

        return tableSchemaObj;
    }
}

*Not working*

public class StreamDynamicDestination extends DynamicDestinations<BigQueryRow, DestinationInfo> {

    private final String bigQueryDataset;

    public StreamDynamicDestination(String bigQueryDataset) {
        this.bigQueryDataset = bigQueryDataset;
    }

    @Override
    public Coder<DestinationInfo> getDestinationCoder() {
        // Use the custom coder for DestinationInfo (defined below)
        return DestinationInfoCoder.of();
    }

    @Override
    public DestinationInfo getDestination(ValueInSingleWindow<BigQueryRow> element) {
        String tablePrefix = "testvitalsdb";
        BigQueryRow bigQueryRow = element.getValue();
        assert bigQueryRow != null;
        TableSchema tableSchema = bigQueryRow.generateSchemaFromRow();
        //return new DestinationInfo("livewel-prod", dataset, String.format("%s_%d", tablePrefix, BigQueryRow.getTimeStampMod()), tableSchema);
        return new DestinationInfo("livewel-prod", bigQueryDataset, tablePrefix, tableSchema);
    }

    @Override
    public TableDestination getTable(DestinationInfo destination) {
        return new TableDestination(destination.toTableSpec(), "Dynamically generated table");
    }

    @Override
    public TableSchema getSchema(DestinationInfo destination) {
        return destination.getTableSchema();
    }
}

*My DestinationInfoCoder*

public class DestinationInfoCoder extends AtomicCoder<DestinationInfo> {

    private static final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void encode(DestinationInfo value, OutputStream outStream) throws CoderException, IOException {
        try {
            // Encode the String fields
            StringUtf8Coder.of().encode(value.getProject(), outStream);
            StringUtf8Coder.of().encode(value.getDataset(), outStream);
            StringUtf8Coder.of().encode(value.getTable(), outStream);

            // Encode TableSchema as a JSON string
            String schemaJson = objectMapper.writeValueAsString(value.getTableSchema());
            StringUtf8Coder.of().encode(schemaJson, outStream);
        } catch (Exception e) {
            throw new CoderException("Failed to encode DestinationInfo", e);
        }
    }

    @Override
    public DestinationInfo decode(InputStream inStream) throws CoderException, IOException {
        String project;
        String dataset;
        String table;
        TableSchema tableSchema;
        try {
            // Decode the String fields
            project = StringUtf8Coder.of().decode(inStream);
            dataset = StringUtf8Coder.of().decode(inStream);
            table = StringUtf8Coder.of().decode(inStream);

            // Decode TableSchema from the JSON string
            String schemaJson = StringUtf8Coder.of().decode(inStream);
            tableSchema = objectMapper.readValue(schemaJson, TableSchema.class);
        } catch (Exception e) {
            throw new CoderException("Failed to decode DestinationInfo", e);
        }
        return new DestinationInfo(project, dataset, table, tableSchema);
    }

    public static DestinationInfoCoder of() {
        return new DestinationInfoCoder();
    }
}
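[Note: if Jackson's field ordering is the only source of nondeterminism in the coder above, one possible fix, sketched here and not verified against this pipeline, is to configure the mapper to emit properties and map entries in a canonical order, so that equal DestinationInfo objects always encode to the same bytes. It is also worth knowing that AtomicCoder appears to be presumed deterministic unless verifyDeterministic() is overridden to throw, which would explain why Beam raised no error here.]

import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;

// Sketch: the mapper declaration in DestinationInfoCoder would become a
// canonically ordered ObjectMapper. SORT_PROPERTIES_ALPHABETICALLY orders
// bean properties; ORDER_MAP_ENTRIES_BY_KEYS orders map entries (TableSchema
// is map-backed, so this is the one that matters here).
private static final ObjectMapper objectMapper = new ObjectMapper()
        .configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, true)
        .configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true);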
On Fri, Oct 25, 2024 at 9:50 PM Reuven Lax <re...@google.com> wrote:

Are you using runner v2? The issue you linked to implies that this only happened on runner v2.
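[Note: on Dataflow, runner v2 is toggled through the experiments pipeline option, e.g. --experiments=use_runner_v2 to opt in, or --experiments=disable_runner_v2 to force the legacy runner where it is still supported.]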
On Fri, Oct 25, 2024 at 8:26 AM Kenneth Knowles <k...@apache.org> wrote:

Hi Pranjal,

If there is a bug in Beam, this is a good list to contact. If there is a problem with a GCP service, then GCP support is better.

I see the code you shared, but what error or difficulty are you encountering?

Kenn

On Mon, Oct 21, 2024 at 2:33 PM Pranjal Pandit <pran...@eka.care> wrote:

Hi Kenneth / Yi Hun,

I have been grappling for the past few days with a pipeline that runs as a batch pipeline and loads data from GCS to BigQuery.

While trying to find a solution I came across a similar issue and have posted what I am observing in the GitHub issue (https://github.com/apache/beam/issues/28219).

We are using the GCP Dataflow runner, and as such I have written to GCP support, but not much luck there.

I will try to briefly explain the problem I am facing here again. I would really appreciate any leads on how I can resolve this issue.

I am trying to use dynamic destinations to load data from GCS to BigQuery into multiple tables with multiple schemas. I have tried to put down all the required classes I am using here.

PCollection<KV<String, TableRow>> kvRows = decompressedLines
        .apply("Convert To BigQueryRow", ParDo.of(new Utility.ConvertToBigQueryRow(ekaUUID)));

// Group by table id to manage different schemas per table type
PCollectionView<Map<String, String>> schemaView =
        Utility.GetSchemaViewFromBigQueryRows.createSchemasView(kvRows);

WriteResult result = kvRows.apply("WriteToBigQuery",
        BigQueryIO.<KV<String, TableRow>>write()
                .to(new BatchDynamicDestination(bigqueryDatasetName, schemaView))
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                .withFormatFunction(stringTableRowKV ->
                        stringTableRowKV != null ? stringTableRowKV.getValue() : null)
);

public static class ConvertToBigQueryRow extends DoFn<KV<String, String>, KV<String, TableRow>> {

    private static final Logger LOG = LoggerFactory.getLogger(ConvertToBigQueryRow.class);

    String uuidKeyName;

    public ConvertToBigQueryRow(String uuidKeyName) {
        this.uuidKeyName = uuidKeyName;
    }

    @ProcessElement
    public void processElement(ProcessContext c) throws JsonProcessingException {
        ObjectMapper mapper = new ObjectMapper();
        String value = Objects.requireNonNull(c.element().getValue());
        String topic = Objects.requireNonNull(c.element().getKey());

        // Parse the entire value as a JSON tree
        JsonNode rootNode = mapper.readTree(value);

        // Extract the "after" field as a JSON node (not as a string)
        JsonNode afterNode = rootNode.get("after");

        // Check that the "after" field exists and is not null
        // (get() returns null when the key is absent, so guard against that too)
        if (afterNode != null && !afterNode.isNull()) {

            String afterJsonString = afterNode.asText(); // string representation of the "after" node
            JsonNode afterJsonNode = mapper.readTree(afterJsonString); // parse the string into a JsonNode

            Map<String, Object> afterMap;
            try {
                afterMap = mapper.convertValue(afterNode, new TypeReference<Map<String, Object>>() {});
            } catch (IllegalArgumentException e) {
                afterMap = mapper.convertValue(afterJsonNode, new TypeReference<Map<String, Object>>() {});
            }

            if (afterMap != null) {
                TableRow row = new TableRow();
                for (Map.Entry<String, Object> entry : afterMap.entrySet()) {
                    row.set(entry.getKey(), entry.getValue());
                }

                // Insert the eka UUID into our row
                UUID uuid = UUID.randomUUID();
                row.set(this.uuidKeyName, uuid.toString());

                LOG.info("T: {} D: {}", topic, row.toString());
                c.output(KV.of(topic, row));
            } else {
                LOG.error("Data in ProcessElem afterMap mostly null");
            }
        } else {
            LOG.error("The 'after' field is null.");
        }
    }
}
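[Note: a minor aside on the DoFn above: it constructs a new ObjectMapper on every element, which is relatively expensive. A common pattern, sketched here, is to create it once per DoFn instance in a @Setup method:]

// Sketch: reuse one ObjectMapper per DoFn instance instead of one per element.
public static class ConvertToBigQueryRow extends DoFn<KV<String, String>, KV<String, TableRow>> {

    private transient ObjectMapper mapper;

    @Setup
    public void setup() {
        mapper = new ObjectMapper(); // runs once per DoFn instance, not per element
    }

    // processElement then uses this.mapper exactly as above.
}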
public static class GetSchemaViewFromBigQueryRows {

    private static final Logger LOG = LoggerFactory.getLogger(GetSchemaViewFromBigQueryRows.class);
    private static final Map<String, TableSchema> schemaCache = new ConcurrentHashMap<>();
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    private static String schemaToJson(TableSchema schema) {
        try {
            return OBJECT_MAPPER.writeValueAsString(schema);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("Error converting schema to JSON", e);
        }
    }

    // Method to generate a schema for the given type
    public static String getSchemaForType(String type, TableRow tableRow) {

        TableSchema schema = schemaCache.get(type);

        if (schema == null) {
            schema = new TableSchema().setFields(new ArrayList<>());
            schemaCache.put(type, schema);
        }

        // Old fields: create a set of existing field names to avoid duplicates
        Set<String> existingFieldNames = new HashSet<>();
        for (TableFieldSchema field : schema.getFields()) {
            existingFieldNames.add(field.getName());
        }

        // Add the new fields
        TableSchema finalSchema = schema;
        tableRow.forEach((fieldName, fieldValue) -> {
            if (!existingFieldNames.contains(fieldName)) {
                TableFieldSchema fieldSchema = new TableFieldSchema()
                        .setName(fieldName)
                        .setType(determineFieldType(fieldValue));
                finalSchema.getFields().add(fieldSchema);
            }
        });

        // TODO: Add cases for handling schema conflicts also here

        // Update the final schema in the map
        schemaCache.put(type, finalSchema);
        return schemaToJson(finalSchema);
    }

    // Method to create a view of schemas as a PCollectionView<Map<String, String>>
    public static PCollectionView<Map<String, String>> createSchemasView(
            PCollection<KV<String, TableRow>> input) {

        // Map input data to KV pairs of topic -> schema JSON
        PCollection<KV<String, String>> schemas = input.apply("MapElements for converting KV",
                MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
                        .via(data -> {
                            assert data != null;
                            return KV.of(data.getKey(), getSchemaForType(data.getKey(), data.getValue()));
                        })
        );

        // Deduplicate by key (topic) using Combine.perKey() and keep one schema per key
        PCollection<KV<String, String>> uniqueSchemas = schemas
                .apply("DeduplicateSchemas", Combine.perKey((schema1, schema2) -> schema1));

        // Use View.asMap() to create a side input for schemas
        return uniqueSchemas.apply("ToSchemaView", View.asMap());
    }
}
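[Note: determineFieldType() is called above but was not included in the snippets. A hypothetical reconstruction, purely for illustration, since the author's actual implementation was not shared:]

// Hypothetical: map a Java value to a BigQuery type name.
private static String determineFieldType(Object value) {
    if (value instanceof Integer || value instanceof Long) {
        return "INTEGER";
    } else if (value instanceof Float || value instanceof Double) {
        return "FLOAT";
    } else if (value instanceof Boolean) {
        return "BOOLEAN";
    } else {
        return "STRING"; // fall back to STRING for everything else
    }
}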
public class BatchDynamicDestination extends DynamicDestinations<KV<String, TableRow>, DestinationInfo> {

    private static final Logger LOG = LoggerFactory.getLogger(BatchDynamicDestination.class);

    private final String datasetName;
    private final PCollectionView<Map<String, String>> schemaView;
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    public BatchDynamicDestination(String datasetName, PCollectionView<Map<String, String>> schemaView) {
        this.datasetName = datasetName;
        this.schemaView = schemaView;
    }

    @Override
    public Coder<DestinationInfo> getDestinationCoder() {
        // Use the custom coder for DestinationInfo
        return DestinationInfoCoder.of();
    }

    @Override
    public java.util.List<PCollectionView<?>> getSideInputs() {
        // Register the schemaView as a side input
        return Collections.singletonList(schemaView);
    }

    @Override
    public DestinationInfo getDestination(ValueInSingleWindow<KV<String, TableRow>> element) {

        Map<String, String> schemas = sideInput(schemaView);
        String topic = element.getValue().getKey();

        String tableSchema = schemas.get(topic);
        if (tableSchema == null) {
            throw new RuntimeException("Schema not found for topic: " + topic);
        }

        TableSchema tableSchemaObj;
        try {
            // Deserialize JSON to TableSchema
            tableSchemaObj = OBJECT_MAPPER.readValue(tableSchema, TableSchema.class);
        } catch (IOException e) {
            throw new RuntimeException("Error parsing schema JSON", e);
        }

        LOG.info("datasetname: {}, topic: {}, tableSchema: {}", datasetName, topic, tableSchema);

        return new DestinationInfo("livewel-prod", datasetName, topic, tableSchemaObj);
    }

    @Override
    public TableDestination getTable(DestinationInfo destination) {
        return new TableDestination(destination.toTableSpec(), "Dynamically generated table");
    }

    @Override
    public TableSchema getSchema(DestinationInfo destination) {
        return destination.getTableSchema();
    }
}
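[Note: one way to check Reuven's diagnosis directly is Beam's CoderProperties test helper, which fails if two equal values encode to different bytes. A sketch, assuming info1 and info2 are two DestinationInfo instances that are equal per the equals()/hashCode() sketched earlier in the thread:]

import org.apache.beam.sdk.testing.CoderProperties;

// In a unit test: if DestinationInfoCoder is nondeterministic (e.g. the JSON
// field order differs between the two encodings), this assertion fails.
CoderProperties.coderDeterministic(DestinationInfoCoder.of(), info1, info2);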