Hi Vishwas,

Since `DruidStreamJob` is a Scala `object`, and the initialization of your SDS 
client is not inside any method, it runs every time `DruidStreamJob` is loaded 
(like a static initializer block in Java).
Your taskmanagers are separate JVM processes, and `DruidStreamJob` has to be 
loaded in each of them, so the initialization of your SDS client runs once in 
every taskmanager.
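
For illustration, a minimal sketch (plain Scala, nothing Flink-specific):

import java.lang.management.ManagementFactory

object InitDemo {
  // The object body is compiled into a static initializer, so this
  // statement runs once per JVM, the first time InitDemo is referenced.
  println("InitDemo initialized in JVM " + ManagementFactory.getRuntimeMXBean.getName)

  def touch(): Unit = ()
}

Run `InitDemo.touch()` in three separate JVMs and the println fires three 
times, once per process. The same thing happens when each of your taskmanagers 
loads `DruidStreamJob`.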

You can try moving the initialization into the `runJob` method and passing the 
client down as a parameter.
But I wonder whether an SDS client can be serialized at all, since this kind of 
client usually holds an HTTP connection, which cannot be serialized.
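
If it turns out not to be serializable, a common alternative is to build the 
client on the taskmanager itself, inside the open() method of a rich function, 
so it never travels with the job graph. A rough sketch (I am reusing the 
`SDSEncryptor` name from your snippet and guessing at its construction):

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

import scala.collection.mutable

class DecryptFn(mockDecryption: Boolean)
  extends RichMapFunction[mutable.Map[String, Any], mutable.Map[String, Any]] {

  // Built in open() on the taskmanager; @transient keeps the client out
  // of the serialized function object that is shipped with the job graph.
  @transient private var sdsClient: SDSEncryptor = _

  override def open(parameters: Configuration): Unit = {
    sdsClient = SDSEncryptor(ExecutionEnv.loadDecryptionDictionary, mockDecryption)
    sdsClient.init()
  }

  override def map(payload: mutable.Map[String, Any]): mutable.Map[String, Any] =
    sdsClient.decrypt(payload)
}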

Best,
Victor

From: Vishwas Siravara <vsirav...@gmail.com>
Date: Friday, August 16, 2019 at 11:48 PM
To: Steven Nelson <snel...@sourceallies.com>, user <user@flink.apache.org>
Subject: Re: Understanding job flow

I did not find this to be true. Here is my code snippet.


object DruidStreamJob extends Job with SinkFn {

  private[flink] val druidConfig = DruidConfig.current

  private[flink] val decryptMap = ExecutionEnv.loadDecryptionDictionary

  //  //TODO: Add this to sbt jvm, this should be set in sbt fork jvm. This is a hack.
  //  System.setProperty("java.library.path", "/Users/vsiravar/workspace/aipcryptoclient/lib")
  //
  //  import java.lang.reflect.Field
  //
  //  val fieldSysPath: Field = classOf[ClassLoader].getDeclaredField("sys_paths")
  //  fieldSysPath.setAccessible(true)
  //  fieldSysPath.set(null, null)
  //
  //  print(System.getProperty("java.library.path"))

  private[flink] val aipSimpleAPIEncryptor = new AipCryptoClient(
    ExecutionEnv.mockEncryption,
    ExecutionEnv.enableEncryption,
    ExecutionEnv.loadEncryptionSet)

  aipSimpleAPIEncryptor.init("aip_crypto_config.properties")

  val appLogger: Logger = LoggerFactory.getLogger(DruidStreamJob.getClass)

  val errorLogger: Logger = LoggerFactory.getLogger("streaming.error")

  private[flink] val sdsClient = SDSEncryptor(decryptMap, ExecutionEnv.mockDecryption)

  sdsClient.init()

  /**
   * Start streaming job execution.
   *
   * @param argMap
   */
  private[flink] def runJob(argMap: Map[String, String]): Unit = {

    val env = ExecutionEnv.executionEnv(argMap)
    this.source = ExecutionEnv.sourceTopics

    env.enableCheckpointing(1000)
    env.setStateBackend(new FsStateBackend("s3://vishwas.test1/checkpoints"))
    sourceAndSinkFn(env, source)
    env.execute(jobName = name)
  }

  /**
   * @inheritdoc
   * @param env
   * @param topics
   */
  override private[flink] def sourceAndSinkFn(
    env: StreamExecutionEnvironment,
    topics: List[String]) = {
    val dataStream = addSource(env)
    log.info("Subscribed to topics" + topics)

    val filteredStream = dataStream.filter(new FilterFunction[GenericRecord] {

      override def filter(value: GenericRecord): Boolean = {
        ExecutionEnv.messageTypeList.contains(value.get("CMLS_REQST_MSG_TYP").toString) &&
          ExecutionEnv.pcrList.contains(value.get("CMLS_DEST_PCR").toString)
      }
    })

    val result = filteredStream.map(record =>
      encryptWithAipCryptoClient(addTimeStamp(sdsClient.decrypt(applyValues(record)))))

    result.print()
    KafkaSink(result).sendToKafka
  }

  private[flink] def encryptWithAipCryptoClient(
    maptoEncrypt: mutable.Map[String, Any]): mutable.Map[String, Any] = {
    aipSimpleAPIEncryptor.encrypt(
      maptoEncrypt.asInstanceOf[mutable.Map[String, AnyRef]].asJava)
    maptoEncrypt
  }

  private[flink] def applyValues(
    genericRecord: GenericRecord): mutable.Map[String, Any] = {

    collection.mutable.Map(genericRecord.getSchema.getFields.asScala
      .map(field =>
        field.schema().getType match {
          case Schema.Type.LONG =>
            field.name() -> genericRecord.get(field.name()).asInstanceOf[Long]
          case Schema.Type.INT =>
            field.name() -> genericRecord.get(field.name()).asInstanceOf[Int]
          case Schema.Type.DOUBLE =>
            field.name() -> genericRecord.get(field.name()).asInstanceOf[Double]
          case Schema.Type.STRING =>
            field.name() -> genericRecord.get(field.name()).toString
          case _ =>
            field.name() -> genericRecord.get(field.name()).toString
        }): _*)

  }

  private[flink] def addTimeStamp(payload: mutable.Map[String, Any]): mutable.Map[String, Any] = {
    try {
      if (!payload("CMLS_CPD_ORIG_DT").equals("19000101")) {
        return payload + ("timestamp" -> TimeUtility.convertDateStringToLong(
          payload("CMLS_CPD_ORIG_DT").asInstanceOf[String],
          payload("CMLS_AUTH_TIME").asInstanceOf[Int]))
      }
      payload + ("timestamp" -> System.currentTimeMillis())
    } catch {
      case e: Exception =>
        errorLogger.error("Unable to obtain epoch time, using current system time", e)
        payload + ("timestamp" -> System.currentTimeMillis())
    }
  }

}

The initialization of the SDS client (the `sdsClient` lines in the snippet 
above) happens in the main thread, even before the job graph is created. 
However, when I run this code on a cluster with 3 task managers on different 
nodes, it is initialized on all 3 nodes (once per taskmanager). I wonder why 
this happens.

Thanks,
Vishwas

On Thu, Aug 15, 2019 at 11:42 AM Steven Nelson <snel...@sourceallies.com> wrote:
@transient, or use a static factory.

In Scala we use a @transient lazy val with an initializer to do this.
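
A rough sketch (MyEncryptor is a stand-in for whatever client you create):

import org.apache.flink.api.common.functions.MapFunction

// Stand-in for a non-serializable client; adapt to your encryptor.
class MyEncryptor {
  def init(): Unit = ()
  def encrypt(s: String): String = s.reverse
}

class EncryptFn extends MapFunction[String, String] {

  // @transient keeps the client out of the serialized function object;
  // lazy rebuilds it on first use in each deserialized copy of the
  // function, so each task manager creates its own instance.
  @transient private lazy val encryptor: MyEncryptor = {
    val e = new MyEncryptor
    e.init()
    e
  }

  override def map(value: String): String = encryptor.encrypt(value)
}
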
Sent from my iPhone

On Aug 15, 2019, at 11:40 AM, Vishwas Siravara <vsirav...@gmail.com> wrote:
Thanks Steven. Is there a way I can create a singleton instance in each task 
manager instead of serializing this object?

Thanks,
Vishwas

On Thu, Aug 15, 2019 at 11:28 AM Steven Nelson <snel...@sourceallies.com> wrote:
The encryptor will be serialized and sent with the rest of your Job Graph when 
the job is submitted. If it’s not serializable, you get an error.
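
For example, a minimal sketch of the failure mode (made-up client class):

import org.apache.flink.streaming.api.scala._

object SerializationDemo {

  // Not Serializable, like most clients that wrap a live connection.
  class MyEncryptor {
    def encrypt(s: String): String = s.reverse
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val encryptor = new MyEncryptor
    // The lambda closes over `encryptor`, so Flink has to serialize it as
    // part of the map function. Because MyEncryptor is not Serializable,
    // this fails with an InvalidProgramException along the lines of
    // "The implementation of the MapFunction is not serializable".
    env.fromElements("a", "b").map(s => encryptor.encrypt(s)).print()
    env.execute("serialization-demo")
  }
}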

Sent from my iPhone

> On Aug 15, 2019, at 11:00 AM, Vishwas Siravara <vsirav...@gmail.com> wrote:
>
> Hi guys,
> I have a map job where I want to encrypt certain keys. I initialize the 
> encryptor in the main method and apply it in the map function. How is this 
> encryptor shared when I have my job running on multiple task managers with 
> parallelism > 1?
>
> Thanks,
> Vishwas
