I am building a server which stores key/value data on top of Redis, using Twisted Python.
The server receives a JSON dictionary via HTTP, which is converted into a Python dictionary and put in a buffer. Every time new data arrives, the server schedules a task that pops one dictionary from the buffer and writes every tuple into a Redis instance, using a txredis client.
```python
import json
from collections import deque
from twisted.internet import defer, protocol, reactor
from twisted.python import log
from twisted.web.resource import Resource
from txredis.protocol import Redis  # import path varies across txredis versions

class StoreResource(Resource):
    isLeaf = True

    def __init__(self):
        self.clientCreator = protocol.ClientCreator(reactor, Redis)
        d = self.clientCreator.connectTCP(...)  # host/port elided
        d.addCallback(self._set_client)
        self.redis = None
        self.buffer = deque()

    def _set_client(self, redis):
        self.redis = redis

    def render_POST(self, request):
        task_id = request.requestHeaders.getRawHeaders('x-task-id')
        if task_id is None:
            return '<html><body>Error reading task_id</body></html>'
        data = json.loads(request.content.read())
        self.buffer.append((task_id, data))
        reactor.callLater(0, self.write_on_redis)
        return ' '

    @defer.inlineCallbacks
    def write_on_redis(self):
        if len(self.buffer):
            task_id, dic = self.buffer.pop()
            log.msg('Buffer: %s' % len(self.buffer))
            m = yield self.redis.sismember('DONE', task_id)
            # Simple check
            if m == '1':
                log.msg('%s already stored' % task_id)
            else:
                log.msg('%s unpacking' % task_id)
                s = yield self.redis.sadd('DONE', task_id)
                d = defer.Deferred()
                for k, v in dic.iteritems():
                    k = k.encode()
                    d.addCallback(self.redis.push, k, v)
                d.callback(None)
```
Basically, I am facing a producer/consumer problem between two different connections, but I am not sure that the current implementation works well in the Twisted paradigm.
I have read the small documentation about producer/consumer interfaces in Twisted, but I am not sure whether I can use them in my case.
Any criticism is welcome: I am trying to get a grasp of event-driven programming, after too many years of thread concurrency.
The producer and consumer APIs in Twisted, IProducer and IConsumer, are about flow control. You don't seem to have any flow control here; you're just relaying messages from one protocol to another.
Since there's no flow control, the buffer is just extra complexity. You could get rid of it by passing the data directly to the write_on_redis method. That way write_on_redis doesn't need to handle the empty-buffer case, you don't need the extra attribute on the resource, and you can even get rid of the callLater (although you can do this even if you keep the buffer).
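To make that concrete, here is a hedged sketch of the buffer-free version. The class and method names are illustrative (the real resource would still subclass twisted.web's Resource, omitted here to keep the sketch self-contained), and the redis attribute stands in for an already-connected txredis client:

```python
import json

class DirectResource(object):
    isLeaf = True

    def __init__(self, redis):
        self.redis = redis  # assumed already-connected txredis client

    def render_POST(self, request):
        headers = request.requestHeaders.getRawHeaders('x-task-id')
        if not headers:
            return '<html><body>Error reading task_id</body></html>'
        data = json.loads(request.content.read())
        # Hand the task straight to write_on_redis: no deque, no callLater,
        # no empty-buffer case. In the real code this returns a Deferred,
        # which should get an errback attached instead of being dropped.
        self.write_on_redis(headers[0], data)
        return ' '

    def write_on_redis(self, task_id, dic):
        for k, v in dic.items():
            self.redis.push(k, v)
```

The request's lifetime already bounds the data, so nothing is gained by parking it on the resource first.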
I don't know if any of this answers your question, though. As far as whether this approach "works well", here are the things I notice just by reading the code:
If either the sismember call or the sadd call fails, you may lose tasks, since you've already popped them from the work buffer.
Chaining all the pushes on the single Deferred d also means that any failed push will prevent the rest of the data from being pushed. It also passes the result of each push (I'm assuming it returns a Deferred) as the first argument to the next call, so unless push more or less ignores its first argument, you won't be pushing the right data to redis.
If you want to implement flow control, then you need to have your HTTP server check the length of self.buffer and possibly reject the new task: not adding it to self.buffer, and returning an error code to the client. You still won't be using IProducer, but it's sort of similar.
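A hedged sketch of that check (the MAX_PENDING cap, the class name, and the 503 choice are all my assumptions; the real resource would subclass twisted.web's Resource):

```python
import json
from collections import deque

MAX_PENDING = 2  # illustrative limit; tune to your workload

class BoundedResource(object):
    isLeaf = True

    def __init__(self):
        self.buffer = deque()

    def render_POST(self, request):
        if len(self.buffer) >= MAX_PENDING:
            # Backpressure by refusal, not IProducer-based flow control:
            # don't buffer the task, just tell the client to back off.
            request.setResponseCode(503)
            return '<html><body>Server busy, retry later</body></html>'
        task_id = request.requestHeaders.getRawHeaders('x-task-id')[0]
        self.buffer.append((task_id, json.loads(request.content.read())))
        return ' '
```

The client sees the 503 and can retry with a delay, which bounds memory on the server without any producer/consumer machinery.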