在使用kafka自定义序列化时会导致对应的class无法加载的问题。通过分析,现有代码在使用AppClassLoader类加载器先加载了KafkaSerializerWrapper时,用户提交任务自己编写的类是无法通过AppClassLoader加载的,通过FlinkUserCodeClassLoader加载的话需要KafkaSerializerWrapper也是通过FlinkUserCodeClassLoader加载。 public void open(InitializationContext context) throws Exception { final ClassLoader userCodeClassLoader = context.getUserCodeClassLoader().asClassLoader(); try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(userCodeClassLoader)) { serializer = InstantiationUtil.instantiate( serializerClass.getName(), Serializer.class, getClass().getClassLoader()); // ?? 似乎应该如此 Thread.currentThread().getContextClassLoader()
if (serializer instanceof Configurable) { ((Configurable) serializer).configure(config); } else { serializer.configure(config, isKey); } } catch (Exception e) { throw new IOException("Failed to instantiate the serializer of class " + serializer, e); } }