mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
02.45.2013 9d5b1c7a628471604be4768f97fcdaf13cf0639f
OPENDJ-1116 Introduce abstraction for the changelog DB


ChangelogDB.java:
In removeDomain(), now throws ChangelogException.

JEChangelogDB.java:
In clearDB(), only clear the DB if the directory exists on disk.
In removeDomain() and shutdownDomain(), removed the entry associated with the baseDN key.
In removeDomain(), now throws ChangelogException, and moved the exception handling to ReplicationServerDomain.clearDbs().


ReplicationServerDomain.java:
In clearDbs(), moved the exception handling here from lower level JEChangelogDB.removeDomain().

ReplicationDbEnv.java:
Added allDbs and isShuttingDown fields to record all created Databases and to mark when a shutdown is in progress.
In openDatabase(), throw a ChangelogException when the DB is shutting down.
In shutdown(), close all the opened DBs.
Added closeDB() and newErrorMessage().
In clearGenerationId(), clearServerId() and deleteFromChangelogStateDB(), now throw ChangelogException.
In clearDb(), changed parameter from String to Database.

DraftCNDB.java, ReplicationDB.java:
In clear(), rely more on ReplicationDbEnv.clearDb(Database).
In isDBClosed(), added more conditions.


ReplicationTestCase.java:
Removed useless call to cleanUpReplicationServersDB() before removeReplicationServerDB().
Added remove(Collection<ReplicationServer>) to factor out the code shared between removeReplicationServerDB() and remove(ReplicationServer...).

ReplicationServerTest.java:
Removed all calls to clearChangelogDB() before calling changelogBasic(), because this is the first thing the latter method does.
Added getCSNFieldName() to produce more explicit failure messages.
In windowProbeTest(), used HostPort.


ECLServerHandler.java:
Removed never thrown DirectoryException.
9 files modified
300 ■■■■■ changed files
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java 4 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 12 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java 4 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDB.java 11 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java 46 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java 11 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java 150 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java 21 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java 41 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -1348,7 +1348,7 @@
   *           if a database problem occurs.
   */
  private boolean assignChangeNumber(final ECLUpdateMsg oldestChange)
      throws DirectoryException, ChangelogException
      throws ChangelogException
  {
    // We also need to check if the draftCNdb is consistent with
    // the changelogdb.
@@ -1451,7 +1451,7 @@
  }
  private void assignNewChangeNumberAndStore(ECLUpdateMsg change)
      throws DirectoryException, ChangelogException
      throws ChangelogException
  {
    ChangeNumberIndexDB cnIndexDB = replicationServer.getChangeNumberIndexDB();
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -2202,7 +2202,17 @@
   */
  public void clearDbs()
  {
    changelogDB.removeDomain(baseDN);
    try
    {
      changelogDB.removeDomain(baseDN);
    }
    catch (ChangelogException e)
    {
      MessageBuilder mb = new MessageBuilder();
      mb.append(ERR_ERROR_CLEARING_DB.get(baseDN.toString(), e.getMessage()
          + " " + stackTraceToSingleLineString(e)));
      logError(mb.toMessage());
    }
  }
  /**
opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java
@@ -172,8 +172,10 @@
   *
   * @param baseDN
   *          the replication domain baseDN
   * @throws ChangelogException
   *           If a database problem happened
   */
  void removeDomain(DN baseDN);
  void removeDomain(DN baseDN) throws ChangelogException;
  // serverId methods
opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDB.java
@@ -653,14 +653,9 @@
        return;
      }
      String dbName = db.getDatabaseName();
      // Closing is requested by the Berkeley DB before truncate
      db.close();
      final Database oldDb = db;
      db = null; // In case there's a failure between here and recreation.
      // Clears the changes
      dbenv.clearDb(dbName);
      dbenv.clearDb(oldDb);
      // RE-create the db
      db = dbenv.getOrCreateDraftCNDb();
