mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
02.45.2013 a8183183afe0b7df26182390a6fa4c5c9f5604e0
opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -1348,7 +1348,7 @@
   *           if a database problem occurs.
   */
  private boolean assignChangeNumber(final ECLUpdateMsg oldestChange)
      throws DirectoryException, ChangelogException
      throws ChangelogException
  {
    // We also need to check if the draftCNdb is consistent with
    // the changelogdb.
@@ -1451,7 +1451,7 @@
  }
  private void assignNewChangeNumberAndStore(ECLUpdateMsg change)
      throws DirectoryException, ChangelogException
      throws ChangelogException
  {
    ChangeNumberIndexDB cnIndexDB = replicationServer.getChangeNumberIndexDB();
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -2202,7 +2202,17 @@
   */
  public void clearDbs()
  {
    changelogDB.removeDomain(baseDN);
    try
    {
      changelogDB.removeDomain(baseDN);
    }
    catch (ChangelogException e)
    {
      MessageBuilder mb = new MessageBuilder();
      mb.append(ERR_ERROR_CLEARING_DB.get(baseDN.toString(), e.getMessage()
          + " " + stackTraceToSingleLineString(e)));
      logError(mb.toMessage());
    }
  }
  /**
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java
@@ -172,8 +172,10 @@
   *
   * @param baseDN
   *          the replication domain baseDN
   * @throws ChangelogException
   *           If a database problem happened
   */
  void removeDomain(DN baseDN);
  void removeDomain(DN baseDN) throws ChangelogException;
  // serverId methods
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDB.java
@@ -653,14 +653,9 @@
        return;
      }
      String dbName = db.getDatabaseName();
      // Closing is requested by the Berkeley DB before truncate
      db.close();
      final Database oldDb = db;
      db = null; // In case there's a failure between here and recreation.
      // Clears the changes
      dbenv.clearDb(dbName);
      dbenv.clearDb(oldDb);
      // RE-create the db
      db = dbenv.getOrCreateDraftCNDb();
@@ -687,6 +682,6 @@
   */
  private boolean isDBClosed()
  {
    return db == null;
    return db == null || !db.getEnvironment().isValid();
  }
}
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -33,7 +33,6 @@
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.server.config.ConfigException;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ChangelogState;
@@ -43,13 +42,11 @@
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.ReplicaDBCursor;
import org.opends.server.types.DN;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.util.Pair;
import org.opends.server.util.StaticUtils;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.StaticUtils.*;
/**
@@ -58,9 +55,6 @@
public class JEChangelogDB implements ChangelogDB
{
  /** The tracer object for the debug logger. */
  private static final DebugTracer TRACER = getTracer();
  /**
   * This map contains the List of updates received from each LDAP server.
   */
@@ -313,6 +307,11 @@
   */
  public void clearDB() throws ChangelogException
  {
    if (!dbDirectory.exists())
    {
      return;
    }
    // Remember the first exception because :
    // - we want to try to remove everything we want to remove
    // - then throw the first encountered exception
@@ -402,6 +401,7 @@
  public void shutdownDomain(DN baseDN)
  {
    shutdownDbHandlers(getDomainMap(baseDN));
    sourceDbHandlers.remove(baseDN);
  }
  private void shutdownDbHandlers(Map<Integer, DbHandler> domainMap)
@@ -446,8 +446,13 @@
  /** {@inheritDoc} */
  @Override
  public void removeDomain(DN baseDN)
  public void removeDomain(DN baseDN) throws ChangelogException
  {
    // Remember the first exception because :
    // - we want to try to remove everything we want to remove
    // - then throw the first encountered exception
    ChangelogException firstException = null;
    // 1- clear the replica DBs
    final Map<Integer, DbHandler> domainMap = getDomainMap(baseDN);
    synchronized (domainMap)
@@ -458,17 +463,13 @@
        {
          dbHandler.clear();
        }
        catch (Exception e)
        catch (ChangelogException e)
        {
          // TODO: i18n
          MessageBuilder mb = new MessageBuilder();
          mb.append(ERR_ERROR_CLEARING_DB.get(dbHandler.toString(), e
              .getMessage()
              + " " + stackTraceToSingleLineString(e)));
          logError(mb.toMessage());
          firstException = e;
        }
      }
      shutdownDbHandlers(domainMap);
      sourceDbHandlers.remove(baseDN);
    }
    // 2- clear the ChangeNumber index DB
