Forward from one WebSocket to another with Autobahn Python

I asked a question about this on Stack Overflow and would appreciate some help. I am running a server in Python and have successfully opened two WebSocket connections: one from the web client to my server and one from my server to IBM Watson. I can relay audio from the client to Watson using a shared queue, but how can I take UTF-8 JSON messages received on one WebSocket and send them on the other?

I have managed to solve this problem using the method described here, but I am still not sure whether this is technically the “correct” way to send a message from the server to the client. In the future, it would be great to see a working example on Autobahn’s GitHub showing how to send a message from the server to the client, since even the “broadcast” server example actually just echoes a message from the client.

Hi Nathanael,

Using threads … the SO code … is “wrong” (it might actually be buggy; I haven’t looked deeply into the posted code).

What you want in your app is 2 independent sessions, which have a reference to each other.

Thus, your main app structure might look like:

extra = {
    "first_session": None,
    "second_session": None
}

Then start 2 sessions, providing the same extra to both.

Then, in the onJoin of each of the 2 sessions, set self.config.extra[..] = self

Then, in the onMessage of each of the 2 sessions, do:

if self.config.extra[self.other] and self.config.extra[self.other].is_attached():
    self.config.extra[self.other].sendMessage(..)

with self.other == "first_session" or self.other == "second_session" for the 2 sessions.

To complete this, in onLeave, set self.config.extra[self.me] back to None. This makes the other session stop using the session that just left (because it is no longer joined to the target realm, or has even disconnected).
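
The steps above can be sketched in plain Python, without any networking library. This is only an illustration of the cross-reference pattern, not Autobahn code: RelaySession, on_join, on_message, on_leave, send_message, and the sent list are hypothetical stand-ins for the real session class, its callbacks, and sendMessage.

```python
class RelaySession:
    """Hypothetical stand-in for one session; not an Autobahn class."""

    def __init__(self, extra, me, other):
        self.extra = extra      # dict shared by both sessions
        self.me = me            # key under which this session registers itself
        self.other = other      # key of the peer session
        self.attached = False
        self.sent = []          # records what a real sendMessage() would transmit

    def is_attached(self):
        return self.attached

    def send_message(self, payload):
        self.sent.append(payload)

    def on_join(self):
        # Register this session so the peer can find it.
        self.attached = True
        self.extra[self.me] = self

    def on_message(self, payload):
        # Forward to the peer only if it is registered and still attached.
        peer = self.extra[self.other]
        if peer is not None and peer.is_attached():
            peer.send_message(payload)
            return True
        return False  # no peer available, message is dropped

    def on_leave(self):
        # Unregister so the peer stops forwarding to this session.
        self.attached = False
        self.extra[self.me] = None


def demo():
    extra = {"first_session": None, "second_session": None}
    first = RelaySession(extra, "first_session", "second_session")
    second = RelaySession(extra, "second_session", "first_session")

    first.on_join()
    dropped = first.on_message(b"early")        # peer has not joined yet
    second.on_join()
    first.on_message(b'{"text": "hello"}')      # forwarded to second
    second.on_leave()
    after_leave = first.on_message(b"late")     # peer has left again
    return dropped, second.sent, after_leave
```

Because each session only ever looks up its peer through the shared dict, a session that has not joined yet or has already left is simply skipped. How the shared dict reaches each session in a real application depends on the API in use.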

Hope these hints get you going … if not, pls post back. And yeah, we probably should have an example of doing this.

Cheers,
/Tobias

@meejah I don’t think we have a raw WebSocket (non-WAMP) 2-way relay example, right? maybe there is some code somewhere …

Thank you for your help! So I have this code now, based on your suggestions:

from ibm_watson import SpeechToTextV1
from ibm_watson.websocket import RecognizeCallback, AudioSource
from threading import Thread
from ibm_cloud_sdk_core.authenticators import IAMAuthenticator
# For autobahn
import json
from autobahn.twisted.websocket import WebSocketServerProtocol, \
    WebSocketServerFactory
from twisted.internet import reactor

try:
    from Queue import Queue, Full
except ImportError:
    from queue import Queue, Full

extra = {
    "first_session": None,
    "second_session": None
}

###############################################
#### Initialize queue to store the recordings #
###############################################
CHUNK = 1024
# Note: Audio is discarded if the websocket client can't consume it fast enough,
# so increase the max size as needed
BUF_MAX_SIZE = CHUNK * 10
# Buffer to store audio
q = Queue(maxsize=int(round(BUF_MAX_SIZE / CHUNK)))
# Create an instance of AudioSource
audio_source = AudioSource(q, True, True)

###############################################
#### Prepare Speech to Text Service ########
###############################################

# initialize speech to text service
authenticator = IAMAuthenticator('secretkey')
speech_to_text = SpeechToTextV1(authenticator=authenticator)

# define callback for the speech to text service
class MyRecognizeCallback(RecognizeCallback):
    def __init__(self):
        RecognizeCallback.__init__(self)

    def on_transcription(self, transcript):
        # Forward to client
        if self.config.extra["second_session"] and self.config.extra["second_session"].is_attached():
            self.config.extra["second_session"].sendMessage(transcript)

    def on_connected(self):
        print('Connection was successful')
        self.config.extra["first_session"] = self

    def on_error(self, error):
        # Forward to client
        if self.config.extra["second_session"] and self.config.extra["second_session"].is_attached():
            self.config.extra["second_session"].sendMessage('Error received: {}'.format(error))

    def on_inactivity_timeout(self, error):
        # Forward to client
        if self.config.extra["second_session"] and self.config.extra["second_session"].is_attached():
            self.config.extra["second_session"].sendMessage('Inactivity timeout: {}'.format(error))

    def on_listening(self):
        print('Service is listening')

    def on_hypothesis(self, hypothesis):
        # Forward to client
        if self.config.extra["second_session"] and self.config.extra["second_session"].is_attached():
            self.config.extra["second_session"].sendMessage(hypothesis)

    def on_data(self, data):
        # Forward to client
        if self.config.extra["second_session"] and self.config.extra["second_session"].is_attached():
            self.config.extra["second_session"].sendMessage(data)

    def on_close(self):
        # Forward to client
        if self.config.extra["second_session"] and self.config.extra["second_session"].is_attached():
            self.config.extra["second_session"].sendMessage("Recognizer session closed")
        self.config.extra["first_session"] = None

class MyServerProtocol(WebSocketServerProtocol):

    def onConnect(self, request):
        print("Client connecting: {0}".format(request.peer))
        # Start recognizer on connection
        recognize_thread = Thread(target=recognize_using_weboscket, args=())
        recognize_thread.daemon = True
        recognize_thread.start()

    def onOpen(self):
        print("WebSocket connection open.")
        self.config.extra["second_session"] = self

    def onMessage(self, payload, isBinary):
        if isBinary:
            try: # Put incoming audio into the queue without blocking
                q.put_nowait(payload)  # a blocking q.put() would never raise Full
            except Full:
                pass # discard
        else:
            print("Text message received: {0}".format(payload.decode('utf8')))

    def onClose(self, wasClean, code, reason):
        print("WebSocket connection closed: {0}".format(reason))
        self.config.extra["second_session"] = None
  
## this function will initiate the recognize service and pass in the AudioSource
def recognize_using_weboscket(*args):
    mycallback = MyRecognizeCallback()
    speech_to_text.recognize_using_websocket(audio=audio_source,
                                            content_type='audio/l16; rate=16000',
                                            recognize_callback=mycallback,
                                            interim_results=True)

if __name__ == '__main__':

    factory = WebSocketServerFactory("ws://127.0.0.1:9001")
    factory.protocol = MyServerProtocol

    reactor.listenTCP(9001, factory)
    reactor.run()

Not sure if I understood correctly. How can I “provide” extra to both of my sessions? Also, correct me if I’m misunderstanding, but does Autobahn for WebSockets provide onJoin() and is_attached() callbacks? Or should I somehow implement these callbacks myself?