mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
04.41.2013 5866e993545a397330a28b084df2cfa676274950
Changes made by AutoRefactor plugin and reviewed by me
5 files modified
194 ■■■■■ changed files
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java 25 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationBackend.java 22 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 112 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java 2 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java 33 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -114,7 +114,7 @@
   */
  public String dumpState()
  {
    return this.getClass().getCanonicalName() +
    return getClass().getCanonicalName() +
           "[" +
           "[draftCompat=" + draftCompat +
           "] [persistent=" + isPersistent +
@@ -170,7 +170,8 @@
          .append(rsd).append("] [nextMsg=").append(nextMsg).append("(")
          .append(nextMsg != null ?
          new Date(nextMsg.getChangeNumber().getTime()).toString():"")
          .append(")" + "] [nextNonEligibleMsg=").append(nextNonEligibleMsg)
          .append(")")
          .append("] [nextNonEligibleMsg=").append(nextNonEligibleMsg)
          .append("] [startState=").append(startState).append("] [stopState=")
          .append(stopState).append("] [currentState=").append(currentState)
          .append("]]");
@@ -248,9 +249,9 @@
                + " getNextEligibleMessageForDomain(" + opid+ ") "
                + "newMsg isEligible=" + isEligible + " since "
                + "newMsg=[" + newMsg.getChangeNumber()
                + " " + new Date(newMsg.getChangeNumber().getTime()).toString()
                + " " + new Date(newMsg.getChangeNumber().getTime())
                + "] eligibleCN=[" + eligibleCN
                + " " + new Date(eligibleCN.getTime()).toString()+"]"
                + " " + new Date(eligibleCN.getTime())+"]"
                + dumpState());
            if (isEligible)