@@ -480,11 +481,11 @@
        {
          cnIndexDB.clear(baseDN);
        }
        catch (Exception ignored)
        catch (ChangelogException e)
        {
          if (debugEnabled())
          if (firstException == null)
          {
            TRACER.debugCaught(DebugLogLevel.WARNING, ignored);
            firstException = e;
          }
        }
      }
@@ -495,13 +496,18 @@
    {
      dbEnv.clearGenerationId(baseDN);
    }
    catch (Exception ignored)
    catch (ChangelogException e)
    {
      if (debugEnabled())
      if (firstException == null)
      {
        TRACER.debugCaught(DebugLogLevel.WARNING, ignored);
        firstException = e;
      }
    }
    if (firstException != null)
    {
      throw firstException;
    }
  }
  /** {@inheritDoc} */
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
@@ -874,17 +874,12 @@
        return;
      }
      String dbName = db.getDatabaseName();
      // Clears the reference to this serverID
      dbenv.clearServerId(baseDN, serverId);
      // Closing is requested by the Berkeley DB before truncate
      db.close();
      final Database oldDb = db;
      db = null; // In case there's a failure between here and recreation.
      // Clears the changes
      dbenv.clearDb(dbName);
      dbenv.clearDb(oldDb);
      // RE-create the db
      db = dbenv.getOrAddDb(serverId, baseDN, -1);
@@ -1190,7 +1185,7 @@
   */
  private boolean isDBClosed()
  {
    return db == null;
    return db == null || !db.getEnvironment().isValid();
  }
}
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
@@ -29,7 +29,10 @@
import java.io.File;
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
@@ -41,7 +44,9 @@
import org.opends.server.types.DirectoryException;
import com.sleepycat.je.*;
import com.sleepycat.je.config.EnvironmentParams;
import static com.sleepycat.je.EnvironmentConfig.*;
import static com.sleepycat.je.LockMode.*;
import static com.sleepycat.je.OperationStatus.*;
@@ -51,19 +56,19 @@
import static org.opends.server.util.StaticUtils.*;
/**
 * This class is used to represent a Db environment that can be used
 * to create ReplicationDB.
 * This class represents a DB environment that acts as a factory for
 * ReplicationDBs.
 */
public class ReplicationDbEnv
{
  private Environment dbEnvironment;
  private Database changelogStateDb;
  private final List<Database> allDbs = new CopyOnWriteArrayList<Database>();
  private ReplicationServer replicationServer;
  private final AtomicBoolean isShuttingDown = new AtomicBoolean(false);
  private static final String GENERATION_ID_TAG = "GENID";
  private static final String FIELD_SEPARATOR = " ";
  /**
   * The tracer object for the debug logger.
   */
  /** The tracer object for the debug logger. */
  private static final DebugTracer TRACER = getTracer();
  /**
@@ -92,9 +97,9 @@
       */
      envConfig.setAllowCreate(true);
      envConfig.setTransactional(true);
      envConfig.setConfigParam("je.cleaner.threads", "2");
      envConfig.setConfigParam("je.checkpointer.highPriority", "true");
      envConfig.setConfigParam(STATS_COLLECT, "false");
      envConfig.setConfigParam(CLEANER_THREADS, "2");
      envConfig.setConfigParam(CHECKPOINTER_HIGH_PRIORITY, "true");
      /*
       * Tests have shown that since the parsing of the Replication log is
       * always done sequentially, it is not necessary to use a large DB cache.
@@ -106,15 +111,15 @@
         * read buffers. This will result in more scalable checkpointer and
         * cleaner performance.
         */
        envConfig.setConfigParam("je.cleaner.lookAheadCacheSize", mb(2));
        envConfig.setConfigParam("je.log.iteratorReadSize", mb(2));
        envConfig.setConfigParam("je.log.faultReadSize", kb(4));
        envConfig.setConfigParam(CLEANER_LOOK_AHEAD_CACHE_SIZE, mb(2));
        envConfig.setConfigParam(LOG_ITERATOR_READ_SIZE, mb(2));
        envConfig.setConfigParam(LOG_FAULT_READ_SIZE, kb(4));
        /*
         * The cache size must be bigger in order to accommodate the larger
         * buffers - see OPENDJ-943.
         */
        envConfig.setConfigParam("je.maxMemory", mb(16));
        envConfig.setConfigParam(MAX_MEMORY, mb(16));
      }
      else
      {
@@ -122,7 +127,7 @@
         * Use 5M so that the replication can be used with 64M total for the
         * JVM.
         */
        envConfig.setConfigParam("je.maxMemory", mb(5));
        envConfig.setConfigParam(MAX_MEMORY, mb(5));
      }
      // Since records are always added at the end of the Replication log and
