package org.hpsearch.demo.CrisisGridServices; import java.io.File; import java.io.FileInputStream; import java.io.InputStream; import java.io.OutputStream; import java.net.InetAddress; import java.util.Properties; import javax.jms.BytesMessage; import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; import org.hpsearch.proxy.ProxyWebService; import org.hpsearch.proxy.WrapperProxyWebService; import cgl.narada.jms.NBJmsInitializer; import cgl.workflow.common.Debug; /** * This is the equivalent of the MapService in CrisisGrid Demo. Purpose of this * service is to start a JMS connection on the specified topic and send the * files t2_cn.asc and t2_fd.asc and then exit
* Created: May 13, 2004
* Modified from MapServer.java written by Sunghoon Ko(suko@indiana.edu) * * @author Harshawardhan Gadgil (hgadgil@grids.ucs.indiana.edu) */ public class MapService extends WrapperProxyWebService implements javax.jms.MessageListener { public MapService() { } /* * (non-Javadoc) * * @see org.hpsearch.proxy.RunnableProxyWebService#process() */ public void process() throws Exception { try { String[] fileName = { "t2_cn.asc", "t2_fd.asc"}; for (int i = 0; i < fileName.length; i++) { // create two file references File inputFile = new File(fileName[i]); FileInputStream fis = new FileInputStream(inputFile); byte[] buffer = new byte[(int) inputFile.length()]; writeTextMessage("FileStatus", "TerrainFileName", "X" + fileName[i]); System.out.println("++++ MapService Sending: " + fileName[i]); int size = 1024 * 2; byte[] tempByte = new byte[size]; int length = 0; while ((length = fis.read(tempByte)) > -1) writeBytesMessage(tempByte, length); if (length < 0) writeTextMessage("FileStatus", "TerrainFileEnd", " "); fis.close(); } writeTextMessage("FileStatus", "AllTerrainFilesSendingEnd", " "); } catch (Exception e) { } serviceFinished(); } public void startService() throws Exception { // Start the JMS connection; allows messages to be delivered setStatus(ProxyWebService.SERVICE_RUNNING); connection.start(); process(); } public void stopService() throws Exception { // Stop the JMS connection; connection.stop(); setStatus(ProxyWebService.SERVICE_STOPPED); } public void resumeService() throws Exception { // Start the JMS connection; allows messages to be delivered connection.start(); setStatus(ProxyWebService.SERVICE_RUNNING); } public void suspendService() throws Exception { // Stop the JMS connection connection.stop(); setStatus(ProxyWebService.SERVICE_SUSPENDED); } public void initService(InputStream[] in, OutputStream[] out, String[] params) { topicString = "MapServer"; initJMSConnection(); } public String[] serviceExceptions() { return new String[0]; } // Stuff private to the service private void initJMSConnection() { try { Properties props = new Properties(); props.put("hostname", InetAddress.getLocalHost().getHostName()); props.put("portnum", "3045"); NBJmsInitializer ini = new NBJmsInitializer(props, "niotcp", 9); // Lookup a JMS connection factory TopicConnectionFactory conFactory = (TopicConnectionFactory) ini .lookup(); connection = conFactory.createTopicConnection("guest", "password"); // Create a JMS session object pubSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); subSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); // Lookup a JMS topic Topic streamTopic = pubSession.createTopic(topicString); Debug.Log("MapService", "Publishing to topic {" + topicString + "}", Debug.INFO); // Create a JMS publisher and subscriber publisher = pubSession.createPublisher(streamTopic); subscriber = subSession.createSubscriber(streamTopic); // Set a JMS message listener subscriber.setMessageListener(this); } catch (Exception e) { Debug.Log("MapService", "Error: " + topicString + "\n" + e, Debug.ERROR); setStatus(ProxyWebService.INIT_FAILURE); } setStatus(ProxyWebService.SERVICE_READY); } public void writeTextMessage(String name, String value, String text) throws JMSException { TextMessage message = pubSession.createTextMessage(); message.setText(text); message.setStringProperty(name, value); publisher.publish(message, DeliveryMode.NON_PERSISTENT, 0, 2000); } /* Create and send message using topic publisher */ public void writeBytesMessage(byte[] bMsg, int length) throws JMSException { BytesMessage bMessage = pubSession.createBytesMessage(); bMessage.writeInt(length); bMessage.writeBytes(bMsg, 0, length); bMessage.setStringProperty("TerrainFile", "ByteData"); publisher.publish(bMessage, DeliveryMode.NON_PERSISTENT, 0, 2000); } public void onMessage(Message arg0) { } private TopicSession pubSession, subSession; private TopicPublisher publisher; private TopicSubscriber subscriber; private TopicConnection connection; private String bridgeId; private String topicString; }