| | |
| | | import java.util.LinkedHashSet; |
| | | import java.util.List; |
| | | import java.util.TreeSet; |
| | | import java.util.concurrent.Semaphore; |
| | | import java.io.IOException; |
| | | import java.net.ConnectException; |
| | | import java.net.InetAddress; |
| | |
| | | private int maxReceiveDelay; /* passed to the changelog server in the ServerStartMessage */ |
| | | private int maxSendQueue; /* passed to the changelog server in the ServerStartMessage */ |
| | | private int maxReceiveQueue; /* passed to the changelog server in the ServerStartMessage */ |
| | | private Semaphore sendWindow; /* send credit: one permit acquired per UpdateMessage published, permits released when a WindowMessage arrives */ |
| | | private int maxSendWindow; /* window size read from the ChangelogStartMessage; initial permit count of sendWindow */ |
| | | private int rcvWindow; /* remaining receive credit: decremented for each UpdateMessage received */ |
| | | private int halfRcvWindow; /* window/2: refill threshold and the amount granted back in each WindowMessage */ |
| | | private int maxRcvWindow; /* receive window size given to the constructor */ |
| | | private int timeout = 0; /* last value given to setSoTimeout; applied to the session after the start handshake */ |
| | | |
| | | /** |
| | | * Creates a new Changelog Broker for a particular SynchronizationDomain. |
| | |
| | | * @param maxSendQueue The maximum size of the send queue to use on |
| | | * the changelog server. |
| | | * @param maxSendDelay The maximum send delay to use on the changelog server. |
| | | * @param window The size of the send and receive window to use. |
| | | */ |
| | | public ChangelogBroker(ServerState state, DN baseDn, short serverID, |
| | | int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue, |
| | | int maxSendDelay ) |
| | | int maxSendDelay, int window) |
| | | { |
| | | this.baseDn = baseDn; |
| | | this.serverID = serverID; |
| | |
| | | this.state = state; |
| | | replayOperations = |
| | | new TreeSet<FakeOperation>(new FakeOperationComparator()); |
| | | this.rcvWindow = window; |
| | | this.maxRcvWindow = window; |
| | | this.halfRcvWindow = window/2; |
| | | } |
| | | |
| | | /** |
| | |
| | | InetSocketAddress ServerAddr = new InetSocketAddress( |
| | | InetAddress.getByName(hostname), Integer.parseInt(port)); |
| | | Socket socket = new Socket(); |
| | | socket.setReceiveBufferSize(1000000); |
| | | socket.connect(ServerAddr, 500); |
| | | session = new SocketSession(socket); |
| | | |
| | |
| | | */ |
| | | ServerStartMessage msg = new ServerStartMessage( serverID, baseDn, |
| | | maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue, |
| | | state); |
| | | halfRcvWindow*2, state); |
| | | session.publish(msg); |
| | | |
| | | |
| | |
| | | */ |
| | | session.setSoTimeout(1000); |
| | | startMsg = (ChangelogStartMessage) session.receive(); |
| | | session.setSoTimeout(0); |
| | | session.setSoTimeout(timeout); |
| | | |
| | | /* |
| | | * We must not publish changes to a changelog that has not |
| | |
| | | (ourMaxChangeNumber.olderOrEqual(changelogMaxChangeNumber))) |
| | | { |
| | | changelogServer = ServerAddr.toString(); |
| | | maxSendWindow = startMsg.getWindowSize(); |
| | | this.sendWindow = new Semaphore(maxSendWindow); |
| | | connected = true; |
| | | break; |
| | | } |
| | |
| | | else |
| | | { |
| | | changelogServer = ServerAddr.toString(); |
| | | maxSendWindow = startMsg.getWindowSize(); |
| | | this.sendWindow = new Semaphore(maxSendWindow); |
| | | connected = true; |
| | | for (FakeOperation replayOp : replayOperations) |
| | | { |
| | |
| | | * changes that this server has already processed, start again |
| | | * the loop looking for any changelog server. |
| | | */ |
| | | try |
| | | { |
| | | Thread.sleep(500); |
| | | } catch (InterruptedException e) |
| | | { |
| | | // TODO Auto-generated catch block |
| | | e.printStackTrace(); |
| | | } |
| | | checkState = false; |
| | | int msgID = MSGID_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES; |
| | | String message = getMessage(msgID); |
| | |
| | | { |
| | | if (this.connected == false) |
| | | this.reStart(failingSession); |
| | | |
| | | if (msg instanceof UpdateMessage) |
| | | sendWindow.acquire(); |
| | | session.publish(msg); |
| | | done = true; |
| | | } catch (IOException e) |
| | | { |
| | | this.reStart(failingSession); |
| | | } |
| | | catch (InterruptedException e) |
| | | { |
| | | this.reStart(failingSession); |
| | | } |
| | | } |
| | | } |
| | | } |
| | |
| | | ProtocolSession failingSession = session; |
| | | try |
| | | { |
| | | return session.receive(); |
| | | SynchronizationMessage msg = session.receive(); |
| | | if (msg instanceof WindowMessage) |
| | | { |
| | | WindowMessage windowMsg = (WindowMessage) msg; |
| | | sendWindow.release(windowMsg.getNumAck()); |
| | | } |
| | | else |
| | | { |
| | | if (msg instanceof UpdateMessage) |
| | | { |
| | | rcvWindow--; |
| | | if (rcvWindow < halfRcvWindow) |
| | | { |
| | | session.publish(new WindowMessage(halfRcvWindow)); |
| | | rcvWindow += halfRcvWindow; |
| | | } |
| | | } |
| | | return msg; |
| | | } |
| | | } catch (Exception e) |
| | | { |
| | | if (e instanceof SocketTimeoutException) |
| | |
| | | */ |
| | | public void setSoTimeout(int timeout) throws SocketException |
| | | { |
| | | // Record the value first: the connection code applies this.timeout to the |
| | | // session once the start handshake is done, so the setting is not lost. |
| | | this.timeout = timeout; |
| | | |
| | | // The session may not exist yet when no changelog server has been |
| | | // contacted; storing the value is then sufficient and avoids a |
| | | // NullPointerException. |
| | | if (session != null) |
| | | { |
| | | session.setSoTimeout(timeout); |
| | | } |
| | | } |
| | | |
| | |
| | | { |
| | | // TODO to be implemented |
| | | } |
| | | |
| | | /** |
| | | * Returns the upper bound of the receive window. |
| | | * |
| | | * @return The receive window size this broker was created with. |
| | | */ |
| | | public int getMaxRcvWindow() |
| | | { |
| | | return this.maxRcvWindow; |
| | | } |
| | | |
| | | /** |
| | | * Returns the receive credit that is currently left. |
| | | * |
| | | * @return The present value of the receive window counter. |
| | | */ |
| | | public int getCurrentRcvWindow() |
| | | { |
| | | return this.rcvWindow; |
| | | } |
| | | |
| | | /** |
| | | * Returns the upper bound of the send window. |
| | | * |
| | | * @return The send window size obtained from the changelog server's |
| | | * start message. |
| | | */ |
| | | public int getMaxSendWindow() |
| | | { |
| | | return this.maxSendWindow; |
| | | } |
| | | |
| | | /** |
| | | * Get the current send window size. |
| | | * |
| | | * @return The current send window size. |
| | | */ |
| | | public int getCurrentSendWindow() |
| | | { |
| | | if (connected) |
| | | return sendWindow.availablePermits(); |
| | | else |
| | | return 0; |
| | | } |
| | | } |