AWS EMR - Spark Random Forest - java.lang.OutOfMemoryError

Hello,

I’m struggling with an out-of-memory problem. I have a Table Reader with 614,000 rows and 3,200 columns; all the data is passed through Table to Spark and Spark Partitioning, and then arrives at the Spark Random Forest Learner. The context is created with the Livy node on EMR, and spark.dynamicAllocation.enabled is set to false. I’m using KNIME 4.0.2 and EMR 5.23.0.

Table Reader -> Table to Spark -> Spark Partitioning -> Spark Random Forest Learner

ERROR : KNIME-Worker-5 : : Node : Spark Random Forest Learner : 0:2656 : Execute failed: An error occured. For details see View > Open KNIME log.
java.lang.OutOfMemoryError
at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:798)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:797)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:797)
at org.apache.spark.ml.tree.impl.RandomForest$.findBestSplits(RandomForest.scala:538)
at org.apache.spark.ml.tree.impl.RandomForest$.run(RandomForest.scala:201)
at org.apache.spark.ml.classification.RandomForestClassifier$$anonfun$train$1.apply(RandomForestClassifier.scala:142)
at org.apache.spark.ml.classification.RandomForestClassifier$$anonfun$train$1.apply(RandomForestClassifier.scala:120)
at org.apache.spark.ml.util.Instrumentation$$anonfun$11.apply(Instrumentation.scala:183)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:183)
at org.apache.spark.ml.classification.RandomForestClassifier.train(RandomForestClassifier.scala:120)
at org.apache.spark.ml.classification.RandomForestClassifier.train(RandomForestClassifier.scala:46)
at org.apache.spark.ml.Predictor.fit(Predictor.scala:118)
at org.apache.spark.ml.Predictor.fit(Predictor.scala:82)
at org.apache.spark.ml.Pipeline$$anonfun$fit$2.apply(Pipeline.scala:153)
at org.apache.spark.ml.Pipeline$$anonfun$fit$2.apply(Pipeline.scala:149)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableViewLike$Transformed$class.foreach(IterableViewLike.scala:44)
at scala.collection.SeqViewLike$AbstractTransformed.foreach(SeqViewLike.scala:37)
at org.apache.spark.ml.Pipeline.fit(Pipeline.scala:149)
at org.knime.bigdata.spark2_4.jobs.ml.prediction.MLClassificationLearnerJob.runJob(MLClassificationLearnerJob.java:136)
at org.knime.bigdata.spark2_4.jobs.ml.prediction.MLClassificationLearnerJob.runJob(MLClassificationLearnerJob.java:1)
at org.knime.bigdata.spark2_4.base.LivySparkJob.call(LivySparkJob.java:90)
at org.knime.bigdata.spark2_4.base.LivySparkJob.call(LivySparkJob.java:1)
at org.apache.livy.rsc.driver.BypassJob.call(BypassJob.java:40)
at org.apache.livy.rsc.driver.BypassJob.call(BypassJob.java:27)
at org.apache.livy.rsc.driver.JobWrapper.call(JobWrapper.java:57)
at org.apache.livy.rsc.driver.BypassJobWrapper.call(BypassJobWrapper.java:42)
at org.apache.livy.rsc.driver.BypassJobWrapper.call(BypassJobWrapper.java:27)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

It’s not a heap-space problem on the KNIME side, because -Xmx already uses all the memory available. It’s also not a shortage of cluster memory: the Ganglia graphs never crossed the red-line limit; the cluster has 1.6 TB of RAM in total and usage never went above 1.4 TB. I read about heap-size errors, but this doesn’t look like a “Java heap space” error, since the message doesn’t say so, and it’s not a “GC overhead limit exceeded” error either. I’ve searched a bunch of mailing-list archives and other cloud forums for solutions.

Tried:

  • enabled maximizeResourceAllocation on AWS so EMR manages the resources itself; it didn’t work
  • set spark.storage.level to DISK_ONLY and to MEMORY_AND_DISK_SER; neither helped
  • used fewer workers with more memory: 10 cores, 10 partitions, 300 GB
  • increased parallelism: 192 workers with 1 core and 4 GB each
  • used YARN mode
  • applied the GC collector configurations from the AWS docs
  • raised the -Xmx parameter on the machine running KNIME to the highest memory available

EMR hardware:
master
1 x c5.18xlarge (72 vCore, 144 GiB memory)
core
2 x r5.24xlarge (96 vCore, 768 GiB memory)

How do I get the total size of the data in the Table Reader node?
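As a rough back-of-envelope check (assuming every one of the 3,200 columns is an 8-byte numeric double; KNIME’s actual storage format will differ), the raw data volume can be estimated like this:

```python
# Rough size estimate for the Table Reader data
# (assumes every cell is an 8-byte double; real KNIME storage differs)
rows = 614_000
cols = 3_200
bytes_per_cell = 8  # size of a double

size_bytes = rows * cols * bytes_per_cell
size_gib = size_bytes / 1024**3
print(f"~{size_gib:.1f} GiB of raw numeric data")  # ~14.6 GiB
```

That is only the raw numeric payload; serialization and Spark overhead come on top of it.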

How can I solve the OutOfMemoryError?

Thanks!
Any suggestions beyond the ones I’ve tried are welcome.

Hi @sirev,
Before we go into the OutOfMemory issue, one thing:
The “Table to Spark” node is not the right node to use in this case; it is only meant for testing on small samples. In general we recommend this pattern: a “Parquet Writer” node to write the data to S3, followed by a “Parquet to Spark” node to get the data into your cluster.

That said, the OutOfMemory error looks to me like the settings for the driver memory are not tuned for your setup.
Did you adjust them? You will find the driver resource settings in the Advanced tab of the Livy node: enable “Override default Spark driver resources” there and try some higher values.
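In spark-defaults terms, “Override default Spark driver resources” corresponds roughly to settings like the following (the values below are only illustrative examples, not recommendations; pick them to fit your master node):

```properties
# Illustrative driver-side settings (example values, not recommendations)
spark.driver.memory          64g
spark.driver.cores           8
# Limit on serialized results pulled back to the driver (0 = unlimited)
spark.driver.maxResultSize   16g
```

Note that your stack trace fails while the driver serializes the task closure (ClosureCleaner.ensureSerializable), which is why driver-side settings matter here even though the executors have plenty of memory.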

best regards Mareike


Hi @mareike.hoeger,
Increasing the memory on the driver node helped prevent the OutOfMemoryError.

Regarding Table to Spark: I use it for testing purposes only, but going forward I’m thinking of switching to the Parquet pattern if that node’s limits can’t cover what I need, which is 600k rows and 3,240 columns.

Thank you, appreciate it!


This topic was automatically closed 7 days after the last reply. New replies are no longer allowed.