I have a node that does database lookups. Currently it uses ColumnRearranger with a CellFactory. The issue here is, that this leads to a database call for each row which is slow and I could group calls into fewer ones using WHERE xyz IN…instead of WHERE xyz =…
I would need some hint on how to achieve that as ColumnRearranger with a CellFactory seems to work strictly row by row and expects an immediate answer.
Is there a way to achieve this with existing “components” or do I need to code it from scratch?
Hi @beginner,
there is no BatchedColumnRearanger you can just re-use. So you will need to fall back to just looping over the input data yourself.
If you want it to be simple just always process the whole input table in one request your node, then users need use the ChunkLoopStart to specify the chunk size. However, if you want to be fancy, you could also implement the createStreamableOperator and getInputPortRoles methods to make your node streamable and offload the chunking to the streaming executor.
Now, you can do the work of your output generation in the ColumnRearranger, using the resultCache, e.g.
private ColumnRearranger createColumnRearranger(DataTableSpec spec){
ColumnRearranger rearranger = new ColumnRearranger(spec);
//Do stuff here to create your added columns etc, into a DataColumnSpec[] newColSpecs...
rearranger.append(new AbstractCellFactory(newColSpecs) {
@Override
public DataCell[] getCells (DataRow row) {
//Do whatever here to get your result 'key' as it is in the resultCache from the incoming row
String key = ...;
if(!resultCache.containsKey(key)){
getNextResultBlock();
}
Object res = resultCache.get(key);
//Do whatever with the result object to convert it into new cells...
return ....;
}
}
}
Implement the #getNextResultBlock() method to query your database, using the iterator field:
public void getNextResultBlock() {
//Clear the existing results
resultCache.clear();
Set<String> keys = new HashSet<>(); //For the keys of your 'in' clause
int rowCount = 0;
while(rowIter.hasNext() && rowCount < chunkSize) {
DataRow r = rowIter.next();
// get your key from the row and add it to the keys set...
keys.add(...);
}
//Run the query, you could use keys.stream().collect(Collectors.joining("','") to build the IN clause
//Add the result to the resultCache...
}
The main thing is that as it stands there is no synchronisation, so make sure the CellFactory isn’t configure to perform parallel execution. Otherwise, it works (we use it to roll calls to RESTful webservices).