mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Ludovic Poitou
24.44.2013 791840ef10ecb9f25b4c3b97eacbf848bf75a261
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/PersistentServerState.java
@@ -23,7 +23,7 @@
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions copyright 2012 ForgeRock AS.
 *      Portions copyright 2012-2013 ForgeRock AS.
 */
package org.opends.server.replication.plugin;
import org.opends.messages.Message;
@@ -35,15 +35,12 @@
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Iterator;
import org.opends.server.core.DeleteOperationBasis;
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.ModifyOperationBasis;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.protocols.ldap.LDAPAttribute;
import org.opends.server.protocols.ldap.LDAPFilter;
import org.opends.server.protocols.ldap.LDAPModification;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ServerState;
@@ -55,7 +52,6 @@
import org.opends.server.types.DN;
import org.opends.server.types.DereferencePolicy;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.LDAPException;
import org.opends.server.types.ModificationType;
import org.opends.server.types.RawModification;
import org.opends.server.types.ResultCode;
@@ -64,7 +60,7 @@
import org.opends.server.types.SearchScope;
/**
 * This class implements a ServerState that is stored on the backends
 * This class implements a ServerState that is stored in the backend
 * used to store the synchronized data and that is therefore persistent
 * across server reboot.
 */
@@ -73,7 +69,6 @@
   private final DN baseDn;
   private final InternalClientConnection conn =
       InternalClientConnection.getRootConnection();
   private final ByteString asn1BaseDn;
   private final int serverId;
   private final ServerState state;
@@ -83,16 +78,6 @@
    */
   protected static final String REPLICATION_STATE = "ds-sync-state";
   /**
    * The attribute name used to store the entryUUID.
    */
   private static final String ENTRY_UUID = "entryUUID";
   /**
    * The attribute name used to store the RUV elements.
    */
   private static final String REPLICATION_RUV_ELEMENT = "nsds50ruv";
  /**
   * create a new ServerState.
   * @param baseDn The baseDN for which the ServerState is created
@@ -103,12 +88,12 @@
    this.baseDn = baseDn;
    this.serverId = serverId;
    this.state = new ServerState();
    this.asn1BaseDn = ByteString.valueOf(baseDn.toString());
    loadState();
  }
  /**
   * Create a new PersistenServerState based on an already existing ServerState.
   * Create a new PersistentServerState based on an already existing
   * ServerState.
   *
   * @param baseDn    The baseDN for which the ServerState is created.
   * @param serverId  The serverId.
@@ -119,7 +104,6 @@
    this.baseDn = baseDn;
    this.serverId = serverId;
    this.state = state;
    this.asn1BaseDn = ByteString.valueOf(baseDn.toString());
    loadState();
  }
@@ -170,16 +154,14 @@
   */
  public void loadState()
  {
    SearchResultEntry stateEntry = null;
    // try to load the state from the base entry.
    stateEntry = searchBaseEntry();
    SearchResultEntry stateEntry = searchBaseEntry();
    if (stateEntry == null)
    {
      // The base entry does not exist yet
      // in the database or was deleted. Try to read the ServerState
      // from the configuration instead.
      /*
      The base entry does not exist yet in the database or was deleted.
      Try to read the ServerState from the configuration instead.
      */
      stateEntry = searchConfigEntry();
    }
@@ -205,50 +187,47 @@
   */
  private SearchResultEntry searchBaseEntry()
  {
    LDAPFilter filter;
    try
    {
      filter = LDAPFilter.decode("objectclass=*");
    } catch (LDAPException e)
    {
      // can not happen
      return null;
    }
    /*
     * Search the database entry that is used to periodically
     * save the ServerState
     */
    LinkedHashSet<String> attributes = new LinkedHashSet<String>(1);
    attributes.add(REPLICATION_STATE);
    InternalSearchOperation search = conn.processSearch(asn1BaseDn,
        SearchScope.BASE_OBJECT,
        DereferencePolicy.DEREF_ALWAYS, 0, 0, false,
        filter,attributes);
    if (((search.getResultCode() != ResultCode.SUCCESS)) &&
        ((search.getResultCode() != ResultCode.NO_SUCH_OBJECT)))
    {
      Message message = ERR_ERROR_SEARCHING_RUV.
          get(search.getResultCode().getResultCodeName(), search.toString(),
              search.getErrorMessage(), baseDn.toString());
      logError(message);
      return null;
    }
    SearchResultEntry stateEntry = null;
    if (search.getResultCode() == ResultCode.SUCCESS)
    {
      SearchFilter filter =
          SearchFilter.createFilterFromString("objectclass=*");
      /*
       * Read the serverState from the REPLICATION_STATE attribute
       * Search the database entry that is used to periodically
       * save the ServerState
       */
      LinkedList<SearchResultEntry> result = search.getSearchEntries();
      if (!result.isEmpty())
      LinkedHashSet<String> attributes = new LinkedHashSet<String>(1);
      attributes.add(REPLICATION_STATE);
      InternalSearchOperation search = conn.processSearch(baseDn,
          SearchScope.BASE_OBJECT,
          DereferencePolicy.NEVER_DEREF_ALIASES,
          0, 0, false, filter, attributes);
      if (((search.getResultCode() != ResultCode.SUCCESS)) &&
          ((search.getResultCode() != ResultCode.NO_SUCH_OBJECT)))
      {
        stateEntry = result.getFirst();
        Message message = ERR_ERROR_SEARCHING_RUV.
            get(search.getResultCode().getResultCodeName(), search.toString(),
                search.getErrorMessage(), baseDn.toString());
        logError(message);
        return null;
      }
      SearchResultEntry stateEntry = null;
      if (search.getResultCode() == ResultCode.SUCCESS)
      {
        // Read the serverState from the REPLICATION_STATE attribute
        LinkedList<SearchResultEntry> result = search.getSearchEntries();
        if (!result.isEmpty())
        {
          stateEntry = result.getFirst();
        }
      }
      return stateEntry;
    }
    return stateEntry;
    catch (DirectoryException e)
    {
      // cannot happen
      return null;
    }
  }
  /**
@@ -276,15 +255,12 @@
      if (op.getResultCode() == ResultCode.SUCCESS)
      {
        /*
         * Read the serverState from the REPLICATION_STATE attribute
         */
        // Read the serverState from the REPLICATION_STATE attribute
        LinkedList<SearchResultEntry> resultEntries =
          op.getSearchEntries();
        if (!resultEntries.isEmpty())
        {
          SearchResultEntry resultEntry = resultEntries.getFirst();
          return resultEntry;
          return resultEntries.getFirst();
        }
      }
      return null;
