package org.hpsearch.demo.CrisisGridServices; import java.io.InputStream; import java.io.OutputStream; import java.net.InetAddress; import java.util.Properties; import javax.jms.BytesMessage; import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; import org.hpsearch.proxy.ProxyWebService; import org.hpsearch.proxy.WrapperProxyWebService; import cgl.narada.jms.NBJmsInitializer; import cgl.workflow.common.Debug; /** * This is the equivalent of the RainFallPublisher in CrisisGrid Demo. Purpose * of this service is to start a JMS connection on the specified topic and * listen for a "SendNextRainfall" command. On receiving this message it send * the next rainfall reading
* Created: May 13, 2004
* Modified from RainfallPublisher.java written by Sunghoon Ko(suko@indiana.edu) * * @author Harshawardhan Gadgil (hgadgil@grids.ucs.indiana.edu) */ public class RainFallPublisher extends WrapperProxyWebService implements MessageListener { public RainFallPublisher() { } public void process() throws Exception { } public void initService(InputStream[] in, OutputStream[] out, String[] params) { initJMSConnection(); } public String[] serviceExceptions() { return new String[0]; } public void startService() throws Exception { // Start the JMS connection; allows messages to be delivered connection.start(); setStatus(ProxyWebService.SERVICE_RUNNING); } public void stopService() throws Exception { // Stop the JMS connection; connection.stop(); setStatus(ProxyWebService.SERVICE_STOPPED); } public void resumeService() throws Exception { // Start the JMS connection; allows messages to be delivered connection.start(); setStatus(ProxyWebService.SERVICE_RUNNING); } public void suspendService() throws Exception { // Stop the JMS connection connection.stop(); setStatus(ProxyWebService.SERVICE_SUSPENDED); } // Stuff private to the service private void initJMSConnection() { try { Properties props = new Properties(); props.put("hostname", InetAddress.getLocalHost().getHostName()); props.put("portnum", "3045"); NBJmsInitializer ini = new NBJmsInitializer(props, "niotcp", 777); // Lookup a JMS connection factory TopicConnectionFactory conFactory = (TopicConnectionFactory) ini.lookup(); connection = conFactory.createTopicConnection("guest", "password"); // Create a JMS session object pubSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); subSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); // Lookup a JMS topic Topic rainfallTopic = pubSession.createTopic("RainFall"); Topic visual = pubSession.createTopic("Visualization"); Debug.Log("RainFallPublisher", "Publishing to topic {RainFall}", Debug.INFO); // Create a JMS publisher and subscriber publisher = pubSession.createPublisher(rainfallTopic); visualPublisher = pubSession.createPublisher(visual); subscriber = subSession.createSubscriber(rainfallTopic); // Set a JMS message listener subscriber.setMessageListener(this); } catch (Exception e) { Debug.Log("RainFallPublisher", "Error: " + topicString + "\n" + e, Debug.ERROR); setStatus(ProxyWebService.INIT_FAILURE); } setStatus(ProxyWebService.SERVICE_READY); } public void writeMessage(int time, float rainfall) throws JMSException { BytesMessage byteMessage = pubSession.createBytesMessage(); byteMessage.writeInt(time); byteMessage.writeFloat(rainfall); byteMessage.setStringProperty("RainfallData", "ByteData"); publisher.publish(byteMessage, DeliveryMode.NON_PERSISTENT, 0, 2000); visualPublisher.publish(byteMessage, DeliveryMode.NON_PERSISTENT, 0, 2000); // also // send // to // web // for // display } private TopicSession pubSession, subSession; private TopicPublisher publisher, visualPublisher; private TopicSubscriber subscriber; private TopicConnection connection; private String rainFallTopic, visualTopic; private String topicString; /* * (non-Javadoc) * * @see javax.jms.MessageListener#onMessage(javax.jms.Message) */ public void onMessage(Message message) { try { if (message.getJMSType().equals("TextMessage")) { TextMessage textMessage = (TextMessage) message; if (textMessage.getText().equals("SendNextRainfall")) generateRainfall(); } } catch (JMSException jmse) { jmse.printStackTrace(); } } //////////////////////////////////////////////////////////////// // generate time series rainfall data for demo private static int timeStamp = 1230; // 12:30 PM private static int hour = 12; private static int min = 0; private static int interval = 30; // every 30 min. private static float rainfall = 0; public void generateRainfall() { int time = 0; float[] data = new float[2]; min = min + interval; if (min >= 60) { min = min - 60; hour = hour + 1; if (hour >= 24) hour = 12; } time = hour * 100 + min; rainfall = rainfall + (float) 2; if (rainfall > 20) rainfall = (float) 2; System.out.println("------- Sending RAINFALL data : <" + time + ", " + rainfall + ">"); try { writeMessage(time, rainfall); } catch (Exception e) { e.printStackTrace(); } } }