mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

jcduff
23.04.2008 b4f8838b15342670c31753a484abf0129e3c9653
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationBackend.java
@@ -44,7 +44,6 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
@@ -72,18 +71,22 @@
import org.opends.server.core.SearchOperation;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.plugin.ReplicationServerListener;
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.DeleteMsg;
import org.opends.server.replication.protocol.ModifyDNMsg;
import org.opends.server.replication.protocol.ModifyMsg;
import org.opends.server.replication.protocol.UpdateMessage;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeBuilder;
import org.opends.server.types.AttributeType;
import org.opends.server.types.AttributeValue;
import org.opends.server.types.Attributes;
import org.opends.server.types.BackupConfig;
import org.opends.server.types.BackupDirectory;
import org.opends.server.types.ByteString;
import org.opends.server.types.CanceledOperationException;
import org.opends.server.types.ConditionResult;
import org.opends.server.types.Control;
import org.opends.server.types.DN;
@@ -91,6 +94,7 @@
import org.opends.server.types.DirectoryException;
import org.opends.server.types.DereferencePolicy;
import org.opends.server.types.Entry;
import org.opends.server.types.FilterType;
import org.opends.server.types.IndexType;
import org.opends.server.types.InitializationException;
import org.opends.server.types.LDIFExportConfig;
@@ -131,6 +135,8 @@
public class ReplicationBackend
       extends Backend
{
  private static final String CHANGE_NUMBER = "replicationChangeNumber";
  /**
   * The tracer object for the debug logger.
   */
@@ -273,15 +279,11 @@
                   DirectoryServer.getObjectClass(ATTR_OBJECTCLASSES_LC, true);
    rootObjectclasses.put(objectclassOC, ATTR_OBJECTCLASSES_LC);
    attributes = new LinkedHashMap<AttributeType,List<Attribute>>();
    AttributeType changeType =
    DirectoryServer.getAttributeType("changetype", true);
    LinkedHashSet<AttributeValue> valueSet =
                                           new LinkedHashSet<AttributeValue>(1);
    valueSet.add(new AttributeValue(changeType, "add"));
    Attribute a = new Attribute(changeType, "changetype", valueSet);
    Attribute a = Attributes.create("changetype", "add");
    ArrayList<Attribute> attrList = new ArrayList<Attribute>(1);
    attrList.add(a);
    attributes.put(changeType, attrList);
    attributes.put(a.getAttributeType(), attrList);
    operationalAttributes = new LinkedHashMap<AttributeType,List<Attribute>>();
  }
