[ https://issues.apache.org/jira/browse/BEAM-14064?focusedWorklogId=765689&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-765689 ]
ASF GitHub Bot logged work on BEAM-14064:
-----------------------------------------

Author: ASF GitHub Bot
Created on: 03/May/22 20:13
Start Date: 03/May/22 20:13
Worklog Time Spent: 10m
Work Description: lukecwik merged PR #17112:
URL: https://github.com/apache/beam/pull/17112

Issue Time Tracking
-------------------

Worklog Id: (was: 765689)
Time Spent: 7h 10m (was: 7h)

> ElasticSearchIO#Write buffering and outputting across windows
> -------------------------------------------------------------
>
> Key: BEAM-14064
> URL: https://issues.apache.org/jira/browse/BEAM-14064
> Project: Beam
> Issue Type: Bug
> Components: io-java-elasticsearch
> Affects Versions: 2.35.0, 2.36.0, 2.37.0
> Reporter: Luke Cwik
> Assignee: Evan Galpin
> Priority: P1
> Fix For: 2.39.0
>
> Time Spent: 7h 10m
> Remaining Estimate: 0h
>
> Source: https://lists.apache.org/thread/mtwtno2o88lx3zl12jlz7o5w1lcgm2db
> Bug PR: https://github.com/apache/beam/pull/15381
>
> ElasticsearchIO is collecting results from elements in window X and then
> trying to output them in window Y when flushing the batch. This exposed a bug
> where buffered elements were output as part of a different window than the
> one that produced them.
> This became visible because validation was added recently to ensure that,
> when the pipeline is processing elements in window X, any output with a
> timestamp is valid for window X. Note that this validation only occurs in
> *@ProcessElement*, since output there is associated with the window of the
> input element currently being processed.
> It is OK to do this in *@FinishBundle*, since there is no existing windowing
> context there, and an element output there is assigned to an appropriate
> window.
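The window mismatch described above can be modeled without Beam at all. The sketch below is hypothetical: `WindowedBufferSketch`, `Window`, `Tagged`, `flushInto`, and `flushWithOwnWindows` are illustrative names, not ElasticsearchIO or Beam SDK APIs, and a "window" is just a `[start, end)` interval. A result produced in one fixed window is buffered and then flushed while a later element from the next window is being processed. The buggy flush assigns it to the current window, so its timestamp no longer falls inside its window; the fixed flush keeps each result's own window, which is roughly what outputting from *@FinishBundle* achieves (in the Beam Java SDK, `DoFn.FinishBundleContext#output(output, timestamp, window)` takes the window explicitly).

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Beam-free model of the bug; these classes only illustrate
// the report and are not the ElasticsearchIO or Beam SDK API.
class WindowedBufferSketch {

    /** A fixed window covering [start, end). */
    static final class Window {
        final Instant start;
        final Instant end;
        Window(Instant start, Instant end) { this.start = start; this.end = end; }
        boolean contains(Instant t) { return !t.isBefore(start) && t.isBefore(end); }
    }

    /** A value plus the timestamp and window it is emitted with. */
    static final class Tagged {
        final String value;
        final Instant timestamp;
        final Window window;
        Tagged(String value, Instant timestamp, Window window) {
            this.value = value;
            this.timestamp = timestamp;
            this.window = window;
        }
        /** True when the timestamp actually falls inside the assigned window. */
        boolean isConsistent() { return window.contains(timestamp); }
    }

    private final List<Tagged> buffer = new ArrayList<>();

    /** Buffer a result, remembering the window of the element that produced it. */
    void add(String value, Instant ts, Window w) { buffer.add(new Tagged(value, ts, w)); }

    /** Buggy flush: every result inherits the window currently being processed. */
    List<Tagged> flushInto(Window current) {
        List<Tagged> out = new ArrayList<>();
        for (Tagged b : buffer) out.add(new Tagged(b.value, b.timestamp, current));
        buffer.clear();
        return out;
    }

    /** Fixed flush: each result keeps the window that produced it. */
    List<Tagged> flushWithOwnWindows() {
        List<Tagged> out = new ArrayList<>(buffer);
        buffer.clear();
        return out;
    }

    static boolean allConsistent(List<Tagged> results) {
        return results.stream().allMatch(Tagged::isConsistent);
    }

    public static void main(String[] args) {
        Window w1 = new Window(Instant.parse("2022-03-07T10:00:00Z"), Instant.parse("2022-03-07T10:01:00Z"));
        Window w2 = new Window(Instant.parse("2022-03-07T10:01:00Z"), Instant.parse("2022-03-07T10:02:00Z"));
        Instant ts = Instant.parse("2022-03-07T10:00:30Z"); // produced in w1

        WindowedBufferSketch buggy = new WindowedBufferSketch();
        buggy.add("a", ts, w1);
        // A later element in w2 triggers the flush while processing is "inside" w2.
        System.out.println("buggy flush consistent? " + allConsistent(buggy.flushInto(w2)));

        WindowedBufferSketch fixed = new WindowedBufferSketch();
        fixed.add("a", ts, w1);
        System.out.println("fixed flush consistent? " + allConsistent(fixed.flushWithOwnWindows()));
    }
}
```

Running `main` prints `buggy flush consistent? false` followed by `fixed flush consistent? true`.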
> *Further Context*
>
> We’ve bisected it to being introduced in 2.35.0, and I’m reasonably certain
> it’s this PR: https://github.com/apache/beam/pull/15381
>
> Our scenario is pretty trivial: we read off Pubsub and write to Elastic in a
> streaming job. The configs for the source and sink are, respectively:
> {noformat}
> pipeline.apply(
>     PubsubIO.readStrings().fromSubscription(subscription)
> ).apply(ParseJsons.of(OurObject::class.java))
>     .setCoder(KryoCoder.of())
> {noformat}
> and
> {noformat}
> ElasticsearchIO.write()
>     .withUseStatefulBatches(true)
>     .withMaxParallelRequestsPerWindow(1)
>     .withMaxBufferingDuration(Duration.standardSeconds(30))
>     // 5 bytes -> KiB -> MiB, so 5 MiB
>     .withMaxBatchSizeBytes(5L * 1024 * 1024)
>     // # of docs
>     .withMaxBatchSize(1000)
>     .withConnectionConfiguration(
>         ElasticsearchIO.ConnectionConfiguration.create(
>             arrayOf(host),
>             "fubar",
>             "_doc"
>         ).withConnectTimeout(5000)
>             .withSocketTimeout(30000)
>     )
>     .withRetryConfiguration(
>         ElasticsearchIO.RetryConfiguration.create(
>             10,
>             // the duration is wall clock, against the connection and socket timeouts specified
>             // above. I.e., 10 x 30s is gonna be more than 3 minutes, so if we're getting
>             // 10 socket timeouts in a row, this would ignore the "10" part and terminate
>             // after 6. The idea is that in a mixed failure mode, you'd get different timeouts
>             // of different durations, and on average 10 x fails < 4m.
>             // That said, 4m is arbitrary, so adjust as and when needed.
>             Duration.standardMinutes(4)
>         )
>     )
>     .withIdFn { f: JsonNode -> f["id"].asText() }
>     .withIndexFn { f: JsonNode -> f["schema_name"].asText() }
>     .withIsDeleteFn { f: JsonNode -> f["_action"].asText("noop") == "delete" }
> {noformat}
>
> We recently tried upgrading from 2.33 to 2.36 and immediately hit a bug in
> the consumer due to alleged time skew, specifically:
> {noformat}
> 2022-03-07 10:48:37.886 GMT Error message from worker:
> java.lang.IllegalArgumentException: Cannot output with timestamp
> 2022-03-07T10:43:38.640Z. Output timestamps must be no earlier than the
> timestamp of the current input (2022-03-07T10:43:43.562Z) minus the allowed
> skew (0 milliseconds) and no later than 294247-01-10T04:00:54.775Z. See the
> DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.checkTimestamp(SimpleDoFnRunner.java:446)
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWithTimestamp(SimpleDoFnRunner.java:422)
> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn$ProcessContextAdapter.output(ElasticsearchIO.java:2364)
> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.flushAndOutputResults(ElasticsearchIO.java:2404)
> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.addAndMaybeFlush(ElasticsearchIO.java:2419)
> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOStatefulFn.processElement(ElasticsearchIO.java:2300)
> {noformat}
>
> I’ve bisected it: 2.34 works fine, 2.35 is the first version where this
> breaks, and the code in the trace seems to have been largely added by the PR
> linked above. The error usually claims a skew of a few seconds, but obviously
> I can’t override getAllowedTimestampSkew() on the internal Elastic DoFn, and
> it’s marked deprecated anyway.
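The lower bound in this error is plain arithmetic: an output timestamp is rejected if it is earlier than the current input's timestamp minus the allowed skew, and the message reports that skew as 0 milliseconds (the default). The trace also shows the flush running inside `processElement` via `addAndMaybeFlush`. The hypothetical helper below (`TimestampSkewCheck`, `isAllowed`, and `violation` are illustrative names, not Beam's `SimpleDoFnRunner` code) reproduces only that lower-bound check with `java.time`, using the two timestamps from the trace. The upper bound in the message is effectively unlimited, so the sketch ignores it.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper mirroring the lower-bound rule quoted in the error
// message; it is not Beam's SimpleDoFnRunner implementation.
class TimestampSkewCheck {

    /** Accept an output only if it is no earlier than input - allowedSkew. */
    static boolean isAllowed(Instant output, Instant input, Duration allowedSkew) {
        return !output.isBefore(input.minus(allowedSkew));
    }

    /** How far the output lies before the earliest allowed timestamp (zero if allowed). */
    static Duration violation(Instant output, Instant input, Duration allowedSkew) {
        Instant earliest = input.minus(allowedSkew);
        return output.isBefore(earliest) ? Duration.between(output, earliest) : Duration.ZERO;
    }

    public static void main(String[] args) {
        Instant input = Instant.parse("2022-03-07T10:43:43.562Z");  // element being processed
        Instant output = Instant.parse("2022-03-07T10:43:38.640Z"); // buffered result being flushed
        Duration skew = Duration.ZERO;                              // "0 milliseconds" in the message

        System.out.println(isAllowed(output, input, skew));           // false
        System.out.println(violation(output, input, skew).toMillis()); // 4922
    }
}
```

The flushed result is 4.922 s older than the element being processed, which matches the "skew of a few seconds" the reporter describes and is what you would expect when a result buffered from an earlier element is emitted during a later element's `@ProcessElement`.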
> I’m happy to raise a JIRA, but I’m not 100% sure what the code was intending
> to fix, and I’d also be happy if someone else can reproduce this or knows of
> similar reports. I feel like what we’re doing is not that uncommon a
> scenario, so I would have thought someone else would have hit this by now.

--
This message was sent by Atlassian Jira
(v8.20.7#820007)