/* Distributed synchronization via token passing.

   The horserace program was not too difficult because it relied on a
   centralized server. The disadvantage of that scenario is that twice as
   many communications are needed between clients and server before a
   synchronization barrier can be crossed by all clients.

   The token passing method is a decentralized solution to the critical
   section problem between distributed processes. One or more tokens are
   passed around a virtual "ring". A process that wants to enter a critical
   section must first wait for the token to arrive. After leaving the
   critical section it passes the token onwards. However, even if you're not
   waiting for the token, you still need to listen for it so that you can
   pass it on to your neighbor - you can't hog the token if you don't need it!

   Each distributed agent implements the following critical functions:

     passtoken    : called from a remote object when it passes a token to you.
     waitfortoken : called locally by an application before entering the c.s.
     freetoken    : called locally, before passing the implicit token onwards.

   Note that freetoken does not call notify(): it merely indicates that the
   local application no longer wants to use the token.

   The passtoken function is the only one that's remotely invoked, and is
   declared in the interface (in dsem.java):

     public interface dsem extends Remote
     {
        void passtoken() throws RemoteException;
     }

   Usage:

     $ javac dagent.java
     // $ rmic dagent   // deprecated
     $ rmiregistry &
     $ java dagent myip myname neighborip neighborname
     // include arbitrary fifth command-line arg for first (initiating) process

   ---------------------------------------------------------------

   Now consider what you would need to do to effect the following:

   1. Make it possible to have n tokens on the ring, enabling up to n
      processes to access their critical sections at the same time.

   2. It is possible that a host on the ring will go down.
      In such a case we may have to reestablish the neighbor relationship
      and recover any lost tokens. We can do this by having each process
      run a "reconfigure daemon". Since RMI starts threads automatically,
      the daemon can be implemented using a set of "remote-callback" methods.

      If a process can't invoke its neighbor's "passtoken" method (an
      exception will be thrown), it should pass the token to the next host
      in line. That's obvious. But what if the process that died was holding
      a token?! You would have to contact all the hosts to see who's still
      alive and how many tokens are currently on the ring, and then
      retransmit the lost tokens. You'll obviously need to maintain a list
      of all the hosts on the ring (an array or list of ip addresses and
      server names would do). You'll probably need a three-step process:

      a. "ping" all the hosts to see who's alive and who's holding what
         token. "ping" just means call some method on the remote object.

      b. Transmit the information you learned to each host, so they can
         reconfigure their "neighbor" relationship if appropriate. This can
         be done by writing an "update" method that accepts the
         reconfiguration information.

      c. Retransmit lost tokens.

   The wait() call can be given an argument indicating the maximum number of
   milliseconds it should wait (consult the Java documentation for details).
   Using this primitive together with a boolean variable, you should be able
   to tell whether a wait() was ended by a notify() or simply timed out. You
   can have your waits time out after, say, 10 seconds, and then run the
   reconfig protocol (it may be that the host holding the token is just
   taking a long time, in which case you'll just go back and wait).
*/

import java.rmi.*;
import java.rmi.server.*;

public class dagent extends UnicastRemoteObject implements dsem
{
    public static dsem neighbor;      // neighbor?
    private static dagent neigh;
    private boolean waiting = false;  // is process waiting to FINISH cs?
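
    /* The header comment suggests pairing a timed wait() with a boolean
       variable to tell a notify() apart from a timeout. The following is a
       minimal standalone sketch of that idea, kept in a comment so it does not
       change this class. TokenGate, deliver, awaitToken and tokenArrived are
       hypothetical names chosen for illustration; they are not part of dagent
       or dsem. Here deliver() stands in for the role passtoken() plays, and a
       false result is the point where the reconfig protocol could be started.

```java
// Sketch only: hypothetical helper, independent of RMI and of dagent.
class TokenGate {
    // Set by deliver(), cleared when awaitToken() consumes the token.
    private boolean tokenArrived = false;

    // Called when a token arrives for this site.
    public synchronized void deliver() {
        tokenArrived = true;
        notify(); // assumes one waiting user thread per site, as in dagent
    }

    // Waits at most timeoutMillis for a token.
    // Returns true if a token arrived, false if the wait timed out
    // (or the thread was interrupted).
    public synchronized boolean awaitToken(long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (!tokenArrived) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return false; // timed out: the flag was never set
            }
            try {
                wait(remaining); // remaining > 0, so this never waits forever
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        tokenArrived = false; // consume the token
        return true;
    }
}

class TokenGateDemo {
    public static void main(String[] args) {
        TokenGate gate = new TokenGate();

        // Nobody delivers a token, so this wait times out.
        System.out.println("first wait got token: " + gate.awaitToken(100));

        // A second thread delivers a token shortly after we start waiting.
        Thread sender = new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            gate.deliver();
        });
        sender.start();
        System.out.println("second wait got token: " + gate.awaitToken(2000));
    }
}
```

       Checking the flag in a loop, rather than trusting the return from
       wait(), also covers wakeups that happen without any notify(). */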

    dagent() throws RemoteException
    {
        super();
    }

    // called when token is passed to this process,
    // in RMI, this starts a new thread!
    public synchronized void passtoken() throws RemoteException
    {
        try
        {
            if (!waiting)
            {
                neighbor.passtoken();  // forward the token
                System.out.println("\t ... passed token ...");
            }
            else  // local process wants to use the token.
            {
                notify();  // assume one user thread per site
                System.out.println("\t ... using token to enter CS ...");
            }
        }
        catch (Exception ie) { ie.printStackTrace(); }
    }

    public synchronized void waitfortoken()
    {
        waiting = true;
        System.out.println("\t ... waiting for token ...");
        try
        {
            wait();
        }
        catch (InterruptedException ie) { System.out.println(ie); }
    }

    public synchronized void freetoken()
    {
        waiting = false;
        System.out.println("\t ... exiting CS ...");
    }

    public void simulate(String args[])
    {
        int x = 0, y = 0;
        while (++x < 1000)
        {
            if (x > 1 || args.length <= 4) waitfortoken();  // uncomment for first process
            for (int i = 0; i < 5; i++)
                System.out.println("IN CS! -------------------------!");
            System.out.println();
            try { Thread.sleep(1000); }
            catch (Exception e) { System.out.println("EEEEE in cs: " + e); }
            freetoken();
            try { passtoken(); }
            catch (Exception e) { System.out.println(e); }
        }
    } // simulate

    // args0 = hostname
    // args1 = bind name
    // args2 = neighbor hostname
    // args3 = neighbor bind name
    // args4 exists if first process
    public static void main(String[] args)
    {
        try
        {
            System.setProperty("java.security.policy", "server.policy");
            if (System.getSecurityManager() == null)
            {
                System.setSecurityManager(new RMISecurityManager());
            }
            dagent agent = new dagent();
            Naming.rebind("//" + args[0] + "/" + args[1], agent);

            // loop needed because initially nobody's agent's bound:
            boolean tryagain = true;
            System.out.println("finding neighbor ...");
            while (tryagain || agent.neighbor == null)
            {
                try
                {
                    agent.neighbor = (dsem) Naming.lookup("//" + args[2] + "/" + args[3]);
                    tryagain = false;
                }
                catch (Exception ie) { tryagain = true; }
            }
            agent.simulate(args);
        }
        catch (Exception e) { System.out.println(e); }
    } // main
} // dagent
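
/* Item 1 of the header comment asks for n tokens on the ring. In dagent the
   "waiting" flag stays true until freetoken(), so a second token that arrives
   while the process is inside its critical section would be consumed by
   notify() instead of being forwarded. One way to reason about a fix is to
   track "wants a token" and "holds a token" separately. The following is a
   small single-threaded simulation of that rule, assuming a ring of numbered
   nodes. TokenRingSketch and all of its members are hypothetical names used
   only for illustration; the sketch involves no RMI and is not the author's
   solution.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: simulates n tokens circulating on a ring of nodes.
class TokenRingSketch {
    private final boolean[] wants;  // node i is waiting to enter its critical section
    private final boolean[] holds;  // node i holds a token (is in its critical section)
    private List<Integer> inTransit = new ArrayList<>();  // node each free token is arriving at

    TokenRingSketch(int nodes, int tokens) {
        wants = new boolean[nodes];
        holds = new boolean[nodes];
        for (int t = 0; t < tokens; t++) {
            inTransit.add(0);  // all tokens are injected at node 0
        }
    }

    // Node asks to enter its critical section.
    void request(int node) {
        if (!holds[node]) {
            wants[node] = true;
        }
    }

    // Node leaves its critical section and passes its token to the next node.
    void release(int node) {
        if (holds[node]) {
            holds[node] = false;
            inTransit.add((node + 1) % wants.length);
        }
    }

    // Delivers every free token once. A node keeps a token only if it wants
    // one and does not already hold one; otherwise it forwards the token.
    void step() {
        List<Integer> next = new ArrayList<>();
        for (int pos : inTransit) {
            if (wants[pos] && !holds[pos]) {
                wants[pos] = false;
                holds[pos] = true;
            } else {
                next.add((pos + 1) % wants.length);
            }
        }
        inTransit = next;
    }

    // Number of nodes currently inside their critical sections.
    int inCriticalSection() {
        int count = 0;
        for (boolean h : holds) {
            if (h) count++;
        }
        return count;
    }

    public static void main(String[] args) {
        TokenRingSketch ring = new TokenRingSketch(5, 2);
        for (int i = 0; i < 5; i++) ring.request(i);
        for (int i = 0; i < 5; i++) ring.step();
        System.out.println("in CS after start: " + ring.inCriticalSection());    // 2
        ring.release(0);
        for (int i = 0; i < 5; i++) ring.step();
        System.out.println("in CS after release: " + ring.inCriticalSection());  // 2
    }
}
```

   Tokens are never created or destroyed here, so the number of nodes in a
   critical section can never exceed the number of tokens injected. */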