mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Nicolas Capponi
28.34.2014 1d5d1a6a4a0a58d6bb4803527dacb6641c027816
opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -43,7 +43,7 @@
import org.opends.server.api.DirectoryThread;
import org.opends.server.backends.task.Task;
import org.opends.server.config.ConfigException;
import org.opends.server.loggers.debug.DebugTracer;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.opends.server.replication.common.*;
import org.opends.server.replication.protocol.*;
import org.opends.server.tasks.InitializeTargetTask;
@@ -55,7 +55,6 @@
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.replication.common.AssuredMode.*;
import static org.opends.server.replication.common.StatusMachine.*;
@@ -110,11 +109,7 @@
   * Current status for this replicated domain.
   */
  protected ServerStatus status = ServerStatus.NOT_CONNECTED_STATUS;
  /**
   * The tracer object for the debug logger.
   */
  private static final DebugTracer TRACER = getTracer();
  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
  /** The configuration of the replication domain. */
  protected volatile ReplicationDomainCfg config;
@@ -366,8 +361,8 @@
   */
  private void receiveChangeStatus(ChangeStatusMsg csMsg)
  {
    if (debugEnabled())
      TRACER.debugInfo("Replication domain " + getBaseDN() +
    if (logger.isTraceEnabled())
      logger.trace("Replication domain " + getBaseDN() +
        " received change status message:\n" + csMsg);
    ServerStatus reqStatus = csMsg.getRequestedStatus();
@@ -413,8 +408,8 @@
      case BAD_GEN_ID_STATUS:
        break;
      default:
        if (debugEnabled())
          TRACER.debugInfo("updateDomainForNewStatus: unexpected status: " +
        if (logger.isTraceEnabled())
          logger.trace("updateDomainForNewStatus: unexpected status: " +
            status);
    }
  }
@@ -700,9 +695,9 @@
          return null;
        }
        if (debugEnabled() && !(msg instanceof HeartbeatMsg))
        if (logger.isTraceEnabled() && !(msg instanceof HeartbeatMsg))
        {
          TRACER.debugVerbose("LocalizableMessage received <" + msg + ">");
          logger.trace("LocalizableMessage received <" + msg + ">");
        }
        if (msg instanceof AckMsg)