@@ -687,6 +682,6 @@
   */
  private boolean isDBClosed()
  {
    return db == null;
    return db == null || !db.getEnvironment().isValid();
  }
}
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -33,7 +33,6 @@
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.server.config.ConfigException;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ChangelogState;
@@ -43,13 +42,11 @@
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.ReplicaDBCursor;
import org.opends.server.types.DN;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.util.Pair;
import org.opends.server.util.StaticUtils;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.StaticUtils.*;
/**
@@ -58,9 +55,6 @@
public class JEChangelogDB implements ChangelogDB
{
  /** The tracer object for the debug logger. */
  private static final DebugTracer TRACER = getTracer();
  /**
   * This map contains the List of updates received from each LDAP server.
   */
@@ -313,6 +307,11 @@
   */
  public void clearDB() throws ChangelogException
  {
    if (!dbDirectory.exists())
    {
      return;
    }
    // Remember the first exception because :
    // - we want to try to remove everything we want to remove
    // - then throw the first encountered exception
@@ -402,6 +401,7 @@
  public void shutdownDomain(DN baseDN)
  {
    shutdownDbHandlers(getDomainMap(baseDN));
    sourceDbHandlers.remove(baseDN);
  }
  private void shutdownDbHandlers(Map<Integer, DbHandler> domainMap)
@@ -446,8 +446,13 @@
  /** {@inheritDoc} */
  @Override
  public void removeDomain(DN baseDN)
  public void removeDomain(DN baseDN) throws ChangelogException
  {
    // Remember the first exception because :
    // - we want to try to remove everything we want to remove
    // - then throw the first encountered exception
    ChangelogException firstException = null;
    // 1- clear the replica DBs
    final Map<Integer, DbHandler> domainMap = getDomainMap(baseDN);
    synchronized (domainMap)
@@ -458,17 +463,13 @@
        {
          dbHandler.clear();
        }
        catch (Exception e)
        catch (ChangelogException e)
        {
          // TODO: i18n
          MessageBuilder mb = new MessageBuilder();
          mb.append(ERR_ERROR_CLEARING_DB.get(dbHandler.toString(), e
              .getMessage()
              + " " + stackTraceToSingleLineString(e)));
          logError(mb.toMessage());
          firstException = e;
        }
      }
      shutdownDbHandlers(domainMap);
      sourceDbHandlers.remove(baseDN);
    }
    // 2- clear the ChangeNumber index DB
@@ -480,11 +481,11 @@
        {
          cnIndexDB.clear(baseDN);
        }
        catch (Exception ignored)
        catch (ChangelogException e)
        {
          if (debugEnabled())
          if (firstException == null)
          {
            TRACER.debugCaught(DebugLogLevel.WARNING, ignored);
            firstException = e;
          }
        }
      }
@@ -495,13 +496,18 @@
    {
      dbEnv.clearGenerationId(baseDN);
    }
    catch (Exception ignored)
    catch (ChangelogException e)
    {
      if (debugEnabled())
      if (firstException == null)
      {
        TRACER.debugCaught(DebugLogLevel.WARNING, ignored);
        firstException = e;
      }
    }
    if (firstException != null)
    {
      throw firstException;
    }
  }
  /** {@inheritDoc} */
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
@@ -874,17 +874,12 @@
        return;
      }
      String dbName = db.getDatabaseName();
      // Clears the reference to this serverID
      dbenv.clearServerId(baseDN, serverId);
      // Closing is requested by the Berkeley DB before truncate
      db.close();
      final Database oldDb = db;
      db = null; // In case there's a failure between here and recreation.
      // Clears the changes
      dbenv.clearDb(dbName);
      dbenv.clearDb(oldDb);
      // RE-create the db
      db = dbenv.getOrAddDb(serverId, baseDN, -1);
