package org.hpsearch.demo.CrisisGridServices.RunOffModel; /** * Software License, Version 0.1 Copyright 2004 The Trustees of Indiana * University. All rights reserved. */ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.PrintStream; import java.util.Enumeration; import javax.jms.BytesMessage; import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; import org.hpsearch.demo.CrisisGridServices.ComputeService; public class RainFallComm implements javax.jms.MessageListener { private static final String RAINFALL_FILE = "rain.dat"; private ComputeService parent_RunoffModelMain; private TopicSession pubSession; private TopicSession subSession; private TopicPublisher rainfallPub; private TopicPublisher visualPublisher; private TopicSubscriber rainfallSub; private TopicConnection connection; private String userName; public RainFallComm(ComputeService parent) throws Exception { this.parent_RunoffModelMain = parent; } public void initializeSession(TopicConnection connection, String username) throws Exception { // Create a JMS session object pubSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); subSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); // Lookup a JMS topic Topic mapTopic = subSession.createTopic("MapServer"); TopicSubscriber mapSubscriber = subSession.createSubscriber(mapTopic); mapSubscriber.setMessageListener(this); // Lookup a JMS topic Topic rainfallTopic = subSession.createTopic("RainFall"); rainfallPub = pubSession.createPublisher(rainfallTopic); rainfallSub = subSession.createSubscriber(rainfallTopic); rainfallSub.setMessageListener(this); Topic visualizationTopic = pubSession.createTopic("Visualization"); visualPublisher = pubSession.createPublisher(visualizationTopic); } /////////////////////////////////////////////////////// /* Receive message from topic subscriber */ public byte[] bytesMsg; private File outputFile; private FileOutputStream out; public void onMessage(Message message) { int time = 0; float rainfall = 0; try { Enumeration propertyNames = message.getPropertyNames(); String propertyName = (String) propertyNames.nextElement(); String value = message.getStringProperty(propertyName); if (message.getJMSType().equals("BytesMessage")) { BytesMessage byteMessage = (BytesMessage) message; if (propertyName.equals("TerrainFile")) { int length = byteMessage.readInt(); byte[] buffer = new byte[length]; byteMessage.readBytes(buffer); out.write(buffer); } else if (propertyName.equals("RainfallData")) { time = byteMessage.readInt(); rainfall = byteMessage.readFloat(); System.out.println("time = " + time + ", rainfall = " + rainfall); if ((int) rainfall == 2) oldTime = 99; // new rainfall writeOnFileThenCompute(time, rainfall); } } else if (message.getJMSType().equals("TextMessage")) { TextMessage textMessage = (TextMessage) message; if (propertyName.equals("HostName")) { System.out.println("value = " + value); if (value.equals("RunoffModel")) { System.out.println("RunoffModel HostName = " + textMessage.getText()); } } else if (propertyName.equals("FileStatus")) { System.out.println("File Status value = " + value); if (value.equals("TerrainFileEnd")) { out.close(); } else if (value.equals("TerrainFileName")) { String text = textMessage.getText(); System.out.println("++++++ TerrainFileName = " + text); outputFile = new File(text); out = new FileOutputStream(outputFile); } else if (value.equals("AllTerrainFilesSendingEnd")) { System.out .println("All Terrain Files Received: Now Requesting NextRainfall Data"); //writeTextMessage2WebClient("RainfallComm", // "RainfalComm", "GotTerrainData"); writeTextMessage("aaa", "bbb", "SendNextRainfall"); } } } } catch (JMSException jmse) { jmse.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } //////////////////////////////////////////////////////// private static int oldTime = 99; private static float oldRainfall = 0; // write radar time series rainfall on file which will be read by C code. public void writeOnFileThenCompute(int currentTime, float currentRainfall) { System.out.println("[222222222222] " + oldTime); if (oldTime != 99) { try { FileOutputStream outFile = new FileOutputStream(RAINFALL_FILE); PrintStream pf = new PrintStream(outFile); pf.print(oldTime); pf.print(" "); pf.println(oldRainfall); pf.print(currentTime); pf.print(" "); pf.println(currentRainfall); pf.close(); System.out.println("[222222222222]"); // run runoff computation this.parent_RunoffModelMain.runoffModel(); // Q. Should the computation be over here... or any further action is necessary.... this.parent_RunoffModelMain.serviceFinished(); } catch (Exception e) { } } else {// ask to send next rainfall try { //this.parent_RunoffModelMain.runoffMaps4VisualComm.writeTextMessage2Rainfall( writeTextMessage("aaa", "bbb", "SendNextRainfall"); } catch (Exception e) { } } oldTime = currentTime; oldRainfall = currentRainfall; } /* Create and send message using topic publisher */ public void writeTextMessage(String name, String value, String text) throws JMSException { TextMessage message = pubSession.createTextMessage(); message.setText(text); message.setStringProperty(name, value); rainfallPub.publish(message, DeliveryMode.NON_PERSISTENT, 0, 2000); } /* Create and send message using topic publisher */ public void writeTextMessage2WebClient(String name, String value, String text) throws JMSException { TextMessage message = pubSession.createTextMessage(); message.setText(text); message.setStringProperty(name, value); visualPublisher.publish(message, DeliveryMode.NON_PERSISTENT, 0, 2000); } /* Close the JMS connection */ public void close() throws JMSException { connection.close(); } }