package org.hpsearch.demo.CrisisGridServices;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.util.Properties;
import javax.jms.BytesMessage;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import org.hpsearch.proxy.ProxyWebService;
import org.hpsearch.proxy.WrapperProxyWebService;
import cgl.narada.jms.NBJmsInitializer;
import cgl.workflow.common.Debug;
/**
* This is the equivalent of the RainFallPublisher in CrisisGrid Demo. Purpose
* of this service is to start a JMS connection on the specified topic and
* listen for a "SendNextRainfall" command. On receiving this message it sends
* the next rainfall reading.
* Created: May 13, 2004
* Modified from RainfallPublisher.java written by Sunghoon Ko(suko@indiana.edu)
*
* @author Harshawardhan Gadgil (hgadgil@grids.ucs.indiana.edu)
*/
public class RainFallPublisher extends WrapperProxyWebService implements MessageListener {

    /** Name of the topic rainfall readings are published to and commands are received on. */
    private static final String RAINFALL_TOPIC = "RainFall";

    /** Name of the topic that receives a copy of every reading for display. */
    private static final String VISUAL_TOPIC = "Visualization";

    /** Value of {@link Message#getJMSType()} this listener reacts to. */
    private static final String TEXT_MESSAGE_TYPE = "TextMessage";

    /** Text payload that triggers publication of the next rainfall reading. */
    private static final String NEXT_RAINFALL_COMMAND = "SendNextRainfall";

    /** JMS priority used for every published reading. */
    private static final int PUBLISH_PRIORITY = 0;

    /** JMS time-to-live, in milliseconds, used for every published reading. */
    private static final long PUBLISH_TTL_MILLIS = 2000;

    private TopicSession pubSession, subSession;
    private TopicPublisher publisher, visualPublisher;
    private TopicSubscriber subscriber;
    private TopicConnection connection;

    // Not referenced anywhere in this class; retained unchanged.
    private String rainFallTopic, visualTopic;

    /** Topic names in use; only included in the initialisation error log. */
    private String topicString;

    ////////////////////////////////////////////////////////////////
    // State of the generated time series for the demo. These are static, so
    // they are shared by every instance of this class.
    private static int timeStamp = 1230; // 12:30 PM; not read anywhere in this class
    private static int hour = 12;
    private static int min = 0;
    private static int interval = 30; // every 30 min.
    private static float rainfall = 0;

    public RainFallPublisher() {
    }

    /** Does nothing; readings are produced in response to JMS messages (see {@link #onMessage}). */
    public void process() throws Exception {
    }

    /**
     * Initialises the service by setting up the JMS connection. None of the
     * arguments are used by this implementation.
     *
     * @param in     ignored
     * @param out    ignored
     * @param params ignored
     */
    public void initService(InputStream[] in, OutputStream[] out, String[] params) {
        initJMSConnection();
    }

    /**
     * @return always an empty array
     */
    public String[] serviceExceptions() {
        return new String[0];
    }

    /**
     * Starts the JMS connection, which allows messages to be delivered, and
     * marks the service as running.
     *
     * @throws IllegalStateException if the JMS connection was never created
     * @throws Exception             if the connection cannot be started
     */
    public void startService() throws Exception {
        requireConnection().start();
        setStatus(ProxyWebService.SERVICE_RUNNING);
    }

    /**
     * Stops the JMS connection and marks the service as stopped.
     *
     * @throws IllegalStateException if the JMS connection was never created
     * @throws Exception             if the connection cannot be stopped
     */
    public void stopService() throws Exception {
        requireConnection().stop();
        setStatus(ProxyWebService.SERVICE_STOPPED);
    }

    /**
     * Restarts the JMS connection, which allows messages to be delivered
     * again, and marks the service as running.
     *
     * @throws IllegalStateException if the JMS connection was never created
     * @throws Exception             if the connection cannot be started
     */
    public void resumeService() throws Exception {
        requireConnection().start();
        setStatus(ProxyWebService.SERVICE_RUNNING);
    }

    /**
     * Stops the JMS connection and marks the service as suspended.
     *
     * @throws IllegalStateException if the JMS connection was never created
     * @throws Exception             if the connection cannot be stopped
     */
    public void suspendService() throws Exception {
        requireConnection().stop();
        setStatus(ProxyWebService.SERVICE_SUSPENDED);
    }

    // Stuff private to the service

    /**
     * Returns the JMS connection, failing with a descriptive error instead of a
     * bare NullPointerException when initialisation did not create one.
     */
    private TopicConnection requireConnection() {
        if (connection == null) {
            throw new IllegalStateException(
                    "JMS connection is not initialised; initService() was not called or failed");
        }
        return connection;
    }

