mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
18.17.2013 20396bec93ad0de9b6cefee7c7b1ad628d6ea1ae
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -75,7 +75,7 @@
 */
public class ReplicationServerDomain extends MonitorProvider<MonitorProviderCfg>
{
  private final String baseDn;
  private final DN baseDN;
  /**
   * The Status analyzer that periodically verifies whether the connected DSs
@@ -172,21 +172,21 @@
  private ServerState ctHeartbeatState;
  /**
   * Creates a new ReplicationServerDomain associated to the DN baseDn.
   * Creates a new ReplicationServerDomain associated to the baseDN.
   *
   * @param baseDn
   *          The baseDn associated to the ReplicationServerDomain.
   * @param baseDN
   *          The baseDN associated to the ReplicationServerDomain.
   * @param localReplicationServer
   *          the ReplicationServer that created this instance.
   */
  public ReplicationServerDomain(String baseDn,
  public ReplicationServerDomain(DN baseDN,
      ReplicationServer localReplicationServer)
  {
    this.baseDn = baseDn;
    this.baseDN = baseDN;
    this.localReplicationServer = localReplicationServer;
    this.assuredTimeoutTimer = new Timer("Replication server RS("
        + localReplicationServer.getServerId()
        + ") assured timer for domain \"" + baseDn + "\"", true);
        + ") assured timer for domain \"" + baseDN + "\"", true);
    this.changelogDB = localReplicationServer.getChangelogDB();
    DirectoryServer.registerMonitorProvider(this);
@@ -253,7 +253,8 @@
          // Unknown assured mode: should never happen
          Message errorMsg = ERR_RS_UNKNOWN_ASSURED_MODE.get(
            Integer.toString(localReplicationServer.getServerId()),
            assuredMode.toString(), baseDn, update.toString());
            assuredMode.toString(), baseDN.toNormalizedString(),
            update.toString());
          logError(errorMsg);
          assuredMessage = false;
        }
@@ -405,7 +406,7 @@
  {
    try
    {
      if (this.changelogDB.publishUpdateMsg(baseDn, serverId, updateMsg))
      if (this.changelogDB.publishUpdateMsg(baseDN, serverId, updateMsg))
      {
        /*
         * JNR: Matt and I had a hard time figuring out where to put this
@@ -608,7 +609,8 @@
      // Should never happen
      Message errorMsg = ERR_UNKNOWN_ASSURED_SAFE_DATA_LEVEL.get(
        Integer.toString(localReplicationServer.getServerId()),
        Byte.toString(safeDataLevel), baseDn, update.toString());
        Byte.toString(safeDataLevel), baseDN.toNormalizedString(),
        update.toString());
      logError(errorMsg);
    } else if (sourceGroupId == groupId
    // Assured feature does not cross different group IDS
@@ -760,7 +762,7 @@
            mb.append(ERR_RS_ERROR_SENDING_ACK.get(
              Integer.toString(localReplicationServer.getServerId()),
              Integer.toString(origServer.getServerId()),
              csn.toString(), baseDn));
              csn.toString(), baseDN.toNormalizedString()));
            mb.append(" ");
            mb.append(stackTraceToSingleLineString(e));
            logError(mb.toMessage());
@@ -838,7 +840,7 @@
            mb.append(ERR_RS_ERROR_SENDING_ACK.get(
                Integer.toString(localReplicationServer.getServerId()),
                Integer.toString(origServer.getServerId()),
                csn.toString(), baseDn));
                csn.toString(), baseDN.toNormalizedString()));
            mb.append(" ");
            mb.append(stackTraceToSingleLineString(e));
            logError(mb.toMessage());
@@ -1275,7 +1277,7 @@
   */
  public Set<Integer> getServerIds()
  {
    return changelogDB.getDomainServerIds(baseDn);
    return changelogDB.getDomainServerIds(baseDN);
  }
  /**
@@ -1292,7 +1294,7 @@
   */
  public ReplicaDBCursor getCursorFrom(int serverId, CSN startAfterCSN)
  {
    return changelogDB.getCursorFrom(baseDn, serverId, startAfterCSN);
    return changelogDB.getCursorFrom(baseDN, serverId, startAfterCSN);
  }
 /**
@@ -1305,7 +1307,7 @@
  */
  public long getCount(int serverId, CSN from, CSN to)
  {
    return changelogDB.getCount(baseDn, serverId, from, to);
    return changelogDB.getCount(baseDN, serverId, from, to);
  }
  /**
@@ -1315,16 +1317,17 @@
   */
  public long getChangesCount()
  {
    return changelogDB.getDomainChangesCount(baseDn);
    return changelogDB.getDomainChangesCount(baseDN);
  }
  /**
   * Get the baseDn.
   * @return Returns the baseDn.
   * Get the baseDN.
   *
   * @return Returns the baseDN.
   */
  public String getBaseDn()
  public DN getBaseDN()
  {
    return baseDn;
    return baseDN;
  }
  /**
@@ -1520,7 +1523,7 @@
  {
    MessageBuilder mb = new MessageBuilder();
    mb.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get(
        this.baseDn, Integer.toString(msg.getDestination())));
        baseDN.toNormalizedString(), Integer.toString(msg.getDestination())));
    mb.append(" In Replication Server=").append(
      this.localReplicationServer.getMonitorInstanceName());
    mb.append(" unroutable message =").append(msg.getClass().getSimpleName());
@@ -1567,7 +1570,8 @@
         */
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get(
            this.baseDn, Integer.toString(msg.getDestination())));
            baseDN.toNormalizedString(),
            Integer.toString(msg.getDestination())));
        mb.append(" unroutable message =" + msg.getClass().getSimpleName());
        mb.append(" Details: " + ioe.getLocalizedMessage());
        final Message message = mb.toMessage();
