mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

pgamba
19.33.2008 111c22a17e3ab4bdca9052891a810f5ab7cea6b6
The following changes fix a bug in the clearing of the replication server DB that sometimes made the clearing fail silently.
In particular, Berkeley DB requires the database to be closed, and every reference to its handle released, before the database can be truncated.
This requires locking the DB while it is being closed or cleared, with only a limited impact on performance in all other cases.
A read-write lock is added on the DB: every thread using the DB takes the READ lock before use and releases it afterwards. This still allows these threads to run concurrently and avoids a significant performance impact. Every thread closing the DB
(shutdown or clear) takes the WRITE lock before the closure and releases it afterwards.

The changes also include a fix to the search scope processing in the replication backend.

Test:
------
In addition, the generation ID unit test has been improved with checks on the replication server DB content that exercise these two fixes. The nightly build ran successfully.

5 files modified
274 lines changed
opends/src/server/org/opends/server/replication/server/ReplicationBackend.java 52 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationDB.java 113 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java 25 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 8 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java 76 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationBackend.java
@@ -136,7 +136,7 @@
   */
  private static final DebugTracer TRACER = getTracer();
  private static final String EXPORT_BASE_DN = "dc=replicationChanges";
  private static final String BASE_DN = "dc=replicationchanges";
  // The base DNs for this backend.
  private DN[] baseDNs;
@@ -545,7 +545,7 @@
        ReplicationServerDomain rc = rsdi.next();
        // Skip containers that are not covered by the include branches.
        baseDN = DN.decode(rc.getBaseDn().toString() + "," + EXPORT_BASE_DN);
        baseDN = DN.decode(rc.getBaseDn().toString() + "," + BASE_DN);
        if (includeBranches == null || includeBranches.isEmpty())
        {
@@ -662,7 +662,7 @@
    try
    {
      AddChangeRecordEntry changeRecord =
        new AddChangeRecordEntry(DN.decode(EXPORT_BASE_DN),
        new AddChangeRecordEntry(DN.decode(BASE_DN),
                               attributes);
      ldifWriter.writeChangeRecord(changeRecord);
    }
@@ -706,7 +706,7 @@
      {
        AddChangeRecordEntry changeRecord =
          new AddChangeRecordEntry(DN.decode(
              exportContainer.getBaseDn() + "," + EXPORT_BASE_DN),
              exportContainer.getBaseDn() + "," + BASE_DN),
              attributes);
        ldifWriter.writeChangeRecord(changeRecord);
      }
@@ -717,7 +717,7 @@
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
        Message message = ERR_BACKEND_EXPORT_ENTRY.get(
            exportContainer.getBaseDn() + "," + EXPORT_BASE_DN,
            exportContainer.getBaseDn() + "," + BASE_DN,
            String.valueOf(e));
        logError(message);
      }
@@ -788,7 +788,7 @@
        dn = DN.decode("puid=" + addMsg.getParentUid() + "," +
            "changeNumber=" + msg.getChangeNumber().toString() + "," +
            msg.getDn() +","+ "dc=replicationChanges");
            msg.getDn() +","+ BASE_DN);
        Map<AttributeType,List<Attribute>> attributes =
          new HashMap<AttributeType,List<Attribute>>();
@@ -831,7 +831,7 @@
        dn = DN.decode("uuid=" + msg.getUniqueId() + "," +
            "changeNumber=" + delMsg.getChangeNumber().toString()+ "," +
            msg.getDn() +","+ "dc=replicationChanges");
            msg.getDn() +","+ BASE_DN);
        DeleteChangeRecordEntry changeRecord =
          new DeleteChangeRecordEntry(dn);
@@ -854,7 +854,7 @@
        dn = DN.decode("uuid=" + msg.getUniqueId() + "," +
            "changeNumber=" + msg.getChangeNumber().toString()+ "," +
            msg.getDn() +","+ "dc=replicationChanges");
            msg.getDn() +","+ BASE_DN);
        op.setInternalOperation(true);
        ModifyChangeRecordEntry changeRecord =
