I did not find this to be true. Here is my code snippet:

    object DruidStreamJob extends Job with SinkFn {

      private[flink] val druidConfig = DruidConfig.current

      private[flink] val decryptMap = ExecutionEnv.loadDecryptionDictionary

      // TODO: Add this to the sbt JVM options; it should be set in the sbt fork JVM. This is a hack.
      // System.setProperty("java.library.path", "/Users/vsiravar/workspace/aipcryptoclient/lib")
      //
      // import java.lang.reflect.Field
      //
      // val fieldSysPath: Field = classOf[ClassLoader].getDeclaredField("sys_paths")
      // fieldSysPath.setAccessible(true)
      // fieldSysPath.set(null, null)
      //
      // print(System.getProperty("java.library.path"))

      private[flink] val aipSimpleAPIEncryptor = new AipCryptoClient(
        ExecutionEnv.mockEncryption,
        ExecutionEnv.enableEncryption,
        ExecutionEnv.loadEncryptionSet)
      aipSimpleAPIEncryptor.init("aip_crypto_config.properties")

      val appLogger: Logger = LoggerFactory.getLogger(DruidStreamJob.getClass)
      val errorLogger: Logger = LoggerFactory.getLogger("streaming.error")

      // SDS client initialization
      private[flink] val sdsClient = SDSEncryptor(decryptMap, ExecutionEnv.mockDecryption)
      sdsClient.init()

      /**
       * Start streaming job execution.
       *
       * @param argMap
       */
      private[flink] def runJob(argMap: Map[String, String]): Unit = {
        val env = ExecutionEnv.executionEnv(argMap)
        this.source = ExecutionEnv.sourceTopics
        env.enableCheckpointing(1000)
        env.setStateBackend(new FsStateBackend("s3://vishwas.test1/checkpoints"))
        sourceAndSinkFn(env, source)
        env.execute(jobName = name)
      }

      /**
       * @inheritdoc
       * @param env
       * @param topics
       */
      override private[flink] def sourceAndSinkFn(
          env: StreamExecutionEnvironment,
          topics: List[String]) = {
        val dataStream = addSource(env)
        log.info("Subscribed to topics: " + topics)
        val filteredStream = dataStream.filter(new FilterFunction[GenericRecord] {
          override def filter(value: GenericRecord): Boolean = {
            // && short-circuits; the original used the non-short-circuit &
            ExecutionEnv.messageTypeList.contains(value.get("CMLS_REQST_MSG_TYP").toString) &&
              ExecutionEnv.pcrList.contains(value.get("CMLS_DEST_PCR").toString)
          }
        })
        val result = filteredStream.map(record =>
          encryptWithAipCryptoClient(addTimeStamp(sdsClient.decrypt(applyValues(record)))))
        result.print()
        KafkaSink(result).sendToKafka
      }

      private[flink] def encryptWithAipCryptoClient(
          maptoEncrypt: mutable.Map[String, Any]): mutable.Map[String, Any] = {
        aipSimpleAPIEncryptor.encrypt(maptoEncrypt.asInstanceOf[mutable.Map[String, AnyRef]].asJava)
        maptoEncrypt
      }

      private[flink] def applyValues(genericRecord: GenericRecord): mutable.Map[String, Any] = {
        collection.mutable.Map(genericRecord.getSchema.getFields.asScala
          .map(field =>
            field.schema().getType match {
              case Schema.Type.LONG =>
                field.name() -> genericRecord.get(field.name()).asInstanceOf[Long]
              case Schema.Type.INT =>
                field.name() -> genericRecord.get(field.name()).asInstanceOf[Int]
              case Schema.Type.DOUBLE =>
                field.name() -> genericRecord.get(field.name()).asInstanceOf[Double]
              case Schema.Type.STRING =>
                field.name() -> genericRecord.get(field.name()).toString
              case _ =>
                field.name() -> genericRecord.get(field.name()).toString
            }): _*)
      }

      private[flink] def addTimeStamp(payload: mutable.Map[String, Any]): mutable.Map[String, Any] = {
        try {
          if (!payload("CMLS_CPD_ORIG_DT").equals("19000101")) {
            payload + ("timestamp" -> TimeUtility.convertDateStringToLong(
              payload("CMLS_CPD_ORIG_DT").asInstanceOf[String],
              payload("CMLS_AUTH_TIME").asInstanceOf[Int]))
          } else {
            payload + ("timestamp" -> System.currentTimeMillis())
          }
        } catch {
          case e: Exception =>
            // Pass the exception to the logger; e.printStackTrace() returns Unit,
            // so concatenating it only appended "()" to the message.
            errorLogger.error("Unable to obtain epoch time, using current system time", e)
            payload + ("timestamp" -> System.currentTimeMillis())
        }
      }
    }

The code that initializes the SDS client (the two lines under the "SDS client initialization" comment, highlighted in green in my original message) is in the main thread, even before the job graph is created. However, when I run this code on a cluster with 3 task managers on different nodes, it is initialized on each of the 3 nodes (task managers). I wonder why this happens.

Thanks,
Vishwas

On Thu, Aug 15, 2019 at 11:42 AM Steven Nelson <snel...@sourceallies.com> wrote:

> @transient or use a static factory.
>
> In Scala we use a @transient lazy val with an initializer to do this.
>
> On Aug 15, 2019, at 11:40 AM, Vishwas Siravara <vsirav...@gmail.com> wrote:
>
> Thanks Steven. Is there a way wherein I can create a singleton instance in each task manager instead of serializing this object?
>
> Thanks,
> Vishwas
>
> On Thu, Aug 15, 2019 at 11:28 AM Steven Nelson <snel...@sourceallies.com> wrote:
>
>> The encryptor will be serialized and sent with the rest of your job graph when the job is submitted. If it's not serializable, you get an error.
>>
>> > On Aug 15, 2019, at 11:00 AM, Vishwas Siravara <vsirav...@gmail.com> wrote:
>> >
>> > Hi guys,
>> > I have a map job where I want to encrypt certain keys. I initialize the encryptor in the main method and apply it in the map function. How is this encryptor shared when I have my job running on multiple task managers with parallelism > 1?
>> >
>> > Thanks,
>> > Vishwas