Streaming with a database input

Good Afternoon

I've recently been implementing some nodes to interact with an internal database. These nodes generally have two input ports: one database connection and one table. 

These nodes have implemented the streaming API in the following way:

<NodeName>NodeModel implements StreamableFunctionProducer

Override some methods:

    /** {@inheritDoc} */
    @Override
    public InputPortRole[] getInputPortRoles()
    {
        return new InputPortRole[] { InputPortRole.NONDISTRIBUTED_NONSTREAMABLE,
                InputPortRole.NONDISTRIBUTED_NONSTREAMABLE };
    }

    /** {@inheritDoc} */
    @Override
    public OutputPortRole[] getOutputPortRoles()
    {
        return new OutputPortRole[] {OutputPortRole.NONDISTRIBUTED };
    }

    @Override
    public StreamableFunction createStreamableOperator(final PartitionInfo partitionInfo,
            final PortObjectSpec[] inSpecs) throws InvalidSettingsException
    {
        return new StreamableFunction()
        {
            /** {@inheritDoc} */
            @Override
            public void runFinal(final PortInput[] inputs, final PortOutput[] outputs,
                    final ExecutionContext ctx) throws Exception
            {
                // ... my code
            }
        };
    }

 

The nodes need the full input but provide streamed output. I'm now trying to add an additional output that is simply the input database connection, which the node should pass through unchanged. If I do nothing with this output, I get the following error:

ERROR Search USPTO Database 3:1:47:0:45 (IllegalStateException): PortObject expected to be set at this point

I'm not sure what I am meant to do with my new output:

org.knime.core.streaming.inoutput.NonTableOutputCache$SyncedPortObjectOutput

How do I go about passing the Database connection port through the streaming execution?

Cheers

Sam

 

 

 

Hi Sam,

I hope I understand your problem correctly. To pass the database connection port object through, you should be able to cast the respective PortOutput (at the right index) to a PortObjectOutput within the runFinal method. The PortObjectOutput then allows you to set the port object itself.
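A minimal sketch of that cast follows. So that it compiles without KNIME on the classpath, PortOutput, PortObjectOutput and RowOutput below are simplified stand-ins that mimic only the methods used here (setPortObject, push, close); they are not the real KNIME classes, and the ExecutionContext parameter is omitted. The port layout (result table at output 0, database connection at output 1) is a hypothetical assumption.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Minimal sketch of passing a port object through while streaming rows.
 * The nested types are simplified stand-ins for KNIME's PortOutput,
 * PortObjectOutput and RowOutput; they are NOT the real KNIME classes.
 */
public class PassThroughSketch {

    /** Stand-in for KNIME's PortOutput marker type. */
    public interface PortOutput { }

    /** Stand-in for PortObjectOutput: holds one non-streamed port object. */
    public static class PortObjectOutput implements PortOutput {
        private Object portObject;

        public void setPortObject(final Object po) { portObject = po; }

        public Object getPortObject() { return portObject; }
    }

    /** Stand-in for RowOutput: collects rows as they are pushed. */
    public static class RowOutput implements PortOutput {
        public final List<String> pushed = new ArrayList<>();

        public void push(final String row) { pushed.add(row); }

        public void close() { }
    }

    // Hypothetical port layout: output 0 = result table, output 1 = DB connection.
    public static final int TABLE_OUT = 0;
    public static final int DB_OUT = 1;

    /** Same shape as the body of runFinal (ExecutionContext omitted). */
    public static void runFinal(final Object dbConnection, final List<String> rows,
            final PortOutput[] outputs) {
        // Non-table output: cast the PortOutput at the right index and set it once.
        ((PortObjectOutput) outputs[DB_OUT]).setPortObject(dbConnection);

        // Table output: push each row as it becomes available, then close.
        RowOutput rowOut = (RowOutput) outputs[TABLE_OUT];
        for (String row : rows) {
            rowOut.push(row);
        }
        rowOut.close();
    }

    public static void main(final String[] args) {
        PortOutput[] outputs = { new RowOutput(), new PortObjectOutput() };
        runFinal("db-connection", List.of("row1", "row2"), outputs);
        System.out.println(((PortObjectOutput) outputs[DB_OUT]).getPortObject()); // db-connection
        System.out.println(((RowOutput) outputs[TABLE_OUT]).pushed);            // [row1, row2]
    }
}
```

Setting the connection before the row loop is one reasonable ordering, not a requirement taken from KNIME's documentation; judging by the "PortObject expected to be set" error in the question, what seems to matter is that the port object has been set by the time runFinal returns.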

Another remark: if you have the KNIME Testing Framework installed, you can also use the Streaming Test Executor for testing (Configure the respective node -> Job Manager Selection -> Test for Streaming ...). In some cases this gives better hints about what is going wrong.

Best,

Martin

One more thing: you should not implement StreamableFunctionProducer, since the StreamableFunction it has to return requires exactly one input port to be streamable, which is not the case in your example. Just override createStreamableOperator directly and return a StreamableOperator instead of a StreamableFunction.
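A rough sketch of that structure: createStreamableOperator returns a StreamableOperator whose runFinal reads both non-streamable inputs in full and streams only the result table. To keep it self-contained, every nested type (PortInput, PortObjectInput, PortOutput, PortObjectOutput, RowOutput, StreamableOperator) is a simplified stand-in for the corresponding KNIME class, not the real API; runFinal omits the ExecutionContext parameter and the PartitionInfo/spec arguments; and the port layout (input 0 = DB connection, input 1 = query table; output 0 = streamed result table, output 1 = connection) is hypothetical. The "result for ..." rows are placeholders for the real database lookup.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Sketch of a node that returns a StreamableOperator directly. All nested types
 * are simplified stand-ins for KNIME's streaming classes, NOT the real API.
 */
public class OperatorSketch {

    public interface PortInput { }

    public interface PortOutput { }

    /** Stand-in: a non-streamable input hands over the complete object at once. */
    public static class PortObjectInput implements PortInput {
        private final Object obj;

        public PortObjectInput(final Object obj) { this.obj = obj; }

        public Object getPortObject() { return obj; }
    }

    /** Stand-in: a non-table output that is set once. */
    public static class PortObjectOutput implements PortOutput {
        private Object obj;

        public void setPortObject(final Object po) { obj = po; }

        public Object getPortObject() { return obj; }
    }

    /** Stand-in: a streamed table output. */
    public static class RowOutput implements PortOutput {
        public final List<String> pushed = new ArrayList<>();

        public void push(final String row) { pushed.add(row); }

        public void close() { }
    }

    /** Stand-in for StreamableOperator: one runFinal call over all ports. */
    public abstract static class StreamableOperator {
        public abstract void runFinal(PortInput[] inputs, PortOutput[] outputs);
    }

    /** Hypothetical node method: returns a StreamableOperator, not a StreamableFunction. */
    public static StreamableOperator createStreamableOperator() {
        return new StreamableOperator() {
            @Override
            public void runFinal(final PortInput[] inputs, final PortOutput[] outputs) {
                // Non-streamable inputs arrive complete, so cast and read them up front.
                Object connection = ((PortObjectInput) inputs[0]).getPortObject();
                @SuppressWarnings("unchecked")
                List<String> queries = (List<String>) ((PortObjectInput) inputs[1]).getPortObject();

                // Pass the connection straight through to the second output.
                ((PortObjectOutput) outputs[1]).setPortObject(connection);

                // Stream one placeholder result row per query, then close the output.
                RowOutput out = (RowOutput) outputs[0];
                for (String q : queries) {
                    out.push("result for " + q);
                }
                out.close();
            }
        };
    }

    public static void main(final String[] args) {
        PortInput[] in = { new PortObjectInput("conn"), new PortObjectInput(List.of("q1", "q2")) };
        PortOutput[] out = { new RowOutput(), new PortObjectOutput() };
        createStreamableOperator().runFinal(in, out);
        System.out.println(((RowOutput) out[0]).pushed); // [result for q1, result for q2]
    }
}
```

In the real node, the two NONDISTRIBUTED_NONSTREAMABLE input roles from the getInputPortRoles override would stay as they are; the sketch does not model port roles at all.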

Thanks, Martin.

I've removed the implements StreamableFunctionProducer clause and now override createStreamableOperator directly. 

I'm not sure whether I've found a bug or my implementation is a bit sketchy. The row-count updates in the UI seem to fail when I connect my database port to the WrappedNodeOutput.

 

If my Search USPTO node sits alone inside a wrapped metanode and is executed with the streaming executor, the count jumps from 0 straight to 13,200 (the total output). If it is connected to the String Manipulation node, the count updates as the output is pushed. 

I cancelled the execution mid-run to show what the UI counts look like. 

When it is connected to another streaming node I wrote, the Search USPTO node's count increments, but the next two nodes don't update until the whole metanode has finished. I put a breakpoint in my Extract From Smiles node, and it pushes its output on each increment after the Search USPTO Database node, which makes me think the UI just isn't being updated (though I should really check that the String Manipulation node is actually being executed).

If I remove the database connection, the streaming outputs update correctly. 

It also works if I just pass through the DB connection

 

This is my code: https://gist.github.com/anonymous/e535dbff3c66ef3899c8be503c664a2b

Cheers

Sam

 

P.S. Does this count represent the number of rows read (pulled?) rather than the number of rows pushed?

 

Are the results at the outports of the corresponding wrapped streamed metanode at least as expected?

Yep, the Table Difference Checker shows the same output for the streamed and non-streamed runs.