Example mongoDB, Autobahn Python, asyncio

#1

Hello,

Here is an example of a backend written in Python3.4 / asyncio / autobahn to access a mongoDB database

For mongoDB it uses the “motor” asynchronous driver. The next version of motor (0.5 ?) should be able to also run on top of asyncio (used to be tornado only). If you want to run the test on your own, you’ll have to clone the asyncio branch of motor on github https://github.com/mongodb/motor/tree/asyncio

As you see, accessing mongoDB is really simple !!

"""
# Purpose: show how to access a mongoDB database

<details class='elided'>
<summary title='Show trimmed content'>&#183;&#183;&#183;</summary>

#
# Author:      Remi Jolin
#
# Created:     march 2015
"""
import json
from autobahn.wamp.interfaces import IObjectSerializer, ISerializer
from autobahn.wamp.serializer import JsonObjectSerializer, Serializer
from bson import json_util
from motor.motor_asyncio import AsyncIOMotorClient
from asyncio import coroutine, get_event_loop
from autobahn.asyncio.wamp import ApplicationSession, ApplicationRunner
from autobahn.wamp import register
from autobahn.wamp.types import RegisterOptions
import six

db_uri = "mongodb://localhost/"

class MyApplicationSession(ApplicationSession):
    def onConnect(self):
        print('onConnect......')
        self.join(self.config.realm)

    def onDisconnect(self):
        print('onDisconnect')
        # to terminate the program in case of connection failure
        loop = get_event_loop()
        loop.stop()

    @coroutine
    def onJoin(self, details):
        print('onJoin...', details)
        #  register all the method decorated by @register
        yield from self.register(self)
        #  subscribe all the method decorated with @subscribe
        yield from self.subscribe(self)
        print('fin onJoin')

class MongoApp(MyApplicationSession):
    @coroutine
    def onJoin(self, details):
        print('onJoin...', details)
        yield from super().onJoin(details)
        #  register all the method decorated by @register

        for fct in ('find', ):
            yield from self.register(getattr(self, fct), 'mongo.%s' % fct,
                                     options=RegisterOptions(details_arg='_details'))

    @register('mongo.find_one')
    @coroutine
    def find_one(self, collection, query, fields=None):
        print('find_one', collection, query, fields, test_db[collection])
        return test_db[collection].find_one(query, fields)

    # @register('mongo.find', options=RegisterOptions(details_arg='_details'))
    @coroutine
    def find(self, collection, query, fields=None, _details=None):
        print('find', collection, query, fields, _details)
        cursor = test_db[collection].find(query, fields)
        if _details.progress:
            for value in (yield from cursor.to_list(length=None)):
                _details.progress(value)
        else:
            return [value for value in (yield from cursor.to_list(length=None))]

    @register('mongo.insert')
    @coroutine
    def insert(self, collection, record, **kwargs):
        print('insert', collection, record, kwargs)
        return test_db[collection].insert(record, **kwargs)

    @register('mongo.save')
    @coroutine
    def save(self, collection, record, **kwargs):
        print('save', collection, record, kwargs)
        return test_db[collection].save(record, **kwargs)

    @register('mongo.update')
    @coroutine
    def update(self, collection, query, object, **opts):
        print('update', collection, query, object, opts)
        return test_db[collection].update(query, object, **opts)

# to encode and decode bson to json
class BsonEncoder(json.JSONEncoder):
    """
    to encode bson data
    """
    def default(self, obj):
        # print("bson_encode", obj)
        return json_util.default(obj)

def bson_decode_hook(obj):
    """
    to decode bson data
    :param obj:
    :return:
    """
    obj = json_util.object_hook(obj)
    return obj

