| | |
| | | import java.util.Map; |
| | | import java.util.SortedSet; |
| | | import java.util.TreeSet; |
| | | import java.util.concurrent.Semaphore; |
| | | |
| | | import org.opends.server.api.MonitorProvider; |
| | | import org.opends.server.config.ConfigEntry; |
| | |
| | | import org.opends.server.synchronization.ServerState; |
| | | import org.opends.server.synchronization.SynchronizationMessage; |
| | | import org.opends.server.synchronization.UpdateMessage; |
| | | import org.opends.server.synchronization.WindowMessage; |
| | | import org.opends.server.util.TimeThread; |
| | | |
| | | /** |
| | |
| | | private MsgQueue msgQueue = new MsgQueue(); |
| | | private MsgQueue lateQueue = new MsgQueue(); |
| | | private Map<ChangeNumber, AckMessageList> waitingAcks = |
| | | new HashMap<ChangeNumber, AckMessageList>();; |
| | | new HashMap<ChangeNumber, AckMessageList>(); |
| | | private ChangelogCache changelogCache = null; |
| | | private String serverURL; |
| | | private int outCount = 0; // number of update sent to the server |
| | |
| | | private ServerWriter writer = null; |
| | | private DN baseDn = null; |
| | | private String serverAddressURL; |
| | | private int rcvWindow; |
| | | private int rcvWindowSizeHalf; |
| | | private int maxRcvWindow; |
| | | private ServerReader reader; |
| | | private Semaphore sendWindow; |
| | | private int sendWindowSize; |
| | | |
| | | private static Map<ChangeNumber, ChangelogAckMessageList> |
| | | changelogsWaitingAcks = new HashMap<ChangeNumber, ChangelogAckMessageList>(); |
| | |
| | | * |
| | | * @param baseDn baseDn of the ServerHandler when this is an outgoing conn. |
| | | * null if this is an incoming connection. |
| | | * @param changelogId The identifier of the changelog that creates this |
| | | * server handler. |
| | | * @param changelogURL The URL of the changelog that creates this |
| | | * server handler. |
| | | * @param windowSize the window size that this server handler must use. |
| | | * @param changelog the Changelog that created this server handler. |
| | | */ |
| | | public void start(DN baseDn) |
| | | public void start(DN baseDn, short changelogId, String changelogURL, |
| | | int windowSize, Changelog changelog) |
| | | { |
| | | rcvWindowSizeHalf = windowSize/2; |
| | | maxRcvWindow = windowSize; |
| | | rcvWindow = windowSize; |
| | | try |
| | | { |
| | | if (baseDn != null) |
| | | { |
| | | this.baseDn = baseDn; |
| | | changelogCache = Changelog.getChangelogCache(baseDn); |
| | | changelogCache = changelog.getChangelogCache(baseDn); |
| | | ServerState localServerState = changelogCache.getDbServerState(); |
| | | ChangelogStartMessage msg = |
| | | new ChangelogStartMessage(Changelog.getServerId(), |
| | | Changelog.getServerURL(), |
| | | baseDn, localServerState); |
| | | new ChangelogStartMessage(changelogId, changelogURL, |
| | | baseDn, windowSize, localServerState); |
| | | |
| | | session.publish(msg); |
| | | } |
| | |
| | | restartSendDelay = 0; |
| | | serverIsLDAPserver = true; |
| | | |
| | | changelogCache = Changelog.getChangelogCache(this.baseDn); |
| | | changelogCache = changelog.getChangelogCache(this.baseDn); |
| | | ServerState localServerState = changelogCache.getDbServerState(); |
| | | ChangelogStartMessage myStartMsg = |
| | | new ChangelogStartMessage(Changelog.getServerId(), |
| | | Changelog.getServerURL(), |
| | | this.baseDn, localServerState); |
| | | new ChangelogStartMessage(changelogId, changelogURL, |
| | | this.baseDn, windowSize, localServerState); |
| | | session.publish(myStartMsg); |
| | | sendWindowSize = receivedMsg.getWindowSize(); |
| | | } |
| | | else if (msg.getClass() == Class.forName( |
| | | "org.opends.server.synchronization.ChangelogStartMessage")) |
| | | else if (msg instanceof ChangelogStartMessage) |
| | | { |
| | | ChangelogStartMessage receivedMsg = (ChangelogStartMessage) msg; |
| | | serverId = receivedMsg.getServerId(); |
| | |
| | | this.baseDn = receivedMsg.getBaseDn(); |
| | | if (baseDn == null) |
| | | { |
| | | changelogCache = Changelog.getChangelogCache(this.baseDn); |
| | | changelogCache = changelog.getChangelogCache(this.baseDn); |
| | | ServerState serverState = changelogCache.getDbServerState(); |
| | | ChangelogStartMessage outMsg = |
| | | new ChangelogStartMessage(Changelog.getServerId(), |
| | | Changelog.getServerURL(), |
| | | this.baseDn, serverState); |
| | | new ChangelogStartMessage(changelogId, changelogURL, |
| | | this.baseDn, windowSize, serverState); |
| | | session.publish(outMsg); |
| | | } |
| | | else |
| | | this.baseDn = baseDn; |
| | | this.serverState = receivedMsg.getServerState(); |
| | | sendWindowSize = receivedMsg.getWindowSize(); |
| | | } |
| | | else |
| | | { |
| | |
| | | return; // we did not recognize the message, ignore it |
| | | } |
| | | |
| | | changelogCache = Changelog.getChangelogCache(this.baseDn); |
| | | changelogCache = changelog.getChangelogCache(this.baseDn); |
| | | |
| | | if (serverIsLDAPserver) |
| | | { |
| | |
| | | |
| | | writer = new ServerWriter(session, serverId, this, changelogCache); |
| | | |
| | | ServerReader reader = new ServerReader(session, serverId, this, |
| | | reader = new ServerReader(session, serverId, this, |
| | | changelogCache); |
| | | |
| | | reader.start(); |
| | |
| | | // ignore |
| | | } |
| | | } |
| | | |
| | | sendWindow = new Semaphore(sendWindowSize); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public UpdateMessage take() |
| | | { |
| | | boolean interrupted = true; |
| | | UpdateMessage msg = getnextMessage(); |
| | | do { |
| | | try |
| | | { |
| | | sendWindow.acquire(); |
| | | interrupted = false; |
| | | } catch (InterruptedException e) |
| | | { |
| | | // loop until not interrupted |
| | | } |
| | | } while (interrupted); |
| | | this.incrementOutCount(); |
| | | return msg; |
| | | } |
| | | |
| | | /** |
| | | * Get the next update that must be sent to the server |
| | | * from the message queue or from the database. |
| | | * |
| | | * @return The next update that must be sent to the server. |
| | | */ |
| | | private UpdateMessage getnextMessage() |
| | | { |
| | | UpdateMessage msg; |
| | | do |
| | | { |
| | |
| | | msg1 = msgQueue.removeFirst(); |
| | | } while (!msg.getChangeNumber().equals(msg1.getChangeNumber())); |
| | | this.updateServerState(msg); |
| | | this.incrementOutCount(); |
| | | return msg; |
| | | } |
| | | } |
| | |
| | | /* get the next change from the lateQueue */ |
| | | msg = lateQueue.removeFirst(); |
| | | this.updateServerState(msg); |
| | | this.incrementOutCount(); |
| | | return msg; |
| | | } |
| | | } |
| | |
| | | * by the other server. |
| | | * Otherwise just loop to select the next message. |
| | | */ |
| | | this.incrementOutCount(); |
| | | return msg; |
| | | } |
| | | } |
| | |
| | | @Override |
| | | public String getMonitorInstanceName() |
| | | { |
| | | String str = changelogCache.getBaseDn().toString() + |
| | | String str = baseDn.toString() + |
| | | " " + serverURL + " " + String.valueOf(serverId); |
| | | |
| | | if (serverIsLDAPserver) |
| | |
| | | attributes.add(new Attribute("server-id", |
| | | String.valueOf(serverId))); |
| | | attributes.add(new Attribute("base-dn", |
| | | changelogCache.getBaseDn().toString())); |
| | | baseDn.toString())); |
| | | attributes.add(new Attribute("waiting-changes", |
| | | String.valueOf(getRcvMsgQueueSize()))); |
| | | attributes.add(new Attribute("update-waiting-acks", |
| | |
| | | String.valueOf(getInAckCount()))); |
| | | attributes.add(new Attribute("approximate-delay", |
| | | String.valueOf(getApproxDelay()))); |
| | | attributes.add(new Attribute("max-send-window", |
| | | String.valueOf(sendWindowSize))); |
| | | attributes.add(new Attribute("current-send-window", |
| | | String.valueOf(sendWindow.availablePermits()))); |
| | | attributes.add(new Attribute("max-rcv-window", |
| | | String.valueOf(maxRcvWindow))); |
| | | attributes.add(new Attribute("current-rcv-window", |
| | | String.valueOf(rcvWindow))); |
| | | long olderUpdateTime = getOlderUpdateTime(); |
| | | if (olderUpdateTime != 0) |
| | | { |
| | |
| | | |
| | | return localString; |
| | | } |
| | | |
| | | /** |
| | | * Check the protocol window and send WindowMessage if necessary. |
| | | * |
| | | * @throws IOException when the session becomes unavailable. |
| | | */ |
| | | public synchronized void checkWindow() throws IOException |
| | | { |
| | | rcvWindow--; |
| | | if (rcvWindow < rcvWindowSizeHalf) |
| | | { |
| | | WindowMessage msg = new WindowMessage(rcvWindowSizeHalf); |
| | | session.publish(msg); |
| | | outAckCount++; |
| | | rcvWindow += rcvWindowSizeHalf; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Update the send window size based on the credit specified in the |
| | | * given window message. |
| | | * |
| | | * @param windowMsg The Window Message containing the information |
| | | * necessary for updating the window size. |
| | | */ |
| | | public void updateWindow(WindowMessage windowMsg) |
| | | { |
| | | sendWindow.release(windowMsg.getNumAck()); |
| | | } |
| | | } |