Python (Labs) running out of memory while using batches

Hello,

I’ve started using Python (Labs) because the stable-release Python Script node was causing memory issues when using asyncio. I was hoping the new batching and the conversion of input tables to pandas DataFrames would solve the memory issues; however, the Python (Labs) node still runs out of memory when using a chunk size of more than 1,000 rows.

It seems to run out of memory when adding data to the output table (knio.batch_write_table()). This is likely due to the large responses retrieved via the aiohttp library, combined with joining the input table to the response data.

I understand the node is still in development, but I have a few questions:

  1. Could it be incompatible with asynchronous tasks?
  2. Why is it still running out of memory when accessing the inputs in batches?
  3. The docs say “previously the size of the input data was limited by the amount of RAM available on the machine, the Python Script (Labs) node can process arbitrarily large amounts of data by accessing it in batches via the .batches() method of the input table”. How does it handle output data?

The error appears as:

Execute failed: Executing the Python script failed: Error while sending a command.

However the logs read:

kernel: [49007.279872] oom-kill:constraint=CONSTRAINT_NONE,nodemask=(null),cpuset=/,mems_allowed=0,global_oom,task_memcg=/system.slice/cron.service,task=python,pid=42220,uid=1000
kernel: [49007.279971] Out of memory: Killed process 42220 (python) total-vm:33175100kB, anon-rss:24284592kB, file-rss:1112kB, shmem-rss:0kB, UID:1000 pgtables:56592kB oom_score_adj:0
kernel: [49007.864132] oom_reaper: reaped process 42220 (python), now anon-rss:0kB, file-rss:0kB, shmem-rss:0kB

I have made a workflow that is as minimal as possible (though still not very minimal), but it doesn’t replicate the issue when executed. This is because the website blocks the IP, so the very large response is never received and the node won’t run out of memory. I mostly included the minimal workflow to check that I’m not using batches incorrectly, or that they don’t behave differently with asyncio, or something like that.

Thanks for your help.

Minimal Batch Memory Error Workflow.knwf (163.8 KB)

@Nancyjay,

There is no reason for KNIME to be incompatible with asynchronous tasks. However, the Python asyncio library, and anything that depends on it, is notorious for creating memory leaks that fill memory. There are a couple of things you can do to improve your code:

1/ The class and all async functions should be defined before and outside of the loop (that is, everything from the class definition up to and including the async main function). These can live in the global scope of the script; defining them in each local scope increases memory usage. It also reduces the amount of code in the loop, making it much easier to read.

2/ I wrote a couple of articles on asyncio and problems that can arise. You may want to read them:

3/ I may have misunderstood your code, but it could be simplified to a single async function that retrieves the data when passed a URL. You could build a list of coroutines to be awaited (you don’t need to wrap each one in create_task, as gather will do that for you) and then use a single asyncio.gather to collate the responses; a minimal sketch of this pattern follows this list. Once you have the results, you can format the data in ordinary, non-async code, since at that point you are no longer waiting on responses and there is no benefit to running the processing asynchronously.

4/ Having said all of the above, it might not be your Python code at all (though you should still look to simplify it). Have you tried increasing your Java heap space in knime.ini? It could be that you are running out of heap space when creating the table in memory.
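
To illustrate point 3, here is a minimal sketch of the pattern I mean (the function names and the URL dictionary are placeholders, not your actual code):

import asyncio
from typing import Dict, List, Tuple

from aiohttp import ClientSession


async def fetch_one(
    session: ClientSession, row_id: str, url: str
) -> Tuple[str, int, str]:
    """Single async function: fetch one URL and return (row id, status, body)."""
    async with session.get(url) as response:
        return row_id, response.status, await response.text()


async def fetch_all(urls: Dict[str, str]) -> List[Tuple[str, int, str]]:
    """Pass the coroutines straight to gather - no create_task wrapping needed."""
    async with ClientSession() as session:
        return await asyncio.gather(
            *(fetch_one(session, row_id, url) for row_id, url in urls.items())
        )


# Inside the loop over batches you would then simply call:
#     results = asyncio.run(fetch_all(urls))
# and format the results into a DataFrame with ordinary, non-async code.

On point 4, the heap is set with the -Xmx entry in knime.ini (for example -Xmx8g); if that line is still at its default it may well be smaller than the table you are building in memory.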

DiaAzul

Following on from my previous post, below is a working version of the script. You may need to adjust your URL requests, as many of them return 503 Service Unavailable, though some return data; I’m not sure if that is what you expect. I’ve added type hints to the variables to make the function calls more explicit. You may also want to look at the cookie jar, as I am not sure what you are trying to collect.


import asyncio
import json
from datetime import datetime
from typing import Dict, List, Optional, Tuple

import knime_io as knio
import pandas as pd
from aiohttp import ClientError, ClientResponseError, ClientSession, ClientTimeout, TCPConnector

HTTP_METHOD = "GET"
HTTP_HEADERS = {
    "upgrade-insecure-requests": "1",
    "rrt": "50",
    "downlink": "10",
    "ect": "4g",
    "sec-fetch-site": "same-origin",
    "sec-fetch-mode": "navigate",
    "sec-fetch-dest": "document",
}
URL_COLUMN = "BuiltURL"


async def fetcher(
    session: ClientSession, index: str, url: str
) -> Tuple[str, Optional[int], Optional[str]]:
    """Fetches data from a URL.

    Args:
        session (ClientSession): Client session context
        index (str): Row index in KNIME table.
        url (str): URL from which to source the response.

    Returns:
        Tuple[str, Optional[int], Optional[str]]: KNIME Row index, response status, body of response.
    """
    status: Optional[int] = None
    body: Optional[str] = None
    
    try:
        async with session.request(
            method=HTTP_METHOD,
            url=url,
            headers=HTTP_HEADERS,
            allow_redirects=True,
            ssl=False,
        ) as response:
            status = response.status
            body = await response.text()
    except ClientResponseError as e:
        status = e.status
    except (ClientError, asyncio.TimeoutError):
        # Timeouts and connection errors leave status/body as None so the row
        # still unpacks cleanly in the main loop.
        pass

    return index, status, body


async def gather_urls(
    urls: Dict[str, str]
) -> Tuple[List[Tuple[str, Optional[int], Optional[str]]], Dict[str, str]]:
    """Given a dictionary of URLs, fetch responses.
  
      Args:
          urls (Dict[str, str]): Dictionary of URLs with KNIME Row ID as key and URL as value.
  
      Returns:
          Tuple[List[Tuple[str, Optional[int], Optional[str]]], Dict[str, str]]: Returns fetched results and cookie jar for session.
    """
    results: List[Tuple[str, Optional[int], Optional[str]]] = []
    fetchers: List[asyncio.Task] = []
    cookie_jar: Dict = {}
    timeout = ClientTimeout(total=10)
    # Close underlying sockets after connection release - otherwise the IP isn't rotated for retries
    conn = TCPConnector(ssl=False, force_close=True)

    async with ClientSession(
        connector=conn, timeout=timeout, raise_for_status=True
    ) as session:
        cookie_jar = {cookie.key: cookie.value for cookie in session.cookie_jar}
        for index, url in urls.items():
            fetchers.append(asyncio.create_task(fetcher(session, index, url)))
        results = await asyncio.gather(*fetchers, return_exceptions=True)

    return results, cookie_jar

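# Create the output table as a batched write table and fill it one batch at a
# time, so only the current batch has to be held in memory at once.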
output_table = knio.batch_write_table()

for batch in knio.input_tables[0].batches():
    input_batch = batch.to_pandas().set_index("RowID")

    urls: Dict[str, str] = {}
    for index, row in input_batch.iterrows():
        urls[index] = row[URL_COLUMN]

    fetcher_results, cookie_jar = asyncio.run(gather_urls(urls))
    serialised_cookies = json.dumps(cookie_jar)

    batch_results = []
    for row_id, status, body in fetcher_results:
        result = {
            "RowID": row_id,
            "Status": status or 0,
            "Body": body,
            "Date": datetime.now(),
            "Cookie": serialised_cookies,
        }
        batch_results.append(result)

    batch_output_table = input_batch.join(
        pd.DataFrame(batch_results).set_index("RowID"),
        on=["RowID"],
        how="left",
        lsuffix="(left)",
        rsuffix="",
    )

    output_table.append(batch_output_table)

knio.output_tables[0] = output_table

Hi DiaAzul,

Thank you for your thoughtful (and thought-provoking) responses. Your articles also seem very helpful and I look forward to reading them.

I do believe I am experiencing a memory leak in my code, and after seeing yours it’s clear I need to improve and optimise mine.

I appreciate your efforts in providing me with a working example. I will include as much of your logic as I can; however, I had cut quite a lot out of the minimal workflow compared with my extended version, so:

  1. I need to keep the queue system, as there is a step where I add failed requests back to the queue to be retried once all URLs have been tried (roughly the pattern sketched after this list). I found this to be a better approach than throttling the requests.
  2. The reason Fetcher is a class is that I track errors and fail/retry counts by updating instance variables, and then output these to the table.
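
For context, the retry-queue pattern I’m describing is roughly the sketch below (fetch(), the worker count and the names are illustrative stand-ins, not my actual code):

import asyncio
from typing import Dict, Set, Tuple

from aiohttp import ClientSession


async def fetch(url: str) -> str:
    """Stand-in for the real request logic (simplified for this sketch)."""
    async with ClientSession() as session:
        async with session.get(url) as response:
            response.raise_for_status()
            return await response.text()


async def worker(
    queue: "asyncio.Queue[Tuple[str, str]]",
    results: Dict[str, str],
    retried: Set[str],
) -> None:
    """Pull URLs off the queue; failed requests are re-queued once for a second pass."""
    while True:
        row_id, url = await queue.get()
        try:
            results[row_id] = await fetch(url)
        except Exception:
            if row_id not in retried:
                retried.add(row_id)
                queue.put_nowait((row_id, url))
        finally:
            queue.task_done()


async def run_queue(urls: Dict[str, str], n_workers: int = 10) -> Dict[str, str]:
    queue: "asyncio.Queue[Tuple[str, str]]" = asyncio.Queue()
    for row_id, url in urls.items():
        queue.put_nowait((row_id, url))
    results: Dict[str, str] = {}
    retried: Set[str] = set()
    tasks = [asyncio.create_task(worker(queue, results, retried)) for _ in range(n_workers)]
    await queue.join()  # returns once every item, including re-queued retries, is done
    for task in tasks:
        task.cancel()
    return results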

I have followed your advice on defining all async functions before and outside of the loop. Unfortunately, I am still experiencing the same errors.

I have allocated 25 GB in the knime.ini file, which worked well when I was using the old Python node with Requests and multi-threading.

My main concern is that I’m not seeing any benefit from batching the inputs with Python (Labs). The node always fails when writing to the KNIME output table.

I will try implementing your logic while retaining the queue and the error tracking, and hopefully that will solve my issues.

Thanks again for your help.

@Nancyjay , you’re welcome; and, thanks for sharing your code.

If you want to retry requests that were unsuccessful, you may want to filter the pandas DataFrame after you have done your initial batch of requests in the main loop to identify the rows whose status is not 200. Then, with that list of URLs, repeat the request process. If this gives you more information, you can update the DataFrame before appending the batch to the KNIME table. It should be a lot simpler than implementing queues; see the sketch below.
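
Something along these lines, building on the script above (same imports and variables; this is only a sketch of the idea, not tested code), placed inside the batch loop after the join:

    # Retry only the rows that did not come back with a 200 status.
    failed = batch_output_table[batch_output_table["Status"] != 200]
    retry_urls = failed[URL_COLUMN].to_dict()

    if retry_urls:
        retry_results, _ = asyncio.run(gather_urls(retry_urls))
        for row_id, status, body in retry_results:
            if status == 200:
                batch_output_table.loc[row_id, ["Status", "Body"]] = [status, body]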

DiaAzul

Hi @DiaAzul,

I’ve accepted your answer as the solution. I modified my code as per your suggestions and it helped a lot. I am no longer getting the memory error and have been able to process more data than I previously could using batching.

I will take your advice to retry failed requests by filtering the pandas DataFrame rather than adding them back to a queue; thanks for that.

I’ve now run into a new error, for which I’ve created a new discussion in the forum, but I believe that one is a bug in the Python Script (Labs) node.

I also wanted to say your articles are very informative and I’ve already learned from them.

Thanks again.
