Sorry, I didn't open a support case. You should open the support case.

On Fri, Jun 26, 2020 at 10:41 AM Mohil Khare <mo...@prosimo.io> wrote:
Thanks a lot Luke for following up on this and opening a Dataflow support case. It would be good to know how Streaming Engine solved the problem. I would really appreciate it if you could share a link to the support case once you open it (if that is possible).

Thanks and Regards
Mohil

On Fri, Jun 26, 2020 at 8:30 AM Luke Cwik <lc...@google.com> wrote:

It seems as though we have seen this failure before for Dataflow, and it was caused because the side input tags needed to be unique in a streaming pipeline.

It looked like this used to be a common occurrence in the Python SDK [1, 2] because it generated tags that weren't unique enough.

I would open a case with Dataflow support with all the information you have provided here.

1: https://issues.apache.org/jira/browse/BEAM-4549
2: https://issues.apache.org/jira/browse/BEAM-4534

On Thu, Jun 25, 2020 at 9:30 PM Mohil Khare <mo...@prosimo.io> wrote:

Hi Luke and all,

UPDATE: When I started my job with *the Streaming Engine enabled and the machine type left at the Streaming Engine default (n1-standard-2)*, the pipeline started successfully.
https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#streaming-engine

I am still evaluating to make sure that there is no performance degradation from doing so.

Thanks and regards
Mohil
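For context, here is a minimal sketch of enabling Streaming Engine for a Java Dataflow pipeline. This is illustrative, not the poster's code; it assumes the beam-runners-google-cloud-dataflow-java dependency is on the classpath, and it uses the enable_streaming_engine experiment flag, which newer SDKs also expose as the --enableStreamingEngine pipeline option.

import java.util.Arrays;

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class StreamingEngineOptionsSketch {
  public static void main(String[] args) {
    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(DataflowPipelineOptions.class);
    options.setStreaming(true);
    // Same effect as passing --enableStreamingEngine (newer SDKs) or
    // --experiments=enable_streaming_engine on the command line.
    options.setExperiments(Arrays.asList("enable_streaming_engine"));
    // With Streaming Engine, shuffle and streaming state are handled by the Dataflow
    // service, which is why the default worker type drops to n1-standard-2.

    Pipeline p = Pipeline.create(options);
    // ... build the pipeline here ...
    p.run();
  }
}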
p.apply("Get_Analytics_Data_Status_Updates_pubsub", >>>> >>>> new MonitorPubSubForDailyAnalyticsDataStatus(subscriptionName, >>>> jobProject)); >>>> >>>> >>>> 3. Create various PCollectionViews to be used as side input for decorating >>>> stream of logs coming from Kafka (will be shown later) >>>> >>>> PCollectionView<Map<Stats1Key, Stats1>> Stats1View = >>>> >>>> analyticsDataStatusUpdates >>>> .apply("Reload_Stats1_FromBQ", new ReadStats1()) >>>> .apply("View_Stats1", View.asSingleton()); >>>> >>>> >>>> PCollectionView<Map<Stats2Key, Stats2>> Stats2View = >>>> >>>> analyticsDataStatusUpdates >>>> .apply("Reload_Stats1_FromBQ", new ReadStats2()) >>>> .apply("View_Stats1", View.asSingleton()); >>>> >>>> . >>>> >>>> . >>>> >>>> . >>>> >>>> . and so one >>>> >>>> >>>> 4. An example of code where we read stats from BQ i.e. in ReadStats1(), >>>> ReadStats2() and so on >>>> >>>> class ReadStatsS1 extends >>>> PTransform<PCollection<POJORepresentingJobCompleteInfo>, >>>> PCollection<Map<Stats1Key, Stats1>>> { >>>> >>>> @Override public PCollection<Map<Stats1Key, Stats1>> >>>> expand(PCollection<POJORepresentingJobCompleteInfo> input) { return >>>> input .apply("Read_From_BigQuery", ParDo.of(new >>>> BigQueryRread())) .apply("Applying_Windowing", >>>> Window.<Map<Stats1Key, Stats1>>into(new GlobalWindows()) >>>> .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))) >>>> .discardingFiredPanes()); } private class BigQueryRread >>>> extends DoFn<POJORepresentingJobCompleteInfo, Map<Stats1Key, Stats1>> { >>>> @ProcessElement public void process(@Element >>>> POJORepresentingJobCompleteInfo input, ProcessContext c) { >>>> Map<Stats1Key, Stats1> resultMap = new HashMap<>(); >>>> try { BigQuery bigQueryClient = >>>> BigQueryOptions.getDefaultInstance().getService(); String >>>> sqlQuery = getSqlQuery(input); ///some method to return desired sql query >>>> based on info present in input QueryJobConfiguration >>>> queryJobConfiguration = >>>> QueryJobConfiguration.newBuilder(sqlQuery).setUseLegacySql(false).build(); >>>> // Create a job ID so that we can safely retry. >>>> JobId jobId = JobId.of(UUID.randomUUID().toString()); >>>> Job queryJob = >>>> bigQueryClient.create(JobInfo.newBuilder(queryJobConfiguration).setJobId(jobId).build()); >>>> // Wait for the query to complete. queryJob >>>> = queryJob.waitFor(); if (queryJob == null) { >>>> logger.p1Error("Big Query Job no longer exists"); } >>>> else if (queryJob.getStatus().getError() != null) { // >>>> You can also look at queryJob.getStatus().getExecutionErrors() for all >>>> // errors, not just the latest one. >>>> logger.p1Error("Big Query job returned error: {}", >>>> queryJob.getStatus().getError().toString()); } else { >>>> //successful case logger.info("Parsing >>>> results executed by BigQuery"); // Get the results. >>>> TableResult result = queryJob.getQueryResults(); >>>> if (null == result || !result.iterateAll().iterator().hasNext()) { >>>> logger.info("No data found for query: {}", >>>> sqlQuery); } else { // Print all >>>> pages of the results. for (FieldValueList row : >>>> result.iterateAll()) { /*** Parse row and >>>> create Stats1Key and Stats from that row/ >>>> resultMap.put(key, stats); } >>>> } } } } catch (Exception >>>> ex) { logger.p1Error("Error in executing sql query against >>>> Big Query", ex); } logger.info("Emitting map of >>>> size: {}", resultMap.size()); c.output(resultMap); } } >>>> >>>> As I mentioned before all classes : ReadStats1(), ReadStats2() etc >>>> follow above code design >>>> >>>> >>>> 5. 
5. Using KafkaIO, we read a continuous stream of data from Kafka:

PCollection<POJO> Logs = p
    .apply("Read__Logs_From_Kafka", KafkaIO.<String, byte[]>read()
        .withBootstrapServers(String.join(",", bootstrapServerToConnectTo))
        .withTopic("topic")
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(ByteArrayDeserializer.class)
        .withConsumerConfigUpdates(kafkaConsumerProperties)
        .withConsumerFactoryFn(consumerFactoryObj)
        .commitOffsetsInFinalize())
    .apply("Applying_Fixed_Window_Logs",
        Window.<KafkaRecord<String, byte[]>>into(FixedWindows.of(Duration.standardSeconds(10)))
            .triggering(Repeatedly.forever(
                AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1))))
            .withAllowedLateness(Duration.standardDays(1))
            .discardingFiredPanes())
    .apply("Convert_KafkaRecord_To_PCollection<POJO>", ParDo.of(new ParseLogs()));

6. Take these logs and apply another transform, providing the aforementioned BQ reads as side input, i.e. something like this:

Logs.apply("decorate", new Decorate().withSideInput(Stats1View, Stats2View, ...));

Please note: I tried commenting out the code where I add the side inputs to the above transform and still ended up with the same crash. So the issue is definitely in adding more than a certain number of PCollectionView transforms. I already had 3-4 such transforms and everything was working fine. Yesterday I added a few more and started seeing crashes.

If I enable just one of the newly added PCollectionView transforms (keeping the old 3-4 intact), then everything works fine. The moment I enable another new transform, the crash happens.

Hope this provides some more insight. Let me know if I need to open a ticket, or whether I am doing something wrong, or whether some extra configuration or a different worker machine type needs to be provided.

Thanks and Regards

Mohil
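The Decorate transform used in step 6 isn't shown in the thread. For readers following along, here is a minimal sketch of the usual Beam pattern for consuming such views inside a ParDo; the enrichment body is a placeholder and this is not the poster's actual implementation (POJO, Stats1View, Stats2View are the names used above):

// Sketch only: register the views on the ParDo and read them via sideInput() per element.
PCollection<POJO> decorated = Logs
    .apply("Decorate_With_Stats", ParDo.of(new DoFn<POJO, POJO>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                Map<Stats1Key, Stats1> stats1 = c.sideInput(Stats1View);
                Map<Stats2Key, Stats2> stats2 = c.sideInput(Stats2View);
                POJO enriched = c.element();  // enrich the element using the maps as needed
                c.output(enriched);
            }
        })
        .withSideInputs(Stats1View, Stats2View));  // every view must be registered here

Per the Beam programming guide, when a globally windowed side input like the views above fires repeatedly, the value from the latest trigger firing is used, so each log element should see the most recently reloaded map.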
On Thu, Jun 25, 2020 at 11:01 AM Mohil Khare <mo...@prosimo.io> wrote:

Hi Luke,

I can send you a code snippet with more details if it helps.

BTW, I found a similar issue here:
http://mail-archives.apache.org/mod_mbox/beam-user/201801.mbox/%3ccaf9t7_74pkr7fj51-6_tbsycz9aiz_xsm7rcali5kmkd1ng...@mail.gmail.com%3E

Thanks and Regards
Mohil

On Thu, Jun 25, 2020 at 10:58 AM Mohil Khare <mo...@prosimo.io> wrote:

Hi Luke,
Thanks for your response. I tried looking at the worker logs using GCP's logging service and was unable to get a clear picture. Not sure if it is due to memory pressure or a low number of harness threads. Attaching a few more screenshots of the crash logs that I found, as well as a JSON dump of the logs.

Let me know if you still think opening a ticket is the right way to go.

Thanks and regards
Mohil

On Thu, Jun 25, 2020 at 10:00 AM Luke Cwik <lc...@google.com> wrote:

Try looking at the worker logs to get a full stack trace. Take a look at this page for some debugging guidance [1], or consider opening a support case with GCP.

1: https://cloud.google.com/dataflow/docs/guides/troubleshooting-your-pipeline

On Thu, Jun 25, 2020 at 1:42 AM Mohil Khare <mo...@prosimo.io> wrote:

BTW, just to make sure that there is no issue with any individual PTransform, I enabled each one of them one by one and the pipeline started successfully. The issue happens as soon as I enable more than one of the new aforementioned PTransforms.

Thanks and regards
Mohil

On Thu, Jun 25, 2020 at 1:26 AM Mohil Khare <mo...@prosimo.io> wrote:

Hello All,

I am using the Beam Java 2.19.0 SDK on Google Dataflow.

I need urgent help in debugging one issue.

I recently added 3-4 new PTransforms to an existing pipeline, where I read data from BQ for a certain timestamp and create a PCollectionView<Map<Key, Value>> to be used as side input in other PTransforms, i.e. something like this:

/**
 * Get PCollectionView of Stats1
 */
PCollectionView<Map<Stats1Key, Stats1>> stats1View = jobCompleteStatus
    .apply("Reload_MonthlyS2Stats_FromBQ", new ReadStatsS1())
    .apply("View_S1STATS", View.asSingleton());

/**
 * Get PCollectionView of Stats2
 */
PCollectionView<Map<Stats2Key, Stats2>> stats2View = jobCompleteStatus
    .apply("Reload_OptimalAppCharsInfo_FromBQ", new ReadStatsS2())
    .apply("View_S2STATS", View.asSingleton());

and a couple more PTransforms like these. Here jobCompleteStatus is a message received from PubSub that acts as a trigger to reload these views.

The moment I deployed the above pipeline, it didn't start, and error reporting gave weird exceptions (see the attached screenshots) which I don't know how to debug.

Then, as an experiment, I made a change where I enabled only one new transformation and disabled the others. This time I didn't see any issue. So it looks like some memory issue.

I also compared the worker logs between the working case and the non-working case, and it looks like resources were not granted in the non-working case (see the attached working-workerlogs and nonworking-workerlogs). I couldn't find any other log.

I would really appreciate quick help here.

Thanks and Regards

Mohil
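Relating Luke's note earlier in this thread (that side-input tags need to be unique in a streaming pipeline) back to this original snippet: one defensive way to wire up the views is to derive every reload/view step name from the stat it loads, so no two side-input branches can share a name. This is only an illustrative sketch under that assumption; buildStatsView is a hypothetical helper, while jobCompleteStatus, ReadStatsS1, Stats1Key, etc. are the names from the snippet above.

// Hypothetical helper: give each stat type its own uniquely named reload and view steps.
static <TriggerT, K, V> PCollectionView<Map<K, V>> buildStatsView(
        PCollection<TriggerT> trigger,
        String statName,
        PTransform<PCollection<TriggerT>, PCollection<Map<K, V>>> reader) {
    return trigger
        .apply("Reload_" + statName + "_FromBQ", reader)           // unique step name per stat
        .apply("View_" + statName, View.<Map<K, V>>asSingleton()); // unique view step name per stat
}

// Usage with the snippet above:
PCollectionView<Map<Stats1Key, Stats1>> stats1View =
    buildStatsView(jobCompleteStatus, "MonthlyS2Stats", new ReadStatsS1());
PCollectionView<Map<Stats2Key, Stats2>> stats2View =
    buildStatsView(jobCompleteStatus, "OptimalAppCharsInfo", new ReadStatsS2());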