@@ -160,12 +165,27 @@
    return String.valueOf(sizeInMb * 1024 * 1024);
  }
  private Database openDatabase(String databaseName) throws RuntimeException
  private Database openDatabase(String databaseName) throws ChangelogException,
      RuntimeException
  {
    if (isShuttingDown.get())
    {
      // TODO JNR i18n
      throw new ChangelogException(Message.raw("DB is closing"));
    }
    final DatabaseConfig dbConfig = new DatabaseConfig();
    dbConfig.setAllowCreate(true);
    dbConfig.setTransactional(true);
    return dbEnvironment.openDatabase(null, databaseName, dbConfig);
    final Database db =
        dbEnvironment.openDatabase(null, databaseName, dbConfig);
    if (isShuttingDown.get())
    {
      closeDB(db);
      // TODO JNR i18n
      throw new ChangelogException(Message.raw("DB is closing"));
    }
    allDbs.add(db);
    return db;
  }
  /**
@@ -395,44 +415,69 @@
      }
    }
    /**
     * Shutdown the Db environment.
     */
    public void shutdown()
  /**
   * Shutdown the Db environment.
   */
  public void shutdown()
  {
    isShuttingDown.set(true);
    // CopyOnWriteArrayList iterator never throw ConcurrentModificationException
    // This code rely on openDatabase() to close databases opened concurrently
    // with this code
    final Database[] allDbsCopy = allDbs.toArray(new Database[0]);
    allDbs.clear();
    for (Database db : allDbsCopy)
    {
      try
      {
        changelogStateDb.close();
      } catch (DatabaseException e)
      {
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_ERROR_CLOSING_CHANGELOG_ENV.get());
        mb.append(" ");
        mb.append(stackTraceToSingleLineString(e));
        logError(mb.toMessage());
      }
      try
      {
        dbEnvironment.close();
      } catch (DatabaseException e)
      {
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_ERROR_CLOSING_CHANGELOG_ENV.get());
        mb.append(" ");
        mb.append(stackTraceToSingleLineString(e));
        logError(mb.toMessage());
      }
      closeDB(db);
    }
    try
    {
      dbEnvironment.close();
    }
    catch (DatabaseException e)
    {
      logError(newErrorMessage(e));
    }
  }
  private void closeDB(Database db)
  {
    allDbs.remove(db);
    try
    {
      db.close();
    }
    catch (DatabaseException e)
    {
      logError(newErrorMessage(e));
    }
  }
  private Message newErrorMessage(DatabaseException e)
  {
    if (!isShuttingDown.get())
    {
      return NOTE_EXCEPTION_CLOSING_DATABASE
          .get(stackTraceToSingleLineString(e));
    }
    MessageBuilder mb = new MessageBuilder();
    mb.append(ERR_ERROR_CLOSING_CHANGELOG_ENV.get());
    mb.append(" ");
    mb.append(stackTraceToSingleLineString(e));
    return mb.toMessage();
  }
  /**
   * Clears the provided generationId associated to the provided baseDN from the
   * state Db.
   *
   * @param baseDN
   *          The baseDN for which the generationID must be cleared.
   * @throws ChangelogException
   *           If a database problem happened
   */
  public void clearGenerationId(DN baseDN)
  public void clearGenerationId(DN baseDN) throws ChangelogException
  {
    deleteFromChangelogStateDB(buildGenIdKey(baseDN),
        "clearGenerationId(baseDN=" + baseDN + ")");
@@ -446,15 +491,17 @@
   *          The baseDN for which the serverId must be cleared.
   * @param serverId
   *          The serverId to remove from the Db.
   * @throws ChangelogException
   *           If a database problem happened
   */
  public void clearServerId(DN baseDN, int serverId)
  public void clearServerId(DN baseDN, int serverId) throws ChangelogException
  {
    deleteFromChangelogStateDB(buildServerIdKey(baseDN, serverId),
        "clearServerId(baseDN=" + baseDN + " , serverId=" + serverId + ")");
  }
  private void deleteFromChangelogStateDB(String keyString,
      String methodInvocation)
      String methodInvocation) throws ChangelogException
  {
    if (debugEnabled())
      debug(methodInvocation + " starting");
@@ -490,18 +537,23 @@
    }
    catch (RuntimeException dbe)
    {
      // FIXME can actually happen (see catch above)
      // what should we do about it?
      throw new ChangelogException(dbe);
    }
  }
    /**
     * Clears the database.
     *
     * @param databaseName The name of the database to clear.
     * @param db
     *          The database to clear.
     */
    public final void clearDb(String databaseName)
    public final void clearDb(Database db)
    {
      String databaseName = db.getDatabaseName();
      // Closing is requested by Berkeley JE before truncate
      db.close();
      Transaction txn = null;
      try
      {
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -28,10 +28,7 @@
package org.opends.server.replication;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.*;
import java.util.concurrent.locks.Lock;
import org.opends.messages.Category;
@@ -406,9 +403,6 @@
    logError(Message.raw(Category.SYNC, Severity.NOTICE,
      " ##### Calling ReplicationTestCase.classCleanUp ##### "));
    // Clean RS databases
    cleanUpReplicationServersDB();
    removeReplicationServerDB();
    cleanConfigEntries();
@@ -487,15 +481,18 @@
   */
  protected void removeReplicationServerDB() throws Exception
  {
    for (ReplicationServer rs : ReplicationServer.getAllInstances())
    {
      clearChangelogDB(rs);
      rs.getChangelogDB().removeDB();
    }
    // avoid ConcurrentModificationException
    remove(new ArrayList<ReplicationServer>(ReplicationServer.getAllInstances()));
  }
  protected void remove(ReplicationServer... replicationServers) throws Exception
  {
    remove(Arrays.asList(replicationServers));
  }
  protected void remove(Collection<ReplicationServer> replicationServers)
      throws Exception
  {
    for (ReplicationServer rs : replicationServers)
    {
      if (rs != null)
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -29,7 +29,6 @@
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
@@ -168,7 +167,6 @@
  @Test(enabled=true, dependsOnMethods = { "searchBackend"})
  public void replicationServerTest() throws Exception
  {
    clearChangelogDB(replicationServer);
    changelogBasic();
    newClientLateServer1();
    newClient();
@@ -192,7 +190,6 @@
  @Test(enabled=false, dependsOnMethods = { "searchBackend"})
  public void replicationServerTestLoop() throws Exception
  {
    clearChangelogDB(replicationServer);
    changelogBasic();
    while (true)
    {
@@ -286,7 +283,36 @@
        "ReplicationServer basic : incorrect message type received: "
            + receivedMsg.getClass() + ": content: " + receivedMsg);
    assertEquals(receivedMsg.toString(), sentMsg.toString(),
        "ReplicationServer basic : incorrect message body received.");
        "ReplicationServer basic : incorrect message body received. CSN is same as \""
            + getCSNFieldName(((DeleteMsg) receivedMsg).getCSN()) + "\" field.");
  }
  private String getCSNFieldName(CSN csn)
  {
    if (csn == null) {
      return "";
    }
    if (csn.equals(firstCSNServer1))
    {
      return "firstCSNServer1";
    }
    else if (csn.equals(secondCSNServer1))
    {
      return "secondCSNServer1";
    }
    else if (csn.equals(firstCSNServer2))
    {
      return "firstCSNServer2";
    }
    else if (csn.equals(secondCSNServer2))
    {
      return "secondCSNServer2";
    }
    else if (csn.equals(unknownCSNServer1))
    {
      return "unknownCSNServer1";
    }
    return null;
  }
  private ServerState newServerState(CSN... csns)
@@ -855,7 +881,6 @@
  @Test(enabled=true, dependsOnMethods = { "searchBackend"})
  public void windowProbeTest() throws Exception
  {
    debugInfo("Starting windowProbeTest");
    final int WINDOW = 10;
@@ -877,13 +902,13 @@
     */
    // open the first session to the replication server
    InetSocketAddress ServerAddr = new InetSocketAddress(
        InetAddress.getByName("localhost"), replicationServerPort);
    InetSocketAddress serverAddr =
        new HostPort("localhost", replicationServerPort).toInetSocketAddress();
    Socket socket = new Socket();
    socket.setReceiveBufferSize(1000000);
    socket.setTcpNoDelay(true);
    int timeoutMS = MultimasterReplication.getConnectionTimeoutMS();
    socket.connect(ServerAddr, timeoutMS);
    socket.connect(serverAddr, timeoutMS);
    ReplSessionSecurity replSessionSecurity = getReplSessionSecurity();
    Session session = replSessionSecurity.createClientSession(socket, timeoutMS);