mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Ludovic Poitou
04.16.2013 b6baf9b93cd501ac43200589d37feefd1e5291c2
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -23,7 +23,7 @@
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions copyright 2011-2012 ForgeRock AS
 *      Portions copyright 2011-2013 ForgeRock AS
 */
package org.opends.server.replication.server;
@@ -492,10 +492,7 @@
    }
    // Push the message to the other subscribing handlers
    Iterator<MessageHandler> otherIter = otherHandlers.iterator();
    while (otherIter.hasNext())
    {
      MessageHandler handler = otherIter.next();
    for (MessageHandler handler : otherHandlers) {
      handler.add(update, sourceHandler);
    }
  }
@@ -905,8 +902,7 @@
          }
          // Increment assured counters
          boolean safeRead =
            (expectedAcksInfo instanceof SafeReadExpectedAcksInfo)
            ? true : false;
              (expectedAcksInfo instanceof SafeReadExpectedAcksInfo);
          if (safeRead)
          {
            origServer.incrementAssuredSrReceivedUpdatesTimeout();
@@ -963,24 +959,6 @@
    }
  }
  /**
   * Wait a short while for ServerId disconnection.
   *
   * @param serverId the serverId to be checked.
   */
  public void waitDisconnection(int serverId)
  {
    if (directoryServers.containsKey(serverId))
    {
      // try again
      try
      {
        Thread.sleep(100);
      } catch (InterruptedException e)
      {
      }
    }
  }
  /**
   * Stop operations with a list of replication servers.
@@ -1026,14 +1004,13 @@
   */
  public boolean checkForDuplicateDS(DataServerHandler handler)
  {
    DataServerHandler oldHandler = directoryServers.get(handler.getServerId());
    if (directoryServers.containsKey(handler.getServerId()))
    {
      // looks like two LDAP servers have the same serverId
      // looks like two connected LDAP servers have the same serverId
      Message message = ERR_DUPLICATE_SERVER_ID.get(
        replicationServer.getMonitorInstanceName(), oldHandler.toString(),
        handler.toString(), handler.getServerId());
          replicationServer.getMonitorInstanceName(),
          directoryServers.get(handler.getServerId()).toString(),
          handler.toString(), handler.getServerId());
      logError(message);
      return false;
    }
