[ https://issues.apache.org/jira/browse/FLINK-22584?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Igal Shilman updated FLINK-22584:
---------------------------------
    Fix Version/s:     (was: statefun-3.1.0)

> Use protobuf-shaded in StateFun core.
> -------------------------------------
>
>                 Key: FLINK-22584
>                 URL: https://issues.apache.org/jira/browse/FLINK-22584
>             Project: Flink
>          Issue Type: Improvement
>          Components: Stateful Functions
>            Reporter: Igal Shilman
>            Priority: Minor
>              Labels: auto-deprioritized-major, developer-experience
>
> We have a *statefun-protobuf-shaded* module that was introduced for the remote
> Java SDK.
> We can use it to shade Protobuf internally and reduce the dependency surface.
> The major hurdle is that embedded functions must be able to accept instances of
> Protobuf-generated messages supplied by the user.
> For example:
> {code:java}
> UserProfile userProfile = UserProfile.newBuilder().build();
> context.send(..., userProfile);
> {code}
> If we simply switch to the shaded Protobuf version, we immediately get a
> ClassCastException, because the user's generated classes extend the unshaded
> Protobuf types rather than the relocated ones.
> One way to overcome this is to use reflection to find the well-known methods
> on the generated classes and call toByteArray() / parseFrom() reflectively.
> This, however, causes a significant slowdown, even when using MethodHandles.
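The reflective option described above can be sketched as follows. This is only an illustration under stated assumptions: `FakeUserProfile` is a hypothetical stand-in that mimics the `toByteArray()` / `parseFrom(byte[])` pair of a Protobuf-generated class so that the example runs without a Protobuf dependency, and `MethodHandleSerde` is an invented name, not an existing StateFun class.

```java
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a user's generated message class. It only mimics
// the two well-known methods; a real generated class would come from protoc.
final class FakeUserProfile {
  private final String name;

  FakeUserProfile(String name) {
    this.name = name;
  }

  public byte[] toByteArray() {
    return name.getBytes(StandardCharsets.UTF_8);
  }

  public static FakeUserProfile parseFrom(byte[] bytes) {
    return new FakeUserProfile(new String(bytes, StandardCharsets.UTF_8));
  }

  String name() {
    return name;
  }
}

// Resolves the well-known methods once, then invokes them through MethodHandles,
// so the caller never needs a compile-time reference to the Protobuf types.
final class MethodHandleSerde<M> {
  private final Class<M> type;
  private final MethodHandle toByteArray;
  private final MethodHandle parseFrom;

  MethodHandleSerde(Class<M> type) {
    try {
      MethodHandles.Lookup lookup = MethodHandles.lookup();
      this.type = type;
      this.toByteArray =
          lookup.findVirtual(type, "toByteArray", MethodType.methodType(byte[].class));
      this.parseFrom =
          lookup.findStatic(type, "parseFrom", MethodType.methodType(type, byte[].class));
    } catch (ReflectiveOperationException e) {
      throw new IllegalArgumentException(type.getName() + " lacks the expected methods", e);
    }
  }

  byte[] serialize(M message) {
    try {
      // Generic invoke() adapts argument and return types on every call.
      return (byte[]) toByteArray.invoke(message);
    } catch (Throwable t) {
      throw new IllegalStateException("toByteArray failed", t);
    }
  }

  M deserialize(byte[] bytes) {
    try {
      return type.cast(parseFrom.invoke(bytes));
    } catch (Throwable t) {
      throw new IllegalStateException("parseFrom failed", t);
    }
  }
}
```

With these stand-ins, `new MethodHandleSerde<>(FakeUserProfile.class)` round-trips a message through `serialize` and `deserialize`. The handles are resolved once per type, but each call still goes through a handle invocation rather than a direct call.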
> A small experiment that I did previously with ByteBuddy mitigates this by
> generating accessors in pre-flight:
> {code:java}
> package org.apache.flink.statefun.flink.common.protobuf.serde;
>
> import static net.bytebuddy.matcher.ElementMatchers.named;
>
> import java.io.InputStream;
> import java.io.OutputStream;
> import java.lang.reflect.InvocationTargetException;
> import java.lang.reflect.Method;
> import net.bytebuddy.ByteBuddy;
> import net.bytebuddy.dynamic.DynamicType;
> import net.bytebuddy.implementation.FixedValue;
> import net.bytebuddy.implementation.MethodCall;
> import net.bytebuddy.implementation.bytecode.assign.Assigner;
>
> final class ReflectiveProtobufSerde {
>
>   @SuppressWarnings({"unchecked", "rawtypes"})
>   static <M> ProtobufSerde<M> ofProtobufGeneratedType(Class<M> type) {
>     try {
>       DynamicType.Unloaded<ProtobufSerde> unloaded = configureByteBuddy(type);
>       // Load the generated class next to the user's message class.
>       Class<? extends ProtobufSerde> writer =
>           unloaded.load(type.getClassLoader()).getLoaded();
>       return (ProtobufSerde<M>) writer.getDeclaredConstructor().newInstance();
>     } catch (Throwable e) {
>       // Keep the cause so that failures can be diagnosed.
>       throw new IllegalArgumentException(
>           "Unable to generate a ProtobufSerde for " + type.getName(), e);
>     }
>   }
>
>   @SuppressWarnings("rawtypes")
>   private static DynamicType.Unloaded<ProtobufSerde> configureByteBuddy(Class<?> type)
>       throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
>     // Well-known methods present on every Protobuf-generated message class.
>     Method writeToMethod = type.getMethod("writeTo", OutputStream.class);
>     Method parseFromMethod = type.getMethod("parseFrom", InputStream.class);
>     Method getSerializedSizeMethod = type.getMethod("getSerializedSize");
>
>     // Get the message full name via the static getDescriptor() method.
>     Method getDescriptorMethod = type.getMethod("getDescriptor");
>     Object descriptor = getDescriptorMethod.invoke(null);
>     Method getFullNameMethod = descriptor.getClass().getMethod("getFullName");
>     String messageFullName = (String) getFullNameMethod.invoke(descriptor);
>
>     return new ByteBuddy()
>         .subclass(ProtobufSerde.class)
>         .typeVariable("M", type)
>         // writeTo(message, out) -> message.writeTo(out)
>         .method(named("writeTo"))
>         .intercept(
>             MethodCall.invoke(writeToMethod)
>                 .onArgument(0)
>                 .withArgument(1)
>                 .withAssigner(Assigner.DEFAULT, Assigner.Typing.DYNAMIC))
>         // parseFrom(in) -> static parseFrom(in) on the message class
>         .method(named("parseFrom"))
>         .intercept(MethodCall.invoke(parseFromMethod).withArgument(0))
>         // getSerializedSize(message) -> message.getSerializedSize()
>         .method(named("getSerializedSize"))
>         .intercept(
>             MethodCall.invoke(getSerializedSizeMethod)
>                 .onArgument(0)
>                 .withAssigner(Assigner.DEFAULT, Assigner.Typing.DYNAMIC))
>         // The full name is resolved once, at generation time.
>         .method(named("getMessageFullName"))
>         .intercept(FixedValue.value(messageFullName))
>         .make();
>   }
> }
> {code}
>

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
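The issue does not show `ProtobufSerde` itself. The sketch below gives one hypothetical shape for it, inferred only from the four method names intercepted in the ByteBuddy configuration, together with a hand-written class that does what a generated subclass would presumably do: forward each call directly to the message class. `FakeMessage`, `FakeMessageSerde`, `SerdeDemo`, and the name `"example.FakeMessage"` are stand-ins invented for illustration so that the example runs without Protobuf or ByteBuddy.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical shape of ProtobufSerde; the real type may differ.
interface ProtobufSerde<M> {
  void writeTo(M message, OutputStream out) throws IOException;

  M parseFrom(InputStream in) throws IOException;

  int getSerializedSize(M message);

  String getMessageFullName();
}

// Stand-in that mimics the well-known methods of a Protobuf-generated class.
final class FakeMessage {
  private final byte[] payload;

  FakeMessage(String text) {
    this.payload = text.getBytes(StandardCharsets.UTF_8);
  }

  public void writeTo(OutputStream out) throws IOException {
    out.write(payload);
  }

  public int getSerializedSize() {
    return payload.length;
  }

  public static FakeMessage parseFrom(InputStream in) throws IOException {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    byte[] chunk = new byte[256];
    for (int n = in.read(chunk); n != -1; n = in.read(chunk)) {
      buffer.write(chunk, 0, n);
    }
    return new FakeMessage(new String(buffer.toByteArray(), StandardCharsets.UTF_8));
  }

  String text() {
    return new String(payload, StandardCharsets.UTF_8);
  }
}

// Hand-written equivalent of a generated subclass: direct calls, no reflection.
final class FakeMessageSerde implements ProtobufSerde<FakeMessage> {
  @Override
  public void writeTo(FakeMessage message, OutputStream out) throws IOException {
    message.writeTo(out);
  }

  @Override
  public FakeMessage parseFrom(InputStream in) throws IOException {
    return FakeMessage.parseFrom(in);
  }

  @Override
  public int getSerializedSize(FakeMessage message) {
    return message.getSerializedSize();
  }

  @Override
  public String getMessageFullName() {
    return "example.FakeMessage"; // fixed value, like FixedValue.value(...)
  }
}

final class SerdeDemo {
  // Writes a message through the serde and reads it back.
  static String roundTrip(String text) {
    ProtobufSerde<FakeMessage> serde = new FakeMessageSerde();
    try {
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      serde.writeTo(new FakeMessage(text), out);
      return serde.parseFrom(new ByteArrayInputStream(out.toByteArray())).text();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Because each serde method compiles to an ordinary call on the concrete message class, the per-message cost is that of a regular method call. Reflection is needed only once per type, when the class is generated.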