Sorry, I didn't open a support case. You should open one yourself.

On Fri, Jun 26, 2020 at 10:41 AM Mohil Khare <mo...@prosimo.io> wrote:

> Thanks a lot, Luke, for following up on this and opening a Dataflow
> support case. It would be good to know how the streaming engine solved the
> problem. I would really appreciate it if you could share a link to the
> support case once you open it (if that is possible).
>
> Thanks and Regards
> Mohil
>
>
>
> On Fri, Jun 26, 2020 at 8:30 AM Luke Cwik <lc...@google.com> wrote:
>
>> It seems as though we have seen this failure before on Dataflow, and it
>> was caused by side input tags not being unique in a streaming
>> pipeline.
>>
>> It looks like this used to be a common occurrence in the Python SDK [1,
>> 2] because the SDK generated tags that weren't unique enough.
>>
>> I would open up a case with Dataflow support with all the information you
>> have provided here.
>>
>> 1: https://issues.apache.org/jira/browse/BEAM-4549
>> 2: https://issues.apache.org/jira/browse/BEAM-4534
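>>
>> If the same root cause applies here, one thing worth double checking is
>> that every View application in the pipeline gets its own distinct, stable
>> step name; my assumption is that unique names keep the generated side
>> input tags apart. A minimal sketch with hypothetical collection names:
>>
>>     // Each View.asSingleton() application gets a unique, stable step name.
>>     PCollectionView<Map<Stats1Key, Stats1>> stats1View =
>>         stats1Maps.apply("View_Stats1", View.asSingleton());
>>
>>     PCollectionView<Map<Stats2Key, Stats2>> stats2View =
>>         stats2Maps.apply("View_Stats2", View.asSingleton());  // distinct name, not a copy of "View_Stats1"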
>>
>> On Thu, Jun 25, 2020 at 9:30 PM Mohil Khare <mo...@prosimo.io> wrote:
>>
>>> Hi Luke and all,
>>>
>>> UPDATE: When I started my job with *the streaming engine enabled and
>>> the machine type left at the streaming engine default (n1-standard-2)*,
>>> the pipeline started successfully.
>>> https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#streaming-engine
>>>
>>> I am still evaluating to make sure that there is no performance
>>> degradation from doing so.
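>>>
>>> For reference, this is roughly how the job is being started now (a
>>> minimal sketch; the option names are the ones I believe exist on
>>> DataflowPipelineOptions, and I am leaving the worker machine type unset
>>> so the streaming engine default applies):
>>>
>>>     // Build Dataflow options from command-line args and turn on Streaming Engine.
>>>     DataflowPipelineOptions options = PipelineOptionsFactory
>>>         .fromArgs(args).withValidation().as(DataflowPipelineOptions.class);
>>>     options.setEnableStreamingEngine(true);  // same as passing --enableStreamingEngine
>>>     // No options.setWorkerMachineType(...), so the Streaming Engine default machine type is used.
>>>     Pipeline p = Pipeline.create(options);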
>>>
>>> Thanks and regards
>>> Mohil
>>>
>>>
>>> On Thu, Jun 25, 2020 at 11:44 AM Mohil Khare <mo...@prosimo.io> wrote:
>>>
>>>> Hi Luke,
>>>>
>>>> Let me give you some more details about the code.
>>>> As I mentioned before, I am using Java SDK 2.19.0 on Dataflow.
>>>>
>>>> The machine type is the default, n1-standard-4.
>>>> I didn't set numWorkerHarnessThreads (I believe Beam/Dataflow picks it
>>>> up based on the number of cores available).
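>>>>
>>>> In case the thread count turns out to matter, my understanding is that it
>>>> can be pinned explicitly through the debug options; a minimal sketch (the
>>>> option name as I believe it appears on DataflowPipelineDebugOptions, with
>>>> 16 as an arbitrary example value):
>>>>
>>>>     // Explicitly cap the number of worker harness threads instead of relying on the default.
>>>>     DataflowPipelineDebugOptions debugOptions = options.as(DataflowPipelineDebugOptions.class);
>>>>     debugOptions.setNumberOfWorkerHarnessThreads(16);  // same as --numberOfWorkerHarnessThreads=16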
>>>>
>>>> 1: Code listens for some trigger on a pubsub topic:
>>>>
>>>>     /**
>>>>      * Read from PubSub for topic ANALYTICS_UPDATE and create a
>>>>      * PCollection<POJORepresentingJobCompleteInfo> indicating to the main pipeline
>>>>      * that it should reload the relevant DataAnalyticsData from the BQ table.
>>>>      */
>>>>     static class MonitorPubSubForDailyAnalyticsDataStatus
>>>>             extends PTransform<PBegin, PCollection<POJORepresentingJobCompleteInfo>> {
>>>>
>>>>         private final String subscriptionName;
>>>>         private final String jobProject;
>>>>
>>>>         MonitorPubSubForDailyAnalyticsDataStatus(String subscriptionName, String jobProject) {
>>>>             this.subscriptionName = subscriptionName;
>>>>             this.jobProject = jobProject;
>>>>         }
>>>>
>>>>         @Override
>>>>         public PCollection<POJORepresentingJobCompleteInfo> expand(PBegin input) {
>>>>             return input.getPipeline()
>>>>                 .apply("Read_PubSub_Messages",
>>>>                     PubsubIO.readMessagesWithAttributesAndMessageId().fromSubscription(subscriptionName))
>>>>                 .apply("Applying_Windowing",
>>>>                     Window.<PubsubMessage>into(new GlobalWindows())
>>>>                         .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>>>>                         .discardingFiredPanes())
>>>>                 .apply("Read_Update_Status",
>>>>                     ParDo.of(new DoFn<PubsubMessage, POJORepresentingJobCompleteInfo>() {
>>>>                         @ProcessElement
>>>>                         public void processElement(@Element PubsubMessage input,
>>>>                                 OutputReceiver<POJORepresentingJobCompleteInfo> out) {
>>>>                             /*** Read and create ***/
>>>>                             out.output(POJORepresentingJobCompleteInfo);
>>>>                         }
>>>>                     }));
>>>>         }
>>>>     }
>>>>
>>>> 2: Get the latest update and reload new data from various BQ tables using the
>>>> Google Cloud BigQuery client library
>>>> (https://cloud.google.com/bigquery/docs/quickstarts/quickstart-client-libraries):
>>>>
>>>>     PCollection<POJORepresentingJobCompleteInfo> analyticsDataStatusUpdates =
>>>>         p.apply("Get_Analytics_Data_Status_Updates_pubsub",
>>>>             new MonitorPubSubForDailyAnalyticsDataStatus(subscriptionName, jobProject));
>>>>
>>>>
>>>> 3. Create various PCollectionViews to be used as side inputs for decorating the
>>>> stream of logs coming from Kafka (shown later):
>>>>
>>>>    PCollectionView<Map<Stats1Key, Stats1>> Stats1View =
>>>>        analyticsDataStatusUpdates
>>>>            .apply("Reload_Stats1_FromBQ", new ReadStats1())
>>>>            .apply("View_Stats1", View.asSingleton());
>>>>
>>>>    PCollectionView<Map<Stats2Key, Stats2>> Stats2View =
>>>>        analyticsDataStatusUpdates
>>>>            .apply("Reload_Stats2_FromBQ", new ReadStats2())
>>>>            .apply("View_Stats2", View.asSingleton());
>>>>
>>>>    ... and so on
>>>>
>>>>
>>>> 4. An example of the code where we read stats from BQ, i.e. in ReadStats1(),
>>>> ReadStats2(), and so on:
>>>>
>>>>    class ReadStatsS1 extends PTransform<PCollection<POJORepresentingJobCompleteInfo>,
>>>>            PCollection<Map<Stats1Key, Stats1>>> {
>>>>
>>>>        @Override
>>>>        public PCollection<Map<Stats1Key, Stats1>> expand(PCollection<POJORepresentingJobCompleteInfo> input) {
>>>>            return input
>>>>                .apply("Read_From_BigQuery", ParDo.of(new BigQueryRead()))
>>>>                .apply("Applying_Windowing",
>>>>                    Window.<Map<Stats1Key, Stats1>>into(new GlobalWindows())
>>>>                        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>>>>                        .discardingFiredPanes());
>>>>        }
>>>>
>>>>        private class BigQueryRead extends DoFn<POJORepresentingJobCompleteInfo, Map<Stats1Key, Stats1>> {
>>>>            @ProcessElement
>>>>            public void process(@Element POJORepresentingJobCompleteInfo input, ProcessContext c) {
>>>>                Map<Stats1Key, Stats1> resultMap = new HashMap<>();
>>>>                try {
>>>>                    BigQuery bigQueryClient = BigQueryOptions.getDefaultInstance().getService();
>>>>                    String sqlQuery = getSqlQuery(input); // some method to return the desired SQL query based on info present in input
>>>>                    QueryJobConfiguration queryJobConfiguration =
>>>>                        QueryJobConfiguration.newBuilder(sqlQuery).setUseLegacySql(false).build();
>>>>                    // Create a job ID so that we can safely retry.
>>>>                    JobId jobId = JobId.of(UUID.randomUUID().toString());
>>>>                    Job queryJob =
>>>>                        bigQueryClient.create(JobInfo.newBuilder(queryJobConfiguration).setJobId(jobId).build());
>>>>                    // Wait for the query to complete.
>>>>                    queryJob = queryJob.waitFor();
>>>>                    if (queryJob == null) {
>>>>                        logger.p1Error("Big Query job no longer exists");
>>>>                    } else if (queryJob.getStatus().getError() != null) {
>>>>                        // You can also look at queryJob.getStatus().getExecutionErrors() for all
>>>>                        // errors, not just the latest one.
>>>>                        logger.p1Error("Big Query job returned error: {}", queryJob.getStatus().getError().toString());
>>>>                    } else {
>>>>                        // Successful case.
>>>>                        logger.info("Parsing results executed by BigQuery");
>>>>                        // Get the results.
>>>>                        TableResult result = queryJob.getQueryResults();
>>>>                        if (null == result || !result.iterateAll().iterator().hasNext()) {
>>>>                            logger.info("No data found for query: {}", sqlQuery);
>>>>                        } else {
>>>>                            // Iterate over all pages of the results.
>>>>                            for (FieldValueList row : result.iterateAll()) {
>>>>                                /*** Parse row and create Stats1Key and Stats1 from that row ***/
>>>>                                resultMap.put(key, stats);
>>>>                            }
>>>>                        }
>>>>                    }
>>>>                } catch (Exception ex) {
>>>>                    logger.p1Error("Error in executing sql query against Big Query", ex);
>>>>                }
>>>>                logger.info("Emitting map of size: {}", resultMap.size());
>>>>                c.output(resultMap);
>>>>            }
>>>>        }
>>>>    }
>>>>
>>>>    As I mentioned before, all the classes (ReadStats1(), ReadStats2(), etc.)
>>>> follow the above code design.
>>>>
>>>>
>>>> 5. Using KafkaIO, we read a continuous stream of data from Kafka:
>>>>
>>>>     PCollection<POJO> Logs =
>>>>         p
>>>>             .apply("Read__Logs_From_Kafka", KafkaIO.<String, byte[]>read()
>>>>                 .withBootstrapServers(String.join(",", bootstrapServerToConnectTo))
>>>>                 .withTopic("topic")
>>>>                 .withKeyDeserializer(StringDeserializer.class)
>>>>                 .withValueDeserializer(ByteArrayDeserializer.class)
>>>>                 .withConsumerConfigUpdates(kafkaConsumerProperties)
>>>>                 .withConsumerFactoryFn(consumerFactoryObj)
>>>>                 .commitOffsetsInFinalize())
>>>>             .apply("Applying_Fixed_Window_Logs",
>>>>                 Window.<KafkaRecord<String, byte[]>>into(FixedWindows.of(Duration.standardSeconds(10)))
>>>>                     .triggering(Repeatedly.forever(
>>>>                         AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1))))
>>>>                     .withAllowedLateness(Duration.standardDays(1))
>>>>                     .discardingFiredPanes())
>>>>             .apply("Convert_KafkaRecord_To_PCollection<POJO>",
>>>>                 ParDo.of(new ParseLogs()));
>>>>
>>>>
>>>> 6. Take these logs and apply another transform, providing the aforementioned BQ
>>>> reads as side inputs, i.e. something like this:
>>>>
>>>>     Logs.apply("decorate", new Decorate().withSideInput(Stats1View, Stats2View, ...));
>>>>
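>>>> Decorate is our own PTransform; internally it does roughly the following
>>>> (a simplified sketch, not the exact code, with the enrichment logic
>>>> elided):
>>>>
>>>>     // Side input views are attached via withSideInputs and read via c.sideInput(...).
>>>>     PCollection<POJO> decorated = Logs.apply("Decorate_Logs",
>>>>         ParDo.of(new DoFn<POJO, POJO>() {
>>>>             @ProcessElement
>>>>             public void processElement(ProcessContext c) {
>>>>                 Map<Stats1Key, Stats1> stats1 = c.sideInput(Stats1View);
>>>>                 Map<Stats2Key, Stats2> stats2 = c.sideInput(Stats2View);
>>>>                 POJO log = c.element();
>>>>                 // ... enrich the log record using stats1/stats2 ...
>>>>                 c.output(log);
>>>>             }
>>>>         }).withSideInputs(Stats1View, Stats2View));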
>>>>
>>>> Please note: I tried commenting out the code where I add the side inputs to the
>>>> above transform and still ended up with the same crash. So the issue is definitely
>>>> in adding more than a certain number of PCollectionView transforms. I already had
>>>> 3-4 such transforms and everything was working fine. Yesterday I added a few more
>>>> and started seeing crashes.
>>>>
>>>> If I enable just one of the newly added PCollectionView transforms (keeping the
>>>> old 3-4 intact), then everything works fine. The moment I enable another new
>>>> transform, the crash happens.
>>>>
>>>>
>>>> Hope this provides some more insight. Let me know whether I need to open a
>>>> ticket, whether I am doing something wrong, or whether some extra configuration
>>>> or a different worker machine type needs to be provided.
>>>>
>>>>
>>>> Thanks and Regards
>>>>
>>>> Mohil
>>>>
>>>>
>>>> On Thu, Jun 25, 2020 at 11:01 AM Mohil Khare <mo...@prosimo.io> wrote:
>>>>
>>>>> Hi Luke,
>>>>>
>>>>> I can send you a code snippet with more details if it helps.
>>>>>
>>>>> BTW found similar issue here:
>>>>> http://mail-archives.apache.org/mod_mbox/beam-user/201801.mbox/%3ccaf9t7_74pkr7fj51-6_tbsycz9aiz_xsm7rcali5kmkd1ng...@mail.gmail.com%3E
>>>>>
>>>>> Thanks and Regards
>>>>> Mohil
>>>>>
>>>>> On Thu, Jun 25, 2020 at 10:58 AM Mohil Khare <mo...@prosimo.io> wrote:
>>>>>
>>>>>> Hi Luke,
>>>>>> Thanks for your response. I tried looking at the worker logs using the
>>>>>> GCP logging service but was unable to get a clear picture. Not sure if it's
>>>>>> due to memory pressure or a low number of harness threads.
>>>>>> Attaching a few more screenshots of the crash logs that I found, as well
>>>>>> as a JSON dump of the logs.
>>>>>>
>>>>>> Let me know if you still think opening a ticket is the right way to go.
>>>>>>
>>>>>> Thanks and regards
>>>>>> Mohil
>>>>>>
>>>>>> On Thu, Jun 25, 2020 at 10:00 AM Luke Cwik <lc...@google.com> wrote:
>>>>>>
>>>>>>> Try looking at the worker logs to get a full stack trace. Take a
>>>>>>> look at this page for some debugging guidance[1] or consider opening a
>>>>>>> support case with GCP.
>>>>>>>
>>>>>>> 1:
>>>>>>> https://cloud.google.com/dataflow/docs/guides/troubleshooting-your-pipeline
>>>>>>>
>>>>>>> On Thu, Jun 25, 2020 at 1:42 AM Mohil Khare <mo...@prosimo.io>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> BTW, just to make sure that there is no issue with any individual
>>>>>>>> PTransform, I enabled each of them one by one and the pipeline started
>>>>>>>> successfully each time. The issue happens as soon as I enable more than
>>>>>>>> one of the new aforementioned PTransforms.
>>>>>>>>
>>>>>>>> Thanks and regards
>>>>>>>> Mohil
>>>>>>>>
>>>>>>>> On Thu, Jun 25, 2020 at 1:26 AM Mohil Khare <mo...@prosimo.io>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hello All,
>>>>>>>>>
>>>>>>>>> I am using the BEAM java 2.19.0 version on google dataflow.
>>>>>>>>>
>>>>>>>>> Need urgent help in debugging one issue.
>>>>>>>>>
>>>>>>>>> I recently added 3-4 new PTransforms to an existing pipeline
>>>>>>>>> where I read data from BQ for a certain timestamp and create
>>>>>>>>> PCollectionView<Map<Key, Value>> objects to be used as side inputs in
>>>>>>>>> other PTransforms.
>>>>>>>>>
>>>>>>>>> i.e. something like this:
>>>>>>>>>
>>>>>>>>> /**
>>>>>>>>>  * Get PCollectionView Stats1
>>>>>>>>>  */
>>>>>>>>> PCollectionView<Map<Stats1Key, Stats1>> stats1View =
>>>>>>>>>     jobCompleteStatus
>>>>>>>>>         .apply("Reload_MonthlyS2Stats_FromBQ", new ReadStatsS1())
>>>>>>>>>         .apply("View_S1STATS", View.asSingleton());
>>>>>>>>>
>>>>>>>>> /**
>>>>>>>>>  * Get PCollectionView of Stats2
>>>>>>>>>  */
>>>>>>>>> PCollectionView<Map<Stats2Key, Stats2>> stats2View =
>>>>>>>>>     jobCompleteStatus
>>>>>>>>>         .apply("Reload_OptimalAppCharsInfo_FromBQ", new ReadStatsS2())
>>>>>>>>>         .apply("View_S2STATS", View.asSingleton());
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> and a couple more PTransforms like these. Here jobCompleteStatus is a
>>>>>>>>> message received from PubSub that acts as a trigger to reload these views.
>>>>>>>>>
>>>>>>>>> The moment I deployed the above pipeline, it didn't start, and error
>>>>>>>>> reporting gave weird exceptions (see attached screenshot1 and
>>>>>>>>> screenshot) which I don't know how to debug.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Then, as an experiment, I made a change where I enabled only one new
>>>>>>>>> transformation and disabled the others. This time I didn't see any issue.
>>>>>>>>>
>>>>>>>>> So it looks like some kind of memory issue.
>>>>>>>>>
>>>>>>>>> I also compared the worker logs between the working and non-working cases,
>>>>>>>>> and it looks like resources were not granted in the non-working case.
>>>>>>>>> (See attached working-workerlogs and nonworking-workerlogs.)
>>>>>>>>>
>>>>>>>>> I couldn't find any other logs.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I would really appreciate quick help here.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thanks and Regards
>>>>>>>>>
>>>>>>>>> Mohil
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
