-
Notifications
You must be signed in to change notification settings - Fork 3
/
asyncboto.py
272 lines (226 loc) · 10.5 KB
/
asyncboto.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
"""
AsyncBoto
Thomas Parslow 2011
Make Boto operate in an Asynchronous way using Tornado.
I've only tested with SimpleDB so far (and then only a little bit)
but you should be able to do the same with all the other types of
connection in Boto.
THIS IS RATHER EARLY RIGHT NOW, IT'S NOT PROPERLY TESTED AND IT MAY
JUST BE A STUPID IDEA ALTOGETHER. I JUST WANT TO GET SOME FEEDBACK.
DON'T BE STUPID AND USE THIS FOR SOMETHING IMPORTANT. THANKS :)
"""
import boto.sdb.connection
import boto.s3.connection
import tornado.httpclient
import tornado.ioloop
import urlparse
import functools
import hashlib
class AsyncCallInprogress(Exception):
    """
    Raised to abandon the current pass through a Boto call because an
    HTTP request has been started and its response is not available yet.
    """
class AsyncHttpResponse(object):
    """
    Emulates the parts of httplib's HTTPResponse that Boto uses.

    The response is fully materialised: the whole body and the header
    mapping are handed to the constructor and simply stored.
    """

    def __init__(self, status, reason, body, headers):
        """
        status  -- HTTP status code of the response
        reason  -- reason phrase that accompanies the status code
        body    -- complete response body
        headers -- mapping of header name to value; it only needs to
                   provide a dict-style ``get`` method
        """
        self.status = status
        self.reason = reason
        self.body = body
        self.headers = headers

    def read(self):
        """
        Return the complete response body.

        NOTE(review): httplib's read() also accepts an optional byte
        count; that form is not supported here.
        """
        return self.body

    def getheader(self, name, default=None):
        """
        Return the value of header ``name``, or ``default`` when the
        header is not present.

        ``default`` mirrors the optional second argument of httplib's
        HTTPResponse.getheader; leaving it out returns None for a missing
        header.
        """
        return self.headers.get(name, default)
class AsyncHttpConnection(object):
    """
    This class replaces the httplib HTTPConnection used by boto. Only
    the bits actually used by Boto are provided (at least I hope they
    are!).

    When a request is actually made (which happens in "getresponse")
    the cache of previous responses is checked (the cache belongs to
    this particular connection, which would live no longer than a
    single boto async call) and if a response is found it is returned
    right away.

    If no response is found a request is started and an
    AsyncCallInprogress exception is raised. This causes the call in
    progress to be stopped for the moment. When the response actually
    comes through the Boto call is made again, but this time when it
    gets to the bit where it calls getresponse a response is ready and
    waiting.

    This cycle could happen a few times during an async call into
    Boto but during the final call through all the requests will be in
    the cache and thus the call will complete without raising an
    AsyncCallInprogress.
    """

    def __init__(self, aws_connection, fn, callback, errback):
        """
        aws_connection -- object whose ``_call_async`` method is used to
                          re-run ``fn`` once a response has arrived
        fn             -- the zero-argument callable being run
                          asynchronously
        callback       -- passed back to ``_call_async`` on every retry
        errback        -- passed back to ``_call_async`` on every retry
        """
        self.aws_connection = aws_connection
        self.fn = fn
        self.callback = callback
        self.errback = errback
        # host and is_secure are filled in by whoever hands this
        # connection out; they are needed to build the request URL.
        self.host = None
        self.is_secure = None
        # Maps request signature -> AsyncHttpResponse.
        self.response_cache = {}

    def _get_request_sig(self):
        """
        Return a request signature: a string that can be used to look
        up a request in response_cache.

        It is an MD5 hex digest of the request details minus the
        timestamp and the signature (which will differ if they're
        called even a tiny bit of time apart from each other).

        The request sig is based on the current values of the member
        variables.
        """
        path_minus_query, _, raw_query = self.path.partition("?")
        query = sorted(
            (k, tuple(v))
            for k, v in urlparse.parse_qs(raw_query).items()
            if k not in ("Timestamp", "Signature"))
        headers = sorted(
            (k, tuple(v))
            for k, v in self.headers.items()
            if k not in ("Date", "Authorization"))
        # Hash this so that large bodies aren't kept around when not
        # needed
        return hashlib.md5(repr((self.is_secure, self.host, self.method,
                                 path_minus_query, tuple(query), self.data,
                                 tuple(headers)))).hexdigest()

    def _build_url(self):
        """
        Return the absolute URL for the stored request, built from
        is_secure, host and path.
        """
        scheme = "https" if self.is_secure else "http"
        return "%s://%s%s" % (scheme, self.host, self.path)

    def request(self, method, path, data, headers):
        """
        In the httplib version this would actually make the request,
        but here we just store the params ready for use in the
        getresponse method.
        """
        self.method = method
        self.path = path
        self.data = data
        self.headers = headers

    def _callback(self, request_sig, tornado_response):
        """
        The Tornado httpclient callback. Is partially applied to the
        request_sig so the response can be stuffed into response_cache.
        """
        # The reason phrase is not taken from the Tornado response, so a
        # placeholder is stored instead.
        response = AsyncHttpResponse(
            tornado_response.code, "???", tornado_response.body,
            tornado_response.headers)
        self.response_cache[request_sig] = response
        # Retry the call, with the response in place this time.
        self.aws_connection._call_async(
            self.fn, callback=self.callback, errback=self.errback,
            async_http_connection=self)

    def getresponse(self):
        """
        Checks response_cache, if we already have a response for this
        request then return it otherwise start the async call and
        raise AsyncCallInprogress.
        """
        request_sig = self._get_request_sig()
        if request_sig in self.response_cache:
            # We already made the request and got the response, carry
            # on!
            return self.response_cache[request_sig]

        # Ok, we need to make a request
        http_client = tornado.httpclient.AsyncHTTPClient()
        request = tornado.httpclient.HTTPRequest(
            self._build_url(), self.method, self.headers, self.data or None)
        http_client.fetch(request, functools.partial(self._callback, request_sig))
        raise AsyncCallInprogress
