Hi Burak,

I recommend against using mocking libraries in Scala; the language does
not really need them. Instead, substitute CaffeineHelper with another
implementation in your tests.
For example, extract a CacheHelper[V] trait and give it two implementations:

import scala.collection.mutable
import scala.util.Try
import scalacache.caffeine.CaffeineCache

trait CacheHelper[V] extends Serializable {
  def get(id: String): Option[V]

  def put(key: String)(value: V): Unit
}


// Production implementation that delegates to the Caffeine-backed cache.
class CaffeineHelper[V](cache: CaffeineCache[V]) extends CacheHelper[V] {

  def get(id: String): Option[V] = Try(cache.get(id).get).toOption

  def put(key: String)(value: V): Unit = { cache.put(key)(value); () }

}


// Test double backed by a plain mutable map keyed by String.
class CacheHelperMock[V](map: mutable.Map[String, V]) extends CacheHelper[V] {

  def get(id: String): Option[V] = map.get(id)

  def put(key: String)(value: V): Unit = { map.put(key, value); () }

}

Then use the second one in your tests.
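
To make that concrete, here is a minimal sketch that runs without Flink on
the classpath. InMemoryCacheHelper and AgeLookup are hypothetical names I
made up for illustration, and loadFromDb stands in for your getAgeFromDB;
the lookup logic mirrors your getUserAge.

```scala
import scala.collection.mutable

// Same shape as the CacheHelper[V] trait above.
trait CacheHelper[V] extends Serializable {
  def get(id: String): Option[V]
  def put(key: String)(value: V): Unit
}

// Hypothetical map-backed stand-in for tests, pre-filled from an immutable Map.
class InMemoryCacheHelper[V](initial: Map[String, V] = Map.empty[String, V])
    extends CacheHelper[V] {
  private val entries: mutable.Map[String, V] = mutable.Map.empty[String, V] ++= initial

  def get(id: String): Option[V] = entries.get(id)

  def put(key: String)(value: V): Unit = entries.update(key, value)
}

// Hypothetical extraction of getUserAge: the cache and the DB call are injected.
class AgeLookup(cache: CacheHelper[Long], loadFromDb: String => Long) {
  def ageOf(userId: String): Long =
    cache.get(userId) match {
      case Some(age) => age
      case None => // Cache miss: load the value and remember it.
        val age = loadFromDb(userId)
        cache.put(userId)(age)
        age
    }
}

object CacheHelperDemo {
  def main(args: Array[String]): Unit = {
    val cache = new InMemoryCacheHelper[Long](Map("user1" -> 4L))
    var dbCalls = 0
    val lookup = new AgeLookup(cache, _ => { dbCalls += 1; 30L })

    assert(lookup.ageOf("user1") == 4L)  // served from the cache
    assert(dbCalls == 0)
    assert(lookup.ageOf("user2") == 30L) // cache miss, goes to the "DB"
    assert(dbCalls == 1)
    assert(cache.get("user2").contains(30L))
  }
}
```

In the Flink function itself, the field would then be typed as
CacheHelper[Long], and the test subclass could assign the in-memory
implementation in open(), the same way you assign the mock today.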

Also, I see that Flink complains that scala.Option is not serializable. It
seems that either "import org.apache.flink.api.scala.createTypeInformation"
or "import org.apache.flink.api.scala._" is missing in one of your
Scala classes/files.
In more detail, this can happen when the implicit Scala type information
was not found in scope while the Flink job graph was being constructed,
so Flink either falls back to the Kryo serialization framework or gives up
completely, as in your case.
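
As a rough way to check this, the sketch below asks the compiler for the
implicit TypeInformation of your two case classes. It assumes the Flink
Scala API is on the classpath; TypeInfoCheck is a hypothetical name. If the
wildcard import is removed, the implicitly calls should no longer compile,
which is the "not in scope" situation described above.

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
// Brings the implicit createTypeInformation macro into scope.
import org.apache.flink.api.scala._

case class User(id: String)
case class UserWithMeta(id: String, age: Long)

object TypeInfoCheck {
  // Both values are resolved at compile time through the implicit macro.
  val userInfo: TypeInformation[User] = implicitly[TypeInformation[User]]
  val enrichedInfo: TypeInformation[UserWithMeta] = implicitly[TypeInformation[UserWithMeta]]

  def main(args: Array[String]): Unit = {
    // Printing shows which TypeInformation the macro derived for each class.
    println(userInfo)
    println(enrichedInfo)
  }
}
```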


Alexey

On Wed, Oct 30, 2024 at 3:38 PM Burak Dursunlar <
burak.dursun...@trendyol.com> wrote:

> I am struggling with writing unit tests for a Scala Flink application.
>
> For instance I have an async mapper like below. It takes a User object
> with id, and enriches with age:
>
> ```
>
> case class User(id: String)
> case class UserWithMeta(id: String, age: Long)
> class EnrichWithUserMeta extends RichAsyncFunction[User, UserWithMeta] {
>   protected var userMetadataCache: CaffeineHelper[Long] = _
>
>   override def open(parameters: Configuration): Unit = {
>     userMetadataCache = CaffeineHelper.newCache[Long](10.seconds)
>   }
>
>   override def asyncInvoke(input: User, resultFuture: ResultFuture[UserWithMeta]): Unit = {
>     val age = getUserAge(input.id)
>     resultFuture.complete(Seq(UserWithMeta(input.id, age)))
>   }
>
>   private def getUserAge(userId: String): Long = {
>     userMetadataCache.get(userId) match {
>       case Some(age) => age
>       case None => { // Cache miss. Get it from DB.
>         val age = getAgeFromDB(userId)
>         userMetadataCache.put(userId)(age)
>         age
>       }
>     }
>   }
> }
>
> ```
>
> The above mapper has a cache instance of CaffeineHelper. CaffeineHelper is a
> wrapper around the Caffeine caching library.
>
> Here is the CaffeineHelper.scala:
>
> ```
>
> trait CaffeineHelper[V] extends Serializable {
>   @transient protected var cache: CaffeineCache[V]
>
>   def get(id: String): Option[V] = Try(cache.get(id).get).toOption
>
>   def put(key: String)(value: V) = cache.put(key)(value)
> }
> object CaffeineHelper {
>   def newCache[V](d: Duration): CaffeineHelper[V] = {
>     new CaffeineHelper[V] {
>       override protected var cache: CaffeineCache[V] = ??? // Actual implementation.
>     }
>   }
> }
>
> ```
>
> The main Flink application that uses the async mapper above works fine.
> However, when I try to write unit tests, the mocked CaffeineHelper cannot be
> serialized.
>
> ```
>
> package org.example
>
> import org.apache.flink.api.scala.createTypeInformation
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration
> import org.apache.flink.streaming.api.scala.{AsyncDataStream, StreamExecutionEnvironment}
> import org.apache.flink.test.util.MiniClusterWithClientResource
> import org.mockito.IdiomaticMockito.StubbingOps
> import org.mockito.Mockito.withSettings
> import org.mockito.mock.SerializableMode
> import org.scalatest.BeforeAndAfter
> import org.scalatest.flatspec.AnyFlatSpec
> import org.scalatest.matchers.should.Matchers
> import org.scalatestplus.mockito.MockitoSugar.mock
> import scalacache.caffeine.CaffeineCache
> import java.util.concurrent.TimeUnit
>
>
> class EnrichWithUserMetaTest extends AnyFlatSpec with Matchers with BeforeAndAfter {
>   private val flinkCluster = new MiniClusterWithClientResource(
>     new MiniClusterResourceConfiguration.Builder().build())
>
>   before(flinkCluster.before())
>   after(flinkCluster.after())
>
>   "EnrichWithUserMetaMapper" should "enrich with brand" in {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>
>     val caffeineMock = mock[CaffeineHelper[Long]](
>       withSettings().serializable(SerializableMode.ACROSS_CLASSLOADERS))
>
>     val ageMapper = new EnrichWithUserMeta() {
>       override def open(parameters: Configuration): Unit = {
>         userMetadataCache = caffeineMock
>       }
>     }
>
>     caffeineMock.get("content1") returns Some(4L)
>
>     val stream = env.fromElements(
>       User("user1")
>     )
>
>     val enrichedProducts = AsyncDataStream
>       .unorderedWait(stream, ageMapper, 5, TimeUnit.SECONDS, 1)
>       .executeAndCollect(1)
>
>     enrichedProducts.head shouldBe UserWithMeta("user1", 4L)
>   }
> }
>
> ```
>
> Error message:
>
> ```
>
> public default scala.Option org.example.CaffeineHelper.get(java.lang.String) 
> is not serializable. The object probably contains or references non 
> serializable fields.
> org.apache.flink.api.common.InvalidProgramException: public default 
> scala.Option org.example.CaffeineHelper.get(java.lang.String) is not 
> serializable. The object probably contains or references non serializable 
> fields.
>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:164)
>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
>
> ```
>
>
> Therefore I've tried configuring the mock as serializable
>
> ```
>
> val caffeineMock = mock[CaffeineHelper[Long]](withSettings().serializable())
>
> ```
>
> But I've got another error:
>
> ```
>
> org.mockito.internal.invocation.RealMethod$FromCallable@656a3d6b is not 
> serializable. The object probably contains or references non serializable 
> fields.
> org.apache.flink.api.common.InvalidProgramException: 
> org.mockito.internal.invocation.RealMethod$FromCallable@656a3d6b is not 
> serializable. The object probably contains or references non serializable 
> fields.
>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:164)
>
> ```
>
>
> Other mocking libraries like ScalaMock did not work either. As a last resort,
> I’ve tried the test harness.
>
> ```
>
>   "EnrichWithUserMetaMapper" should "enrich with brand test harness" in {
>     val caffeineMock = mock[CaffeineHelper[Long]]
>
>     val ageMapper: AsyncFunction[User, UserWithMeta] = new EnrichWithUserMeta {
>       override def open(parameters: Configuration): Unit = {
>         userMetadataCache = caffeineMock
>       }
>     }
>
>     caffeineMock.get("user1") returns Some(4L)
>
>     val asyncOperator = new AsyncWaitOperator[User, UserWithMeta](
>       ageMapper,
>       60,
>       1,
>       OutputMode.UNORDERED,
>       AsyncRetryStrategies.NO_RETRY_STRATEGY.asInstanceOf[AsyncRetryStrategy[UserWithMeta]],
>       new TestProcessingTimeService,
>       new MailboxExecutorImpl(new TaskMailboxImpl, 1, StreamTaskActionExecutor.IMMEDIATE)
>     )
>
>     val testHarness = new OneInputStreamOperatorTestHarness(asyncOperator)
>
>     testHarness.getExecutionConfig.setAutoWatermarkInterval(50)
>
>     testHarness.open()
>
>     testHarness.processElement(User("user1"), 100L)
>     testHarness.processWatermark(100)
>     testHarness.setProcessingTime(100)
>
>     val expectedOutput =
>       testHarness.getOutput.poll().asInstanceOf[StreamRecord[UserWithMeta]].getValue
>
>     expectedOutput shouldBe UserWithMeta("user1", 4L)
>   }
>
> ```
>
> Here is the error when I use test harness:
>
> ```
>
> java.lang.IllegalStateException was thrown.
> java.lang.IllegalStateException
>       at org.apache.flink.util.Preconditions.checkState(Preconditions.java:177)
>       at org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerIn(StreamConfig.java:356)
>       at org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerIn1(StreamConfig.java:346)
>       at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.setup(AsyncWaitOperator.java:185)
>       at org.apache.flink.streaming.api.operators.SimpleOperatorFactory.createStreamOperator(SimpleOperatorFactory.java:78)
>       at org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:81)
>       at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.setup(AbstractStreamOperatorTestHarness.java:434)
>       at org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.setup(OneInputStreamOperatorTestHarness.java:198)
>       at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.setup(AbstractStreamOperatorTestHarness.java:421)
>       at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeState(AbstractStreamOperatorTestHarness.java:578)
>       at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeState(AbstractStreamOperatorTestHarness.java:466)
>       at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeEmptyState(AbstractStreamOperatorTestHarness.java:474)
>       at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.open(AbstractStreamOperatorTestHarness.java:672)
>       at org.example.EnrichWithUserMetaTest.$anonfun$new$1(EnrichWithUserMetaTest.scala:76)
>
> ```
>
>
> Are there any examples of unit testing async functions that have mocked fields?
> I’ve scanned the Flink documentation but I could not find how to proceed.
>
> Regards.
>
