Autobahn lost transport

#1

All,

I’m getting a strange disconnect on long-running functions (approx. 15 seconds):

onLeave: CloseDetails(reason = wamp.close.transport_lost, message = 'WAMP transport was lost without closing the session before')

I’m running Python 3.5, but I was getting the same behavior in Python 3.4.0 as well. Here is the traceback:

      File "/usr/local/lib/python3.5/site-packages/txaio/aio.py", line 329, in done
        x = callback(res)
      File "/usr/local/lib/python3.5/site-packages/autobahn/wamp/protocol.py", line 796, in success
    'NoneType' object has no attribute 'send'
        self._transport.send(reply)
    AttributeError: 'NoneType' object has no attribute 'send'
    Exception in callback add_callbacks.<locals>.done(<Task finishe…sh.log.tar"]'>) at /usr/local/lib/python3.5/site-packages/txaio/aio.py:325
    handle: <Handle add_callbacks.<locals>.done(<Task finishe…sh.log.tar"]'>) at /usr/local/lib/python3.5/site-packages/txaio/aio.py:325>
    Traceback (most recent call last):
      File "/usr/local/lib/python3.5/site-packages/txaio/aio.py", line 329, in done
        x = callback(res)
      File "/usr/local/lib/python3.5/site-packages/autobahn/wamp/protocol.py", line 796, in success
        self._transport.send(reply)
    AttributeError: 'NoneType' object has no attribute 'send'

    During handling of the above exception, another exception occurred:

    Traceback (most recent call last):
      File "/opt/python/Python-3.5.0/Lib/asyncio/events.py", line 125, in _run
        self._callback(*self._args)
      File "/usr/local/lib/python3.5/site-packages/txaio/aio.py", line 334, in done
        errback(create_failure())
      File "/usr/local/lib/python3.5/site-packages/autobahn/wamp/protocol.py", line 818, in error
        del self._invocations[msg.request]
    KeyError: 192

I AM running async coroutines recursively like this (simplified):

    import asyncio
    import os
    import tarfile

    from colorama import Fore  # assumption: Fore is colorama's; it has no ORANGE, so YELLOW is used

    @asyncio.coroutine
    def recurse_tar(self, tar, passed_path=""):
        try:
            for member in tar.getmembers():
                ext = [".tar", ".tgz", ".tar.gz"]
                if member.name.endswith(tuple(ext)):
                    z = None
                    f = None
                    try:
                        full_path = os.path.join(tar.name, passed_path, member.name)
                        print(full_path)
                        self.search_result.append(full_path)
                        z = tar.extractfile(member)
                        if not z:
                            print(Fore.YELLOW + str(z) + " is NONE!" + Fore.RESET)
                        f = tarfile.open(fileobj=z)
                        if not f:
                            print(Fore.YELLOW + str(f) + " is NONE!" + Fore.RESET)
                        yield from asyncio.ensure_future(self.recurse_tar(f, full_path))
                    except Exception as e:
                        print(Fore.RED + str(e), member.name + Fore.RESET)
                    finally:
                        # f or z is still None if the corresponding open failed
                        if f is not None:
                            f.close()
                        if z is not None:
                            z.close()
                    self.counter += 1
                else:
                    pass  # TODO: Logic for file processing
        except Exception as e:
            print(Fore.RED + str(e) + Fore.RESET)
        finally:
            if tar:
                tar.close()

    @asyncio.coroutine
    def parse(self, file_full_path):
        tasks = []
        for …
            tasks.append(asyncio.ensure_future(self.recurse_tar(tarfile.open(file_full_path))))
        yield from asyncio.wait(tasks)

A couple of questions:

What are the conditions that cause a WAMP transport to close with the above error?

Is there a time limit or quota for WAMP transports?

Am I going down the wrong troubleshooting path entirely?

Any help appreciated…

Dave

#2

Hi Dave!

No idea about the nested coroutines, but there are no time limits for WAMP transports. The entire idea of WebSocket is to have persistent connections.

I’m forwarding your question to somebody who can hopefully give you more help!

Regards,

Alex

(Replying to Dave Thomas’s post of Monday, 21 September 2015, 20:12:15 UTC+2.)

#3

Hi Dave,

I think this is because the file I/O runs for too long and blocks the event loop, causing the underlying TCP connection to time out before Autobahn can send a ping/pong to keep it open.

When you do the yield from, it’s actually calling it synchronously: the coroutine keeps running until it has finished completely, and only THEN does the event loop get to continue processing. Asyncio’s Futures, like Deferreds, seem to like doing things synchronously if they can.
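A minimal sketch of the effect described above, not the code from this thread. It assumes Python 3.7+ (async/await and asyncio.run instead of the @asyncio.coroutine style used in the question). The names heartbeat, blocking_work and measure_longest_gap are made up for illustration, and time.sleep stands in for the blocking tar I/O. The heartbeat plays the role of a keep-alive ping: while the blocking coroutine runs, no heartbeat can fire.

```python
import asyncio
import time


async def heartbeat(ticks, interval=0.01):
    """Record a timestamp every `interval` seconds, like a keep-alive ping."""
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(interval)


async def blocking_work(duration):
    """A coroutine that never yields; time.sleep stands in for tar I/O."""
    time.sleep(duration)


async def measure_longest_gap(duration=0.3):
    ticks = []
    beat = asyncio.ensure_future(heartbeat(ticks))
    await asyncio.sleep(0.05)      # let the heartbeat tick a few times
    await blocking_work(duration)  # the event loop is frozen during this call
    await asyncio.sleep(0.05)      # the heartbeat resumes afterwards
    beat.cancel()
    # Largest pause between two consecutive heartbeats.
    return max(b - a for a, b in zip(ticks, ticks[1:]))


longest_gap = asyncio.run(measure_longest_gap())
print("longest gap between heartbeats: %.2f s" % longest_gap)
```

The longest gap comes out at roughly the blocking duration rather than the 0.01 s interval, which is the same starvation that would keep a ping/pong from being answered in time.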

The solution to this is either:

  1. Don’t do yield from directly; use something like call_soon (https://docs.python.org/3.5/library/asyncio-eventloop.html#asyncio.BaseEventLoop.call_soon) to queue the next level of recursion.
  2. Do an asyncio.sleep (https://docs.python.org/3.5/library/asyncio-task.html#asyncio.sleep) for a very short amount of time, allowing the event loop to get control back.
  3. Do the work in a thread/subprocess.

Because this file I/O is blocking, 1 and 2 will only mitigate the problem: they will still block the main thread a lot (just not enough to cause timeouts). Since you’re using asyncio, I think that ThreadPoolExecutor (https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor) will most likely be what you want (but I’m a Twisted person so I’m only guessing at which API does this correctly).
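Option 3 could look roughly like the sketch below. It is an illustration under assumptions, not the poster’s code: it targets Python 3.7+, find_nested_tars, scan_file and make_demo_archive are hypothetical helpers, and the tiny archive is demo data built on the fly. The recursion is ordinary blocking code; only the top-level call is handed to a thread pool through run_in_executor, so the event loop stays free.

```python
import asyncio
import io
import os
import tarfile
import tempfile
from concurrent.futures import ThreadPoolExecutor

TAR_EXTENSIONS = (".tar", ".tgz", ".tar.gz")


def find_nested_tars(tar, prefix=""):
    """Blocking, plain recursion: list every archive nested inside `tar`."""
    found = []
    for member in tar.getmembers():
        if member.isfile() and member.name.endswith(TAR_EXTENSIONS):
            path = os.path.join(prefix, member.name)
            found.append(path)
            with tar.extractfile(member) as fileobj:
                with tarfile.open(fileobj=fileobj) as inner:
                    found.extend(find_nested_tars(inner, path))
    return found


def scan_file(file_path):
    """Entry point for the worker thread: open, scan, close."""
    with tarfile.open(file_path) as tar:
        return find_nested_tars(tar, file_path)


async def parse(file_paths, executor):
    loop = asyncio.get_running_loop()
    # The callable and its argument are passed separately; the pool calls it.
    futures = [loop.run_in_executor(executor, scan_file, p) for p in file_paths]
    return await asyncio.gather(*futures)


def make_demo_archive(directory):
    """Build outer.tar containing inner.tar containing hello.txt."""
    inner_bytes = io.BytesIO()
    with tarfile.open(fileobj=inner_bytes, mode="w") as inner:
        data = b"hello"
        info = tarfile.TarInfo("hello.txt")
        info.size = len(data)
        inner.addfile(info, io.BytesIO(data))
    payload = inner_bytes.getvalue()
    outer_path = os.path.join(directory, "outer.tar")
    with tarfile.open(outer_path, mode="w") as outer:
        info = tarfile.TarInfo("inner.tar")
        info.size = len(payload)
        outer.addfile(info, io.BytesIO(payload))
    return outer_path


with tempfile.TemporaryDirectory() as tmp:
    archive = make_demo_archive(tmp)
    pool = ThreadPoolExecutor(max_workers=2)
    results = asyncio.run(parse([archive], pool))
    pool.shutdown()
    nested = [os.path.relpath(p, tmp) for p in results[0]]
print(nested)
```

A thread pool keeps the loop responsive while the I/O runs. For heavily CPU-bound work, a process pool may be the better fit, at the cost of requiring picklable arguments and results.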

Hope this helps,

Amber “Hawkie” Brown
GPG: https://keybase.io/hawkowl
haw...@atleastfornow.net

#4

Thanks Amber! I’ve done a LOT of research on this and even found another post in the crossbar group here:
https://groups.google.com/forum/#!searchin/crossbario/“auto_ping_timeout”$3A/crossbario/-U7Z1Mh4760/909n7O6RjeIJ

Looks like the solution to this is to not block the event loop that autobahn is running within ApplicationRunner.

I’ll post a more complete solution here when I get it working. What makes it tough is that in .NET, async/await will automatically spawn threads for background tasks. In Python 3 asyncio, though, “awaiting” a long-running task does not give you parallelism (yes, I have read everything there is to read about the GIL in the last week ;) ); it only manages concurrency. My project suffers from latency in network I/O operations (normal database stuff, which asyncio helps with tremendously), but it also has high CPU load, as in the un-tarring example I attached, and that needs the maximum amount of parallelism.

I’ll follow option 3 that you recommend above, as that is also the approach the aioprocessing package (https://pypi.python.org/pypi/aioprocessing/0.0.1) takes, and it seems to be the right way. Although mixing asyncio and multiprocessing is such a powerful new topic, it is VERY difficult to find examples online that combine the two.

#5

Alex, I sincerely appreciate you forwarding this on.

I would LOVE to see an example put on the crossbar/autobahn site for multiprocessing with asyncio and “not blocking the event loop” when we get to the bottom of this!

#6

The secret to not blocking the event loop is to pass the arguments to run_in_executor correctly: give it the function, followed by its arguments separated by commas, rather than calling the function in parentheses (which runs it immediately in the event loop’s thread). Also avoid declaring the process or thread pool executor in a ‘with’ statement around the asyncio call, because leaving the ‘with’ block waits for the submitted work to finish; just pass a new instance with max_workers set to one. See below:

Bad (will block autobahn event loop):

    with ProcessPoolExecutor(max_workers=1) as executor:
        # recurse_tar(...) is called right here, in the event loop's thread
        executor_future = self.loop.run_in_executor(executor, recurse_tar(path, path, parent_child_dict))

Good (will NOT BLOCK autobahn event loop… some code omitted of course):

    futures = []
    # The function and its arguments are passed separately, so the pool makes the call
    executor_future = self.loop.run_in_executor(ProcessPoolExecutor(max_workers=1), recurse_tar, path, path, parent_child_dict)
    futures.append(executor_future)
    future_results = yield from asyncio.gather(*futures)
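The difference between the two calling styles can be checked with a small experiment. This is only a sketch under assumptions: it targets Python 3.7+, where_am_i is a hypothetical stand-in for recurse_tar, and it uses a ThreadPoolExecutor so the executing thread can be compared directly. run_in_executor takes the callable and its arguments the same way for a ProcessPoolExecutor.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor


def where_am_i(label):
    """Hypothetical stand-in for recurse_tar: report the thread that ran it."""
    return label, threading.get_ident()


async def compare(executor):
    loop = asyncio.get_running_loop()
    loop_thread = threading.get_ident()

    # Good: function and arguments are passed separately, so a worker
    # thread of the pool makes the call.
    _, good_thread = await loop.run_in_executor(executor, where_am_i, "good")

    # Bad: run_in_executor(executor, where_am_i("bad")) evaluates its
    # argument first, so the call below is what really happens, and it
    # happens on the event loop's own thread.
    _, bad_thread = where_am_i("bad")

    return good_thread != loop_thread, bad_thread == loop_thread


pool = ThreadPoolExecutor(max_workers=1)
good_off_loop, bad_on_loop = asyncio.run(compare(pool))
pool.shutdown()
print("good call left the loop thread:", good_off_loop)
print("bad call ran on the loop thread:", bad_on_loop)
```

In the bad form the executor only ever receives the return value of the function, after the blocking work has already been done in the loop’s thread.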