@@ -1096,7 +1073,7 @@
        if (handler.isReplicationServer())
        {
          if (replicationServers.containsValue(handler))
          if (replicationServers.containsKey(handler.getServerId()))
          {
            unregisterServerHandler(handler);
            handler.shutdown();
@@ -1110,7 +1087,7 @@
              buildAndSendTopoInfoToDSs(null);
            }
          }
        } else if (directoryServers.containsValue(handler))
        } else if (directoryServers.containsKey(handler.getServerId()))
        {
          // If this is the last DS for the domain,
          // shutdown the status analyzer
@@ -1434,7 +1411,7 @@
      return null;
    }
    if (it.next() == false)
    if (!it.next())
    {
      it.releaseCursor();
      return null;
@@ -1459,8 +1436,7 @@
    if (handler == null)
      return 0;
    int count = handler.getCount(from, to);
    return count;
    return handler.getCount(from, to);
  }
  /**
@@ -1655,9 +1631,8 @@
        MessageBuilder mb1 = new MessageBuilder();
        mb1.append(
            NOTE_ERR_ROUTING_TO_SERVER.get(msg.getClass().getCanonicalName()));
        mb1.append("serverID:" + msg.getDestination());
        ErrorMsg errMsg = new ErrorMsg(
          msg.getSenderID(), mb1.toMessage());
        mb1.append("serverID:").append(msg.getDestination());
        ErrorMsg errMsg = new ErrorMsg(msg.getSenderID(), mb1.toMessage());
        try
        {
          senderHandler.send(errMsg);
@@ -1678,9 +1653,9 @@
      MessageBuilder mb = new MessageBuilder();
      mb.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get(
          this.baseDn, Integer.toString(msg.getDestination())));
      mb.append(" In Replication Server=" +
      mb.append(" In Replication Server=").append(
        this.replicationServer.getMonitorInstanceName());
      mb.append(" unroutable message =" + msg.getClass().getSimpleName());
      mb.append(" unroutable message =").append(msg.getClass().getSimpleName());
      mb.append(" Details:routing table is empty");
      ErrorMsg errMsg = new ErrorMsg(
        this.replicationServer.getServerId(),
@@ -1915,13 +1890,12 @@
  {
    for (DataServerHandler handler : directoryServers.values())
    {
      if ((notThisOne == null) || // All DSs requested
          ((notThisOne != null) && (handler != notThisOne)))
      if ((notThisOne == null) || ((handler != notThisOne)))
        // All except passed one
      {
        for (int i=1; i<2; i++)
        for (int i=1; i<=2; i++)
        {
          if (handler.shuttingDown()==false)
          if (!handler.shuttingDown())
          {
            if (handler.getStatus() != ServerStatus.NOT_CONNECTED_STATUS)
            {
@@ -1960,9 +1934,9 @@
    TopologyMsg topoMsg = createTopologyMsgForRS();
    for (ReplicationServerHandler handler : replicationServers.values())
    {
      for (int i=1; i<2; i++)
      for (int i=1; i<=2; i++)
      {
        if (handler.shuttingDown()==false)
        if (!handler.shuttingDown())
        {
          if (handler.getStatus() != ServerStatus.NOT_CONNECTED_STATUS)
          {
@@ -2685,42 +2659,38 @@
  /**
   * Start collecting global monitoring information for this
   * ReplicationServerDomain.
   *
   * @throws DirectoryException
   *           In case the monitoring information could not be collected.
   */
  private void initializePendingMonitorData()
  {
    // Let's process our directly connected LSes
    // - in the ServerHandler for a given LS1, the stored state contains :
    // - the max CN produced by LS1
    // - the last CN consumed by LS1 from LS2..n
    // Let's process our directly connected DS
    // - in the ServerHandler for a given DS1, the stored state contains :
    // - the max CN produced by DS1
    // - the last CN consumed by DS1 from DS2..n
    // - in the RSdomain/dbHandler, the built-in state contains :
    // - the max CN produced by each server
    // So for a given LS connected we can take the state and the max from
    // the LS/state.
    // So for a given DS connected we can take the state and the max from
    // the DS/state.
    for (ServerHandler directlsh : directoryServers.values())
    for (ServerHandler ds : directoryServers.values())
    {
      int serverID = directlsh.getServerId();
      int serverID = ds.getServerId();
      // the state comes from the state stored in the SH
      ServerState directlshState = directlsh.getServerState()
      ServerState dsState = ds.getServerState()
          .duplicate();
      // the max CN sent by that LS also comes from the SH
      ChangeNumber maxcn = directlshState
          .getMaxChangeNumber(serverID);
      ChangeNumber maxcn = dsState.getMaxChangeNumber(serverID);
      if (maxcn == null)
      {
        // This directly connected LS has never produced any change
        maxcn = new ChangeNumber(0, 0, serverID);
      }
      pendingMonitorData.setMaxCN(serverID, maxcn);
      pendingMonitorData.setLDAPServerState(serverID, directlshState);
      pendingMonitorData.setLDAPServerState(serverID, dsState);
      pendingMonitorData.setFirstMissingDate(serverID,
          directlsh.getApproxFirstMissingDate());
          ds.getApproxFirstMissingDate());
    }
    // Then initialize the max CN for the LS that produced something