@@ -455,31 +431,23 @@
        dbMaxCn = serverStateMaxCn;
        for (SearchResultEntry resEntry : op.getSearchEntries())
        {
          List<Attribute> attrs = resEntry.getAttribute(histType);
          Iterator<AttributeValue> iav = attrs.get(0).iterator();
          try
          for (AttributeValue attrValue :
                    resEntry.getAttribute(histType).get(0))
          {
            while (true)
            {
              AttributeValue attrVal = iav.next();
              HistoricalAttributeValue histVal =
                new HistoricalAttributeValue(attrVal.toString());
              ChangeNumber cn = histVal.getCn();
            HistoricalAttributeValue histVal =
                new HistoricalAttributeValue(attrValue.toString());
            ChangeNumber cn = histVal.getCn();
              if ((cn != null) && (cn.getServerId() == serverId))
            if ((cn != null) && (cn.getServerId() == serverId))
            {
              // compare the csn regarding the maxCn we know and
              // store the biggest
              if (ChangeNumber.compare(dbMaxCn, cn) < 0)
              {
                // compare the csn regarding the maxCn we know and
                // store the biggest
                if (ChangeNumber.compare(dbMaxCn, cn) < 0)
                {
                  dbMaxCn = cn;
                }
                dbMaxCn = cn;
              }
            }
          }
          catch(Exception e)
          {
          }
        }
        if (ChangeNumber.compare(dbMaxCn, serverStateMaxCn) > 0)
@@ -495,215 +463,6 @@
    }
  }