@@ -1698,7 +1702,7 @@
    stopAllServers(true);
    changelogDB.shutdownDomain(baseDn);
    changelogDB.shutdownDomain(baseDN);
  }
  /**
@@ -1709,7 +1713,7 @@
  public ServerState getDbServerState()
  {
    ServerState serverState = new ServerState();
    for (CSN lastCSN : changelogDB.getDomainLastCSNs(baseDn).values())
    for (CSN lastCSN : changelogDB.getDomainLastCSNs(baseDN).values())
    {
      serverState.update(lastCSN);
    }
@@ -1722,7 +1726,7 @@
  @Override
  public String toString()
  {
    return "ReplicationServerDomain " + baseDn;
    return "ReplicationServerDomain " + baseDN;
  }
  /**
@@ -1755,10 +1759,9 @@
            {
              if (i == 2)
              {
                Message message =
                    ERR_EXCEPTION_SENDING_TOPO_INFO
                        .get(baseDn, "directory", Integer.toString(dsHandler
                            .getServerId()), e.getMessage());
                Message message = ERR_EXCEPTION_SENDING_TOPO_INFO.get(
                    baseDN.toNormalizedString(), "directory",
                    Integer.toString(dsHandler.getServerId()), e.getMessage());
                logError(message);
              }
            }
@@ -1793,7 +1796,7 @@
            if (i == 2)
            {
              Message message = ERR_EXCEPTION_SENDING_TOPO_INFO.get(
                  baseDn, "replication",
                  baseDN.toNormalizedString(), "replication",
                  Integer.toString(rsHandler.getServerId()), e.getMessage());
              logError(message);
            }
@@ -1934,9 +1937,8 @@
  {
    if (debugEnabled())
    {
      TRACER.debugInfo("In " + this + " Receiving ResetGenerationIdMsg from "
          + senderHandler.getServerId() + " for baseDn " + baseDn + ":\n"
          + genIdMsg);
      debug("Receiving ResetGenerationIdMsg from "
          + senderHandler.getServerId() + ":\n" + genIdMsg);
    }
    try
@@ -1965,10 +1967,8 @@
        // Order to take a gen id we already have, just ignore
        if (debugEnabled())
        {
          TRACER.debugInfo("In " + this
              + " Reset generation id requested for baseDn " + baseDn
              + " but generation id was already " + this.generationId + ":\n"
              + genIdMsg);
          debug("Reset generation id requested but generationId was already "
              + this.generationId + ":\n" + genIdMsg);
        }
      }
@@ -1987,8 +1987,8 @@
          }
        } catch (IOException e)
        {
          logError(ERR_EXCEPTION_FORWARDING_RESET_GEN_ID.get(baseDn,
              e.getMessage()));
          logError(ERR_EXCEPTION_FORWARDING_RESET_GEN_ID.get(
              baseDN.toNormalizedString(), e.getMessage()));
        }
      }
@@ -2001,7 +2001,8 @@
          dsHandler.changeStatusForResetGenId(newGenId);
        } catch (IOException e)
        {
          logError(ERR_EXCEPTION_CHANGING_STATUS_AFTER_RESET_GEN_ID.get(baseDn,
          logError(ERR_EXCEPTION_CHANGING_STATUS_AFTER_RESET_GEN_ID.get(
              baseDN.toNormalizedString(),
              Integer.toString(dsHandler.getServerId()),
              e.getMessage()));
        }
@@ -2014,7 +2015,8 @@
      // treatment.
      sendTopoInfoToAll();
      logError(NOTE_RESET_GENERATION_ID.get(baseDn, newGenId));
      logError(NOTE_RESET_GENERATION_ID.get(baseDN.toNormalizedString(),
          newGenId));
    }
    catch(Exception e)
    {
@@ -2069,7 +2071,8 @@
      sendTopoInfoToAllExcept(senderHandler);
      Message message = NOTE_DIRECTORY_SERVER_CHANGED_STATUS.get(
          senderHandler.getServerId(), baseDn, newStatus.toString());
          senderHandler.getServerId(), baseDN.toNormalizedString(),
          newStatus.toString());
      logError(message);
    }
    catch(Exception e)
@@ -2114,7 +2117,7 @@
      // StatusAnalyzer.
      if (debugEnabled())
      {
        TRACER.debugInfo("Status analyzer for domain " + baseDn
        TRACER.debugInfo("Status analyzer for domain " + baseDN
            + " has been interrupted when"
            + " trying to acquire domain lock for changing the status of DS "
            + dsHandler.getServerId());
@@ -2133,7 +2136,7 @@
      catch (IOException e)
      {
        logError(ERR_EXCEPTION_CHANGING_STATUS_FROM_STATUS_ANALYZER
            .get(baseDn,
            .get(baseDN.toNormalizedString(),
                Integer.toString(dsHandler.getServerId()),
                e.getMessage()));
      }
@@ -2186,10 +2189,10 @@
  public void clearDbs()
  {
    // Reset the localchange and state db for the current domain
    changelogDB.clearDomain(baseDn);
    changelogDB.clearDomain(baseDN);
    try
    {
      localReplicationServer.clearGenerationId(baseDn);
      localReplicationServer.clearGenerationId(baseDN);
    }
    catch (Exception e)
    {
@@ -2285,7 +2288,7 @@
            rsHandler.getServerId(),
            rsHandler.session.getReadableRemoteAddress(),
            rsHandler.getGenerationId(),
            baseDn, getLocalRSServerId(), generationId);
            baseDN.toNormalizedString(), getLocalRSServerId(), generationId);
        logError(message);
        ErrorMsg errorMsg = new ErrorMsg(getLocalRSServerId(),
@@ -2494,7 +2497,8 @@
  {
    return "Replication server RS(" + localReplicationServer.getServerId()
        + ") " + localReplicationServer.getServerURL() + ",cn="
        + baseDn.replace(',', '_').replace('=', '_') + ",cn=Replication";
        + baseDN.toNormalizedString().replace(',', '_').replace('=', '_')
        + ",cn=Replication";
  }
  /**
@@ -2509,9 +2513,10 @@
        String.valueOf(localReplicationServer.getServerId())));
    attributes.add(Attributes.create("replication-server-port",
        String.valueOf(localReplicationServer.getReplicationPort())));
    attributes.add(Attributes.create("domain-name", baseDn));
    attributes.add(Attributes.create("domain-name",
        baseDN.toNormalizedString()));
    attributes.add(Attributes.create("generation-id",
        baseDn + " " + generationId));
        baseDN + " " + generationId));
    // Missing changes
    long missingChanges = getDomainMonitorData().getMissingChangesRS(
@@ -2595,7 +2600,7 @@
          if (eligibleCSN.olderOrEqual(mostRecentDbCSN))
          {
            // let's try to seek the first change <= eligibleCSN
            CSN newCSN = changelogDB.getCSNAfter(baseDn, serverId, eligibleCSN);
            CSN newCSN = changelogDB.getCSNAfter(baseDN, serverId, eligibleCSN);
            result.update(newCSN);
          } else {
            // for this serverId, all changes in the ChangelogDb are holder
@@ -2612,8 +2617,7 @@
    if (debugEnabled())
    {
      TRACER
          .debugInfo("In " + this + " getEligibleState() result is " + result);
      debug("getEligibleState() result is " + result);
    }
    return result;
  }
@@ -2629,7 +2633,7 @@
  public ServerState getStartState()
  {
    ServerState domainStartState = new ServerState();
    for (CSN firstCSN : changelogDB.getDomainFirstCSNs(baseDn).values())
    for (CSN firstCSN : changelogDB.getDomainFirstCSNs(baseDN).values())
    {
      domainStartState.update(firstCSN);
    }
@@ -2650,7 +2654,7 @@
    CSN eligibleCSN = null;
    for (Entry<Integer, CSN> entry :
      changelogDB.getDomainLastCSNs(baseDn).entrySet())
      changelogDB.getDomainLastCSNs(baseDN).entrySet())
    {
      // Consider this producer (DS/db).
      final int serverId = entry.getKey();
@@ -2767,7 +2771,7 @@
            logError(ERR_CHANGELOG_ERROR_SENDING_MSG
                .get("Replication Server "
                    + localReplicationServer.getReplicationPort() + " "
                    + baseDn + " " + localReplicationServer.getServerId()));
                    + baseDN + " " + localReplicationServer.getServerId()));
            stopServer(rsHandler, false);
          }
        }
@@ -2844,7 +2848,7 @@
   */
  public long getLatestDomainTrimDate()
  {
    return changelogDB.getDomainLatestTrimDate(baseDn);
    return changelogDB.getDomainLatestTrimDate(baseDN);
  }
  /**
@@ -2962,8 +2966,9 @@
  private void debug(String message)
  {
    TRACER.debugInfo("In RS serverId=" + localReplicationServer.getServerId()
        + " for baseDn=" + baseDn + " and port="
        + localReplicationServer.getReplicationPort() + ": " + message);
    TRACER.debugInfo("In ReplicationServerDomain serverId="
        + localReplicationServer.getServerId() + " for baseDN=" + baseDN
        + " and port=" + localReplicationServer.getReplicationPort()
        + ": " + message);
  }
}