How do I detect wamp.error.procedure_already_exists and restart event loop?

I have a server app which connects to crossbar and the on_join method registers some RPC functions.

The problem I have is that sometimes there is a previous instance of the server (we’re using Docker) which is still shutting down and is still connected to the crossbar, so I get a “wamp.error.procedure_already_exists” error.

I want to detect this error and have my app retry periodically to register my procedures , as the old app will be shutdown within a few seconds so those procedures should be free.

My current code using asyncio wamp looks like

component = Component(
    transports=[
        {
            "type": "websocket",
            "url": wamp_server,
            "options": {
                "open_handshake_timeout": 100,
            }
        },
    ],
    realm="myrealm",
)

@component.on_join
async def join(session, details):
    global this_session
    this_session = session
     session.register(seek, "some.procedure.named.seek")

I see that the session.register returns an object that looks like
future: <Future finished exception=ApplicationError(“register for already registered procedure ‘some.procedure.named.seek’”)>

How would I get access to that Future and check the exception, and then get my Component to retry to register my procedures?

I found I can get access to the exception like this, so i can see when there’s an error, but what is the best way to retry the register() call? Would I put a retry loop in the on_join function? If so , how do I get the info about the error that the ‘done_callback’ has access to, in order to know if I should retr the register()?

async def join(session, details):
    future = session.register(seek, "ai.leela.daikon.seek")
    def done(fut):
        e = fut.exception()
        if isinstance(e, ApplicationError):
            print(f'{fut} is ApplicationError!!')
    future.add_done_callback(done)

Since you’re using Component already, I think it should retry if “the session fails” – and I’d expect that an exception out of on_join would count.

What behavior do you observe if you just let the exceptions from .register() come out of on_join? (That is just “await register()”). Looks like you’re using python3 and asyncio, is that true?

When the code gets one of these failures to register, no error is thrown, so my app proceeds
to run its ‘event loop’, just not registered on the crossbar service for any procedures.

I have an awful workaround I am using which is to put ‘done_handler’ on one of my calls to register a procedure, and if it succeeds, it assumes it’s ok and registers the rest of them (without looking at any other registration failures errors those calls might cause), otherwise it keeps looping periodically trying to register that one procedure

async def join(session, details):
  global register_err
  register_err = True
  def done(fut):
      global register_err
      e = fut.exception()
      if e == None:
          register_err = False
      elif isinstance(e, ApplicationError):
          register_err = True

  while register_err:
      session.register(seek, "my.projectname.seek").add_done_callback(done)
      await asyncio.sleep(1)

  session.register( set_source, "my.projectname.set_source")
  session.register( step,       "my.projectname.step")
  session.register( play,       "my.projectname.play")
  session.register( stop,       "my.projectname.stop")
  session.register( setActions, "my.projectname.set_actions")
 ...

You’re not “await”-ing any of the registers? (In the first listing if you “await session.register(…)” does it re-connect? If you replace that with “raise Exception(‘test’)” does it re-connect?)

If I try doing await on the session.register, when the procedure is already registered by another app, I get a printout to the console which repeats three times then appears to hang

2020-11-20T03:33:50 Session.onUserError(): "wamp.error.procedure_already_exists: register for already registered procedure 'ai.leela.daikon.seek'"
2020-11-20T03:33:50 Session.onUserError(): "wamp.error.procedure_already_exists: register for already registered procedure 'ai.leela.daikon.seek'"
2020-11-20T03:33:50 Session.onUserError(): "wamp.error.procedure_already_exists: register for already registered procedure 'ai.leela.daikon.seek'"

I don’t see anything retrying the on_join

you can use the force_register option when registering the procedure

this option is exactly for such use cases (eg a non-shared registration where a reconnecting client wants to kick any previously registered, still not purged callee)

1 Like

forgot to point out: this is a Crossbar.io specific, not yet standardized (aka implementation defined) WAMP option …

Thanks that force_reregister works !

One question, in the existing server, I see a printout to console

2020-11-21T13:25:26 Router unregistered procedure ‘ai.leela.daikon.set_source’ with ID 3197339563022590

Is there any way I can put a handler on whatever is printing that, so I can force a shutdown of the old server when a new one starts registering?

so in principle, the WAMP meta event wamp.registration.on_unregister should also be fired in this special case.

note that the WAMP meta API is standardized, but nevertheless optional / not implemented in all routers.

anyways, here is the spec

https://wamp-proto.org/_static/gen/wamp_latest.html#registration-meta-api
https://wamp-proto.org/_static/gen/wamp_latest_ietf.html#rfc.section.14.3.7

and here is a Crossbar.io example

I should emphasize: this situation is “special”, as the registration of the (dead) session is actively kicked by the router. this is different from when the whole session is later purged from the router when it finally detects the session as being dead. in that case, the registration is removed as well of course - but it is a different code path in crossbar. anyways, pls first try - if it doesn’t work, pls come back;)

The documentation in crossbar.io for subscribing to meta-events says

Important : To receive and process these events, your component will need to have subscribe permission on the respective topic.

What do I need to do to ensure my client app has permissions to subscribe to
wamp.registration.on_unregister?

My config for my server just looks like this currently:

component = Component(
    transports=[
        {
            "type": "websocket",
            "url": wamp_server,
            "options": {
                "open_handshake_timeout": 100,
            }
        },
    ],
    realm="leela",
)

The WAMP meta API (the feature itself) is enabled by default (you can control that via enable_meta_api in options).

Then, to allow a client to actually subscribe to the respective URIs, just configure permissions. Eg this allows “anything” (which includes wamp.* URI subscriptions):

obviously, for production, you will want to narrow those permissions to only those clients (via authrole potentially) that should be allowed, and also narrow down the URIs matched. you only need the exact URI wamp.registration.on_unregister, only need action subscribe, and only want say authrole=="my-backend"