java - I need to mock a RabbitMQ in my unit Test -


I am using RabbitMQ in my project.

My consumer contains the client-side RabbitMQ code, and the connection needs TLS 1.1 to connect to the real message queue.

I want to test this code in a JUnit test and mock the delivery of messages to my consumer.

I have found several examples on Google that use different tools, such as Camel with RabbitMQ or ActiveMQ, but those tools work with AMQP 1.0, whereas RabbitMQ works with AMQP 0.9.

Has anyone else had this problem?

thanks!

update

This is the code I want to test; it receives JSON messages from the queue.

package com.foo.foo.queue;  import java.io.file; import java.io.fileinputstream; import java.io.ioexception; import java.net.url; import java.security.*; import java.security.cert.certificateexception; import javax.net.ssl.*;  import org.apache.commons.lang3.stringutils; import org.apache.log4j.logmanager; import org.apache.log4j.logger; import org.json.jsonobject;  import com.foo.foo.constants.constants; import com.foo.foo.core.configurationcontainer; import com.foo.foo.policyfinders.policyfinder; import com.rabbitmq.client.channel; import com.rabbitmq.client.connection; import com.rabbitmq.client.connectionfactory; import com.rabbitmq.client.queueingconsumer;  public class brokerthreadhlconsumer extends thread {  private static brokerthreadhlconsumer instance;  private static final logger log = logmanager.getlogger(brokerthreadhlconsumer.class);  private channel channel; private string queuename; private policyfinder policyfinder; private connection connection; private queueingconsumer consumer;  private boolean loop;  private brokerthreadhlconsumer() throws ioexception {     connectionfactory factory = new connectionfactory();     char[] keypassphrase = "clientrabbit".tochararray();     keystore keystorecacerts;     configurationcontainer configurationcontainer = configurationcontainer.getinstance();     string exchangename = configurationcontainer.getproperty(constants.exchange_name);     string rabbithost = configurationcontainer.getproperty(constants.rabbitmq_server_host_value);     try {         /* public key cacerts connect message queue*/         keystorecacerts = keystore.getinstance("pkcs12");         url resourcepublickey = this.getclass().getclassloader().getresource("certs/client.keycert.p12");         file filepublickey = new file(resourcepublickey.touri());         keystorecacerts.load(new fileinputstream(filepublickey), keypassphrase);         keymanagerfactory keymanager;          keymanager = keymanagerfactory.getinstance("sunx509");         
keymanager.init(keystorecacerts, keypassphrase);          char[] trustpassphrase = "changeit".tochararray();         keystore tks;          tks = keystore.getinstance("jceks");          url resourcecacerts = this.getclass().getclassloader().getresource("certs/cacerts");         file filecacerts = new file(resourcecacerts.touri());          tks.load(new fileinputstream(filecacerts), trustpassphrase);          trustmanagerfactory tmf;         tmf = trustmanagerfactory.getinstance("sunx509");         tmf.init(tks);          sslcontext c = sslcontext.getinstance("tlsv1.1");         c.init(keymanager.getkeymanagers(), tmf.gettrustmanagers(), null);          factory.seturi(rabbithost);         factory.usesslprotocol(c);         connection = factory.newconnection();         channel = connection.createchannel();         channel.exchangedeclare(exchangename, "fanout");         queuename = channel.queuedeclare().getqueue();         channel.queuebind(queuename, exchangename, "");      } catch (nosuchalgorithmexception e) {         e.printstacktrace();     } catch (certificateexception e) {         e.printstacktrace();     } catch (keystoreexception e) {         e.printstacktrace();     } catch (unrecoverablekeyexception e) {         e.printstacktrace();     } catch (keymanagementexception e1) {         e1.printstacktrace();     } catch (exception e) {         log.error("couldn't instantiate channel broker installed in " + rabbithost);         log.error(e.getstacktrace());         e.printstacktrace();     } }  public static brokerthreadhlconsumer getinstance() throws certificateexception, unrecoverablekeyexception, nosuchalgorithmexception, keystoreexception, keymanagementexception, ioexception {     if (instance == null)         instance = new brokerthreadhlconsumer();     return instance; }  public void run() {     if (policyfinder != null) {         try {             consumer = new queueingconsumer(channel);             channel.basicconsume(queuename, true, consumer);       
      log.info("consumer broker started , waiting messages");             loop = true;             while (loop) {                 try {                     queueingconsumer.delivery delivery = consumer.nextdelivery();                     string message = new string(delivery.getbody());                     jsonobject obj = new jsonobject(message);                     log.info("message received broker " + obj);                     if (stringutils.isnotempty(message) && !policyfinder.managepolicyset(obj)) {                         log.error("policyset error: error upgrading policyset");                     }                 } catch (exception e) {                     log.error("receiving message error");                     log.error(e);                 }             }         } catch (ioexception e) {             log.error("consumer couldn't start");             log.error(e.getstacktrace());         }     } else {         log.error("consumer couldn't start cause of policyfinder null");     } }  public void close() {     loop = false;     try {         consumer.getchannel().basiccancel(consumer.getconsumertag());     } catch (ioexception e) {         e.printstacktrace();     }     try {         channel.close();     } catch (ioexception e) {         e.printstacktrace();     }     try {         connection.close();     } catch (ioexception e) {         e.printstacktrace();     } }  public void setluxpolicyfinder(policyfinder policyfinder) {     this.policyfinder = policyfinder; } } 


Comments

Popular posts from this blog

javascript - Laravel datatable invalid JSON response -

java - Exception in thread "main" org.springframework.context.ApplicationContextException: Unable to start embedded container; -

sql server 2008 - My Sql Code Get An Error Of Msg 245, Level 16, State 1, Line 1 Conversion failed when converting the varchar value '8:45 AM' to data type int -