Convert Spark Submit Job to KNIME Equivalent

#1

Hi @Iris @tobias.koetter

Presently I’m running a spark-submit job from the command line on a Cloudera cluster, which looks something like this:

# Spark/YARN configuration, then the class and custom JAR to run, followed by the JAR's
# arguments: output file location, sample index file, input file, and custom parameters 1-3;
# stdout/stderr are redirected to a log file.
nohup /opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/bin/spark-submit \
  --class "com.nfpa.demo.testClass" \
  --master yarn \
  --deploy-mode cluster \
  --executor-memory 2408M \
  --num-executors 12 \
  --executor-cores 4 \
  --driver-memory 2G \
  --conf "spark.storage.memoryFraction=1" \
  --conf "spark.akka.frameSize=200" \
  --conf "spark.default.parallelism=100" \
  --conf "spark.core.connection.ack.wait.timeout=600" \
  --conf "spark.yarn.executor.memoryOverhead=2048" \
  --conf "spark.yarn.driver.memoryOverhead=400" \
  /home/mayub/test-1.0-SNAPSHOT.jar \
  hdfs://10.10.10.80:8020/user/mayub/nfpa_output/run_20190210/ \
  /home/mayub/tmp/sample_index/ \
  /user/mayub/nfpa_input/sample.csv \
  10 \
  240 \
  ALL_STATES \
  > /home/mayub/NFPAGeocoder/run_geocode_data_20190210.log 2>&1 &

Since we are using Cloudera, I have added the JAR file to the classpath through the UI and through the Spark configurations when creating the Spark Context inside KNIME.

However, I’m not able to figure out how the rest of the command can be moved over to KNIME. I’m guessing it should be simple. Any help is appreciated.

Thanks!

Mohammed Ayub


#2

Hi @mohammedayub

You can use the “Spark RDD Java Snippet (Source)” node.

If you open the configuration dialog, you can add the test-1.0-SNAPSHOT.jar to the compiler classpath. Then you need to invoke the relevant method of your com.nfpa.demo.testClass.

You can pass the string parameters (output file location, sample index file, input file, and custom parameters 1–3) into the snippet as flow variables, and from there hand them over to your com.nfpa.demo.testClass.
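
To illustrate, the snippet body could look roughly like the sketch below. Please treat it only as a sketch: the surrounding apply(...) method and the Spark context variable (assumed to be sc here) are generated by the node, the v_... fields stand for the flow variables you add in the dialog (the actual field names are generated), and run(...) is a placeholder for whatever entry point your com.nfpa.demo.testClass really exposes:

// Inside the method body generated by the Spark RDD Java Snippet (Source) node.
// v_outputPath, v_indexPath, v_inputPath, v_param1, v_param2, v_param3 stand for the
// fields generated for your flow variables -- your names will differ.
com.nfpa.demo.testClass geocoder = new com.nfpa.demo.testClass();

// Hypothetical entry point; call whatever method your class exposes instead of the
// main() method that spark-submit used.
JavaRDD<Row> result = geocoder.run(sc,
        v_outputPath,                // e.g. hdfs://10.10.10.80:8020/user/mayub/nfpa_output/run_20190210/
        v_indexPath,                 // e.g. /home/mayub/tmp/sample_index/
        v_inputPath,                 // e.g. /user/mayub/nfpa_input/sample.csv
        Integer.parseInt(v_param1),  // custom parameter 1, e.g. 10
        Integer.parseInt(v_param2),  // custom parameter 2, e.g. 240
        v_param3);                   // custom parameter 3, e.g. ALL_STATES

return result;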

One complication:
The string params that start with /home/mayub/ seem to reference local files?

If your class is reading them in the Spark driver process, they need to be accessible locally by the spark-job-server Linux user on the machine where jobserver runs, because that is where jobserver executes the Spark driver process.

However, if your class is reading them on the Spark executors (e.g. inside a mapPartitions() closure), then they need to be mirrored to every machine in your cluster.
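
To illustrate the difference, here is a generic Spark sketch (assuming Spark 2.x; the file name index.txt, the inputRdd variable, and the filtering logic are made up and not taken from your job):

// Assumes: import java.nio.file.*; import java.util.*; and an existing JavaRDD<String> inputRdd.

// Driver-side read: this runs inside the Spark driver, i.e. inside the jobserver process,
// so the local path only needs to exist on the machine where Spark Jobserver runs.
List<String> index = Files.readAllLines(Paths.get("/home/mayub/tmp/sample_index/index.txt"));

// Executor-side read: the closure below runs on the executors, so the same local path
// must exist on every worker node of the cluster.
JavaRDD<String> filtered = inputRdd.mapPartitions(lines -> {
    List<String> idx = Files.readAllLines(Paths.get("/home/mayub/tmp/sample_index/index.txt"));
    List<String> kept = new ArrayList<>();
    while (lines.hasNext()) {
        String line = lines.next();
        if (idx.stream().anyMatch(line::contains)) {
            kept.add(line);
        }
    }
    return kept.iterator();
});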

Björn


#3

Hi @bjoern.lohrmann

Thanks for your reply to this issue.
Could you create/share a dummy workflow with the configuration you mentioned? I’m finding it a little hard to see how to instantiate the class with the parameters in this particular node - sorry, I’m not a pro coder :slight_smile:

Yes, the files under /home/mayub/ are present locally on all the nodes of the cluster, at the same location.

Thanks!

Mohammed Ayub


#4

Hi @bjoern.lohrmann

Any update on this?

Thanks!

Mohammed Ayub


#5

Hi @bjoern.lohrmann and @Iris ,

As you suggested, I am trying to replicate the same thing in the “Spark RDD Java Snippet (Source)” node. My node configuration looks like this:

After running this, I get the following error:
“ERROR Spark RDD Java Snippet (Source) 3:221:170:182 Execute failed: com.nfpa.demo.SparkAddressGeocoderOutputFormatter (ClassNotFoundException)”

Initially, it was giving an error on the class name, so I added the JAR file to the CDH Spark Jobserver location, which resolved that issue. The JAR file is also listed on the “Additional Libraries” page of the node.

Could you please help me in resolving this?

Thanks,
Radhika Maheshwari


#6

Hi @bjoern.lohrmann and @Iris,

Is there any update on this? Looking forward to your reply.

Thanks,
Radhika
