本文共 2519 字,大约阅读时间需要 8 分钟。
import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.Destination;import javax.jms.Message;import javax.jms.MessageConsumer;import javax.jms.MessageListener;import javax.jms.Session;import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;import com.ailk.biapp.ci.localization.cntv.service.IUserSynchronizationService;import com.asiainfo.biframe.utils.config.Configure;import com.asiainfo.biframe.utils.spring.SystemServiceLocator;public class JmsTopicReceiver { public void topicListener() { // ConnectionFactory :连接工厂,JMS 用它创建连接 ConnectionFactory connectionFactory; // Connection :JMS 客户端到JMS Provider 的连接 Connection connection = null; // Session: 一个发送或接收消息的线程 Session session; // Destination :消息的目的地;消息发送给谁. Destination destination; // 消费者,消息接收者 MessageConsumer consumer; String activeUrl = Configure.getInstance().getProperty("ACTIVE_URL"); String topic = Configure.getInstance().getProperty("TOPIC_NAME"); connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, activeUrl); try { // 构造从工厂得到连接对象 connection = connectionFactory.createConnection(); // 启动 connection.start(); // 获取操作连接 session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); //test-queue跟sender的保持一致,一个创建一个来接收 destination = session.createTopic(topic); consumer = session.createConsumer(destination); consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { try { TextMessage txtMsg = (TextMessage)message; String msg = txtMsg.getText(); //收到topic的时候增量同步用户 IUserSynchronizationService service = (IUserSynchronizationService)SystemServiceLocator.getInstance().getService("userSynchronizationServiceImpl"); service.startIncrementalSync(msg); } catch (Exception e) { e.printStackTrace(); } } }); } catch (Exception e) { e.printStackTrace(); } } }
转载地址:http://kefao.baihongyu.com/