@@ -430,7 +431,7 @@
    {
      // no chance to have a bad domain set here
    }
    this.initialize(startECLSessionMsg);
    initialize(startECLSessionMsg);
  }
  /**
@@ -668,7 +669,7 @@
      TRACER.debugCaught(DebugLogLevel.ERROR, de);
      if (draftCNDbIter != null)
        draftCNDbIter.releaseCursor();
      throw(de);
      throw de;
    }
    catch(Exception e)
    {
@@ -800,8 +801,8 @@
                    rsd.getStartState().getMaxChangeNumber(aServerId);
                  ChangeNumber providedChange =
                    newDomainCtxt.startState.getMaxChangeNumber(aServerId);
                  if ((providedChange != null)
                      && (providedChange.older(dbOldestChange)))
                  if (providedChange != null
                      && providedChange.older(dbOldestChange))
                  {
                    cookieTooOld=true;
                  }
@@ -848,7 +849,7 @@
        throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
          ERR_RESYNC_REQUIRED_MISSING_DOMAIN_IN_PROVIDED_COOKIE.get(
              missingDomains,
              "<" + (providedCookie + missingDomains)+ ">"));
              "<" + providedCookie + missingDomains + ">"));
      }
      domainCtxts = tmpSet.toArray(new DomainContext[tmpSet.size()]);
@@ -1201,7 +1202,7 @@
      {
        // loop until not interrupted
      }
    } while (((interrupted) || (!acquired)) && (!shutdownWriter));
    } while ((interrupted || !acquired) && !shutdownWriter);
    if (msg != null)
    {
      incrementOutCount();
@@ -1271,7 +1272,7 @@
      int iDom;
      boolean continueLooping = true;
      while ((continueLooping) && (searchPhase == INIT_PHASE))
      while (continueLooping && searchPhase == INIT_PHASE)
      {
        // Step 1 & 2
        if (searchPhase == INIT_PHASE)
@@ -1590,7 +1591,7 @@
    int oldest = -1;
    for (int i=0; i<domainCtxts.length; i++)
    {
      if ((domainCtxts[i].active))
      if (domainCtxts[i].active)
      {
        // on the first loop, oldest==-1
        // .msg is null when the previous (non blocking) nextMessage did
opends/src/server/org/opends/server/replication/server/ReplicationBackend.java
@@ -666,7 +666,7 @@
      attrs.put(ocType, ldapAttrList);
      TRACER.debugInfo("State=" +
          exportContainer.getDbServerState().toString());
          exportContainer.getDbServerState());
      Attribute stateAttr = Attributes.create("state", exportContainer
          .getDbServerState().toString());
      ldapAttrList.clear();
@@ -727,8 +727,8 @@
        SearchFilter filter = searchOperation.getFilter();
        previousChangeNumber = extractChangeNumber(filter);
        if ((previousChangeNumber == null) &&
            (filter.getFilterType().equals(FilterType.AND)))
        if (previousChangeNumber == null &&
            filter.getFilterType().equals(FilterType.AND))
        {
          for (SearchFilter filterComponents: filter.getFilterComponents())
          {
@@ -818,7 +818,7 @@
    if ( (filterType.equals(FilterType.GREATER_OR_EQUAL) ||
             filterType.equals(FilterType.EQUALITY) ) &&
             (filter.getAttributeType().equals(changeNumberAttrType)))
             filter.getAttributeType().equals(changeNumberAttrType))
    {
      try
      {
@@ -867,7 +867,7 @@
          AddOperation addOperation = (AddOperation)msg.createOperation(conn);
          dn = DN.decode("puid=" + addMsg.getParentEntryUUID() + "+" +
              CHANGE_NUMBER + "=" + msg.getChangeNumber().toString() + "+" +
              CHANGE_NUMBER + "=" + msg.getChangeNumber() + "+" +
              msg.getDn() + "," + BASE_DN);
          Map<AttributeType,List<Attribute>> attrs =
@@ -918,7 +918,7 @@
          DeleteMsg delMsg = (DeleteMsg)msg;
          dn = DN.decode("uuid=" + msg.getEntryUUID() + "," +
              CHANGE_NUMBER + "=" + delMsg.getChangeNumber().toString()+ "," +
              CHANGE_NUMBER + "=" + delMsg.getChangeNumber() + "," +
              msg.getDn() +","+ BASE_DN);
          DeleteChangeRecordEntry changeRecord =
@@ -941,7 +941,7 @@
          ModifyOperation op = (ModifyOperation)msg.createOperation(conn);
          dn = DN.decode("uuid=" + msg.getEntryUUID() + "," +
              CHANGE_NUMBER + "=" + msg.getChangeNumber().toString()+ "," +
              CHANGE_NUMBER + "=" + msg.getChangeNumber() + "," +
              msg.getDn() +","+ BASE_DN);
          op.setInternalOperation(true);
@@ -965,7 +965,7 @@
          ModifyDNOperation op = (ModifyDNOperation)msg.createOperation(conn);
          dn = DN.decode("uuid=" + msg.getEntryUUID() + "," +
              CHANGE_NUMBER + "=" + msg.getChangeNumber().toString()+ "," +
              CHANGE_NUMBER + "=" + msg.getChangeNumber() + "," +
              msg.getDn() +","+ BASE_DN);
          op.setInternalOperation(true);
@@ -1286,7 +1286,7 @@
    {
      for (Control c : requestControls)
      {
        if (c.getOID().equals(OID_INTERNAL_GROUP_MEMBERSHIP_UPDATE))
        if (OID_INTERNAL_GROUP_MEMBERSHIP_UPDATE.equals(c.getOID()))
        {
          return;
        }
@@ -1298,8 +1298,8 @@
    try
    {
      DN backendBaseDN = DN.decode(BASE_DN);
      if ( (searchOperation.getScope().equals(SearchScope.BASE_OBJECT)) &&
           (backendBaseDN.equals(searchOperation.getBaseDN())) )
      if ( searchOperation.getScope().equals(SearchScope.BASE_OBJECT) &&
           backendBaseDN.equals(searchOperation.getBaseDN()) )
      {
        return;
      }
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -28,10 +28,9 @@
package org.opends.server.replication.server;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.StaticUtils.*;
import java.io.IOException;
import java.util.*;
@@ -67,7 +66,6 @@
 * are removed and should they be needed again must be read from the backing
 * file
 *
 *
 * it runs a thread that is responsible for saving the messages
 * received to the disk and for trimming them
 * Decision to trim can be based on disk space or age of the message
@@ -382,13 +380,11 @@
            TRACER.debugInfo("In " + "Replication Server " +
              replicationServer.getReplicationPort() + " " +
              baseDn + " " + replicationServer.getServerId() +
              " for dn " + baseDn + ", update " +
              update.getChangeNumber().toString() +
              " for dn " + baseDn + ", update " + update.getChangeNumber() +
              " will not be sent to replication server " +
              Integer.toString(handler.getServerId()) + " with generation id " +
              Long.toString(handler.getGenerationId()) +
              " different from local " +
              "generation id " + Long.toString(generationId));
              handler.getServerId() + " with generation id " +
              handler.getGenerationId() + " different from local " +
              "generation id " + generationId);
          continue;
        }
@@ -447,20 +443,16 @@
        {
          if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS)
            TRACER.debugInfo("In " + this +
              " for dn " + baseDn + ", update " +
              update.getChangeNumber().toString() +
              " for dn " + baseDn + ", update " + update.getChangeNumber() +
              " will not be sent to directory server " +
              Integer.toString(handler.getServerId()) + " with generation id " +
              Long.toString(handler.getGenerationId()) +
              " different from local " +
              "generation id " + Long.toString(generationId));
              handler.getServerId() + " with generation id " +
              handler.getGenerationId() + " different from local " +
              "generation id " + generationId);
          if (dsStatus == ServerStatus.FULL_UPDATE_STATUS)
            TRACER.debugInfo("In RS " +
              replicationServer.getServerId() +
              " for dn " + baseDn + ", update " +
              update.getChangeNumber().toString() +
              " will not be sent to directory server " +
              Integer.toString(handler.getServerId()) +
              " for dn " + baseDn + ", update " + update.getChangeNumber() +
              " will not be sent to directory server " + handler.getServerId() +
              " as it is in full update");
        }
@@ -697,26 +689,20 @@
    }
    List<Integer> expectedServers = new ArrayList<Integer>();
    if (interestedInAcks)
    {
      if (sourceHandler.isDataServer())
    if (interestedInAcks && sourceHandler.isDataServer())
      {
        // Look for RS eligible for assured
        for (ReplicationServerHandler handler : replicationServers.values())
        {
          if (handler.getGroupId() == groupId)
        if (handler.getGroupId() == groupId
            // No ack expected from a RS with different group id
          {
            if ((generationId > 0) &&
              (generationId == handler.getGenerationId()))
            && generationId > 0 && (generationId == handler.getGenerationId()))
              // No ack expected from a RS with bad gen id
            {
              expectedServers.add(handler.getServerId());
            }
          }
        }
      }
    }
    // Return computed structures
    PreparedAssuredInfo preparedAssuredInfo = new PreparedAssuredInfo();
@@ -835,6 +821,7 @@
     * Run when the assured timeout for an assured update message we are waiting
     * acks for occurs.
     */
    @Override
    public void run()
    {
      ExpectedAcksInfo expectedAcksInfo = waitingAcks.get(cn);
@@ -857,11 +844,9 @@
          ServerHandler origServer = expectedAcksInfo.getRequesterServer();
          if (debugEnabled())
            TRACER.debugInfo(
              "In RS " + Integer.toString(replicationServer.getServerId()) +
              " for " + baseDn +
              "In RS " + replicationServer.getServerId() + " for " + baseDn +
              ", sending timeout for assured update with change " + " number " +
              cn.toString() + " to server id " +
              Integer.toString(origServer.getServerId()));
              cn + " to server id " + origServer.getServerId());
          try
          {
            origServer.send(finalAck);
@@ -882,7 +867,7 @@
          }
          // Increment assured counters
          boolean safeRead =
              (expectedAcksInfo instanceof SafeReadExpectedAcksInfo);
              expectedAcksInfo instanceof SafeReadExpectedAcksInfo;
          if (safeRead)
          {
            origServer.incrementAssuredSrReceivedUpdatesTimeout();
@@ -1246,9 +1231,8 @@
          " has servers connected to it - will not reset generationId");
    }
    if ((!lDAPServersConnectedInTheTopology) &&
      (!this.generationIdSavedStatus) &&
      (generationId != -1))
    if (!lDAPServersConnectedInTheTopology && !this.generationIdSavedStatus
        && generationId != -1)
    {
      changeGenerationId(-1, false);
    }
