mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
06.33.2013 f5f321ac378eda6f2effe0fb897ddeed6c1eb188
opends/src/server/org/opends/server/replication/server/ReplicationBackend.java
@@ -26,16 +26,15 @@
 *      Portions copyright 2011-2013 ForgeRock AS
 */
package org.opends.server.replication.server;
import static org.opends.messages.BackendMessages.*;
import static org.opends.messages.JebMessages.NOTE_JEB_EXPORT_FINAL_STATUS;
import static org.opends.messages.JebMessages.NOTE_JEB_EXPORT_PROGRESS_REPORT;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.util.StaticUtils.*;
import org.opends.server.replication.protocol.LDAPUpdateMsg;
import static org.opends.messages.BackendMessages.*;
import static org.opends.messages.JebMessages.*;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.config.ConfigConstants.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.ServerConstants.*;
import static org.opends.server.util.StaticUtils.*;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -63,11 +62,13 @@
import org.opends.server.core.SearchOperation;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.plugin.ReplicationServerListener;
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.DeleteMsg;
import org.opends.server.replication.protocol.LDAPUpdateMsg;
import org.opends.server.replication.protocol.ModifyDNMsg;
import org.opends.server.replication.protocol.ModifyMsg;
import org.opends.server.replication.protocol.UpdateMsg;
@@ -106,10 +107,6 @@
import org.opends.server.util.ModifyChangeRecordEntry;
import org.opends.server.util.ModifyDNChangeRecordEntry;
import org.opends.server.util.Validator;
import static org.opends.server.config.ConfigConstants.ATTR_OBJECTCLASSES_LC;
import org.opends.server.protocols.internal.InternalSearchOperation;
import static org.opends.server.util.ServerConstants.*;
/**
 * This class defines a backend that stores its information in an
@@ -136,17 +133,17 @@
  private static final String BASE_DN = "dc=replicationchanges";
  // The base DNs for this backend.
  /** The base DNs for this backend. */
  private DN[] baseDNs;
  // The base DNs for this backend, in a hash set.
  private HashSet<DN> baseDNSet;
  /** The base DNs for this backend, in a hash set. */
  private Set<DN> baseDNSet;
  // The set of supported controls for this backend.
  private HashSet<String> supportedControls;
  /** The set of supported controls for this backend. */
  private Set<String> supportedControls;
  // The set of supported features for this backend.
  private HashSet<String> supportedFeatures;
  /** The set of supported features for this backend. */
  private Set<String> supportedFeatures;
  private ReplicationServer server;
@@ -165,13 +162,13 @@
   */
  private long skippedCount = 0;
  //Objectclass for getEntry root entries.
  private HashMap<ObjectClass,String> rootObjectclasses;
  /** Objectclass for getEntry root entries. */
  private Map<ObjectClass, String> rootObjectclasses;
  //Attributes used for getEntry root entries.
  private LinkedHashMap<AttributeType,List<Attribute>> attributes;
  /** Attributes used for getEntry root entries. */
  private Map<AttributeType, List<Attribute>> attributes;
  //Operational attributes used for getEntry root entries.
  /** Operational attributes used for getEntry root entries. */
  private Map<AttributeType,List<Attribute>> operationalAttributes;
@@ -265,7 +262,7 @@
    attributes = new LinkedHashMap<AttributeType,List<Attribute>>();
    Attribute a = Attributes.create("changetype", "add");
    ArrayList<Attribute> attrList = new ArrayList<Attribute>(1);
    List<Attribute> attrList = new ArrayList<Attribute>(1);
    attrList.add(a);
    attributes.put(a.getAttributeType(), attrList);
    operationalAttributes = new LinkedHashMap<AttributeType,List<Attribute>>();
@@ -333,17 +330,13 @@
    //This method only returns the number of actual change entries, the
    //domain and any baseDN entries are not counted.
    long retNum=0;
    Iterator<ReplicationServerDomain> rcachei = server.getDomainIterator();
    if (rcachei != null)
    for (Iterator<ReplicationServerDomain> iter = server.getDomainIterator();
         iter.hasNext();)
    {
      while (rcachei.hasNext())
      {
        ReplicationServerDomain rsd = rcachei.next();
        retNum += rsd.getChangesCount();
      }
      ReplicationServerDomain rsd = iter.next();
      retNum += rsd.getChangesCount();
    }
    return retNum;
  }