@@ -465,7 +467,7 @@
   * {@inheritDoc}
   */
  @Override()
  public synchronized void replaceEntry(Entry entry,
  public synchronized void replaceEntry(Entry oldEntry, Entry newEntry,
                                        ModifyOperation modifyOperation)
         throws DirectoryException
  {
@@ -649,13 +651,11 @@
      new HashMap<AttributeType,List<Attribute>>();
    ArrayList<Attribute> ldapAttrList = new ArrayList<Attribute>();
    AttributeType ocType=
      DirectoryServer.getAttributeType("objectclass", true);
    LinkedHashSet<AttributeValue> ocValues =
      new LinkedHashSet<AttributeValue>();
    ocValues.add(new AttributeValue(ocType, "top"));
    ocValues.add(new AttributeValue(ocType, "domain"));
    Attribute ocAttr = new Attribute(ocType, "objectclass", ocValues);
    AttributeType ocType = DirectoryServer.getObjectClassAttributeType();
    AttributeBuilder builder = new AttributeBuilder(ocType);
    builder.add("top");
    builder.add("domain");
    Attribute ocAttr = builder.toAttribute();
    ldapAttrList.add(ocAttr);
    attributes.put(ocType, ldapAttrList);
@@ -676,31 +676,25 @@
      }
      attributes.clear();
      ldapAttrList.clear();
      ldapAttrList.add(ocAttr);
      attributes.put(ocType, ldapAttrList);
      AttributeType stateType=
        DirectoryServer.getAttributeType("state", true);
      LinkedHashSet<AttributeValue> stateValues =
        new LinkedHashSet<AttributeValue>();
      stateValues.add(new AttributeValue(stateType,
          exportContainer.getDbServerState().toString()));
      TRACER.debugInfo("State=" +
          exportContainer.getDbServerState().toString());
      Attribute stateAttr = new Attribute(ocType, "state", stateValues);
      Attribute stateAttr = Attributes.create("state", exportContainer
          .getDbServerState().toString());
      ldapAttrList.clear();
      ldapAttrList.add(stateAttr);
      attributes.put(stateAttr.getAttributeType(), ldapAttrList);
      AttributeType genidType=
        DirectoryServer.getAttributeType("generation-id", true);
      LinkedHashSet<AttributeValue> genidValues =
        new LinkedHashSet<AttributeValue>();
      genidValues.add(new AttributeValue(genidType,
          String.valueOf(exportContainer.getGenerationId())+
          exportContainer.getBaseDn()));
      Attribute genidAttr = new Attribute(ocType, "generation-id", genidValues);
      Attribute genidAttr = Attributes.create("generation-id", String
          .valueOf(exportContainer.getGenerationId())
          + exportContainer.getBaseDn());
      ldapAttrList.clear();
      ldapAttrList.add(genidAttr);
      attributes.put(genidType, ldapAttrList);
      attributes.put(genidAttr.getAttributeType(), ldapAttrList);
      try
      {
@@ -739,13 +733,44 @@
        break;
      }
      ChangeNumber previousChangeNumber = null;
      if (searchOperation != null)
      {
        // Try to optimize for filters like replicationChangeNumber>=xxxxx
        // or replicationChangeNumber=xxxxx :
        // If the search filter is one of these 2 filters, move directly to
        // ChangeNumber=xxxx before starting the iteration.
        SearchFilter filter = searchOperation.getFilter();
        previousChangeNumber = extractChangeNumber(filter);
        if ((previousChangeNumber == null) &&
            (filter.getFilterType().equals(FilterType.AND)))
        {
          for (SearchFilter filterComponents: filter.getFilterComponents())
          {
            previousChangeNumber = extractChangeNumber(filterComponents);
            if (previousChangeNumber != null)
              break;
          }
        }
      }
      ReplicationIterator ri = rsd.getChangelogIterator(serverId,
          null);
          previousChangeNumber);
      if (ri != null)
      {
        try
        {
          int lookthroughCount = 0;
          int lookthroughLimit = 0;
          if (searchOperation != null)
          {
            lookthroughLimit =
              searchOperation.getClientConnection().getLookthroughLimit();
          }
          // Walk through the changes
          while (ri.getChange() != null)
          {
@@ -753,8 +778,32 @@
            {
              break;
            }
            UpdateMessage msg = ri.getChange();
            processChange(msg, exportConfig, ldifWriter, searchOperation);
            if (searchOperation != null)
            {
              try
              {
                if (lookthroughLimit > 0 && lookthroughCount > lookthroughLimit)
                {
                  // Lookthrough limit exceeded
                  searchOperation.setResultCode(
                      ResultCode.ADMIN_LIMIT_EXCEEDED);
                  searchOperation.setErrorMessage(null);
                  break;
                }
                searchOperation.checkIfCanceled(false);
              } catch (CanceledOperationException e)
              {
                searchOperation.setResultCode(ResultCode.CANCELED);
                searchOperation.setErrorMessage(null);
                break;
              }
            }
            lookthroughCount++;
            UpdateMsg msg = ri.getChange();
            processChange(
                msg, exportConfig, ldifWriter, searchOperation,
                rsd.getBaseDn().toString());
            if (!ri.next())
              break;
          }
@@ -768,17 +817,60 @@
  }
  /**
   * Attempt to extract a ChangeNumber from searchFilter like
   * ReplicationChangeNumber=xxxx or ReplicationChangeNumber>=xxxx.
   *
   * @param filter The filter to evaluate.
   *
   * @return       The extracted ChangeNumber or null if no ChangeNumber
   *               was found.
   */
  private ChangeNumber extractChangeNumber(SearchFilter filter)
  {
    AttributeType changeNumberAttrType =
      DirectoryServer.getDefaultAttributeType(CHANGE_NUMBER);
    FilterType filterType = filter.getFilterType();
    if ( (filterType.equals(FilterType.GREATER_OR_EQUAL) ||
             filterType.equals(FilterType.EQUALITY) ) &&
             (filter.getAttributeType().equals(changeNumberAttrType)))
    {
      try
      {
        ChangeNumber startingChangeNumber =
          new ChangeNumber(filter.getAssertionValue().getStringValue());
         return new ChangeNumber(
              startingChangeNumber.getTime(),
              startingChangeNumber.getSeqnum()-1,
              startingChangeNumber.getServerId());
      }
      catch (Exception e)
      {
        // don't try to optimize the search if we the ChangeNumber is
        // not a valid replication ChangeNumber.
      }
    }
    return null;
  }
  /**
   * Export one change.
   */
  private void processChange(UpdateMessage msg,
  private void processChange(UpdateMsg msg,
      LDIFExportConfig exportConfig, LDIFWriter ldifWriter,
      SearchOperation searchOperation)
      SearchOperation searchOperation, String baseDN)
  {
    InternalClientConnection conn =
      InternalClientConnection.getRootConnection();
    Entry entry = null;
    DN dn = null;
    ObjectClass objectclass =
      DirectoryServer.getDefaultObjectClass("extensibleObject");
    try
    {
      if (msg instanceof AddMsg)
@@ -786,43 +878,51 @@
        AddMsg addMsg = (AddMsg)msg;
        AddOperation addOperation = (AddOperation)msg.createOperation(conn);
        dn = DN.decode("puid=" + addMsg.getParentUid() + "," +
            "changeNumber=" + msg.getChangeNumber().toString() + "," +
            msg.getDn() +","+ BASE_DN);
        dn = DN.decode("puid=" + addMsg.getParentUid() + "+" +
            CHANGE_NUMBER + "=" + msg.getChangeNumber().toString() + "+" +
            msg.getDn() + "," + BASE_DN);
        Map<AttributeType,List<Attribute>> attributes =
          new HashMap<AttributeType,List<Attribute>>();
        Map<ObjectClass, String> objectclasses =
          new HashMap<ObjectClass, String>();
        for (RawAttribute a : addOperation.getRawAttributes())
        {
          Attribute attr = a.toAttribute();
          AttributeType attrType = attr.getAttributeType();
          List<Attribute> attrs = attributes.get(attrType);
          if (attrs == null)
          if (attr.getAttributeType().isObjectClassType())
          {
            attrs = new ArrayList<Attribute>(1);
            attrs.add(attr);
            attributes.put(attrType, attrs);
            for (ByteString os : a.getValues())
            {
              String ocName = os.toString();
              ObjectClass oc =
                DirectoryServer.getObjectClass(toLowerCase(ocName));
              if (oc == null)
              {
                oc = DirectoryServer.getDefaultObjectClass(ocName);
              }
              objectclasses.put(oc,ocName);
            }
          }
          else
          {
            attrs.add(attr);
            addAttribute(attributes, attr);
          }
        }
        AddChangeRecordEntry changeRecord =
          new AddChangeRecordEntry(dn, attributes);
        Attribute changetype = Attributes.create("changetype", "add");
        addAttribute(attributes, changetype);
        if (exportConfig != null)
        {
          AddChangeRecordEntry changeRecord =
            new AddChangeRecordEntry(dn, attributes);
          ldifWriter.writeChangeRecord(changeRecord);
        }
        else
        {
          Writer writer = new Writer();
          LDIFWriter ldifWriter2 = writer.getLDIFWriter();
          ldifWriter2.writeChangeRecord(changeRecord);
          LDIFReader reader = writer.getLDIFReader();
          entry = reader.readEntry();
          entry = new Entry(dn, objectclasses, attributes, null);
        }
      }
      else if (msg instanceof DeleteMsg)
@@ -830,7 +930,7 @@
        DeleteMsg delMsg = (DeleteMsg)msg;
        dn = DN.decode("uuid=" + msg.getUniqueId() + "," +
            "changeNumber=" + delMsg.getChangeNumber().toString()+ "," +
            CHANGE_NUMBER + "=" + delMsg.getChangeNumber().toString()+ "," +
            msg.getDn() +","+ BASE_DN);
        DeleteChangeRecordEntry changeRecord =
@@ -853,7 +953,7 @@
        ModifyOperation op = (ModifyOperation)msg.createOperation(conn);
        dn = DN.decode("uuid=" + msg.getUniqueId() + "," +
            "changeNumber=" + msg.getChangeNumber().toString()+ "," +
            CHANGE_NUMBER + "=" + msg.getChangeNumber().toString()+ "," +
            msg.getDn() +","+ BASE_DN);
        op.setInternalOperation(true);
@@ -877,7 +977,7 @@
        ModifyDNOperation op = (ModifyDNOperation)msg.createOperation(conn);
        dn = DN.decode("uuid=" + msg.getUniqueId() + "," +
            "changeNumber=" + msg.getChangeNumber().toString()+ "," +
            CHANGE_NUMBER + "=" + msg.getChangeNumber().toString()+ "," +
            msg.getDn() +","+ BASE_DN);
        op.setInternalOperation(true);
@@ -906,6 +1006,15 @@
      }
      else
      {
        // Add extensibleObject objectclass and the ChangeNumber
        // in the entry.
        entry.addObjectClass(objectclass);
        Attribute changeNumber =
          Attributes.create(CHANGE_NUMBER, msg.getChangeNumber().toStringUI());
        addAttribute(entry.getUserAttributes(), changeNumber);
        Attribute domain = Attributes.create("replicationDomain", baseDN);
        addAttribute(entry.getUserAttributes(), domain);
        // Get the base DN, scope, and filter for the search.
        DN  searchBaseDN = searchOperation.getBaseDN();
        SearchScope  scope  = searchOperation.getScope();
@@ -941,7 +1050,28 @@
    }
  }
  /**
   * Add an attribute to a provided Map of attribute.
   *
   * @param attributes The Map that should be updated.
   * @param attribute  The attribute that should be added to the Map.
   */
  private void addAttribute(
      Map<AttributeType,List<Attribute>> attributes, Attribute attribute)
  {
    AttributeType attrType = attribute.getAttributeType();
    List<Attribute> attrs = attributes.get(attrType);
    if (attrs == null)
    {
      attrs = new ArrayList<Attribute>(1);
      attrs.add(attribute);
      attributes.put(attrType, attrs);
    }
    else
    {
      attrs.add(attribute);
    }
  }
  /**
   * {@inheritDoc}
@@ -1189,17 +1319,6 @@
      {
        if (baseDNSet.contains(searchBaseDN))
        {
          // Get the base DN, scope, and filter for the search.
          SearchScope  scope  = searchOperation.getScope();
          SearchFilter filter = searchOperation.getFilter();
          Entry re = new Entry(searchBaseDN, rootObjectclasses, attributes,
              operationalAttributes);
          if (re.matchesBaseAndScope(searchBaseDN, scope) &&
              filter.matchesEntry(re))
          {
            searchOperation.returnEntry(re, new LinkedList<Control>());
          }
          return;
        }
        else
@@ -1212,18 +1331,6 @@
      }
    }
    // Get the base DN, scope, and filter for the search.
    SearchScope  scope  = searchOperation.getScope();
    SearchFilter filter = searchOperation.getFilter();
    Entry re = new Entry(searchBaseDN, rootObjectclasses, attributes,
        operationalAttributes);
    if (re.matchesBaseAndScope(searchBaseDN, scope) &&
        filter.matchesEntry(re))
    {
      searchOperation.returnEntry(re, new LinkedList<Control>());
    }
    // Walk through all entries and send the ones that match.
    Iterator<ReplicationServerDomain> rsdi = server.getCacheIterator();
    if (rsdi != null)