Streaming API query

Hi

I have a node that has two inputs (data tables). One table needs to be processed completely before the second table can be processed. 

I use a column rearranger to generate my output table. Is there a better way than my implementation:

 

	/** {@inheritDoc} */
	@Override
	public InputPortRole[] getInputPortRoles()
	{
		return new InputPortRole[] { InputPortRole.NONDISTRIBUTED_STREAMABLE,
				InputPortRole.NONDISTRIBUTED_NONSTREAMABLE };
	}

	/** {@inheritDoc} */
	@Override
	public OutputPortRole[] getOutputPortRoles()
	{
		return new OutputPortRole[] { OutputPortRole.NONDISTRIBUTED };
	}

	/** {@inheritDoc} */
	@Override
	public StreamableOperator createStreamableOperator(final PartitionInfo partitionInfo,
			final PortObjectSpec[] inSpecs) throws InvalidSettingsException
	{
		return new StreamableOperator()
		{

			@Override
			public void runFinal(PortInput[] inputs, PortOutput[] outputs, ExecutionContext exec) throws Exception
			{
				// ... actions on first data table
				
				// Classify
				RowInput reactions = (RowInput) inputs[0];
				RowOutput out = (RowOutput) outputs[0];

				StreamableFunction function = createColumnRearranger(reactions.getDataTableSpec(), spc)
						.createStreamableFunction();
				
				DataRow row;
				while ((row = reactions.poll()) != null)
				{
					DataRow computed = function.compute(row);
					out.push(computed);
				}
				reactions.close();
				out.close();
			}
		};
	}

 

Cheers

Sam

Good question Sam (sorry, I don't know the answer!). The streaming API raises a lot of new questions, I think. Maybe there is a need for a detailed developer resource on the streaming API?

Steve

 

Hi Sam:

Isn't the "Binner (Dictionary)" the exact same type of 'challenge'? One port is streamed (and possibly distributed), the other dictionary port is static and needs to be processed beforehand?

Have a look at org.knime.base.node.preproc.binnerdictionary.BinByDictionaryNodeModel

The two important methods are those:

    /**
     * {@inheritDoc}
     */
    @Override
    public StreamableOperator createStreamableOperator(final PartitionInfo partitionInfo, final PortObjectSpec[] inSpecs)
        throws InvalidSettingsException {
        return new StreamableOperator() {

            @Override
            public void runFinal(final PortInput[] inputs, final PortOutput[] outputs, final ExecutionContext exec) throws Exception {
                DataTableSpec[] specs = new DataTableSpec[2];
                specs[0] = (DataTableSpec) inSpecs[0];
                specs[1] = (DataTableSpec) inSpecs[1];
                BufferedDataTable dict = (BufferedDataTable) ((PortObjectInput) inputs[1]).getPortObject();
                ColumnRearranger core = createColumnRearranger(specs, dict, exec);
                core.createStreamableFunction(0, 0).runFinal(inputs, outputs, exec);
            }
        };
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public InputPortRole[] getInputPortRoles() {
        return new InputPortRole[]{InputPortRole.DISTRIBUTED_STREAMABLE, InputPortRole.NONDISTRIBUTED_NONSTREAMABLE};
    }

- Bernd

Yes indeed, thanks Bernd :)

That certainly is useful.  Follow-up question for reference: if the node is not using a ColumnRearranger, where should the 'set up' work for one of the ports go? (Is there a nice KNIME node example?)

Steve

Hi Steve,

That's the same method: 'runFinal' (if you have one scan over the data...). It's then your choice which of the two (or more) ports you consume ... obviously you first want to read the dictionary and then apply it to the main data stream.
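
For reference, that pattern can be sketched roughly like this. Note this is only an illustration of the ordering Bernd describes, not code from an actual KNIME node; `buildDictionary` and `processRow` are hypothetical helpers standing in for whatever node-specific logic applies:

```java
    /** Hedged sketch: consume the static (non-streamable) port fully first,
     *  then stream the other port row by row. buildDictionary and processRow
     *  are placeholder helpers, not part of the KNIME API. */
    @Override
    public StreamableOperator createStreamableOperator(final PartitionInfo partitionInfo,
            final PortObjectSpec[] inSpecs) throws InvalidSettingsException {
        return new StreamableOperator() {

            @Override
            public void runFinal(final PortInput[] inputs, final PortOutput[] outputs,
                    final ExecutionContext exec) throws Exception {
                // 1) "Set up": read the whole non-streamable port (here: port 1).
                BufferedDataTable dictTable =
                    (BufferedDataTable) ((PortObjectInput) inputs[1]).getPortObject();
                Map<String, DataCell> dict = buildDictionary(dictTable); // placeholder

                // 2) Stream the main port row by row against that dictionary.
                RowInput in = (RowInput) inputs[0];
                RowOutput out = (RowOutput) outputs[0];
                DataRow row;
                while ((row = in.poll()) != null) {
                    out.push(processRow(row, dict)); // placeholder
                }
                in.close();
                out.close();
            }
        };
    }
```

Declaring port 1 as `NONDISTRIBUTED_NONSTREAMABLE` in `getInputPortRoles()` is what guarantees it arrives as a `PortObjectInput` (a fully materialized table) rather than a row stream.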

- Bernd