Good question, Sam (sorry, I don't know the answer!). I think the streaming API raises a lot of new questions. Maybe there is a need for a detailed developer resource on it?
Isn't "Binner (Dictionary)" exactly the same kind of challenge? One port is streamed (and possibly distributed), while the other port, the dictionary, is static and needs to be processed beforehand.
Have a look at org.knime.base.node.preproc.binnerdictionary.BinByDictionaryNodeModel
The two important methods are these:
/**
 * {@inheritDoc}
 */
@Override
public StreamableOperator createStreamableOperator(final PartitionInfo partitionInfo, final PortObjectSpec[] inSpecs)
    throws InvalidSettingsException {
    return new StreamableOperator() {
        @Override
        public void runFinal(final PortInput[] inputs, final PortOutput[] outputs, final ExecutionContext exec)
            throws Exception {
            DataTableSpec[] specs = new DataTableSpec[2];
            specs[0] = (DataTableSpec) inSpecs[0];
            specs[1] = (DataTableSpec) inSpecs[1];
            // Port 1 is declared non-streamable below, so the dictionary arrives as a complete table.
            BufferedDataTable dict = (BufferedDataTable) ((PortObjectInput) inputs[1]).getPortObject();
            // Set up the rearranger from the dictionary, then stream port 0 through it.
            ColumnRearranger core = createColumnRearranger(specs, dict, exec);
            core.createStreamableFunction(0, 0).runFinal(inputs, outputs, exec);
        }
    };
}

/**
 * {@inheritDoc}
 */
@Override
public InputPortRole[] getInputPortRoles() {
    // Port 0: main data, streamed and distributable. Port 1: dictionary, read as a whole.
    return new InputPortRole[]{InputPortRole.DISTRIBUTED_STREAMABLE, InputPortRole.NONDISTRIBUTED_NONSTREAMABLE};
}
That certainly is useful. A follow-up question for reference: if the node is not using a ColumnRearranger, where should the 'set up' step for one of the ports go? (Is there a nice KNIME node example?)
That's the same method: 'runFinal' (if you have one scan over the data...). It's then your choice which of the two (or more) ports you consume first. Obviously, you first want to read the dictionary and then apply it to the main data stream.
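
To make the ordering concrete, here is a rough sketch in plain Java of the "read the dictionary first, then scan the stream once" pattern. It deliberately does not use the KNIME API, so it runs on its own. The class DictionaryThenStream, its methods, and the "?" label for unmatched values are all made up for illustration. In a real node, I would expect the loop to poll rows from the RowInput at inputs[0] and push results to the RowOutput at outputs[0], but treat that mapping as an assumption and check it against the Javadoc.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;

// Hypothetical stand-in for the body of runFinal; not part of the KNIME API.
final class DictionaryThenStream {

    // Step 1 ("set up"): consume the static port completely and build a lookup
    // structure. Keys are lower bin bounds, values are bin labels.
    static TreeMap<Double, String> readDictionary(final Map<Double, String> dictionaryRows) {
        return new TreeMap<>(dictionaryRows);
    }

    // Step 2: one scan over the streamed port. Each value is handled as it
    // arrives and the result is handed on immediately; no input is buffered.
    static void streamRows(final Iterator<Double> input, final TreeMap<Double, String> dictionary,
        final Consumer<String> output) {
        while (input.hasNext()) {
            final double value = input.next();
            // Largest lower bound that is <= value, or null if there is none.
            final Map.Entry<Double, String> bin = dictionary.floorEntry(value);
            output.accept(bin == null ? "?" : bin.getValue());
        }
    }

    // Plays the role of runFinal: dictionary first, then the main data stream.
    static List<String> runFinal(final Map<Double, String> dictionaryRows, final Iterator<Double> input) {
        final TreeMap<Double, String> dictionary = readDictionary(dictionaryRows);
        final List<String> collected = new ArrayList<>();
        streamRows(input, dictionary, collected::add);
        return collected;
    }

    public static void main(final String[] args) {
        final Map<Double, String> dictionaryRows = new HashMap<>();
        dictionaryRows.put(0.0, "low");
        dictionaryRows.put(10.0, "high");
        final Iterator<Double> input = Arrays.asList(3.0, 12.0, -1.0).iterator();
        System.out.println(runFinal(dictionaryRows, input)); // prints [low, high, ?]
    }
}
```

The point is only the order inside the one method: everything that depends on the static port happens before the first streamed row is touched, and the streamed port is read exactly once.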