/* Author: Jeff Dalton * Updated: Sun Dec 2 02:07:31 2001 by Jeff Dalton * Copyright: (c) 2001, AIAI, University of Edinburgh */ package ix.util; import java.util.*; import java.io.*; import java.net.*; import ix.iface.util.XML; // import ix.util.lisp.Lisp; // for Lisp.readFromString(String) import ix.util.lisp.*; /** * Support for interprocess communication in a framework that allows * different communication strategies to be used in a uniform way. * A default communication strategy that sends serialized objects * via sockets is included.

* * The framework specifies certain entities only a in very general * way. Communication strategies that are meant to be interchangeable * must therefore follow shared conventions - not enforced by the * framework - chiefly concerning the objects used to represent * "destinations" and implementations of the InputMessage interface.

* * Note that a "destination" need not contain all the information * needed for communication with the corresponding agent. Instead, * the destination may be merely a name, perhaps as a String, with * the rest of the information stored elsewhere. */ public class IPC { private IPC() { } // can't instantiate /** * The communication strategy used by the static methods of this * class that do not have a communication strategy as a parameter. */ static CommunicationStrategy defaultCommunicationStrategy = new SimpleIXCommunicationStrategy() {}; /** * Sets the communication strategy used when none is explicitly * specified. */ public static void setDefaultCommunicationStrategy (CommunicationStrategy s) { Debug.noteln("Setting default communication strategy to", s); defaultCommunicationStrategy = s; } /** * Sets the communication strategy used when none is explicitly * specified. The strategyName is interpreted by * the getCommunicationStrategy(String) method. * * @see #getCommunicationStrategy(String) */ public static void setDefaultCommunicationStrategy(String strategyName) { setDefaultCommunicationStrategy (getCommunicationStrategy(strategyName)); } /** * Returns a CommunicationStrategy based on a name that is either * a special abbreviation or the name of a class. The algorithm * is as follows: *

    *
  1. if name is "", * use SimpleIXCommunicationStrategy; *
  2. else if name is "xml", * use SimpleIXXMLCommunicationStrategy; *
  3. else if a class named name exists, * use that class; *
  4. else if a class named * ix.name.NameCommunicationStrategy * exists, where Name is name with its first * character changed to upper case, * use that class; *
  5. else throw an IllegalArgumentException. *
* * @see IPC.SimpleIXCommunicationStrategy * @see IPC.SimpleIXXMLCommunicationStrategy * @see #setDefaultCommunicationStrategy(String strategyName) */ public static CommunicationStrategy getCommunicationStrategy(String strategyName) { // Try special case abbreviations if (strategyName.equals("")) return new SimpleIXCommunicationStrategy(); else if (strategyName.equals("xml")) return new SimpleIXXMLCommunicationStrategy(); Class c; if (// First try name as-is (c = findClassForName(strategyName)) != null || // Then as ix.name.NameCommunicationStrategy (c = findClassForName ("ix." + strategyName + "." + Util.capitalizeString(strategyName) + "CommunicationStrategy")) != null) // We seem to have a class ... try { return (CommunicationStrategy)c.newInstance(); } catch (Exception e) { Debug.noteException(e); throw new IllegalArgumentException ("Can't make communication strategy from " + strategyName + " because " + e); } else throw new IllegalArgumentException ("Can't find communication strategy class specified by " + Util.quote(strategyName)); } private static Class findClassForName(String name) { Debug.noteln("Looking for class", name); try { return Class.forName(name); } catch (ClassNotFoundException e) { return null; } } /** * Sends an object to the specified destination using the default * communication strategy. The agents who are sent these objects * must be using a compatible strategy. The recipient should in * effect get a copy of the object that was sent, but the exact * relationship between the two objects depends on the communication * strategy. */ public static void sendObject(Object destination, Object contents) { defaultCommunicationStrategy.sendObject(destination, contents); } /** * Like sendObject but catches any IPC.IPCException. It can be used * in cases where you don't want to detect a failure and don't want * an exception to be thrown any further. * * @see IPC.IPCException * @see #sendObject(Object, Object) */ public static void exceptionlessSend(Object destination, Object contents) { try { sendObject(destination, contents); } catch (IPCException e) { Debug.noteln("Send failure: " + e.getMessage()); } } /** * Does what is required for this agent to receive messages sent * using the default communication strategy. Here the "destination" * is the agent in which setupServer was called, not a * remote agent. The (remote) agents sending messages to the server * must be using a compatible communication strategy. Note that * setupServer does not assume that all messages are * "sending an object" in the sense of the sendObject method. */ public static void setupServer(Object destination, MessageListener listener) { defaultCommunicationStrategy.setupServer(destination, listener); } /** * An object that determines how various IPC operations are * performed. */ public static interface CommunicationStrategy { public void sendObject(Object destination, Object contents); public void setupServer(Object destination, MessageListener listener); } /** * An object that represents an incoming message. The InputMessage * provides a place for information that is not part of the message * contents but may be used for debugging or for operations such as * sending replies. In later versions, this interface may include * methods for accessing such information. */ public static interface InputMessage { public Object getContents(); } /** * A minimal implementation of InputMessage. */ public static class BasicInputMessage implements InputMessage { protected Object contents; public BasicInputMessage(Object contents) { this.contents = contents; } public Object getContents() { return contents; } } /** * An object that is notified when a message is received. */ public static interface MessageListener { public void messageReceived(InputMessage message); } /** * A CommunicationStrategy that provides "connections" analogous * to sockets and a visible mapping from destination names to the * data needed to establish a connection. */ public static interface SocketlikeCommunicationStrategy extends CommunicationStrategy { public Object getDestinationData(Object destination); public void setDestinationData(Object destination, Object data); public Connection connectTo(Object destination); public Connection getConnection(Object destination); } /** * An object that can send and receive. */ public static interface Connection { public void send(Object contents); public Object receive(); } /** * A mapping from destination names to the data needed to establish * connections with the corresponding agents. */ public static interface DestinationTable { public Object get(Object destination); public Object put(Object destination, Object data); } /** * A HashMap implementation of the DestinationTable interface. */ public static class BasicDestinationTable extends HashMap implements DestinationTable { public BasicDestinationTable() { } } /** * A DestinationTable that provides default host and port assignments * for standard I-X agents, suitable for use with an instance of * ObjectStreamCommunicationStrategy.

* * * * * * *
Name Host Port
IDEEL localhost 5040
ILEED localhost 5050
ITEST localhost 5060
*/ public static class SimpleIXDestinationTable extends BasicDestinationTable { public SimpleIXDestinationTable() { } { put("IDEEL", new ServiceAddress("localhost", 5040)); put("ILEED", new ServiceAddress("localhost", 5050)); put("ITEST", new ServiceAddress("localhost", 5060)); } } /** * An ObjectStream communication strategy that provides default * host and port assignments for standard I-X agents. * * @see IPC.SimpleIXDestinationTable */ public static class SimpleIXCommunicationStrategy extends ObjectStreamCommunicationStrategy { public SimpleIXCommunicationStrategy() { this.destinationTable = new SimpleIXDestinationTable(); } } /** * An ObjectStream communication strategy that provides default * host and port assignments for standard I-X agents and encodes * the message contents in XML rather than serializing. * * @see IPC.SimpleIXDestinationTable */ public static class SimpleIXXMLCommunicationStrategy extends XMLObjectStreamCommunicationStrategy { public SimpleIXXMLCommunicationStrategy() { this.destinationTable = new SimpleIXDestinationTable(); } } /** * A version of ObjectStreamCommunicationStrategy that encodes * the message contents in XML rather than serializing. */ public static class XMLObjectStreamCommunicationStrategy extends ObjectStreamCommunicationStrategy { public XMLObjectStreamCommunicationStrategy() { } public void sendObject(Object destination, Object contents) { super.sendObject(destination, XML.objectToXML(contents)); } public void setupServer(Object destination, MessageListener listener) { final MessageListener innerListener = listener; super.setupServer(destination, new MessageListener() { public void messageReceived(InputMessage message) { String xml = (String)message.getContents(); Object contents = XML.objectFromXML(xml); innerListener.messageReceived (new BasicInputMessage(contents)); } }); } } /** * A communication strategy in which a destination is mapped to a * host and port number, and objects are sent by writing their * serialization to a socket.

* * Command-line arguments / parameters: * *

     *    -name-server=host:port
     *    -run-as-name-server=boolean
     *    -host=host
     * 
* * -host is used when it is necessary to specify the host name * this agent should use when registering with the name-server. * * @see ix.util.Parameters */ public static class ObjectStreamCommunicationStrategy implements SocketlikeCommunicationStrategy { DestinationTable destinationTable = new BasicDestinationTable(); // dest -> data HashMap connectionTable = new HashMap(); // dest -> connection Object serverDestination = null; // see setupServer method public ObjectStreamCommunicationStrategy() { } /** The data must be a ServiceAddress. */ public void setDestinationData(Object destination, Object data) { ServiceAddress addr = (ServiceAddress)data; destinationTable.put(destination, addr); } public Object getDestinationData(Object destination) { Object data = destinationTable.get(destination); if (data == null && !destination.equals("name-server") && destinationTable.get("name-server") != null) { data = askNameServer(destination); // /\/: Save the address in the destinationTable? // Perhaps better not to. The connection is saved, // thus avoiding repeated name-sever requests, and // if the connection is lost, it might be better to // look the name up again. } if (data == null) throw new IPCException("Unknown IPC destination " + destination); return data; } public Object askNameServer(Object destination) { Debug.assert(destinationTable.get("name-server") != null, "No name server addr"); Object addr = sendRequest("name-server", destination); if (addr.equals("unknown")) { throw new IPCException ("Name server doesn't know about " + destination); } else { Debug.assert(addr instanceof ServiceAddress, "Bogus addr for " + destination, addr); return addr; } } public void setDestinationTable(DestinationTable table) { this.destinationTable = table; } public Connection connectTo(Object destination) { ServiceAddress addr = (ServiceAddress)getDestinationData(destination); Debug.noteln("Connecting to " + destination + " on " + addr.getHost() + " port " + addr.getPort()); Debug.assert(connectionTable.get(destination) == null, "connecting twice"); try { Socket s = new Socket(addr.getHost(), addr.getPort()); Connection c = new ObjectStreamConnection(s); connectionTable.put(destination, c); return c; } catch (IOException e) { Debug.noteException(e, false); e.fillInStackTrace(); throw new IPCException(e); } } public Connection getConnection(Object destination) { Connection c = (Connection)connectionTable.get(destination); if (c == null) { c = connectTo(destination); // connectionTable.put(destination, c); // done in connectTo } return c; } public void sendObject(Object destination, Object contents) { try { Connection c = getConnection(destination); c.send(contents); } catch (IPCException e) { discardAnyConnection(destination); e.fillInStackTrace(); throw e; } } public Object sendRequest(Object destination, Object contents) { try { Debug.noteln("Sending request to " + destination, contents); Connection c = getConnection(destination); c.send(contents); Object reply = c.receive(); Debug.noteln("Received reply from " + destination, reply); return reply; } catch (IPCException e) { discardAnyConnection(destination); e.fillInStackTrace(); throw e; } } public void discardAnyConnection(Object destination) { if (connectionTable.get(destination) != null) { Debug.noteln("Discarding connection to", destination); connectionTable.remove(destination); } else Debug.noteln("No connection to " + destination + " to discard."); } public void setupServer(Object destination, MessageListener listener) { // In this method, the destination is the agent who is setting // up the server, not a remote agent. We must avoid calling // getDestinationData(destination) because that might ask // the name-server. serverDestination = destination; // remember the destination setupNameServerAddress(); if (Parameters.haveParameter("run-name-server")) setupNameServer(); ObjectStreamServer server = new ObjectStreamServer(this, destination, listener); server.start(); } public static String defaultNameServerAddress = "localhost:5555"; protected void setupNameServerAddress() { // If the user's said "-no name-server", or in some other way // made the value of the "name-server" parameter be "false", // don't set up the address. if (Parameters.getParameter("name-server", "unspecified") .equals("false")) return; // If we already have a name-server addr and no name-server // parameter has been specified, we don't have to do anything. if (destinationTable.get("name-server") != null && !Parameters.haveParameter("name-server")) return; // Syntax is host:port, both required. String addr = Parameters.getParameter("name-server", defaultNameServerAddress); String[] parts = Util.breakStringAtFirst(addr, ":"); String host = parts[0]; Long port = (Long)Lisp.readFromString(parts[1]); Debug.noteln("Setting name-server addr " + host + ":" + port); setDestinationData("name-server", new ServiceAddress(host, port.intValue())); } protected void setupNameServer() { ServiceAddress a = (ServiceAddress)destinationTable.get("name-server"); Debug.assert(a != null, "Can't run name server, address unknown"); new ObjectStreamNameServer(this, a.port) .start(); } } /** * A Thread that accepts connections to a ServerSocket * and creates an object-reading thread for each connection. * Each of the object-reading threads notifies the specified * listener when an object is received. The listener's * messageReceived method should presumably therefore * be synchronized. */ public static class ObjectStreamServer extends Thread { ObjectStreamCommunicationStrategy strategy; Object destination; MessageListener listener; ServiceAddress addr; ServerSocket servSock; public ObjectStreamServer(ObjectStreamCommunicationStrategy strategy, Object destination, MessageListener listener) { this.strategy = strategy; this.destination = destination; this.listener = listener; addr = (ServiceAddress)strategy.destinationTable.get(destination); } public void run() { Debug.noteln("Server running for", destination); try { if (addr == null) { // If we haven't been told a port number, ask the // operating system for a free one. servSock = new ServerSocket(0); // It's not clear how we ought to get the host name. // Note that // InetAddress in_addr = servSock.getInetAddress(); // String host = in_addr.getHostName(); // just seems to get "0.0.0.0". /\/ String host = Parameters.haveParameter("host") ? Parameters.getParameter("host") // N.B. the following call might fail. : InetAddress.getLocalHost().getHostName(); addr = new ServiceAddress (host, servSock.getLocalPort()); } else { // In this case, we keep the addr, including host name, // obtained from the destination table. Could the name // end up being "localhost" when it shouldn't be? /\/ servSock = new ServerSocket(addr.getPort()); } // By this point, we should have a table entry for the // name-server if there's supposed to be one. if (strategy.destinationTable.get("name-server") != null) registerWithNameServer(); // ... our main loop ... while (true) { Socket s = servSock.accept(); Debug.noteln("Client connection", s); receiveFrom(new IPC.ObjectStreamConnection(s)); } } catch (IOException e) { Debug.noteException(e); e.fillInStackTrace(); throw new IPCException(e); } } protected void registerWithNameServer() { Debug.noteln("Registering " + destination + " as " + addr.host + ":" + addr.port); while (true) { try { Object reply = strategy.sendRequest ("name-server", Lisp.list("register", destination, addr)); if (reply.equals("ok")) return; } catch (IPCException e) { Debug.noteException(e, false); } if (!shouldWaitForNameServer()) return; } } protected boolean shouldWaitForNameServer() { // System.out.println // ("Could not register " + destination + // " with name-server at " + // strategy.destinationTable.get("name-server")); // Util.askLine("Type return when server is available"); try { return askAboutWaiting(); } catch (Exception e) { Debug.noteException(e, false); return true; } } private boolean askAboutWaiting() throws InterruptedException, java.lang.reflect.InvocationTargetException { final Object server_addr = strategy.destinationTable.get("name-server"); final boolean[] result = {true}; // Gak! /\/ javax.swing.SwingUtilities.invokeAndWait(new Runnable() { public void run() { Object[] message = { "Could not register " + destination + " with name-server at " + server_addr, "Do you want to try again?" }; switch(javax.swing.JOptionPane.showConfirmDialog(null, message, "Confirm", javax.swing.JOptionPane.YES_NO_OPTION)) { case javax.swing.JOptionPane.YES_OPTION: result[0] = true; break; case javax.swing.JOptionPane.NO_OPTION: result[0] = false; } } }); return result[0]; } protected void receiveFrom(final ObjectStreamConnection connection) { // An exception thrown by connection.receive() will propagate // beyond the run method and hence cause the thread to die, // which is more or less what we want. new Thread() { public void run() { Debug.noteln(destination + " receiver running ..."); while (true) { Debug.noteln(destination + " waiting to receive ..."); Object contents = connection.receive(); Debug.noteln(destination + " received", contents); listener.messageReceived (new BasicInputMessage(contents)); } } }.start(); } } /** * A Connection that is used to send and receive serialized objects * via a socket. */ public static class ObjectStreamConnection implements Connection { Socket sock; ObjectInputStream in; ObjectOutputStream out; public ObjectStreamConnection(Socket s) /* throws IOException */ { sock = s; /* /\/: For some reason, messages don't seem to get through if we create the Object streams right away. */ // in = new ObjectInputStream(sock.getInputStream()); // out = new ObjectOutputStream(sock.getOutputStream()); // ? do we need to put a buffer in there ? /\/ } public Socket getSocket() { return sock; } public void send(Object contents) { try { if (out == null) out = new ObjectOutputStream(sock.getOutputStream()); out.writeObject(contents); out.flush(); // Forget the object(s) just written, so that writing them // again will send their full state instead of just a ref. out.reset(); } catch (IOException e) { Debug.noteException(e); e.fillInStackTrace(); throw new IPCException(e); } } public Object receive() { try { if (in == null) in = new ObjectInputStream(sock.getInputStream()); return in.readObject(); } catch (ClassNotFoundException e) { Debug.noteException(e); e.fillInStackTrace(); throw new IPCException(e); } catch (IOException e) { Debug.noteException(e); e.fillInStackTrace(); throw new IPCException(e); } } } /** * A Thread that acts as a name-server on a specified port. */ public static class ObjectStreamNameServer extends Thread { ObjectStreamCommunicationStrategy strategy; int port; DestinationTable nameTable = new BasicDestinationTable(); TextAreaFrame textFrame; ServerSocket servSock; public ObjectStreamNameServer (ObjectStreamCommunicationStrategy strategy, int port) { this.strategy = strategy; this.port = port; // Set up the text frame and the server socket here, // rather than in the new thread, so that it's clear // when they've happened. textFrame = new TextAreaFrame (strategy.serverDestination + " Name-Server Status"); try { servSock = new ServerSocket(port); } catch (IOException e) { Debug.noteException(e); e.fillInStackTrace(); throw new IPCException(e); } } public void run() { Debug.noteln("Name-server running on port " + port); try { while (true) { Socket s = servSock.accept(); Debug.noteln("Client connection", s); serveClientOn(new IPC.ObjectStreamConnection(s)); } } catch (IOException e) { Debug.noteException(e); e.fillInStackTrace(); throw new IPCException(e); } } protected void serveClientOn(final ObjectStreamConnection connection) { // An exception thrown by connection.receive() will propagate // beyond the run method and hence cause the thread to die, // which is more or less what we want. new Thread() { public void run() { while (true) { Object contents = connection.receive(); ObjectStreamNameServer.this .handleMessage(connection, contents); } } }.start(); } protected synchronized void handleMessage (ObjectStreamConnection connection, Object contents) { Debug.noteln("Name-server received", contents); if (contents instanceof String) { ServiceAddress addr = (ServiceAddress)nameTable.get(contents); if (addr != null) connection.send(addr); else connection.send("unknown"); } else if (contents instanceof List) { List req = (List)contents; if (req.get(0).equals("register") && req.get(1) instanceof String && req.get(2) instanceof ServiceAddress) { recordRegistration((String)req.get(1), (ServiceAddress)req.get(2)); connection.send("ok"); } else { Debug.noteln("Ilegal name-server request", contents); } } else { Debug.noteln("Ilegal name-server request", contents); } } protected void recordRegistration(final String name, final ServiceAddress addr) { nameTable.put(name, addr); javax.swing.SwingUtilities.invokeLater(new Runnable() { public void run() { textFrame.appendLine(name + " at " + addr); } }); } } /** * An object that contains a host name and a port number. */ public static class ServiceAddress implements Serializable { public String host; public int port; public ServiceAddress(String host, int port) { this.host = host; this.port = port; } public String getHost() { return host; } public int getPort() { return port; } public String toString() { return "addr[" + host + ":" + port + "]"; } } /** * The exception thrown by IPC methods. These exceptions are * thrown in place of whatever exceptions are thrown internally * (such as IOExceptions) in order to hide the internal details. */ public static class IPCException extends RuntimeException { Throwable reason; public IPCException() { super(); } public IPCException(String message) { super(message); } public IPCException(Throwable reason) { super("Caused by " + reason.getMessage()); this.reason = reason; } public Throwable getReason() { return reason; } } } // Issues: // * The name SocketlikeCommunicationStrategy may be too specific; // what it really does, we might say, is to expose more of the // details. Connections objects are the most socket-like part; // the mapping from destination names to more elaborate "information" // objects is much more general. // // * Consider renaming "destination" to "homeDestination" (or some // similar name) when it refers to the agent setting up a server // to receive messages rather than referring to a remote agent.