Trouble in subscribing to a published event stream(Rawsockets)

Hi, I am trying to subscribe using rs_subscriber.py to an eventstream published by rs_publisher.py. I am using Crossbar v19.10.1(default docker image) and autobahn==19.11.1

I start crossbar server using,

docker run -it -p 8080:8080 crossbario/crossbar

rs_subscriber.py

from autobahn.asyncio.rawsocket import RawSocketClientProtocol,WampRawSocketClientFactory


class MyClientProtocol(RawSocketClientProtocol):

    def onConnect(self, response):
        print("Server connected: {0}".format(response.peer))

    def onConnecting(self, transport_details):
        print("Connecting; transport details: {}".format(transport_details))
        return None  # ask for defaults

    def onOpen(self):
        print("RawSocket connection open.")

        def hello():
            self.sendMessage(u"Hello, world!".encode('utf8'))
            self.sendMessage(b"\x00\x01\x03\x04", isBinary=True)
            self.factory.loop.call_later(1, hello)

        # start sending messages every second ..
        hello()

    def onMessage(self, payload, isBinary):
        if isBinary:
            print("Binary message received: {0} bytes".format(len(payload)))
        else:
            print("Text message received: {0}".format(payload.decode('utf8')))

    def onClose(self, wasClean, code, reason):
        print("RawSocket connection closed: {0}".format(reason))


if __name__ == '__main__':

    try:
        import asyncio
    except ImportError:
        # Trollius >= 0.3 was renamed
        import trollius as asyncio

    factory = WampRawSocketClientFactory(u"rs://localhost:8080")
    factory.protocol = MyClientProtocol

    loop = asyncio.get_event_loop()
    coro = loop.create_connection(factory, 'localhost', 8080)
    loop.run_until_complete(coro)
    loop.run_forever()
    loop.close()

rs_subscriber.py

import os
import six
import argparse

import txaio
txaio.use_asyncio()

from autobahn.asyncio.wamp import ApplicationSession, ApplicationRunner
from autobahn.wamp.types import SubscribeOptions
from  autobahn.wamp.serializer import MsgPackSerializer,CBORObjectSerializer
from autobahn.wamp.types import PublishOptions
import asyncio
from autobahn.wamp.exception import ApplicationError


class ClientSession(ApplicationSession):

    async def onJoin(self, details):
        print('session joined', details)

        async def stream_heartbeats(self):
            counter=1
            print(counter)
            while True:
                counter=counter+1
                try:
                    publication = await self.publish(u'com.intusurg.xi.systems.{}.heartbeat'.format("vb8188"),{"data":"meh"},options=PublishOptions(acknowledge=True, exclude_me=False))
                except ApplicationError as e:
                    print(e.args)
                print("heartbeat id = {}".format(publication))
                await asyncio.sleep(1)
        asyncio.ensure_future(stream_heartbeats(self))
    
    def onDisconnect(self):
        print("Disconnected!!!")


if __name__ == '__main__':
    parser = argparse.ArgumentParser()

    parser.add_argument('-d',
                        '--debug',
                        action='store_true',
                        help='Enable debug output.')

    parser.add_argument('--url',
                        dest='url',
                        type=six.text_type,
                        default="rs://localhost:8080",
                        help='The router URL (default: "rs://localhost:8080").')

    parser.add_argument('--realm',
                        dest='realm',
                        type=six.text_type,
                        default="realm1",
                        help='The realm to join (default: "realm1").')

    args = parser.parse_args()

    # start logging
    if args.debug:
        txaio.start_logging(level='debug')
    else:
        txaio.start_logging(level='info')

    extra = {
    }
    serializer = [MsgPackSerializer()]
    runner = ApplicationRunner(
                url=args.url,
                realm=args.realm,
                serializers=serializer,
                extra=extra
            )
    runner.run(ClientSession)

