I've recently been implementing some nodes to interact with an internal database. These nodes generally have two input ports: one database connection and one table.
The nodes implement the streaming API in the following way:
    /** {@inheritDoc} */
    @Override
    public InputPortRole[] getInputPortRoles()
    {
        return new InputPortRole[] { InputPortRole.NONDISTRIBUTED_NONSTREAMABLE,
            InputPortRole.NONDISTRIBUTED_NONSTREAMABLE };
    }

    /** {@inheritDoc} */
    @Override
    public OutputPortRole[] getOutputPortRoles()
    {
        return new OutputPortRole[] { OutputPortRole.NONDISTRIBUTED };
    }

    @Override
    public StreamableFunction createStreamableOperator(final PartitionInfo partitionInfo,
        final PortObjectSpec[] inSpecs) throws InvalidSettingsException
    {
        return new StreamableFunction()
        {
            /** {@inheritDoc} */
            @Override
            public void runFinal(final PortInput[] inputs, final PortOutput[] outputs, final ExecutionContext ctx)
                throws Exception
            {
                // ... my code
The nodes need the full input but provide streamed output. I'm now trying to add an additional output, which is simply the input database connection. The node should pass this through to the output. If I do nothing, I get the following error:
ERROR Search USPTO Database 3:1:47:0:45 (IllegalStateException): PortObject expected to be set at this point
I'm not sure what I'm meant to do with my new output.
I hope I understand your problem correctly. To pass the database connection port object through, you should be able to cast the respective PortOutput (at the right index) to a PortObjectOutput within the runFinal method. The PortObjectOutput then allows you to set the port object itself.
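A minimal sketch of that cast follows. It uses simplified stand-in classes rather than the real KNIME types so that it compiles on its own; the stand-ins model only the members used here and drop the ExecutionContext argument. The port order (connection at input 0 and output 1, rows at output 0) and the row values are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-ins for the KNIME streaming types (assumption: simplified for illustration,
// not the real classes from org.knime.core.node.streamable).
interface PortObject {}
abstract class PortInput {}
abstract class PortOutput {}

class PortObjectInput extends PortInput {
    private final PortObject portObject;
    PortObjectInput(PortObject portObject) { this.portObject = portObject; }
    PortObject getPortObject() { return portObject; }
}

class PortObjectOutput extends PortOutput {
    private PortObject portObject;
    void setPortObject(PortObject portObject) { this.portObject = portObject; }
    PortObject getPortObject() { return portObject; }
}

class RowOutput extends PortOutput {
    final List<String> rows = new ArrayList<>();  // rows are plain strings in this sketch
    void push(String row) { rows.add(row); }
    void close() {}
}

class PassThroughSketch {
    static void runFinal(PortInput[] inputs, PortOutput[] outputs) {
        // Non-streamable input: the whole port object is available up front.
        PortObject connection = ((PortObjectInput) inputs[0]).getPortObject();

        // Streamed output: push rows as they are produced, then close.
        RowOutput rowOut = (RowOutput) outputs[0];
        rowOut.push("row-1");
        rowOut.push("row-2");
        rowOut.close();

        // Pass-through output: cast to PortObjectOutput and set the object,
        // otherwise the port is still empty when execution finishes.
        ((PortObjectOutput) outputs[1]).setPortObject(connection);
    }
}
```

In the real node the same cast and setPortObject call would sit inside your runFinal, at whatever index the new connection output has.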
Another remark: if you have the KNIME Testing Framework installed, you can also use the Streaming Test Executor for testing (Configure the respective node -> Job Manager Selection -> Test for Streaming ...). In some cases this gives better hints about what's going wrong.
One more thing: you should not inherit from the StreamableFunctionProducer, since the StreamableFunction it returns requires exactly one input port to be streamable (which is not the case in your example). Just override createStreamableOperator directly and return an instance of a StreamableOperator (instead of a StreamableFunction).
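Roughly, the shape would be as sketched below. Again these are simplified stand-ins so the snippet compiles by itself: the real createStreamableOperator also takes a PartitionInfo and the input specs, and the real runFinal takes an ExecutionContext and may throw. SearchNodeModelSketch is a hypothetical name, and the length check is there only for illustration. The sketch also assumes that getOutputPortRoles should list one role per output port, so it has two entries once the connection output is added.

```java
// Stand-ins for the KNIME types (assumption: simplified, not the real API).
enum OutputPortRole { DISTRIBUTED, NONDISTRIBUTED }
abstract class PortInput {}
abstract class PortOutput {}

abstract class StreamableOperator {
    abstract void runFinal(PortInput[] inputs, PortOutput[] outputs);
}

// Hypothetical node model with two outputs: streamed rows and the passed-through connection.
class SearchNodeModelSketch {

    OutputPortRole[] getOutputPortRoles() {
        // One entry per output port (assumed requirement).
        return new OutputPortRole[] { OutputPortRole.NONDISTRIBUTED, OutputPortRole.NONDISTRIBUTED };
    }

    StreamableOperator createStreamableOperator() {
        // A plain StreamableOperator, not a StreamableFunction: all port handling
        // happens in runFinal, so no single streamable input is required.
        return new StreamableOperator() {
            @Override
            void runFinal(PortInput[] inputs, PortOutput[] outputs) {
                if (outputs.length != getOutputPortRoles().length) {
                    throw new IllegalStateException("Expected one output per declared role");
                }
                // ... read the inputs, push rows, set the connection port object
            }
        };
    }
}
```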
I've removed the "implements StreamableFunctionProducer" and now override createStreamableOperator directly.
I'm not sure whether I've found a bug or my implementation is a bit sketchy. The UI updates of the row counts seem to fail when I connect my database port to the WrappedNodeOutput.
If my Search USPTO node is by itself within a wrapped meta node and executed under the streaming executor, the count jumps from 0 to 13,200 (the total output). If it is connected to the String manipulation node, I get updates as the output is pushed.
I cancelled the execution mid run to show what the UI counts look like.
When connected to another streaming node I wrote, the Search USPTO node gives an incrementing count, but the next two nodes don't update until the whole meta node has finished. I put a breakpoint in my Extract From Smiles node, and it is pushing its output on each increment after the Search USPTO Database node. This makes me think the UI just isn't being updated (but I should really check that the String manipulation node is getting executed).
If I remove the database connection, the streaming outputs update correctly.
It also works if I just pass through the DB connection.
This is my code: https://gist.github.com/anonymous/e535dbff3c66ef3899c8be503c664a2b
Cheers
Sam
P.S. Does this count represent the number of rows read (pulled?) rather than the number of rows pushed?