@@ -745,8 +740,8 @@
            A remote error during the import will be received in the
            receiveEntryBytes() method.
            */
            if (debugEnabled())
              TRACER.debugInfo(
            if (logger.isTraceEnabled())
              logger.trace(
                  "[IE] processErrorMsg:" + getServerId() +
                  " baseDN: " + getBaseDN() +
                  " Error Msg received: " + errorMsg);
@@ -1007,8 +1002,8 @@
    @Override
    public void run()
    {
      if (debugEnabled())
        TRACER.debugInfo("[IE] starting " + getName());
      if (logger.isTraceEnabled())
        logger.trace("[IE] starting " + getName());
      try
      {
        initializeRemote(serverIdToInitialize, serverIdToInitialize, null,
@@ -1022,8 +1017,8 @@
        */
      }
      if (debugEnabled())
        TRACER.debugInfo("[IE] ending " + getName());
      if (logger.isTraceEnabled())
        logger.trace("[IE] ending " + getName());
    }
  }
@@ -1270,8 +1265,8 @@
     */
    public void setAckVal(int serverId, int numAck)
    {
      if (debugEnabled())
        TRACER.debugInfo("[IE] setAckVal[" + serverId + "]=" + numAck);
      if (logger.isTraceEnabled())
        logger.trace("[IE] setAckVal[" + serverId + "]=" + numAck);
      this.ackVals.put(serverId, numAck);
@@ -1294,8 +1289,8 @@
     */
    public int getSlowestServer()
    {
      if (debugEnabled())
        TRACER.debugInfo("[IE] getSlowestServer" + slowestServerId
      if (logger.isTraceEnabled())
        logger.trace("[IE] getSlowestServer" + slowestServerId
            + " " + this.ackVals.get(slowestServerId));
      return this.slowestServerId;
@@ -1480,8 +1475,8 @@
        exportRootException = ieEx != null ? ieEx : exportException;
      }
      if (debugEnabled())
        TRACER.debugInfo("[IE] In " + broker.getReplicationMonitorInstanceName()
      if (logger.isTraceEnabled())
        logger.trace("[IE] In " + broker.getReplicationMonitorInstanceName()
            + " export ends with " + " connected=" + broker.isConnected()
            + " exportRootException=" + exportRootException);
@@ -1502,8 +1497,8 @@
          {
            // We are still disconnected, so we wait for the listener thread
            // to reconnect - wait 10s
            if (debugEnabled())
              TRACER.debugInfo(
            if (logger.isTraceEnabled())
              logger.trace(
                "[IE] Exporter wait for reconnection by the listener thread");
            int att=0;
            while (!broker.shuttingDown() && !broker.isConnected()
@@ -1596,8 +1591,8 @@
    final Set<Integer> replicasWeAreWaitingFor =
        new HashSet<Integer>(ieCtx.startList);
    if (debugEnabled())
      TRACER.debugInfo(
    if (logger.isTraceEnabled())
      logger.trace(
      "[IE] wait for start replicasWeAreWaitingFor=" + replicasWeAreWaitingFor);
    int waitResultAttempt = 0;
@@ -1607,8 +1602,8 @@
      done = true;
      for (DSInfo dsi : getReplicasList())
      {
        if (debugEnabled())
          TRACER.debugInfo(
        if (logger.isTraceEnabled())
          logger.trace(
            "[IE] wait for start dsId " + dsi.getDsId()
            + " " + dsi.getStatus()
            + " " + dsi.getGenerationId()
@@ -1639,8 +1634,8 @@
    ieCtx.failureList.addAll(replicasWeAreWaitingFor);
    if (debugEnabled())
      TRACER.debugInfo(
    if (logger.isTraceEnabled())
      logger.trace(
        "[IE] wait for start ends with " + ieCtx.failureList);
  }
@@ -1654,8 +1649,8 @@
    final Set<Integer> replicasWeAreWaitingFor =
        new HashSet<Integer>(ieCtx.startList);
    if (debugEnabled())
      TRACER.debugInfo(
    if (logger.isTraceEnabled())
      logger.trace(
        "[IE] wait for end replicasWeAreWaitingFor=" + replicasWeAreWaitingFor);
    /*
@@ -1736,8 +1731,8 @@
    ieCtx.failureList.addAll(replicasWeAreWaitingFor);
    if (debugEnabled())
      TRACER.debugInfo(
    if (logger.isTraceEnabled())
      logger.trace(
        "[IE] wait for end ends with " + ieCtx.failureList);
  }
@@ -1827,9 +1822,9 @@
        // potential disconnection of the exporter.
        msg = broker.receive(false, false, true);
        if (debugEnabled())
        if (logger.isTraceEnabled())
        {
          TRACER.debugInfo("[IE] In "
          logger.trace("[IE] In "
              + broker.getReplicationMonitorInstanceName()
              + ", receiveEntryBytes " + msg);
        }
@@ -1877,9 +1872,9 @@
              final InitializeRcvAckMsg amsg = new InitializeRcvAckMsg(
                  getServerId(), entryMsg.getSenderID(), ieCtx.msgCnt);
              broker.publish(amsg, false);
              if (debugEnabled())
              if (logger.isTraceEnabled())
              {
                TRACER.debugInfo("[IE] In "
                logger.trace("[IE] In "
                    + broker.getReplicationMonitorInstanceName()
                    + ", publish InitializeRcvAckMsg" + amsg);
              }
@@ -1988,8 +1983,8 @@
  public void exportLDIFEntry(byte[] lDIFEntry, int pos, int length)
  throws IOException
  {
    if (debugEnabled())
      TRACER.debugInfo("[IE] Entering exportLDIFEntry entry=" +
    if (logger.isTraceEnabled())
      logger.trace("[IE] Entering exportLDIFEntry entry=" +
          Arrays.toString(lDIFEntry));
    // build the message
@@ -2025,14 +2020,14 @@
      int ourLastExportedCnt = ieCtx.msgCnt;
      int slowestCnt = ieCtx.ackVals.get(slowestServerId);
      if (debugEnabled())
        TRACER.debugInfo("[IE] Entering exportLDIFEntry waiting " +
      if (logger.isTraceEnabled())
        logger.trace("[IE] Entering exportLDIFEntry waiting " +
            " our=" + ourLastExportedCnt + " slowest=" + slowestCnt);
      if ((ourLastExportedCnt - slowestCnt) > ieCtx.initWindow)
      {
        if (debugEnabled())
          TRACER.debugInfo("[IE] Entering exportLDIFEntry waiting");
        if (logger.isTraceEnabled())
          logger.trace("[IE] Entering exportLDIFEntry waiting");
        // our export is too far beyond the slowest importer - let's wait
        try { Thread.sleep(100); }
@@ -2053,14 +2048,14 @@
      }
      else
      {
        if (debugEnabled())
          TRACER.debugInfo("[IE] slowest got to us => stop waiting");
        if (logger.isTraceEnabled())
          logger.trace("[IE] slowest got to us => stop waiting");
        break;
      }
    } // Waiting the slowest loop
    if (debugEnabled())
      TRACER.debugInfo("[IE] Entering exportLDIFEntry pub entry="
    if (logger.isTraceEnabled())
      logger.trace("[IE] Entering exportLDIFEntry pub entry="
          + Arrays.toString(lDIFEntry));
    boolean sent = broker.publish(entryMessage, false);
@@ -2164,9 +2159,9 @@
  public void initializeFromRemote(int source, Task initTask)
  throws DirectoryException
  {
    if (debugEnabled())
    if (logger.isTraceEnabled())
    {
      TRACER.debugInfo("[IE] Entering initializeFromRemote for " + this);
      logger.trace("[IE] Entering initializeFromRemote for " + this);
    }
    LocalizableMessage errMsg = !broker.isConnected()
@@ -2243,9 +2238,9 @@
  {
    InitializeTask initFromTask = null;
    if (debugEnabled())
    if (logger.isTraceEnabled())
    {
      TRACER.debugInfo("[IE] Entering initialize - domain=" + this);
      logger.trace("[IE] Entering initialize - domain=" + this);
    }
    int source = initTargetMsgReceived.getSenderID();
@@ -2293,9 +2288,9 @@
    }
    finally
    {
      if (debugEnabled())
      if (logger.isTraceEnabled())
      {
        TRACER.debugInfo("[IE] Domain=" + this
        logger.trace("[IE] Domain=" + this
          + " ends import with exception=" + ieCtx.getException()
          + " connected=" + broker.isConnected());
      }
@@ -2359,9 +2354,9 @@
      // ===================
      // No new attempt case
      if (debugEnabled())
      if (logger.isTraceEnabled())
      {
        TRACER.debugInfo("[IE] Domain=" + this
        logger.trace("[IE] Domain=" + this
          + " ends initialization with exception=" + ieCtx.getException()
          + " connected=" + broker.isConnected()
          + " task=" + initFromTask
@@ -2447,9 +2442,9 @@
      }
      status = newStatus;
      if (debugEnabled())
      if (logger.isTraceEnabled())
      {
        TRACER.debugInfo("Replication domain " + getBaseDN()
        logger.trace("Replication domain " + getBaseDN()
            + " new status is: " + status);
      }
@@ -2565,9 +2560,9 @@
  public void resetGenerationId(Long generationIdNewValue)
      throws DirectoryException
  {
    if (debugEnabled())
    if (logger.isTraceEnabled())
    {
      TRACER.debugInfo("Server id " + getServerId() + " and domain "
      logger.trace("Server id " + getServerId() + " and domain "
          + getBaseDN() + " resetGenerationId " + generationIdNewValue);
    }
@@ -2967,9 +2962,9 @@
        @Override
        public void run()
        {
          if (debugEnabled())
          if (logger.isTraceEnabled())
          {
            TRACER.debugInfo("Replication Listener thread starting.");
            logger.trace("Replication Listener thread starting.");
          }
          // Loop processing any incoming update messages.
@@ -2994,9 +2989,9 @@
            }
          }
          if (debugEnabled())
          if (logger.isTraceEnabled())
          {
            TRACER.debugInfo("Replication Listener thread stopping.");
            logger.trace("Replication Listener thread stopping.");
          }
        }
      }, threadName);
@@ -3339,9 +3334,9 @@
          msg.wait(10);
        } catch (InterruptedException e)
        {
          if (debugEnabled())
          if (logger.isTraceEnabled())
          {
            TRACER.debugInfo("waitForAck method interrupted for replication " +
            logger.trace("waitForAck method interrupted for replication " +
              "baseDN: " + getBaseDN());
          }
          break;