[python] New Component API and dynamic subscribe/unsubscribe register/unregister, how to ? (asyncio)

#1

Hi everyone,

I’m trying to use the new component API and I’m not able to find how to manage the subscribe/unsubscribe/register/unregister.

I started from the little Klein example in the doc (https://autobahn.readthedocs.io/en/latest/wamp/programming.html#patterns-for-more-complicated-applications) and came to this:

import asyncio
from autobahn.asyncio.component import Component, run
from autobahn import wamp

comp = Component(
transports=[{
“type”: “rawsocket”,
“url”: “/tmp/mysocket1”,
“endpoint”: {
“type”: “unix”,
“path”: “/tmp/mysocket1”
}
}],
realm=u"realm1",
)

class MyClass:
def init(self, wamp_comp):
self._wamp = wamp_comp
self._session = None
self._wamp.on(“join”, self._initialize)
self._wamp.on(“leave”, self._uninitialize)

async def _initialize(self, session: wamp.ISession, details):
    self._session = session
    print("Subscribed")
    await session.subscribe(handler=self.output, topic="com.test")

def _uninitialize(self, session, reason):
    print(session, reason)
    print("Lost WAMP connection")
    self._session = None

async def output(self, value):
    print("ok")
    print(value)
    await asyncio.sleep(0)

@comp.on_join
async def joined(session, details):
print(“session ready”)
count = 0
while True:
session.publish(topic=“com.test”, value=“hello”, count=count)
count += 1
print(“hello”)
await asyncio.sleep(5)

if name == “main”:
myobj = MyClass(comp)
run([comp])

But, until now I’ve not been able to get something working when I do session.*.

Right now, the output is:

2018-08-20T16:52:13 connecting once using transport type “rawsocket” over endpoint “unix”
2018-08-20T16:52:13 ApplicationSession started.
session ready
hello
Subscribed
hello
hello
^C2018-08-20T16:52:25 Shutting down due to SIGINT

So the subscriber do not work.

The aim of this is to be able to use the subscribe, unsubscribe register unregister without the decorators because the topics and so on will be defined at run time (this example is utterly simplified to stress my problem)(and also without the old ApplicationSession)

If someone has a little clue for me, it would be great.

Nicolas

0 Likes

#2

Your only running one component - you need a second component to subscribe to the topic being published by the current component you have.

Just tweak your the run function to run two components so they can talk balk and forth.

if name == “main”:

myobj = MyClass(comp)

run([comp, comp])
···

On Monday, August 20, 2018 at 8:09:06 AM UTC-7, Potens wrote:

Hi everyone,

I’m trying to use the new component API and I’m not able to find how to manage the subscribe/unsubscribe/register/unregister.

I started from the little Klein example in the doc (https://autobahn.readthedocs.io/en/latest/wamp/programming.html#patterns-for-more-complicated-applications) and came to this:

import asyncio
from autobahn.asyncio.component import Component, run
from autobahn import wamp

comp = Component(
transports=[{
“type”: “rawsocket”,
“url”: “/tmp/mysocket1”,
“endpoint”: {
“type”: “unix”,
“path”: “/tmp/mysocket1”
}
}],
realm=u"realm1",
)

class MyClass:
def init(self, wamp_comp):
self._wamp = wamp_comp
self._session = None
self._wamp.on(“join”, self._initialize)
self._wamp.on(“leave”, self._uninitialize)

async def _initialize(self, session: wamp.ISession, details):
    self._session = session
    print("Subscribed")
    await session.subscribe(handler=self.output, topic="com.test")

def _uninitialize(self, session, reason):
    print(session, reason)
    print("Lost WAMP connection")
    self._session = None

async def output(self, value):
    print("ok")
    print(value)
    await asyncio.sleep(0)

@comp.on_join
async def joined(session, details):
print(“session ready”)
count = 0
while True:
session.publish(topic=“com.test”, value=“hello”, count=count)
count += 1
print(“hello”)
await asyncio.sleep(5)

if name == “main”:
myobj = MyClass(comp)
run([comp])

But, until now I’ve not been able to get something working when I do session.*.

Right now, the output is:

2018-08-20T16:52:13 connecting once using transport type “rawsocket” over endpoint “unix”
2018-08-20T16:52:13 ApplicationSession started.
session ready
hello
Subscribed
hello
hello
^C2018-08-20T16:52:25 Shutting down due to SIGINT

So the subscriber do not work.

The aim of this is to be able to use the subscribe, unsubscribe register unregister without the decorators because the topics and so on will be defined at run time (this example is utterly simplified to stress my problem)(and also without the old ApplicationSession)

If someone has a little clue for me, it would be great.

Nicolas

0 Likes

#3

Hi,

Thanks a lot for your answer, it helped me to find my mistake. Trying your advice resulted having two sessions, subscriptions and publish, so everything twice (but the first publish, only once), trying to understand further what was going on (trying to understand why I would have to have two components), I understood I had forgotten to use options=wamp.PublishOptions(exclude_me=False) in the publish… :frowning:

Now it’s working, the only thing is I loose the first publish since (I guess) the decorator comp.join is done before the _initialize.

So, to remember, read and re-read the doc (autobahn and crossbar).

Thanks a lot Adrien, your answer made me ask myself the good questions to understand my mistake.

For the records, here is my final script:

import asyncio

from autobahn import wamp
from autobahn.asyncio.component import Component, run

comp = Component(
transports=[{
“type”: “rawsocket”,
“url”: “/tmp/mysocket1”,
“endpoint”: {
“type”: “unix”,
“path”: “/tmp/mysocket1”
}
}],
realm=u"realm1",
)

class MyClass:
def init(self, wamp_comp):
self._wamp = wamp_comp
self._session = None
self._wamp.on(“join”, self._initialize)
self._wamp.on(“leave”, self._uninitialize)

def _initialize(self, session: wamp.ISession, details):
    self._session = session
    print("Subscribed")
    print("session")
    print(session)
    session.subscribe(handler=self.output, topic="com.test")

def _uninitialize(self, session, reason):
    print(session, reason)
    print("Lost WAMP connection")
    self._session = None

async def output(self, value, *args, **kwargs):
    print("output:")
    print("\tvalue :", value)
    print("\targs :", args)
    print("\tkwargs :", kwargs)
    await asyncio.sleep(0)

@comp.on_join
async def joined(session, details):
print(“session ready”)
count = 0
while True:
session.publish(
topic=“com.test”,
value=“hello”,
count=count,
options=wamp.PublishOptions(exclude_me=False))
count += 1
print("publishing hello ", count)
await asyncio.sleep(5)

if name == “main”:
myobj = MyClass(comp)
run([comp])

And the output:

2018-08-23T10:30:50 connecting once using transport type “rawsocket” over endpoint “unix”
2018-08-23T10:30:50 ApplicationSession started.
Subscribed
session
<autobahn.asyncio.wamp.Session object at 0x7f9c1bbb3f60>
session ready
publishing hello 1
output:
value : hello
args : ()
kwargs : {‘count’: 0}
publishing hello 2
output:
value : hello
args : ()
kwargs : {‘count’: 1}
publishing hello 3
output:
value : hello
args : ()
kwargs : {‘count’: 2}
^C2018-08-23T10:31:00 Shutting down due to SIGINT

···

On Thursday, August 23, 2018 at 4:41:05 AM UTC, Adrien Emery wrote:

Your only running one component - you need a second component to subscribe to the topic being published by the current component you have.

Just tweak your the run function to run two components so they can talk balk and forth.

if name == “main”:

myobj = MyClass(comp)
run([comp, comp])

On Monday, August 20, 2018 at 8:09:06 AM UTC-7, Potens wrote:

Hi everyone,

I’m trying to use the new component API and I’m not able to find how to manage the subscribe/unsubscribe/register/unregister.

I started from the little Klein example in the doc (https://autobahn.readthedocs.io/en/latest/wamp/programming.html#patterns-for-more-complicated-applications) and came to this:

import asyncio
from autobahn.asyncio.component import Component, run
from autobahn import wamp

comp = Component(
transports=[{
“type”: “rawsocket”,
“url”: “/tmp/mysocket1”,
“endpoint”: {
“type”: “unix”,
“path”: “/tmp/mysocket1”
}
}],
realm=u"realm1",
)

class MyClass:
def init(self, wamp_comp):
self._wamp = wamp_comp
self._session = None
self._wamp.on(“join”, self._initialize)
self._wamp.on(“leave”, self._uninitialize)

async def _initialize(self, session: wamp.ISession, details):
    self._session = session
    print("Subscribed")
    await session.subscribe(handler=self.output, topic="com.test")

def _uninitialize(self, session, reason):
    print(session, reason)
    print("Lost WAMP connection")
    self._session = None

async def output(self, value):
    print("ok")
    print(value)
    await asyncio.sleep(0)

@comp.on_join
async def joined(session, details):
print(“session ready”)
count = 0
while True:
session.publish(topic=“com.test”, value=“hello”, count=count)
count += 1
print(“hello”)
await asyncio.sleep(5)

if name == “main”:
myobj = MyClass(comp)
run([comp])

But, until now I’ve not been able to get something working when I do session.*.

Right now, the output is:

2018-08-20T16:52:13 connecting once using transport type “rawsocket” over endpoint “unix”
2018-08-20T16:52:13 ApplicationSession started.
session ready
hello
Subscribed
hello
hello
^C2018-08-20T16:52:25 Shutting down due to SIGINT

So the subscriber do not work.

The aim of this is to be able to use the subscribe, unsubscribe register unregister without the decorators because the topics and so on will be defined at run time (this example is utterly simplified to stress my problem)(and also without the old ApplicationSession)

If someone has a little clue for me, it would be great.

Nicolas

0 Likes

#4

Great! And I didn’t know you could publish messages and listen from the same component using:

options=wamp.PublishOptions(exclude_me=False)

Cheers!

···

On Thursday, August 23, 2018 at 1:39:08 AM UTC-7, Potens wrote:

Hi,

Thanks a lot for your answer, it helped me to find my mistake. Trying your advice resulted having two sessions, subscriptions and publish, so everything twice (but the first publish, only once), trying to understand further what was going on (trying to understand why I would have to have two components), I understood I had forgotten to use options=wamp.PublishOptions(exclude_me=False) in the publish… :frowning:

Now it’s working, the only thing is I loose the first publish since (I guess) the decorator comp.join is done before the _initialize.

So, to remember, read and re-read the doc (autobahn and crossbar).

Thanks a lot Adrien, your answer made me ask myself the good questions to understand my mistake.

For the records, here is my final script:

import asyncio

from autobahn import wamp
from autobahn.asyncio.component import Component, run

comp = Component(
transports=[{
“type”: “rawsocket”,
“url”: “/tmp/mysocket1”,
“endpoint”: {
“type”: “unix”,
“path”: “/tmp/mysocket1”
}
}],
realm=u"realm1",
)

class MyClass:
def init(self, wamp_comp):
self._wamp = wamp_comp
self._session = None
self._wamp.on(“join”, self._initialize)
self._wamp.on(“leave”, self._uninitialize)

def _initialize(self, session: wamp.ISession, details):
    self._session = session
    print("Subscribed")
    print("session")
    print(session)
    session.subscribe(handler=self.output, topic="com.test")

def _uninitialize(self, session, reason):
    print(session, reason)
    print("Lost WAMP connection")
    self._session = None

async def output(self, value, *args, **kwargs):
    print("output:")
    print("\tvalue :", value)
    print("\targs :", args)
    print("\tkwargs :", kwargs)
    await asyncio.sleep(0)

@comp.on_join
async def joined(session, details):
print(“session ready”)
count = 0
while True:
session.publish(
topic=“com.test”,
value=“hello”,
count=count,
options=wamp.PublishOptions(exclude_me=False))
count += 1
print("publishing hello ", count)
await asyncio.sleep(5)

if name == “main”:
myobj = MyClass(comp)
run([comp])

And the output:

2018-08-23T10:30:50 connecting once using transport type “rawsocket” over endpoint “unix”
2018-08-23T10:30:50 ApplicationSession started.
Subscribed
session
<autobahn.asyncio.wamp.Session object at 0x7f9c1bbb3f60>
session ready
publishing hello 1
output:
value : hello
args : ()
kwargs : {‘count’: 0}
publishing hello 2
output:
value : hello
args : ()
kwargs : {‘count’: 1}
publishing hello 3
output:
value : hello
args : ()
kwargs : {‘count’: 2}
^C2018-08-23T10:31:00 Shutting down due to SIGINT

On Thursday, August 23, 2018 at 4:41:05 AM UTC, Adrien Emery wrote:

Your only running one component - you need a second component to subscribe to the topic being published by the current component you have.

Just tweak your the run function to run two components so they can talk balk and forth.

if name == “main”:

myobj = MyClass(comp)
run([comp, comp])

On Monday, August 20, 2018 at 8:09:06 AM UTC-7, Potens wrote:

Hi everyone,

I’m trying to use the new component API and I’m not able to find how to manage the subscribe/unsubscribe/register/unregister.

I started from the little Klein example in the doc (https://autobahn.readthedocs.io/en/latest/wamp/programming.html#patterns-for-more-complicated-applications) and came to this:

import asyncio
from autobahn.asyncio.component import Component, run
from autobahn import wamp

comp = Component(
transports=[{
“type”: “rawsocket”,
“url”: “/tmp/mysocket1”,
“endpoint”: {
“type”: “unix”,
“path”: “/tmp/mysocket1”
}
}],
realm=u"realm1",
)

class MyClass:
def init(self, wamp_comp):
self._wamp = wamp_comp
self._session = None
self._wamp.on(“join”, self._initialize)
self._wamp.on(“leave”, self._uninitialize)

async def _initialize(self, session: wamp.ISession, details):
    self._session = session
    print("Subscribed")
    await session.subscribe(handler=self.output, topic="com.test")

def _uninitialize(self, session, reason):
    print(session, reason)
    print("Lost WAMP connection")
    self._session = None

async def output(self, value):
    print("ok")
    print(value)
    await asyncio.sleep(0)

@comp.on_join
async def joined(session, details):
print(“session ready”)
count = 0
while True:
session.publish(topic=“com.test”, value=“hello”, count=count)
count += 1
print(“hello”)
await asyncio.sleep(5)

if name == “main”:
myobj = MyClass(comp)
run([comp])

But, until now I’ve not been able to get something working when I do session.*.

Right now, the output is:

2018-08-20T16:52:13 connecting once using transport type “rawsocket” over endpoint “unix”
2018-08-20T16:52:13 ApplicationSession started.
session ready
hello
Subscribed
hello
hello
^C2018-08-20T16:52:25 Shutting down due to SIGINT

So the subscriber do not work.

The aim of this is to be able to use the subscribe, unsubscribe register unregister without the decorators because the topics and so on will be defined at run time (this example is utterly simplified to stress my problem)(and also without the old ApplicationSession)

If someone has a little clue for me, it would be great.

Nicolas

0 Likes