Traceback::
(venv) ➜ test python3 rs_subscriber.py
Exception in callback RawSocketClientProtocol.connection_made(<_SelectorSoc…e, bufsize=0>>)
handle: <Handle RawSocketClientProtocol.connection_made(<_SelectorSoc…e, bufsize=0>>) created at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/selector_events.py:696>
source_traceback: Object created at (most recent call last):
File “/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py”, line 1432, in _run_once
handle._run()
File “/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/events.py”, line 145, in _run
self._callback(*self._args)
File “/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/coroutines.py”, line 126, in send
return self.gen.send(value)
File “/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py”, line 809, in create_connection
sock, protocol_factory, ssl, server_hostname)
File “/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/coroutines.py”, line 110, in next
return self.gen.send(None)
File “/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py”, line 832, in _create_connection_transport
transport = self._make_socket_transport(sock, protocol, waiter)
File “/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/selector_events.py”, line 73, in _make_socket_transport
extra, server)
File “/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/selector_events.py”, line 696, in init
self._loop.call_soon(self._protocol.connection_made, self)
Traceback (most recent call last):
File “/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/events.py”, line 145, in _run
self._callback(*self._args)
File “/Users/vchauhan/playground/test/venv/lib/python3.6/site-packages/autobahn/asyncio/rawsocket.py”, line 248, in connection_made
self._length_exp << 4 | self.serializer_id,
File “/Users/vchauhan/playground/test/venv/lib/python3.6/site-packages/autobahn/asyncio/rawsocket.py”, line 242, in serializer_id
raise NotImplementedError()
NotImplementedError

The problem is with your rs_subscriber.py code, your class should inherit ApplicationSession just like your subscriber.

In summary: RawSocketClientProtocol isn’t supposed to be used (at all?) and definitely not the way you are trying to use it.

ok I tried this which is very similar to what we have for publisher code,

rs_subscriber.py(updated)

from autobahn.asyncio.wamp import ApplicationSession, ApplicationRunner

class ClientSession(ApplicationSession):

    def onConnect(self, response):
        print("Server connected: {0}".format(response.peer))

    def onConnecting(self, transport_details):
        print("Connecting; transport details: {}".format(transport_details))
        return None  # ask for defaults

    def onOpen(self):
        print("RawSocket connection open.")

        def hello():
            self.sendMessage(u"Hello, world!".encode('utf8'))
            self.sendMessage(b"\x00\x01\x03\x04", isBinary=True)
            self.factory.loop.call_later(1, hello)

        # start sending messages every second ..
        hello()

    def onMessage(self, payload, isBinary):
        if isBinary:
            print("Binary message received: {0} bytes".format(len(payload)))
        else:
            print("Text message received: {0}".format(payload.decode('utf8')))

    def onClose(self, wasClean, code, reason):
        print("RawSocket connection closed: {0}".format(reason))


if __name__ == '__main__':
    runner = ApplicationRunner(
            url="rs://localhost:8080",
            realm="realm1",
        )
    runner.run(ClientSession)

but I am not getting

(venv) ➜  test python3 rs_subscriber.py
2019-12-11T12:29:32 WampRawSocketProtocol: ApplicationSession constructor / onOpen raised (onOpen() takes 1 positional argument but 2 were given)
2019-12-11T12:29:32 WampRawSocketProtocol: ApplicationSession.onClose raised (onClose() missing 2 required positional arguments: 'code' and 'reason')

I am thinking that I have to call subscribe on (‘com.intusurg.xi.systems.vb8188.heartbeat’) but I am not sure where…

Please remove onConnect(), onConnecting(), onOpen() and onMessage() overrides, they are not part of our WAMP interfaces. You seem to be confusing things, please take a look at a working WAMP example here https://github.com/crossbario/autobahn-python/blob/master/examples/asyncio/wamp/overview/frontend.py

yes this worked, FWIW this is the corresponding updated rs_suscriber.py file.

from autobahn.asyncio.wamp import ApplicationSession, ApplicationRunner

class ClientSession(ApplicationSession):

    async def onJoin(self, details):
        def onevent(msg):
            print("Got event: {}".format(msg))
        await self.subscribe(onevent, u'com.intusurg.xi.systems.vb8188.heartbeat')

    def onClose(self, wasClean, code, reason):
        print("RawSocket connection closed: {0}".format(reason))


if __name__ == '__main__':
    runner = ApplicationRunner(
            url="rs://localhost:8080",
            realm="realm1",
        )
    runner.run(ClientSession)

output
2019-12-11T12:41:27 poll took 1004.040 ms: 1 events
Got event: {'data': 'meh'}