Autobahn-python asyncio.ensure_future self not defined

Hi, I am trying to use the autobahn-python client to publish an event every 10 seconds. I am using the Docker image as the crossbar router.

$ docker run -it -p 8080:8080 crossbario/crossbar
........

2019-12-07T00:23:59+0000 [Controller      1] Ok, Transport transport001 has started Web Service webservice001
2019-12-07T00:23:59+0000 [Controller      1] Ok, Router worker001 configured
2019-12-07T00:23:59+0000 [Controller      1] Ok, local node configuration booted successfully!

rs.py

import os
import six
import argparse

import txaio
txaio.use_twisted()

from autobahn.twisted.wamp import ApplicationSession, ApplicationRunner
from autobahn.wamp.types import SubscribeOptions
from autobahn.wamp.types import PublishOptions
import asyncio
from autobahn.wamp.exception import ApplicationError


class ClientSession(ApplicationSession):

    async def onJoin(self, details):
        print('session joined', details)

    async def stream_heartbeats(self):
        counter=1
        while True:
            counter=counter+1
            try:
                publication = await self.publish(u'com.intusurg.xi.systems.{}.heartbeat'.format("vb8188"),{"data":"meh"},options=PublishOptions(acknowledge=True, exclude_me=False))
            except ApplicationError as e:
                print(e.args)
            print("heartbeat id = {}".format(publication))
            await asyncio.sleep(10)
    asyncio.ensure_future(stream_heartbeats(self))
    
    def onDisconnect(self):
        print("Disconnected!!!")


if __name__ == '__main__':
    parser = argparse.ArgumentParser()

    parser.add_argument('-d',
                        '--debug',
                        action='store_true',
                        help='Enable debug output.')

    parser.add_argument('--url',
                        dest='url',
                        type=six.text_type,
                        default="rs://localhost:8080",
                        help='The router URL (default: "rs://localhost:8080").')

    parser.add_argument('--realm',
                        dest='realm',
                        type=six.text_type,
                        default="realm1",
                        help='The realm to join (default: "realm1").')

    args = parser.parse_args()

    # start logging
    if args.debug:
        txaio.start_logging(level='debug')
    else:
        txaio.start_logging(level='info')

    extra = {
    }

    runner = ApplicationRunner(url=args.url, realm=args.realm, extra=extra)
    runner.run(ClientSession, auto_reconnect=True)

executing rs.py

(venv) ➜  playground python3 rs.py 
Traceback (most recent call last):
  File "rs.py", line 15, in <module>
    class ClientSession(ApplicationSession):
  File "rs.py", line 28, in ClientSession
    asyncio.ensure_future(stream_heartbeats(self))
NameError: name 'self' is not defined

I am facing a Python issue: I want to publish to the heartbeat stream every 10 seconds. This is a continuation of my earlier efforts to start using TCP raw sockets for the transport layer.

thanks for all the help crossbar dev/doc team
Vaibhav Chauhan

The indenting on that line is wrong; move it right 4 more spaces (also, I don’t see where stream_heartbeats is defined?)
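For illustration, here is a minimal standard-library sketch of why the NameError appears and what moving the call into a method changes. The class names `Broken` and `Fixed` and the method `on_join` are made up for this example and are not part of autobahn; the sketch only assumes that the class body is executed once, at definition time, when no instance exists yet.

```python
import asyncio

# Broken: a class body runs once, at definition time, with no instance around.
try:
    class Broken:
        async def stream(self):
            return "tick"

        asyncio.ensure_future(stream(self))  # `self` is not a name here
except NameError as exc:
    error = str(exc)


class Fixed:
    async def stream(self):
        return "tick"

    async def on_join(self):
        # Inside a method `self` is bound; other methods are reached through it.
        return await asyncio.ensure_future(self.stream())


result = asyncio.run(Fixed().on_join())
print(error)
print(result)
```

In the posted script the equivalent change would be to schedule `self.stream_heartbeats()` from inside `onJoin`, since `stream_heartbeats` is already defined as a method of the class.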

Hi @meejah, I tried what you suggested, but now stream_heartbeats is not being executed at all. I am a bit new to the asyncio framework, so maybe that's the issue, but if you can help me understand why it's not working as expected, that would be great.

import os
import six
import argparse

import txaio
txaio.use_twisted()

from autobahn.twisted.wamp import ApplicationSession, ApplicationRunner
from autobahn.wamp.types import SubscribeOptions
from autobahn.wamp.types import PublishOptions
import asyncio
from autobahn.wamp.exception import ApplicationError


class ClientSession(ApplicationSession):

    async def onJoin(self, details):
        print('session joined', details)

        async def stream_heartbeats(self):
            print("lol")
            counter=1
            while True:
                counter=counter+1
                try:
                    publication = await self.publish(u'com.intusurg.xi.systems.{}.heartbeat'.format("vb8188"),{"data":"meh"},options=PublishOptions(acknowledge=True, exclude_me=False))
                except ApplicationError as e:
                    print(e.args)
                print("heartbeat id = {}".format(publication))
                await asyncio.sleep(10)
        asyncio.ensure_future(stream_heartbeats(self))
    
    def onDisconnect(self):
        print("Disconnected!!!")


if __name__ == '__main__':
    parser = argparse.ArgumentParser()

    parser.add_argument('-d',
                        '--debug',
                        action='store_true',
                        help='Enable debug output.')

    parser.add_argument('--url',
                        dest='url',
                        type=six.text_type,
                        default="rs://localhost:8080",
                        help='The router URL (default: "rs://localhost:8080").')

    parser.add_argument('--realm',
                        dest='realm',
                        type=six.text_type,
                        default="realm1",
                        help='The realm to join (default: "realm1").')

    args = parser.parse_args()

    # start logging
    if args.debug:
        txaio.start_logging(level='debug')
    else:
        txaio.start_logging(level='info')

    extra = {
    }

    runner = ApplicationRunner(url=args.url, realm=args.realm, extra=extra)
    runner.run(ClientSession, auto_reconnect=True)

client app log:

(venv) ➜  test python3 rs.py --debug
2019-12-09T10:22:23-0800 ApplicationRunner.run()
2019-12-09T10:22:23-0800 using t.a.i.ClientService
2019-12-09T10:22:23-0800 WampRawSocketClientFactory.buildProtocol(addr=IPv4Address(type='TCP', host='127.0.0.1', port=8080))
2019-12-09T10:22:23-0800 WampRawSocketClientFactory.buildProtocol() -> proto=<autobahn.twisted.rawsocket.WampRawSocketClientProtocol object at 0x10b221828>, max_message_size=16777216, MAX_LENGTH=16777216
2019-12-09T10:22:23-0800 WampRawSocketClientProtocol.connectionMade()
2019-12-09T10:22:23-0800 WampRawSocketClientProtocol: opening handshake received - 7ff10000
2019-12-09T10:22:23-0800 WampRawSocketClientProtocol: server requests us to send out most 16777216 bytes per message
2019-12-09T10:22:23-0800 ApplicationSession started.
2019-12-09T10:22:23-0800 WampRawSocketClientProtocol: opening handshake completed (using serializer <autobahn.wamp.serializer.JsonSerializer object at 0x10b135a58>)
2019-12-09T10:22:23-0800 session joined
2019-12-09T10:22:23-0800 SessionDetails(realm=<realm1>,
2019-12-09T10:22:23-0800                session=6323445940014651,
2019-12-09T10:22:23-0800                authid=<WGGU-NYAA-PKN5-FCJX-HHNW-EX65>,
2019-12-09T10:22:23-0800                authrole=<anonymous>,
2019-12-09T10:22:23-0800                authmethod=anonymous,
2019-12-09T10:22:23-0800                authprovider=static,
2019-12-09T10:22:23-0800                authextra={'x_cb_node_id': None, 'x_cb_peer': 'tcp4:172.17.0.1:33182', 'x_cb_pid': 16},
2019-12-09T10:22:23-0800                serializer=<json>,
2019-12-09T10:22:23-0800                resumed=None,
2019-12-09T10:22:23-0800                resumable=None,
2019-12-09T10:22:23-0800                resume_token=None)

You’re “shadowing” self in stream_heartbeats – just leave that parameter off, and call stream_heartbeats() without arguments (the “inner function” can see the self variable of onJoin without it being passed in like that).
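A rough sketch of the closure version, using only the standard library. `FakeSession` and its `publish` are hypothetical stand-ins, not the autobahn API, and the loop is bounded and the sleep shortened so the example terminates quickly. It also assumes an asyncio event loop is actually running: the posted script selects Twisted through `txaio.use_twisted()` and `autobahn.twisted.wamp`, so a task handed to `asyncio.ensure_future` there may simply never be run. Trying the `autobahn.asyncio.wamp` variants of `ApplicationSession` and `ApplicationRunner` could be one way to check whether that is what is happening.

```python
import asyncio


class FakeSession:
    """Hypothetical stand-in for an ApplicationSession (not the autobahn API)."""

    def __init__(self):
        self.published = []
        self.heartbeat_task = None

    async def publish(self, topic, payload):
        # Pretend to publish and hand back a fake publication id.
        self.published.append((topic, payload))
        return len(self.published)

    async def onJoin(self, details):
        async def stream_heartbeats(count):
            # No `self` parameter: the closure sees onJoin's `self`.
            for _ in range(count):
                pub_id = await self.publish("heartbeat", {"data": "meh"})
                print("heartbeat id = {}".format(pub_id))
                await asyncio.sleep(0.01)  # the thread's script sleeps 10 s

        # Schedule on the running asyncio loop and keep a reference to the task.
        self.heartbeat_task = asyncio.ensure_future(stream_heartbeats(3))


async def main():
    session = FakeSession()
    await session.onJoin(details=None)
    await session.heartbeat_task  # wait for the bounded heartbeat loop
    return session


session = asyncio.run(main())
```

Keeping the task on the session object means it is not garbage-collected early and can be cancelled later, for example from a disconnect handler.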