Subscribe and exit session, then subscribe to new topic without stopping the reactor


I am writing a test program that subscribes to different topics one at a time. The start function in Subscribe is called multiple times from another .py file; see the code below.

Subscribe.start does not reach its return statement unless reactor.stop() is called. But once reactor.stop() has been called, subscribing to a new topic throws an error, i.e. “Reactor not restartable”.

from autobahn.twisted import sleep
from autobahn.twisted.component import Component, run
from autobahn.twisted.wamp import ApplicationRunner, ApplicationSession, Application, Session
from autobahn.wamp import auth
from autobahn.wamp.serializer import JsonSerializer
from pprint import pprint

from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks

class SubscribeComponent(Session):
	def __init__(self,
				topic: str,
				wait: float):
		# Initialise the base session so its internal state is set up.
		super().__init__()
		self._topic = topic
		self._wait = wait
		self.result = None

	@inlineCallbacks
	def onJoin(self, details):
		def on_event(*data):
			print('Subscription event received(#%d)' % len(data))
			self.result = data

		subscription = yield self.subscribe(on_event, self._topic)
		print('Subscribe to:%s sub id:%d' % (self._topic, subscription.id))

		# Keep the subscription open for the requested time, then drop it.
		yield sleep(self._wait)
		res = yield subscription.unsubscribe()


	def onDisconnect(self):
		if reactor.running:
			reactor.stop()

class Subscribe:
	def __init__(self,
				url: str,
				realm: str):
		self.url = url
		self.realm = realm
		self.runner = ApplicationRunner(url=self.url, realm=self.realm)

	def start(self, topic: str, wait: float):
		# Setup Component to use wamp cra authentication.
		session = SubscribeComponent(topic=topic, wait=wait)
		try:
			print('Starting %s at %s, %s, %s' % ("Send", self.url, self.realm, topic))
			#, start_reactor=False), start_reactor=False)
			return session.result
		except Exception as e:
			pprint('Error, Is the router up?')
			return False


Have you tried some of the Autobahn WAMP examples? The “Component” API is a nice high-level one with some examples here:

The Twisted reactor is not restartable, so starting it is a one-time operation. It’s usually better to use the higher-level twisted.internet.task.react function, which handles reactor startup and shutdown; you then simply write an asynchronous “main” function.

We can’t use a main function because we need to call the code above from another Python file. We use classes and inheritance, so a main function is not an option for us. See the calling Python file below.

from test_framework.api.wamp.initialise import Initialise
from test_framework.api.wamp.rpc_call import RpcCall
from test_framework.api.wamp.subscribe import Subscribe
from test_framework.api.wamp.wamp_api import WampApi

class OpWampApi(WampApi):
	def __init__(self,
				host: str,
				port: int,
				realm: str,
		self.wamp_reactor = wamp_reactor
		self.initialise = Initialise()
		self.rpc_call = RpcCall(url=self._base_uri, realm=self._realm)
		self.subscribe = Subscribe(url=self._base_uri, realm=self._realm)

	def call(self, topic: str, msg: str) -> bool:
		return self.rpc_call.start(topic=topic, msg=msg)

	def get_value(self, topic: str) -> bool:
		return self.subscribe.start(topic=topic, wait=20)

	def is_ready(self, wait: float = 10) -> bool:
		return self.initialise.start(url=self._base_uri, realm=self._realm, wait=wait)

We use classes and inheritance

Well, then don’t use them! Python doesn’t force you into classes; plain functions work fine (are you coming from plain old Java or the like?). From a quick glance at the code you posted, it doesn’t make sense to me to wrap Autobahn/WAMP in this way. You likely don’t need it. Keep it simple ;)