/**
 * Check if a ReplicaUpdateVector entry is present
 * if so, translate the ruv into a serverState and
 * a generationId.
 * @return the generationId translated from the RUV
 * entry, 0 if no RUV is present
 */
  public Long checkRUVCompat() {
   Long genId = null;
   SearchResultEntry ruvEntry = null;
   try
   {
     // Search the RUV in the DB
     ruvEntry = searchRUVEntry();
     if (ruvEntry == null)
       return null;
     // Check if the serverState is already initialized
     if( !isServerStateInitilized())
     {
       // Translate the ruv to serverState
       // and GenerationId
       genId = initializeStateWithRUVEntry(ruvEntry);
     }
   }
   catch (Exception e)
   {
     Message message = NOTE_ERR_WHILE_TRYING_TO_DECODE_RUV_IN_STATE.get(
         baseDn.toString());
     logError(message);
   }
   // In any case, remove the RUV entry
   // if it exists
   DeleteOperationBasis del =  new DeleteOperationBasis(conn,
       InternalClientConnection.nextOperationID(),
       InternalClientConnection.nextMessageID(), null,
       ByteString.valueOf(ruvEntry.getDN().toNormalizedString()));
   // Run the internal operation
   del.setInternalOperation(true);
   del.setSynchronizationOperation(true);
   del.setDontSynchronize(true);
   del.run();
   return genId;
  }
  /**
   * Initialize the serverState and the GenerationId based on a RUV
   * entry.
   * @param ruvEntry the entry to translate into a serverState.
   * @return the generationId translated from the RUV entry.
   */
  private Long initializeStateWithRUVEntry(SearchResultEntry ruvEntry) {
    Long genId = null;
    String value = null;
    String csn = null;
    AttributeType ruvElementType =
      DirectoryServer.getAttributeType(REPLICATION_RUV_ELEMENT);
    if (ruvElementType == null)
      return null;
    for (Attribute attr : ruvEntry.getAttribute(ruvElementType))
    {
      Iterator<AttributeValue> it = attr.iterator();
      while (it.hasNext())
      {
        value = it.next().toString();
        // Search for the GenerationId
        if (value.startsWith("{replicageneration} "))
        {
          // Get only the timestamp present in the CSN
          String replicaGen = value.substring(20, 28);
          genId = Long.parseLong(replicaGen,16);
        }
        else
        {
          // Translate the other elements into serverState
          if (value.startsWith("{replica "))
          {
            String[] bits = value.split(" ");
            // Need to take into account when a purl is empty
            if (bits.length > 3)
            {
              if (bits[2].contains("ldap"))
              {
                // the ldap url is present so the max csn is the 5th element
                // Example :
                // {replica 5 ldap://host:port} 494b6635000000050000 4aeab8f300
                //  0000050000
                csn = bits[4];
              }
              else
              {
                // no ldap url so the max csn is the 4th element
                // Example :
                // {replica 31842} 4a0d1ff700017c620000 4a926b6500007c620000
                csn = bits[3];
              }
              String temp = csn.substring(0, 8);
              Long timeStamp = Long.parseLong(temp, 16);
              temp = csn.substring(8, 12);
              Integer seqNum = Integer.parseInt(temp, 16);
              temp = csn.substring(12, 16);
              Integer replicaId = Integer.parseInt(temp, 16);
              // No need to take into account the subSeqNum
              ChangeNumber cn =
                new ChangeNumber(timeStamp*1000, seqNum, replicaId);
              this.update(cn);
            }
          }
        }
      }
    }
    return genId;
}
  /**
   * Check if the server State is initialized by searching
   * the attribute type REPLICATION_STATE in the root entry.
   * @return true if the serverState is initialized, false
   * otherwise
   */
  private boolean isServerStateInitilized() {
    SearchResultEntry resultEntry = searchBaseEntry();
    AttributeType synchronizationStateType =
      DirectoryServer.getAttributeType(REPLICATION_STATE);
    List<Attribute> attrs =
      resultEntry.getAttribute(synchronizationStateType);
    return (attrs != null);
  }
/**
 * Search the database entry that represent a serverState
 * using the RUV format (compatibility mode).
 * @return the corresponding RUV entry, null otherwise
 */
  private SearchResultEntry searchRUVEntry() {
    LDAPFilter filter;
    SearchResultEntry ruvEntry = null;
    // Search the RUV entry
    try
    {
      filter = LDAPFilter.decode("objectclass=ldapSubEntry");
    } catch (LDAPException e)
    {
      // can not happen
      return null;
    }
    LinkedHashSet<String> attributes = new LinkedHashSet<String>(1);
    attributes.add(ENTRY_UUID);
    attributes.add(REPLICATION_RUV_ELEMENT);
    InternalSearchOperation search = conn.processSearch(asn1BaseDn,
        SearchScope.SUBORDINATE_SUBTREE,
        DereferencePolicy.DEREF_ALWAYS, 0, 0, false,
        filter,attributes);
    if (((search.getResultCode() != ResultCode.SUCCESS)) &&
        ((search.getResultCode() != ResultCode.NO_SUCH_OBJECT)))
      return null;
    if (search.getResultCode() == ResultCode.SUCCESS)
    {
      /*
       * Search the ldapSubEntry with the entryUUID equals
       * to "ffffffff-ffff-ffff-ffff-ffffffffffff"
       */
      LinkedList<SearchResultEntry> result = search.getSearchEntries();
      if (!result.isEmpty())
      {
        for (SearchResultEntry ldapSubEntry : result)
        {
          List<Attribute> attrs =
            ldapSubEntry.getAttribute(ENTRY_UUID.toLowerCase());
          if (attrs != null)
          {
            Iterator<AttributeValue> iav = attrs.get(0).iterator();
            AttributeValue attrVal = iav.next();
            if (attrVal.toString().
                equalsIgnoreCase("ffffffff-ffff-ffff-ffff-ffffffffffff"))
            {
              ruvEntry = ldapSubEntry;
              break;
            }
          }
        }
      }
    }
    return ruvEntry;
  }
  /**
   * Get the largest ChangeNumber seen for a given LDAP server ID.
   *