Integrating Spring with RabbitMQ

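1. Start from a working Spring MVC demo project. Setting one up is not covered here, so prepare it in advance. Make sure the Spring version is not too old, otherwise startup fails with errors; I fell into this trap myself. This walkthrough uses Spring 4.2.3.RELEASE.
2. Install the RabbitMQ server. A detailed tutorial is in an earlier post: http://www.javaet.com/blog/53.htm.
3. Add the jar dependency: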

<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>2.0.4.RELEASE</version>
</dependency>

Create the Spring configuration file rabbitMq.xml

Create two configuration files: rabbitMq.xml and rabbitmq.properties.

The rabbitmq.properties file:

rabbit.host=localhost
rabbit.port=5672
rabbit.virtualHost=/
rabbit.username=guest
rabbit.password=guest
rabbit.cacheSize=20
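
A mistyped or missing key in this file only shows up when the Spring context starts and a ${...} placeholder cannot be resolved. As a rough illustration, the sketch below reads the same six keys with plain java.util.Properties and fails early with a readable message. The RabbitSettings class and its method names are hypothetical helpers invented for this example; they are not part of Spring or the RabbitMQ client.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical helper: reads the rabbit.* keys into typed fields. */
final class RabbitSettings {
    final String host;
    final int port;
    final String virtualHost;
    final String username;
    final String password;
    final int cacheSize;

    private RabbitSettings(Properties p) {
        this.host = require(p, "rabbit.host");
        this.port = Integer.parseInt(require(p, "rabbit.port"));
        this.virtualHost = require(p, "rabbit.virtualHost");
        this.username = require(p, "rabbit.username");
        this.password = require(p, "rabbit.password");
        this.cacheSize = Integer.parseInt(require(p, "rabbit.cacheSize"));
    }

    /** Returns the trimmed value, or throws if the key is absent or blank. */
    private static String require(Properties p, String key) {
        String value = p.getProperty(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("Missing property: " + key);
        }
        return value.trim();
    }

    /** Parses properties-file text such as the content of rabbitmq.properties. */
    static RabbitSettings parse(String text) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new RabbitSettings(p);
    }

    public static void main(String[] args) {
        RabbitSettings s = parse(String.join("\n",
                "rabbit.host=localhost", "rabbit.port=5672", "rabbit.virtualHost=/",
                "rabbit.username=guest", "rabbit.password=guest", "rabbit.cacheSize=20"));
        System.out.println(s.host + ":" + s.port + " vhost=" + s.virtualHost);
    }
}
```

In a real project the text would come from the classpath resource rather than an inline string; the inline string just keeps the sketch self-contained.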

The rabbitMq.xml file:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
     http://www.springframework.org/schema/rabbit
     http://www.springframework.org/schema/rabbit/spring-rabbit-1.7.xsd">
     
    <!-- Configure the connection factory with the parameters for connecting to the RabbitMQ server -->
    <rabbit:connection-factory id="connectionFactory" channel-cache-size="${rabbit.cacheSize}"
        username="${rabbit.username}" password="${rabbit.password}" host="${rabbit.host}" port="${rabbit.port}" virtual-host="${rabbit.virtualHost}" />
        
    <!-- With this admin declared, the exchanges and queues defined in this file are created automatically on the RabbitMQ server -->
    <rabbit:admin connection-factory="connectionFactory" />
    
    <!-- Define the queue -->
    <rabbit:queue name="myQueue" durable="true" auto-delete="false" exclusive="false" />
        
    <!-- Define a direct exchange and bind the queue to it -->
    <rabbit:direct-exchange name="myExchange" durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="myQueue" key="queue-key"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:direct-exchange>
    
    <!-- JSON message converter -->
    <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />
    
    <!-- Define the RabbitTemplate used to send and receive messages -->
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" exchange="myExchange" message-converter="jsonMessageConverter" />
        
    <!-- Queue listener (observer pattern): when a message arrives, the listener registered on that queue is notified.
    <rabbit:listener-container
        connection-factory="connectionFactory">
        <rabbit:listener queues="myQueue" ref="messageReceiver" />
    </rabbit:listener-container>
        Startup failed with the configuration above, and I do not know why. If anyone knows the cause, please leave a comment; many thanks!
        org.springframework.beans.factory.BeanCreationException: 
            Error creating bean with name 'org.springframework.amqp.rabbit.config.ListenerContainerFactoryBean#0': Invocation of init method failed; 
        Caused by: java.lang.NullPointerException
            at org.springframework.amqp.rabbit.config.ListenerContainerFactoryBean.createContainer(ListenerContainerFactoryBean.java:516)
    -->
    <bean id="listenerContainer" 
        class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">  
        <property name="queueNames" value="myQueue"></property> 
        <property name="connectionFactory" ref="connectionFactory"></property>  
        <property name="messageListener" ref="receiveListenerAdapter"></property>
    </bean>
    
    <bean id="receiveListenerAdapter" 
        class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter">  
        <constructor-arg ref="messageReceiver" />  
        <property name="messageConverter" ref="messageConverter"></property>  
    </bean> 
    
    <bean id="messageConverter"  
        class="org.springframework.amqp.support.converter.SimpleMessageConverter">  
    </bean> 
    
    <!-- Message receiver -->
    <bean id="messageReceiver" class="com.mq2.ConsumerWithSpring"></bean>
    
</beans>
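
The binding in this file ties myQueue to myExchange with the key queue-key. A direct exchange delivers a message only to queues whose binding key is exactly equal to the message's routing key. The toy model below sketches that rule in plain Java to make it concrete. DirectExchangeModel is a hypothetical class written for this illustration; it does not talk to a broker and is not how RabbitMQ is implemented.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model of direct-exchange routing; not the RabbitMQ client API. */
final class DirectExchangeModel {
    // binding key -> names of the queues bound with exactly that key
    private final Map<String, List<String>> bindings = new HashMap<>();
    // queue name -> message bodies delivered to that queue
    private final Map<String, List<String>> queues = new HashMap<>();

    /** Binds a queue to this exchange under the given key. */
    void bind(String queue, String key) {
        queues.computeIfAbsent(queue, q -> new ArrayList<>());
        bindings.computeIfAbsent(key, k -> new ArrayList<>()).add(queue);
    }

    /** Delivers body to every queue whose binding key equals routingKey; returns how many queues received it. */
    int publish(String routingKey, String body) {
        List<String> targets = bindings.getOrDefault(routingKey, Collections.emptyList());
        for (String queue : targets) {
            queues.get(queue).add(body);
        }
        return targets.size();
    }

    /** Returns the messages currently held by the named queue. */
    List<String> messages(String queue) {
        return queues.getOrDefault(queue, Collections.emptyList());
    }

    public static void main(String[] args) {
        DirectExchangeModel myExchange = new DirectExchangeModel();
        myExchange.bind("myQueue", "queue-key");
        System.out.println(myExchange.publish("queue-key", "Hello, world!"));
        System.out.println(myExchange.publish("wrong-key", "never delivered"));
        System.out.println(myExchange.messages("myQueue"));
    }
}
```

The second publish reaches no queue because no binding uses that key, which is why the routing key passed by the producer has to match the key attribute of the binding.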

Import rabbitMq.xml and rabbitmq.properties in the main Spring configuration file (the producer further down loads it as spring-config.xml).

<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xmlns:p="http://www.springframework.org/schema/p"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:aop="http://www.springframework.org/schema/aop" 
    xmlns:tx="http://www.springframework.org/schema/tx"
    xsi:schemaLocation="http://www.springframework.org/schema/beans 
       http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
       http://www.springframework.org/schema/context 
       http://www.springframework.org/schema/context/spring-context-3.0.xsd
       http://www.springframework.org/schema/tx 
       http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
       http://www.springframework.org/schema/aop
       http://www.springframework.org/schema/aop/spring-aop-3.0.xsd">
    
  <description>Spring configuration file</description>
  
  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
       <property name="locations" value="classpath:/rabbitmq.properties" />
  </bean>
  
  <import resource="classpath:rabbitMq.xml" />
    
</beans>

Create the consumer

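package com.mq2;

import java.nio.charset.StandardCharsets;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;

public class ConsumerWithSpring extends MessageListenerAdapter {

    // Invoked for each message delivered from myQueue; prints the raw body as UTF-8 text.
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
        System.out.println("receive msg: " + msg);
    }
}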

Run the producer

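import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class ProductorWithSpring {
    public static void main(String[] args) throws Exception {
        ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext("classpath:spring-config.xml");
        RabbitTemplate template = ctx.getBean(RabbitTemplate.class);
        for (int i = 0; i < 10000; i++) {
            // Send a message (exchange, routing key, payload)
            template.convertAndSend("myExchange", "queue-key", "Hello, world!");
            Thread.sleep(1000); // pause for 1 second
        }
        ctx.close(); // shut down the context and release the RabbitMQ connection
    }
}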

The console output looks like this:

receive msg: "Hello, world!"
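
The double quotes in the output are most likely a side effect of the configuration rather than part of the payload: the template sends through jsonMessageConverter, which writes the String as a JSON string literal, while the consumer prints the raw body bytes without decoding them. The sketch below shows, using only the JDK, what unwrapping such a body involves. JsonStringBody is a hypothetical helper for this illustration and handles only JSON string literals; in a real project a JSON library such as Jackson would normally do this work.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: decodes a message body that holds a single JSON string literal. */
final class JsonStringBody {

    static String decode(byte[] body) {
        String text = new String(body, StandardCharsets.UTF_8);
        int end = text.length() - 1;
        if (end < 1 || text.charAt(0) != '"' || text.charAt(end) != '"') {
            return text; // not a JSON string literal: hand back the raw text
        }
        StringBuilder out = new StringBuilder();
        for (int i = 1; i < end; i++) {
            char c = text.charAt(i);
            if (c != '\\') {
                out.append(c);
                continue;
            }
            if (++i >= end) {
                throw new IllegalArgumentException("Dangling escape at end of string");
            }
            char e = text.charAt(i);
            switch (e) {
                case '"': out.append('"'); break;
                case '\\': out.append('\\'); break;
                case '/': out.append('/'); break;
                case 'b': out.append('\b'); break;
                case 'f': out.append('\f'); break;
                case 'n': out.append('\n'); break;
                case 'r': out.append('\r'); break;
                case 't': out.append('\t'); break;
                case 'u':
                    // four hex digits follow the 'u'
                    if (i + 4 >= end) {
                        throw new IllegalArgumentException("Truncated \\u escape");
                    }
                    out.append((char) Integer.parseInt(text.substring(i + 1, i + 5), 16));
                    i += 4;
                    break;
                default:
                    throw new IllegalArgumentException("Unknown escape: \\" + e);
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        byte[] body = "\"Hello, world!\"".getBytes(StandardCharsets.UTF_8);
        System.out.println("receive msg: " + decode(body));
    }
}
```

An alternative that avoids decoding by hand is to use the same converter type on both the sending and the receiving side, so that the listener receives the payload already converted.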

Attachment

Spring整合RabbitMQ.zip

For the RabbitMQ Java Client, see: http://www.javaet.com/blog/55.htm

Comments

  1. #1

    hef (2019-04-27 18:36)

    The Spring namespace declarations are incorrect: change spring-beans-3.0.xsd to spring-beans.xsd, and change spring-rabbit-1.7.xsd to spring-rabbit.xsd.