[
https://issues.apache.org/jira/browse/AVRO-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16461359#comment-16461359
]
ASF GitHub Bot commented on AVRO-2172:
--------------------------------------
cutting closed pull request #308: AVRO-2172: Avro binding for gRPC (Java
Implementation)
URL: https://github.com/apache/avro/pull/308
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/lang/java/grpc/pom.xml b/lang/java/grpc/pom.xml
new file mode 100644
index 000000000..a64d8cd54
--- /dev/null
+++ b/lang/java/grpc/pom.xml
@@ -0,0 +1,109 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd"
+ xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-parent</artifactId>
+ <version>1.9.0-SNAPSHOT</version>
+ <relativePath>../</relativePath>
+ </parent>
+
+ <artifactId>avro-grpc</artifactId>
+
+ <name>Apache Avro gRPC</name>
+ <description>Avro IDL based RPC and serialization over gRPC</description>
+ <packaging>bundle</packaging>
+
+ <properties>
+ <osgi.import>
+ !org.apache.avro.grpc*,
+ org.apache.avro*;version="${project.version}",
+ io.grpc*
+ *
+ </osgi.import>
+
<osgi.export>org.apache.avro.grpc*;version="${project.version}"</osgi.export>
+ </properties>
+
+ <build>
+ <resources>
+ <resource>
+ <directory>src/main/velocity</directory>
+ </resource>
+ </resources>
+ <plugins>
+ <plugin>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>avro-maven-plugin</artifactId>
+ <version>${project.version}</version>
+ <executions>
+ <execution>
+ <id>schemas</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>schema</goal>
+ <goal>protocol</goal>
+ <goal>idl-protocol</goal>
+ </goals>
+ <configuration>
+ <stringType>String</stringType>
+
<testSourceDirectory>${project.basedir}/src/test/avro</testSourceDirectory>
+
<testOutputDirectory>${project.build.directory}/generated-test-sources/java</testOutputDirectory>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-stub</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-netty</artifactId>
+ <version>${grpc.version}</version>
+ <!-- use netty only for tests as user can use grpc transports other than
Netty -->
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>avro</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>avro-ipc</artifactId>
+ <version>${project.version}</version>
+ <!-- exclude all dependencies from avro-ipc as only Callback class is
needed -->
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git
a/lang/java/grpc/src/main/java/org/apache/avro/grpc/AvroGrpcClient.java
b/lang/java/grpc/src/main/java/org/apache/avro/grpc/AvroGrpcClient.java
new file mode 100644
index 000000000..f0e7d22f6
--- /dev/null
+++ b/lang/java/grpc/src/main/java/org/apache/avro/grpc/AvroGrpcClient.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.grpc;
+
+import com.google.common.base.Throwables;
+
+import org.apache.avro.AvroRemoteException;
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Protocol;
+import org.apache.avro.ipc.CallFuture;
+import org.apache.avro.ipc.Callback;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.lang.reflect.Type;
+import java.util.Arrays;
+
+import io.grpc.CallOptions;
+import io.grpc.Channel;
+import io.grpc.MethodDescriptor;
+import io.grpc.stub.ClientCalls;
+import io.grpc.stub.StreamObserver;
+
+/** Component that sets up a gRPC client for Avro's IDL and Serialization. */
+public abstract class AvroGrpcClient {
+
+ private AvroGrpcClient() {
+ }
+
+ /**
+ * Creates a gRPC client for Avro's interface with default {@link
CallOptions}.
+ *
+ * @param channel the channel used for gRPC {@link ClientCalls}.
+ * @param iface Avro interface for which client is built.
+ * @param <T> type of Avro Interface.
+ * @return a new client proxy.
+ */
+ public static <T> T create(Channel channel, Class<T> iface) {
+ return create(channel, iface, CallOptions.DEFAULT);
+ }
+
+ /**
+ * Creates a gRPC client for Avro's interface with provided {@link
CallOptions}.
+ *
+ * @param channel the channel used for gRPC {@link ClientCalls}.
+ * @param iface Avro interface for which client is built.
+ * @param callOptions client call options for gRPC.
+ * @param <T> type of Avro Interface.
+ * @return a new client proxy.
+ */
+ public static <T> T create(Channel channel, Class<T> iface, CallOptions
+ callOptions) {
+ Protocol protocol = AvroGrpcUtils.getProtocol(iface);
+ ServiceDescriptor serviceDescriptor = ServiceDescriptor.create(iface);
+ ServiceInvocationHandler proxyHandler = new
ServiceInvocationHandler(channel, callOptions,
+ protocol, serviceDescriptor);
+ return (T) Proxy.newProxyInstance(iface.getClassLoader(), new
Class[]{iface}, proxyHandler);
+ }
+
+ private static class ServiceInvocationHandler implements InvocationHandler {
+ private final Channel channel;
+ private final CallOptions callOptions;
+ private final Protocol protocol;
+ private final ServiceDescriptor serviceDescriptor;
+
+ ServiceInvocationHandler(Channel channel, CallOptions callOptions, Protocol
+ protocol, ServiceDescriptor serviceDescriptor) {
+ this.channel = channel;
+ this.callOptions = callOptions;
+ this.protocol = protocol;
+ this.serviceDescriptor = serviceDescriptor;
+ }
+
+ public Object invoke(Object proxy, Method method, Object[] args) throws
Throwable {
+ try {
+ return invokeUnaryMethod(method, args);
+ } catch (Exception e) {
+ //throw any of the declared exceptions
+ for (Class<?> exceptionClass : method.getExceptionTypes()) {
+ Throwables.propagateIfInstanceOf(e, (Class<Exception>)
exceptionClass);
+ }
+ //also throw if any runtime exception
+ Throwables.propagateIfInstanceOf(e, RuntimeException.class);
+ //wrap all other exceptions
+ throw new AvroRemoteException(e);
+ }
+ }
+
+ private Object invokeUnaryMethod(Method method, Object[] args) throws
Exception {
+ Type[] parameterTypes = method.getParameterTypes();
+ if ((parameterTypes.length > 0) &&
+ (parameterTypes[parameterTypes.length - 1] instanceof Class) &&
+ Callback.class.isAssignableFrom(((Class<?>)
parameterTypes[parameterTypes.length - 1]))) {
+ // get the callback argument from the end
+ Object[] finalArgs = Arrays.copyOf(args, args.length - 1);
+ Callback<?> callback = (Callback<?>) args[args.length - 1];
+ unaryRequest(method.getName(), finalArgs, callback);
+ return null;
+ } else {
+ return unaryRequest(method.getName(), args);
+ }
+ }
+
+ private Object unaryRequest(String methodName, Object[] args) throws
Exception {
+ CallFuture<Object> callFuture = new CallFuture<>();
+ unaryRequest(methodName, args, callFuture);
+ try {
+ return callFuture.get();
+ } catch (Exception e) {
+ Throwables.propagateIfInstanceOf(e.getCause(), Exception.class);
+ throw new AvroRemoteException(e.getCause());
+ }
+ }
+
+ private <RespT> void unaryRequest(String methodName, Object[] args,
Callback<RespT> callback) throws
+ Exception {
+ StreamObserver<Object> observerAdpater = new
CallbackToResponseStreamObserverAdpater<>
+ (callback);
+
ClientCalls.asyncUnaryCall(channel.newCall(serviceDescriptor.getMethod(methodName,
+ MethodDescriptor.MethodType.UNARY), callOptions), args,
observerAdpater);
+ }
+
+ private static class CallbackToResponseStreamObserverAdpater<T> implements
StreamObserver<Object> {
+ private final Callback<T> callback;
+
+ CallbackToResponseStreamObserverAdpater(Callback<T> callback) {
+ this.callback = callback;
+ }
+
+ @Override
+ public void onNext(Object value) {
+ if (value instanceof Throwable) {
+ callback.handleError((Throwable) value);
+ } else {
+ callback.handleResult((T) value);
+ }
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ callback.handleError(new AvroRuntimeException(t));
+ }
+
+ @Override
+ public void onCompleted() {
+ // do nothing as there is no equivalent in Callback.
+ }
+ }
+ }
+}
diff --git
a/lang/java/grpc/src/main/java/org/apache/avro/grpc/AvroGrpcServer.java
b/lang/java/grpc/src/main/java/org/apache/avro/grpc/AvroGrpcServer.java
new file mode 100644
index 000000000..c18822ee4
--- /dev/null
+++ b/lang/java/grpc/src/main/java/org/apache/avro/grpc/AvroGrpcServer.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.grpc;
+
+import com.google.common.base.Throwables;
+
+import org.apache.avro.Protocol;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import io.grpc.MethodDescriptor;
+import io.grpc.ServerServiceDefinition;
+import io.grpc.stub.ServerCalls;
+import io.grpc.stub.StreamObserver;
+
+/** Provides components to set up a gRPC Server for Avro's IDL and
serialization. */
+public abstract class AvroGrpcServer {
+
+ private AvroGrpcServer() {
+ }
+
+ /**
+ * Creates a {@link ServerServiceDefinition} for Avro Interface and its
implementation that can
+ * be passed a gRPC Server.
+ *
+ * @param iface Avro generated RPC service interface for which service
defintion is created.
+ * @param impl Implementation of the service interface to be invoked for
requests.
+ * @return a new server service definition.
+ */
+ public static ServerServiceDefinition createServiceDefinition(Class iface,
Object impl) {
+ Protocol protocol = AvroGrpcUtils.getProtocol(iface);
+ ServiceDescriptor serviceDescriptor = ServiceDescriptor.create(iface);
+ ServerServiceDefinition.Builder serviceDefinitionBuilder =
ServerServiceDefinition.builder
+ (serviceDescriptor.getServiceName());
+ Map<String, Protocol.Message> messages = protocol.getMessages();
+ for (Method method : iface.getMethods()) {
+ Protocol.Message msg = messages.get(method.getName());
+ //setup a method handler only if corresponding message exists in avro
protocol.
+ if (msg != null) {
+ UnaryMethodHandler methodHandler = msg.isOneWay() ? new
OneWayUnaryMethodHandler(impl,
+ method) : new UnaryMethodHandler(impl, method);
+
serviceDefinitionBuilder.addMethod(serviceDescriptor.getMethod(method.getName(),
+ MethodDescriptor.MethodType.UNARY),
ServerCalls.asyncUnaryCall(methodHandler));
+ }
+ }
+ return serviceDefinitionBuilder.build();
+ }
+
+ private static class UnaryMethodHandler implements
ServerCalls.UnaryMethod<Object[], Object> {
+ private final Object serviceImpl;
+ private final Method method;
+
+ UnaryMethodHandler(Object serviceImpl, Method method) {
+ this.serviceImpl = serviceImpl;
+ this.method = method;
+ }
+
+ public void invoke(Object[] request, StreamObserver<Object>
responseObserver) {
+ Object methodResponse = null;
+ try {
+ methodResponse = method.invoke(getServiceImpl(), request);
+ } catch (InvocationTargetException e) {
+ methodResponse = e.getTargetException();
+ } catch (Exception e) {
+ methodResponse = e;
+ }
+ responseObserver.onNext(methodResponse);
+ responseObserver.onCompleted();
+ }
+
+ public Method getMethod() {
+ return method;
+ }
+
+ public Object getServiceImpl() {
+ return serviceImpl;
+ }
+ }
+
+ private static class OneWayUnaryMethodHandler extends UnaryMethodHandler {
+ private static final Logger LOG =
Logger.getLogger(OneWayUnaryMethodHandler.class.getName());
+
+ OneWayUnaryMethodHandler(Object serviceImpl, Method method) {
+ super(serviceImpl, method);
+ }
+
+ @Override
+ public void invoke(Object[] request, StreamObserver<Object>
responseObserver) {
+ //first respond back with a fixed void response in order for call to be
complete
+ responseObserver.onNext(null);
+ responseObserver.onCompleted();
+ //process the rpc request
+ try {
+ getMethod().invoke(getServiceImpl(), request);
+ } catch (Exception e) {
+ LOG.log(Level.WARNING, "Error processing one-way rpc",
Throwables.getRootCause(e));
+ }
+ }
+ }
+}
diff --git
a/lang/java/grpc/src/main/java/org/apache/avro/grpc/AvroGrpcUtils.java
b/lang/java/grpc/src/main/java/org/apache/avro/grpc/AvroGrpcUtils.java
new file mode 100644
index 000000000..f3ab07bc7
--- /dev/null
+++ b/lang/java/grpc/src/main/java/org/apache/avro/grpc/AvroGrpcUtils.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.grpc;
+
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Protocol;
+
+import java.io.InputStream;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import io.grpc.KnownLength;
+
+/** Utility methods for using Avro IDL and serialization with gRPC. */
+public final class AvroGrpcUtils {
+ private static Logger log = Logger.getLogger(AvroGrpcUtils.class.getName());
+
+ private AvroGrpcUtils() {
+ }
+
+ /**
+ * Provides a a unique gRPC service name for Avro RPC interface or its
subclass Callback
+ * Interface.
+ *
+ * @param iface Avro RPC interface.
+ * @return unique service name for gRPC.
+ */
+ public static String getServiceName(Class iface) {
+ Protocol protocol = getProtocol(iface);
+ return protocol.getNamespace() + "." + protocol.getName();
+ }
+
+ /**
+ * Gets the {@link Protocol} from the Avro Interface.
+ */
+ public static Protocol getProtocol(Class iface) {
+ try {
+ Protocol p = (Protocol) (iface.getDeclaredField("PROTOCOL").get(null));
+ return p;
+ } catch (NoSuchFieldException e) {
+ throw new AvroRuntimeException("Not a Specific protocol: " + iface);
+ } catch (IllegalAccessException e) {
+ throw new AvroRuntimeException(e);
+ }
+ }
+
+ /**
+ * Skips any unread bytes from InputStream and closes it.
+ */
+ static void skipAndCloseQuietly(InputStream stream) {
+ try {
+ if (stream instanceof KnownLength && stream.available() > 0) {
+ stream.skip(stream.available());
+ } else {
+ //don't expect this for an inputStream provided by gRPC but just to be
on safe side.
+ byte[] skipBuffer = new byte[4096];
+ while (true) {
+ int read = stream.read(skipBuffer);
+ if (read < skipBuffer.length) {
+ break;
+ }
+ }
+ }
+ stream.close();
+ } catch (Exception e) {
+ log.log(Level.WARNING, "failed to skip/close the input stream, may cause
memory leak", e);
+ }
+ }
+}
diff --git
a/lang/java/grpc/src/main/java/org/apache/avro/grpc/AvroInputStream.java
b/lang/java/grpc/src/main/java/org/apache/avro/grpc/AvroInputStream.java
new file mode 100644
index 000000000..79793ee6c
--- /dev/null
+++ b/lang/java/grpc/src/main/java/org/apache/avro/grpc/AvroInputStream.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.grpc;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import io.grpc.Drainable;
+
+/** An {@link InputStream} backed by Avro RPC request/response that can
drained to
+ * a{@link OutputStream}. */
+@NotThreadSafe
+public abstract class AvroInputStream extends InputStream implements Drainable
{
+ /**
+ * Container to hold the serialized Avro payload when its read before
draining it.
+ */
+ @Nullable
+ private ByteArrayInputStream partial;
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ return getPartialInternal().read(b, off, len);
+ }
+
+ @Override
+ public int read() throws IOException {
+ return getPartialInternal().read();
+ }
+
+ private ByteArrayInputStream getPartialInternal() throws IOException {
+ if (partial == null) {
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ drainTo(outputStream);
+ partial = new ByteArrayInputStream(outputStream.toByteArray());
+ }
+ return partial;
+ }
+
+ protected ByteArrayInputStream getPartial() {
+ return partial;
+ }
+
+ /**
+ * An {@link OutputStream} that writes to a target {@link OutputStream} and
provides total
+ * number of bytes written to it.
+ */
+ protected class CountingOutputStream extends OutputStream {
+ private final OutputStream target;
+ private int writtenCount = 0;
+
+ public CountingOutputStream(OutputStream target) {
+ this.target = target;
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ target.write(b, off, len);
+ writtenCount += len;
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ target.write(b);
+ writtenCount += 1;
+ }
+
+ @Override
+ public void flush() throws IOException {
+ target.flush();
+ }
+
+ @Override
+ public void close() throws IOException {
+ target.close();
+ }
+
+ public int getWrittenCount() {
+ return writtenCount;
+ }
+ }
+}
diff --git
a/lang/java/grpc/src/main/java/org/apache/avro/grpc/AvroRequestMarshaller.java
b/lang/java/grpc/src/main/java/org/apache/avro/grpc/AvroRequestMarshaller.java
new file mode 100644
index 000000000..618e2af5a
--- /dev/null
+++
b/lang/java/grpc/src/main/java/org/apache/avro/grpc/AvroRequestMarshaller.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.grpc;
+
+import org.apache.avro.Protocol;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import javax.annotation.Nullable;
+
+import io.grpc.MethodDescriptor;
+import io.grpc.Status;
+import io.grpc.internal.IoUtils;
+
+/** Marshaller for Avro RPC request. */
+public class AvroRequestMarshaller implements
MethodDescriptor.Marshaller<Object[]> {
+ private static final EncoderFactory ENCODER_FACTORY = new EncoderFactory();
+ private static final DecoderFactory DECODER_FACTORY = new DecoderFactory();
+ private final Protocol.Message message;
+
+ public AvroRequestMarshaller(Protocol.Message message) {
+ this.message = message;
+ }
+
+ @Override
+ public InputStream stream(Object[] value) {
+ return new AvroRequestInputStream(value, message);
+ }
+
+ @Override
+ public Object[] parse(InputStream stream) {
+ try {
+ BinaryDecoder in = DECODER_FACTORY.binaryDecoder(stream, null);
+ Schema reqSchema = message.getRequest();
+ GenericRecord request = (GenericRecord) new
SpecificDatumReader<>(reqSchema).read(null, in);
+ Object[] args = new Object[reqSchema.getFields().size()];
+ int i = 0;
+ for (Schema.Field field : reqSchema.getFields()) {
+ args[i++] = request.get(field.name());
+ }
+ return args;
+ } catch (IOException e) {
+ throw Status.INTERNAL.withCause(e).
+ withDescription("Error deserializing avro request
arguments").asRuntimeException();
+ } finally {
+ AvroGrpcUtils.skipAndCloseQuietly(stream);
+ }
+ }
+
+ private class AvroRequestInputStream extends AvroInputStream {
+ private final Protocol.Message message;
+ @Nullable
+ private Object[] args;
+
+ AvroRequestInputStream(@Nullable Object[] args, Protocol.Message message) {
+ this.args = args;
+ this.message = message;
+ }
+
+ @Override
+ public int drainTo(OutputStream target) throws IOException {
+ int written;
+ if (getPartial() != null) {
+ written = (int) IoUtils.copy(getPartial(), target);
+ } else {
+ Schema reqSchema = message.getRequest();
+ CountingOutputStream outputStream = new CountingOutputStream(target);
+ BinaryEncoder out = ENCODER_FACTORY.binaryEncoder(outputStream, null);
+ int i = 0;
+ for (Schema.Field param : reqSchema.getFields()) {
+ new SpecificDatumWriter<>(param.schema()).write(args[i++], out);
+ }
+ out.flush();
+ args = null;
+ written = outputStream.getWrittenCount();
+ }
+ return written;
+ }
+ }
+}
diff --git
a/lang/java/grpc/src/main/java/org/apache/avro/grpc/AvroResponseMarshaller.java
b/lang/java/grpc/src/main/java/org/apache/avro/grpc/AvroResponseMarshaller.java
new file mode 100644
index 000000000..f1f9b0145
--- /dev/null
+++
b/lang/java/grpc/src/main/java/org/apache/avro/grpc/AvroResponseMarshaller.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.grpc;
+
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Protocol;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.util.Utf8;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import javax.annotation.Nullable;
+
+import io.grpc.MethodDescriptor;
+import io.grpc.Status;
+import io.grpc.internal.IoUtils;
+
+/** Marshaller for Avro RPC response. */
+public class AvroResponseMarshaller implements
MethodDescriptor.Marshaller<Object> {
+ private static final EncoderFactory ENCODER_FACTORY = new EncoderFactory();
+ private static final DecoderFactory DECODER_FACTORY = new DecoderFactory();
+ private final Protocol.Message message;
+
+ public AvroResponseMarshaller(Protocol.Message message) {
+ this.message = message;
+ }
+
+ @Override
+ public InputStream stream(Object value) {
+ return new AvroResponseInputStream(value, message);
+ }
+
+ @Override
+ public Object parse(InputStream stream) {
+ try {
+ if (message.isOneWay()) return null;
+ BinaryDecoder in = DECODER_FACTORY.binaryDecoder(stream, null);
+ if (!in.readBoolean()) {
+ Object response = new
SpecificDatumReader(message.getResponse()).read(null, in);
+ return response;
+ } else {
+ Object value = new SpecificDatumReader(message.getErrors()).read(null,
in);
+ if (value instanceof Exception) {
+ return value;
+ }
+ return new AvroRuntimeException(value.toString());
+ }
+ } catch (IOException e) {
+ throw Status.INTERNAL.withCause(e).
+ withDescription("Error deserializing avro
response").asRuntimeException();
+ } finally {
+ AvroGrpcUtils.skipAndCloseQuietly(stream);
+ }
+ }
+
+ private class AvroResponseInputStream extends AvroInputStream {
+ private final Protocol.Message message;
+ @Nullable
+ private Object response;
+
+ AvroResponseInputStream(@Nullable Object response, Protocol.Message
message) {
+ this.response = response;
+ this.message = message;
+ }
+
+ @Override
+ public int drainTo(OutputStream target) throws IOException {
+ int written;
+ if (getPartial() != null) {
+ written = (int) IoUtils.copy(getPartial(), target);
+ } else {
+ written = writeResponse(target);
+ }
+ return written;
+ }
+
+ private int writeResponse(OutputStream target) throws IOException {
+ int written;
+ if (message.isOneWay()) {
+ written = 0;
+ } else if (response instanceof Exception) {
+ ByteArrayOutputStream bao = new ByteArrayOutputStream();
+ BinaryEncoder out = ENCODER_FACTORY.binaryEncoder(bao, null);
+ try {
+ out.writeBoolean(true);
+ new SpecificDatumWriter(message.getErrors()).write(response, out);
+ } catch (Exception e) {
+ bao = new ByteArrayOutputStream();
+ out = ENCODER_FACTORY.binaryEncoder(bao, null);
+ out.writeBoolean(true);
+ new SpecificDatumWriter(Protocol.SYSTEM_ERRORS).write(new
Utf8(e.toString()), out);
+ }
+ out.flush();
+ byte[] serializedError = bao.toByteArray();
+ target.write(serializedError);
+ written = serializedError.length;
+ } else {
+ CountingOutputStream outputStream = new CountingOutputStream(target);
+ BinaryEncoder out = ENCODER_FACTORY.binaryEncoder(outputStream, null);
+ out.writeBoolean(false);
+ new SpecificDatumWriter(message.getResponse()).write(response, out);
+ out.flush();
+ written = outputStream.getWrittenCount();
+ }
+ response = null;
+ return written;
+ }
+ }
+}
diff --git
a/lang/java/grpc/src/main/java/org/apache/avro/grpc/ServiceDescriptor.java
b/lang/java/grpc/src/main/java/org/apache/avro/grpc/ServiceDescriptor.java
new file mode 100644
index 000000000..d9fa97ab6
--- /dev/null
+++ b/lang/java/grpc/src/main/java/org/apache/avro/grpc/ServiceDescriptor.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.grpc;
+
+import org.apache.avro.Protocol;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import io.grpc.MethodDescriptor;
+
+import static io.grpc.MethodDescriptor.generateFullMethodName;
+
+/** Descriptor for a gRPC service based on a Avro interface. */
+class ServiceDescriptor {
+
+ // cache for service descriptors.
+ private static final ConcurrentMap<String, ServiceDescriptor>
SERVICE_DESCRIPTORS =
+ new ConcurrentHashMap<>();
+ private final Class iface;
+ private final String serviceName;
+ private final Protocol protocol;
+ // cache for method descriptors.
+ private final ConcurrentMap<String, MethodDescriptor<Object[], Object>>
methods =
+ new ConcurrentHashMap<>();
+
+ private ServiceDescriptor(Class iface, String serviceName) {
+ this.iface = iface;
+ this.serviceName = serviceName;
+ this.protocol = AvroGrpcUtils.getProtocol(iface);
+ }
+
+ /**
+ * Creates a Service Descriptor.
+ *
+ * @param iface Avro RPC interface.
+ */
+ public static ServiceDescriptor create(Class iface) {
+ String serviceName = AvroGrpcUtils.getServiceName(iface);
+ return SERVICE_DESCRIPTORS.computeIfAbsent(serviceName, key -> new
ServiceDescriptor
+ (iface, serviceName));
+ }
+
+ /**
+ * provides name of the service.
+ */
+ public String getServiceName() {
+ return serviceName;
+ }
+
+ /**
+ * Provides a gRPC {@link MethodDescriptor} for a RPC method/message of Avro
{@link Protocol}.
+ *
+ * @param methodType gRPC type for the method.
+ * @return a {@link MethodDescriptor}
+ */
+ public MethodDescriptor<Object[], Object> getMethod(String methodName,
MethodDescriptor
+ .MethodType methodType) {
+ return methods.computeIfAbsent(methodName,
+ key -> MethodDescriptor.<Object[], Object>newBuilder()
+ .setFullMethodName(generateFullMethodName(serviceName, methodName))
+ .setType(methodType)
+ .setRequestMarshaller(new
AvroRequestMarshaller(protocol.getMessages().get(methodName)))
+ .setResponseMarshaller(
+ new
AvroResponseMarshaller(protocol.getMessages().get(methodName)))
+ .build());
+ }
+}
diff --git a/lang/java/grpc/src/test/avro/TestService.avdl
b/lang/java/grpc/src/test/avro/TestService.avdl
new file mode 100644
index 000000000..8a18db080
--- /dev/null
+++ b/lang/java/grpc/src/test/avro/TestService.avdl
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** An example protocol in Avro IDL */
+@namespace("org.apache.avro.grpc.test")
+protocol TestService {
+
+ enum Kind {
+ FOO,
+ BAR,
+ BAZ
+ }
+
+ fixed MD5(4);
+
+ record TestRecord {
+ @order("ignore")
+ string name;
+
+ @order("descending")
+ Kind kind;
+
+ MD5 hash;
+
+ union { MD5, null} @aliases(["hash"]) nullableHash;
+
+ array<long> arrayOfLongs;
+ }
+
+ error TestError {
+ string message;
+ }
+
+ TestRecord echo(TestRecord `record`);
+ int add(int arg1, int arg2, int arg3);
+ void `error`(boolean declared) throws TestError;
+ void ping() oneway;
+ union {null, string} concatenate(string val1, boolean val2, long val3, int
val4);
+}
diff --git
a/lang/java/grpc/src/test/java/org/apache/avro/grpc/TestAvroMarshaller.java
b/lang/java/grpc/src/test/java/org/apache/avro/grpc/TestAvroMarshaller.java
new file mode 100644
index 000000000..2efb42f01
--- /dev/null
+++ b/lang/java/grpc/src/test/java/org/apache/avro/grpc/TestAvroMarshaller.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.grpc;
+
+import org.apache.avro.Protocol;
+import org.apache.avro.grpc.test.Kind;
+import org.apache.avro.grpc.test.MD5;
+import org.apache.avro.grpc.test.TestRecord;
+import org.apache.avro.grpc.test.TestService;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Random;
+
+import io.grpc.Drainable;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestAvroMarshaller {
+ private final TestRecord record =
TestRecord.newBuilder().setName("foo").setKind(Kind.FOO)
+ .setArrayOfLongs(Arrays.asList(42L, 424L, 4242L)).setHash(new MD5(new
byte[]{4, 2, 4, 2}))
+ .setNullableHash(null).build();
+ private final Protocol.Message message =
TestService.PROTOCOL.getMessages().get("echo");
+ private Random random = new Random();
+
+ private void readPratialAndDrain(int partialToRead, InputStream inputStream,
OutputStream target)
+ throws IOException {
+ // read specified partial bytes from request InputStream to target and
then drain the rest.
+ for (int i = 0; i < partialToRead; i++) {
+ int readByte = inputStream.read();
+ if (readByte >= 0) {
+ target.write(readByte);
+ } else {
+ break;
+ }
+ }
+ Drainable drainableRequest = (Drainable) inputStream;
+ drainableRequest.drainTo(target);
+ }
+
+ @Test
+ public void testAvroRequestReadPartialAndDrain() throws IOException {
+ AvroRequestMarshaller requestMarshaller = new
AvroRequestMarshaller(message);
+ InputStream requestInputStream = requestMarshaller.stream(new
Object[]{record});
+ ByteArrayOutputStream requestOutputStream = new ByteArrayOutputStream();
+ readPratialAndDrain(random.nextInt(7) + 1, requestInputStream,
requestOutputStream);
+ InputStream serialized = new
ByteArrayInputStream(requestOutputStream.toByteArray());
+ Object[] parsedArgs = requestMarshaller.parse(serialized);
+ assertEquals(1, parsedArgs.length);
+ assertEquals(record, parsedArgs[0]);
+ }
+
+ @Test
+ public void testAvroResponseReadPartialAndDrain() throws IOException {
+ AvroResponseMarshaller responseMarshaller = new
AvroResponseMarshaller(message);
+ InputStream responseInputStream = responseMarshaller.stream(record);
+ ByteArrayOutputStream responseOutputStream = new ByteArrayOutputStream();
+ readPratialAndDrain(random.nextInt(7) + 1, responseInputStream,
responseOutputStream);
+ InputStream serialized = new
ByteArrayInputStream(responseOutputStream.toByteArray());
+ Object parsedResponse = responseMarshaller.parse(serialized);
+ assertEquals(record, parsedResponse);
+ }
+}
diff --git
a/lang/java/grpc/src/test/java/org/apache/avro/grpc/TestAvroProtocolGrpc.java
b/lang/java/grpc/src/test/java/org/apache/avro/grpc/TestAvroProtocolGrpc.java
new file mode 100644
index 000000000..d11aa3d9b
--- /dev/null
+++
b/lang/java/grpc/src/test/java/org/apache/avro/grpc/TestAvroProtocolGrpc.java
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.grpc;
+
+import org.apache.avro.AvroRemoteException;
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.grpc.test.Kind;
+import org.apache.avro.grpc.test.MD5;
+import org.apache.avro.grpc.test.TestError;
+import org.apache.avro.grpc.test.TestRecord;
+import org.apache.avro.grpc.test.TestService;
+import org.apache.avro.ipc.CallFuture;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.Server;
+import io.grpc.ServerBuilder;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestAvroProtocolGrpc {
+ private final TestRecord record =
TestRecord.newBuilder().setName("foo").setKind(Kind.FOO)
+ .setArrayOfLongs(Arrays.asList(42L, 424L, 4242L)).setHash(new MD5(new
byte[]{4, 2, 4, 2}))
+ .setNullableHash(null).build();
+ private final String declaredErrMsg = "Declared error";
+ private final String undeclaredErrMsg = "Undeclared error";
+ private final TestError declaredError =
TestError.newBuilder().setMessage$(declaredErrMsg).build();
+ private final RuntimeException undeclaredError = new
RuntimeException(undeclaredErrMsg);
+ private CountDownLatch oneWayStart;
+ private CountDownLatch oneWayDone;
+ private AtomicInteger oneWayCount;
+ private TestService stub;
+ private TestService.Callback callbackStub;
+ private Server server;
+ private ManagedChannel channel;
+
+ @Before
+ public void setUp() throws IOException {
+ TestService serviceImpl = new TestServiceImplBase();
+ setUpServerAndClient(serviceImpl);
+ }
+
+ private void setUpServerAndClient(TestService serviceImpl) throws
IOException {
+ if (server != null && !server.isShutdown()) {
+ server.shutdown();
+ }
+ if (channel != null && !channel.isShutdown()) {
+ channel.shutdownNow();
+ }
+ server = ServerBuilder.forPort(0)
+ .addService(AvroGrpcServer.createServiceDefinition(TestService.class,
serviceImpl))
+ .build();
+ server.start();
+ int port = server.getPort();
+ channel = ManagedChannelBuilder.forAddress("localhost",
port).usePlaintext(true).build();
+ stub = AvroGrpcClient.create(channel, TestService.class);
+ callbackStub = AvroGrpcClient.create(channel, TestService.Callback.class);
+ }
+
+ @After
+ public void cleanUp() {
+ channel.shutdownNow();
+ server.shutdownNow();
+ }
+
+ @Test
+ public void testEchoRecord() throws Exception {
+ TestRecord echoedRecord = stub.echo(record);
+ assertEquals(record, echoedRecord);
+ }
+
+ @Test
+ public void testMultipleArgsAdd() throws Exception {
+ int result = stub.add(3, 5, 2);
+ assertEquals(10, result);
+ }
+
+ @Test
+ public void testMultipleArgsConcatenate() throws Exception {
+ String val1 = "foo-bar";
+ Boolean val2 = true;
+ long val3 = 123321L;
+ int val4 = 42;
+ String result = stub.concatenate(val1, val2, val3, val4);
+ assertEquals(val1 + val2 + val3 + val4, result);
+ }
+
+ @Test
+ public void testCallbackInterface() throws Exception {
+ CallFuture<TestRecord> future = new CallFuture<TestRecord>();
+ callbackStub.echo(record, future);
+ assertEquals(record, future.get(1, TimeUnit.SECONDS));
+ }
+
+ @Test
+ public void testOneWayRpc() throws Exception {
+ oneWayStart = new CountDownLatch(1);
+ oneWayDone = new CountDownLatch(3);
+ oneWayCount = new AtomicInteger();
+ stub.ping();
+ stub.ping();
+ //client is not stalled while server is waiting for processing requests
+ assertEquals(0, oneWayCount.get());
+ oneWayStart.countDown();
+ stub.ping();
+ oneWayDone.await(1, TimeUnit.SECONDS);
+ assertEquals(3, oneWayCount.get());
+ }
+
+ @Test
+ public void testDeclaredError() throws Exception {
+ try {
+ stub.error(true);
+ fail("Expected exception but none thrown");
+ } catch (TestError te) {
+ assertEquals(declaredErrMsg, te.getMessage$());
+ }
+ }
+
+ @Test
+ public void testUndeclaredError() throws Exception {
+ try {
+ stub.error(false);
+ fail("Expected exception but none thrown");
+ } catch (AvroRuntimeException e) {
+ assertTrue(e.getMessage().contains(undeclaredErrMsg));
+ }
+ }
+
+ @Test
+ public void testNullableResponse() throws Exception {
+ setUpServerAndClient(new TestServiceImplBase() {
+ @Override
+ public String concatenate(String val1, boolean val2, long val3, int val4)
+ throws AvroRemoteException {
+ return null;
+ }
+ });
+ String response = stub.concatenate("foo", true, 42L, 42);
+ assertEquals(null, response);
+ }
+
+ @Test(expected = AvroRuntimeException.class)
+ public void testGrpcConnectionError() throws Exception {
+ //close the channel and initiate request
+ channel.shutdownNow();
+ stub.add(0, 1, 2);
+ }
+
+ @Test
+ public void testRepeatedRequests() throws Exception {
+ TestRecord[] echoedRecords = new TestRecord[5];
+ // validate results after all requests are done
+ for (int i = 0; i < 5; i++) {
+ echoedRecords[i] = stub.echo(record);
+ }
+ for (TestRecord result : echoedRecords) {
+ assertEquals(record, result);
+ }
+ }
+
+ @Test
+ public void testConcurrentClientAccess() throws Exception {
+ ExecutorService es = Executors.newCachedThreadPool();
+ Future<TestRecord>[] records = new Future[5];
+ Future<Integer>[] adds = new Future[5];
+ //submit requests in parallel
+ for (int i = 0; i < 5; i++) {
+ records[i] = es.submit(() -> stub.echo(record));
+ int j = i;
+ adds[i] = es.submit(() -> stub.add(j, 2 * j, 3 * j));
+ }
+ //validate all results
+ for (int i = 0; i < 5; i++) {
+ assertEquals(record, records[i].get());
+ assertEquals(6 * i, (long) adds[i].get());
+ }
+ }
+
+ @Test
+ public void testConcurrentChannels() throws Exception {
+ ManagedChannel otherChannel =
ManagedChannelBuilder.forAddress("localhost", server.getPort())
+ .usePlaintext(true).build();
+ TestService otherStub = AvroGrpcClient.create(otherChannel,
TestService.class);
+ Future<Integer>[] adds = new Future[5];
+ Future<Integer>[] otherAdds = new Future[5];
+ ExecutorService es = Executors.newCachedThreadPool();
+ //submit requests on clients with different channels
+ for (int i = 0; i < 5; i++) {
+ int j = i;
+ adds[i] = es.submit(() -> stub.add(j, j - 1, j - 2));
+ otherAdds[i] = es.submit(() -> otherStub.add(j, j + 1, j + 2));
+ }
+ //validate all results
+ for (int i = 0; i < 5; i++) {
+ assertEquals((3 * i) - 3, (long) adds[i].get());
+ assertEquals((3 * i) + 3, (long) otherAdds[i].get());
+ }
+ otherChannel.shutdownNow();
+ }
+
+ private class TestServiceImplBase implements TestService {
+ @Override
+ public TestRecord echo(TestRecord record) throws AvroRemoteException {
+ return record;
+ }
+
+ @Override
+ public int add(int arg1, int arg2, int arg3) throws AvroRemoteException {
+ return arg1 + arg2 + arg3;
+ }
+
+ @Override
+ public Void error(boolean declared) throws AvroRemoteException, TestError {
+ if (declared) {
+ throw declaredError;
+ }
+ throw undeclaredError;
+ }
+
+ @Override
+ public void ping() {
+ try {
+ oneWayStart.await();
+ oneWayCount.incrementAndGet();
+ oneWayDone.countDown();
+ } catch (InterruptedException e) {
+ fail("thread interrupted when waiting for all one-way messages");
+ }
+ }
+
+ @Override
+ public String concatenate(String val1, boolean val2, long val3, int val4)
throws AvroRemoteException {
+ return val1 + val2 + val3 + val4;
+ }
+ }
+}
diff --git a/lang/java/pom.xml b/lang/java/pom.xml
index 2d1df1f1c..78abcdcbf 100644
--- a/lang/java/pom.xml
+++ b/lang/java/pom.xml
@@ -61,6 +61,7 @@
<easymock.version>3.4</easymock.version>
<hamcrest.version>1.3</hamcrest.version>
<joda.version>2.9.7</joda.version>
+ <grpc.version>1.7.0</grpc.version>
<!-- This Guava version should match Hadoop's Guava version. See
AVRO-1781. -->
<guava.version>11.0.2</guava.version>
<findbugs-annotations.version>1.3.9-1</findbugs-annotations.version>
@@ -94,6 +95,7 @@
<module>protobuf</module>
<module>thrift</module>
<module>archetypes</module>
+ <module>grpc</module>
</modules>
<build>
@@ -464,6 +466,16 @@
<artifactId>joda-time</artifactId>
<version>${joda.version}</version>
</dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-stub</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-netty</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Avro over gRPC (Java Implementation)
> ------------------------------------
>
> Key: AVRO-2172
> URL: https://issues.apache.org/jira/browse/AVRO-2172
> Project: Avro
> Issue Type: New Feature
> Components: java
> Reporter: Srujan Narkedamalli
> Priority: Major
>
> We (wavefront/VMware) have implemented a java library/module for using RPC
> defined using Avro (.avdl/.avpr) over gRPC, and would like to contribute this
> to Apache Avro. gRPC provides ability to build streaming RPC and also in
> terms of Java implementation its built on recent version of Netty (4.x).
> Overview of our Avro-gRPC Java Implementation:
> gRPC by default provides support for Protobuf IDL and the APIs are tuned
> towards it. Following are main differences in gRPC API/Protobuf and Avro's
> RPC IDL that our library bridges:
> * Protobuf/gRPC supports only single argument for RPC request vs Avro's
> multiple arguments. This is handled in serialization logic.
> * Protobuf/gRPC does not support throwing typed exceptions over wire
> natively. Avro’s typed RPC exceptions are handled in the serialization logic.
> * Protobuf/gRPC does not support one-way RPC. We don’t avoid doing
> round-trip to server but respond back with null response as soon as request
> is received at server, and then invoke the server implementation.
> * gRPC provides RPC code generation for Protobuf. For Avro, we use the
> current minimal code generation, and provide Client and Server Invocation
> handlers.
>
> Can we have this as a new artifact/maven child project in Avro Java project
> with a name something like `avro-grpc` ?
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)