@@ -479,7 +472,7 @@
   * {@inheritDoc}
   */
  @Override()
  public HashSet<String> getSupportedControls()
  public Set<String> getSupportedControls()
  {
    return supportedControls;
  }
@@ -490,7 +483,7 @@
   * {@inheritDoc}
   */
  @Override()
  public HashSet<String> getSupportedFeatures()
  public Set<String> getSupportedFeatures()
  {
    return supportedFeatures;
  }
@@ -515,41 +508,13 @@
  public synchronized void exportLDIF(LDIFExportConfig exportConfig)
  throws DirectoryException
  {
    List<DN> includeBranches = exportConfig.getIncludeBranches();
    DN baseDN;
    ArrayList<ReplicationServerDomain> exportContainers =
      new ArrayList<ReplicationServerDomain>();
    if(server == null) {
       Message message = ERR_REPLICATONBACKEND_EXPORT_LDIF_FAILED.get();
      throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,message);
    }
    Iterator<ReplicationServerDomain> rsdi = server.getDomainIterator();
    if (rsdi != null)
    {
      while (rsdi.hasNext())
      {
        ReplicationServerDomain rc = rsdi.next();
        // Skip containers that are not covered by the include branches.
        baseDN = DN.decode(rc.getBaseDn() + "," + BASE_DN);
        if (includeBranches == null || includeBranches.isEmpty())
        {
          exportContainers.add(rc);
        }
        else
        {
          for (DN includeBranch : includeBranches)
          {
            if (includeBranch.isDescendantOf(baseDN) ||
                includeBranch.isAncestorOf(baseDN))
            {
              exportContainers.add(rc);
            }
          }
        }
      }
    }
    final List<ReplicationServerDomain> exportContainers =
        findExportContainers(exportConfig);
    // Make a note of the time we started.
    long startTime = System.currentTimeMillis();
@@ -612,7 +577,7 @@
    }
    long finishTime = System.currentTimeMillis();
    long totalTime = (finishTime - startTime);
    long totalTime = finishTime - startTime;
    float rate = 0;
    if (totalTime > 0)