class BsonObjectSerializer(JsonObjectSerializer):
    def serialize(self, obj):
        """
        Implements :func:`autobahn.wamp.interfaces.IObjectSerializer.serialize`
        """
        s = json.dumps(obj, cls=BsonEncoder)
        if isinstance(s, six.text_type):
            s = s.encode('utf8')
        if self._batched:
            return s + b'\30'
        else:
            return s

    def unserialize(self, payload):
        """
        Implements :func:`autobahn.wamp.interfaces.IObjectSerializer.unserialize`
        """
        if self._batched:
            chunks = payload.split(b'\30')[:-1]
        else:
            chunks = [payload]
        if len(chunks) == 0:
            raise Exception("batch format error")
        return [json.loads(data.decode('utf8'), object_hook=bson_decode_hook) for data in chunks]

IObjectSerializer.register(BsonObjectSerializer)

class BsonSerializer(Serializer):
    SERIALIZER_ID = 'json'
    MIME_TYPE = "application/json"

    def __init__(self, batched=False):
        super().__init__(BsonObjectSerializer(batched=batched))
        if batched:
            self.SERIALIZER_ID = "json.batched"

ISerializer.register(BsonSerializer)

if __name__ == '__main__':
    print('defining runner')
    db = AsyncIOMotorClient(db_uri, tz_aware=True)
    test_db = db.test_db

    runner = ApplicationRunner(url='ws://localhost:8080/ws', realm='realm1',
                               serializers=[BsonSerializer()],
                               debug_app=True, debug_wamp=True)
    runner.run(MongoApp)
    print('fin...')

``

Suppose we have our test_db with an ‘airport’ collection and the following data in it :

{

“_id” : ObjectId(“55191db9b38fb831330d09bd”),

“code” : “NCE”,

“name” : “Nice”

}

{

“_id” : ObjectId(“55191dc9b38fb831330d09be”),

“code” : “ETZ”,

“name” : “Metz-Nancy Lorraine”

}

{

“_id” : ObjectId(“55191dd8b38fb831330d09bf”),

“code” : “CDG”,

“name” : “Charles de Gaulle”

}

{

“_id” : ObjectId(“55191de0b38fb831330d09c0”),

“code” : “ORY”,

“name” : “Orly”

}

{

“_id” : ObjectId(“55191de8b38fb831330d09c1”),

“code” : “BES”,

“name” : “Brest”

}

{

“_id” : ObjectId(“55191df4b38fb831330d09c2”),

“code” : “LAX”,

“name” : “Los Angeles”

}

``

Now, to use this from a js client :

To make a findOne for the record with code==‘BES’, you can call

session.call(‘mongo.find_one’, [‘airport’, {code: ‘BES’}])
.then(function® { console.log(‘result:’, r) })

``

To find multiple records (all records with an “E” in code), you can call either

session.call(‘mongo.find’, [‘airport’, {code: {’$regex’: ‘E’}}])
.then(function® {console.log®;})

``

and you’ll get the following result:

[Object, Object, Object]
0: Object
_id: Object
code: “NCE”
name: “Nice”
1: Object
_id: Object
code: “ETZ”
name: “Metz-Nancy Lorraine”
2: Object
_id: Object
code: “BES”
name: “Brest”

``

or (progressive result)

session.call(‘mongo.find’, [‘airport’, {code: {’$regex’: ‘E’}}], {}, {receive_progress: true})
.then(function® {console.log®;},
function(err) {console.error(err)},
function(progress) {console.log(‘progress:’, progress)})

``

where you’ll get the records one at a time (like a cursor):

progress: Object {code: “NCE”, name: “Nice”, _id: Object}

progress: Object {code: “ETZ”, name: “Metz-Nancy Lorraine”, _id: Object}

progress: Object {code: “BES”, name: “Brest”, _id: Object}

null

``

Now, to insert a new record :

session.call(‘mongo.insert’, [‘airport’, {code: ‘JFK’, name: ‘John F. Kennedy Intl airport’}])

``

To add a field on all records with ‘C’ in code:

session.call(‘mongo.update’,
[‘airport’, {code: {’$regex’: ‘C’}}, {’$set’: {new_field: 42}}],
{multi: true}).then(function® { console.log® })

``

result :

Object {updatedExisting: true, nModified: 2, ok: 1, n: 2}

``

etc…

0 Likes