Python script node error when executing in the workflow only with 38M rows, but runs fine at a few million.

Environment:

  • Virtual machine with 880 GB RAM, Windows 11
  • KNIME 5.2.3, Columnar Backend
  • Anaconda installation

I have a python script that works in configuration mode with the full 37.7M samples, and in the workflow if I row-sample my data at 5%. But when I apply the full sample (about 37.7M rows (and up to 50M coming in a later version of the project) the python script gets to 70% and then fails with this error in the KNIME Console:

ERROR Python Script 3:393 An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):

  • File “C:\ProgramData\anaconda3\envs\py3_knime_3\lib\site-packages\py4j\clientserver.py”, line 617, in _call_proxy*
  • return_value = getattr(self.pool[obj_id], method)(params)
  • File “C:\Program Files\KNIME\plugins\org.knime.python3.scripting.nodes_5.2.3.v202403221416\src\main\python_knime_scripting_launcher.py”, line 237, in closeOutputs*
  • raise e*
  • File “C:\Program Files\KNIME\plugins\org.knime.python3.scripting.nodes_5.2.3.v202403221416\src\main\python_knime_scripting_launcher.py”, line 231, in closeOutputs*
  • self._backends.tear_down_arrow(flush=check_outputs)*
  • File “C:\Program Files\KNIME\plugins\org.knime.python3.scripting.nodes_5.2.3.v202403221416\src\main\python\knime\scripting_backend.py”, line 345, in tear_down_arrow*
  • b.tear_down_arrow(flush and is_active_backend)*
  • File “C:\Program Files\KNIME\plugins\org.knime.python3.scripting.nodes_5.2.3.v202403221416\src\main\python\knime\scripting_backend.py”, line 186, in tear_down_arrow*
  • self._write_all_tables()*
  • File “C:\Program Files\KNIME\plugins\org.knime.python3.scripting.nodes_5.2.3.v202403221416\src\main\python\knime\scripting_backend.py”, line 173, in _write_all_tables*
  • table._write_to_sink(sink)*
  • File “C:\Program Files\KNIME\plugins\org.knime.python3.arrow_5.2.0.v202311290857\src\main\python\knime_arrow_table.py”, line 321, in _write_to_sink*
  • sink.write(batch)*
  • File “C:\Program Files\KNIME\plugins\org.knime.python3.arrow_5.2.0.v202311290857\src\main\python\knime_arrow_backend.py”, line 392, in write*
  • self._write_batch(data)*
  • File “C:\Program Files\KNIME\plugins\org.knime.python3.arrow_5.2.0.v202311290857\src\main\python\knime_arrow_backend.py”, line 397, in _write_batch*
  • raise ValueError(*
    ValueError: Tried writing a batch after a batch with a different size than the first batch. Only the last batch of a table can have a different size than the first batch.

Because the python script is successful in execution at 5% of sample, I am focused on issues of how KNIME and Python integration scale (which may or may not be the right lens).

I noticed by watching system resources it starts to write to disc prior to the error.I tried rebalancing KNIME.ini memory and Columnar off-Heap memory in a few ratios, but the error returns:
100Gb KNIME / 700 GB HEAP
200Gb KNIME / 600 GB HEAP
300Gb KNIME / 500 GB HEAP

Any other trouble shooting tips or clues?
Are there limits to the amount of data Python and KNIME can transfer between each other?

1 Like

You can try! 5.2.1 reinstalling in another machines! Because knime update after 5.2.1 has got many problems, in the forum, so I haven’t updated after 5.2.1. Also, the python engine I use is their bundled packages, I also frequently work with millions of rows!

@RVC2023 you can check general performance settings especially your RAM (memory) so that KNIME does have enough.

If you must you can split data you export (or import) from KNIME to Python into several Parquet files and (later) handle them as one big file:

Mor on the processing of Big Files

1 Like

Hi @RVC2023,

Sorry to hear you’re running into trouble with large data and the Python Script node. No, the amount of data should not be a problem. And – unfortunately – the knime.ini configuration Xmx and the columnar off-heap do not have an impact on the amount of memory Python uses (Python runs as a separate process and Python doesn’t have a means to limit the memory usage directly).

The error complains about varying batch sizes. (This is btw. something we’re working on to allow for the 5.3 release). Could you share your Python script, or at least an anonymized version? Are you using pyarrow to create your tables and the BatchOutputTable, or does that happen when you use pandas?

Side note @mlauber71: splitting the data into Apache Parquet files should not have any benefits over our Apache Arrow storage that we use when getting data from KNIME to Python (and in the Columnar Backend). Unless – for some reason – you really need to split the data into separate files. But the 4GB filesize limitations should be long gone I hope :wink:

Best,
Carsten

2 Likes

Can I ask:

  • How many millions of rows are you processing?

  • How much memory does your machine have?
    are you allocating to knime.ini?
    are you allocating to the columnar heap?
    What other memory allocations are you making?

Carsten,

Thank you for reaching out!

Thrilled to hear you are working on this for 5.3 When is that due to be out?

Also…splitting the file does not feel like a great option anyway.

I am using pandas.

…here is the script:

“”"
This script will merge the results of a random list of series with the client transaction file.
Provide an input of the transaction file from the client, the list of series you would like to use
The script will access FRED, calculate change metrics, then merge the results into the transaction file, and then return the merged results back into the workflow.
After this merge, you will have to re-map these results to the output mask.

INSTRUCTIONS

  1. Supply the FRED Series names and plain-english names in the dict_series_list using a dictionary format
  2. Run the script
  3. Remember to assign the newly merged series to the output mask.

“”"

#Clear memory for script run
globals().clear()

Import requisite libraries

import pandas as pd
import numpy as np
import pyfredapi as pf
import os
import datetime
import time

Set runmode (1-Development in Spyder , 0 - Production KNIME)

runmode = 1

EXECUTION PARAMETERS

dict_series_list = {‘CPIRECSL’:‘CPI - Recreation’,
‘PCUATRNWRATRNWR’:‘PPI - Transp. & Warehousing’,
‘FEDFUNDS’:‘Federal Funds Effective Rate’,
‘USACPALTT01CTGYM’:‘CPI-ALL’,
‘wpu80’:‘Construction (Partial)’,
‘HOUST1F’:‘New Private Housing’,
‘RHEACBW027SBOG’:‘Real Estate Loans: Residential Real Estate Loans: Revolving Home Equity Loans, All Commercial Banks’
}
#input_path = “x”
input_path = “x”
#output_path = x"
output_path = “x”

Pull in transaction data file, import matplotlib if running in DEVELOPMENT mode

if runmode==1:
pd.set_option(‘display.max_columns’, None) # See all columns in pd dataFrame
pd.set_option(‘display.max_colwidth’, 20) # Limit column width in pd dataFrame
pd.set_option(‘display.expand_frame_repr’, False) # Remove truncated outputs
st_df = pd.read_csv(input_path+“InitialAssembly100_PCT.csv”, nrows=10000)
#st_df = pd.read_csv(input_path+“InitialAssembly100_PCT.csv”, nrows=10000)
import matplotlib.pyplot as plt
else:
import knime.scripting.io as knio # Imports KNIME libraries when in the KNIME environment
st_df = knio.input_tables[0].to_pandas()

Set FRED_API_KEY as an environmental variable

os.environ[‘FRED_API_KEY’] = ‘1c6a58b07dfeb3b40c85c4de15d4a84d’

Create a dictionary to store dataframes for each item

item_series_dfs = {}

def get_series(item):
#Pull series information and store it as a pandas dataframe
Series_df = pd.DataFrame(pf.get_series(series_id=item))

# Keep the date and value fields
Series_df = Series_df.iloc[:,-2:]

Series_df[item+'Series'] = item
Series_df[item+'SeriesName'] =  dict_series_list[item]

# Format the date and extract Year/Month fields for merge with transaction data
Series_df['date'] = pd.to_datetime(Series_df['date'])
Series_df['Month'] = Series_df['date'].dt.month
Series_df['Year'] = Series_df['date'].dt.year
Series_df[item+'value'] = Series_df['value']

# Calculate 3-month and 12-month rolling averages
Series_df[item+'3MRoll'] = Series_df[item+'value'].rolling(window=3).mean()
Series_df[item+'12MRoll'] = Series_df[item+'value'].rolling(window=12).mean()

# Shift data by 1 unit of previous period
df_prev1 = Series_df.shift(12)                   # Look back 12 month
df_prev3 = Series_df.shift(12)                   # Look back 12 months
df_prev12 = Series_df.shift(12)                 # Look back 12 months

# Calculate rolling averages for previous year
Series_df[item+'1M_prev'] = df_prev1[item+'value']                                       # Take the value from 1 month ago          
Series_df[item+'3MRoll_prev'] = df_prev3[item+'value'].rolling(window=3).mean()          # Take the mean value of the last 3 months from 3 months ago
Series_df[item+'12MRoll_prev'] = df_prev12[item+'value'].rolling(window=12).mean()       # Take the mean value of the last 12 months from 12 months ago

# Calculate percent change in rolling averages
Series_df[item+'1M_CHG_PERC'] = (Series_df[item+'value'] - Series_df[item+'1M_prev'] ) / Series_df[item+'1M_prev']                       # 1 month % change relative to past 1 month
Series_df[item+'3MRoll_CHG_PERC'] = (Series_df[item+'3MRoll'] - Series_df[item+'3MRoll_prev']) / Series_df[item+'3MRoll_prev']         # 3 months % change relative to past 3 months
Series_df[item+'12MRoll_CHG_PERC'] = (Series_df[item+'12MRoll'] - Series_df[item+'12MRoll_prev']) / Series_df[item+'12MRoll_prev']     # 12 months % change relative to past 12 months

if runmode==1:
    # Plot the time series
    plt.figure(figsize=(12, 6))
    plt.plot(Series_df['date'], Series_df[item+'1M_CHG_PERC'], label='1 Month % Change')
    plt.plot(Series_df['date'], Series_df[item+'3MRoll_CHG_PERC'], label='3 Month Rolling Average % Change')
    plt.plot(Series_df['date'], Series_df[item+'12MRoll_CHG_PERC'], label='12 Month Rolling Average % Change')
    plt.xlabel('Date')
    plt.ylabel('Percent Change')
    plt.title('Percent Change in Rolling Averages'+Series_df[item+'Series'].iloc[1,])
    plt.legend()
    plt.grid(True)
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.show()

# Store the dataframe in the dictionary
item_series_dfs[item + '_df'] = Series_df

return pd.DataFrame(Series_df)

Create an empty dataframe for the results from FRED

fredresults_df = pd.DataFrame()

Create a WIDE format result with FRED Series information, this also will become teh source for item_series_dfs which is a dictionary of dataframes (item_series_dfs)

fredresults_df = pd.concat([get_series(item) for item in dict_series_list], ignore_index=True)

RETURN a dict of dataFRAMES we can process and merge seperately

Sort the dictionary by row count in descending order

sorted_dfs = sorted(item_series_dfs.items(), key=lambda x: len(x[1]), reverse=True)

Initialize the merged dataframe with the first dataframe

merged_df = sorted_dfs[0][1]

Left join the remaining dataframes in sorted order with explicit suffixes

for i, (, df) in enumerate(sorted_dfs[1:], start=2):
suffix = f’
{i}’
merged_df = pd.merge(merged_df, df, on=[‘Year’, ‘Month’], how=‘left’, suffixes=(‘’, suffix))

Drop duplicate rows based on ‘Year’ and ‘Month’, keeping the earliest ‘date’ value

merged_df = merged_df.sort_values(‘date’).drop_duplicates(subset=[‘Year’, ‘Month’], keep=‘first’)

Reset the index

merged_df.reset_index(drop=True, inplace=True)

Identify and rename duplicate columns in merged_df

merged_df = merged_df.loc[:,~merged_df.columns.duplicated()]

Identify and rename duplicate columns in merged_df

merged_df = merged_df.loc[:,~merged_df.columns.duplicated()]

Drop columns starting with ‘date’ and ‘value’

merged_df = merged_df.loc[:,~merged_df.columns.str.startswith(‘date’)]
fredresults_df = merged_df.loc[:,~merged_df.columns.str.startswith(‘value’)]

Display the final dataframe

if runmode==1:
print(fredresults_df)

Merge the Series data to the transaction data

Left join st_df to fredresults_df on ‘Year’ and ‘Month’

st_df = st_df.merge(fredresults_df, on=[‘Year’, ‘Month’], how=‘left’)

Display the result

print(st_df)

Push the table out to KNIME

if runmode==1:
print(fredresults_df.columns)
# Create a DataFrame with just the column headings and data types
column_info = pd.DataFrame({
‘Column’: fredresults_df.columns,
‘Data Type’: fredresults_df.dtypes
})

# Write the DataFrame to a CSV file
column_info.to_csv(output_path+'fredresults_column_info.csv', index=False)
st_df.to_csv(input_path+"InitialAssembly100_PCT_FREDSeriesInc.csv", index=False)

else:
knio.output_tables[0] = knio.Table.from_pandas(st_df)

@RVC2023 you might want to consider using a preformatted code or bring the code as an attachment.

1 Like

I am using very low-end PC, but didn’t tried data like yours. But I use heavy coding and complex data manipulation like 20 python scripts out of 40 nodes in a workflow. Didnt found any issues like you. But I will experiment Billions of Rows, after hearing your incident and I will experiment in 32 gb ram.

1 Like

Thrilled to hear you are working on this for 5.3 When is that due to be out?

Traditionally the next KNIME release will happen in summer, probably July

I am using pandas.

Thank you for the script! Hm. Interesting, this seems to be an issue when we convert this large pandas dataframe into a pyarrow table (which we do under the hood because we store data in the Arrow format). We’ll try to reproduce the issue and take a look – and also check whether our variable batch size fix for 5.3 helps in this case, too. Development ticket AP-22376.

Cheers,
Carsten

2 Likes

We’ll try to reproduce the issue and take a look – and also
…great! The load is about 30-40GB because rows are wide X 37.7M rows. Not sure if that has an impact.

check whether our variable batch size fix for 5.3 helps in this case, too. Development ticket AP-22376.
…I am somewhat new to the community…where do I go to get to the development ticket and apply any patches to my environment?

@carstenhaubold from my experience with big data there is either the chance that a system will manage such tasks by itself - or if it isn’t you can try to do it in small (and maybe somewhat stupid) iterations that would then at least get the job done.

It is good that KNIME does support formats like CSV, Parquet and ORC so there are always options how to handle data. If the underlying Arrow format does it that is just great.

1 Like

Ah, sorry I wasn’t more specific here: This means we have created a ticket so that it’s on the list for our developers. The tickets themselves are not visible externally, but in each release changelog we list which tickets have been solved with this release. And we internally reference this forum post from the ticket, so as soon as we have a fix available in the KNIME nightly build we can let you know to try it out.

2 Likes

Carsten…another bit of evidence this might be related to transfer size limitations:

  • I partitioned the workflow 50% and ran the same script (unaltered) on each thread and then concatenated. Both partitions executed successfully and I got the desired result. Yeah!

  • Then the workflow went to the next python node (which previously succeeded) and it failed with the same error. Again, that script executes fine in Spyder, and in the configuration mode of the node, but fails in the context of an executing workflow.

For this second script, I do need all rows of the whole dataset to make it work, but am now trying to find workarounds on the size issue by limiting columns to be input (and then try a merge if I can get it to run successfully).

@RVC2023 I don’t think it is a problem with memory size per-se, but I guess you’ve found a bug in our code that only shows when pyarrow deals with data that satisfies some criteria, such as being large :slight_smile:

I dug a little deeper, followed you stack trace and read through more of the pyarrow API docs. We are using the method pyarrow.Table.to_batches() under the hood to write “batches” of the table to disk instead of the full table. This method has a max_chunksize parameter which we’ve set to the number of rows we’d like (based on the amount of data in the table). However, the docs have a sidenote:

Individual chunks may be smaller depending on the chunk layout of individual columns.

Up to now we never ran into that and thus worked with the assumption, that all batches returned from this method are equally sized. But in your case, that assumption apparently did not hold.

I’ll adjust the ticket that I created for the investigation to fix this behavior (we can simply slice the table manually and create equally sized batches ourselves). But this is only needed as a fix for KNIME 5.2. In KNIME 5.3 we’ll allow varying batch sizes anyways and will remove the assertion check that you ran into.

As these fixes will take a little bit of time to reach you: did you find a workaround for now?

2 Likes