Hi Vishwas,

Since `DruidStreamJob` is a Scala `object` and the initialization of your sds client sits in the object body rather than inside a method, it runs every time `DruidStreamJob` is loaded, just like a static initializer block in Java. Your task managers are separate JVM processes, and each of them has to load `DruidStreamJob`, so the initialization of your sds client runs once per task manager.

You can try to move the initialization into the `runJob` method and pass the client down as a parameter, but I wonder whether an sds client can be serialized at all: this kind of client usually holds an HTTP connection, which cannot be serialized. The sketches below illustrate the loading behaviour and the patterns Steven describes further down the thread.
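To make the loading behaviour concrete, here is a minimal, self-contained sketch (hypothetical names, nothing from your job):

object InitDemo {
  // Every top-level statement in an `object` body belongs to its
  // constructor: it runs exactly once per JVM/classloader, the first
  // time the object is referenced. This is the Scala analogue of a
  // Java static initializer block.
  println("InitDemo loaded in JVM " +
    java.lang.management.ManagementFactory.getRuntimeMXBean.getName)

  def runJob(): Unit =
    println("runJob called; the object body above already ran")
}

Each task manager that deserializes a function referencing `DruidStreamJob` (your map lambda calls `sdsClient.decrypt`, so it does) loads the object in its own JVM, and the object body, including `sdsClient.init()`, executes there as well.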
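To sidestep the serialization question entirely, you can create the client on the task manager itself inside a rich function. A minimal sketch; `SdsClient` here is a hypothetical stand-in for your `SDSEncryptor`, assumed non-serializable:

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

// Hypothetical stand-in for SDSEncryptor; assumed to hold a live
// connection, hence not serializable.
class SdsClient {
  def init(): Unit = ()
  def decrypt(record: Map[String, Any]): Map[String, Any] = record
}

class DecryptFn extends RichMapFunction[Map[String, Any], Map[String, Any]] {
  // @transient: skipped when the job graph is serialized; the field is
  // populated on the task manager, never shipped from the client program.
  @transient private var sdsClient: SdsClient = _

  override def open(parameters: Configuration): Unit = {
    // Runs once per parallel subtask, on the task manager, before the
    // first record is processed.
    sdsClient = new SdsClient()
    sdsClient.init()
  }

  override def map(record: Map[String, Any]): Map[String, Any] =
    sdsClient.decrypt(record)
}

Steven's `@transient lazy val` variant achieves the same without `open()`: declare `@transient private lazy val sdsClient = { val c = new SdsClient(); c.init(); c }` in the function. After deserialization on the task manager the lazy val is re-evaluated on first access, so the connection never crosses the wire.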
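And if you want exactly one client per task manager JVM, shared by all subtasks (your question further down the thread), Steven's "static factory" idea maps naturally onto an object holder. Again a sketch, reusing the hypothetical `SdsClient` from above:

object SdsClientHolder {
  // An object field exists once per JVM/classloader, i.e. once per task
  // manager. `lazy val` defers creation to first access, and Scala makes
  // the initialization thread-safe and exactly-once.
  lazy val client: SdsClient = {
    val c = new SdsClient()
    c.init()
    c
  }
}

A map function can then call `SdsClientHolder.client.decrypt(...)` directly; since the reference is resolved at call time on the task manager, nothing client-related is captured in the closure or serialized with the job graph.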
Best,
Victor

From: Vishwas Siravara <vsirav...@gmail.com>
Date: Friday, August 16, 2019 at 11:48 PM
To: Steven Nelson <snel...@sourceallies.com>, user <user@flink.apache.org>
Subject: Re: Understanding job flow

I did not find this to be true. Here is my code snippet:

object DruidStreamJob extends Job with SinkFn {

  private[flink] val druidConfig = DruidConfig.current
  private[flink] val decryptMap = ExecutionEnv.loadDecryptionDictionary

  // TODO: Add this to sbt jvm, this should be set in sbt fork jvm. This is a hack.
  // System.setProperty("java.library.path", "/Users/vsiravar/workspace/aipcryptoclient/lib")
  //
  // import java.lang.reflect.Field
  //
  // val fieldSysPath: Field = classOf[ClassLoader].getDeclaredField("sys_paths")
  // fieldSysPath.setAccessible(true)
  // fieldSysPath.set(null, null)
  //
  // print(System.getProperty("java.library.path"))

  private[flink] val aipSimpleAPIEncryptor = new AipCryptoClient(
    ExecutionEnv.mockEncryption,
    ExecutionEnv.enableEncryption,
    ExecutionEnv.loadEncryptionSet)

  aipSimpleAPIEncryptor.init("aip_crypto_config.properties")

  val appLogger: Logger = LoggerFactory.getLogger(DruidStreamJob.getClass)
  val errorLogger: Logger = LoggerFactory.getLogger("streaming.error")

  private[flink] val sdsClient = SDSEncryptor(decryptMap, ExecutionEnv.mockDecryption)
  sdsClient.init()

  /**
   * Start streaming job execution.
   *
   * @param argMap
   */
  private[flink] def runJob(argMap: Map[String, String]): Unit = {
    val env = ExecutionEnv.executionEnv(argMap)
    this.source = ExecutionEnv.sourceTopics
    env.enableCheckpointing(1000)
    env.setStateBackend(new FsStateBackend("s3://vishwas.test1/checkpoints"))
    sourceAndSinkFn(env, source)
    env.execute(jobName = name)
  }

  /**
   * @inheritdoc
   * @param env
   * @param topics
   */
  override private[flink] def sourceAndSinkFn(
      env: StreamExecutionEnvironment,
      topics: List[String]) = {
    val dataStream = addSource(env)
    log.info("Subscribed to topics " + topics)
    val filteredStream = dataStream.filter(new FilterFunction[GenericRecord] {
      override def filter(value: GenericRecord): Boolean = {
        ExecutionEnv.messageTypeList.contains(value.get("CMLS_REQST_MSG_TYP").toString) &&
          ExecutionEnv.pcrList.contains(value.get("CMLS_DEST_PCR").toString)
      }
    })
    val result = filteredStream.map(record =>
      encryptWithAipCryptoClient(addTimeStamp(sdsClient.decrypt(applyValues(record)))))
    result.print()
    KafkaSink(result).sendToKafka
  }

  private[flink] def encryptWithAipCryptoClient(
      maptoEncrypt: mutable.Map[String, Any]): mutable.Map[String, Any] = {
    aipSimpleAPIEncryptor.encrypt(maptoEncrypt.asInstanceOf[mutable.Map[String, AnyRef]].asJava)
    maptoEncrypt
  }

  private[flink] def applyValues(
      genericRecord: GenericRecord): mutable.Map[String, Any] = {
    collection.mutable.Map(genericRecord.getSchema.getFields.asScala
      .map(field => field.schema().getType match {
        case Schema.Type.LONG =>
          field.name() -> genericRecord.get(field.name()).asInstanceOf[Long]
        case Schema.Type.INT =>
          field.name() -> genericRecord.get(field.name()).asInstanceOf[Int]
        case Schema.Type.DOUBLE =>
          field.name() -> genericRecord.get(field.name()).asInstanceOf[Double]
        case Schema.Type.STRING =>
          field.name() -> genericRecord.get(field.name()).toString
        case _ =>
          field.name() -> genericRecord.get(field.name()).toString
      }): _*)
  }

  private[flink] def addTimeStamp(payload: mutable.Map[String, Any]): mutable.Map[String, Any] = {
    try {
      if (!payload("CMLS_CPD_ORIG_DT").equals("19000101")) {
        return payload + ("timestamp" ->
          TimeUtility.convertDateStringToLong(
            payload("CMLS_CPD_ORIG_DT").asInstanceOf[String],
            payload("CMLS_AUTH_TIME").asInstanceOf[Int]))
      }
      payload + ("timestamp" -> System.currentTimeMillis())
    } catch {
      case e: Exception =>
        errorLogger.error("Unable to obtain epoch time, using current system time", e)
        payload + ("timestamp" -> System.currentTimeMillis())
    }
  }
}

The initialization of the sds client (the `sdsClient` val and the `sdsClient.init()` call near the top of the object) is in the main thread, even before the job graph is created. However, when I run this code on a cluster with 3 task managers on different nodes, it is initialized on each of the 3 nodes (task managers). I wonder why this happens.

Thanks,
Vishwas

On Thu, Aug 15, 2019 at 11:42 AM Steven Nelson <snel...@sourceallies.com> wrote:

@transient or use a static factory. In Scala we use a @transient lazy val with an initializer to do this.

On Aug 15, 2019, at 11:40 AM, Vishwas Siravara <vsirav...@gmail.com> wrote:

Thanks Steven. Is there a way in which I can create a singleton instance in each task manager instead of serializing this object?

Thanks,
Vishwas

On Thu, Aug 15, 2019 at 11:28 AM Steven Nelson <snel...@sourceallies.com> wrote:

The encryptor will be serialized and sent with the rest of your Job Graph when the job is submitted. If it's not serializable you get an error.

> On Aug 15, 2019, at 11:00 AM, Vishwas Siravara <vsirav...@gmail.com> wrote:
>
> Hi guys,
> I have a map job where I want to encrypt certain keys. I initialize the
> encryptor in the main method and apply it in the map function. How is this
> encryptor shared when I have my job running on multiple task managers with
> parallelism > 1?
>
> Thanks,
> Vishwas