Using Autobahn WebSocketClient in a custom class

Hi guys.

I’m trying to make use of this great WebSocket implementation. My current problem is that I simply can’t get my head around how to use the reactor, WebSocketClientProtocol, and WebSocketClientFactory together.

I have been working for quite a while now on a library that can best be described as an extremely simplified version of WAMP (it’s called “arip”), and it seems to work. I’m using WebSockets as a basis and have created a WebSocket server (with Tornado). Additionally, I have a JavaScript client library. What I want to do now is create a similar client in Python. While scouting the internet for usable WebSocket client libraries, I found that the Autobahn WebSocket client seems to be the most elaborate, robust, and reliable one out there, so I have chosen Autobahn as my foundation.

My aim is to simplify things so that a developer only needs to import and use a single object when using my “arip” library. Below is the code for my Python client library. As you can see (especially in the connect method), I don’t really understand how to use the Autobahn library in my own code:

Python client library:

```python
from threading import Timer
import json

from twisted.internet import reactor
from autobahn.twisted.websocket import WebSocketClientProtocol
from autobahn.twisted.websocket import WebSocketClientFactory


class arip(WebSocketClientProtocol):

    def __init__(self, id, type, url):
        self.callCounter = 0
        self.callbacks = {}
        self.id = id
        self.type = type
        self.rpcs = []
        self.theurl = url
        self.errorCallback = None
        self.publishCallback = None
        self.registrations = None
        self.connected = 2

    ##############################
    # Websocket methods
    ##############################

    #Websocket creation
    def connect(self):
        #Connect to the websocket server
        factory = WebSocketClientFactory("ws://"+self.theurl+"?id="+self.id+"&type="+self.type)
        factory.protocol = arip
        #Make "localhost" a variable
        reactor.connectTCP("localhost", 8888, factory)
        reactor.run()
        self.connected = 0

    #Connect
    def onConnect(self, request):
        print("Client connecting: {}".format(request.peer))

    #open
    def onOpen(self):
        self.connected = 1
        self.sendMessage('{"request":"register_methods","params":'+json.dumps(self.rpcs)+'}'.encode('utf8'))

    #message
    def onMessage(self, payload, isBinary):
        mes = json.loads(payload.decode('utf8'))
        if 'result' in mes:
            self.callbacks[mes["req_id"]](mes)
            del self.callbacks[mes["req_id"]]
        elif 'error' in mes:
            if self.errorCallback:
                self.errorCallback(mes)
        elif 'request' in mes:
            if mes["request"] == "publish":
                if self.publishCallback:
                    self.publishCallback(mes)
            elif mes["request"] == "call_method":
                fn = getattr(self, mes["params"][0], None)
                res = ""
                if fn:
                    res = fn(*mes["params"][1])
                else:
                    res = "Error occurred. The client could not parse the request as a function"
                self.sendMessage('{"request":"result", "content":"'+res+'", "req_id":"'+mes["req_id"]+'", "receiver":"'+mes["receiver"]+'"}')
        elif 'register' in mes:
            if self.registrations:
                self.registrations(mes)
        elif 'unregister' in mes:
            if self.registrations:
                self.registrations(mes)

    #close
    def onClose(self, wasClean, code, reason):
        self.connected = 2
        print("WebSocket connection closed: {}".format(reason))

    ##############################
    # arip server methods
    ##############################

    def getAllEntities(self, callback):
        req_id = self.counterControl(callback)
        self.sendToServer('{"request":"get_all_entities", "req_id":"'+req_id+'"}')

    def getEntity(self, id, callback):
        req_id = self.counterControl(callback)
        self.sendToServer('{"request":"get_entity","params":["'+str(id)+'"], "req_id":"'+str(req_id)+'"}')

    def getEntitiesByType(self, type, callback):
        req_id = self.counterControl(callback)
        self.sendToServer('{"request":"get_entities_by_type","params":["'+type+'"], "req_id":"'+req_id+'"}')

    def getAllTypes(self, callback):
        req_id = self.counterControl(callback)
        self.sendToServer('{"request":"get_all_types", "req_id":"'+req_id+'"}')

    def subscribeToRegistrations(self):
        self.sendToServer('{"request":"subscribe_to_registrations"}')

    def unsubscribeFromRegistrations(self):
        self.sendToServer('{"request":"unsubscribe_from_registrations"}')

    def subscribeTo(self, id):
        self.sendToServer('{"request":"subscribe_to","params":["'+id+'"]}')

    def unsubscribeFrom(self, id):
        self.sendToServer('{"request":"unsubscribe_from","params":["'+id+'"]}')

    #params is a comma separated list
    #receiver is the id of the receiver
    def callMethod(self, methodName, params, receiver, callback):
        req_id = self.counterControl(callback)
        self.sendToServer('{"request":"call_method", "params":["'+str(methodName)+'",'+str(params)+'], "req_id":"'+str(req_id)+'", "receiver":"'+str(receiver)+'"}')

    def publish(self, content):
        self.sendToServer('{"request":"publish", "content":"'+content+'"}')

    ##############################
    # arip client methods
    ##############################

    #parameters has to be a JSON list ex. [{"type":"string","name":"address"}]
    def registerFunction(self, name, params, returnType):
        #self.rpcs.append('{"methodName":"'+name+'","parameters":'+params+',"returnType":"'+returnType+'"}')
        self.rpcs.append({"methodName": name, "parameters": params, "returnType": returnType})

    def setErrorCallback(self, callback):
        self.errorCallback = callback

    def setPublisherCallback(self, callback):
        self.publishCallback = callback

    def setRegistrationsCallback(self, callback):
        self.registrations = callback

    ##############################
    # Helper methods
    ##############################

    def counterControl(self, callback):
        counter = self.callCounter
        self.callCounter += 1
        self.callbacks[counter] = callback
        return counter

    def sendToServer(self, content):
        if self.connected == 1:
            self.sendMessage(json.dumps(content))
        elif self.connected == 0:
            t = Timer(0.5, self.sendToServer, content)
            t.start()
        else:
            self.sendClose()
            self.connect()
            self.sendToServer(content)
```

Python usage of client library:

```python
from aripClient import arip


class python_test():

    def __init__(self, name):
        self.name = name
        w = arip("my_third", "lamppost_scheduler", "localhost:8888/ws")
        w.registerFunction("getName", [], "string")
        w.registerFunction("setName", [{"name": "name", "type": "string"}], "")
        w.connect()
        w.getEntity("my_first", self.callback_entity)
        w.callMethod("calculator", "[12,4]", "my_first", self.callback_method)
        w.setPublisherCallback(self.callback_publish)
        w.subscribeTo("my_first")

    #################################
    #                               #
    #       Callback methods        #
    #                               #
    #################################

    def callback_entity(res):
        print res

    def callback_method(res):
        print "The result is: "+res

    def callback_publish(res):
        print res

    #################################
    #                               #
    #    Specific client methods    #
    #                               #
    #################################

    def getName():
        return self.name

    def setName(n):
        self.name = n


if __name__ == '__main__':
    python_test("John")
```


I understand the concept as follows:

  • You create a protocol implementing all the asynchronous events that happen in the life of a WebSocket connection
  • You create a factory that has a specific URL for the WebSocket server and holds the protocol
  • You provide the factory to the reactor, and start the reactor in a new thread

This is all good, and I can make it work when running the provided code examples. The problem is that I simply can’t get my head around how to use the Autobahn library so that it fits the mindset behind the code above.

First of all, it seems that I can’t write a custom WebSocketClientProtocol subclass that has a connect method which initiates the WebSocket connection. This makes sense, since the protocol implementation should only focus on, well… the protocol. What happens right now is that I create an arip object, which is a WebSocketClientProtocol, and then the connect method causes a second instance of the same custom WebSocketClientProtocol class to be created, which does not make sense. Additionally, that second instance is created without any arguments, because of this line: “factory.protocol = arip”. This of course makes the code fail.
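
To make that failure concrete, here is a minimal sketch that uses no Autobahn code at all. It assumes the factory builds its protocol the way Twisted's default buildProtocol does, by calling the protocol class with no arguments and then setting a factory attribute on the new instance. FakeFactory, NeedsArgs, NoArgs, and try_build are hypothetical names made up for this illustration.

```python
class FakeFactory(object):
    """Hypothetical stand-in mimicking how a Twisted-style factory builds protocols."""
    protocol = None

    def buildProtocol(self, addr):
        # The protocol class is called with NO arguments, then linked to the factory.
        p = self.protocol()
        p.factory = self
        return p


class NeedsArgs(object):
    """Stand-in for a protocol whose constructor requires arguments, like arip."""

    def __init__(self, id, type, url):
        self.id, self.type, self.url = id, type, url


class NoArgs(object):
    """Stand-in for a protocol that reads its settings from the factory instead."""

    def settings(self):
        return self.factory.settings


def try_build(protocol_cls, **factory_attrs):
    """Build a protocol through the fake factory; return the instance or the TypeError."""
    factory = FakeFactory()
    factory.protocol = protocol_cls
    for key, value in factory_attrs.items():
        setattr(factory, key, value)
    try:
        return factory.buildProtocol(None)
    except TypeError as exc:
        return exc


failed = try_build(NeedsArgs)                          # constructor arguments are missing
built = try_build(NoArgs, settings={"id": "my_third"})
print(type(failed).__name__)
print(built.settings())
```

If that assumption holds for WebSocketClientFactory, anything the protocol needs (id, type, callbacks) would have to reach it through the factory rather than through constructor arguments.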

It’s important to me that developers using the arip library can interface with it as simply as shown in the usage code above. The developer should only create a new arip object, set a few properties, and then connect. The developer shouldn’t be concerned with anything else under the hood, meaning that they shouldn’t instantiate or start a WebSocketClientFactory or the reactor themselves; this should all happen behind the scenes.

All in all, I’m confused about how to merge the arip class directly with the Autobahn WebSocket client connection. Should I create something like a layer in between that resembles a controller layer (from the famous MVC pattern)? I guess it comes down to me missing an important point somewhere.
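
To show what I mean by a layer in between, here is a rough, transport-agnostic sketch. It is only a guess at a possible shape, not working Autobahn integration. AripClient, connection_opened, message_received, connection_closed, and _submit are hypothetical names; the message format is copied from my getEntity method above, and the sketch assumes the server echoes req_id back unchanged.

```python
import json


class AripClient(object):
    """Hypothetical facade: the only object a library user would touch."""

    def __init__(self, id, type, url):
        self.id, self.type, self.url = id, type, url
        self._send = None       # function that sends one payload; set while connected
        self._pending = []      # payloads queued while not connected
        self._callbacks = {}    # req_id -> callback
        self._next_id = 0

    # --- called by the library user -------------------------------------
    def getEntity(self, entity_id, callback):
        req_id = str(self._next_id)
        self._next_id += 1
        self._callbacks[req_id] = callback
        self._submit({"request": "get_entity",
                      "params": [entity_id],
                      "req_id": req_id})

    # --- called by the protocol layer ----------------------------------
    def connection_opened(self, send):
        """Store the send function and flush everything queued so far."""
        self._send = send
        pending, self._pending = self._pending, []
        for payload in pending:
            send(payload)

    def message_received(self, payload):
        """Dispatch a result message to the callback registered for its req_id."""
        mes = json.loads(payload.decode("utf8"))
        if "result" in mes:
            callback = self._callbacks.pop(mes["req_id"], None)
            if callback:
                callback(mes)

    def connection_closed(self):
        self._send = None

    # --- helper ----------------------------------------------------------
    def _submit(self, message):
        payload = json.dumps(message).encode("utf8")
        if self._send is None:
            self._pending.append(payload)   # not connected yet: queue it
        else:
            self._send(payload)
```

The protocol subclass could then stay thin: onOpen would call connection_opened(self.sendMessage), onMessage would call message_received(payload), and onClose would call connection_closed(). How the protocol gets hold of the facade (for example through an attribute set on the factory) is an assumption I have not verified.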

Any help is extremely valuable.

Thanks!

Lasse Vestergaard
