package kraft.gateway; import kraft.TermInString; import java.net.*; import java.io.*; import Linja.*; public class SocketGateway { private String linda_host; //linda-server host private int linda_port; //linda-server port private String my_name; //gateway's kraft name private int my_port; //gateways' server socket port private GatewayMap map; //remote gateway entries map private boolean verbose=false; //debug message flag, default off // // the constructor w/o a known-map // public SocketGateway( String l_host, //linda server host int l_port, //linda server port String myname, //my name int myport, //my port no. to use boolean verbose) //whether to print debug messages throws Exception { this(l_host,l_port,myname,myport,null,verbose); //call overloaded constructor w/ null gateway map } // // the constructor w/ a known map provided // public SocketGateway( String l_host, //linda server host int l_port, //linda server port String myname, //my name int myport, //my port no. to use GatewayMap known_map, //known gateway map boolean verbose) throws Exception { // // use map if one is provided // if (known_map==null) map=new GatewayMap(); else this.map=known_map; // // init other parameters // this.linda_host=l_host; this.linda_port=l_port; this.my_port=myport; this.my_name=myname; this.verbose=verbose; } // // main gateway method // public void work() throws Exception { OutgoingGate out_gate; IncomingGate in_gate; //start the outgoing gate thread out_gate=new OutgoingGate(linda_host,linda_port,my_name,map,verbose); out_gate.start(); //start the incoming gate thread in_gate=new IncomingGate(linda_host,linda_port,my_name,my_port,map,verbose); in_gate.start(); } } // //the incoming-gate class // class IncomingGate extends Thread { LindaClient client; //linda-client String myname; //gateway's kraft name String mysite; //my local site name int myport; //the server socket port ServerSocket server_socket; //the server socket GatewayMap map; //the remote gateway entries map boolean verbose; //debug message flag // // constructor // public IncomingGate( String linda_host, //linda host int linda_port, //linda port String myname, //gateway's kraft name int myport, //server socket port GatewayMap map, //remote gateway entries map boolean verbose) throws Exception { super(); //call superclass constructor client=new LindaClient(linda_host,linda_port); //create linda client this.myport=myport; this.myname=myname; this.map=map; this.verbose=verbose; TermInString term; term=new TermInString(myname); //create term from myname if (term.functor.equals("krl")) { this.mysite=new TermInString(term.argument(1)).functor; //get site from 1st argument of krl/2 } } // // thread execution // public void run() { String message=null; try { server_socket=new ServerSocket(myport); //create server socket } catch (IOException e) { print_message("ERROR: fail to create server socket at port#"+myport); print_message("Stopping Incoming gate..."); stop(); } print_message("Incoming gate running..."); while (true) { message=receive_from_remote(); //wait for a message from remote print_message("Incoming:"+message); if (message!=null) handle_incoming_message(message); //handle the incoming message } } // // handle an incoming message // private void handle_incoming_message(String message) { TermInString term; try { term=new TermInString(message); //create term from string // // check for remote_monitor/3 // if (term.functor.equals("remote_monitor") && term.arity==3) //check for remote_monitor/3 { // // check for targetting site of remote_monitor/3 // String target_site; target_site=new TermInString(term.argument(2)).functor; //get site from 2nd argument of remote_monitor/3 if (!target_site.equals(mysite)) //check for correct site { print_message("remote_monitor/3 not to my site... discarded..."); return; } // // check for on/off flag of argument 1 in remote_monitor/3 // String flag; String monitor; flag=new TermInString(term.argument(1)).functor; if (flag.equals("on")) //remote_monitor(on,_,_) { String tuple; monitor=term.argument(3); //get monitor's name tuple="monitor("+monitor+")"; //compose monitor/1 string client.connect(); /*T.Francis*/ client.set_timeout(1000); //wait max 1000ms client.in_noblock(tuple);//In case an old one //has been left behind client.out(tuple); //out tuple string to linda server client.disconnect(); } else if (flag.equals("off")) //remote_monitor(off,_,_) { String mask; monitor=term.argument(3); //get monitor's name mask="monitor("+monitor+")"; //compose monitor/1 mask client.connect(); client.set_timeout(1000); client.in_noblock(mask); //wait max 1000ms /*T.Francis*/ //make it non-blocking just in case the // predicate is not there to remove! client.disconnect(); } else { print_message("Unknown remote_monitor:"+message); return; } } // // check for kraft_msg & correct targetting site // else { // String sender; String receiver; // String sender_site; String receiver_site; receiver=MessagingHandler.get_message_receiver(message); // // message to me // if (receiver.equals(myname)) { print_message("message to me:"+message); return; } receiver_site=MessagingHandler.get_name_site(receiver); // // discard message if not to my site // if (!receiver_site.equals(mysite)) { print_message("WARNING: message to foreign site discarded by incoming-gate:"+message); return; } // // get sender // // if ((sender=MessagingHandler.get_message_sender(message))==null) // { // print_message("WARNING: cannot find sender:"+message); // return; // } // if ((sender_site=MessagingHandler.get_name_site(sender))==null) // { // print_message("WARNING: cannot find site from sender:"+message); // return; // } // if (sender_site.equals(mysite)) // { // print_message("WARNING: message from local skipped by incoming-gate:"+message); // } // // otherwise put it into linda server // client.connect(); //connect to Linda client.out(message); //'out' message to Linda client.disconnect(); //disconnect from Linda } } catch (IOException e) { print_message("ERROR: cannot 'out' to Linda "); print_message("Stopping Incoming gate..."); stop(); //stop thread } catch (Exception e) { print_message("Invalid Prolog term"); return; } } // //wait and receive a string from the socket // private String receive_from_remote() { Socket incoming_socket; DataInputStream in_stream; String message; try { incoming_socket=server_socket.accept(); //accept incoming connection in_stream=new DataInputStream(incoming_socket.getInputStream()); //create stream message=in_stream.readUTF(); //read message incoming_socket.close(); //close incoming connection } catch (IOException e) { return(null); } return(message); } // // print verbose message depending on debug flag // private void print_message(String s) { if (verbose) { System.out.println(s); } } } // //the outgoing-gate class // class OutgoingGate extends Thread { LindaClient client; //linda-client String myname; //gateways' full kraft name String mysite; //my local site name GatewayMap map; //remote gateway entries map boolean verbose; //debug message flag // // constructor // public OutgoingGate( String linda_host, //linda server host int linda_port, //linda server port String myname, //gateway's kraft name GatewayMap map, //*** must provide a gateway map boolean verbose) //debug flag throws Exception { super(); //call superclass constructor client=new LindaClient(linda_host,linda_port); //create linda client this.myname=myname; this.map=map; this.verbose=verbose; TermInString term; term=new TermInString(myname); //create term from myname if (term.functor.equals("krl")) { this.mysite=new TermInString(term.argument(1)).functor; //get site from 1st argument of krl/2 } } // // thread execution // public void run() { String message=null; String[] sites; String mask[]; int i; print_message("Outgoing gate running..."); try { client.connect(); } catch (IOException e) { print_message("ERROR: cannot connect to Linda "); print_message("Stopping Outgoing gate..."); stop(); //stop thread } sites=map.get_all_sites(); //get all site names from gateway map mask=new String[sites.length+2]; //create array of masking string for "in" // // compose "in" masks // mask[0]=MessagingHandler.compose_in_template_from_receiver_name(myname); //looking for msg to myself mask[1]="remote_monitor(_,_,_)"; //remote_monitor/3 for (i=0;i