mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
06.33.2013 f5f321ac378eda6f2effe0fb897ddeed6c1eb188
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -698,108 +698,103 @@
    try
    {
      Iterator<ReplicationServerDomain> rsdi = rs.getDomainIterator();
      // Creates the table that will contain the real-time info for each
      // and every domain.
      Set<DomainContext> tmpSet = new HashSet<DomainContext>();
      String missingDomains = "";
      if (rsdi != null)
      for (Iterator<ReplicationServerDomain> iter = rs.getDomainIterator();
           iter.hasNext();)
      {
        while (rsdi.hasNext())
        ReplicationServerDomain rsd = iter.next();
        // skip the 'unreal' changelog domain
        if (rsd == this.replicationServerDomain)
          continue;
        // skip the excluded domains
        if (excludedBaseDNs.contains(rsd.getBaseDn()))
        {
          // process a domain
          ReplicationServerDomain rsd = rsdi.next();
          // skip the 'unreal' changelog domain
          if (rsd == this.replicationServerDomain)
            continue;
          // skip the excluded domains
          if (excludedBaseDNs.contains(rsd.getBaseDn()))
          {
            // this is an excluded domain
            if (allowUnknownDomains)
              startStatesFromProvidedCookie.remove(rsd.getBaseDn());
            continue;
          }
          // skip unused domains
          if (rsd.getDbServerState().isEmpty())
            continue;
          // Creates the new domain context
          DomainContext newDomainCtxt = new DomainContext();
          newDomainCtxt.active = true;
          newDomainCtxt.rsd = rsd;
          newDomainCtxt.domainLatestTrimDate = rsd.getLatestDomainTrimDate();
          // Assign the start state for the domain
          if (isPersistent == PERSISTENT_CHANGES_ONLY)
          {
            newDomainCtxt.startState = rsd.getEligibleState(eligibleCN);
          // this is an excluded domain
          if (allowUnknownDomains)
            startStatesFromProvidedCookie.remove(rsd.getBaseDn());
          continue;
        }
        // skip unused domains
        if (rsd.getDbServerState().isEmpty())
          continue;
        // Creates the new domain context
        DomainContext newDomainCtxt = new DomainContext();
        newDomainCtxt.active = true;
        newDomainCtxt.rsd = rsd;
        newDomainCtxt.domainLatestTrimDate = rsd.getLatestDomainTrimDate();
        // Assign the start state for the domain
        if (isPersistent == PERSISTENT_CHANGES_ONLY)
        {
          newDomainCtxt.startState = rsd.getEligibleState(eligibleCN);
          startStatesFromProvidedCookie.remove(rsd.getBaseDn());
        }
        else
        {
          // let's take the start state for this domain from the provided
          // cookie
          newDomainCtxt.startState =
              startStatesFromProvidedCookie.remove(rsd.getBaseDn());
          if (providedCookie == null
              || providedCookie.length() == 0
              || allowUnknownDomains)
          {
            // when there is no cookie provided in the request,
            // let's start traversing this domain from the beginning of
            // what we have in the replication changelog
            if (newDomainCtxt.startState == null)
            {
              ChangeNumber latestTrimCN =
                  new ChangeNumber(newDomainCtxt.domainLatestTrimDate, 0, 0);
              newDomainCtxt.startState =
                  rsd.getStartState().duplicateOnlyOlderThan(latestTrimCN);
            }
          }
          else
          {
            // let's take the start state for this domain from the provided
            // cookie
            newDomainCtxt.startState =
              startStatesFromProvidedCookie.remove(rsd.getBaseDn());
            if ((providedCookie==null)||(providedCookie.length()==0)
                ||allowUnknownDomains)
            // when there is a cookie provided in the request,
            if (newDomainCtxt.startState == null)
            {
              // when there is no cookie provided in the request,
              // let's start traversing this domain from the beginning of
              // what we have in the replication changelog
              if (newDomainCtxt.startState == null)
              missingDomains += (rsd.getBaseDn() + ":;");
              continue;
            }
            else if (!newDomainCtxt.startState.isEmpty())
            {
              if (hasCookieBeenTrimmedFromDB(rsd, newDomainCtxt.startState))
              {
                ChangeNumber latestTrimCN =
                    new ChangeNumber(newDomainCtxt.domainLatestTrimDate, 0,0);
                newDomainCtxt.startState = rsd.getStartState()
                        .duplicateOnlyOlderThan(latestTrimCN);
                throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
                    ERR_RESYNC_REQUIRED_TOO_OLD_DOMAIN_IN_PROVIDED_COOKIE
                        .get(newDomainCtxt.rsd.getBaseDn()));
              }
            }
            else
            {
              // when there is a cookie provided in the request,
              if (newDomainCtxt.startState == null)
              {
                missingDomains += (rsd.getBaseDn() + ":;");
                continue;
              }
              else if (!newDomainCtxt.startState.isEmpty())
              {
                if (hasCookieBeenTrimmedFromDB(rsd, newDomainCtxt.startState))
                {
                  throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
                      ERR_RESYNC_REQUIRED_TOO_OLD_DOMAIN_IN_PROVIDED_COOKIE.get(
                          newDomainCtxt.rsd.getBaseDn()));
                }
              }
            }
            // Set the stop state for the domain from the eligibleCN
            newDomainCtxt.stopState = rsd.getEligibleState(eligibleCN);
          }
          newDomainCtxt.currentState = new ServerState();
          // Creates an unconnected SH for the domain
          MessageHandler mh =
              new MessageHandler(maxQueueSize, replicationServer);
          mh.setInitialServerState(newDomainCtxt.startState);
          mh.setBaseDNAndDomain(rsd.getBaseDn(), false);
          // register the unconnected into the domain
          rsd.registerHandler(mh);
          newDomainCtxt.mh = mh;
          previousCookie.update(newDomainCtxt.rsd.getBaseDn(),
                                newDomainCtxt.startState);
          // store the new context
          tmpSet.add(newDomainCtxt);
          // Set the stop state for the domain from the eligibleCN
          newDomainCtxt.stopState = rsd.getEligibleState(eligibleCN);
        }
        newDomainCtxt.currentState = new ServerState();
        // Creates an unconnected SH for the domain
        MessageHandler mh = new MessageHandler(maxQueueSize, replicationServer);
        mh.setInitialServerState(newDomainCtxt.startState);
        mh.setBaseDNAndDomain(rsd.getBaseDn(), false);
        // register the unconnected into the domain
        rsd.registerHandler(mh);
        newDomainCtxt.mh = mh;
        previousCookie.update(newDomainCtxt.rsd.getBaseDn(),
                              newDomainCtxt.startState);
        // store the new context
        tmpSet.add(newDomainCtxt);
      }
      if (missingDomains.length()>0)
