This is an automated email from the ASF dual-hosted git repository.

philippus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-connectors-kafka.git


The following commit(s) were added to refs/heads/main by this push:
     new 0f9c11b7 Separate csv formatting files and components (#508)
0f9c11b7 is described below

commit 0f9c11b7c0bbfb8b58e5c7f5dc3989b3e2ae6cd3
Author: Philippus Baalman <[email protected]>
AuthorDate: Thu Apr 16 11:45:12 2026 +0200

    Separate csv formatting files and components (#508)
    
    and remove todo
---
 .../pekko/kafka/benchmarks/CsvFormatter.scala      | 13 -----
 .../pekko/kafka/benchmarks/CsvFormatting.scala     | 56 ++++++++++++++++++++++
 .../pekko/kafka/benchmarks/CsvQuotingStyle.scala   | 26 ++++++++++
 .../org/apache/pekko/kafka/benchmarks/Timed.scala  | 28 ++---------
 4 files changed, 85 insertions(+), 38 deletions(-)

diff --git a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/CsvFormatter.scala b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/CsvFormatter.scala
index cd959083..1b2fbc4c 100644
--- a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/CsvFormatter.scala
+++ b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/CsvFormatter.scala
@@ -19,19 +19,6 @@ import org.apache.pekko.util.ByteString
 import java.nio.charset.{ Charset, StandardCharsets }
 import scala.collection.immutable
 
-private[benchmarks] sealed trait CsvQuotingStyle
-
-object CsvQuotingStyle {
-
-  /** Quote only fields requiring quotes */
-  case object Required extends CsvQuotingStyle
-
-  /** Quote all fields */
-  case object Always extends CsvQuotingStyle
-}
-
-// TODO: This needs to be deleted after migrating alpakka to pekko.
-// This is just temporary base to see everything compiles and tests will pass without any issue
 private[benchmarks] class CsvFormatter(delimiter: Char,
     quoteChar: Char,
     escapeChar: Char,
diff --git a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/CsvFormatting.scala b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/CsvFormatting.scala
new file mode 100644
index 00000000..fac3208e
--- /dev/null
+++ b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/CsvFormatting.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2014 - 2016 Softwaremill <https://softwaremill.com>
+ * Copyright (C) 2016 - 2020 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.kafka.benchmarks
+
+import java.nio.charset.{ Charset, StandardCharsets }
+import org.apache.pekko
+import pekko.NotUsed
+import pekko.stream.scaladsl.{ Flow, Source }
+import pekko.util.ByteString
+
+import scala.collection.immutable
+
+/**
+ * Provides CSV formatting flows that convert a sequence of String into their CSV representation
+ * in [[pekko.util.ByteString]].
+ */
+private[benchmarks] object CsvFormatting {
+
+  /**
+   * Create a Flow for converting iterables to ByteString.
+   *
+   * @param endOfLine     Line ending (default CR, LF)
+   * @param quotingStyle  Quote all fields, or only fields requiring quotes (default)
+   * @param charset       Character set, defaults to UTF-8
+   * @param byteOrderMark Certain CSV readers (namely Microsoft Excel) require a Byte Order mark, defaults to None
+   */
+  def format[T <: immutable.Iterable[String]](
+      delimiter: Char = ',',
+      quoteChar: Char = '"',
+      escapeChar: Char = '\\',
+      endOfLine: String = "\r\n",
+      quotingStyle: CsvQuotingStyle = CsvQuotingStyle.Required,
+      charset: Charset = StandardCharsets.UTF_8,
+      byteOrderMark: Option[ByteString] = None): Flow[T, ByteString, NotUsed] 
= {
+    val formatter =
+      new CsvFormatter(delimiter, quoteChar, escapeChar, endOfLine, 
quotingStyle, charset)
+    byteOrderMark.fold {
+      Flow[T].map(formatter.toCsv).named("CsvFormatting")
+    } { bom =>
+      
Flow[T].map(formatter.toCsv).named("CsvFormatting").prepend(Source.single(bom))
+    }
+
+  }
+}
diff --git a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/CsvQuotingStyle.scala b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/CsvQuotingStyle.scala
new file mode 100644
index 00000000..52d3b14b
--- /dev/null
+++ b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/CsvQuotingStyle.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2014 - 2016 Softwaremill <https://softwaremill.com>
+ * Copyright (C) 2016 - 2020 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.kafka.benchmarks
+
+private[benchmarks] sealed trait CsvQuotingStyle
+
+object CsvQuotingStyle {
+
+  /** Quote only fields requiring quotes */
+  case object Required extends CsvQuotingStyle
+
+  /** Quote all fields */
+  case object Always extends CsvQuotingStyle
+}
diff --git a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/Timed.scala b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/Timed.scala
index bd470cd2..a5a2b32f 100644
--- a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/Timed.scala
+++ b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/Timed.scala
@@ -17,17 +17,13 @@ package org.apache.pekko.kafka.benchmarks
 import com.codahale.metrics._
 import com.typesafe.scalalogging.LazyLogging
 import org.apache.pekko
-import pekko.NotUsed
 import pekko.kafka.benchmarks.InflightMetrics.{ BrokerMetricRequest, 
ConsumerMetricRequest }
 import pekko.kafka.benchmarks.app.RunTestCommand
 import pekko.stream.Materializer
-import pekko.stream.scaladsl.{ FileIO, Flow, Sink, Source }
-import pekko.util.ByteString
+import pekko.stream.scaladsl.{ FileIO, Sink, Source }
 
-import java.nio.charset.{ Charset, StandardCharsets }
 import java.nio.file.Paths
 import java.util.concurrent.{ ForkJoinPool, TimeUnit }
-import scala.collection.immutable
 import scala.concurrent.duration._
 import scala.concurrent.{ Await, ExecutionContext, Future }
 
@@ -56,32 +52,14 @@ object Timed extends LazyLogging {
     val metricsReportDetailPath = 
benchmarkReportBasePath.resolve(Paths.get(s"$testName-inflight-metrics-details.csv"))
     require(inflight.size > 1, "At least 2 records (a header and a data row) 
are required to make a report.")
     val summary = Source(List(inflight.head, inflight.last))
-      .via(format())
+      .via(CsvFormatting.format())
       .alsoTo(Sink.foreach(bs => logger.info(bs.utf8String)))
       .runWith(FileIO.toPath(metricsReportPath))
-    val details = 
Source(inflight).via(format()).runWith(FileIO.toPath(metricsReportDetailPath))
+    val details = 
Source(inflight).via(CsvFormatting.format()).runWith(FileIO.toPath(metricsReportDetailPath))
     implicit val ec: ExecutionContext = mat.executionContext
     Await.result(Future.sequence(List(summary, details)), 10.seconds)
   }
 
-  private def format[T <: immutable.Iterable[String]](
-      delimiter: Char = ',',
-      quoteChar: Char = '"',
-      escapeChar: Char = '\\',
-      endOfLine: String = "\r\n",
-      quotingStyle: CsvQuotingStyle = CsvQuotingStyle.Required,
-      charset: Charset = StandardCharsets.UTF_8,
-      byteOrderMark: Option[ByteString] = None): Flow[T, ByteString, NotUsed] 
= {
-    val formatter =
-      new CsvFormatter(delimiter, quoteChar, escapeChar, endOfLine, 
quotingStyle, charset)
-    byteOrderMark.fold {
-      Flow[T].map(formatter.toCsv).named("CsvFormatting")
-    } { bom =>
-      
Flow[T].map(formatter.toCsv).named("CsvFormatting").prepend(Source.single(bom))
-    }
-
-  }
-
   def runPerfTest[F](command: RunTestCommand, fixtureGen: FixtureGen[F], 
testBody: (F, Meter) => Unit): Unit = {
     val name = command.testName
     val msgCount = command.msgCount


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to