mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
04.41.2013 5866e993545a397330a28b084df2cfa676274950
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -28,10 +28,9 @@
package org.opends.server.replication.server;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.StaticUtils.*;
import java.io.IOException;
import java.util.*;
@@ -67,7 +66,6 @@
 * are removed and should they be needed again must be read from the backing
 * file
 *
 *
 * it runs a thread that is responsible for saving the messages
 * received to the disk and for trimming them
 * Decision to trim can be based on disk space or age of the message
@@ -382,13 +380,11 @@
            TRACER.debugInfo("In " + "Replication Server " +
              replicationServer.getReplicationPort() + " " +
              baseDn + " " + replicationServer.getServerId() +
              " for dn " + baseDn + ", update " +
              update.getChangeNumber().toString() +
              " for dn " + baseDn + ", update " + update.getChangeNumber() +
              " will not be sent to replication server " +
              Integer.toString(handler.getServerId()) + " with generation id " +
              Long.toString(handler.getGenerationId()) +
              " different from local " +
              "generation id " + Long.toString(generationId));
              handler.getServerId() + " with generation id " +
              handler.getGenerationId() + " different from local " +
              "generation id " + generationId);
          continue;
        }
@@ -447,20 +443,16 @@
        {
          if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS)
            TRACER.debugInfo("In " + this +
              " for dn " + baseDn + ", update " +
              update.getChangeNumber().toString() +
              " for dn " + baseDn + ", update " + update.getChangeNumber() +
              " will not be sent to directory server " +
              Integer.toString(handler.getServerId()) + " with generation id " +
              Long.toString(handler.getGenerationId()) +
              " different from local " +
              "generation id " + Long.toString(generationId));
              handler.getServerId() + " with generation id " +
              handler.getGenerationId() + " different from local " +
              "generation id " + generationId);
          if (dsStatus == ServerStatus.FULL_UPDATE_STATUS)
            TRACER.debugInfo("In RS " +
              replicationServer.getServerId() +
              " for dn " + baseDn + ", update " +
              update.getChangeNumber().toString() +
              " will not be sent to directory server " +
              Integer.toString(handler.getServerId()) +
              " for dn " + baseDn + ", update " + update.getChangeNumber() +
              " will not be sent to directory server " + handler.getServerId() +
              " as it is in full update");
        }
@@ -697,23 +689,17 @@
    }
    List<Integer> expectedServers = new ArrayList<Integer>();
    if (interestedInAcks)
    if (interestedInAcks && sourceHandler.isDataServer())
    {
      if (sourceHandler.isDataServer())
      // Look for RS eligible for assured
      for (ReplicationServerHandler handler : replicationServers.values())
      {
        // Look for RS eligible for assured
        for (ReplicationServerHandler handler : replicationServers.values())
        if (handler.getGroupId() == groupId
        // No ack expected from a RS with different group id
            && generationId > 0 && (generationId == handler.getGenerationId()))
        // No ack expected from a RS with bad gen id
        {
          if (handler.getGroupId() == groupId)
            // No ack expected from a RS with different group id
          {
            if ((generationId > 0) &&
              (generationId == handler.getGenerationId()))
              // No ack expected from a RS with bad gen id
            {
              expectedServers.add(handler.getServerId());
            }
          }
          expectedServers.add(handler.getServerId());
        }
      }
    }
@@ -835,6 +821,7 @@
     * Run when the assured timeout for an assured update message we are waiting
     * acks for occurs.
     */
    @Override
    public void run()
    {
      ExpectedAcksInfo expectedAcksInfo = waitingAcks.get(cn);
@@ -857,11 +844,9 @@
          ServerHandler origServer = expectedAcksInfo.getRequesterServer();
          if (debugEnabled())
            TRACER.debugInfo(
              "In RS " + Integer.toString(replicationServer.getServerId()) +
              " for " + baseDn +
              "In RS " + replicationServer.getServerId() + " for " + baseDn +
              ", sending timeout for assured update with change " + " number " +
              cn.toString() + " to server id " +
              Integer.toString(origServer.getServerId()));
              cn + " to server id " + origServer.getServerId());
          try
          {
            origServer.send(finalAck);
@@ -882,7 +867,7 @@
          }
          // Increment assured counters
          boolean safeRead =
              (expectedAcksInfo instanceof SafeReadExpectedAcksInfo);
              expectedAcksInfo instanceof SafeReadExpectedAcksInfo;
          if (safeRead)
          {
            origServer.incrementAssuredSrReceivedUpdatesTimeout();
@@ -1246,9 +1231,8 @@
          " has servers connected to it - will not reset generationId");
    }
    if ((!lDAPServersConnectedInTheTopology) &&
      (!this.generationIdSavedStatus) &&
      (generationId != -1))
    if (!lDAPServersConnectedInTheTopology && !this.generationIdSavedStatus
        && generationId != -1)
    {
      changeGenerationId(-1, false);
    }
