mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

pgamba
19.33.2008 3254d0089701cbf24c06cc019def174b13628923
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationBackend.java
@@ -136,7 +136,7 @@
   */
  private static final DebugTracer TRACER = getTracer();
  private static final String EXPORT_BASE_DN = "dc=replicationChanges";
  private static final String BASE_DN = "dc=replicationchanges";
  // The base DNs for this backend.
  private DN[] baseDNs;
@@ -545,7 +545,7 @@
        ReplicationServerDomain rc = rsdi.next();
        // Skip containers that are not covered by the include branches.
        baseDN = DN.decode(rc.getBaseDn().toString() + "," + EXPORT_BASE_DN);
        baseDN = DN.decode(rc.getBaseDn().toString() + "," + BASE_DN);
        if (includeBranches == null || includeBranches.isEmpty())
        {
@@ -662,7 +662,7 @@
    try
    {
      AddChangeRecordEntry changeRecord =
        new AddChangeRecordEntry(DN.decode(EXPORT_BASE_DN),
        new AddChangeRecordEntry(DN.decode(BASE_DN),
                               attributes);
      ldifWriter.writeChangeRecord(changeRecord);
    }
@@ -706,7 +706,7 @@
      {
        AddChangeRecordEntry changeRecord =
          new AddChangeRecordEntry(DN.decode(
              exportContainer.getBaseDn() + "," + EXPORT_BASE_DN),
              exportContainer.getBaseDn() + "," + BASE_DN),
              attributes);
        ldifWriter.writeChangeRecord(changeRecord);
      }
@@ -717,7 +717,7 @@
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
        Message message = ERR_BACKEND_EXPORT_ENTRY.get(
            exportContainer.getBaseDn() + "," + EXPORT_BASE_DN,
            exportContainer.getBaseDn() + "," + BASE_DN,
            String.valueOf(e));
        logError(message);
      }
@@ -788,7 +788,7 @@
        dn = DN.decode("puid=" + addMsg.getParentUid() + "," +
            "changeNumber=" + msg.getChangeNumber().toString() + "," +
            msg.getDn() +","+ "dc=replicationChanges");
            msg.getDn() +","+ BASE_DN);
        Map<AttributeType,List<Attribute>> attributes =
          new HashMap<AttributeType,List<Attribute>>();
@@ -831,7 +831,7 @@
        dn = DN.decode("uuid=" + msg.getUniqueId() + "," +
            "changeNumber=" + delMsg.getChangeNumber().toString()+ "," +
            msg.getDn() +","+ "dc=replicationChanges");
            msg.getDn() +","+ BASE_DN);
        DeleteChangeRecordEntry changeRecord =
          new DeleteChangeRecordEntry(dn);
@@ -854,7 +854,7 @@
        dn = DN.decode("uuid=" + msg.getUniqueId() + "," +
            "changeNumber=" + msg.getChangeNumber().toString()+ "," +
            msg.getDn() +","+ "dc=replicationChanges");
            msg.getDn() +","+ BASE_DN);
        op.setInternalOperation(true);
        ModifyChangeRecordEntry changeRecord =
@@ -878,7 +878,7 @@
        dn = DN.decode("uuid=" + msg.getUniqueId() + "," +
            "changeNumber=" + msg.getChangeNumber().toString()+ "," +
            msg.getDn() +","+ "dc=replicationChanges");
            msg.getDn() +","+ BASE_DN);
        op.setInternalOperation(true);
        ModifyDNChangeRecordEntry changeRecord =