@@ -878,7 +878,7 @@
        dn = DN.decode("uuid=" + msg.getUniqueId() + "," +
            "changeNumber=" + msg.getChangeNumber().toString()+ "," +
            msg.getDn() +","+ "dc=replicationChanges");
            msg.getDn() +","+ BASE_DN);
        op.setInternalOperation(true);
        ModifyDNChangeRecordEntry changeRecord =
@@ -911,8 +911,9 @@
        SearchScope  scope  = searchOperation.getScope();
        SearchFilter filter = searchOperation.getFilter();
        if (entry.matchesBaseAndScope(searchBaseDN, scope) &&
            filter.matchesEntry(entry))
        boolean ms = entry.matchesBaseAndScope(searchBaseDN, scope);
        boolean mf = filter.matchesEntry(entry);
        if ( ms && mf )
        {
          searchOperation.returnEntry(entry, new LinkedList<Control>());
        }
@@ -1188,10 +1189,17 @@
      {
        if (baseDNSet.contains(searchBaseDN))
        {
          searchOperation.returnEntry(
              new Entry(searchBaseDN, rootObjectclasses, attributes,
                  operationalAttributes),
                  new LinkedList<Control>());
          // Get the base DN, scope, and filter for the search.
          SearchScope  scope  = searchOperation.getScope();
          SearchFilter filter = searchOperation.getFilter();
          Entry re = new Entry(searchBaseDN, rootObjectclasses, attributes,
              operationalAttributes);
          if (re.matchesBaseAndScope(searchBaseDN, scope) &&
              filter.matchesEntry(re))
          {
            searchOperation.returnEntry(re, new LinkedList<Control>());
          }
          return;
        }
        else
@@ -1204,6 +1212,18 @@
      }
    }
    // Get the base DN, scope, and filter for the search.
    SearchScope  scope  = searchOperation.getScope();
    SearchFilter filter = searchOperation.getFilter();
    Entry re = new Entry(searchBaseDN, rootObjectclasses, attributes,
        operationalAttributes);
    if (re.matchesBaseAndScope(searchBaseDN, scope) &&
        filter.matchesEntry(re))
    {
      searchOperation.returnEntry(re, new LinkedList<Control>());
    }
    // Walk through all entries and send the ones that match.
    Iterator<ReplicationServerDomain> rsdi = server.getCacheIterator();
    if (rsdi != null)
@@ -1213,7 +1233,7 @@
        ReplicationServerDomain rsd = rsdi.next();
        // Skip containers that are not covered by the include branches.
        baseDN = DN.decode(rsd.getBaseDn().toString() + "," + EXPORT_BASE_DN);
        baseDN = DN.decode(rsd.getBaseDn().toString() + "," + BASE_DN);
            if (searchBaseDN.isDescendantOf(baseDN) ||
                searchBaseDN.isAncestorOf(baseDN))
opends/src/server/org/opends/server/replication/server/ReplicationDB.java
@@ -37,6 +37,7 @@
import org.opends.server.types.DN;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.protocol.UpdateMessage;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.sleepycat.je.Cursor;
import com.sleepycat.je.DatabaseEntry;
@@ -63,6 +64,10 @@
  // The maximum number of retries in case of DatabaseDeadlock Exception.
  private static final int DEADLOCK_RETRIES = 10;
  // The lock used to provide exclusive access to the thread that
  // close the db (shutdown or clear).
  private ReentrantReadWriteLock dbCloseLock;
  /**
   * Creates a new database or open existing database that will be used
   * to store and retrieve changes from an LDAP server.
@@ -86,6 +91,8 @@
    db = dbenv.getOrAddDb(serverId, baseDn,
        replicationServer.getReplicationServerDomain(baseDn,
        true).getGenerationId());
    dbCloseLock = new ReentrantReadWriteLock(true);
  }
  /**
@@ -108,6 +115,7 @@
      // the operation is attempted again up to DEADLOCK_RETRIES times.
      while ((tries++ < DEADLOCK_RETRIES) && (!done))
      {
        dbCloseLock.readLock().lock();
        try
        {
          txn = dbenv.beginTransaction();
@@ -128,6 +136,10 @@
          txn.abort();
          txn = null;
        }
        finally
        {
          dbCloseLock.readLock().unlock();
        }
      }
      if (!done)
      {
@@ -190,8 +202,17 @@
  {
    try
    {
      dbCloseLock.writeLock().lock();
      try
      {
      db.close();
    } catch (DatabaseException e)
      }
      finally
      {
        dbCloseLock.writeLock().unlock();
      }
    }
    catch (DatabaseException e)
    {
      MessageBuilder mb = new MessageBuilder();
      mb.append(NOTE_EXCEPTION_CLOSING_DATABASE.get(this.toString()));
@@ -223,6 +244,7 @@
   * @throws DatabaseException If a database error prevented the cursor
   *                           creation.
   * @throws Exception if the ReplServerDBCursor creation failed.
   *
   * @return The ReplServerDBCursor.
   */
  public ReplServerDBCursor openDeleteCursor()
