mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
23.43.2013 6591ed28aac023a77de1564a3b65526a53229fe8
opends/src/server/org/opends/server/replication/server/ReplicationBackend.java
@@ -46,6 +46,7 @@
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.plugin.ReplicationServerListener;
import org.opends.server.replication.protocol.*;
@@ -53,27 +54,30 @@
import org.opends.server.types.*;
import org.opends.server.util.*;
import static java.util.Collections.*;
import static org.opends.messages.BackendMessages.*;
import static org.opends.messages.JebMessages.*;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.config.ConfigConstants.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.types.FilterType.*;
import static org.opends.server.util.ServerConstants.*;
import static org.opends.server.util.StaticUtils.*;
/**
 * This class defines a backend that stores its information in an
 * associated replication server object.
 * This class defines a backend that stores its information in an associated
 * replication server object.
 * <p>
 * This is primarily intended to take advantage of the backup/restore/
 * import/export of the backend API, and to provide an LDAP access
 * to the replication server database.
 * <BR><BR>
 * Entries stored in this backend are held in the DB associated with
 * the replication server.
 * <BR><BR>
 * import/export of the backend API, and to provide an LDAP access to the
 * replication server database.
 * <p>
 * Entries stored in this backend are held in the DB associated with the
 * replication server.
 * <p>
 * Currently are only implemented the create and restore backup features.
 *
 */
public class ReplicationBackend
       extends Backend
@@ -137,20 +141,6 @@
    // Perform all initialization in initializeBackend.
  }
  /**
   * Set the base DNs for this backend.  This is used by the unit tests
   * to set the base DNs without having to provide a configuration
   * object when initializing the backend.
   * @param baseDNs The set of base DNs to be served by this memory backend.
   */
  public void setBaseDNs(DN[] baseDNs)
  {
    this.baseDNs = baseDNs;
  }
  /**
   * {@inheritDoc}
   */
@@ -163,7 +153,7 @@
      BackendCfg cfg = (BackendCfg) config;
      DN[] newBaseDNs = new DN[cfg.getBaseDN().size()];
      cfg.getBaseDN().toArray(newBaseDNs);
      setBaseDNs(newBaseDNs);
      this.baseDNs = newBaseDNs;
    }
  }
@@ -182,8 +172,7 @@
      throw new ConfigException(message);
    }
    baseDNSet = new HashSet<DN>();
    baseDNSet.addAll(Arrays.asList(baseDNs));
    baseDNSet = new HashSet<DN>(Arrays.asList(baseDNs));
    supportedControls = new HashSet<String>();
    supportedFeatures = new HashSet<String>();
@@ -213,8 +202,8 @@
    ObjectClass objectclassOC =
                   DirectoryServer.getObjectClass(ATTR_OBJECTCLASSES_LC, true);
    rootObjectclasses.put(objectclassOC, ATTR_OBJECTCLASSES_LC);
    attributes = new LinkedHashMap<AttributeType,List<Attribute>>();
    attributes = new LinkedHashMap<AttributeType,List<Attribute>>();
    Attribute a = Attributes.create("changetype", "add");
    List<Attribute> attrList = new ArrayList<Attribute>(1);
    attrList.add(a);
@@ -284,10 +273,8 @@
    //This method only returns the number of actual change entries, the
    //domain and any baseDN entries are not counted.
    long retNum=0;
    for (Iterator<ReplicationServerDomain> iter = server.getDomainIterator();
         iter.hasNext();)
    for (ReplicationServerDomain rsd : toIterable(server.getDomainIterator()))
    {
      ReplicationServerDomain rsd = iter.next();
      retNum += rsd.getChangesCount();
    }
    return retNum;
@@ -507,7 +494,8 @@
        {
          break;
        }
        processContainer(exportContainer, exportConfig, ldifWriter, null);
        writeChangesAfterChangeNumber(exportContainer, exportConfig,
            ldifWriter, null, null);
      }
    }
    finally
