ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • spring에서 ActiveMQ 접근 방법
    카테고리 없음 2009. 12. 7. 18:55
    자바로 개발하다 보면 여지없이 비동기로 처리할 일이 많이 생깁니다. 예를 들어 메일 전송이나 시간이 꽤 걸리는 DB 처리 등은 비동기를 많이 쓰게 됩니다. 제가 개발하는 곳에서도 비동기 처리 필요가 생겨서 고민하다가 ActiveMQ를 도입하게 되었습니다. ActiveMQ의 설치는 엄청 간단하고요. 실행해서 admin( http://ip:8161/admin ) 이 웹에 뜨면 ActiveMQ가 올라간 것입니다. 

    자 이제부터 기술하는 것은 Spring in action 2nd ed. 10장을 액기스만 뽑아서 설명드리는 것이니 보다 자세한 사항을 아시려면 10장을 자세히 읽어 보시면 되겠습니다. 아래 설명하는 모든 sample은 첨부의 test.zip에 묶여 있으니 참조하시고요. 

    우선 ActiveMQ는 queue와 topic이라는 구조를 지원합니다. queue는 아래 그림처럼 Sender와 Receiver가 1:1 관계를 가집니다. 

    이에 반해 topic은 Publisher와 Subscriber가 1:n 관계를 가지죠. 

    Sender나 Publisher는 message를 queue와 topic에 던지고 바로 응답을 받고요. ActiveMQ가 이 message를 receiver나 subscriber에 전달하죠. 만약 전달한 대상이 없다면 계속 keep을 하게 됩니다. 

    spring은 activeMQ를 JMS의 하나의 Provider로 접근합니다. spring에서 activeMQ를 접근할 때의 매직은 바로 JmsTemplate에 있습니다. 우선 beans.xml을 보도록 하죠.

    <?xml version="1.0" encoding="UTF-8"?>
    <beans 
      xmlns="http://www.springframework.org/schema/beans" 
      xmlns:amq="http://activemq.apache.org/schema/core"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
      http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
           
      <!-- a pooling based JMS provider -->
    <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL">
    <value>tcp://125.141.153.155:61616</value>
    </property>
    </bean>
      
    <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
    <property name="connectionFactory" ref="connectionFactory"/>
    </bean>

    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="dbin.queue"/>
    </bean>

    <bean id="msgConverter" class="test.dbin.MsgConverterImpl"/>

    <bean id="msgMdp" class="test.dbin.MsgMDP"/>
    <bean id="purePojoMdp" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
    <property name="delegate" ref="msgMdp"/>
    <property name="defaultListenerMethod" value="handleMessage"/>
    <property name="messageConverter" ref="msgConverter"/>
    </bean>

    <bean class="org.springframework.jms.listener.SimpleMessageListenerContainer">
    <property name="connectionFactory" ref="jmsFactory"/>
    <property name="destination" ref="queueDestination"/>
    <property name="messageListener" ref="purePojoMdp"/>
    </bean>
      <!-- Spring JMS Template -->
      <bean id="producerJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="jmsFactory"/>
        <property name="defaultDestination" ref="queueDestination"/>
        <property name="messageConverter" ref="msgConverter"/>
      </bean>

    <bean id="producer" class="test.dbin.MsgProducerImpl">
       <property name="jmsTemplate" ref="producerJmsTemplate"/>
    </bean> 
    </beans>
    connectionFactory bean은 JMS connection을 기술합니다. 설치된 activeMQ 연결 스트링을 기술해 주면 됩니다. 그리고 이 connectionFactory는 connection pooling이 없어서 매번 접근할 때마다 connection connect/close를 반복합니다.. 따라서 pooling을 해 주기 위해 jmsFactory를 정의해 줍니다.
    그리고 본 예제에서는 queue를 사용하기 때문에 "dbin.queue"라는 이름을 가진 queueDestination을 정의했습니다. 

    이제 헛갈리는 것이 하나 나오는데 MessageConverter라는 것입니다. 예를 들어서 다음의 Msg라는 POJO가 있고 이를 Producer(Sender)와 Receiver가 통신한다고 해 보죠. 

    package test.dbin;

    public class Msg {
    private String name;
    private String value;
    public void setName(String name) {
    this.name = name;
    }
    public String getName() {
    return name;
    }
    public void setValue(String value) {
    this.value = value;
    }
    public String getValue() {
    return value;
    }
    public String toString() {
    return "name = " + name + ", value = " + value;
    }
    }
    MessageConverter는 이 POJO를 JMS 메시지로 encoding하고 다시 역으로 decoding해서 POJO로 바꾸는 역할을 하는 interface를 의미하며 보통 이를 구현하는 구현 객체를 하나 정의해서 사용합니다. 바로 MsgConverterImpl 입니다. MapMessage는 JMS에서 제공하는 메시지 객체로써 여기에 encoding, decoding을 수행해서 Msg를 다룹니다. 

    package test.dbin;

    import javax.jms.JMSException;
    import javax.jms.MapMessage;
    import javax.jms.Message;
    import javax.jms.Session;

    import org.springframework.jms.support.converter.MessageConversionException;
    import org.springframework.jms.support.converter.MessageConverter;

    /**
     * MapMessage로의 conversion을 담당하는 class
     * 
     *
     */
    public class MsgConverterImpl implements MessageConverter {
    public MsgConverterImpl() {
    }
    public Object fromMessage(Message message) throws JMSException, MessageConversionException {
    if( !(message instanceof MapMessage)) {
    throw new MessageConversionException("not MapMessage");
    }
    MapMessage mapMessage = (MapMessage)message;
    Msg msg = new Msg();
    msg.setName(mapMessage.getString("name"));
    msg.setValue(mapMessage.getString("value"));
    return msg;
    }
    public Message toMessage(Object object, Session session)  throws JMSException, MessageConversionException {
    if ( !(object instanceof Msg) ) {
    throw new MessageConversionException("not Msg");
    }
    Msg msg = (Msg)object;
    MapMessage mapMessage = session.createMapMessage();

    mapMessage.setString("name", msg.getName());
    mapMessage.setString("value", msg.getValue());
    return mapMessage;
    }
    }

    이번에는 Producer(Sender)쪽을 보기로 하죠. 대충 흐름도는 아래와 같습니다.
    JmsTemplate이 queue와 sender의 가운데에 끼게 되는데 ibatis와 spring 연동시 SqlMapClientTemplate와 역할이 아주 유사합니다. 위의 beans.xml의 producerJmsTemplate을 보면 connectionFactory, defaultDestination을 가져서 어떤 JMS Provider를 쓸지 어떤 queue를 쓸지 명시하고요, 마지막으로 messageConverter를 어떤 것을 쓸지 명시해 줍니다. 자, 그러면 아래와 같은 매우 간단한 producer가 나오게 됩니다.

    package test.dbin;

    import java.util.HashMap;
    import java.util.Map;

    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MapMessage;
    import javax.jms.Message;
    import javax.jms.Session;
    import org.springframework.jms.core.JmsTemplate;
    import org.springframework.jms.core.MessageCreator;

    public class MsgProducerImpl implements MsgProducer {
    public void send(final Msg msg) {
    jmsTemplate.convertAndSend(msg);
    }
    private JmsTemplate jmsTemplate;
    public void setJmsTemplate(JmsTemplate jmsTemplate) {
    this.jmsTemplate = jmsTemplate;
    }
    }

    package test.dbin;

    import java.util.HashMap;
    import java.util.Map;

    import org.springframework.jms.core.JmsTemplate;

    public interface MsgProducer {
    public void send(final Msg map);
    public void setJmsTemplate(JmsTemplate jmsTemplate);
    }
    메시지 발송 아주 간단하죠? MsgProducerImpl.send(Msg) 만 호출하면 바로 queue로 Msg가 encoding되어 전송되게 됩니다. 바로 JmsTemplate의 매직이죠.

    이번에는 Receiver쪽을 Message-Driven POJO로 구현해 보도록 하겠습니다. 
    우선 Msg를 처리하는 POJO인 MsgMDP 객체를 하나 구현합니다. 
    package test.dbin;

    import javax.jms.JMSException;
    import javax.jms.MapMessage;
    import javax.jms.Message;
    import javax.jms.MessageListener;

    import org.springframework.jms.support.converter.MessageConverter;

    /**
     * Message Driven POJO
     *
     */
    public class MsgMDP {
    public void handleMessage(Msg msg) {
    // handles the message
    System.out.println("handler called");
    System.out.println(msg);
    }
    }
    handleMessage가 달랑 하나 있고 심지어 Msg까지 받습니다. 흠, 잡다한 것은 spring이 알아서 한다는 의미군요. 그러면 이 MsgMDP가 어떻게 메시지를 받게 되는지 beans.xml을 보도록 하죠. 매직은 바로 MessageListenerAdapter에 있습니다. purePojoMdp bean을 보면 delegate가 있는데 이것은 msgMdp로 설정되어 있죠. msgMdp bean은 바로 MsgMDP 클래스고요. 흠. defaultListenerMethod가 handleMessage로 되어 있네요. 얘기 끝이군요. messageConverter도 설정했으니 JMS Message가 Msg로 변하도록 MessageListenerAdapter가 모두 처리해 주는 것입니다. 
    이번에는 메시지를 받았다는 것을 어떻게 알 수 있을까요? 바로 SimpleMessageListenerContainer를 선언하면 됩니다. 여기에 messageListener로 위에서 선언한 purePojoMdp를 넣었죠. 이제 spring이 초기화되었을 때 SimpleMessageListenerContainer가 만들어지고 dbin.queue에 메시지가 도착하면 MsgMDP.handlerMessage(Msg)가 호출되게 되는 것입니다. 

    이번에는 테스트 main을 만들어 보죠. MsgProducerImpl에 해당하는 producer bean을 하나 얻어서 Msg를 send하면 MsgMDP.handleMessage(Msg)가 호출되는 것을 확인해 볼 수 있습니다. 저는 감탄했는데 여러분은 어떻게 생각하시는지요.

    package test.dbin;

    import java.net.InetAddress;
    import java.util.HashMap;

    import org.springframework.beans.factory.BeanFactory;
    import org.springframework.beans.factory.xml.*;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    import org.springframework.core.io.FileSystemResource;


    public class MsgTest {
    public static void main(String[] args) throws Exception {
    ApplicationContext ctx = new ClassPathXmlApplicationContext("beans.xml");
    Msg msg = new Msg();
    msg.setName("daniel yoon");
    msg.setValue("lullaby");
    // 전송 
    MsgProducer producer = (MsgProducer)ctx.getBean("producer");
    producer.send(msg);
    System.out.println(msg + " sent");
    }

    저는 다음의 코드로 2000년에 코딩을 한 기억이 있습니다. 물론 똑같지는 않지만 대충 흐름은 비슷합니다. 정말 straightfoward하죠. 위 소스가 Sender이고 아랫소스가 Receiver입니다. configuration 다 소스에서 선언하고 처리 객체들 다 만들어서 직접 메시지를 보냅니다.  이런 코드가 수십 군데 있다고 쳐 보세요. 그리고 한 3년 쯤 흘러서 이를 신기술 예를 들어 PassiveMQ로 바꾼다고 해 보죠. 재앙이겠죠? ^^ 

    ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
    Connection conn = null;
    Session session = null;
    try {
    conn = cf.createConnection();
    session = conn.createSession(false,
    Session.AUTO_ACKNOWLEDGE);
    Destination destination = new ActiveMQQueue("myQueue");
    MessageProducer producer = session.createProducer(destination);
    TextMessage message = session.createTextMessage();
    message.setText("Hello world!");
    producer.send(message);
    } catch (JMSException e) {
    } finally {
    try {
    if(session != null) { session.close(); }
    if(conn != null) { conn.close(); }
    } catch (JMSException ex) {}
    }


    ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
    Connection conn = null;
    Session session = null;
    try {
    conn = cf.createConnection();
    session = conn.createSession(false,
    Session.AUTO_ACKNOWLEDGE);
    Destination destination = new ActiveMQQueue("myQueue");
    MessageConsumer consumer = session.createConsumer(destination);
    conn.start();
    Message message = consumer.receive();
    TextMessage textMessage = (TextMessage) message;
    System.out.println("GOT A MESSAGE: " + textMessage.getText());
    } catch (JMSException e) {
    System.out.println(e);
    } finally {
    try {
    if(session != null) { session.close(); }
    if(conn != null) { conn.close(); }
    } catch (JMSException ex) {}
    }

    spring의 매직은 상당히 많은 설정을 xml에서 처리가능하다는 것이고요 이것은 분명히 유연성을 가져다 주고 있습니다. 이 예가 그것을 강력히 증명합니다. 
    ActiveMQ는 spring 인터페이스 말고도 Ruby와 PHP같은 다른 언어의 인터페이스도 제공하므로 spring외의 다른 환경에서도 사용 가치는 충분할 것입니다. 
Designed by Tistory.