This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new f60712fc7d2 CAMEL-21256: camel-github startingSha=last does not work
with large history (#15678)
f60712fc7d2 is described below
commit f60712fc7d2500b456e0a3d08e67808cd4da2fc2
Author: Chris Slater <[email protected]>
AuthorDate: Thu Sep 26 08:20:33 2024 -0600
CAMEL-21256: camel-github startingSha=last does not work with large history
(#15678)
* CAMEL-21256: use most recent sha when startingSha=last, prevent
CommitConsumer.poll() logic from executing until after CommitConsumer.doStart()
method completes.
* CAMEL-21256: fix formatting of CommitConsumer constructor arguments
* CAMEL-21256: fix formatting on line 87 of CommitConsumer
* CAMEL-21256: remove extra empty line from CommitConsumerLastTest
---
.../component/github/consumer/CommitConsumer.java | 148 +++++++++++----------
.../github/consumer/CommitConsumerLastTest.java | 100 ++++++++++++++
2 files changed, 176 insertions(+), 72 deletions(-)
diff --git
a/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/CommitConsumer.java
b/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/CommitConsumer.java
index 0aa19ffe099..03bda8756e3 100644
---
a/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/CommitConsumer.java
+++
b/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/CommitConsumer.java
@@ -43,6 +43,7 @@ public class CommitConsumer extends AbstractGitHubConsumer {
// keep a chunk of the last 100 hashes, so we can filter out duplicates
private final Queue<String> commitHashes = new
LinkedBlockingQueue<>(CAPACITY);
private volatile String lastSha;
+ private boolean started = false;
public CommitConsumer(GitHubEndpoint endpoint, Processor processor, String
branchName,
String startingSha) throws Exception {
@@ -74,98 +75,101 @@ public class CommitConsumer extends AbstractGitHubConsumer
{
@Override
protected void doStart() throws Exception {
- super.doStart();
-
- // ensure we start from clean
- commitHashes.clear();
- lastSha = null;
-
- if (startingSha.equals("last")) {
- LOG.info("Indexing current commits on: {}/{}@{}",
getEndpoint().getRepoOwner(), getEndpoint().getRepoName(),
- branchName);
- List<RepositoryCommit> commits =
commitService.getCommits(getRepository(), branchName, null);
- for (RepositoryCommit commit : commits) {
- String sha = commit.getSha();
- if (!commitHashes.contains(sha)) {
- // make room when adding new elements
- while (commitHashes.size() > CAPACITY - 1) {
- commitHashes.remove();
- }
- commitHashes.add(sha);
+ synchronized (this) {
+ super.doStart();
+
+ // ensure we start from clean
+ commitHashes.clear();
+ lastSha = null;
+
+ if (startingSha.equals("last")) {
+ LOG.info("Indexing current commits on: {}/{}@{}",
getEndpoint().getRepoOwner(), getEndpoint().getRepoName(),
+ branchName);
+ List<RepositoryCommit> commits =
commitService.getCommits(getRepository(), branchName, null);
+ if (!commits.isEmpty()) {
+ lastSha = commits.get(0).getSha();
}
+ LOG.info("Starting from last sha: {}", lastSha);
+ } else if (!startingSha.equals("beginning")) {
+ lastSha = startingSha;
+ LOG.info("Starting from sha: {}", lastSha);
+ } else {
+ LOG.info("Starting from beginning");
}
- if (!commitHashes.isEmpty()) {
- lastSha = commitHashes.peek();
- }
- LOG.info("Starting from last sha: {}", lastSha);
- } else if (!startingSha.equals("beginning")) {
- lastSha = startingSha;
- LOG.info("Starting from sha: {}", lastSha);
- } else {
- LOG.info("Starting from beginning");
+ started = true;
}
}
@Override
protected void doStop() throws Exception {
- super.doStop();
+ synchronized (this) {
+ super.doStop();
- commitHashes.clear();
- lastSha = null;
+ commitHashes.clear();
+ lastSha = null;
+ started = false;
+ }
}
@Override
protected int poll() throws Exception {
- List<RepositoryCommit> commits =
commitService.getCommits(getRepository(), branchName, null);
-
- // clip the list after the last sha
- if (lastSha != null) {
- int pos = -1;
- for (int i = 0; i < commits.size(); i++) {
- RepositoryCommit commit = commits.get(i);
- if (lastSha.equals(commit.getSha())) {
- pos = i;
- break;
- }
- }
- if (pos != -1) {
- commits = commits.subList(0, pos);
+ synchronized (this) {
+
+ if (!started) {
+ return 0;
}
- }
- // In the end, we want tags oldest to newest.
- ArrayDeque<RepositoryCommit> newCommits = new ArrayDeque<>();
- for (RepositoryCommit commit : commits) {
- String sha = commit.getSha();
- if (!commitHashes.contains(sha)) {
- newCommits.push(commit);
- // make room when adding new elements
- while (commitHashes.size() > CAPACITY - 1) {
- commitHashes.remove();
+ List<RepositoryCommit> commits =
commitService.getCommits(getRepository(), branchName, null);
+
+ // clip the list after the last sha
+ if (lastSha != null) {
+ int pos = -1;
+ for (int i = 0; i < commits.size(); i++) {
+ RepositoryCommit commit = commits.get(i);
+ if (lastSha.equals(commit.getSha())) {
+ pos = i;
+ break;
+ }
+ }
+ if (pos != -1) {
+ commits = commits.subList(0, pos);
}
- commitHashes.add(sha);
}
- }
- int counter = 0;
- while (!newCommits.isEmpty()) {
- RepositoryCommit newCommit = newCommits.pop();
- lastSha = newCommit.getSha();
- Exchange e = createExchange(true);
- if (newCommit.getAuthor() != null) {
- e.getMessage().setHeader(GitHubConstants.GITHUB_COMMIT_AUTHOR,
newCommit.getAuthor().getName());
+ // In the end, we want tags oldest to newest.
+ ArrayDeque<RepositoryCommit> newCommits = new ArrayDeque<>();
+ for (RepositoryCommit commit : commits) {
+ String sha = commit.getSha();
+ if (!commitHashes.contains(sha)) {
+ newCommits.push(commit);
+ // make room when adding new elements
+ while (commitHashes.size() > CAPACITY - 1) {
+ commitHashes.remove();
+ }
+ commitHashes.add(sha);
+ }
}
- if (newCommit.getCommitter() != null) {
-
e.getMessage().setHeader(GitHubConstants.GITHUB_COMMIT_COMMITTER,
newCommit.getCommitter().getName());
+
+ int counter = 0;
+ while (!newCommits.isEmpty()) {
+ RepositoryCommit newCommit = newCommits.pop();
+ lastSha = newCommit.getSha();
+ Exchange e = createExchange(true);
+ if (newCommit.getAuthor() != null) {
+
e.getMessage().setHeader(GitHubConstants.GITHUB_COMMIT_AUTHOR,
newCommit.getAuthor().getName());
+ }
+ if (newCommit.getCommitter() != null) {
+
e.getMessage().setHeader(GitHubConstants.GITHUB_COMMIT_COMMITTER,
newCommit.getCommitter().getName());
+ }
+ e.getMessage().setHeader(GitHubConstants.GITHUB_COMMIT_SHA,
newCommit.getSha());
+ e.getMessage().setHeader(GitHubConstants.GITHUB_COMMIT_URL,
newCommit.getUrl());
+ e.getMessage().setBody(newCommit.getCommit().getMessage());
+ getProcessor().process(e);
+ counter++;
}
- e.getMessage().setHeader(GitHubConstants.GITHUB_COMMIT_SHA,
newCommit.getSha());
- e.getMessage().setHeader(GitHubConstants.GITHUB_COMMIT_URL,
newCommit.getUrl());
- e.getMessage().setBody(newCommit.getCommit().getMessage());
- getProcessor().process(e);
- counter++;
+ LOG.debug("Last sha: {}", lastSha);
+ return counter;
}
- LOG.debug("Last sha: {}", lastSha);
- return counter;
}
}
diff --git
a/components/camel-github/src/test/java/org/apache/camel/component/github/consumer/CommitConsumerLastTest.java
b/components/camel-github/src/test/java/org/apache/camel/component/github/consumer/CommitConsumerLastTest.java
new file mode 100644
index 00000000000..fc933ba6f65
--- /dev/null
+++
b/components/camel-github/src/test/java/org/apache/camel/component/github/consumer/CommitConsumerLastTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.github.consumer;
+
+import org.apache.camel.BindToRegistry;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.github.GitHubComponentTestBase;
+import org.apache.camel.component.github.GitHubConstants;
+import org.apache.camel.support.DefaultScheduledPollConsumerScheduler;
+import org.junit.jupiter.api.Test;
+
+public class CommitConsumerLastTest extends GitHubComponentTestBase {
+
+ @BindToRegistry("myScheduler")
+ private final MyScheduler scheduler = createScheduler();
+
+ private MyScheduler createScheduler() {
+ MyScheduler scheduler = new MyScheduler();
+ scheduler.setDelay(100);
+ scheduler.setInitialDelay(0);
+ return scheduler;
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+
+ @Override
+ public void configure() {
+
from("github://commit/master?startingSha=last&repoOwner=anotherguy&repoName=somerepo&scheduler=#myScheduler")
+ .routeId("foo").noAutoStartup()
+ .process(new GitHubCommitProcessor())
+ .to(mockResultEndpoint);
+ }
+ };
+ }
+
+ @Test
+ public void commitConsumerLongHistoryLastShaTest() throws Exception {
+ for (int i = 0; i < 2000; i++) {
+ commitService.addRepositoryCommit("existing commit " + i);
+ }
+
+ mockResultEndpoint.setAssertPeriod(500);
+ mockResultEndpoint.expectedBodiesReceived("new commit 1", "new commit
2");
+
+ context.getRouteController().startAllRoutes();
+
+ commitService.addRepositoryCommit("new commit 1");
+ commitService.addRepositoryCommit("new commit 2");
+
+ mockResultEndpoint.assertIsSatisfied();
+ }
+
+ public class GitHubCommitProcessor implements Processor {
+ @Override
+ public void process(Exchange exchange) {
+ String author =
exchange.getMessage().getHeader(GitHubConstants.GITHUB_COMMIT_AUTHOR,
String.class);
+ String sha =
exchange.getMessage().getHeader(GitHubConstants.GITHUB_COMMIT_SHA,
String.class);
+ if (log.isDebugEnabled()) {
+ System.out.println(sha);
+ log.debug("Got commit with author: {}: SHA {}", author, sha);
+ }
+ }
+ }
+
+ private static final class MyScheduler extends
DefaultScheduledPollConsumerScheduler {
+
+ @Override
+ public void startScheduler() {
+ super.startScheduler();
+ try {
+ /*
+ adding a delay to the CommitConsumer.doStart() method to
force the CommitConsumer.poll()
+ method to be called before the CommitConsumer.doStart()
finishes which could leave the
+ lastSha variable null
+ */
+ Thread.sleep(200);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+}