opends/src/server/org/opends/server/replication/server/ReplicationBackend.java
@@ -26,16 +26,15 @@
 *      Portions copyright 2011-2013 ForgeRock AS
 */
package org.opends.server.replication.server;
import static org.opends.messages.BackendMessages.*;
import static org.opends.messages.JebMessages.NOTE_JEB_EXPORT_FINAL_STATUS;
import static org.opends.messages.JebMessages.NOTE_JEB_EXPORT_PROGRESS_REPORT;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.util.StaticUtils.*;
import org.opends.server.replication.protocol.LDAPUpdateMsg;
import static org.opends.messages.BackendMessages.*;
import static org.opends.messages.JebMessages.*;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.config.ConfigConstants.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.ServerConstants.*;
import static org.opends.server.util.StaticUtils.*;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -63,11 +62,13 @@
import org.opends.server.core.SearchOperation;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.plugin.ReplicationServerListener;
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.DeleteMsg;
import org.opends.server.replication.protocol.LDAPUpdateMsg;
import org.opends.server.replication.protocol.ModifyDNMsg;
import org.opends.server.replication.protocol.ModifyMsg;
import org.opends.server.replication.protocol.UpdateMsg;
@@ -106,10 +107,6 @@
import org.opends.server.util.ModifyChangeRecordEntry;
import org.opends.server.util.ModifyDNChangeRecordEntry;
import org.opends.server.util.Validator;
import static org.opends.server.config.ConfigConstants.ATTR_OBJECTCLASSES_LC;
import org.opends.server.protocols.internal.InternalSearchOperation;
import static org.opends.server.util.ServerConstants.*;
/**
 * This class defines a backend that stores its information in an
@@ -136,17 +133,17 @@
  private static final String BASE_DN = "dc=replicationchanges";
  // The base DNs for this backend.
  /** The base DNs for this backend. */
  private DN[] baseDNs;
  // The base DNs for this backend, in a hash set.
  private HashSet<DN> baseDNSet;
  /** The base DNs for this backend, in a hash set. */
  private Set<DN> baseDNSet;
  // The set of supported controls for this backend.
  private HashSet<String> supportedControls;
  /** The set of supported controls for this backend. */
  private Set<String> supportedControls;
  // The set of supported features for this backend.
  private HashSet<String> supportedFeatures;
  /** The set of supported features for this backend. */
  private Set<String> supportedFeatures;
  private ReplicationServer server;