@@ -907,12 +907,13 @@
      else
      {
        // Get the base DN, scope, and filter for the search.
        DN           searchBaseDN = searchOperation.getBaseDN();
        DN  searchBaseDN = searchOperation.getBaseDN();
        SearchScope  scope  = searchOperation.getScope();
        SearchFilter filter = searchOperation.getFilter();
        if (entry.matchesBaseAndScope(searchBaseDN, scope) &&
            filter.matchesEntry(entry))
        boolean ms = entry.matchesBaseAndScope(searchBaseDN, scope);
        boolean mf = filter.matchesEntry(entry);
        if ( ms && mf )
        {
          searchOperation.returnEntry(entry, new LinkedList<Control>());
        }
@@ -1188,10 +1189,17 @@
      {
        if (baseDNSet.contains(searchBaseDN))
        {
          searchOperation.returnEntry(
              new Entry(searchBaseDN, rootObjectclasses, attributes,
                  operationalAttributes),
                  new LinkedList<Control>());
          // Get the base DN, scope, and filter for the search.
          SearchScope  scope  = searchOperation.getScope();
          SearchFilter filter = searchOperation.getFilter();
          Entry re = new Entry(searchBaseDN, rootObjectclasses, attributes,
              operationalAttributes);
          if (re.matchesBaseAndScope(searchBaseDN, scope) &&
              filter.matchesEntry(re))
          {
            searchOperation.returnEntry(re, new LinkedList<Control>());
          }
          return;
        }
        else
@@ -1204,6 +1212,18 @@
      }
    }
    // Get the base DN, scope, and filter for the search.
    SearchScope  scope  = searchOperation.getScope();
    SearchFilter filter = searchOperation.getFilter();
    Entry re = new Entry(searchBaseDN, rootObjectclasses, attributes,
        operationalAttributes);
    if (re.matchesBaseAndScope(searchBaseDN, scope) &&
        filter.matchesEntry(re))
    {
      searchOperation.returnEntry(re, new LinkedList<Control>());
    }
    // Walk through all entries and send the ones that match.
    Iterator<ReplicationServerDomain> rsdi = server.getCacheIterator();
    if (rsdi != null)
@@ -1213,7 +1233,7 @@
        ReplicationServerDomain rsd = rsdi.next();
        // Skip containers that are not covered by the include branches.
        baseDN = DN.decode(rsd.getBaseDn().toString() + "," + EXPORT_BASE_DN);
        baseDN = DN.decode(rsd.getBaseDn().toString() + "," + BASE_DN);
            if (searchBaseDN.isDescendantOf(baseDN) ||
                searchBaseDN.isAncestorOf(baseDN))
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
@@ -37,6 +37,7 @@
import org.opends.server.types.DN;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.protocol.UpdateMessage;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.sleepycat.je.Cursor;
import com.sleepycat.je.DatabaseEntry;
@@ -63,7 +64,11 @@
  // The maximum number of retries in case of DatabaseDeadlock Exception.
  private static final int DEADLOCK_RETRIES = 10;
  /**
  // The lock used to provide exclusive access to the thread that
  // close the db (shutdown or clear).
  private ReentrantReadWriteLock dbCloseLock;
 /**
   * Creates a new database or open existing database that will be used
   * to store and retrieve changes from an LDAP server.
   * @param serverId The identifier of the LDAP server.
@@ -86,6 +91,8 @@
    db = dbenv.getOrAddDb(serverId, baseDn,
        replicationServer.getReplicationServerDomain(baseDn,
        true).getGenerationId());
    dbCloseLock = new ReentrantReadWriteLock(true);
  }
  /**
@@ -108,6 +115,7 @@
      // the operation is attempted again up to DEADLOCK_RETRIES times.
      while ((tries++ < DEADLOCK_RETRIES) && (!done))
      {
        dbCloseLock.readLock().lock();
        try
        {
          txn = dbenv.beginTransaction();
@@ -128,6 +136,10 @@
          txn.abort();
          txn = null;
        }
        finally
        {
          dbCloseLock.readLock().unlock();
        }
      }
      if (!done)
      {
@@ -190,8 +202,17 @@
  {
    try
    {
      db.close();
    } catch (DatabaseException e)
      dbCloseLock.writeLock().lock();
      try
      {
        db.close();
      }
      finally
      {
        dbCloseLock.writeLock().unlock();
      }
    }
    catch (DatabaseException e)
    {
      MessageBuilder mb = new MessageBuilder();
      mb.append(NOTE_EXCEPTION_CLOSING_DATABASE.get(this.toString()));
@@ -223,6 +244,7 @@
   * @throws DatabaseException If a database error prevented the cursor
   *                           creation.
   * @throws Exception if the ReplServerDBCursor creation failed.
   *
   * @return The ReplServerDBCursor.
   */
  public ReplServerDBCursor openDeleteCursor()