@@ -540,23 +528,22 @@
    for (Iterator<ReplicationServerDomain> iter = server.getDomainIterator();
         iter.hasNext();)
    {
      ReplicationServerDomain rc = iter.next();
      ReplicationServerDomain rsd = iter.next();
      // Skip containers that are not covered by the include branches.
      DN baseDN = DN.decode(rc.getBaseDn() + "," + BASE_DN);
      if (includeBranches == null || includeBranches.isEmpty())
      {
        exportContainers.add(rc);
        exportContainers.add(rsd);
      }
      else
      {
        DN baseDN = DN.decode(rsd.getBaseDn() + "," + BASE_DN);
        for (DN includeBranch : includeBranches)
        {
          if (includeBranch.isDescendantOf(baseDN)
              || includeBranch.isAncestorOf(baseDN))
          {
            exportContainers.add(rc);
            exportContainers.add(rsd);
          }
        }
      }
@@ -576,12 +563,9 @@
    builder.add("domain");
    Attribute ocAttr = builder.toAttribute();
    List<Attribute> ldapAttrList = new ArrayList<Attribute>();
    ldapAttrList.add(ocAttr);
    Map<AttributeType, List<Attribute>> attrs =
        new HashMap<AttributeType, List<Attribute>>();
    attrs.put(ocType, ldapAttrList);
    attrs.put(ocType, singletonList(ocAttr));
    try
    {
@@ -603,31 +587,22 @@
        break;
      }
      attrs.clear();
      // TODO JNR these multiple calls to clear() are more than suspect!
      ldapAttrList.clear();
      ldapAttrList.add(ocAttr);
      attrs.put(ocType, ldapAttrList);
      TRACER.debugInfo("State=" +
          exportContainer.getDbServerState());
      Attribute stateAttr = Attributes.create("state",
          exportContainer.getDbServerState().toString());
      ldapAttrList.clear();
      ldapAttrList.add(stateAttr);
      attrs.put(stateAttr.getAttributeType(), ldapAttrList);
      final ServerState serverState = exportContainer.getDbServerState();
      TRACER.debugInfo("State=" + serverState);
      Attribute stateAttr = Attributes.create("state", serverState.toString());
      Attribute genidAttr = Attributes.create("generation-id",
          exportContainer.getGenerationId() + exportContainer.getBaseDn());
      ldapAttrList.clear();
      ldapAttrList.add(genidAttr);
      attrs.put(genidAttr.getAttributeType(), ldapAttrList);
      attrs.clear();
      attrs.put(ocType, singletonList(ocAttr));
      attrs.put(stateAttr.getAttributeType(), singletonList(stateAttr));
      attrs.put(genidAttr.getAttributeType(), singletonList(genidAttr));
      final String dnString = exportContainer.getBaseDn() + "," + BASE_DN;
      try
      {
        ChangeRecordEntry changeRecord = new AddChangeRecordEntry(
            DN.decode(exportContainer.getBaseDn() + "," + BASE_DN), attrs);
        DN dn = DN.decode(dnString);
        ChangeRecordEntry changeRecord = new AddChangeRecordEntry(dn, attrs);
        ldifWriter.writeChangeRecord(changeRecord);
      }
      catch (Exception e)
@@ -636,100 +611,47 @@
        {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
        Message message = ERR_BACKEND_EXPORT_ENTRY.get(
            exportContainer.getBaseDn() + "," + BASE_DN,
            String.valueOf(e));
        logError(message);
        logError(ERR_BACKEND_EXPORT_ENTRY.get(dnString, String.valueOf(e)));
      }
    }
  }
  /**
   * Processes the changes for a given ReplicationServerDomain.
   * Exports or returns all the changes from a ReplicationServerDomain coming
   * after the changeNumber specified in the searchOperation.
   */
  private void processContainer(ReplicationServerDomain rsd,
      LDIFExportConfig exportConfig, LDIFWriter ldifWriter,
      SearchOperation searchOperation)
  private void writeChangesAfterChangeNumber(ReplicationServerDomain rsd,
      final LDIFExportConfig exportConfig, LDIFWriter ldifWriter,
      SearchOperation searchOperation, final ChangeNumber previousCN)
  {
    for (int serverId : rsd.getServers())
    {
      if (exportConfig != null && exportConfig.isCancelled())
      { // Abort if cancelled
        break;
        return;
      }
      ChangeNumber previousChangeNumber = null;
      if (searchOperation != null)
      {
        // Try to optimize for filters like replicationChangeNumber>=xxxxx
        // or replicationChangeNumber=xxxxx :
        // If the search filter is one of these 2 filters, move directly to
        // ChangeNumber=xxxx before starting the iteration.
        SearchFilter filter = searchOperation.getFilter();
        previousChangeNumber = extractChangeNumber(filter);
        if (previousChangeNumber == null &&
            filter.getFilterType().equals(FilterType.AND))
        {
          for (SearchFilter filterComponents: filter.getFilterComponents())
          {
            previousChangeNumber = extractChangeNumber(filterComponents);
            if (previousChangeNumber != null)
              break;
          }
        }
      }
      ReplicationIterator ri = rsd.getChangelogIterator(serverId,
          previousChangeNumber);
      ReplicationIterator ri = rsd.getChangelogIterator(serverId, previousCN);
      if (ri != null)
      {
        try
        {
          int lookthroughCount = 0;
          int lookthroughLimit = 0;
          if (searchOperation != null)
          {
            lookthroughLimit =
              searchOperation.getClientConnection().getLookthroughLimit();
          }
          // Walk through the changes
          while (ri.getChange() != null)
          {
            if (exportConfig != null && exportConfig.isCancelled())
            { // abort if cancelled
              break;
              return;
            }
            if (searchOperation != null)
            if (!canContinue(searchOperation, lookthroughCount))
            {
              try
              {
                if (lookthroughLimit > 0 && lookthroughCount > lookthroughLimit)
                {
                  // Lookthrough limit exceeded
                  searchOperation.setResultCode(
                      ResultCode.ADMIN_LIMIT_EXCEEDED);
                  searchOperation.setErrorMessage(null);
                  break;
                }
                searchOperation.checkIfCanceled(false);
              } catch (CanceledOperationException e)
              {
                searchOperation.setResultCode(ResultCode.CANCELED);
                searchOperation.setErrorMessage(null);
                break;
              }
              break;
            }
            lookthroughCount++;
            UpdateMsg msg = ri.getChange();
            processChange(
                msg, exportConfig, ldifWriter, searchOperation,
                rsd.getBaseDn());
            if (!ri.next())
              break;
            writeChange(ri.getChange(), ldifWriter, searchOperation,
                rsd.getBaseDn(), exportConfig != null);
          }
        }
        finally
@@ -740,6 +662,45 @@
    }
  }
  private boolean canContinue(SearchOperation searchOperation,
      int lookthroughCount)
  {
    if (searchOperation == null)
    {
      return true;
    }
    int limit = searchOperation.getClientConnection().getLookthroughLimit();
    if (lookthroughCount > limit && limit > 0)
    {
      // lookthrough limit exceeded
      searchOperation.setResultCode(ResultCode.ADMIN_LIMIT_EXCEEDED);
      searchOperation.setErrorMessage(null);
      return false;
    }
    try
    {
      searchOperation.checkIfCanceled(false);
      return true;
    }
    catch (CanceledOperationException e)
    {
      searchOperation.setResultCode(ResultCode.CANCELED);
      searchOperation.setErrorMessage(null);
      return false;
    }
  }
  private ChangeNumber extractChangeNumber(SearchOperation searchOperation)
  {
    if (searchOperation != null)
    {
      return extractChangeNumber(searchOperation.getFilter());
    }
    return null;
  }
  /**
   * Attempt to extract a ChangeNumber from searchFilter like
   * ReplicationChangeNumber=xxxx or ReplicationChangeNumber>=xxxx.
@@ -751,27 +712,42 @@
   */
  private ChangeNumber extractChangeNumber(SearchFilter filter)
  {
    AttributeType changeNumberAttrType =
      DirectoryServer.getDefaultAttributeType(CHANGE_NUMBER);
    FilterType filterType = filter.getFilterType();
    if ( (filterType.equals(FilterType.GREATER_OR_EQUAL) ||
             filterType.equals(FilterType.EQUALITY) ) &&
             filter.getAttributeType().equals(changeNumberAttrType))
    // Try to optimize for filters like replicationChangeNumber>=xxxxx
    // or replicationChangeNumber=xxxxx :
    // If the search filter is one of these 2 filters, move directly to
    // ChangeNumber=xxxx before starting the iteration.
    final FilterType filterType = filter.getFilterType();
    if (GREATER_OR_EQUAL.equals(filterType) || EQUALITY.equals(filterType))
    {
      try
      AttributeType changeNumberAttrType =
          DirectoryServer.getDefaultAttributeType(CHANGE_NUMBER);
      if (filter.getAttributeType().equals(changeNumberAttrType))
      {
        ChangeNumber startingChangeNumber =
          new ChangeNumber(filter.getAssertionValue().getValue().toString());
         return new ChangeNumber(
              startingChangeNumber.getTime(),
              startingChangeNumber.getSeqnum()-1,
              startingChangeNumber.getServerId());
        try
        {
          ChangeNumber startingCN =
             new ChangeNumber(filter.getAssertionValue().getValue().toString());
          return new ChangeNumber(startingCN.getTime(),
              startingCN.getSeqnum() - 1, startingCN.getServerId());
        }
        catch (Exception e)
        {
          // don't try to optimize the search if the ChangeNumber is
          // not a valid replication ChangeNumber.
        }
      }
      catch (Exception e)
    }
    else if (AND.equals(filterType))
    {
      for (SearchFilter filterComponent : filter.getFilterComponents())
      {
        // don't try to optimize the search if we the ChangeNumber is
        // not a valid replication ChangeNumber.
        // This code does not expect more than one CN in the search filter.
        // It is ok, since it is only used by developers/testers for debugging.
        final ChangeNumber previousCN = extractChangeNumber(filterComponent);
        if (previousCN != null)
        {
          return previousCN;
        }
      }
    }
    return null;
@@ -779,21 +755,19 @@
  /**
   * Export one change.
   * Exports one change.
   */
  private void processChange(UpdateMsg updateMsg,
      LDIFExportConfig exportConfig, LDIFWriter ldifWriter,
      SearchOperation searchOperation, String baseDN)
  private void writeChange(UpdateMsg updateMsg, LDIFWriter ldifWriter,
      SearchOperation searchOperation, String baseDN, boolean isExport)
  {
    InternalClientConnection conn =
      InternalClientConnection.getRootConnection();
    Entry entry = null;
    DN dn = null;
    ObjectClass objectclass =
    ObjectClass extensibleObjectOC =
      DirectoryServer.getDefaultObjectClass("extensibleObject");
    try
    {
      if (updateMsg instanceof LDAPUpdateMsg)
@@ -837,11 +811,9 @@
              addAttribute(attrs, attr);
            }
          }
          addAttribute(attrs, "changetype", "add");
          Attribute changetype = Attributes.create("changetype", "add");
          addAttribute(attrs, changetype);
          if (exportConfig != null)
          if (isExport)
          {
            ChangeRecordEntry changeRecord =
              new AddChangeRecordEntry(dn, attrs);
@@ -855,51 +827,45 @@
        else if (msg instanceof DeleteMsg)
        {
          dn = computeDN(msg);
          ChangeRecordEntry changeRecord = new DeleteChangeRecordEntry(dn);
          entry = writeChangeRecord(exportConfig, ldifWriter, changeRecord);
          entry = writeChangeRecord(ldifWriter, changeRecord, isExport);
        }
        else if (msg instanceof ModifyMsg)
        {
          ModifyOperation op = (ModifyOperation)msg.createOperation(conn);
          op.setInternalOperation(true);
          dn = computeDN(msg);
          ChangeRecordEntry changeRecord =
            new ModifyChangeRecordEntry(dn, op.getRawModifications());
          entry = writeChangeRecord(exportConfig, ldifWriter, changeRecord);
          entry = writeChangeRecord(ldifWriter, changeRecord, isExport);
        }
        else if (msg instanceof ModifyDNMsg)
        {
          ModifyDNOperation op = (ModifyDNOperation)msg.createOperation(conn);
          op.setInternalOperation(true);
          dn = computeDN(msg);
          ChangeRecordEntry changeRecord =
            new ModifyDNChangeRecordEntry(dn, op.getNewRDN(), op.deleteOldRDN(),
                op.getNewSuperior());
          entry = writeChangeRecord(exportConfig, ldifWriter, changeRecord);
          ChangeRecordEntry changeRecord = new ModifyDNChangeRecordEntry(
              dn, op.getNewRDN(), op.deleteOldRDN(), op.getNewSuperior());
          entry = writeChangeRecord(ldifWriter, changeRecord, isExport);
        }
        if (exportConfig != null)
        if (isExport)
        {
          this.exportedCount++;
        }
        else
        {
          // Add extensibleObject objectclass and the ChangeNumber
          // in the entry.
          if (!entry.getObjectClasses().containsKey(objectclass))
            entry.addObjectClass(objectclass);
          Attribute changeNumber =
            Attributes.create(CHANGE_NUMBER,
                msg.getChangeNumber().toString());
          addAttribute(entry.getUserAttributes(), changeNumber);
          Attribute domain = Attributes.create("replicationDomain", baseDN);
          addAttribute(entry.getUserAttributes(), domain);
          // Add extensibleObject objectclass and the ChangeNumber in the entry.
          if (!entry.getObjectClasses().containsKey(extensibleObjectOC))
            entry.addObjectClass(extensibleObjectOC);
          addAttribute(entry.getUserAttributes(), CHANGE_NUMBER,
              msg.getChangeNumber().toString());
          addAttribute(entry.getUserAttributes(), "replicationDomain", baseDN);
          // Get the base DN, scope, and filter for the search.
          DN  searchBaseDN = searchOperation.getBaseDN();
          DN     searchBaseDN = searchOperation.getBaseDN();
          SearchScope  scope  = searchOperation.getScope();
          SearchFilter filter = searchOperation.getFilter();
@@ -918,52 +884,47 @@
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      }
      String dnStr;
      if (dn == null)
      {
        dnStr = "Unkown";
      }
      else
      {
        dnStr = dn.toNormalizedString();
      }
      final String dnStr = (dn != null) ? dn.toNormalizedString() : "Unknown";
      Message message;
      if (exportConfig != null)
      if (isExport)
      {
        message = ERR_BACKEND_EXPORT_ENTRY.get(
          dnStr, String.valueOf(e));
        message = ERR_BACKEND_EXPORT_ENTRY.get(dnStr, String.valueOf(e));
      }
      else
      {
        message = ERR_BACKEND_SEARCH_ENTRY.get(
            dnStr, e.getLocalizedMessage());
        message = ERR_BACKEND_SEARCH_ENTRY.get(dnStr, e.getLocalizedMessage());
      }
      logError(message);
    }
  }
  private DN computeDN(LDAPUpdateMsg msg) throws DirectoryException
  {
    return DN.decode("uuid=" + msg.getEntryUUID() + "," + CHANGE_NUMBER + "="
        + msg.getChangeNumber() + "," + msg.getDn() + "," + BASE_DN);
  }
  private Entry writeChangeRecord(LDIFExportConfig exportConfig,
      LDIFWriter ldifWriter, ChangeRecordEntry changeRecord)
      throws IOException, LDIFException
  private Entry writeChangeRecord(LDIFWriter ldifWriter,
      ChangeRecordEntry changeRecord, boolean isExport) throws IOException,
      LDIFException
  {
    if (exportConfig != null)
    if (isExport)
    {
      ldifWriter.writeChangeRecord(changeRecord);
      return null;
    }
    final Writer writer = new Writer();
    final LDIFWriter ldifWriter2 = writer.getLDIFWriter();
    ldifWriter2.writeChangeRecord(changeRecord);
    final LDIFReader ldifReader = writer.getLDIFReader();
    return ldifReader.readEntry();
    writer.getLDIFWriter().writeChangeRecord(changeRecord);
    return writer.getLDIFReader().readEntry();
  }
  private void addAttribute(Map<AttributeType, List<Attribute>> attributes,
      String attrName, String attrValue)
  {
    addAttribute(attributes, Attributes.create(attrName, attrValue));
  }
  /**
@@ -1035,37 +996,39 @@
    return true;
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override()
  public void createBackup(BackupConfig backupConfig)
         throws DirectoryException
  {
    BackupManager backupManager = new BackupManager(getBackendID());
    File backendDir = getFileForPath(getReplicationServerCfg()
        .getReplicationDBDirectory());
    backupManager.createBackup(backendDir, backupConfig);
    createBackupManager().createBackup(getBackendDir(), backupConfig);
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override()
  public void removeBackup(BackupDirectory backupDirectory,
                           String backupID)
  public void restoreBackup(RestoreConfig restoreConfig)
         throws DirectoryException
  {
    BackupManager backupManager =
      new BackupManager(getBackendID());
    backupManager.removeBackup(backupDirectory, backupID);
    createBackupManager().restoreBackup(getBackendDir(), restoreConfig);
  }
  /** {@inheritDoc} */
  @Override()
  public void removeBackup(BackupDirectory backupDirectory, String backupID)
      throws DirectoryException
  {
    createBackupManager().removeBackup(backupDirectory, backupID);
  }
  private BackupManager createBackupManager()
  {
    return new BackupManager(getBackendID());
  }
  private File getBackendDir() throws DirectoryException
  {
   return getFileForPath(getReplicationServerCfg().getReplicationDBDirectory());
  }
  /**
   * {@inheritDoc}
@@ -1076,24 +1039,6 @@
    return true;
  }
  /**
   * {@inheritDoc}
   */
  @Override()
  public void restoreBackup(RestoreConfig restoreConfig)
         throws DirectoryException
  {
    BackupManager backupManager =
      new BackupManager(getBackendID());
    File backendDir = getFileForPath(getReplicationServerCfg()
        .getReplicationDBDirectory());
    backupManager.restoreBackup(backendDir, restoreConfig);
  }
  /**
   * {@inheritDoc}
   */
@@ -1204,8 +1149,7 @@
      }
    }
    // don't do anything if the search is a base search on
    // the backend suffix.
    // don't do anything if the search is a base search on the backend suffix.
    try
    {
      DN backendBaseDN = DN.decode(BASE_DN);
@@ -1261,7 +1205,9 @@
        findSearchContainers(searchBaseDN);
    for (ReplicationServerDomain exportContainer : searchContainers)
    {
      processContainer(exportContainer, null, null, searchOperation);
      final ChangeNumber previousCN = extractChangeNumber(searchOperation);
      writeChangesAfterChangeNumber(exportContainer, null, null,
          searchOperation, previousCN);
    }
  }
@@ -1413,4 +1359,3 @@
    throw new UnsupportedOperationException("Operation not supported.");
  }
}