rabbitmq - Scatter Gather : Wait for all "Gather-Workers" to complete -
i getting rabbitmq.
i have scatter , gather proof of concept working.
my eventual setup be:
myclient (aka, scatter'er)
mygatherworker1
mygatherworker2
mygatherworker3
mygatherworkern (could go 10 @ point)
myrabbitmqserverhost
mygatherworker1-mygatherworkern wired "exchange" (i think that's right term) on myrabbitmqserverhost.
myclient send scatter-gather request myrabbitmqserverhost.
below "myclient" code.
i have minresponses variable. says "once n number of responses, consider scatter , gather done".
however, best strategy making sure configured "mygatherworker(s)" provide response?
i want have guaranteed "wait mygatherworkers" strategy. , not have put "n" value on client side configuration match (same) number of mygatherworker's.
i know adding more mygatherworker's system.
private icollection<customermovesummary> sendscattergathermessagetoqueues(list<string> customerids, imodel model, timespan timeout, string routingkey, int minresponses) { list<customermovesummary> returnitems = new list<customermovesummary>(); if (string.isnullorempty(_scattergatherresponsequeue)) { _scattergatherresponsequeue = model.queuedeclare().queuename; } if (_scattergatherconsumer == null) { _scattergatherconsumer = new queueingbasicconsumer(model); model.basicconsume(_scattergatherresponsequeue, true, _scattergatherconsumer); } string correlationid = guid.newguid().tostring(); ibasicproperties basicproperties = model.createbasicproperties(); basicproperties.replyto = _scattergatherresponsequeue; basicproperties.correlationid = correlationid; string jsonified = jsonconvert.serializeobject(customerids); byte[] messagebytes = encoding.utf8.getbytes(jsonified); model.basicpublish(_scattergatherexchange, routingkey, basicproperties, messagebytes); datetime timeoutdate = datetime.utcnow + timeout; while (datetime.utcnow <= timeoutdate) { basicdelivereventargs deliveryarguments; _scattergatherconsumer.queue.dequeue(500, out deliveryarguments); if (deliveryarguments != null && deliveryarguments.basicproperties != null && deliveryarguments.basicproperties.correlationid == correlationid) { //string response = encoding.utf8.getstring(deliveryarguments.body); string returnsummaryjsonified = encoding.utf8.getstring(deliveryarguments.body); customermovesummary summary = jsonconvert.deserializeobject<customermovesummary>(returnsummaryjsonified); returnitems.add(summary); if (returnitems.count >= minresponses) { console.writeline("returnitems.count >= minresponses !!"); break; } } } return returnitems; }
ps
if has "an exception occurred on mygatherworker", what's best practice handling that?
Comments
Post a Comment