@@ -165,13 +162,13 @@
   */
  private long skippedCount = 0;
  //Objectclass for getEntry root entries.
  private HashMap<ObjectClass,String> rootObjectclasses;
  /** Objectclass for getEntry root entries. */
  private Map<ObjectClass, String> rootObjectclasses;
  //Attributes used for getEntry root entries.
  private LinkedHashMap<AttributeType,List<Attribute>> attributes;
  /** Attributes used for getEntry root entries. */
  private Map<AttributeType, List<Attribute>> attributes;
  //Operational attributes used for getEntry root entries.
  /** Operational attributes used for getEntry root entries. */
  private Map<AttributeType,List<Attribute>> operationalAttributes;
@@ -265,7 +262,7 @@
    attributes = new LinkedHashMap<AttributeType,List<Attribute>>();
    Attribute a = Attributes.create("changetype", "add");
    ArrayList<Attribute> attrList = new ArrayList<Attribute>(1);
    List<Attribute> attrList = new ArrayList<Attribute>(1);
    attrList.add(a);
    attributes.put(a.getAttributeType(), attrList);
    operationalAttributes = new LinkedHashMap<AttributeType,List<Attribute>>();
@@ -333,17 +330,13 @@
    //This method only returns the number of actual change entries, the
    //domain and any baseDN entries are not counted.
    long retNum=0;
    Iterator<ReplicationServerDomain> rcachei = server.getDomainIterator();
    if (rcachei != null)
    for (Iterator<ReplicationServerDomain> iter = server.getDomainIterator();
         iter.hasNext();)
    {
      while (rcachei.hasNext())
      {
        ReplicationServerDomain rsd = rcachei.next();
        retNum += rsd.getChangesCount();
      }
      ReplicationServerDomain rsd = iter.next();
      retNum += rsd.getChangesCount();
    }
    return retNum;
  }
@@ -479,7 +472,7 @@
   * {@inheritDoc}
   */
  @Override()
  public HashSet<String> getSupportedControls()
  public Set<String> getSupportedControls()
  {
    return supportedControls;
  }
@@ -490,7 +483,7 @@
   * {@inheritDoc}
   */
  @Override()
  public HashSet<String> getSupportedFeatures()
  public Set<String> getSupportedFeatures()
  {
    return supportedFeatures;
  }
@@ -515,41 +508,13 @@
  public synchronized void exportLDIF(LDIFExportConfig exportConfig)
  throws DirectoryException
  {
    List<DN> includeBranches = exportConfig.getIncludeBranches();
    DN baseDN;
    ArrayList<ReplicationServerDomain> exportContainers =
      new ArrayList<ReplicationServerDomain>();
    if(server == null) {
       Message message = ERR_REPLICATONBACKEND_EXPORT_LDIF_FAILED.get();
      throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,message);
    }
    Iterator<ReplicationServerDomain> rsdi = server.getDomainIterator();
    if (rsdi != null)
    {
      while (rsdi.hasNext())
      {
        ReplicationServerDomain rc = rsdi.next();
        // Skip containers that are not covered by the include branches.
        baseDN = DN.decode(rc.getBaseDn() + "," + BASE_DN);
        if (includeBranches == null || includeBranches.isEmpty())
        {
          exportContainers.add(rc);
        }
        else
        {
          for (DN includeBranch : includeBranches)
          {
            if (includeBranch.isDescendantOf(baseDN) ||
                includeBranch.isAncestorOf(baseDN))
            {
              exportContainers.add(rc);
            }
          }
        }
      }
    }
    final List<ReplicationServerDomain> exportContainers =
        findExportContainers(exportConfig);
    // Make a note of the time we started.
    long startTime = System.currentTimeMillis();
@@ -612,7 +577,7 @@
    }
    long finishTime = System.currentTimeMillis();
    long totalTime = (finishTime - startTime);
    long totalTime = finishTime - startTime;
    float rate = 0;
    if (totalTime > 0)