@@ -242,10 +264,13 @@
    try
    {
        cursor = db.openCursor(null, null);
    } catch (DatabaseException e1)
      dbCloseLock.readLock().lock();
      cursor = db.openCursor(null, null);
    }
    catch (DatabaseException e1)
    {
        return null;
      dbCloseLock.readLock().unlock();
      return null;
    }
    try
    {
@@ -253,6 +278,7 @@
      DatabaseEntry data = new DatabaseEntry();
      OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT);
      cursor.close();
      dbCloseLock.readLock().unlock();
      if (status != OperationStatus.SUCCESS)
      {
        /* database is empty */
@@ -268,11 +294,14 @@
      return new ChangeNumber(str);
    } catch (DatabaseException e)
    {
      try {
      cursor.close();
      try
      {
        cursor.close();
        dbCloseLock.readLock().unlock();
      }
      catch (DatabaseException dbe)
      {
        // The db is dead - let's only log.
      }
      /* database is faulty */
      MessageBuilder mb = new MessageBuilder();
@@ -295,11 +324,13 @@
    try
    {
      dbCloseLock.readLock().lock();
      cursor = db.openCursor(null, null);
      DatabaseEntry key = new DatabaseEntry();
      DatabaseEntry data = new DatabaseEntry();
      OperationStatus status = cursor.getLast(key, data, LockMode.DEFAULT);
      cursor.close();
      dbCloseLock.readLock().unlock();
      if (status != OperationStatus.SUCCESS)
      {
        /* database is empty */
@@ -355,10 +386,14 @@
    private ReplServerDBCursor(ChangeNumber startingChangeNumber)
            throws Exception
    {
      cursor = db.openCursor(txn, null);
      try
      {
        // Take the lock. From now on, whatever error that happen in the life
        // of this cursor should end by unlocking that lock. We must also
        // unlock it when throwing an exception.
        dbCloseLock.readLock().lock();
        cursor = db.openCursor(txn, null);
        if (startingChangeNumber != null)
        {
          key = new ReplicationKey(startingChangeNumber);
@@ -367,20 +402,35 @@
          if (cursor.getSearchKey(key, data, LockMode.DEFAULT) !=
            OperationStatus.SUCCESS)
          {
            // We could not move the cursor to the expected startingChangeNumber
            if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT) !=
              OperationStatus.SUCCESS)
            {
              // We could not even move the cursor closed to it => failure
              // Unlocking is required before throwing any exception
              dbCloseLock.readLock().unlock();
              throw new Exception("ChangeNumber not available");
            }
            else
            {
              // We can move close to the startingChangeNumber.
              // Let's create a cursor from that point.
              DatabaseEntry key = new DatabaseEntry();
              DatabaseEntry data = new DatabaseEntry();
              if (cursor.getPrev(key, data, LockMode.DEFAULT) !=
                OperationStatus.SUCCESS)
              {
                cursor.close();
                cursor = db.openCursor(txn, null);
                try
                {
                  cursor.close();
                  cursor = db.openCursor(txn, null);
                }
                catch(Exception e)
                {
                  // Unlocking is required before throwing any exception
                  dbCloseLock.readLock().unlock();
                  throw(e);
                }
              }
            }
          }
@@ -388,6 +438,8 @@
      }
      catch (Exception e)
      {
        // Unlocking is required before throwing any exception
        dbCloseLock.readLock().unlock();
        cursor.close();
        throw (e);
      }
@@ -395,8 +447,18 @@
    private ReplServerDBCursor() throws DatabaseException
    {
      txn = dbenv.beginTransaction();
      cursor = db.openCursor(txn, null);
      try
      {
        // We'll go on only if no close or no clear is running
        dbCloseLock.readLock().lock();
        txn = dbenv.beginTransaction();
        cursor = db.openCursor(txn, null);
      }
      catch(DatabaseException e)
      {
        dbCloseLock.readLock().unlock();
        throw (e);
      }
    }
    /**
@@ -414,12 +476,15 @@
      }
      catch (DatabaseException e)
      {
        dbCloseLock.readLock().unlock();
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
        mb.append(stackTraceToSingleLineString(e));
        logError(mb.toMessage());
        replicationServer.shutdown();
      }
      if (txn != null)
      {
        try
@@ -434,6 +499,7 @@
          replicationServer.shutdown();
        }
      }
      dbCloseLock.readLock().unlock();
    }
    /**
@@ -460,6 +526,8 @@
      }
      catch (DatabaseException e)
      {
        dbCloseLock.readLock().unlock();
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
        mb.append(stackTraceToSingleLineString(e));
@@ -480,6 +548,7 @@
          replicationServer.shutdown();
        }
      }
      dbCloseLock.readLock().unlock();
    }
    /**
@@ -556,7 +625,7 @@
    {
      cursor.delete();
    }
  }
  } // ReplServerDBCursor
  /**
   * Clears this change DB from the changes it contains.
@@ -566,10 +635,38 @@
   */
  public void clear() throws Exception, DatabaseException
  {
    // Clears the changes
    dbenv.clearDb(this.toString());
    // The coming users will be blocked until the clear is done
    dbCloseLock.writeLock().lock();
    try
    {
      String dbName = db.getDatabaseName();
    // Clears the reference to this serverID
    dbenv.clearServerId(baseDn, serverId);
      // Clears the reference to this serverID
      dbenv.clearServerId(baseDn, serverId);
      // Closing is requested by the Berkeley DB before truncate
      db.close();
      // Clears the changes
      dbenv.clearDb(dbName);
      db = null;
      // RE-create the db
      db = dbenv.getOrAddDb(serverId, baseDn, (long)-1);
    }
    catch(Exception e)
    {
      MessageBuilder mb = new MessageBuilder();
      mb.append(ERR_ERROR_CLEARING_DB.get(this.toString(),
          e.getMessage() + " " +
          stackTraceToSingleLineString(e)));
      logError(mb.toMessage());
    }
    finally
    {
      // Relax the waiting users
      dbCloseLock.writeLock().unlock();
    }
  }
}
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
@@ -526,22 +526,31 @@
     */
    public final void clearDb(String databaseName)
    {
      Transaction txn = null;
      try
      {
        if (debugEnabled())
          TRACER.debugInfo(
              "In " + this.replicationServer.getMonitorInstanceName() +
              "clearDb" + databaseName);
        Transaction txn = dbEnvironment.beginTransaction(null, null);
        txn = dbEnvironment.beginTransaction(null, null);
        dbEnvironment.truncateDatabase(txn, databaseName, false);
        txn.commitWriteNoSync();
        txn = null;
      }
      catch (DatabaseException dbe)
      catch (DatabaseException e)
      {
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_ERROR_CLEARING_DB.get(databaseName,
            dbe.getLocalizedMessage()));
            e.getMessage() + " " +
            stackTraceToSingleLineString(e)));
        logError(mb.toMessage());
      }
      finally
      {
        try
        {
          if (txn != null)
            txn.abort();
        }
        catch(Exception e)
        {}
      }
    }
}
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -1196,9 +1196,11 @@
          catch (Exception e)
          {
            // TODO: i18n
            logError(Message.raw(
                "Exception caught while clearing dbHandler:" +
                e.getLocalizedMessage()));
            MessageBuilder mb = new MessageBuilder();
            mb.append(ERR_ERROR_CLEARING_DB.get(dbHandler.toString(),
                e.getMessage() + " " +
                stackTraceToSingleLineString(e)));
            logError(mb.toMessage());
          }
        }
        sourceDbHandlers.clear();
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java
@@ -54,6 +54,7 @@
import org.opends.server.core.DirectoryServer;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.replication.common.ChangeNumberGenerator;
import org.opends.server.replication.plugin.ReplicationBroker;
import org.opends.server.replication.plugin.ReplicationDomain;
@@ -65,7 +66,9 @@
import org.opends.server.replication.protocol.ReplicationMessage;
import org.opends.server.replication.protocol.SocketSession;
import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.replication.server.ReplicationBackend;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.schema.DirectoryStringSyntax;
import org.opends.server.tasks.LdifFileWriter;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeType;
@@ -73,6 +76,9 @@
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
import org.opends.server.types.ResultCode;
import org.opends.server.types.SearchFilter;
import org.opends.server.types.SearchResultEntry;
import org.opends.server.types.SearchScope;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -688,6 +694,38 @@
    return addMsg;
  }
  /*
   * Check that the expected number of changes are in the replication
   * server database.
   */
  private void checkChangelogSize(int expectedCount)
  {
    try
    {
      SearchFilter filter =
        SearchFilter.createFilterFromString("(objectclass=*)");
      InternalSearchOperation searchOperation =
        connection.processSearch(DN.decode("dc=replicationchanges"),
            SearchScope.SUBORDINATE_SUBTREE,
            filter);
      if (debugEnabled())
      {
        if (searchOperation.getSearchEntries().size() != expectedCount)
        {
          for (SearchResultEntry sre : searchOperation.getSearchEntries())
          {
            debugInfo("Entry found: " + sre.toLDIFString());
          }
        }
      }
      assertEquals(searchOperation.getSearchEntries().size(), expectedCount);
    }
    catch(Exception e)
    {
    }
  }
  /**
   * SingleRS tests basic features of generationID
   * with one single Replication Server.
@@ -711,6 +749,13 @@
      long genId;
      replServer1 = createReplicationServer(changelog1ID, false, testCase);
      // To search the replication server db later in these tests, we need
      // to attach the search backend to the replication server just created.
      Thread.sleep(500);
      ReplicationBackend b =
        (ReplicationBackend)DirectoryServer.getBackend("replicationChanges");
      b.setServer(replServer1);
      /*
       * Test  : empty replicated backend
@@ -824,6 +869,11 @@
      String ent1[] = { createEntry(UUID.randomUUID()) };
      this.addTestEntriesToDB(ent1);
      // Verify that the replication server does contain the change related
      // to this ADD.
      Thread.sleep(500);
      checkChangelogSize(1);
      try
      {
        ReplicationMessage msg = broker3.receive();
@@ -846,8 +896,14 @@
      debugInfo("Create again replServer1");
      replServer1 = createReplicationServer(changelog1ID, false, testCase);
      // To search the replication server db later in these tests, we need
      // to attach the search backend to the replication server just created.
      Thread.sleep(500);
      b = (ReplicationBackend)DirectoryServer.getBackend("replicationChanges");
      b.setServer(replServer1);
      debugInfo("Delay to allow DS to reconnect to replServer1");
      Thread.sleep(1000);
      long genIdAfterRestart = replServer1.getGenerationId(baseDn);
      debugInfo("Aft restart / replServer.genId=" + genIdAfterRestart);
@@ -874,15 +930,10 @@
        fail("Broker connection is expected to be accepted.");
      }
      /*
       *
       * FIXME Should clearJEBackend() regenerate generationId and do a start
       *       against ReplicationServer ?
       */
      /*
       * Test: Reset the replication server in order to allow new data set.
       */
      // Also verify that no change occured on the replication server db
      // and still contain the ADD submitted initially.
      Thread.sleep(500);
      checkChangelogSize(1);
      debugInfo("Launch an on-line import on DS.");
      genId=-1;
@@ -905,7 +956,9 @@
      waitTaskState(taskReset, TaskState.COMPLETED_SUCCESSFULLY, null);
      Thread.sleep(200);
      // TODO: Test that replication server db has been cleared
      // FIXME: Known bug : the replication server db should not have been
      // cleared in that case since it has the proper generation ID.
      checkChangelogSize(0);
      debugInfo("Expect new genId to be computed on DS and sent to all replServers after on-line import.");
      genId = readGenId();
@@ -926,7 +979,6 @@
          isDegradedDueToGenerationId(server3ID),
      "Expecting that broker3 is degraded since domain genId has been reset");
      // Now create a change that normally would be replicated
      // but will not be replicated here since DS and brokers are degraded
      String[] ent3 = { createEntry(UUID.randomUUID()) };