This is an automated email from the ASF dual-hosted git repository.
philippus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-connectors-kafka.git
The following commit(s) were added to refs/heads/main by this push:
new 0f9c11b7 Separate csv formatting files and components (#508)
0f9c11b7 is described below
commit 0f9c11b7c0bbfb8b58e5c7f5dc3989b3e2ae6cd3
Author: Philippus Baalman <[email protected]>
AuthorDate: Thu Apr 16 11:45:12 2026 +0200
Separate csv formatting files and components (#508)
and remove todo
---
.../pekko/kafka/benchmarks/CsvFormatter.scala | 13 -----
.../pekko/kafka/benchmarks/CsvFormatting.scala | 56 ++++++++++++++++++++++
.../pekko/kafka/benchmarks/CsvQuotingStyle.scala | 26 ++++++++++
.../org/apache/pekko/kafka/benchmarks/Timed.scala | 28 ++---------
4 files changed, 85 insertions(+), 38 deletions(-)
diff --git a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/CsvFormatter.scala b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/CsvFormatter.scala
index cd959083..1b2fbc4c 100644
--- a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/CsvFormatter.scala
+++ b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/CsvFormatter.scala
@@ -19,19 +19,6 @@ import org.apache.pekko.util.ByteString
import java.nio.charset.{ Charset, StandardCharsets }
import scala.collection.immutable
-private[benchmarks] sealed trait CsvQuotingStyle
-
-object CsvQuotingStyle {
-
- /** Quote only fields requiring quotes */
- case object Required extends CsvQuotingStyle
-
- /** Quote all fields */
- case object Always extends CsvQuotingStyle
-}
-
-// TODO: This needs to be deleted after migrating alpakka to pekko.
-// This is just temporary base to see everything compiles and tests will pass
without any issue
private[benchmarks] class CsvFormatter(delimiter: Char,
quoteChar: Char,
escapeChar: Char,
diff --git a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/CsvFormatting.scala b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/CsvFormatting.scala
new file mode 100644
index 00000000..fac3208e
--- /dev/null
+++ b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/CsvFormatting.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2014 - 2016 Softwaremill <https://softwaremill.com>
+ * Copyright (C) 2016 - 2020 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.kafka.benchmarks
+
+import java.nio.charset.{ Charset, StandardCharsets }
+import org.apache.pekko
+import pekko.NotUsed
+import pekko.stream.scaladsl.{ Flow, Source }
+import pekko.util.ByteString
+
+import scala.collection.immutable
+
+/**
+ * CSV formatting flows that convert each sequence of Strings into its CSV
+ * representation as a [[pekko.util.ByteString]].
+ */
+private[benchmarks] object CsvFormatting {
+  /**
+   * Builds a Flow that renders every incoming row of fields as one CSV-encoded ByteString.
+   *
+   * @param delimiter Field delimiter, defaults to ','
+   * @param quoteChar Quote character, defaults to '"'
+   * @param escapeChar Escape character, defaults to '\\'
+   * @param endOfLine Line ending, defaults to CR, LF
+   * @param quotingStyle Quote all fields, or only fields requiring quotes (the default)
+   * @param charset Character set used for the output bytes, defaults to UTF-8
+   * @param byteOrderMark Emitted once before the first row (Microsoft Excel requires one), defaults to None
+   */
+  def format[T <: immutable.Iterable[String]](
+      delimiter: Char = ',',
+      quoteChar: Char = '"',
+      escapeChar: Char = '\\',
+      endOfLine: String = "\r\n",
+      quotingStyle: CsvQuotingStyle = CsvQuotingStyle.Required,
+      charset: Charset = StandardCharsets.UTF_8,
+      byteOrderMark: Option[ByteString] = None): Flow[T, ByteString, NotUsed] = {
+    val csvFormatter = new CsvFormatter(delimiter, quoteChar, escapeChar, endOfLine, quotingStyle, charset)
+    val rows = Flow[T].map(csvFormatter.toCsv).named("CsvFormatting")
+    byteOrderMark match {
+      case Some(bom) => rows.prepend(Source.single(bom))
+      case None      => rows
+    }
+  }
+}
diff --git a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/CsvQuotingStyle.scala b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/CsvQuotingStyle.scala
new file mode 100644
index 00000000..52d3b14b
--- /dev/null
+++ b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/CsvQuotingStyle.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2014 - 2016 Softwaremill <https://softwaremill.com>
+ * Copyright (C) 2016 - 2020 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.kafka.benchmarks
+
+/** Controls which fields the CSV formatter wraps in the quote character. */
+private[benchmarks] sealed trait CsvQuotingStyle
+
+object CsvQuotingStyle {
+  /** Quote a field only when it requires quotes. */
+  case object Required extends CsvQuotingStyle
+
+  /** Quote every field, whether or not it requires quotes. */
+  case object Always extends CsvQuotingStyle
+}
diff --git a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/Timed.scala b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/Timed.scala
index bd470cd2..a5a2b32f 100644
--- a/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/Timed.scala
+++ b/benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/Timed.scala
@@ -17,17 +17,13 @@ package org.apache.pekko.kafka.benchmarks
import com.codahale.metrics._
import com.typesafe.scalalogging.LazyLogging
import org.apache.pekko
-import pekko.NotUsed
import pekko.kafka.benchmarks.InflightMetrics.{ BrokerMetricRequest,
ConsumerMetricRequest }
import pekko.kafka.benchmarks.app.RunTestCommand
import pekko.stream.Materializer
-import pekko.stream.scaladsl.{ FileIO, Flow, Sink, Source }
-import pekko.util.ByteString
+import pekko.stream.scaladsl.{ FileIO, Sink, Source }
-import java.nio.charset.{ Charset, StandardCharsets }
import java.nio.file.Paths
import java.util.concurrent.{ ForkJoinPool, TimeUnit }
-import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.{ Await, ExecutionContext, Future }
@@ -56,32 +52,14 @@ object Timed extends LazyLogging {
val metricsReportDetailPath =
benchmarkReportBasePath.resolve(Paths.get(s"$testName-inflight-metrics-details.csv"))
require(inflight.size > 1, "At least 2 records (a header and a data row)
are required to make a report.")
val summary = Source(List(inflight.head, inflight.last))
- .via(format())
+ .via(CsvFormatting.format())
.alsoTo(Sink.foreach(bs => logger.info(bs.utf8String)))
.runWith(FileIO.toPath(metricsReportPath))
- val details =
Source(inflight).via(format()).runWith(FileIO.toPath(metricsReportDetailPath))
+ val details =
Source(inflight).via(CsvFormatting.format()).runWith(FileIO.toPath(metricsReportDetailPath))
implicit val ec: ExecutionContext = mat.executionContext
Await.result(Future.sequence(List(summary, details)), 10.seconds)
}
- private def format[T <: immutable.Iterable[String]](
- delimiter: Char = ',',
- quoteChar: Char = '"',
- escapeChar: Char = '\\',
- endOfLine: String = "\r\n",
- quotingStyle: CsvQuotingStyle = CsvQuotingStyle.Required,
- charset: Charset = StandardCharsets.UTF_8,
- byteOrderMark: Option[ByteString] = None): Flow[T, ByteString, NotUsed]
= {
- val formatter =
- new CsvFormatter(delimiter, quoteChar, escapeChar, endOfLine,
quotingStyle, charset)
- byteOrderMark.fold {
- Flow[T].map(formatter.toCsv).named("CsvFormatting")
- } { bom =>
-
Flow[T].map(formatter.toCsv).named("CsvFormatting").prepend(Source.single(bom))
- }
-
- }
-
def runPerfTest[F](command: RunTestCommand, fixtureGen: FixtureGen[F],
testBody: (F, Meter) => Unit): Unit = {
val name = command.testName
val msgCount = command.msgCount
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]