@@ -2729,10 +2699,7 @@
    ServerState dbServerState = getDbServerState();
    pendingMonitorData.setRSState(replicationServer.getServerId(),
        dbServerState);
    Iterator<Integer> it = dbServerState.iterator();
    while (it.hasNext())
    {
      int sid = it.next();
    for (int sid : dbServerState) {
      ChangeNumber storedCN = dbServerState.getMaxChangeNumber(sid);
      pendingMonitorData.setMaxCN(sid, storedCN);
    }
@@ -2746,7 +2713,7 @@
   *
   * @param msg
   *          The message to be processed.
   * @param globalServerHandlerId
   * @param serverId
   *          server handler that is receiving the message.
   */
  private void receivesMonitorDataResponse(MonitorMsg msg,
@@ -2833,7 +2800,7 @@
      finally
      {
        // Decreases the number of expected responses and potentially
        // wakes up the waiting requestor thread.
        // wakes up the waiting requester thread.
        if (pendingMonitorDataServerIDs.remove(serverId))
        {
          pendingMonitorDataLatch.countDown();
@@ -2986,7 +2953,7 @@
  {
    if (statusAnalyzer != null)
    {
      statusAnalyzer.setDeradedStatusThreshold(degradedStatusThreshold);
      statusAnalyzer.setDegradedStatusThreshold(degradedStatusThreshold);
    }
  }
@@ -3160,65 +3127,46 @@
   */
  public ServerState getEligibleState(ChangeNumber eligibleCN)
  {
    ServerState result = new ServerState();
    ServerState dbState = this.getDbServerState();
    // The result is initialized from the dbState.
    // From it, we don't want to keep the changes newer than eligibleCN.
    result = dbState.duplicate();
    ServerState result = dbState.duplicate();
    if (eligibleCN != null)
    {
      Iterator<Integer> it = dbState.iterator();
      while (it.hasNext())
      {
        int sid = it.next();
      for (int sid : dbState) {
        DbHandler h = sourceDbHandlers.get(sid);
        ChangeNumber mostRecentDbCN = dbState.getMaxChangeNumber(sid);
        try
        {
        try {
          // Is the most recent change in the Db newer than eligible CN ?
          // if yes (like cn15 in the example above, then we have to go back
          // to the Db and look for the change older than  eligible CN (cn14)
          if (eligibleCN.olderOrEqual(mostRecentDbCN))
          {
          if (eligibleCN.olderOrEqual(mostRecentDbCN)) {
            // let's try to seek the first change <= eligibleCN
            ReplicationIterator ri = null;
            try
            {
            try {
              ri = h.generateIterator(eligibleCN);
              if ((ri != null) && (ri.getChange()!=null))
              {
              if ((ri != null) && (ri.getChange() != null)) {
                ChangeNumber newCN = ri.getChange().getChangeNumber();
                result.update(newCN);
              }
            }
            catch(Exception e)
            {
            } catch (Exception e) {
              // there's no change older than eligibleCN (case of s3/cn31)
              result.update(new ChangeNumber(0,0,sid));
            }
            finally
            {
              if (ri != null)
              {
              result.update(new ChangeNumber(0, 0, sid));
            } finally {
              if (ri != null) {
                ri.releaseCursor();
                ri = null;
              }
            }
          }
          else
          {
            // for this serverid, all changes in the ChangelogDb are holder
          } else {
            // for this serverId, all changes in the ChangelogDb are holder
            // than eligibleCN , the most recent in the db is our guy.
            result.update(mostRecentDbCN);
          }
        }
        catch(Exception e)
        {
        } catch (Exception e) {
          Message errMessage = ERR_WRITER_UNEXPECTED_EXCEPTION.get(
              " " +  stackTraceToSingleLineString(e));
              " " + stackTraceToSingleLineString(e));
          logError(errMessage);
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
@@ -3234,7 +3182,7 @@
  /**
   * Returns the start state of the domain, made of the first (oldest)
   * change stored for each serverId.
   * Note: Because the replication changelogdb triming always keep one change
   * Note: Because the replication changelogdb trimming always keep one change
   * whatever its date, the change contained in the returned state can be very
   * old.
   * @return the start state of the domain.
@@ -3433,21 +3381,18 @@
    // Parses the dbState of the domain , server by server
    ServerState dbState = this.getDbServerState();
    Iterator<Integer> serverIDIterator = dbState.iterator();
    while (serverIDIterator.hasNext())
    {
    for (int sid : dbState) {
      // process one sid
      int sid = serverIDIterator.next();
      ChangeNumber startCN = null;
      if (startState.getMaxChangeNumber(sid) != null)
        startCN = startState.getMaxChangeNumber(sid);
      long sidRes = getCount(sid, startCN, endCN);
      // The startPoint is excluded when counting the ECL eligible changes
      if ((startCN!=null)&&(sidRes>0))
      if ((startCN != null) && (sidRes > 0))
        sidRes--;
      res+=sidRes;
      res += sidRes;
    }
    return res;
  }
@@ -3466,14 +3411,11 @@
    // Parses the dbState of the domain , server by server
    ServerState dbState = this.getDbServerState();
    Iterator<Integer> serverIDIterator = dbState.iterator();
    while (serverIDIterator.hasNext())
    {
    for (int sid : dbState) {
      // process one sid
      int sid = serverIDIterator.next();
      ChangeNumber lStartCN =
        new ChangeNumber(startCN.getTime(), startCN.getSeqnum(), sid);
      res+=getCount(sid, lStartCN, endCN);
          new ChangeNumber(startCN.getTime(), startCN.getSeqnum(), sid);
      res += getCount(sid, lStartCN, endCN);
    }
    return res;
  }