@@ -625,22 +590,56 @@
    logError(message);
  }
  /*
  private List<ReplicationServerDomain> findExportContainers(
      LDIFExportConfig exportConfig) throws DirectoryException
  {
    List<DN> includeBranches = exportConfig.getIncludeBranches();
    List<ReplicationServerDomain> exportContainers =
        new ArrayList<ReplicationServerDomain>();
    for (Iterator<ReplicationServerDomain> iter = server.getDomainIterator();
         iter.hasNext();)
    {
      ReplicationServerDomain rc = iter.next();
      // Skip containers that are not covered by the include branches.
      DN baseDN = DN.decode(rc.getBaseDn() + "," + BASE_DN);
      if (includeBranches == null || includeBranches.isEmpty())
      {
        exportContainers.add(rc);
      }
      else
      {
        for (DN includeBranch : includeBranches)
        {
          if (includeBranch.isDescendantOf(baseDN)
              || includeBranch.isAncestorOf(baseDN))
          {
            exportContainers.add(rc);
          }
        }
      }
    }
    return exportContainers;
  }
  /**
   * Exports the root changes of the export, and one entry by domain.
   */
  private void exportRootChanges(List<ReplicationServerDomain> exportContainers,
      LDIFExportConfig exportConfig, LDIFWriter ldifWriter)
  {
    Map<AttributeType,List<Attribute>> attrs =
      new HashMap<AttributeType,List<Attribute>>();
    ArrayList<Attribute> ldapAttrList = new ArrayList<Attribute>();
    AttributeType ocType = DirectoryServer.getObjectClassAttributeType();
    AttributeBuilder builder = new AttributeBuilder(ocType);
    builder.add("top");
    builder.add("domain");
    Attribute ocAttr = builder.toAttribute();
    List<Attribute> ldapAttrList = new ArrayList<Attribute>();
    ldapAttrList.add(ocAttr);
    Map<AttributeType, List<Attribute>> attrs =
        new HashMap<AttributeType, List<Attribute>>();
    attrs.put(ocType, ldapAttrList);
    try
@@ -1241,7 +1240,7 @@
    public void run()
    {
      long latestCount = exportedCount;
      long deltaCount = (latestCount - previousCount);
      long deltaCount = latestCount - previousCount;
      long latestTime = System.currentTimeMillis();
      long deltaTime = latestTime - previousTime;
@@ -1270,12 +1269,6 @@
  public synchronized void search(SearchOperation searchOperation)
         throws DirectoryException
  {
    // Get the base DN, scope, and filter for the search.
    DN           searchBaseDN = searchOperation.getBaseDN();
    DN baseDN;
    ArrayList<ReplicationServerDomain> searchContainers =
      new ArrayList<ReplicationServerDomain>();
    //This check is for GroupManager initialization. It currently doesn't
    //come into play because the replication server variable is null in
    //the check above. But if the order of initialization of the server variable
@@ -1310,6 +1303,7 @@
    }
    // Make sure the base entry exists if it's supposed to be in this backend.
    final DN searchBaseDN = searchOperation.getBaseDN();
    if (!handlesEntry(searchBaseDN))
    {
      DN matchedDN = searchBaseDN.getParentDNInSuffix();
@@ -1349,30 +1343,35 @@
    }
    // Walk through all entries and send the ones that match.
    Iterator<ReplicationServerDomain> rsdi = server.getDomainIterator();
    if (rsdi != null)
    {
      while (rsdi.hasNext())
      {
        ReplicationServerDomain rsd = rsdi.next();
        // Skip containers that are not covered by the include branches.
        baseDN = DN.decode(rsd.getBaseDn() + "," + BASE_DN);
            if (searchBaseDN.isDescendantOf(baseDN) ||
                searchBaseDN.isAncestorOf(baseDN))
            {
              searchContainers.add(rsd);
            }
      }
    }
    final List<ReplicationServerDomain> searchContainers =
        findSearchContainers(searchBaseDN);
    for (ReplicationServerDomain exportContainer : searchContainers)
    {
      processContainer(exportContainer, null, null, searchOperation);
    }
  }
  private List<ReplicationServerDomain> findSearchContainers(DN searchBaseDN)
      throws DirectoryException
  {
    List<ReplicationServerDomain> searchContainers =
        new ArrayList<ReplicationServerDomain>();
    for (Iterator<ReplicationServerDomain> iter = server.getDomainIterator();
         iter.hasNext();)
    {
      ReplicationServerDomain rsd = iter.next();
      // Skip containers that are not covered by the include branches.
      DN baseDN = DN.decode(rsd.getBaseDn() + "," + BASE_DN);
      if (searchBaseDN.isDescendantOf(baseDN)
          || searchBaseDN.isAncestorOf(baseDN))
      {
        searchContainers.add(rsd);
      }
    }
    return searchContainers;
  }
  /**
   * Retrieves the replication server associated to this backend.
@@ -1382,9 +1381,6 @@
   */
  private ReplicationServer getReplicationServer() throws DirectoryException
  {
    ReplicationServer replicationServer = null;
    DirectoryServer.getSynchronizationProviders();
    for (SynchronizationProvider<?> provider :
      DirectoryServer.getSynchronizationProviders())
    {
@@ -1394,16 +1390,17 @@
        ReplicationServerListener list = mmp.getReplicationServerListener();
        if (list != null)
        {
          replicationServer = list.getReplicationServer();
          break;
          return list.getReplicationServer();
        }
      }
    }
    return replicationServer;
    return null;
  }
  // Find the replication server configuration associated with this
  // replication backend.
  /**
   * Find the replication server configuration associated with this replication
   * backend.
   */
  private ReplicationServerCfg getReplicationServerCfg()
      throws DirectoryException {
    RootCfg root = ServerManagementContext.getInstance().getRootConfiguration();
@@ -1438,13 +1435,13 @@
   */
  private static final class Writer
  {
    // The underlying output stream.
    /** The underlying output stream. */
    private final ByteArrayOutputStream stream;
    // The underlying LDIF config.
    /** The underlying LDIF config. */
    private final LDIFExportConfig config;
    // The LDIF writer.
    /** The LDIF writer. */
    private final LDIFWriter writer;
    /**