Skip to content

Commit

Permalink
GH zatosource#274 - Final touches making fan-out/fan-in and parallel …
Browse files Browse the repository at this point in the history
…execution use common backend.
  • Loading branch information
dsuch committed Jan 21, 2015
1 parent 9be1dc8 commit 5afbe2d
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 6 deletions.
1 change: 0 additions & 1 deletion code/zato-common/src/zato/common/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,6 @@ class CHANNEL(Attrs):
NOTIFIER_RUN = 'notifier-run' # New in 2.0
NOTIFIER_TARGET = 'notifier-target' # New in 2.0
PARALLEL_EXEC_CALL = 'parallel-exec-call' # New in 2.0
PARALLEL_EXEC_ON_FINAL = 'parallel-exec-on-final' # New in 2.0
PARALLEL_EXEC_ON_TARGET = 'parallel-exec-on-target' # New in 2.0
PUBLISH = 'publish' # New in 2.0
SCHEDULER = 'scheduler'
Expand Down
3 changes: 2 additions & 1 deletion code/zato-server/src/zato/server/base/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -985,12 +985,13 @@ def _on_message_invoke_service(self, msg, channel, action, args=None):
'zato.request_ctx.async_msg':msg,
'zato.request_ctx.in_reply_to':msg.get('in_reply_to'),
'zato.request_ctx.fanout_cid':zato_ctx.get('fanout_cid'),
'zato.request_ctx.parallel_exec_cid':zato_ctx.get('parallel_exec_cid'),
}

data_format = msg.get('data_format')
transport = msg.get('transport')

if msg.get('channel') in (CHANNEL.FANOUT_ON_TARGET, CHANNEL.FANOUT_ON_FINAL):
if msg.get('channel') in (CHANNEL.FANOUT_ON_TARGET, CHANNEL.FANOUT_ON_FINAL, CHANNEL.PARALLEL_EXEC_ON_TARGET):
payload = loads(msg['payload'])
else:
payload = msg['payload']
Expand Down
11 changes: 8 additions & 3 deletions code/zato-server/src/zato/server/pattern/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class ParallelBase(object):
on_target_channel = None
on_final_channel = None
needs_on_final = False
request_ctx_cid_key = None

def __init__(self, source):
self.source = source
Expand Down Expand Up @@ -61,7 +62,7 @@ def invoke(self, targets, on_final, on_target=None, cid=None):
for name, payload in targets.items():
to_json_string = False if isinstance(payload, basestring) else True
self.source.invoke_async(name, payload, self.call_channel, to_json_string=to_json_string,
zato_ctx={'fanout_cid': cid})
zato_ctx={self.request_ctx_cid_key: cid})

return cid

Expand All @@ -71,9 +72,10 @@ def _log_before_callbacks(self, cb_type, cb_list, invoked_service):
def on_call_finished(self, invoked_service, response, exception):

now = invoked_service.time.utcnow()
cid = invoked_service.wsgi_environ['zato.request_ctx.fanout_cid']
cid = invoked_service.wsgi_environ['zato.request_ctx.{}'.format(self.request_ctx_cid_key)]
data_key = self.data_pattern.format(cid)
counter_key = self.counter_pattern.format(cid)
req_ts_utc, on_target = invoked_service.kvdb.conn.hmget(data_key, 'req_ts_utc', 'on_target')

with invoked_service.lock(self.lock_pattern.format(cid)):

Expand All @@ -83,12 +85,14 @@ def on_call_finished(self, invoked_service, response, exception):
data.response = response
data.exception = exception
data.ok = False if exception else True
data.service = invoked_service.name
data.req_ts_utc = req_ts_utc

# First store our response and exception (if any)
json_data = dumps(data)
invoked_service.kvdb.conn.hset(data_key, invoked_service.get_name(), json_data)

on_target = loads(invoked_service.kvdb.conn.hget(data_key, 'on_target'))
on_target = loads(on_target)

if logger.isEnabledFor(DEBUG):
self._log_before_callbacks('on_target', on_target, invoked_service)
Expand Down Expand Up @@ -125,4 +129,5 @@ def on_call_finished(self, invoked_service, response, exception):
def invoke_callbacks(self, invoked_service, payload, cb_list, channel, cid):
for name in cb_list:
if name:
logger.warn('123123 %r %r %s', name, payload, type(payload))
invoked_service.invoke_async(name, payload, channel, to_json_string=True, zato_ctx={'fanout_cid': cid})
1 change: 1 addition & 0 deletions code/zato-server/src/zato/server/pattern/fanout.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ class FanOut(ParallelBase):
on_target_channel = CHANNEL.FANOUT_ON_TARGET
on_final_channel = CHANNEL.FANOUT_ON_FINAL
needs_on_final = True
request_ctx_cid_key = 'fanout_cid'
2 changes: 1 addition & 1 deletion code/zato-server/src/zato/server/pattern/parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class ParallelExec(ParallelBase):
data_pattern = KVDB.PARALLEL_EXEC_DATA_PATTERN
call_channel = CHANNEL.PARALLEL_EXEC_CALL
on_target_channel = CHANNEL.PARALLEL_EXEC_ON_TARGET
on_final_channel = CHANNEL.PARALLEL_EXEC_ON_FINAL
request_ctx_cid_key = 'parallel_exec_cid'

def invoke(self, targets, on_target, cid=None):
return super(ParallelExec, self).invoke(targets, None, on_target, cid)

0 comments on commit 5afbe2d

Please sign in to comment.