@@ -1265,7 +1249,7 @@
  {
    ReplicationServerHandler oldHandler =
      replicationServers.get(handler.getServerId());
    if ((oldHandler != null))
    if (oldHandler != null)
    {
      if (oldHandler.getServerAddressURL().equals(
        handler.getServerAddressURL()))
@@ -1593,7 +1577,7 @@
              // We log the error. The requestor will detect a timeout or
              // any other failure on the connection.
              logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get(
                  Integer.toString((msg.getDestination()))));
                  Integer.toString(msg.getDestination())));
            }
          }
        }
@@ -1868,32 +1852,29 @@
  {
    for (DataServerHandler handler : directoryServers.values())
    {
      if ((notThisOne == null) || ((handler != notThisOne)))
      if ((notThisOne == null) || (handler != notThisOne))
        // All except passed one
      {
        for (int i=1; i<=2; i++)
        {
          if (!handler.shuttingDown())
          if (!handler.shuttingDown()
              && handler.getStatus() != ServerStatus.NOT_CONNECTED_STATUS)
          {
            if (handler.getStatus() != ServerStatus.NOT_CONNECTED_STATUS)
            TopologyMsg topoMsg = createTopologyMsgForDS(handler.getServerId());
            try
            {
              TopologyMsg topoMsg=createTopologyMsgForDS(handler.getServerId());
              try
              handler.sendTopoInfo(topoMsg);
              break;
            }
            catch (IOException e)
            {
              if (i == 2)
              {
                handler.sendTopoInfo(topoMsg);
                break;
              }
              catch (IOException e)
              {
                if (i==2)
                {
                  Message message = ERR_EXCEPTION_SENDING_TOPO_INFO.get(
                      baseDn,
                      "directory",
                      Integer.toString(handler.getServerId()),
                      e.getMessage());
                  logError(message);
                }
                Message message =
                    ERR_EXCEPTION_SENDING_TOPO_INFO
                        .get(baseDn, "directory", Integer.toString(handler
                            .getServerId()), e.getMessage());
                logError(message);
              }
            }
          }
@@ -1914,26 +1895,22 @@
    {
      for (int i=1; i<=2; i++)
      {
        if (!handler.shuttingDown())
        if (!handler.shuttingDown()
            && handler.getStatus() != ServerStatus.NOT_CONNECTED_STATUS)
        {
          if (handler.getStatus() != ServerStatus.NOT_CONNECTED_STATUS)
          try
          {
            try
            handler.sendTopoInfo(topoMsg);
            break;
          }
          catch (IOException e)
          {
            if (i == 2)
            {
              handler.sendTopoInfo(topoMsg);
              break;
            }
            catch (IOException e)
            {
              if (i==2)
              {
                Message message = ERR_EXCEPTION_SENDING_TOPO_INFO.get(
                    baseDn,
                    "replication",
                    Integer.toString(handler.getServerId()),
                    e.getMessage());
                logError(message);
              }
              Message message =
                  ERR_EXCEPTION_SENDING_TOPO_INFO.get(baseDn, "replication",
                      Integer.toString(handler.getServerId()), e.getMessage());
              logError(message);
            }
          }
        }
@@ -2852,7 +2829,7 @@
   */
  public boolean hasLock()
  {
    return (lock.getHoldCount() > 0);
    return lock.getHoldCount() > 0;
  }
  /**
@@ -2920,7 +2897,7 @@
   */
  public boolean isRunningStatusAnalyzer()
  {
    return (statusAnalyzer != null);
    return statusAnalyzer != null;
  }
  /**
@@ -2971,7 +2948,7 @@
   */
  public boolean isRunningMonitoringPublisher()
  {
    return (monitoringPublisher != null);
    return monitoringPublisher != null;
  }
  /**
@@ -3229,16 +3206,13 @@
      }
      ChangeNumber changelogLastCN = db.getLastChange();
      if (changelogLastCN != null)
      if (changelogLastCN != null
          && (eligibleCN == null || changelogLastCN.newer(eligibleCN)))
      {
        if ((eligibleCN == null) || (changelogLastCN.newer(eligibleCN)))
        {
          eligibleCN = changelogLastCN;
        }
        eligibleCN = changelogLastCN;
      }
      if ((heartbeatLastDN != null) &&
       ((eligibleCN == null) || (heartbeatLastDN.newer(eligibleCN))))
      if (heartbeatLastDN != null
          && (eligibleCN == null || heartbeatLastDN.newer(eligibleCN)))
      {
        eligibleCN = heartbeatLastDN;
      }