java - I need to mock a RabbitMQ in my unit Test -
i using rabbitmq in project.
i have in consumer code of client part of rabbitmq , connection need tls1.1 connect real mq.
i want test code in junit test , mock message delivery consumer.
i see in google several examples different tools how camel rabbit or activemq tools works amqp 1.0 , rabbitmq works in amqp 0.9 .
someone had problem?
thanks!
update
this code testing receive json queue.
package com.foo.foo.queue; import java.io.file; import java.io.fileinputstream; import java.io.ioexception; import java.net.url; import java.security.*; import java.security.cert.certificateexception; import javax.net.ssl.*; import org.apache.commons.lang3.stringutils; import org.apache.log4j.logmanager; import org.apache.log4j.logger; import org.json.jsonobject; import com.foo.foo.constants.constants; import com.foo.foo.core.configurationcontainer; import com.foo.foo.policyfinders.policyfinder; import com.rabbitmq.client.channel; import com.rabbitmq.client.connection; import com.rabbitmq.client.connectionfactory; import com.rabbitmq.client.queueingconsumer; public class brokerthreadhlconsumer extends thread { private static brokerthreadhlconsumer instance; private static final logger log = logmanager.getlogger(brokerthreadhlconsumer.class); private channel channel; private string queuename; private policyfinder policyfinder; private connection connection; private queueingconsumer consumer; private boolean loop; private brokerthreadhlconsumer() throws ioexception { connectionfactory factory = new connectionfactory(); char[] keypassphrase = "clientrabbit".tochararray(); keystore keystorecacerts; configurationcontainer configurationcontainer = configurationcontainer.getinstance(); string exchangename = configurationcontainer.getproperty(constants.exchange_name); string rabbithost = configurationcontainer.getproperty(constants.rabbitmq_server_host_value); try { /* public key cacerts connect message queue*/ keystorecacerts = keystore.getinstance("pkcs12"); url resourcepublickey = this.getclass().getclassloader().getresource("certs/client.keycert.p12"); file filepublickey = new file(resourcepublickey.touri()); keystorecacerts.load(new fileinputstream(filepublickey), keypassphrase); keymanagerfactory keymanager; keymanager = keymanagerfactory.getinstance("sunx509"); keymanager.init(keystorecacerts, keypassphrase); char[] trustpassphrase = "changeit".tochararray(); keystore tks; tks = keystore.getinstance("jceks"); url resourcecacerts = this.getclass().getclassloader().getresource("certs/cacerts"); file filecacerts = new file(resourcecacerts.touri()); tks.load(new fileinputstream(filecacerts), trustpassphrase); trustmanagerfactory tmf; tmf = trustmanagerfactory.getinstance("sunx509"); tmf.init(tks); sslcontext c = sslcontext.getinstance("tlsv1.1"); c.init(keymanager.getkeymanagers(), tmf.gettrustmanagers(), null); factory.seturi(rabbithost); factory.usesslprotocol(c); connection = factory.newconnection(); channel = connection.createchannel(); channel.exchangedeclare(exchangename, "fanout"); queuename = channel.queuedeclare().getqueue(); channel.queuebind(queuename, exchangename, ""); } catch (nosuchalgorithmexception e) { e.printstacktrace(); } catch (certificateexception e) { e.printstacktrace(); } catch (keystoreexception e) { e.printstacktrace(); } catch (unrecoverablekeyexception e) { e.printstacktrace(); } catch (keymanagementexception e1) { e1.printstacktrace(); } catch (exception e) { log.error("couldn't instantiate channel broker installed in " + rabbithost); log.error(e.getstacktrace()); e.printstacktrace(); } } public static brokerthreadhlconsumer getinstance() throws certificateexception, unrecoverablekeyexception, nosuchalgorithmexception, keystoreexception, keymanagementexception, ioexception { if (instance == null) instance = new brokerthreadhlconsumer(); return instance; } public void run() { if (policyfinder != null) { try { consumer = new queueingconsumer(channel); channel.basicconsume(queuename, true, consumer); log.info("consumer broker started , waiting messages"); loop = true; while (loop) { try { queueingconsumer.delivery delivery = consumer.nextdelivery(); string message = new string(delivery.getbody()); jsonobject obj = new jsonobject(message); log.info("message received broker " + obj); if (stringutils.isnotempty(message) && !policyfinder.managepolicyset(obj)) { log.error("policyset error: error upgrading policyset"); } } catch (exception e) { log.error("receiving message error"); log.error(e); } } } catch (ioexception e) { log.error("consumer couldn't start"); log.error(e.getstacktrace()); } } else { log.error("consumer couldn't start cause of policyfinder null"); } } public void close() { loop = false; try { consumer.getchannel().basiccancel(consumer.getconsumertag()); } catch (ioexception e) { e.printstacktrace(); } try { channel.close(); } catch (ioexception e) { e.printstacktrace(); } try { connection.close(); } catch (ioexception e) { e.printstacktrace(); } } public void setluxpolicyfinder(policyfinder policyfinder) { this.policyfinder = policyfinder; } }
Comments
Post a Comment