@@ -1265,7 +1249,7 @@
  {
    ReplicationServerHandler oldHandler =
      replicationServers.get(handler.getServerId());
    if ((oldHandler != null))
    if (oldHandler != null)
    {
      if (oldHandler.getServerAddressURL().equals(
        handler.getServerAddressURL()))
@@ -1593,7 +1577,7 @@
              // We log the error. The requestor will detect a timeout or
              // any other failure on the connection.
              logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get(
                  Integer.toString((msg.getDestination()))));
                  Integer.toString(msg.getDestination())));
            }
          }
        }
@@ -1868,14 +1852,13 @@
  {
    for (DataServerHandler handler : directoryServers.values())
    {
      if ((notThisOne == null) || ((handler != notThisOne)))
      if ((notThisOne == null) || (handler != notThisOne))
        // All except passed one
      {
        for (int i=1; i<=2; i++)
        {
          if (!handler.shuttingDown())
          {
            if (handler.getStatus() != ServerStatus.NOT_CONNECTED_STATUS)
          if (!handler.shuttingDown()
              && handler.getStatus() != ServerStatus.NOT_CONNECTED_STATUS)
            {
              TopologyMsg topoMsg=createTopologyMsgForDS(handler.getServerId());
              try
@@ -1887,16 +1870,14 @@
              {
                if (i==2)
                {
                  Message message = ERR_EXCEPTION_SENDING_TOPO_INFO.get(
                      baseDn,
                      "directory",
                      Integer.toString(handler.getServerId()),
                      e.getMessage());
                Message message =
                    ERR_EXCEPTION_SENDING_TOPO_INFO
                        .get(baseDn, "directory", Integer.toString(handler
                            .getServerId()), e.getMessage());
                  logError(message);
                }
              }
            }
          }
          try { Thread.sleep(100); } catch(Exception e) {}
        }
      }
@@ -1914,9 +1895,8 @@
    {
      for (int i=1; i<=2; i++)
      {
        if (!handler.shuttingDown())
        {
          if (handler.getStatus() != ServerStatus.NOT_CONNECTED_STATUS)
        if (!handler.shuttingDown()
            && handler.getStatus() != ServerStatus.NOT_CONNECTED_STATUS)
          {
            try
            {
@@ -1927,16 +1907,13 @@
            {
              if (i==2)
              {
                Message message = ERR_EXCEPTION_SENDING_TOPO_INFO.get(
                    baseDn,
                    "replication",
                    Integer.toString(handler.getServerId()),
                    e.getMessage());
              Message message =
                  ERR_EXCEPTION_SENDING_TOPO_INFO.get(baseDn, "replication",
                      Integer.toString(handler.getServerId()), e.getMessage());
                logError(message);
              }
            }
          }
        }
        try { Thread.sleep(100); } catch(Exception e) {}
      }
    }
@@ -2852,7 +2829,7 @@
   */
  public boolean hasLock()
  {
    return (lock.getHoldCount() > 0);
    return lock.getHoldCount() > 0;
  }
  /**
@@ -2920,7 +2897,7 @@
   */
  public boolean isRunningStatusAnalyzer()
  {
    return (statusAnalyzer != null);
    return statusAnalyzer != null;
  }
  /**
@@ -2971,7 +2948,7 @@
   */
  public boolean isRunningMonitoringPublisher()
  {
    return (monitoringPublisher != null);
    return monitoringPublisher != null;
  }
  /**
@@ -3229,16 +3206,13 @@
      }
      ChangeNumber changelogLastCN = db.getLastChange();
      if (changelogLastCN != null)
      {
        if ((eligibleCN == null) || (changelogLastCN.newer(eligibleCN)))
      if (changelogLastCN != null
          && (eligibleCN == null || changelogLastCN.newer(eligibleCN)))
        {
          eligibleCN = changelogLastCN;
        }
      }
      if ((heartbeatLastDN != null) &&
       ((eligibleCN == null) || (heartbeatLastDN.newer(eligibleCN))))
      if (heartbeatLastDN != null
          && (eligibleCN == null || heartbeatLastDN.newer(eligibleCN)))
      {
        eligibleCN = heartbeatLastDN;
      }
opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java
@@ -145,7 +145,7 @@
      if (debugEnabled())
      {
        TRACER.debugInfo(getName() + "could not send a heartbeat." +
            e.getMessage() + e.toString());
            e.getMessage() + e);
      }
      // This will be caught in another thread.
    }
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -540,7 +540,7 @@
      // Unsupported message type: should not happen
      throw new IllegalArgumentException("Unexpected PDU type: " +
        msg.getClass().getName() + " :\n" + msg.toString());
        msg.getClass().getName() + " :\n" + msg);
    }
    /**
@@ -1167,9 +1167,8 @@
        if (debugEnabled())
        {
          TRACER.debugInfo("RB for dn " + baseDn
            + " and with server id " + Integer.toString(serverId)
            + " computed " + Integer.toString(nChanges) + " changes late.");
          TRACER.debugInfo("RB for dn " + baseDn + " and with server id "
              + serverId + " computed " + nChanges + " changes late.");
        }
        /*
@@ -1276,8 +1275,7 @@
      if (debugEnabled())
      {
        TRACER.debugInfo("In RB for " + baseDn + "\nRB HANDSHAKE SENT:\n"
          + serverStartMsg.toString() + "\nAND RECEIVED:\n"
          + msg.toString());
            + serverStartMsg + "\nAND RECEIVED:\n" + msg);
      }
      // Wrap received message in a server info object
@@ -1286,7 +1284,7 @@
      // Sanity check
      String repDn = replServerInfo.getBaseDn();
      if (!(this.baseDn.equals(repDn)))
      if (!this.baseDn.equals(repDn))
      {
        errorMessage = ERR_DS_DN_DOES_NOT_MATCH.get(repDn,
            this.baseDn);
@@ -1412,8 +1410,8 @@
       */
      if (debugEnabled())
      {
        TRACER.debugInfo("In RB for " + baseDn
          + "\nRB HANDSHAKE SENT:\n" + startECLSessionMsg.toString());
        TRACER.debugInfo("In RB for " + baseDn + "\nRB HANDSHAKE SENT:\n"
            + startECLSessionMsg);
      }
      // Alright set the timeout to the desired value
@@ -1484,9 +1482,8 @@
      if (debugEnabled())
      {
        TRACER.debugInfo("In RB for " + baseDn
          + "\nRB HANDSHAKE SENT:\n" + startSessionMsg.toString()
          + "\nAND RECEIVED:\n" + topologyMsg.toString());
        TRACER.debugInfo("In RB for " + baseDn + "\nRB HANDSHAKE SENT:\n"
            + startSessionMsg + "\nAND RECEIVED:\n" + topologyMsg);
      }
      // Alright set the timeout to the desired value
@@ -2368,7 +2365,7 @@
            }
          }
        }
        if ((!credit) && (currentWindowSemaphore.availablePermits() == 0))
        if (!credit && currentWindowSemaphore.availablePermits() == 0)
        {
          synchronized (connectPhaseLock)
          {
@@ -2829,19 +2826,15 @@
    // These parameters needs to be renegotiated with the ReplicationServer
    // so if they have changed, that requires restarting the session with
    // the ReplicationServer.
    Boolean needToRestartSession = false;
    // A new session is necessary only when information regarding
    // the connection is modified
    if (this.replicationServerUrls == null
    boolean needToRestartSession =
        this.replicationServerUrls == null
        || replicationServers.size() != this.replicationServerUrls.size()
        || !replicationServers.containsAll(this.replicationServerUrls)
        || window != this.maxRcvWindow
        || heartbeatInterval != this.heartbeatInterval
        || groupId != this.groupId)
    {
      needToRestartSession = true;
    }
        || groupId != this.groupId;
    this.replicationServerUrls = replicationServers;
    this.rcvWindow = window;