adlfs replaces main thread event loop #291

Open
Eugeny opened this issue Nov 22, 2021 · 5 comments


Eugeny commented Nov 22, 2021

Opening an AzureBlobFile swaps out the calling thread's event loop for the fsspec thread's own event loop. If the Azure SDK requires that, fine, but adlfs should clean up after itself and restore the original loop.


Artimi commented Dec 14, 2023

Hi, I'd like to chip in to highlight the severity of this issue. The adlfs behavior gave me a few more wrinkles while I was trying to figure out what was going on.

fsspec creates an event loop in a subthread (https://github.com/fsspec/filesystem_spec/blob/master/fsspec/asyn.py#L135), which is fine and causes no issues. However, when adlfs connects (https://github.com/fsspec/adlfs/blob/main/adlfs/spec.py#L495), it takes this loop, which is running in the subthread, and sets it as the current loop of the main thread. This cannot work: when the main thread later calls asyncio.get_event_loop, it gets the loop that is already running in the subthread, and calling loop.run_until_complete on it raises RuntimeError('This event loop is already running').

  File "/usr/local/lib/python3.10/asyncio/base_events.py", line 625, in run_until_complete
    self._check_running()
  File "/usr/local/lib/python3.10/asyncio/base_events.py", line 584, in _check_running
    raise RuntimeError('This event loop is already running')

You can see the problem in the following example:

import asyncio
import threading
import time


def background_task():
    print("background_task")
    time.sleep(0.9)
    print("background_task done")


def start_subthread():
    loop = asyncio.new_event_loop()
    event_loop_thread = threading.Thread(target=loop.run_forever)
    event_loop_thread.daemon = True
    event_loop_thread.start()

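    # Mirrors the adlfs behavior described above: the loop running in the
    # subthread is installed as the calling (main) thread's current loop.
    asyncio.set_event_loop(loop)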

    print("LOOP SUBTHREAD", loop, "id", id(loop), "running", loop.is_running())
    return loop, event_loop_thread


def stop_subthread(loop, event_loop_thread):
    loop.call_soon_threadsafe(loop.stop)
    event_loop_thread.join()


def main():
    subthread_loop, event_loop_thread = start_subthread()

    main_thread_loop = asyncio.get_event_loop()
    print("LOOP MAIN THREAD", main_thread_loop, "id", id(main_thread_loop), "running", main_thread_loop.is_running())
    main_thread_loop.run_until_complete(asyncio.sleep(1))

    subthread_loop.call_soon_threadsafe(
        background_task,
    )

    print("LOOP MAIN THREAD", main_thread_loop, "id", id(main_thread_loop), "running", main_thread_loop.is_running())
    main_thread_loop.run_until_complete(asyncio.sleep(1))

    stop_subthread(subthread_loop, event_loop_thread)


if __name__ == '__main__':
    main()

I should be able to run main_thread_loop.run_until_complete(asyncio.sleep(1)); however, I get:

Traceback (most recent call last):
  File "/Users/user/project/debug_loop_not_running.py", line 49, in <module>
    main()
  File "/Users/user/project/debug_loop_not_running.py", line 36, in main
    main_thread_loop.run_until_complete(asyncio.sleep(1))
  File "/Users/user/.pyenv/versions/3.10.10/lib/python3.10/asyncio/base_events.py", line 625, in run_until_complete
    self._check_running()
  File "/Users/user/.pyenv/versions/3.10.10/lib/python3.10/asyncio/base_events.py", line 584, in _check_running
    raise RuntimeError('This event loop is already running')
RuntimeError: This event loop is already running
LOOP SUBTHREAD <_UnixSelectorEventLoop running=True closed=False debug=False> id 4546719616 running True
LOOP MAIN THREAD <_UnixSelectorEventLoop running=True closed=False debug=False> id 4546719616 running True
sys:1: RuntimeWarning: coroutine 'sleep' was never awaited

So using adlfs prevents any later use of the asyncio event loop in the main thread, which I consider a major bug and a blocker.


Eugeny commented Dec 14, 2023

I've ended up wrapping all calls in this context manager:

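import asyncio
import contextlib

import fsspec.asyn


@contextlib.contextmanager
def wrap_fss():
    # Remember the calling thread's current loop, if there is one.
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        loop = None
    try:
        asyncio.set_event_loop(fsspec.asyn.get_loop())
        yield
    finally:
        # Put the original loop (or None) back once the fsspec/adlfs calls are done.
        asyncio.set_event_loop(loop)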


Artimi commented Dec 14, 2023

Thanks for the workaround!
Just to be clear, I think it would be better not to set the event loop in the main thread in the first place.
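
To illustrate what "not setting the loop in the main thread" could look like, here is a minimal sketch that relies only on the standard library. The helper names (start_background_loop, run_in_background, fetch_value) are made up for this example and are not adlfs or fsspec APIs; whether the Azure SDK can be driven this way is an assumption I have not verified.

```python
import asyncio
import threading


def start_background_loop():
    """Start a dedicated event loop in a daemon thread (hypothetical helper)."""
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    # Deliberately no asyncio.set_event_loop(loop) here: the calling
    # thread's current loop is left untouched.
    return loop, thread


def run_in_background(loop, coro, timeout=None):
    """Run a coroutine on the background loop and block until it finishes."""
    future = asyncio.run_coroutine_threadsafe(coro, loop)
    return future.result(timeout)


async def fetch_value():
    # Stand-in for an SDK call; not a real adlfs or Azure API.
    await asyncio.sleep(0.01)
    return 42


def demo():
    background_loop, thread = start_background_loop()
    try:
        from_background = run_in_background(background_loop, fetch_value(), timeout=5)
    finally:
        # Stop the background loop from its own thread, then clean up.
        background_loop.call_soon_threadsafe(background_loop.stop)
        thread.join()
        background_loop.close()
    # The main thread can still drive a loop of its own.
    from_main = asyncio.run(fetch_value())
    return from_background, from_main
```

Because the background loop is only ever addressed through asyncio.run_coroutine_threadsafe and call_soon_threadsafe, the main thread never sees it through asyncio.get_event_loop.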


TomAugspurger commented Dec 14, 2023 via email


Artimi commented Dec 15, 2023

Thanks for the context. I'm not familiar enough with adlfs, fsspec, or the Azure SDK to see all the implications.

Maybe we could just use the snippet by @Eugeny in #291 (comment): when RuntimeError is raised, first get the current event loop, and then make sure to set it back in finally.

Just beware that set_event_loop is used at least twice in this file, and I don't know whether it is used elsewhere.
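
Since there are several call sites, one option might be a small decorator that saves and restores the loop around each of them. This is only a sketch: restores_event_loop is a hypothetical name, nothing like it exists in adlfs today as far as I know, and demo() uses a throwaway loop as a stand-in for the fsspec one.

```python
import asyncio
import functools


def restores_event_loop(func):
    """Put the calling thread's current loop back after func returns or raises."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            previous = asyncio.get_event_loop()
        except RuntimeError:
            previous = None  # this thread has no current loop
        try:
            return func(*args, **kwargs)
        finally:
            asyncio.set_event_loop(previous)

    return wrapper


def demo():
    original = asyncio.new_event_loop()
    asyncio.set_event_loop(original)
    intruder = asyncio.new_event_loop()  # stands in for the loop a library installs

    @restores_event_loop
    def connect():
        # Simulates a connect call that swaps the thread's current loop.
        asyncio.set_event_loop(intruder)
        return asyncio.get_event_loop() is intruder

    try:
        swapped_inside = connect()
        restored_after = asyncio.get_event_loop() is original
        return swapped_inside, restored_after
    finally:
        asyncio.set_event_loop(None)
        intruder.close()
        original.close()
```

The finally clause runs on both the normal and the exception path, so the caller gets its own loop back either way.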

I can also see another problem here:

def do_connect(self):
  try:
    ...
  except RuntimeError:
    self.do_connect()

This may recurse endlessly if RuntimeError is raised every time; in the end it will fail with RecursionError.
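
A bounded retry would avoid that. The sketch below is purely illustrative: Connector, connect_once, and max_attempts are hypothetical names and do not correspond to the actual adlfs code.

```python
class Connector:
    """Hypothetical stand-in used to illustrate a bounded retry; not the adlfs class."""

    def __init__(self, connect_once, max_attempts=3):
        self._connect_once = connect_once  # callable that performs one attempt
        self._max_attempts = max_attempts

    def do_connect(self):
        last_error = None
        # Retry in a loop instead of recursing, so the number of attempts is capped.
        for _ in range(self._max_attempts):
            try:
                return self._connect_once()
            except RuntimeError as exc:
                last_error = exc
        raise RuntimeError(
            f"connect failed after {self._max_attempts} attempts"
        ) from last_error
```

With this shape, a persistent RuntimeError surfaces after a fixed number of attempts with the last failure chained as its cause, instead of ending in RecursionError.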
