This is running in batch mode though? In that case I would expect the pane to always be 0.
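One quick way to check would be to log the pane right before the BigQuery write; in a bounded/batch pipeline every element should normally arrive in a single on-time pane with index 0. Something along these lines (just a sketch, where kvRows stands for the PCollection<KV<String, TableRow>> from the quoted pipeline below):

    // Pass-through DoFn that logs the PaneInfo
    // (org.apache.beam.sdk.transforms.windowing.PaneInfo) each element is
    // processed in; in batch this should always report index 0.
    kvRows.apply("LogPane", ParDo.of(new DoFn<KV<String, TableRow>, KV<String, TableRow>>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            PaneInfo pane = c.pane();
            LoggerFactory.getLogger("PaneCheck").info(
                    "pane index={}, timing={}, isFirst={}, isLast={}",
                    pane.getIndex(), pane.getTiming(), pane.isFirst(), pane.isLast());
            c.output(c.element());
        }
    }));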
On Fri, Oct 25, 2024 at 9:46 AM Priyans Desai via dev <dev@beam.apache.org> wrote:

> From the issue report, it looks like it is happening both with and without runner V2:
> https://github.com/apache/beam/issues/28219#issuecomment-2423624052
>
> On Fri, Oct 25, 2024 at 12:21 PM Reuven Lax via dev <dev@beam.apache.org> wrote:
>
>> Are you using runner v2? The issue you linked to implies that this only happened on runner v2.
>>
>> On Fri, Oct 25, 2024 at 8:26 AM Kenneth Knowles <k...@apache.org> wrote:
>>
>>> Hi Pranjal,
>>>
>>> If there is a bug in Beam, this is a good list to contact. If there is a problem with a GCP service, then GCP support is better.
>>>
>>> I see the code you shared, but what error or difficulty are you encountering?
>>>
>>> Kenn
>>>
>>> On Mon, Oct 21, 2024 at 2:33 PM Pranjal Pandit <pran...@eka.care> wrote:
>>>
>>>> Hi Kenneth / Yi Hun,
>>>>
>>>> I have been grappling for the past few days with a pipeline that runs as a batch pipeline and loads data from GCS to BigQuery.
>>>>
>>>> While trying to find a solution I came across a similar issue and have posted what I am observing in the GitHub issue (https://github.com/apache/beam/issues/28219).
>>>>
>>>> We are using the GCP Dataflow runner, so I have also written to GCP support, but without much luck there.
>>>>
>>>> I will try to briefly explain the problem I am facing here again. I would really appreciate any leads on how to resolve this issue.
>>>>
>>>> I am trying to use dynamic destinations to load data from GCS to BigQuery into multiple tables with multiple schemas.
>>>>
>>>> I have tried to include all the relevant classes I am using below.
>>>>
>>>> PCollection<KV<String, TableRow>> kvRows = decompressedLines
>>>>         .apply("Convert To BigQueryRow", ParDo.of(new Utility.ConvertToBigQueryRow(ekaUUID)));
>>>>
>>>> // Group by TableId to manage different schemas per table type
>>>> PCollectionView<Map<String, String>> schemaView =
>>>>         Utility.GetSchemaViewFromBigQueryRows.createSchemasView(kvRows);
>>>>
>>>> WriteResult result = kvRows.apply("WriteToBigQuery",
>>>>         BigQueryIO.<KV<String, TableRow>>write()
>>>>                 .to(new BatchDynamicDestination(bigqueryDatasetName, schemaView))
>>>>                 .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>                 .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>>>>                 .withFormatFunction(stringTableRowKV -> stringTableRowKV != null ? stringTableRowKV.getValue() : null)
>>>> );
>>>>
>>>> public static class ConvertToBigQueryRow extends DoFn<KV<String, String>, KV<String, TableRow>> {
>>>>
>>>>     private static final Logger LOG = LoggerFactory.getLogger(ConvertToBigQueryRow.class);
>>>>
>>>>     String uuidKeyName;
>>>>
>>>>     public ConvertToBigQueryRow(String uuidKeyName) {
>>>>         this.uuidKeyName = uuidKeyName;
>>>>     }
>>>>
>>>>     @ProcessElement
>>>>     public void processElement(ProcessContext c) throws JsonProcessingException {
>>>>         ObjectMapper mapper = new ObjectMapper();
>>>>         String value = Objects.requireNonNull(c.element().getValue());
>>>>         String topic = Objects.requireNonNull(c.element().getKey());
>>>>         // System.out.println("Data in ProcessElem " + value);
>>>>         // Parse the entire value as a JSON tree
>>>>         JsonNode rootNode = mapper.readTree(value);
>>>>
>>>>         // Extract the "after" field as a JSON node (not as a string)
>>>>         JsonNode afterNode = rootNode.get("after");
>>>>
>>>>         // System.out.println("Data in ProcessElem after " + afterNode.asText());
>>>>         // Check if "after" field exists and is not null
>>>>         if (!afterNode.isNull()) {
>>>>
>>>>             String afterJsonString = afterNode.asText(); // Extract the string representation of the "after" node
>>>>             JsonNode afterJsonNode = mapper.readTree(afterJsonString); // Parse the string into a JsonNode
>>>>
>>>>             Map<String, Object> afterMap = new HashMap<>();
>>>>             try {
>>>>                 afterMap = mapper.convertValue(afterNode, new TypeReference<Map<String, Object>>() {
>>>>                 });
>>>>             } catch (IllegalArgumentException e) {
>>>>                 afterMap = mapper.convertValue(afterJsonNode, new TypeReference<Map<String, Object>>() {
>>>>                 });
>>>>             }
>>>>
>>>>             if (afterMap != null) {
>>>>                 // System.out.println("Data in ProcessElem afterMap " + afterMap);
>>>>                 TableRow row = new TableRow();
>>>>                 for (Map.Entry<String, Object> entry : afterMap.entrySet()) {
>>>>                     row.set(entry.getKey(), entry.getValue());
>>>>                 }
>>>>
>>>>                 // Insert eka UUID into our table
>>>>                 UUID uuid = UUID.randomUUID();
>>>>                 row.set(this.uuidKeyName, uuid.toString());
>>>>
>>>>                 LOG.info("T: {}D: {}", topic, row.toString());
>>>>                 c.output(KV.of(topic, row));
>>>>             } else {
>>>>                 LOG.error("Data in ProcessElem afterMap mostly null");
>>>>             }
>>>>         } else {
>>>>             LOG.error("The 'after' field is null.");
>>>>         }
>>>>     }
>>>> }
>>>>
>>>> public static class GetSchemaViewFromBigQueryRows {
>>>>
>>>>     private static final Logger LOG = LoggerFactory.getLogger(GetSchemaViewFromBigQueryRows.class);
>>>>     private static final Map<String, TableSchema> schemaCache = new ConcurrentHashMap<>();
>>>>     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
>>>>
>>>>     private static String schemaToJson(TableSchema schema) {
>>>>         try {
>>>>             return OBJECT_MAPPER.writeValueAsString(schema);
>>>>         } catch (JsonProcessingException e) {
>>>>             throw new RuntimeException("Error converting schema to JSON", e);
>>>>         }
>>>>     }
>>>>
>>>>     // Method to generate a schema for the given type
>>>>     public static String getSchemaForType(String type, TableRow tableRow) {
>>>>
>>>>         TableSchema schema = schemaCache.get(type);
>>>>
>>>>         if (schema == null) {
>>>>             schema = new TableSchema().setFields(new ArrayList<>());
>>>>             schemaCache.put(type, schema);
>>>>         }
>>>>
>>>>         // old fields
>>>>         // Create a set of existing field names to avoid duplicates
>>>>         Set<String> existingFieldNames = new HashSet<>();
>>>>         for (TableFieldSchema field : schema.getFields()) {
>>>>             existingFieldNames.add(field.getName());
>>>>         }
>>>>
>>>>         // set to new fields
>>>>         TableSchema finalSchema = schema;
>>>>         tableRow.forEach((fieldName, fieldValue) -> {
>>>>             if (!existingFieldNames.contains(fieldName)) {
>>>>                 TableFieldSchema fieldSchema = new TableFieldSchema()
>>>>                         .setName(fieldName)
>>>>                         .setType(determineFieldType(fieldValue));
>>>>                 finalSchema.getFields().add(fieldSchema);
>>>>             }
>>>>         });
>>>>
>>>>         // TODO: Add cases for handling schema conflicts also here
>>>>
>>>>         // update final schema in map
>>>>         schemaCache.put(type, finalSchema);
>>>>         return schemaToJson(finalSchema);
>>>>     }
>>>>
>>>>     // Method to create a view of schemas as a PCollectionView<Map<String, TableSchema>>
>>>>     public static PCollectionView<Map<String, String>> createSchemasView(
>>>>             PCollection<KV<String, TableRow>> input) {
>>>>
>>>>         // Map InputData to KV<String, TableSchema>
>>>>         PCollection<KV<String, String>> schemas = input.apply("MapElements for converting KV",
>>>>                 MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
>>>>                         .via(data -> {
>>>>                             assert data != null;
>>>>                             return KV.of(data.getKey(), getSchemaForType(data.getKey(), data.getValue()));
>>>>                         })
>>>>         );
>>>>
>>>>         // Deduplicate by key (topic) using Combine.perKey() and select any one schema for each key
>>>>         PCollection<KV<String, String>> uniqueSchemas = schemas
>>>>                 .apply("DeduplicateSchemas", Combine.perKey((schema1, schema2) -> schema1)); // Keep one schema per topic
>>>>
>>>>         // Use View.asMap() to create a side input for schemas
>>>>         return uniqueSchemas.apply("ToSchemaView", View.asMap());
>>>>     }
>>>> }
>>>>
>>>> public class BatchDynamicDestination extends DynamicDestinations<KV<String, TableRow>, DestinationInfo> {
>>>>
>>>>     private static final Logger LOG = LoggerFactory.getLogger(BatchDynamicDestination.class);
>>>>
>>>>     private final String datasetName;
>>>>     private final PCollectionView<Map<String, String>> schemaView;
>>>>     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
>>>>
>>>>     public BatchDynamicDestination(String datasetName, PCollectionView<Map<String, String>> schemaView) {
>>>>         this.datasetName = datasetName;
>>>>         this.schemaView = schemaView;
>>>>     }
>>>>
>>>>     @Override
>>>>     public Coder<DestinationInfo> getDestinationCoder() {
>>>>         // Using SerializableCoder because DestinationInfo implements Serializable
>>>>         return DestinationInfoCoder.of();
>>>>     }
>>>>
>>>>     @Override
>>>>     public java.util.List<PCollectionView<?>> getSideInputs() {
>>>>         // Register the schemaView as a side input
>>>>         return Collections.singletonList(schemaView);
>>>>     }
>>>>
>>>>     @Override
>>>>     public DestinationInfo getDestination(ValueInSingleWindow<KV<String, TableRow>> element) {
>>>>
>>>>         Map<String, String> schemas = sideInput(schemaView);
>>>>         String topic = element.getValue().getKey();
>>>>
>>>>         String tableSchema = schemas.get(topic);
>>>>         if (tableSchema == null) {
>>>>             throw new RuntimeException("Schema not found for topic: " + topic);
>>>>         }
>>>>
>>>>         TableSchema tableSchemaObj;
>>>>         try {
>>>>             tableSchemaObj = OBJECT_MAPPER.readValue(tableSchema, TableSchema.class); // Deserialize JSON to TableSchema
>>>>         } catch (IOException e) {
>>>>             throw new RuntimeException("Error parsing schema JSON", e);
>>>>         }
>>>>
>>>>         LOG.info("datasetname: {}, topic: {}, tableSchema: {}", datasetName, topic, tableSchema);
>>>>
>>>>         return new DestinationInfo("livewel-prod", datasetName, topic, tableSchemaObj);
>>>>     }
>>>>
>>>>     @Override
>>>>     public TableDestination getTable(DestinationInfo destination) {
>>>>         return new TableDestination(destination.toTableSpec(), "Dynamically generated table");
>>>>     }
>>>>
>>>>     @Override
>>>>     public TableSchema getSchema(DestinationInfo destination) {
>>>>         return destination.getTableSchema();
>>>>     }
>>>> }
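
(Side note for anyone reading along: the DestinationInfo and DestinationInfoCoder classes referenced in the quoted code were not included in the thread. Judging from how getDestination/getTable/getSchema use them, DestinationInfo is presumably a small value class roughly along these lines; this is only a hypothetical sketch so the snippet can be read end to end, not Pranjal's actual class.)

    // Hypothetical sketch of the DestinationInfo value class referenced above.
    // Assumes it carries project/dataset/table plus the resolved schema; the schema
    // is kept as a JSON string so the object stays cheaply serializable and has
    // simple equals()/hashCode(), since BigQueryIO groups elements by destination.
    public class DestinationInfo implements Serializable {

        private static final ObjectMapper MAPPER = new ObjectMapper();

        private final String projectId;
        private final String datasetId;
        private final String tableId;
        private final String schemaJson;

        public DestinationInfo(String projectId, String datasetId, String tableId, TableSchema schema) {
            this.projectId = projectId;
            this.datasetId = datasetId;
            this.tableId = tableId;
            try {
                this.schemaJson = MAPPER.writeValueAsString(schema);
            } catch (JsonProcessingException e) {
                throw new RuntimeException("Error serializing schema", e);
            }
        }

        // "project:dataset.table" spec consumed by TableDestination in getTable(...)
        public String toTableSpec() {
            return projectId + ":" + datasetId + "." + tableId;
        }

        public TableSchema getTableSchema() {
            try {
                return MAPPER.readValue(schemaJson, TableSchema.class);
            } catch (IOException e) {
                throw new RuntimeException("Error parsing schema JSON", e);
            }
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof DestinationInfo)) return false;
            DestinationInfo that = (DestinationInfo) o;
            return projectId.equals(that.projectId)
                    && datasetId.equals(that.datasetId)
                    && tableId.equals(that.tableId);
        }

        @Override
        public int hashCode() {
            return Objects.hash(projectId, datasetId, tableId);
        }
    }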