Easier Multiprocessing in Python scripts

This is in reference to

To use multiprocessing from a Python Script node, I need to create my own module and import it, and this process isn’t straightforward at all. For a direct import as shown in the above thread, the module must be installed. An alternative is to edit the path at runtime:

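import sys

# Directory that contains my_module.py
module_path = 'c:/xyz/my_module'

# Make the directory importable, but only add it once
if module_path not in sys.path:
    sys.path.append(module_path)

from my_module import my_function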

Still, this requires every function that is called through multiprocessing to be put into a file, even if it’s just a one-off. It also makes the Python script “opaque”, since it’s less clear what actually happens, and you can’t simply share it, for example by putting it in a component, because the external Python file would then be missing.

Ideally, KNIME would take care of this and create and import the module automatically when it detects a function definition inside the script, or find a different way than “exec” to run the script. As-is, multiprocessing is barely usable inside Python Script nodes.

To distribute its task to the newly created processes, multiprocessing uses the pickle module from the Python standard library to serialize everything for transmission to processes created/managed through multiprocessing. This includes the data and the function you wish to call, “my_function” in your example. Unfortunately, the way pickle serializes functions is to specify the importable module where that function is defined.

>>> import pickle
>>> data = {'apple': 'red', 'banana': 'yellow'}
>>> pickle.dumps(data)  # Briefly investigate what a dict looks like when pickled.
>>> pickle.dumps(pickle.dumps)  # Pickling a function contains a reference to the module where it is defined ("pickle").
>>> def say_hello():
...     print('Hola!')
...
>>> pickle.dumps(say_hello)  # This function was defined in the interactive shell, also known as "__main__".

Later, unpickling pickle.dumps will succeed because the pickle module is importable, but unpickling the function I defined in the interactive shell refers to __main__, which is not importable. multiprocessing attempts to catch this problem with unpicklability (an impressive-sounding word when said out loud) before it becomes a big problem.
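The pickle-by-reference behaviour described above can be checked with a small sketch. It uses json.dumps as a stand-in for any function that lives in an importable module, and a lambda as a stand-in for a function that pickle cannot locate by name; the helper can_pickle is a hypothetical name introduced only for this illustration.

```python
import json
import pickle

# A function from an importable module is pickled as a reference:
# the payload holds the module name and the function name, not the code.
payload = pickle.dumps(json.dumps)
references_module = b"json" in payload and b"dumps" in payload

# Unpickling re-imports the module and looks the name up again,
# so we get the very same function object back.
restored = pickle.loads(payload)
same_object = restored is json.dumps


def can_pickle(obj):
    """Hypothetical helper: report whether pickle can serialize obj."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError):
        return False


# A lambda has no importable name that pickle could point to,
# so the standard pickle module refuses to serialize it.
lambda_picklable = can_pickle(lambda x: x * x)

print(references_module, same_object, lambda_picklable)
```

The same lookup-by-name step is what goes wrong when a process started by multiprocessing cannot find the function under the module name recorded in the pickle.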

I can suggest a couple of options for you to be able to define new functions whenever you want and successfully pickle and unpickle them in multiprocessing:

  1. As you already suggest, save your function definitions in a Python module (like “test3.py”). To make it especially easy on yourself when using this with KNIME, place this importable Python module in the directory that contains your KNIME Workflow. In your Python node, “import test3” will find this module file in your Workflow’s directory and you should be able to successfully pickle/unpickle it, as required by multiprocessing.
  2. There are other serialization tools for Python that are not part of the Python standard library that can help, especially “cloudpickle” and “dill”. They will permit you to define functions on-the-fly in the Python interactive shell and serialize them in a way that does not require you to store their definition in an importable module.
  3. You might prefer to make use of my example workflow on the KNIME Hub which showcases how to define a function in one Python node without saving its definition in a module file, then serialize it and pass it to downstream Python nodes: Propagate Python Functions Downstream – KNIME Hub
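Option 1 can be sketched roughly as follows. This is only an illustration under assumptions: a temporary directory stands in for the workflow directory, and the module name square_helpers_demo and the function square are hypothetical. The pickle round trip imitates what multiprocessing does internally when it sends a function to a worker process; no worker processes are started here.

```python
import importlib
import pickle
import sys
import tempfile
from pathlib import Path

# Stand-in for the workflow directory (assumption for this sketch).
module_dir = Path(tempfile.mkdtemp())

# Save the function definition in an importable module file.
(module_dir / "square_helpers_demo.py").write_text(
    "def square(x):\n    return x * x\n"
)

# Make the directory importable, adding it only once.
if str(module_dir) not in sys.path:
    sys.path.append(str(module_dir))

importlib.invalidate_caches()
helpers = importlib.import_module("square_helpers_demo")

# The pickle now refers to "square_helpers_demo.square", which any
# process that can import the module is able to resolve again.
payload = pickle.dumps(helpers.square)
restored = pickle.loads(payload)
result = restored(7)

print(result)
```

Because the payload only names the module and the function, the module file has to travel with the workflow, which is the sharing drawback raised earlier in the thread.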

Given that Python already provides these options (a module file, “cloudpickle”, and so on), we can quickly create new Python functions on the fly and use them with pickle and multiprocessing inside KNIME nodes without much effort.

Hope this helps,



Thanks for your extensive reply.

I did some more “research”, and it seems joblib can deal just fine with functions defined inside the Python Script node. It uses loky as its backend, which I think (but am not entirely sure) uses cloudpickle.

Therefore, the solution for me is simply to use joblib. To nitpick, an example of this somewhere in a blog post would be nice.

Basic example (generalized form, untested here):

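import knime_io as knio
from joblib import Parallel, delayed

# Defined directly in the script, no separate module file needed
def my_func(x):
    return x*x

df = knio.input_tables[0].to_pandas()
# Run my_func in 6 parallel jobs, once per row position (0, 1, 2, ...)
df['Square'] = Parallel(n_jobs=6)(delayed(my_func)(i) for i in range(len(df.index)))
knio.output_tables[0] = knio.write_table(df)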