@@ -242,9 +264,12 @@
    try
    {
      dbCloseLock.readLock().lock();
        cursor = db.openCursor(null, null);
    } catch (DatabaseException e1)
    }
    catch (DatabaseException e1)
    {
      dbCloseLock.readLock().unlock();
        return null;
    }
    try
@@ -253,6 +278,7 @@
      DatabaseEntry data = new DatabaseEntry();
      OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT);
      cursor.close();
      dbCloseLock.readLock().unlock();
      if (status != OperationStatus.SUCCESS)
      {
        /* database is empty */
@@ -268,11 +294,14 @@
      return new ChangeNumber(str);
    } catch (DatabaseException e)
    {
      try {
      try
      {
      cursor.close();
        dbCloseLock.readLock().unlock();
      }
      catch (DatabaseException dbe)
      {
        // The db is dead - let's only log.
      }
      /* database is faulty */
      MessageBuilder mb = new MessageBuilder();
@@ -295,11 +324,13 @@
    try
    {
      dbCloseLock.readLock().lock();
      cursor = db.openCursor(null, null);
      DatabaseEntry key = new DatabaseEntry();
      DatabaseEntry data = new DatabaseEntry();
      OperationStatus status = cursor.getLast(key, data, LockMode.DEFAULT);
      cursor.close();
      dbCloseLock.readLock().unlock();
      if (status != OperationStatus.SUCCESS)
      {
        /* database is empty */
@@ -355,10 +386,14 @@
    private ReplServerDBCursor(ChangeNumber startingChangeNumber)
            throws Exception
    {
      cursor = db.openCursor(txn, null);
      try
      {
        // Take the lock. From now on, whatever error that happen in the life
        // of this cursor should end by unlocking that lock. We must also
        // unlock it when throwing an exception.
        dbCloseLock.readLock().lock();
        cursor = db.openCursor(txn, null);
        if (startingChangeNumber != null)
        {
          key = new ReplicationKey(startingChangeNumber);
@@ -367,27 +402,44 @@
          if (cursor.getSearchKey(key, data, LockMode.DEFAULT) !=
            OperationStatus.SUCCESS)
          {
            // We could not move the cursor to the expected startingChangeNumber
            if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT) !=
              OperationStatus.SUCCESS)
            {
              // We could not even move the cursor closed to it => failure
              // Unlocking is required before throwing any exception
              dbCloseLock.readLock().unlock();
              throw new Exception("ChangeNumber not available");
            }
            else
            {
              // We can move close to the startingChangeNumber.
              // Let's create a cursor from that point.
              DatabaseEntry key = new DatabaseEntry();
              DatabaseEntry data = new DatabaseEntry();
              if (cursor.getPrev(key, data, LockMode.DEFAULT) !=
                OperationStatus.SUCCESS)
              {
                try
                {
                cursor.close();
                cursor = db.openCursor(txn, null);
              }
                catch(Exception e)
                {
                  // Unlocking is required before throwing any exception
                  dbCloseLock.readLock().unlock();
                  throw(e);
                }
              }
            }
          }
        }
      }
      catch (Exception e)
      {
        // Unlocking is required before throwing any exception
        dbCloseLock.readLock().unlock();
        cursor.close();
        throw (e);
      }
@@ -395,9 +447,19 @@
    private ReplServerDBCursor() throws DatabaseException
    {
      try
      {
        // We'll go on only if no close or no clear is running
        dbCloseLock.readLock().lock();
      txn = dbenv.beginTransaction();
      cursor = db.openCursor(txn, null);
    }
      catch(DatabaseException e)
      {
        dbCloseLock.readLock().unlock();
        throw (e);
      }
    }
    /**
     * Close the ReplicationServer Cursor.
@@ -414,12 +476,15 @@
      }
      catch (DatabaseException e)
      {
        dbCloseLock.readLock().unlock();
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
        mb.append(stackTraceToSingleLineString(e));
        logError(mb.toMessage());
        replicationServer.shutdown();
      }
      if (txn != null)
      {
        try
@@ -434,6 +499,7 @@
          replicationServer.shutdown();
        }
      }
      dbCloseLock.readLock().unlock();
    }
    /**
@@ -460,6 +526,8 @@
      }
      catch (DatabaseException e)
      {
        dbCloseLock.readLock().unlock();
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
        mb.append(stackTraceToSingleLineString(e));
@@ -480,6 +548,7 @@
          replicationServer.shutdown();
        }
      }
      dbCloseLock.readLock().unlock();
    }
    /**
@@ -556,7 +625,7 @@
    {
      cursor.delete();
    }
  }
  } // ReplServerDBCursor
  /**
   * Clears this change DB from the changes it contains.
@@ -566,10 +635,38 @@
   */
  public void clear() throws Exception, DatabaseException
  {
    // Clears the changes
    dbenv.clearDb(this.toString());
    // The coming users will be blocked until the clear is done
    dbCloseLock.writeLock().lock();
    try
    {
      String dbName = db.getDatabaseName();
    // Clears the reference to this serverID
    dbenv.clearServerId(baseDn, serverId);
      // Closing is requested by the Berkeley DB before truncate
      db.close();
      // Clears the changes
      dbenv.clearDb(dbName);
      db = null;
      // RE-create the db
      db = dbenv.getOrAddDb(serverId, baseDn, (long)-1);
    }
    catch(Exception e)
    {
      MessageBuilder mb = new MessageBuilder();
      mb.append(ERR_ERROR_CLEARING_DB.get(this.toString(),
          e.getMessage() + " " +
          stackTraceToSingleLineString(e)));
      logError(mb.toMessage());
    }
    finally
    {
      // Relax the waiting users
      dbCloseLock.writeLock().unlock();
    }
  }
}
opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
@@ -526,22 +526,31 @@
     */
    public final void clearDb(String databaseName)
    {
      Transaction txn = null;
      try
      {
        if (debugEnabled())
          TRACER.debugInfo(
              "In " + this.replicationServer.getMonitorInstanceName() +
              "clearDb" + databaseName);
        Transaction txn = dbEnvironment.beginTransaction(null, null);
        txn = dbEnvironment.beginTransaction(null, null);
        dbEnvironment.truncateDatabase(txn, databaseName, false);
        txn.commitWriteNoSync();
        txn = null;
      }
      catch (DatabaseException dbe)
      catch (DatabaseException e)
      {
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_ERROR_CLEARING_DB.get(databaseName,
            dbe.getLocalizedMessage()));
            e.getMessage() + " " +
            stackTraceToSingleLineString(e)));
        logError(mb.toMessage());
      }
      finally
      {
        try
        {
          if (txn != null)
            txn.abort();
        }
        catch(Exception e)
        {}
      }
    }
}
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -1196,9 +1196,11 @@
          catch (Exception e)
          {
            // TODO: i18n
            logError(Message.raw(
                "Exception caught while clearing dbHandler:" +
                e.getLocalizedMessage()));
            MessageBuilder mb = new MessageBuilder();
            mb.append(ERR_ERROR_CLEARING_DB.get(dbHandler.toString(),
                e.getMessage() + " " +
                stackTraceToSingleLineString(e)));
            logError(mb.toMessage());
          }
        }
        sourceDbHandlers.clear();
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java
@@ -54,6 +54,7 @@
import org.opends.server.core.DirectoryServer;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.replication.common.ChangeNumberGenerator;
import org.opends.server.replication.plugin.ReplicationBroker;
import org.opends.server.replication.plugin.ReplicationDomain;
@@ -65,7 +66,9 @@
import org.opends.server.replication.protocol.ReplicationMessage;
import org.opends.server.replication.protocol.SocketSession;
import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.replication.server.ReplicationBackend;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.schema.DirectoryStringSyntax;
import org.opends.server.tasks.LdifFileWriter;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeType;
@@ -73,6 +76,9 @@
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
import org.opends.server.types.ResultCode;
import org.opends.server.types.SearchFilter;
import org.opends.server.types.SearchResultEntry;
import org.opends.server.types.SearchScope;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -688,6 +694,38 @@
    return addMsg;
  }
  /*
   * Check that the expected number of changes are in the replication
   * server database.
   */
  private void checkChangelogSize(int expectedCount)
  {
    try
    {
      SearchFilter filter =
        SearchFilter.createFilterFromString("(objectclass=*)");
      InternalSearchOperation searchOperation =
        connection.processSearch(DN.decode("dc=replicationchanges"),
            SearchScope.SUBORDINATE_SUBTREE,
            filter);
      if (debugEnabled())
      {
        if (searchOperation.getSearchEntries().size() != expectedCount)
        {
          for (SearchResultEntry sre : searchOperation.getSearchEntries())
          {
            debugInfo("Entry found: " + sre.toLDIFString());
          }
        }
      }
      assertEquals(searchOperation.getSearchEntries().size(), expectedCount);
    }
    catch(Exception e)
    {
    }
  }
  /**
   * SingleRS tests basic features of generationID
   * with one single Replication Server.
@@ -712,6 +750,13 @@
      replServer1 = createReplicationServer(changelog1ID, false, testCase);
      // To search the replication server db later in these tests, we need
      // to attach the search backend to the replication server just created.
      Thread.sleep(500);
      ReplicationBackend b =
        (ReplicationBackend)DirectoryServer.getBackend("replicationChanges");
      b.setServer(replServer1);
      /*
       * Test  : empty replicated backend
       * Check : nothing is broken - no generationId generated
@@ -824,6 +869,11 @@
      String ent1[] = { createEntry(UUID.randomUUID()) };
      this.addTestEntriesToDB(ent1);
      // Verify that the replication server does contain the change related
      // to this ADD.
      Thread.sleep(500);
      checkChangelogSize(1);
      try
      {
        ReplicationMessage msg = broker3.receive();
@@ -846,8 +896,14 @@
      debugInfo("Create again replServer1");
      replServer1 = createReplicationServer(changelog1ID, false, testCase);
      // To search the replication server db later in these tests, we need
      // to attach the search backend to the replication server just created.
      Thread.sleep(500);
      b = (ReplicationBackend)DirectoryServer.getBackend("replicationChanges");
      b.setServer(replServer1);
      debugInfo("Delay to allow DS to reconnect to replServer1");
      Thread.sleep(1000);
      long genIdAfterRestart = replServer1.getGenerationId(baseDn);
      debugInfo("Aft restart / replServer.genId=" + genIdAfterRestart);
@@ -874,15 +930,10 @@
        fail("Broker connection is expected to be accepted.");
      }
      /*
       *
       * FIXME Should clearJEBackend() regenerate generationId and do a start
       *       against ReplicationServer ?
       */
      /*
       * Test: Reset the replication server in order to allow new data set.
       */
      // Also verify that no change occured on the replication server db
      // and still contain the ADD submitted initially.
      Thread.sleep(500);
      checkChangelogSize(1);
      debugInfo("Launch an on-line import on DS.");
      genId=-1;
@@ -905,7 +956,9 @@
      waitTaskState(taskReset, TaskState.COMPLETED_SUCCESSFULLY, null);
      Thread.sleep(200);
      // TODO: Test that replication server db has been cleared
      // FIXME: Known bug : the replication server db should not have been
      // cleared in that case since it has the proper generation ID.
      checkChangelogSize(0);
      debugInfo("Expect new genId to be computed on DS and sent to all replServers after on-line import.");
      genId = readGenId();
@@ -926,7 +979,6 @@
          isDegradedDueToGenerationId(server3ID),
      "Expecting that broker3 is degraded since domain genId has been reset");
      // Now create a change that normally would be replicated
      // but will not be replicated here since DS and brokers are degraded
      String[] ent3 = { createEntry(UUID.randomUUID()) };