rabbitmq - Scatter Gather : Wait for all "Gather-Workers" to complete -


i getting rabbitmq.

i have scatter , gather proof of concept working.

my eventual setup be:

myclient (aka, scatter'er)

mygatherworker1

mygatherworker2

mygatherworker3

mygatherworkern (could go 10 @ point)

myrabbitmqserverhost

mygatherworker1-mygatherworkern wired "exchange" (i think that's right term) on myrabbitmqserverhost.

myclient send scatter-gather request myrabbitmqserverhost.

below "myclient" code.

i have minresponses variable. says "once n number of responses, consider scatter , gather done".

however, best strategy making sure configured "mygatherworker(s)" provide response?

i want have guaranteed "wait mygatherworkers" strategy. , not have put "n" value on client side configuration match (same) number of mygatherworker's.

i know adding more mygatherworker's system.

private icollection<customermovesummary> sendscattergathermessagetoqueues(list<string> customerids, imodel model, timespan timeout, string routingkey, int minresponses)         {             list<customermovesummary> returnitems = new list<customermovesummary>();               if (string.isnullorempty(_scattergatherresponsequeue))             {                 _scattergatherresponsequeue = model.queuedeclare().queuename;             }              if (_scattergatherconsumer == null)             {                 _scattergatherconsumer = new queueingbasicconsumer(model);                 model.basicconsume(_scattergatherresponsequeue, true, _scattergatherconsumer);             }              string correlationid = guid.newguid().tostring();             ibasicproperties basicproperties = model.createbasicproperties();             basicproperties.replyto = _scattergatherresponsequeue;             basicproperties.correlationid = correlationid;               string jsonified = jsonconvert.serializeobject(customerids);             byte[] messagebytes = encoding.utf8.getbytes(jsonified);              model.basicpublish(_scattergatherexchange, routingkey, basicproperties, messagebytes);              datetime timeoutdate = datetime.utcnow + timeout;             while (datetime.utcnow <= timeoutdate)             {                 basicdelivereventargs deliveryarguments;                 _scattergatherconsumer.queue.dequeue(500, out deliveryarguments);                 if (deliveryarguments != null && deliveryarguments.basicproperties != null                     && deliveryarguments.basicproperties.correlationid == correlationid)                 {                     //string response = encoding.utf8.getstring(deliveryarguments.body);                      string returnsummaryjsonified = encoding.utf8.getstring(deliveryarguments.body);                     customermovesummary summary = jsonconvert.deserializeobject<customermovesummary>(returnsummaryjsonified);                      returnitems.add(summary);                     if (returnitems.count >= minresponses)                     {                         console.writeline("returnitems.count >= minresponses !!");                         break;                     }                 }             }              return returnitems;         } 

ps

if has "an exception occurred on mygatherworker", what's best practice handling that?


Comments

Popular posts from this blog

javascript - Laravel datatable invalid JSON response -

java - Exception in thread "main" org.springframework.context.ApplicationContextException: Unable to start embedded container; -

sql server 2008 - My Sql Code Get An Error Of Msg 245, Level 16, State 1, Line 1 Conversion failed when converting the varchar value '8:45 AM' to data type int -