How to create a node similar to the MongoDB Connector node in KNIME using the Python extension

I want to create my own KNIME node similar to the MongoDB Connector using the Python extension SDK, so that the output of my node would be a database connection object.

The MongoDB Connector node has an output port of type MongoDBConnectionPortObject; I am not sure how to get a similar output port with Python.

When browsing through the KNIME Python repo, the only output port types I could see in the Python API were @kn.output_table, @kn.output_binary, and @kn.output_view. How do I create a new port object? Could someone help with an example of creating an output port similar to MongoDBConnectionPortObject using Python?

Hi @arunjose696,

you bring up a very good question. As of now it isn’t possible to use the same port objects that are used by Java nodes (besides the default table port). However, should you want to define custom ports to use among Python nodes, this documentation may help you further: Pure Python Extensions: Defining custom port objects

Kind regards
Marvin

3 Likes

Thanks Marvin. I was trying to implement my custom ports, but I get the error AttributeError: module ‘knime_extension’ has no attribute ‘PortObject’ when I try to create my port object.

Below is the code I used.


class TestPortObject(knext.PortObject):
    def __init__(self, spec: TestPortObjectSpec, data: str) -> None:
        super().__init__(spec)
        self._data = data

    def serialize(self) -> bytes:
        return self._data.encode()

    @classmethod
    def deserialize(cls, spec: TestPortObjectSpec, storage: bytes) -> "TestPortObject":
        return cls(spec, storage.decode())

    @property
    def data(self) -> str:
        return self._data

I could see that PortObject is not present in the knext library for KNIME conda package version 4.6.4.

Hi arun (what happened to your previous account? :slight_smile: ),

it looks like you are using KNIME Analytics Platform 4.6, is that correct?
We introduced these PortObjects in 4.7.0; you cannot use them in 4.6.
If you use 4.6, refer to the guide of that version (Create a New Python based KNIME Extension). Marvin's link refers to the current (4.7) version.
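
If you are unsure which API your environment offers, a quick check like this (just a sketch) tells you:

import knime_extension as knext  # the module from your error message

# On 4.6.x these print False, which is exactly why you get the AttributeError;
# the PortObject/PortObjectSpec base classes only exist from 4.7.0 on.
print(hasattr(knext, "PortObject"))
print(hasattr(knext, "PortObjectSpec"))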

Best regards
Steffen

Hi Steffen,

I started a new account from a different email :slight_smile:

I decided to go with version 4.7 itself, as custom ports are not covered in the 4.6 documentation. Now I am able to create custom ports, but the output port created by one node cannot be passed as an input port to the next node.

The below is my code.

I defined my custom port in a file called myfile.py:


class MyPortObjectSpec(knext.PortObjectSpec):
    def __init__(self, spec_data: str) -> None:
        self._spec_data = spec_data

    def serialize(self) -> dict:
        return {"spec_data": self._spec_data}

    @classmethod
    def deserialize(cls, data: dict) -> "MyPortObjectSpec":
        return cls(data["spec_data"])

    @property
    def spec_data(self) -> str:
        return self._spec_data


import pickle


class MyPortObject(knext.PortObject):
    def __init__(self, spec: MyPortObjectSpec, model) -> None:
        super().__init__(spec)
        self._model = model

    def serialize(self) -> bytes:
        return pickle.dumps(self._model)

    @classmethod
    def deserialize(cls, spec: MyPortObjectSpec, data: bytes) -> "MyPortObject":
        return cls(spec, pickle.loads(data))

    def predict(self, data):
        return self._model.predict(data)


my_model_port_type = knext.port_type(name="My model port type", object_class=MyPortObject, spec_class=MyPortObjectSpec, id="0001")

In the above code I create a port type with id=0001. I want this port to be the output of the first node and the input of the next node.

Now in my first node I use

@knext.node(name="My Template Node", node_type=knext.NodeType.LEARNER, icon_path="icon.png", category="/")
@knext.output_port("Trained Model", "Trained fancy machine learning model", port_type=myfile.my_model_port_type)
class TemplateNode:
    
    def configure(self, configure_context, input_schema_1):
        return myfile.MyPortObjectSpec("sdada")
        
 
    def execute(self, exec_context, input_1):  
        return myfile.MyPortObject(myfile.MyPortObjectSpec("sdada"),{"a":1})

In the second node, which should get the output of this node as its input, I use:

@knext.node(name="New node 1", node_type=knext.NodeType.LEARNER, icon_path="icon.png", category="/")
@knext.input_port("Trained Model", "Trained fancy machine learning model", port_type=myfile.my_model_port_type)
class TemplateNode:

    def configure(self, configure_context, input_schema_1):
        pass      

    def execute(self, exec_context, input_1):
        pass

But this does not work; I get the error below:

AssertionError: Expected input port ID 0001 but got org.tutorial.first_extension.myfile.MyPortObject

1 Like

Hi arun696,

I tried your code and adjusted some things, but I cannot reproduce your error. Please try the adjusted version and tell me whether you still get the AssertionError and where exactly.

Setup:
|
|- myfile.py
|- my_extension.py

I did not understand whether you also defined your custom node in myfile.py; if so, why would you then reference the port type via myfile.my_model_port_type?

In both files I added import knime.extension as knext.
In my_extension.py I changed the class name so that the two node classes do not share the same name (otherwise the second one overwrites the first; I have no idea how you managed to see both). I also adjusted the configure and execute methods to expect no input (because your learner has no input, only one output):

class TemplateNode2:
    def configure(self, configure_context):
        return myfile.MyPortObjectSpec("sdada")

    def execute(self, exec_context):
        return myfile.MyPortObject(myfile.MyPortObjectSpec("sdada"), {"a": 1})

In a workflow, I connected the two nodes and they run through.
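
For completeness, the consuming node could then look something like this (just a sketch; the class name, the PREDICTOR node type and the bodies of configure/execute are illustrative):

import knime.extension as knext

import myfile


@knext.node(name="New node 1", node_type=knext.NodeType.PREDICTOR, icon_path="icon.png", category="/")
@knext.input_port("Trained Model", "Trained fancy machine learning model", port_type=myfile.my_model_port_type)
class PredictorNode:

    def configure(self, configure_context, input_spec):
        # input_spec is the myfile.MyPortObjectSpec returned by the upstream node's configure
        pass

    def execute(self, exec_context, port_object):
        # port_object is the deserialized myfile.MyPortObject; with a real model
        # you could call port_object.predict(...) here
        pass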

Best regards
Steffen

4 Likes

Hi Steffen,

The support I am getting here is tremendous. I really appreciate it.

Your solution works for me when both nodes are in the same file. In my actual case the nodes are in separate packages, so the id of the output port object from the first package picks up the package name.

Is it possible to connect two separately developed nodes?

My config file would look something like this

org.tutorial.first_extension: # {group_id}.{name} from the knime.yml
  src: C:\Users\**\OneDrive\Documents\courses\proj\new\node1\basic\tutorial_extension # Path to folder containing the extension files
  conda_env_path: C:\Users\**\anaconda3\envs\my_python_env # Path to the Python environment to use
  debug_mode: true # Optional line, if set to true, it will always use the latest changes of execute/configure, when that method is used within the KNIME Analytics Platform

org.tutorial.second_extension: # {group_id}.{name} from the knime.yml
  src: C:\Users\**\OneDrive\Documents\courses\proj\new\node1\basic\tutorial_extension2 # Path to folder containing the extension files
  conda_env_path: C:\Users\**\anaconda3\envs\my_python_env # Path to the Python environment to use
  debug_mode: true # Optional line, if set to true, it will always use the latest changes of execute/configure, when that method is used within the KNIME Analytics Platform

Hi arun696,

yes, it is possible to connect two separately developed nodes. I now tried putting the predictor node into a second extension. Both extensions contain the myfile.py. I could connect the nodes and they run through.
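
The layout looked roughly like this (folder and file names are just illustrative):

|
|- tutorial_extension/        (first extension: learner node with the output port)
|  |- myfile.py
|  |- my_extension.py
|- tutorial_extension2/       (second extension: predictor node with the input port)
|  |- myfile.py               (same port classes, same port type id)
|  |- second_extension.py
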
Did you drag and drop the nodes again from the node repository into your workflow? Maybe there are still old versions of the nodes in the workflow.

Let me know what your further exploration shows.

Best regards
Steffen

PS: Thanks for the compliment, nice to hear!

2 Likes

Yeah, I think the issue was that the nodes in the workflow were still old versions. It works now after dragging and dropping them in again.

I have a different problem now. Since I wanted my node to output a database connection object, I tried making the database connection object part of the data in my port, something like below:

    def execute(self, exec_context):
        client = pymongo.MongoClient("mymongourl")
        db = client["knimeDB"]
        return myfile.MyPortObject(myfile.MyPortObjectSpec("sdada"), {"connection": db})

But this did not seem to work, as the database object couldn't be pickled/serialized by pickle or any other library.
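
The failure is easy to reproduce outside of KNIME as well (a sketch; the exact exception message depends on the pymongo version):

import pickle

import pymongo

client = pymongo.MongoClient("mymongourl")
db = client["knimeDB"]

# Typically raises a TypeError: the client wraps open sockets and locks,
# which cannot be turned into bytes.
pickle.dumps(db)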

Then I took a look at the source code of the Java MongoDB Connector node. In Java, the database object is written to a synchronized hash map by one node and the other node reads it from the same hash map. I think this is only possible because both nodes share the same Java runtime.

But in the case of Python nodes, I think two connected nodes don't share the same runtime, because I tried to write data (the database connection) to a shared singleton variable from one node and read it back in the other node, and this does not work.

As the nodes don't share the same Python runtime, how would it be possible to share my database connection object from one node to the other? (Serializing it to a file and reading it back won't work for a connection object.)

1 Like

Hi @arun696,

Nice to see that you’re trying out the custom port object types for your Python nodes!

As you correctly assessed, Python nodes are not necessarily run in the same Python process, so storing content in global variables to pass it between nodes will not work.

For your mongoDB connection this is a bit tricky. Maybe let's try to think about this in a different way: how would such a connection be shared if you wanted to use Python's multiprocessing? As you say it should not be pickled or written to a file, this sounds to me like each process needs to connect to the server on its own. Then you might want to consider putting the credentials in an encrypted form into the port object. Or can maybe a “session” be shared across multiple connections?
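
To make that concrete, such a port object could carry only the connection details and reconnect lazily in every process, something like this sketch (MongoConnectionSpec/MongoConnectionPortObject are made-up names, and proper credential handling/encryption is left out):

import pymongo

import knime.extension as knext


class MongoConnectionSpec(knext.PortObjectSpec):
    """Carries only what is needed to (re)connect, nothing stateful."""

    def __init__(self, uri: str, database: str) -> None:
        self._uri = uri
        self._database = database

    def serialize(self) -> dict:
        return {"uri": self._uri, "database": self._database}

    @classmethod
    def deserialize(cls, data: dict) -> "MongoConnectionSpec":
        return cls(data["uri"], data["database"])

    @property
    def uri(self) -> str:
        return self._uri

    @property
    def database_name(self) -> str:
        return self._database


class MongoConnectionPortObject(knext.PortObject):
    """Reconnects lazily in each process instead of serializing the client."""

    def __init__(self, spec: MongoConnectionSpec) -> None:
        super().__init__(spec)
        self._spec = spec
        self._client = None

    def serialize(self) -> bytes:
        return b""  # the spec already holds everything needed

    @classmethod
    def deserialize(cls, spec: MongoConnectionSpec, data: bytes) -> "MongoConnectionPortObject":
        return cls(spec)

    def database(self):
        # Every Python process (i.e. every downstream node) opens its own connection on first use.
        if self._client is None:
            self._client = pymongo.MongoClient(self._spec.uri)
        return self._client[self._spec.database_name]


mongo_connection_port_type = knext.port_type(name="Mongo connection (reconnecting)", object_class=MongoConnectionPortObject, spec_class=MongoConnectionSpec, id="mongo_connection_reconnecting")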

Please let us know whether there is a way to share a mongoDB connection via a port object that works for you, and if not, what would be needed, so we can try to make our API more useful.

Thanks,
Carsten

1 Like

Thanks @carstenhaubold for the response. The problem with both of the mentioned approaches is that I would have to connect to the database again in each node of my workflow. I was looking for a way where one node could handle the connection to the db, and the next nodes could just read data.

Would it be possible to somehow make the Python API work similarly to Java, where this is possible out of the box?

We tried using flow variables, as in the document below, to pass the database connection across nodes, but currently flow variables support only a few data types:

https://docs.knime.com/latest/pure_python_node_extensions_guide/index.html#_accessing_flow_variables

A suggestion would be to have flow variables support complex or user-defined data types as well, so that the database connection could be passed across nodes as a flow variable.
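
With the currently supported types, the most we can do is pass a simple value such as the connection URI as a flow variable and reconnect downstream, roughly like this sketch (the flow variable name mongo_uri is made up, and the surrounding node definitions are omitted):

# execute of the node that knows the connection details
    def execute(self, exec_context):
        exec_context.flow_variables["mongo_uri"] = "mymongourl"
        ...

# execute of a downstream node, which reconnects in its own process
    def execute(self, exec_context):
        import pymongo

        client = pymongo.MongoClient(exec_context.flow_variables["mongo_uri"])
        db = client["knimeDB"]
        ...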

Hi @arun696,

Thanks for the details.

I was looking for a way where one node could handle the connection to the db, and the next nodes could just read data. Would it be possible to somehow make the Python API work similarly to Java, where this is possible out of the box?

The custom port objects are exactly how we wanted to enable this for Python nodes. The problem here is that the mongoDB connection prevents serialization. This is reasonable, because the client holds an open socket connection, for which “serialization” does not make sense.

Currently all Python nodes run in different processes. Handing over an open socket connection from one process to the other is exactly what mongoDB forbids by preventing serialization. So the workaround of passing the MongoClient between nodes via FlowVariables still does not help, because it would still have to be passed to a different process.

When searching for MongoClient and Python multiprocessing, it looks as if people are creating one client per process there as well.
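
Outside of KNIME that pattern looks roughly like this (a sketch with made-up URI and database names, assuming the server is reachable):

import multiprocessing

import pymongo


def worker(uri: str) -> None:
    # Recommended pattern: every process creates (and closes) its own client
    # instead of inheriting an open connection from the parent process.
    client = pymongo.MongoClient(uri)
    db = client["knimeDB"]
    print(db.list_collection_names())
    client.close()


if __name__ == "__main__":
    processes = [multiprocessing.Process(target=worker, args=("mymongourl",)) for _ in range(2)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()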

Hypothetical idea: what we could offer is the option to specify that all Python-based nodes of your extension should run within the same process. But that would a) prevent parallel execution of nodes and b) allow nodes to alter the “state” (= global variables) of the process, so the order of execution of the nodes would have an impact on the result, meaning any workflow with branches would be prone to race conditions. This does not sound very convincing to me :slight_smile:

So for the time being, it seems there is no way around reconnecting to the DB in each node.

Hope that helps,
Carsten

4 Likes