My executor runs out of memory (OOM) when I use spark-sql to read data from MySQL.
In sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala,
I see the following lines. I'm wondering why JDBC_BATCH_FETCH_SIZE is required
to be non-negative:
val fetchSize = {
  val size = parameters.getOrElse(JDBC_BATCH_FETCH_SIZE, "0").toInt
  require(size >= 0,
    s"Invalid value `${size.toString}` for parameter " +
      s"`$JDBC_BATCH_FETCH_SIZE`. The minimum value is 0. When the value is 0, " +
      "the JDBC driver ignores the value and does the estimates.")
  size
}
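
For reference, this option is what gets passed as "fetchsize" when reading
over JDBC. A minimal sketch of how I set it (the connection URL, table, and
credentials below are placeholders):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("jdbc-fetch-example").getOrCreate()

val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://db-host:3306/mydb")  // placeholder URL
  .option("dbtable", "big_table")                   // placeholder table
  .option("user", "user")                           // placeholder credentials
  .option("password", "password")
  // this string is what JDBCOptions parses as JDBC_BATCH_FETCH_SIZE;
  // the require(size >= 0) above means Integer.MIN_VALUE is rejected here
  .option("fetchsize", "10000")
  .load()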
As I understand it, the MySQL driver requires fetchSize to be
Integer.MIN_VALUE to stream result sets row by row. And in
core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala (an older-style API,
it seems), fetchSize is indeed set to Integer.MIN_VALUE:
override def compute(thePart: Partition, context: TaskContext): Iterator[T] =
  new NextIterator[T] {
    context.addTaskCompletionListener { context => closeIfNeeded() }
    val part = thePart.asInstanceOf[JdbcPartition]
    val conn = getConnection()
    val stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY,
      ResultSet.CONCUR_READ_ONLY)
    val url = conn.getMetaData.getURL
    if (url.startsWith("jdbc:mysql:")) {
      // setFetchSize(Integer.MIN_VALUE) is a mysql driver specific way to force
      // streaming results, rather than pulling entire resultset into memory.
      // See the below URL
      // dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-implementation-notes.html
      stmt.setFetchSize(Integer.MIN_VALUE)
    } else {
      stmt.setFetchSize(100)
    }
    // ... (rest of compute omitted)
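
Outside Spark, the streaming behavior that comment describes looks like this
with plain JDBC. A minimal standalone sketch, assuming MySQL Connector/J is on
the classpath (the URL, credentials, and query are placeholders):

import java.sql.{DriverManager, ResultSet}

val conn = DriverManager.getConnection(
  "jdbc:mysql://db-host:3306/mydb", "user", "password")  // placeholders
try {
  val stmt = conn.prepareStatement(
    "SELECT id, payload FROM big_table",  // placeholder query
    ResultSet.TYPE_FORWARD_ONLY,
    ResultSet.CONCUR_READ_ONLY)
  // MySQL-specific: Integer.MIN_VALUE tells the driver to stream rows
  // one at a time instead of buffering the whole result set in memory
  stmt.setFetchSize(Integer.MIN_VALUE)
  val rs = stmt.executeQuery()
  while (rs.next()) {
    println(rs.getLong(1))  // process one row at a time
  }
} finally {
  conn.close()
}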
Thanks