class AsyncConnectionMixin(object):
    """
    Mixin to replace get_http_connection and put_http_connection in a
    subclass of AWSAuthConnection from Boto to create an Async version
    of a connection class.

    All calls to methods in the new Async version must be wrapped in
    call_async calls to make them operate asynchronously. For example:

        sdb_conn.call_async(
            lambda : sdb_conn.get_domain("my_test_domain").get_attributes("hello"),
            callback=hello)

    call_async sets up AsyncHttpConnection with the callback and also
    traps the AsyncCallInprogress exception.

    Exceptions are trapped and sent to the "errback" callback.
    """

    def call_async(self, fn, callback=lambda x : None, errback=None):
        """
        Run the zero-argument callable ``fn`` asynchronously.

        fn       -- callable that makes the Boto calls
        callback -- called on the IOLoop with the return value of ``fn``
                    once it completes; the default ignores the value
        errback  -- called on the IOLoop with the exception if ``fn``
                    raises; when None the exception propagates instead
        """
        return self._call_async(fn, callback, errback)

    def _call_async(self, fn, callback, errback, async_http_connection=None):
        """
        Make one pass through ``fn``.

        On the first pass ``async_http_connection`` is None and a fresh
        AsyncHttpConnection is created; on a retry the connection that
        already holds the cached responses is passed in.
        """
        if not async_http_connection:
            async_http_connection = AsyncHttpConnection(self, fn, callback, errback)
        # While this attribute exists get_http_connection hands out the
        # async connection instead of a real one.
        self._async_http_connection = async_http_connection
        io_loop = tornado.ioloop.IOLoop.instance()
        try:
            try:
                ret = fn()
            except AsyncCallInprogress:
                # A request was started; the HTTP callback will retry.
                pass
            except Exception as e:
                if errback is None:
                    raise
                # Bind the exception now: the callback runs later, after
                # this except block has finished.
                io_loop.add_callback(functools.partial(errback, e))
            else:
                # When a call finally succeeds without raising
                # AsyncCallInprogress we then need to pass control to the
                # callback.
                #
                # This could also happen first time if the call doesn't
                # involve any HTTP requests. But call_async would still
                # return right away and have the callback called on the
                # next iteration of the IOLoop
                io_loop.add_callback(functools.partial(callback, ret))
        finally:
            del self._async_http_connection

    def get_http_connection(self, host, is_secure):
        """
        This is called to get an HTTP connection from the pool. This
        is the point at which we inject our replacement http connection
        """
        if hasattr(self, "_async_http_connection"):
            self._async_http_connection.host = host
            self._async_http_connection.is_secure = is_secure
            return self._async_http_connection
        else:
            # This hasn't been called from within an async_call so
            # just allow it to do a normal synchronous call
            return super(AsyncConnectionMixin, self).get_http_connection(host, is_secure)

    def put_http_connection(self, *args, **kwargs):
        """
        Forward to the parent class, except during an async call, where
        the call is ignored.
        """
        if not hasattr(self, "_async_http_connection"):
            super(AsyncConnectionMixin, self).put_http_connection(*args, **kwargs)
