Using the Mongo wire protocol for API services

So the startup I work for uses Mongo. I have my own opinions of Mongo,
some good, some bad. I also have a decent amount of experience with the wire
protocol: I've written an asynchronous C driver[1] for GObject, rewritten a
Python/Twisted driver[2], and written my own BSON implementation. We also
wrote a "schema" verifier for Mongo that works offline against the Mongo
on-disk format using my C driver. Needless to say, we are heavily invested
in Mongo, and that shapes my design decisions.

One thing I do like about Mongo is the simplicity of its scaling
design. The implementation is somewhat annoying (all the different types
of daemons and configurations), but the design itself seems "good enough".
You talk to each Mongo instance as if it were the real data store,
and it handles communicating with the various shards or the physical data.
It is the "turtles all the way down" approach. Lovely. When a node fails,
the client has information about the replica set and can connect to another
member to continue operating.

The simplicity of this design means slightly more complexity in the
client libraries, in exchange for a simpler server implementation.
Debugging can also be much simpler because you can reuse the same
debugging tools at each link in the chain.
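To make the client-side half of that trade-off concrete, here is a simplified sketch of the failover loop a client performs: try each known replica-set member in turn until one answers. It assumes the member list is already known (a replica set reports it, for example in the `hosts` field of an isMaster reply), and `call_with_failover` and its `send` transport callable are hypothetical names, not part of any Mongo driver. Real drivers do considerably more (server selection, retry policy, rediscovering the set).

```python
from typing import Callable, Sequence


def call_with_failover(hosts: Sequence[str],
                       send: Callable[[str, dict], dict],
                       command: dict) -> dict:
    """Send `command` to the first member of `hosts` that answers.

    `send` is a hypothetical transport callable; a member that raises
    ConnectionError is treated as down and the next one is tried.
    """
    if not hosts:
        raise ValueError('no hosts to try')
    last_error = None
    for host in hosts:
        try:
            return send(host, command)
        except ConnectionError as exc:
            last_error = exc  # this member is unreachable; move on
    raise last_error  # every member failed
```

Because the servers stay simple, all of this logic lives in the client, which is exactly where the official drivers already implement it.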

So, in short, I like this design and would like to use it for the internal
services we use to build our platform. Wouldn't it be nice if our services
were exported in the same fashion? It would allow more seamless upgrades
of internal services with little downtime. It would also let us separate
code more readily and test each piece individually. When you run into
memory fragmentation or leaks in Python, the problem is contained to the
particular service and much easier to debug (compared to a monolithic app
server). That benefit has nothing to do with exporting commands via Mongo;
it is just a side effect of splitting the platform into separate services.

So, in my fork of txmongo (a Mongo driver for Twisted), I have added
support for writing Mongo servers as well as clients. You can export
RPCs as Mongo commands. This means you can use the Mongo command-line client
to test things. You can decide for yourself how failover should work.
You can use the official Mongo drivers to write your RPC clients, building
on their work for handling failover, reconnection, and so on.
Standing on the shoulders of giants.

So let's take a look at how this might look in Twisted. We will start by
writing a service that we will eventually export as a Mongo server.

    from twisted.application import service


    class EchoService(service.Service):
        def do_ping(self, **kwargs):
            return {'pong': kwargs}
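Because `do_ping` is just a method, the rule the protocol handler uses to find it can be exercised without any networking: the first key of the command document, lowercased and prefixed with `do_`, names the method, and the whole document becomes its keyword arguments. The sketch below mirrors that rule in plain Python 3.7+ (where dicts keep insertion order); `EchoStandIn` and `dispatch` are hypothetical names, used so the example runs without Twisted, and the ismaster fallback is left out.

```python
class EchoStandIn:
    """Hypothetical stand-in for EchoService that needs no Twisted."""

    def do_ping(self, **kwargs):
        return {'pong': kwargs}


def dispatch(service, command: dict) -> dict:
    """Route a command document to the matching do_<name> method."""
    if '$query' in command:          # drivers may wrap the real document
        command = command['$query']
    name = next(iter(command), '')   # the command name is the first key
    func = getattr(service, 'do_' + name.lower(), None)
    if not callable(func):
        return {'$err': 'No such command: ' + name, 'ok': 0}
    # The whole document, including the command key, becomes kwargs.
    return func(**command)
```

For example, `dispatch(EchoStandIn(), {'ping': 1, 'hello': 'world'})` returns `{'pong': {'ping': 1, 'hello': 'world'}}`, the same document the shell session later in this post shows. This is also what makes each service easy to test on its own.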

Now let's create a protocol that speaks the Mongo wire protocol and proxies
commands to the service above.

    from bson import BSON
    from bson.son import SON
    from pymongo.errors import OperationFailure
    from twisted.internet import defer
    from twisted.python import failure
    from txmongo.protocol import MongoProtocol, Reply, REPLY_QUERY_FAILURE


    class ServiceProtocol(MongoProtocol):
        """Speaks the Mongo wire protocol and dispatches commands to a service."""

        def __init__(self, service):
            MongoProtocol.__init__(self)
            self.service = service

        def handle_QUERY(self, request):
            # Commands arrive as queries against the special "<db>.$cmd"
            # collection. Reject anything else so the client is not left
            # waiting for a reply that never comes.
            if not request.collection.endswith('.$cmd'):
                ex = OperationFailure('Only commands are supported')
                self._queryEb(failure.Failure(ex), request.request_id)
                return

            # Decode into SON so key order is preserved: the first key
            # is the name of the command.
            query = request.query.decode(as_class=SON)
            if '$query' in query:
                query = query['$query']
            command = next(iter(query), '')
            name = 'do_' + command.lower()

            func = getattr(self.service, name, None)
            if func is None and name == 'do_ismaster':
                # needed for pymongo to connect
                func = lambda *a, **kw: {'ismaster': True, 'ok': 1}

            if callable(func):
                # The command document becomes keyword arguments; the
                # service may return a plain value or a Deferred.
                df = defer.maybeDeferred(func, **query)
                df.addCallback(self._queryCb, request.request_id)
                df.addErrback(self._queryEb, request.request_id)
                return

            ex = OperationFailure('No such command: ' + command)
            self._queryEb(failure.Failure(ex), request.request_id)

        def _queryCb(self, result, request_id):
            # A command may return one document or a list of documents.
            if not isinstance(result, list):
                result = [result]
            docs = [d if isinstance(d, BSON) else BSON.encode(d)
                    for d in result]
            reply = Reply(response_to=request_id, documents=docs)
            return self.send(reply)

        def _queryEb(self, reason, request_id):
            # Report errors as a single "$err" document with the
            # QueryFailure response flag set.
            doc = {'$err': str(reason.value), 'ok': 0}
            code = getattr(reason.value, 'code', None)
            if code is not None:
                doc['code'] = code
            reply = Reply(response_to=request_id,
                          documents=[BSON.encode(doc)],
                          response_flags=REPLY_QUERY_FAILURE)
            return self.send(reply)

Now for the simple part: let's wire up a TCP listener with Twisted that accepts
Mongo connections and bridges them to the service.

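    from twisted.internet import protocol, reactor

    echo_service = EchoService()
    factory = protocol.Factory()
    # Each new connection gets its own ServiceProtocol bound to the shared service.
    factory.protocol = lambda: ServiceProtocol(echo_service)
    # 27017 is Mongo's default port, so stop any local mongod first.
    reactor.listenTCP(27017, factory)
    reactor.run()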
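Twisted hands a protocol's `dataReceived` arbitrary chunks of the TCP stream, so before anything like `handle_QUERY` can run, the bytes have to be reassembled into whole messages (presumably done inside `MongoProtocol` in the txmongo fork). A sketch of that framing step, relying only on the messageLength field at the start of every wire-protocol header; `MessageFramer` is a hypothetical name, not txmongo's implementation.

```python
import struct

HEADER = struct.Struct('<iiii')  # messageLength, requestID, responseTo, opCode


class MessageFramer:
    """Accumulates bytes from a stream and splits out complete messages."""

    def __init__(self):
        self._buffer = b''

    def feed(self, data: bytes) -> list:
        """Add received bytes; return ((requestID, responseTo, opCode), body) tuples."""
        self._buffer += data
        messages = []
        while len(self._buffer) >= HEADER.size:
            length, request_id, response_to, op_code = HEADER.unpack_from(self._buffer)
            if length < HEADER.size:
                raise ValueError('invalid messageLength: %d' % length)
            if len(self._buffer) < length:
                break  # the rest of this message has not arrived yet
            body = self._buffer[HEADER.size:length]
            self._buffer = self._buffer[length:]
            messages.append(((request_id, response_to, op_code), body))
        return messages
```

Feeding it half a message returns an empty list; feeding the remainder plus another full message returns both, which is exactly the situation a TCP server has to cope with.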

Now we can use the Mongo command-line shell to talk to this service.
Let's do that and run our new ping command.

$ mongo
> db.runCommand({ping: 1, hello: 'world'})
{ "pong" : { "ping" : 1, "hello" : "world" } }
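In the legacy protocol this server speaks, a command such as the one above travels as an OP_QUERY (opCode 2004) against the `test.$cmd` collection, which is why `handle_QUERY` checks for the `.$cmd` suffix. The sketch below builds such a message and then pulls out the pieces the handler works with. The OP_QUERY layout (flags, NUL-terminated collection name, numberToSkip, numberToReturn, query document) follows the wire protocol; the numberToReturn of -1 is the value drivers conventionally use for commands, and `encode_doc`, `build_command` and `parse_query` are hypothetical helpers, not txmongo or pymongo APIs.

```python
import struct

OP_QUERY = 2004


def encode_doc(doc: dict) -> bytes:
    """Minimal BSON encoder: str and int32 values only."""
    body = b''
    for key, value in doc.items():
        name = key.encode('utf-8') + b'\x00'
        if isinstance(value, str):
            data = value.encode('utf-8') + b'\x00'
            body += b'\x02' + name + struct.pack('<i', len(data)) + data
        elif isinstance(value, int) and not isinstance(value, bool):
            body += b'\x10' + name + struct.pack('<i', value)
        else:
            raise TypeError('unsupported type: %r' % type(value))
    return struct.pack('<i', len(body) + 5) + body + b'\x00'


def build_command(request_id: int, db: str, command: dict) -> bytes:
    """Encode `command` as an OP_QUERY against the "<db>.$cmd" collection."""
    collection = (db + '.$cmd').encode('utf-8') + b'\x00'
    # flags=0, numberToSkip=0, numberToReturn=-1 (a single reply document)
    body = (struct.pack('<i', 0) + collection + struct.pack('<ii', 0, -1)
            + encode_doc(command))
    header = struct.pack('<iiii', 16 + len(body), request_id, 0, OP_QUERY)
    return header + body


def parse_query(message: bytes):
    """Return (collection, skip, limit, query_bytes) from an OP_QUERY."""
    length, _request_id, _response_to, op_code = struct.unpack_from('<iiii', message)
    if op_code != OP_QUERY or length != len(message):
        raise ValueError('not a complete OP_QUERY message')
    end = message.index(b'\x00', 20)  # name starts after header (16) + flags (4)
    collection = message[20:end].decode('utf-8')
    skip, limit = struct.unpack_from('<ii', message, end + 1)
    doc_start = end + 9
    (doc_len,) = struct.unpack_from('<i', message, doc_start)
    return collection, skip, limit, message[doc_start:doc_start + doc_len]
```

Parsing `build_command(1, 'test', {'ping': 1, 'hello': 'world'})` yields the collection `test.$cmd` and the encoded command document, which a real server would then decode and dispatch by its first key.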

We can do the same thing using the pymongo driver from Python.

>>> import pymongo
>>> pymongo.Connection().test['$cmd'].find_one({"ping": 1, "hello": "world"})
{u'pong': {u'ping': 1, u'hello': u'world'}}

I haven't decided whether to move forward with designing services this way,
but I do think it is interesting and warrants more exploration and
experimentation. I really like being able to compartmentalize our various
internal services, and the control that gives me over testing, monitoring,
and the application lifecycle.

I haven't made this easy to do with my C driver yet, but I do plan on adding
the functionality soon.

As always, patches welcomed!

[1] https://github.com/chergert/mongo-glib
[2] https://github.com/chergert/mongo-async-python-driver