日期:2014-05-16 浏览次数:20623 次
服务端代码:
package easyway.activemq.app.demo3;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.StreamMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
 * Message producer: reads a local file and publishes its content to the
 * "streamMsg" queue as a sequence of {@link StreamMessage}s.
 *
 * @author longgangbai
 */
public class StreamMsgProducer {

    /**
     * Obtains the "connectionFactory" bean from the Spring context defined in
     * {@code activemq-jdbc.xml}, opens a non-transacted, auto-acknowledge
     * session and sends the file {@code C:/send.txt} in chunks of at most
     * 2048 bytes, one {@link StreamMessage} per chunk.
     *
     * <p>JMS and I/O failures are printed to standard error; the input file
     * and the connection are always released in the {@code finally} block.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // NOTE(review): the Spring context is never closed here; it may own
        // broker resources, so confirm before adding ctx.close().
        ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext("activemq-jdbc.xml");
        ActiveMQConnectionFactory activeMqfactory =
                (ActiveMQConnectionFactory) ctx.getBean("connectionFactory");
        Connection conn = null;
        InputStream in = null;
        try {
            conn = activeMqfactory.createConnection();
            conn.start();
            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination queue = session.createQueue("streamMsg");
            MessageProducer producer = session.createProducer(queue);

            File file = new File("C:\\send.txt");
            in = new FileInputStream(file);
            byte[] buffer = new byte[2048];
            int c;
            // Each chunk read from the file becomes its own message holding
            // exactly one bytes field of length c.
            while ((c = in.read(buffer)) > 0) {
                StreamMessage smsg = session.createStreamMessage();
                smsg.writeBytes(buffer, 0, c);
                producer.send(smsg);
                System.out.println("send: " + c);
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Close the file even when reading or sending failed part-way;
            // previously the stream leaked on any exception.
            if (in != null) {
                try {
                    in.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (conn != null) {
                try {
                    conn.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
客户端代码:
package easyway.activemq.app.demo3;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.StreamMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.xbean.BrokerFactoryBean;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
 * Message consumer: drains {@link StreamMessage}s from the "streamMsg" queue
 * and appends their byte content to a local file.
 *
 * @author longgangbai
 */
public class StreamMsgConsumer {

    /**
     * Obtains the "connectionFactory" bean from the Spring context defined in
     * {@code activemq-jdbc.xml}, opens a non-transacted, auto-acknowledge
     * session and writes the bytes field of every received
     * {@link StreamMessage} to {@code c:/receive.txt}.
     *
     * <p>The loop ends when no message arrives within 5000 ms. Messages that
     * are not {@link StreamMessage}s are consumed and ignored. JMS and I/O
     * failures are printed to standard error; the output file and the
     * connection are always released in the {@code finally} block.
     */
    public void receive() {
        // NOTE(review): the Spring context is never closed here; it may own
        // broker resources, so confirm before adding ctx.close().
        ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext("activemq-jdbc.xml");
        ActiveMQConnectionFactory activeMqfactory =
                (ActiveMQConnectionFactory) ctx.getBean("connectionFactory");
        Connection conn = null;
        OutputStream out = null;
        try {
            conn = activeMqfactory.createConnection();
            conn.start();
            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination queue = session.createQueue("streamMsg");
            MessageConsumer consumer = session.createConsumer(queue);

            out = new FileOutputStream("c:\\receive.txt");
            byte[] buffer = new byte[2048];
            while (true) {
                Message msg = consumer.receive(5000);
                if (msg == null) {
                    // Timed out waiting: treat the queue as drained.
                    break;
                }
                if (msg instanceof StreamMessage) {
                    StreamMessage smsg = (StreamMessage) msg;
                    int c;
                    // readBytes fills the buffer while more of the field
                    // remains and returns -1 for a null or exhausted field.
                    // A full buffer means another call is required to either
                    // fetch the rest or complete the field; a short or
                    // negative count means the field is finished.
                    do {
                        c = smsg.readBytes(buffer);
                        if (c > 0) {
                            out.write(buffer, 0, c);
                            System.out.println("Receive: " + c);
                        }
                    } while (c == buffer.length);
                }
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Close the file even when receiving or writing failed part-way;
            // previously the stream leaked on any exception.
            if (out != null) {
                try {
                    out.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (conn != null) {
                try {
                    conn.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * Entry point: runs a single {@link #receive()} pass.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        new StreamMsgConsumer().receive();
    }
}
ActiveMQ 的配置如下:
<?xml version="1.0" encoding="UTF-8"?>
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/