# I'm defining some connections here but you should be able to use
# this with any boto connection object. It's just that I haven't
# tested the ones not here at all (as opposed to the literally
# *minutes* of testing that I have given these ones)
class AsyncSDBConnection(AsyncConnectionMixin, boto.sdb.connection.SDBConnection):
    """Boto's SDBConnection combined with AsyncConnectionMixin."""
class AsyncS3Connection(AsyncConnectionMixin, boto.s3.connection.S3Connection):
    """Boto's S3Connection combined with AsyncConnectionMixin."""
# Demo script: exercises the async SimpleDB and S3 connections against
# real AWS using credentials given on the command line. Every step is
# chained through callbacks; nothing runs until the IOLoop is started
# at the bottom, and this block never stops the loop itself.
if __name__ == "__main__":
    from tornado.options import define, options
    import random

    # Credentials are read from --aws_access_key_id / --aws_access_key_secret.
    define("aws_access_key_id", type=str)
    define("aws_access_key_secret", type=str)
    tornado.options.parse_command_line()

    sdb_conn = AsyncSDBConnection(options.aws_access_key_id, options.aws_access_key_secret)

    # SimpleDB round trip: put an attribute, then read it back once the
    # put has completed.
    def callback2(ret):
        print "Return from get was: ", ret
    def callback1(ret):
        print "Return from put was:", ret
        sdb_conn.call_async(lambda : sdb_conn.get_domain("mytest").get_attributes("boom"), callback=callback2)
    sdb_conn.call_async(lambda : sdb_conn.create_domain("mytest").put_attributes("boom", {"hello": "goodbye"}), callback=callback1)

    # Test errback
    def callback3(ret):
        assert False, "We were expecting an error!"
    def errback3(exception):
        print "Exception received: ", exception
    sdb_conn.call_async(lambda : sdb_conn.get_domain("i_do_not_exist"), callback=callback3, errback=errback3)

    s3_conn = AsyncS3Connection(options.aws_access_key_id, options.aws_access_key_secret)
    # Random suffix so that each run uses a different bucket name.
    bucket_name = "test_bucket." + str(random.random())

    # S3 round trip: create bucket -> new key -> write -> read -> delete.
    # The callbacks are nested so that the inner ones can see ``bucket``
    # and ``key`` from the enclosing callbacks.
    def s3callback1(bucket):
        print "The Bucket:", bucket
        def s3callback2(key):
            print "The Key:", key
            def s3callback3(_ignore):
                def s3callback4(contents):
                    print "Contents:", contents
                    # clean up by deleting the bucket
                    s3_conn.call_async(lambda : key.delete(), lambda _ : s3_conn.call_async(lambda :bucket.delete()))
                s3_conn.call_async(lambda : key.get_contents_as_string(), callback=s3callback4)
            s3_conn.call_async(lambda : key.set_contents_from_string("hello!"), callback=s3callback3)
        s3_conn.call_async(lambda : bucket.new_key("hello"), callback=s3callback2)
    s3_conn.call_async(lambda : s3_conn.create_bucket(bucket_name), callback=s3callback1)

    print "Start loop"
    tornado.ioloop.IOLoop.instance().start()