    /**
     * Creates the JMS connection, one session for publishing and one for
     * subscribing, publishers for both topics, and registers this object as
     * listener on the rainfall topic. Sets the status to SERVICE_READY on
     * success and to INIT_FAILURE if anything throws.
     */
    private void initJMSConnection() {
        // Assigned before the try block so the error log below can name the topics.
        topicString = RAINFALL_TOPIC + ", " + VISUAL_TOPIC;
        try {
            Properties props = new Properties();
            props.put("hostname", InetAddress.getLocalHost().getHostName());
            props.put("portnum", "3045");
            NBJmsInitializer ini = new NBJmsInitializer(props, "niotcp", 777);

            // Lookup a JMS connection factory
            TopicConnectionFactory conFactory = (TopicConnectionFactory) ini.lookup();
            connection = conFactory.createTopicConnection("guest", "password");

            // Create the JMS session objects
            pubSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
            subSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);

            // Lookup the JMS topics
            Topic rainfallTopic = pubSession.createTopic(RAINFALL_TOPIC);
            Topic visual = pubSession.createTopic(VISUAL_TOPIC);
            Debug.Log("RainFallPublisher", "Publishing to topic {" + RAINFALL_TOPIC + "}", Debug.INFO);

            // Create the JMS publishers and the subscriber
            publisher = pubSession.createPublisher(rainfallTopic);
            visualPublisher = pubSession.createPublisher(visual);
            subscriber = subSession.createSubscriber(rainfallTopic);

            // Set a JMS message listener
            subscriber.setMessageListener(this);
        } catch (Exception e) {
            Debug.Log("RainFallPublisher", "Error: " + topicString + "\n" + e, Debug.ERROR);
            setStatus(ProxyWebService.INIT_FAILURE);
            // Return here so the failure status is not overwritten by SERVICE_READY below.
            return;
        }
        setStatus(ProxyWebService.SERVICE_READY);
    }

    /**
     * Publishes one rainfall reading as a BytesMessage containing the time
     * (int) followed by the rainfall (float), tagged with the string property
     * "RainfallData" = "ByteData". The same message is published to the
     * rainfall topic and to the visualization topic (for display), both
     * non-persistent.
     *
     * @param time     time of the reading; generateRainfall() encodes it as hour * 100 + minute
     * @param rainfall rainfall value of the reading
     * @throws JMSException if the message cannot be created, populated or published
     */
    public void writeMessage(int time, float rainfall) throws JMSException {
        BytesMessage byteMessage = pubSession.createBytesMessage();
        byteMessage.writeInt(time);
        byteMessage.writeFloat(rainfall);
        byteMessage.setStringProperty("RainfallData", "ByteData");
        publisher.publish(byteMessage, DeliveryMode.NON_PERSISTENT, PUBLISH_PRIORITY, PUBLISH_TTL_MILLIS);
        // Also send to the web for display.
        visualPublisher.publish(byteMessage, DeliveryMode.NON_PERSISTENT, PUBLISH_PRIORITY,
                PUBLISH_TTL_MILLIS);
    }

    /**
     * Publishes the next rainfall reading when a text message carrying the
     * "SendNextRainfall" command arrives; every other message is ignored.
     *
     * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
     */
    public void onMessage(Message message) {
        try {
            // Constant-first equals: a message without a JMSType is skipped rather
            // than raising a NullPointerException inside the listener.
            if (!TEXT_MESSAGE_TYPE.equals(message.getJMSType())) {
                return;
            }
            // Guard the cast: the type header alone does not prove the runtime class.
            if (!(message instanceof TextMessage)) {
                return;
            }
            String text = ((TextMessage) message).getText();
            if (NEXT_RAINFALL_COMMAND.equals(text)) {
                generateRainfall();
            }
        } catch (JMSException jmse) {
            jmse.printStackTrace();
        }
    }

    /**
     * Generates the next reading of the demo time series and publishes it.
     * The clock advances by {@code interval} minutes per call; once the hour
     * reaches 24 it is reset to 12. Rainfall grows by 2 per call and restarts
     * at 2 once it exceeds 20. Publishing errors are printed, not propagated.
     */
    public void generateRainfall() {
        min = min + interval;
        if (min >= 60) {
            min = min - 60;
            hour = hour + 1;
            if (hour >= 24) {
                hour = 12;
            }
        }
        int time = hour * 100 + min;

        rainfall = rainfall + (float) 2;
        if (rainfall > 20) {
            rainfall = (float) 2;
        }

        System.out.println("------- Sending RAINFALL data : <" + time + ", " + rainfall + ">");
        try {
            writeMessage(time, rainfall);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}