CSV Writer reports finished but is still writing to the file. Parallel chunks corrupting data

Hi,

I am raising this as a bug, as I’ve gone to extreme lengths bullet-proofing the approach. Analyzing the results, I see strong indications that the CSV Writer is still writing data to the file while workflow execution already continues.

Let me explain the workflow:

Goal: Process data in parallel but write into the same file, to prevent individual chunks from processing the same data (e.g. newly identified URLs during a scrape).

Approach (a minimal Python sketch of this locking scheme follows the list):

  1. Create chunks and process the data
  2. Wait until all chunks have placed their lock files
  3. Determine the write order based on the lock files’ creation date & time stamps
  4. Loop until all lock files created before that of the current chunk have been deleted
  5. Write the results to the CSV
  6. Delete the lock file of the current chunk
  7. Reiterate
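For reference, here is a minimal sketch of this locking scheme in plain Python (the directory name, lock-file naming and polling interval are assumptions for illustration; the actual workflow implements the same steps with KNIME nodes):

```python
import os
import time
from pathlib import Path

LOCK_DIR = Path("locks")      # shared lock directory (name assumed for illustration)
POLL_SECONDS = 0.5            # polling interval (assumed)
LOCK_DIR.mkdir(exist_ok=True)

def write_chunk(chunk_index, rows, csv_path):
    """Steps 2-6: place a lock, wait for all earlier locks to vanish, append, unlock."""
    my_lock = LOCK_DIR / f"chunk_{chunk_index}.lock"
    my_lock.touch()                           # step 2: announce this chunk
    my_time = my_lock.stat().st_mtime         # step 3: write order = lock creation time

    def earlier_locks_exist() -> bool:
        for p in LOCK_DIR.glob("chunk_*.lock"):
            if p == my_lock:
                continue
            try:
                if p.stat().st_mtime < my_time:
                    return True
            except FileNotFoundError:
                pass                          # lock was deleted while we were looking
        return False

    while earlier_locks_exist():              # step 4: wait for all earlier chunks
        time.sleep(POLL_SECONDS)

    # step 5: append this chunk's results to the shared CSV
    with open(csv_path, "a", encoding="utf-8") as fh:
        fh.write("\n".join(rows) + "\n")
        fh.flush()
        os.fsync(fh.fileno())                 # make sure the bytes actually reach the disk

    my_lock.unlink()                          # step 6: let the next chunk in line write
```

The explicit flush/fsync in the sketch is only there to illustrate that “the node finished” and “the data is on disk” are two different events.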

Both tables (the one at the Loop End and the one read back by the CSV Reader) should match at the end. However, the CSV Reader clearly shows a different structure.

RowID Row Index chunk_index Chunk Count Column3
Row146825 Row Index 9335 chunk_index 14 Chunk Count 15 ?
Row146824 Row Index 8825 chunk_indeRow Index 9334 chunk_index 14 Chunk Count 15

Even after introducing a delay of two seconds to let the CSV Writer finish, the issue still occurs.

Assessing the data, it looks like the corruption appears where writes from different chunks overlap.


Here is the workflow I am working on:

Best
Mike

Digging further into the data, by introducing a conditional breakpoint that triggers when the schema changes, it seems the data is written in blocks of about 115 rows (rows 30464 to 30579).

RowID Row Index chunk_index Chunk Count Row Count Column4 Column5 Column6
Row30462 Row Index 462 chunk_index 0 Chunk Count 4 Row Count: 463/2500 ? ? ?
Row30463 Row Index 463 chunk_index 0 Chunk Count 4 Row Count: 464/2500 ? ? ?
Row30464 Row Index 464 chunk_index 0 “Chunk Cou"Row Index 2500” chunk_index 1 Chunk Count 4 Row Count: 1/2500 ?
Row30465 Row Index 2501 chunk_index 1 Chunk Count 4 Row Count: 2/2500 ? ? ?
Row30466 Row Index 2502 chunk_index 1 Chunk Count 4 Row Count: 3/2500 ? ? ?

RowID Row Index chunk_index Chunk Count Row Count Column4 Column5 Column6
Row30578 Row Index 2614 chunk_index 1 Chunk Count 4 Row Count: 115/2500 ? ? ?
Row30579 Row Index 2615 chnt 4 Row Count: 465/2500 ? ? ? ?
Row30580 Row Index 465 chunk_index 0 Chunk Count 4 Row Count: 466/2500 ? ? ?

And another occurrence with nearly the same row distance, 30694 to 30808, this time spanning 114 rows.

RowID Row Index chunk_index Chunk Count Row Count Column4 Column5 Column6
Row30692 Row Index 577 chunk_index 0 Chunk Count 4 Row Count: 578/2500 ? ? ?
Row30693 Row Index 578 chunk_index 0 Chunk Count 4 Row Count: 579/2500 ? ? ?
Row30694 Row Index 579 chunk_index 0 Chunk Count 4 “Row Count: 580/2500"unk_index 1” Chunk Count 4 Row Count: 116/2500 ?
Row30695 Row Index 2616 chunk_index 1 Chunk Count 4 Row Count: 117/2500 ? ? ?
RowID Row Index chunk_index Chunk Count Row Count Column4 Column5 Column6
Row30807 Row Index 2728 chunk_index 1 Chunk Count 4 Row Count: 229/2500 ? ? ?
Row30808 "Row “Row Index 580” chunk_index 0 Chunk Count 4 Row Count: 581/2500 ? ? ?
Row30809 Row Index 581 chunk_index 0 Chunk Count 4 Row Count: 582/2500 ? ? ?

@mwiegand to be honest, writing from parallel processes into the same local file is always a risk and will most likely result in failures (and this is most likely not a KNIME-specific ‘bug’ but simply the way it is). From my perspective the best way to deal with this is to create unique file names per loop iteration and chunk, write to individual Parquet files (could also be CSV or Excel), and then import them back into a single file.

Parquet, as a big data format, is perfectly fine with dealing with several individual files and treating them as one:

I think the system with loops etc. will have no problem handling the files as individual files.
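To illustrate, a minimal sketch of reading such a folder of per-chunk files back as one table, using pandas/pyarrow in Python as a stand-in (folder and file names are assumptions; in KNIME the Parquet Reader can point at the folder directly):

```python
import pandas as pd
import pyarrow.dataset as ds

# each chunk writes its own uniquely named file, e.g. results/result_chunk_3.parquet
dataset = ds.dataset("results/", format="parquet")  # all files in the folder act as one table
df = dataset.to_table().to_pandas()
print(len(df), "rows combined")

# purely with pandas it would be:
# from pathlib import Path
# df = pd.concat(pd.read_parquet(p) for p in Path("results").glob("*.parquet"))
```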

Maybe give it a try. I will expand the workflow over time. The other thing that could work would be to collect the results in a database like H2 or SQLite, which might be better able to deal with parallel access.

1 Like

Hi @mlauber71,

yes, I have a similar approach in place, but for the given scenario it is necessary that all chunks share some sort of memory. The real-life scenario is scraping a website in parallel chunks.

Imagine you feed a list of 10k URLs to a scraping workflow. Newly discovered URLs are added to each batch. But since the URLs were divided up in the first place, you end up scraping everything several times, up to as many times as there are chunks.

This might not even be a problem of parallel writing. If you write a large file, the writer node reports that it has finished, and the next iteration starts while the data is still being flushed, this might very well result in the same situation … a corrupt file.

Best
Mike

@mwiegand what you could try is to use an H2 in-memory database as intermediate storage and query it to check whether a newly found URL has already been processed. This might not be 100% reliable, but it will reduce the number of duplicates.

2 Likes

@mwiegand I inserted a test case in the workflow where for each chunk I deliberately pull in some random additional rows (some of which might already have been processed). Before storing the data in the Parquet file, I pull the list of IDs (the rank, as a long) from the H2 database and exclude the ones that have already been done.

This is not 100% perfect, because a row might already be in the works within one chunk while it is also being processed in a new one. So out of 100 rows I will have 104 in the end, producing some duplicates, but most duplicates will have been avoided.

Currently I store everything in H2 and pull out only the rank. To speed things up, you could work with an H2 in-memory database that just handles the IDs.

This should also work if you produce additional URLs (IDs) in the course of a chunk and then store them in the H2 database. If you choose shorter intervals, this might result in fewer duplicates but cost more processing power.
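A minimal sketch of that ID-filtering step, using SQLite in Python as a stand-in for H2 (table and column names are hypothetical; in the workflow this is done with DB nodes):

```python
import sqlite3

con = sqlite3.connect("processed.db")   # or ":memory:" for the in-memory variant
con.execute("CREATE TABLE IF NOT EXISTS processed (id TEXT PRIMARY KEY)")

def keep_only_new(ids):
    """Drop IDs that are already registered and register the remaining ones."""
    done = {row[0] for row in con.execute("SELECT id FROM processed")}
    new_ids = [i for i in ids if i not in done]
    con.executemany("INSERT OR IGNORE INTO processed (id) VALUES (?)",
                    [(i,) for i in new_ids])
    con.commit()
    return new_ids
```

As noted above, this is not fully race-free: two chunks can pass the check at the same moment, so a few duplicates remain possible.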

1 Like

I got another idea, based on the principle of preventing conflicts when the same file is used: I encode the data as a concatenated string, hash it, and write the hash to a directory. If another parallel chunk finds the same hash, it discards the record.
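Roughly, that could look like the following (a sketch; the directory name and hash function are assumptions):

```python
import hashlib
from pathlib import Path

HASH_DIR = Path("seen_hashes")        # shared marker directory (assumed name)
HASH_DIR.mkdir(exist_ok=True)

def claim(record_fields) -> bool:
    """Hash the concatenated record and try to claim it via an empty marker file."""
    digest = hashlib.sha256("|".join(map(str, record_fields)).encode()).hexdigest()
    try:
        (HASH_DIR / digest).touch(exist_ok=False)   # atomic create-if-absent
        return True                                 # first chunk to see this record
    except FileExistsError:
        return False                                # another chunk already claimed it
```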

Nevertheless, I’d like to better understand the described issue. @thor, apologies for dragging you in here, but the superb technical feedback you once gave stuck with me. Any idea why the CSV Writer reports that it has finished writing while other chunks still run into conflicts? Even with a two-second artificial delay and all the lock-file logic I established …

Update: I did a few more tests, as I had a strong suspicion that the cause is rooted in the CSV Writer node.

1st approach: based on the assumption that the CSV Writer writes in batches of roughly 115 rows, I converted each row’s columns into a collection in a single cell and converted that to a string. Result: still failing.

2nd approach: to eliminate the CSV Writer as the cause, I used Python to write the data into the CSV. Result: SUCCESS! :tada: :partying_face: :champagne: :clinking_glasses:
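For context, a simplified sketch of what such a Python-side write might look like (not the exact script used in the workflow; in a KNIME Python node the table would arrive as a pandas DataFrame and the path would come from the configuration):

```python
import csv
import os

def append_rows(csv_path, rows):
    """Append rows to the shared CSV and only return once the data is flushed to disk."""
    with open(csv_path, "a", newline="", encoding="utf-8") as fh:
        csv.writer(fh).writerows(rows)
        fh.flush()
        os.fsync(fh.fileno())   # block until the OS confirms the write
```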

I will complete the example workflow later on and provide it for everyone.

Update:
Aaand finished … here is the final workflow. I will also write an article about it later on and publish it in the knowledge sharing section as well.

I’d appreciate your feedback in case you want to have a look, @mlauber71.

Cheers
Mike

1 Like

@mwiegand using some sort of trigger or blocking file to steer executions (or prevent them) is one way to overcome limitations in some extreme cases.

In general I would strongly advise against writing to the same file in parallel, regardless of whether you manage to pull it off and initially get the results. I would only use dedicated systems like databases that are built to receive write operations in parallel. Depending on the operating system, the surroundings, an aggressive virus scanner or some micro-delays, you will most likely run into trouble in a real-life scenario.

Deliberately trying to break a system can be interesting for testing some fringe cases. In most production environments, though, there is a good chance you are setting yourself up for problems that are complicated first to detect and then to fix. Unless it is super important to squeeze the last microseconds out of a process (in which case you might need advanced in-memory techniques), it is best to keep things easy to track: give files unique names and rely on a loop or similar to collect them safely. Built-in pauses and caches can also go a long way towards improving stability, as can retry and try/catch constructs (like you have done).

The question when it comes to robust processes is not if you are paranoid - the question is: are you paranoid enough …


On another note: I currently get an error message when trying to install the NodePit Power Nodes. I do not have the energy to investigate; I would just use the Cache node instead.

Hey @mlauber71,

I hope you are getting better soon.

Usually systems have or offer safety measures to prevent writing to the same file simultaneously, e.g. opening the same Excel file twice and writing to it usually throws an error. Making use of these options, though, seems to be up to the developer.

What bugs me is that the safety measures I took, combined with a forced delay of two seconds, are still insufficient. Even without parallel processing, it could mean that data might get corrupted through pure iteration.

Or curious :grimacing:

I wonder what the take of the KNIME devs is on that …

My point is simply that, following the “it just works” expectation KNIME users might have because KNIME is so accessible, this ease results in a false perception of safety. Though I believe that, with the ambition to ease access, an “it just works” experience is exactly what the KNIME devs might aim for.

Best
Mike

@mlauber71 D’oh! Could you shed some light on this? What error message do you get?

FYI: @qqilihq

2 Likes

@danielesser

image

knime_log.log (1.6 KB)

@mlauber71 the Noop nodes are part of the Power Nodes:

Can you find them?

@mwiegand yes I can find them but the error message stays the same.

1 Like

Thanks a lot for this, @mlauber71. This looks vaguely familiar.

Could you please do the following and check if that solves your issue:

  1. Remove the update site for NodePit 5.3 from your “Available Software Sites”.

  2. Restart your KNIME Analytics Platform.

  3. Add the update site for NodePit 5.3 again to your “Available Software Sites”:

    https://download.nodepit.com/5.3
    
  4. Install or update the NodePit Power Nodes (or any other extension).

My assumption is the configuration of our update site in KNIME is screwed up, so let’s verify. Does this solve your issue?

Best regards,
Daniel

@danielesser this seems to have solved the problem. Very strange but thanks.

2 Likes