@@ -625,22 +590,56 @@
    logError(message);
  }
  /*
  private List<ReplicationServerDomain> findExportContainers(
      LDIFExportConfig exportConfig) throws DirectoryException
  {
    List<DN> includeBranches = exportConfig.getIncludeBranches();
    List<ReplicationServerDomain> exportContainers =
        new ArrayList<ReplicationServerDomain>();
    for (Iterator<ReplicationServerDomain> iter = server.getDomainIterator();
         iter.hasNext();)
    {
      ReplicationServerDomain rc = iter.next();
      // Skip containers that are not covered by the include branches.
      DN baseDN = DN.decode(rc.getBaseDn() + "," + BASE_DN);
      if (includeBranches == null || includeBranches.isEmpty())
      {
        exportContainers.add(rc);
      }
      else
      {
        for (DN includeBranch : includeBranches)
        {
          if (includeBranch.isDescendantOf(baseDN)
              || includeBranch.isAncestorOf(baseDN))
          {
            exportContainers.add(rc);
          }
        }
      }
    }
    return exportContainers;
  }
  /**
   * Exports the root changes of the export, and one entry by domain.
   */
  private void exportRootChanges(List<ReplicationServerDomain> exportContainers,
      LDIFExportConfig exportConfig, LDIFWriter ldifWriter)
  {
    Map<AttributeType,List<Attribute>> attrs =
      new HashMap<AttributeType,List<Attribute>>();
    ArrayList<Attribute> ldapAttrList = new ArrayList<Attribute>();
    AttributeType ocType = DirectoryServer.getObjectClassAttributeType();
    AttributeBuilder builder = new AttributeBuilder(ocType);
    builder.add("top");
    builder.add("domain");
    Attribute ocAttr = builder.toAttribute();
    List<Attribute> ldapAttrList = new ArrayList<Attribute>();
    ldapAttrList.add(ocAttr);
    Map<AttributeType, List<Attribute>> attrs =
        new HashMap<AttributeType, List<Attribute>>();
    attrs.put(ocType, ldapAttrList);
    try
@@ -1241,7 +1240,7 @@
    public void run()
    {
      long latestCount = exportedCount;
      long deltaCount = (latestCount - previousCount);
      long deltaCount = latestCount - previousCount;
      long latestTime = System.currentTimeMillis();
      long deltaTime = latestTime - previousTime;
@@ -1270,12 +1269,6 @@
  public synchronized void search(SearchOperation searchOperation)
         throws DirectoryException
  {
    // Get the base DN, scope, and filter for the search.
    DN           searchBaseDN = searchOperation.getBaseDN();
    DN baseDN;
    ArrayList<ReplicationServerDomain> searchContainers =
      new ArrayList<ReplicationServerDomain>();
    //This check is for GroupManager initialization. It currently doesn't
    //come into play because the replication server variable is null in
    //the check above. But if the order of initialization of the server variable
@@ -1310,6 +1303,7 @@
    }
    // Make sure the base entry exists if it's supposed to be in this backend.
    final DN searchBaseDN = searchOperation.getBaseDN();
    if (!handlesEntry(searchBaseDN))
    {
      DN matchedDN = searchBaseDN.getParentDNInSuffix();
@@ -1349,30 +1343,35 @@
    }
    // Walk through all entries and send the ones that match.
    Iterator<ReplicationServerDomain> rsdi = server.getDomainIterator();
    if (rsdi != null)
    {
      while (rsdi.hasNext())
      {
        ReplicationServerDomain rsd = rsdi.next();
        // Skip containers that are not covered by the include branches.
        baseDN = DN.decode(rsd.getBaseDn() + "," + BASE_DN);
            if (searchBaseDN.isDescendantOf(baseDN) ||
                searchBaseDN.isAncestorOf(baseDN))
            {
              searchContainers.add(rsd);
            }
      }
    }
    final List<ReplicationServerDomain> searchContainers =
        findSearchContainers(searchBaseDN);
    for (ReplicationServerDomain exportContainer : searchContainers)
    {
      processContainer(exportContainer, null, null, searchOperation);
    }
  }
  private List<ReplicationServerDomain> findSearchContainers(DN searchBaseDN)
      throws DirectoryException
  {
    List<ReplicationServerDomain> searchContainers =
        new ArrayList<ReplicationServerDomain>();
    for (Iterator<ReplicationServerDomain> iter = server.getDomainIterator();
         iter.hasNext();)
    {
      ReplicationServerDomain rsd = iter.next();
      // Skip containers that are not covered by the include branches.
      DN baseDN = DN.decode(rsd.getBaseDn() + "," + BASE_DN);
      if (searchBaseDN.isDescendantOf(baseDN)
          || searchBaseDN.isAncestorOf(baseDN))
      {
        searchContainers.add(rsd);
      }
    }
    return searchContainers;
  }
  /**
   * Retrieves the replication server associated to this backend.
@@ -1382,9 +1381,6 @@
   */
  private ReplicationServer getReplicationServer() throws DirectoryException
  {
    ReplicationServer replicationServer = null;
    DirectoryServer.getSynchronizationProviders();
    for (SynchronizationProvider<?> provider :
      DirectoryServer.getSynchronizationProviders())
    {
@@ -1394,16 +1390,17 @@
        ReplicationServerListener list = mmp.getReplicationServerListener();
        if (list != null)
        {
          replicationServer = list.getReplicationServer();
          break;
          return list.getReplicationServer();
        }
      }
    }
    return replicationServer;
    return null;
  }
  // Find the replication server configuration associated with this
  // replication backend.
  /**
   * Find the replication server configuration associated with this replication
   * backend.
   */
  private ReplicationServerCfg getReplicationServerCfg()
      throws DirectoryException {
    RootCfg root = ServerManagementContext.getInstance().getRootConfiguration();
@@ -1438,13 +1435,13 @@
   */
  private static final class Writer
  {
    // The underlying output stream.
    /** The underlying output stream. */
    private final ByteArrayOutputStream stream;
    // The underlying LDIF config.
    /** The underlying LDIF config. */
    private final LDIFExportConfig config;
    // The LDIF writer.
    /** The LDIF writer. */
    private final LDIFWriter writer;
    /**
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -1414,12 +1414,7 @@
   */
  public Iterator<ReplicationServerDomain> getDomainIterator()
  {
    Collection<ReplicationServerDomain> domains = getReplicationServerDomains();
    if (!domains.isEmpty())
    {
      return domains.iterator();
    }
    return null;
    return getReplicationServerDomains().iterator();
  }
  /**
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ReplicationServerLoadBalancingTest.java
@@ -318,7 +318,7 @@
  private int getDSConnectedToRS(int rsIndex)
  {
    Iterator<ReplicationServerDomain> rsdIt = rs[rsIndex].getDomainIterator();
    if (rsdIt != null)
    if (rsdIt.hasNext())
    {
      return rsdIt.next().getConnectedDSs().keySet().size();
    }
@@ -357,15 +357,17 @@
      {
        int rsIndex = rsIndexes[i];
        ReplicationServer repServer = rs[rsIndex];
        Iterator<ReplicationServerDomain> rsdIt = repServer.getDomainIterator();
        int curRsId = repServer.getServerId();
        if (rsdIt == null)
        Iterator<ReplicationServerDomain> iter = repServer.getDomainIterator();
        if (!iter.hasNext())
        {
          // No domain yet, RS is not yet connected to others
          debugInfo("RS " + curRsId + " has no domain yet");
          break;
        }
        Set<Integer> connectedRSsId = rsdIt.next().getConnectedRSs().keySet();
        Set<Integer> connectedRSsId = iter.next().getConnectedRSs().keySet();
        // Does this RS see all other RSs
        int nPeer = 0;
        debugInfo("Checking RSs connected to RS " + curRsId);
@@ -407,15 +409,16 @@
      for (int i = 0; i < nRSs; i++)
      {
        ReplicationServer repServer = rs[i];
        Iterator<ReplicationServerDomain> rsdIt = repServer.getDomainIterator();
        int curRsId = repServer.getServerId();
        if (rsdIt == null)
        Iterator<ReplicationServerDomain> iter = repServer.getDomainIterator();
        if (!iter.hasNext())
        {
          // No domain yet, RS is not yet connected to others
          debugInfo("RS " + curRsId + " has no domain yet");
          break;
        }
        Long rsGenId = rsdIt.next().getGenerationId();
        Long rsGenId = iter.next().getGenerationId();
        // Expecting all RSs to have gen id equal and not -1
        if (rsGenId == -1L)