@@ -1190,7 +1185,7 @@
   */
  private boolean isDBClosed()
  {
    return db == null;
    return db == null || !db.getEnvironment().isValid();
  }
}
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
@@ -29,7 +29,10 @@
import java.io.File;
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
@@ -41,7 +44,9 @@
import org.opends.server.types.DirectoryException;
import com.sleepycat.je.*;
import com.sleepycat.je.config.EnvironmentParams;
import static com.sleepycat.je.EnvironmentConfig.*;
import static com.sleepycat.je.LockMode.*;
import static com.sleepycat.je.OperationStatus.*;
@@ -51,19 +56,19 @@
import static org.opends.server.util.StaticUtils.*;
/**
 * This class is used to represent a Db environment that can be used
 * to create ReplicationDB.
 * This class represents a DB environment that acts as a factory for
 * ReplicationDBs.
 */
public class ReplicationDbEnv
{
  private Environment dbEnvironment;
  private Database changelogStateDb;
  private final List<Database> allDbs = new CopyOnWriteArrayList<Database>();
  private ReplicationServer replicationServer;
  private final AtomicBoolean isShuttingDown = new AtomicBoolean(false);
  private static final String GENERATION_ID_TAG = "GENID";
  private static final String FIELD_SEPARATOR = " ";
  /**
   * The tracer object for the debug logger.
   */
  /** The tracer object for the debug logger. */
  private static final DebugTracer TRACER = getTracer();
  /**
@@ -92,9 +97,9 @@
       */
      envConfig.setAllowCreate(true);
      envConfig.setTransactional(true);
      envConfig.setConfigParam("je.cleaner.threads", "2");
      envConfig.setConfigParam("je.checkpointer.highPriority", "true");
      envConfig.setConfigParam(STATS_COLLECT, "false");
      envConfig.setConfigParam(CLEANER_THREADS, "2");
      envConfig.setConfigParam(CHECKPOINTER_HIGH_PRIORITY, "true");
      /*
       * Tests have shown that since the parsing of the Replication log is
       * always done sequentially, it is not necessary to use a large DB cache.
@@ -106,15 +111,15 @@
         * read buffers. This will result in more scalable checkpointer and
         * cleaner performance.
         */
        envConfig.setConfigParam("je.cleaner.lookAheadCacheSize", mb(2));
        envConfig.setConfigParam("je.log.iteratorReadSize", mb(2));
        envConfig.setConfigParam("je.log.faultReadSize", kb(4));
        envConfig.setConfigParam(CLEANER_LOOK_AHEAD_CACHE_SIZE, mb(2));
        envConfig.setConfigParam(LOG_ITERATOR_READ_SIZE, mb(2));
        envConfig.setConfigParam(LOG_FAULT_READ_SIZE, kb(4));
        /*
         * The cache size must be bigger in order to accommodate the larger
         * buffers - see OPENDJ-943.
         */
        envConfig.setConfigParam("je.maxMemory", mb(16));
        envConfig.setConfigParam(MAX_MEMORY, mb(16));
      }
      else
      {
@@ -122,7 +127,7 @@
         * Use 5M so that the replication can be used with 64M total for the
         * JVM.
         */
        envConfig.setConfigParam("je.maxMemory", mb(5));
        envConfig.setConfigParam(MAX_MEMORY, mb(5));
      }
      // Since records are always added at the end of the Replication log and
