侧边栏壁纸
博主头像
落叶人生博主等级

走进秋风,寻找秋天的落叶

  • 累计撰写 130562 篇文章
  • 累计创建 28 个标签
  • 累计收到 9 条评论
标签搜索

目 录CONTENT

文章目录

ActiveMQ 持久化(数据库),查询队列剩余消息数、出队数的实现

2022-06-17 星期五 / 0 评论 / 0 点赞 / 143 阅读 / 12042 字

《ActiveMQ 持久化(文件),查询队列剩余消息数、出队数的实现》分析了消息队列持久化保存,假如activemq服务器突然停止,服务器启动后,还可以继续查找队列中的消息。现在分析队列中的消息使用数

《ActiveMQ 持久化(文件),查询队列剩余消息数、出队数的实现》分析了消息队列持久化保存,假如activemq服务器突然停止,服务器启动后,还可以继续查找队列中的消息。现在分析队列中的消息使用数据库持久化。

本人博客开始迁移,博客整个架构自己搭建及编码http://www.cookqq.com/

消息生产者:

package com.activemq.mysql;import java.io.File;import java.util.Properties;import javax.jms.Connection;import javax.jms.DeliveryMode;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.MessageProducer;import javax.jms.Session;import javax.sql.DataSource;import org.apache.activemq.ActiveMQConnectionFactory;import org.apache.activemq.broker.BrokerService;import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;import org.apache.activemq.store.jdbc.adapter.MySqlJDBCAdapter;import org.apache.commons.dbcp.BasicDataSourceFactory;import org.apache.log4j.LogManager;import org.apache.log4j.Logger;/** * 消息持久化到数据库 * */public class MessageProductor {	  private static Logger logger=LogManager.getLogger(MessageProductor.class);	  private String username=ActiveMQConnectionFactory.DEFAULT_USER;	  private String password=ActiveMQConnectionFactory.DEFAULT_PASSWORD;	  private  String url=ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL;	  	  public static String queueName="acticemq_queue";	  private BrokerService brokerService;	  protected static final int messagesExpected = 3;	  	  protected ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(			    username,password,	            "tcp://localhost:61616?jms.prefetchPolicy.all=0&jms.redeliveryPolicy.maximumRedeliveries="+messagesExpected);	/***	 * 创建Broker服务对象	 * @return	 * @throws Exception	 */	public BrokerService createBroker()throws Exception{			BrokerService  broker=new BrokerService();			JDBCPersistenceAdapter jdbc=createJDBCPersistenceAdapter();			broker.setPersistenceAdapter(jdbc);			jdbc.setDataDirectory(System.getProperty("user")+					File.separator+"data"+File.separator);			jdbc.setAdapter(new MySqlJDBCAdapter());			broker.setPersistent(true);			broker.addConnector("tcp://localhost:61616");			//broker.addConnector(ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL);		return broker;	}	/**	 * 创建Broken的持久化适配器	 * @return	 * @throws Exception	 */	public JDBCPersistenceAdapter createJDBCPersistenceAdapter() throws Exception{		JDBCPersistenceAdapter jdbc=new JDBCPersistenceAdapter();		DataSource datasource=createDataSource();		jdbc.setDataSource(datasource);		jdbc.setUseDatabaseLock(false);		//jdbc.deleteAllMessages();		return jdbc;	}	/**	 * 创建数据源	 * @return	 * @throws Exception	 */	public DataSource createDataSource() throws Exception{		Properties props=new Properties();		props.put("driverClassName", "com.mysql.jdbc.Driver");		props.put("url", "jdbc:mysql://localhost:3306/activemq");		props.put("username", "root");		props.put("password", "16ds");		DataSource datasource=BasicDataSourceFactory.createDataSource(props);		return datasource;	}	/**	 * 启动BrokerService进程	 * @throws Exception	 */	public void init() throws Exception{		createBrokerService();		start();	}		public void start() throws Exception{		if(brokerService!=null){			brokerService.start();		}	}	public BrokerService createBrokerService() throws Exception{		if(brokerService==null){			brokerService=createBroker();		}		return brokerService;	}		public void sendMessage() throws JMSException{		Connection connection=connectionFactory.createConnection();		connection.start();		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);	    Destination destination = session.createQueue(queueName);        	    MessageProducer producer = session.createProducer(destination);	    producer.setDeliveryMode(DeliveryMode.PERSISTENT);		for(int i=0;i<messagesExpected;i++){			 logger.debug("Sending message " + (i+1) + " of " + messagesExpected);	         producer.send(session.createTextMessage("test message " + (i+1)));		}		connection.close();	}	public String getUrl() {		return url;	}	public void setUrl(String url) {		this.url = url;	}	public String getUsername() {		return username;	}	public void setUsername(String username) {		this.username = username;	}}

消息消费者:

package com.activemq.mysql;import javax.jms.Connection;import javax.jms.Destination;import javax.jms.Message;import javax.jms.MessageConsumer;import javax.jms.Session;import org.apache.activemq.ActiveMQConnectionFactory;import org.apache.activemq.broker.BrokerService;import org.apache.log4j.LogManager;import org.apache.log4j.Logger;/*** * 消息持久化到数据库 */public class MessageCustomer {	private static Logger logger=LogManager.getLogger(MessageProductor.class);	  protected static final int messagesExpected = 5;	  	/***	 * 创建Broker服务对象	 * @return	 * @throws Exception	 */	public BrokerService createBroker()throws Exception{		BrokerService  broker=new BrokerService();	    broker.addConnector(ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL);		return broker;	}	/**	 * 启动BrokerService进程	 * @throws Exception	 */	public void init() throws Exception{		BrokerService brokerService=createBroker();		brokerService.start();	}	/**	 * 接收的信息	 * @return	 * @throws Exception	 */	public int receiveMessage() throws Exception{		ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(	            "tcp://localhost:61616?jms.prefetchPolicy.all=0&jms.redeliveryPolicy.maximumRedeliveries="+messagesExpected);		Connection connection=connectionFactory.createConnection();		connection.start();		Session session = connection.createSession(true, Session.SESSION_TRANSACTED);		return receiveMessages(messagesExpected,session);	}		/**	 * 接受信息的方法	 * @param messagesExpected	 * @param session	 * @return	 * @throws Exception	 */	protected int receiveMessages(int messagesExpected, Session session) throws Exception {        int messagesReceived = 0;        for (int i=0; i<messagesExpected; i++) {            Destination destination = session.createQueue(MessageProductor.queueName);            MessageConsumer consumer = session.createConsumer(destination);            Message message = null;            try {            	logger.debug("Receiving message " + (messagesReceived+1) + " of " + messagesExpected);                message = consumer.receive(2000);                logger.info("Received : " + message);                System.out.println("Received : " + message);                if (message != null) {                    session.commit();                    messagesReceived++;                }            } catch (Exception e) {            	logger.debug("Caught exception " + e);                session.rollback();            } finally {                if (consumer != null) {                    consumer.close();                }            }        }        return messagesReceived;    }}

生产者测试类:

package com.activemq.mysql;public class MessageProductorTest {		public static void main(String[] args) throws Exception {		MessageProductor  productor =new MessageProductor();		productor.init();		productor.sendMessage();		//productor.createBrokerService().stop();	}}

消费者测试类:

package com.activemq.mysql;public class MessageCustomerTest {  public static void main(String[] args) throws Exception {	  MessageCustomer  customer=new MessageCustomer();	  //customer.init();  //当两台机器在不同的服务器上启动客户端的broker进程	  customer.receiveMessage();	  }}

数据库形式:

activemq_acks:ActiveMQ的签收信息。

activemq_lock:ActiveMQ的锁信息。

activemq_msgs:ActiveMQ的消息的信息



参照博客: http://topmanopensource.iteye.com/blog/1066383

广告 广告

评论区