@@ -160,12 +165,27 @@
    return String.valueOf(sizeInMb * 1024 * 1024);
  }
  private Database openDatabase(String databaseName) throws RuntimeException
  private Database openDatabase(String databaseName) throws ChangelogException,
      RuntimeException
  {
    if (isShuttingDown.get())
    {
      // TODO JNR i18n
      throw new ChangelogException(Message.raw("DB is closing"));
    }
    final DatabaseConfig dbConfig = new DatabaseConfig();
    dbConfig.setAllowCreate(true);
    dbConfig.setTransactional(true);
    return dbEnvironment.openDatabase(null, databaseName, dbConfig);
    final Database db =
        dbEnvironment.openDatabase(null, databaseName, dbConfig);
    if (isShuttingDown.get())
    {
      closeDB(db);
      // TODO JNR i18n
      throw new ChangelogException(Message.raw("DB is closing"));
    }
    allDbs.add(db);
    return db;
  }
  /**
@@ -395,44 +415,69 @@
      }
    }
    /**
     * Shutdown the Db environment.
     */
    public void shutdown()
  /**
   * Shutdown the Db environment.
   */
  public void shutdown()
  {
    isShuttingDown.set(true);
    // CopyOnWriteArrayList iterators never throw
    // ConcurrentModificationException. This code relies on openDatabase() to
    // close any database opened concurrently with this shutdown.
    final Database[] allDbsCopy = allDbs.toArray(new Database[0]);
    allDbs.clear();
    for (Database db : allDbsCopy)
    {
      try
      {
        changelogStateDb.close();
      } catch (DatabaseException e)
      {
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_ERROR_CLOSING_CHANGELOG_ENV.get());
        mb.append(" ");
        mb.append(stackTraceToSingleLineString(e));
        logError(mb.toMessage());
      }
      try
      {
        dbEnvironment.close();
      } catch (DatabaseException e)
      {
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_ERROR_CLOSING_CHANGELOG_ENV.get());
        mb.append(" ");
        mb.append(stackTraceToSingleLineString(e));
        logError(mb.toMessage());
      }
      closeDB(db);
    }
    try
    {
      dbEnvironment.close();
    }
    catch (DatabaseException e)
    {
      logError(newErrorMessage(e));
    }
  }
  private void closeDB(Database db)
  {
    allDbs.remove(db);
    try
    {
      db.close();
    }
    catch (DatabaseException e)
    {
      logError(newErrorMessage(e));
    }
  }
  private Message newErrorMessage(DatabaseException e)
  {
    if (!isShuttingDown.get())
    {
      return NOTE_EXCEPTION_CLOSING_DATABASE
          .get(stackTraceToSingleLineString(e));
    }
    MessageBuilder mb = new MessageBuilder();
    mb.append(ERR_ERROR_CLOSING_CHANGELOG_ENV.get());
    mb.append(" ");
    mb.append(stackTraceToSingleLineString(e));
    return mb.toMessage();
  }
  /**
   * Clears the provided generationId associated to the provided baseDN from the
   * state Db.
   *
   * @param baseDN
   *          The baseDN for which the generationID must be cleared.
   * @throws ChangelogException
   *           If a database problem happened
   */
  public void clearGenerationId(DN baseDN)
  public void clearGenerationId(DN baseDN) throws ChangelogException
  {
    deleteFromChangelogStateDB(buildGenIdKey(baseDN),
        "clearGenerationId(baseDN=" + baseDN + ")");
@@ -446,15 +491,17 @@
   *          The baseDN for which the serverId must be cleared.
   * @param serverId
   *          The serverId to remove from the Db.
   * @throws ChangelogException
   *           If a database problem happened
   */
  public void clearServerId(DN baseDN, int serverId)
  public void clearServerId(DN baseDN, int serverId) throws ChangelogException
  {
    deleteFromChangelogStateDB(buildServerIdKey(baseDN, serverId),
        "clearServerId(baseDN=" + baseDN + " , serverId=" + serverId + ")");
  }
  private void deleteFromChangelogStateDB(String keyString,
      String methodInvocation)
      String methodInvocation) throws ChangelogException
  {
    if (debugEnabled())
      debug(methodInvocation + " starting");
@@ -490,18 +537,23 @@
    }
    catch (RuntimeException dbe)
    {
      // FIXME can actually happen (see catch above)
      // what should we do about it?
      throw new ChangelogException(dbe);
    }
  }
    /**
     * Clears the database.
     *
     * @param databaseName The name of the database to clear.
     * @param db
     *          The database to clear.
     */
    public final void clearDb(String databaseName)
    public final void clearDb(Database db)
    {
      String databaseName = db.getDatabaseName();
      // Closing is requested by Berkeley JE before truncate
      db.close();
      Transaction txn = null;
      try
      {
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -28,10 +28,7 @@
package org.opends.server.replication;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.*;
import java.util.concurrent.locks.Lock;
import org.opends.messages.Category;
@@ -406,9 +403,6 @@
    logError(Message.raw(Category.SYNC, Severity.NOTICE,
      " ##### Calling ReplicationTestCase.classCleanUp ##### "));
    // Clean RS databases
    cleanUpReplicationServersDB();
    removeReplicationServerDB();
    cleanConfigEntries();
@@ -487,15 +481,18 @@
   */
  protected void removeReplicationServerDB() throws Exception
  {
    for (ReplicationServer rs : ReplicationServer.getAllInstances())
    {
      clearChangelogDB(rs);
      rs.getChangelogDB().removeDB();
    }
    // avoid ConcurrentModificationException
    remove(new ArrayList<ReplicationServer>(ReplicationServer.getAllInstances()));
  }
  protected void remove(ReplicationServer... replicationServers) throws Exception
  {
    remove(Arrays.asList(replicationServers));
  }
  protected void remove(Collection<ReplicationServer> replicationServers)
      throws Exception
  {
    for (ReplicationServer rs : replicationServers)
    {
      if (rs != null)
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -29,7 +29,6 @@
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
@@ -168,7 +167,6 @@
  @Test(enabled=true, dependsOnMethods = { "searchBackend"})
  public void replicationServerTest() throws Exception
  {
    clearChangelogDB(replicationServer);
    changelogBasic();
    newClientLateServer1();
    newClient();
@@ -192,7 +190,6 @@
  @Test(enabled=false, dependsOnMethods = { "searchBackend"})
  public void replicationServerTestLoop() throws Exception
  {
    clearChangelogDB(replicationServer);
    changelogBasic();
    while (true)
    {
@@ -286,7 +283,36 @@
        "ReplicationServer basic : incorrect message type received: "
            + receivedMsg.getClass() + ": content: " + receivedMsg);
    assertEquals(receivedMsg.toString(), sentMsg.toString(),
        "ReplicationServer basic : incorrect message body received.");
        "ReplicationServer basic : incorrect message body received. CSN is same as \""
            + getCSNFieldName(((DeleteMsg) receivedMsg).getCSN()) + "\" field.");
  }
  private String getCSNFieldName(CSN csn)
  {
    if (csn == null) {
      return "";
    }
    if (csn.equals(firstCSNServer1))
    {
      return "firstCSNServer1";
    }
    else if (csn.equals(secondCSNServer1))
    {
      return "secondCSNServer1";
    }
    else if (csn.equals(firstCSNServer2))
    {
      return "firstCSNServer2";
    }
    else if (csn.equals(secondCSNServer2))
    {
      return "secondCSNServer2";
    }
    else if (csn.equals(unknownCSNServer1))
    {
      return "unknownCSNServer1";
    }
    return null;
  }
  private ServerState newServerState(CSN... csns)
@@ -855,7 +881,6 @@
  @Test(enabled=true, dependsOnMethods = { "searchBackend"})
  public void windowProbeTest() throws Exception
  {
    debugInfo("Starting windowProbeTest");
    final int WINDOW = 10;
@@ -877,13 +902,13 @@
     */
    // open the first session to the replication server
    InetSocketAddress ServerAddr = new InetSocketAddress(
        InetAddress.getByName("localhost"), replicationServerPort);
    InetSocketAddress serverAddr =
        new HostPort("localhost", replicationServerPort).toInetSocketAddress();
    Socket socket = new Socket();
    socket.setReceiveBufferSize(1000000);
    socket.setTcpNoDelay(true);
    int timeoutMS = MultimasterReplication.getConnectionTimeoutMS();
    socket.connect(ServerAddr, timeoutMS);
    socket.connect(serverAddr, timeoutMS);
    ReplSessionSecurity replSessionSecurity = getReplSessionSecurity();
    Session session = replSessionSecurity.createClientSession(socket, timeoutMS);