mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
30.20.2014 68aa678de0eb1feffaec78032cbf0fc2431a52ae
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
@@ -61,7 +61,7 @@
  private Database db;
  private ReplicationDbEnv dbenv;
  private ReplicationDbEnv dbEnv;
  private ReplicationServer replicationServer;
  private int serverId;
  private DN baseDN;
@@ -118,22 +118,22 @@
   * @param serverId The identifier of the LDAP server.
   * @param baseDN The baseDN of the replication domain.
   * @param replicationServer The ReplicationServer that needs to be shutdown.
   * @param dbenv The Db environment to use to create the db.
   * @param dbEnv The Db environment to use to create the db.
   * @throws ChangelogException If a database problem happened
   */
  public ReplicationDB(int serverId, DN baseDN,
      ReplicationServer replicationServer, ReplicationDbEnv dbenv)
      ReplicationServer replicationServer, ReplicationDbEnv dbEnv)
      throws ChangelogException
  {
    this.serverId = serverId;
    this.baseDN = baseDN;
    this.dbenv = dbenv;
    this.dbEnv = dbEnv;
    this.replicationServer = replicationServer;
    // Get or create the associated ReplicationServerDomain and Db.
    final ReplicationServerDomain domain =
        replicationServer.getReplicationServerDomain(baseDN, true);
    db = dbenv.getOrAddDb(serverId, baseDN, domain.getGenerationId());
    db = dbEnv.getOrAddReplicationDB(serverId, baseDN, domain.getGenerationId());
    intializeCounters();
  }
@@ -649,7 +649,7 @@
        // Create the transaction that will protect whatever done with this
        // write cursor.
        localTxn = dbenv.beginTransaction();
        localTxn = dbEnv.beginTransaction();
        localCursor = db.openCursor(localTxn, null);
        txn = localTxn;
@@ -701,7 +701,7 @@
        }
        catch (DatabaseException e)
        {
          dbenv.shutdownOnException(e);
          dbEnv.shutdownOnException(e);
        }
      }
    }
@@ -852,14 +852,14 @@
      }
      // Clears the reference to this serverID
      dbenv.clearServerId(baseDN, serverId);
      dbEnv.clearServerId(baseDN, serverId);
      final Database oldDb = db;
      db = null; // In case there's a failure between here and recreation.
      dbenv.clearDb(oldDb);
      dbEnv.clearDb(oldDb);
      // RE-create the db
      db = dbenv.getOrAddDb(serverId, baseDN, -1);
      db = dbEnv.getOrAddReplicationDB(serverId, baseDN, -1);
    }
    catch (Exception e)
    {
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
@@ -28,7 +28,9 @@
import java.io.File;
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -84,58 +86,7 @@
    try
    {
      EnvironmentConfig envConfig = new EnvironmentConfig();
      /*
       * Create the DB Environment that will be used for all the
       * ReplicationServer activities related to the db
       */
      envConfig.setAllowCreate(true);
      envConfig.setTransactional(true);
      envConfig.setConfigParam(STATS_COLLECT, "false");
      envConfig.setConfigParam(CLEANER_THREADS, "2");
      envConfig.setConfigParam(CHECKPOINTER_HIGH_PRIORITY, "true");
      /*
       * Tests have shown that since the parsing of the Replication log is
       * always done sequentially, it is not necessary to use a large DB cache.
       */
      if (Runtime.getRuntime().maxMemory() > 256 * 1024 * 1024)
      {
        /*
         * If the JVM is reasonably large then we can safely default to bigger
         * read buffers. This will result in more scalable checkpointer and
         * cleaner performance.
         */
        envConfig.setConfigParam(CLEANER_LOOK_AHEAD_CACHE_SIZE, mb(2));
        envConfig.setConfigParam(LOG_ITERATOR_READ_SIZE, mb(2));
        envConfig.setConfigParam(LOG_FAULT_READ_SIZE, kb(4));
        /*
         * The cache size must be bigger in order to accommodate the larger
         * buffers - see OPENDJ-943.
         */
        envConfig.setConfigParam(MAX_MEMORY, mb(16));
      }
      else
      {
        /*
         * Use 5M so that the replication can be used with 64M total for the
         * JVM.
         */
        envConfig.setConfigParam(MAX_MEMORY, mb(5));
      }
      // Since records are always added at the end of the Replication log and
      // deleted at the beginning of the Replication log, this should never
      // cause any deadlock.
      envConfig.setTxnTimeout(0, TimeUnit.SECONDS);
      envConfig.setLockTimeout(0, TimeUnit.SECONDS);
      // Since replication provides durability, we can reduce the DB durability
      // level so that we are immune to application / JVM crashes.
      envConfig.setDurability(Durability.COMMIT_WRITE_NO_SYNC);
      dbEnvironment = new Environment(new File(path), envConfig);
      dbEnvironment = openJEEnvironment(path);
      /*
       * One database is created to store the update from each LDAP server in
@@ -150,6 +101,71 @@
    }
  }
  /**
   * Open a JE environment.
   * <p>
   * protected so it can be overridden by tests.
   *
   * @param path
   *          the path to the JE environment in the filesystem
   * @return the opened JE environment
   */
  protected Environment openJEEnvironment(String path)
  {
    final EnvironmentConfig envConfig = new EnvironmentConfig();
    /*
     * Create the DB Environment that will be used for all the
     * ReplicationServer activities related to the db
     */
    envConfig.setAllowCreate(true);
    envConfig.setTransactional(true);
    envConfig.setConfigParam(STATS_COLLECT, "false");
    envConfig.setConfigParam(CLEANER_THREADS, "2");
    envConfig.setConfigParam(CHECKPOINTER_HIGH_PRIORITY, "true");
    /*
     * Tests have shown that since the parsing of the Replication log is
     * always done sequentially, it is not necessary to use a large DB cache.
     */
    if (Runtime.getRuntime().maxMemory() > 256 * 1024 * 1024)
    {
      /*
       * If the JVM is reasonably large then we can safely default to bigger
       * read buffers. This will result in more scalable checkpointer and
       * cleaner performance.
       */
      envConfig.setConfigParam(CLEANER_LOOK_AHEAD_CACHE_SIZE, mb(2));
      envConfig.setConfigParam(LOG_ITERATOR_READ_SIZE, mb(2));
      envConfig.setConfigParam(LOG_FAULT_READ_SIZE, kb(4));
      /*
       * The cache size must be bigger in order to accommodate the larger
       * buffers - see OPENDJ-943.
       */
      envConfig.setConfigParam(MAX_MEMORY, mb(16));
    }
    else
    {
      /*
       * Use 5M so that the replication can be used with 64M total for the
       * JVM.
       */
      envConfig.setConfigParam(MAX_MEMORY, mb(5));
    }
    // Since records are always added at the end of the Replication log and
    // deleted at the beginning of the Replication log, this should never
    // cause any deadlock.
    envConfig.setTxnTimeout(0, TimeUnit.SECONDS);
    envConfig.setLockTimeout(0, TimeUnit.SECONDS);
    // Since replication provides durability, we can reduce the DB durability
    // level so that we are immune to application / JVM crashes.
    envConfig.setDurability(Durability.COMMIT_WRITE_NO_SYNC);
    return new Environment(new File(path), envConfig);
  }
  private String kb(int sizeInKb)
  {
    return String.valueOf(sizeInKb * 1024);
@@ -160,8 +176,21 @@
    return String.valueOf(sizeInMb * 1024 * 1024);
  }
  private Database openDatabase(String databaseName) throws ChangelogException,
      RuntimeException
  /**
   * Open a JE database.
   * <p>
   * protected so it can be overridden by tests.
   *
   * @param databaseName
   *          the databaseName to open
   * @return the opened JE database
   * @throws ChangelogException
   *           if a problem happened opening the database
   * @throws RuntimeException
   *           if a problem happened with the JE database
   */
  protected Database openDatabase(String databaseName)
      throws ChangelogException, RuntimeException
  {
    if (isShuttingDown.get())
    {
@@ -194,59 +223,92 @@
   */
  public ChangelogState readChangelogState() throws ChangelogException
  {
    return decodeChangelogState(readWholeState());
  }
  /**
   * Decode the whole changelog state DB.
   *
   * @param wholeState
   *          the whole changelog state DB as a Map.
   *          The Map is only used as a convenient collection of key => data objects
   * @return the decoded changelog state
   * @throws ChangelogException
   *           if a problem occurred while decoding
   */
  ChangelogState decodeChangelogState(Map<byte[], byte[]> wholeState)
      throws ChangelogException
  {
    try
    {
      final ChangelogState result = new ChangelogState();
      for (Entry<byte[], byte[]> entry : wholeState.entrySet())
      {
        final String stringKey = toString(entry.getKey());
        final String stringData = toString(entry.getValue());
        if (logger.isTraceEnabled())
        {
          debug("read (key, value)=(" + stringKey + ", " + stringData + ")");
        }
        final String[] str = stringData.split(FIELD_SEPARATOR, 3);
        if (str[0].equals(GENERATION_ID_TAG))
        {
          final long generationId = toLong(str[1]);
          final DN baseDN = DN.valueOf(str[2]);
          if (logger.isTraceEnabled())
          {
            debug("has read generationId: baseDN=" + baseDN + " generationId="
                + generationId);
          }
          result.setDomainGenerationId(baseDN, generationId);
        }
        else
        {
          final int serverId = toInt(str[0]);
          final DN baseDN = DN.valueOf(str[1]);
          if (logger.isTraceEnabled())
          {
            debug("has read replica: baseDN=" + baseDN + " serverId=" + serverId);
          }
          result.addServerIdToDomain(serverId, baseDN);
        }
      }
      return result;
    }
    catch (DirectoryException e)
    {
      throw new ChangelogException(e.getMessageObject(), e);
    }
  }
  private Map<byte[], byte[]> readWholeState() throws ChangelogException
  {
    DatabaseEntry key = new DatabaseEntry();
    DatabaseEntry data = new DatabaseEntry();
    Cursor cursor = changelogStateDb.openCursor(null, null);
    try
    {
      final ChangelogState result = new ChangelogState();
      final Map<byte[], byte[]> results = new LinkedHashMap<byte[], byte[]>();
      OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT);
      while (status == OperationStatus.SUCCESS)
      {
        final String stringData = toString(data.getData());
        if (logger.isTraceEnabled())
          debug("read (" + GENERATION_ID_TAG + " generationId baseDN) OR "
              + "(serverId baseDN): " + stringData);
        final String[] str = stringData.split(FIELD_SEPARATOR, 3);
        if (str[0].equals(GENERATION_ID_TAG))
        {
          long generationId = toLong(str[1]);
          DN baseDN = DN.valueOf(str[2]);
          if (logger.isTraceEnabled())
            debug("has read baseDN=" + baseDN + " generationId=" +generationId);
          result.setDomainGenerationId(baseDN, generationId);
        }
        else
        {
          int serverId = toInt(str[0]);
          DN baseDN = DN.valueOf(str[1]);
          if (logger.isTraceEnabled())
            debug("has read: baseDN=" + baseDN + " serverId=" + serverId);
          result.addServerIdToDomain(serverId, baseDN);
        }
        results.put(key.getData(), data.getData());
        status = cursor.getNext(key, data, LockMode.DEFAULT);
      }
      return result;
      return results;
    }
    catch (RuntimeException e)
    {
      final LocalizableMessage message = ERR_JEB_DATABASE_EXCEPTION.get(e.getMessage());
      throw new ChangelogException(message, e);
    }
    catch (DirectoryException e)
    {
      throw new ChangelogException(e.getMessageObject(), e);
    }
    finally
    {
      close(cursor);
@@ -258,7 +320,8 @@
    try
    {
      return Integer.parseInt(data);
    } catch (NumberFormatException e)
    }
    catch (NumberFormatException e)
    {
      // should never happen
      // TODO: i18n
@@ -298,7 +361,14 @@
    }
  }
  private byte[] toBytes(String s)
  /**
   * Converts the string to a UTF8-encoded byte array.
   *
   * @param s
   *          the string to convert
   * @return the byte array representation of the UTF8-encoded string
   */
  static byte[] toBytes(String s)
  {
    try
    {
@@ -312,8 +382,8 @@
  }
  /**
   * Finds or creates the database used to store changes from the server with
   * the given serverId and the given baseDN.
   * Finds or creates the database used to store changes for a replica with the
   * given baseDN and serverId.
   *
   * @param serverId
   *          The server id that identifies the server.
@@ -325,26 +395,27 @@
   * @throws ChangelogException
   *           in case of underlying Exception.
   */
  public Database getOrAddDb(int serverId, DN baseDN, long generationId)
  public Database getOrAddReplicationDB(int serverId, DN baseDN, long generationId)
      throws ChangelogException
  {
    if (logger.isTraceEnabled())
    {
      debug("ReplicationDbEnv.getOrAddDb(" + serverId + ", " + baseDN + ", "
          + generationId + ")");
    }
    try
    {
      // JNR: redundant info is stored between the key and data down below.
      // It is probably ok since "changelogstate" DB does not receive a high
      // volume of inserts.
      final String serverIdToBaseDn = buildServerIdKey(baseDN, serverId);
      Entry<String, String> replicaEntry = toReplicaEntry(baseDN, serverId);
      // Opens the DB for the changes received from this server on this domain.
      Database db = openDatabase(serverIdToBaseDn);
      final Database replicaDB = openDatabase(replicaEntry.getKey());
      putInChangelogStateDBIfNotExist(serverIdToBaseDn, serverIdToBaseDn);
      putInChangelogStateDBIfNotExist(buildGenIdKey(baseDN),
                                      buildGenIdData(baseDN, generationId));
      return db;
      putInChangelogStateDBIfNotExist(replicaEntry);
      putInChangelogStateDBIfNotExist(toGenIdEntry(baseDN, generationId));
      return replicaDB;
    }
    catch (RuntimeException e)
    {
@@ -352,36 +423,57 @@
    }
  }
  private String buildGenIdKey(DN baseDN)
  /**
   * Return an entry to store in the changelog state database representing a
   * replica in the topology.
   *
   * @param baseDN
   *          the replica's baseDN
   * @param serverId
   *          the replica's serverId
   * @return a database entry for the replica
   */
  Entry<String, String> toReplicaEntry(DN baseDN, int serverId)
  {
    return GENERATION_ID_TAG + FIELD_SEPARATOR + baseDN.toNormalizedString();
    final String key = serverId + FIELD_SEPARATOR + baseDN.toNormalizedString();
    return new SimpleImmutableEntry<String, String>(key, key);
  }
  private String buildServerIdKey(DN baseDN, int serverId)
  /**
   * Return an entry to store in the changelog state database representing the
   * domain generation id.
   *
   * @param baseDN
   *          the domain's baseDN
   * @param generationId
   *          the domain's generationId
   * @return a database entry for the generationId
   */
  Entry<String, String> toGenIdEntry(DN baseDN, long generationId)
  {
    return serverId + FIELD_SEPARATOR + baseDN.toNormalizedString();
    final String normDn = baseDN.toNormalizedString();
    final String key = GENERATION_ID_TAG + FIELD_SEPARATOR + normDn;
    final String data = GENERATION_ID_TAG + FIELD_SEPARATOR + generationId
        + FIELD_SEPARATOR + normDn;
    return new SimpleImmutableEntry<String, String>(key, data);
  }
  private String buildGenIdData(DN baseDN, long generationId)
  private void putInChangelogStateDBIfNotExist(Entry<String, String> entry)
      throws RuntimeException
  {
    return GENERATION_ID_TAG + FIELD_SEPARATOR + generationId + FIELD_SEPARATOR
        + baseDN.toNormalizedString();
  }
  private void putInChangelogStateDBIfNotExist(String keyString,
      String dataString) throws RuntimeException
  {
    DatabaseEntry key = new DatabaseEntry(toBytes(keyString));
    DatabaseEntry key = new DatabaseEntry(toBytes(entry.getKey()));
    DatabaseEntry data = new DatabaseEntry();
    if (changelogStateDb.get(null, key, data, LockMode.DEFAULT) == NOTFOUND)
    {
      Transaction txn = dbEnvironment.beginTransaction(null, null);
      try
      {
        data.setData(toBytes(dataString));
        data.setData(toBytes(entry.getValue()));
        if (logger.isTraceEnabled())
          debug("putting record in the changelogstate Db key=[" + keyString
              + "] value=[" + dataString + "]");
        {
          debug("putting record in the changelogstate Db key=["
              + entry.getKey() + "] value=[" + entry.getValue() + "]");
        }
        changelogStateDb.put(txn, key, data);
        txn.commit(Durability.COMMIT_WRITE_NO_SYNC);
      }
@@ -472,7 +564,8 @@
   */
  public void clearGenerationId(DN baseDN) throws ChangelogException
  {
    deleteFromChangelogStateDB(buildGenIdKey(baseDN),
    final int unusedGenId = 0;
    deleteFromChangelogStateDB(toGenIdEntry(baseDN, unusedGenId),
        "clearGenerationId(baseDN=" + baseDN + ")");
  }
@@ -489,19 +582,21 @@
   */
  public void clearServerId(DN baseDN, int serverId) throws ChangelogException
  {
    deleteFromChangelogStateDB(buildServerIdKey(baseDN, serverId),
    deleteFromChangelogStateDB(toReplicaEntry(baseDN, serverId),
        "clearServerId(baseDN=" + baseDN + " , serverId=" + serverId + ")");
  }
  private void deleteFromChangelogStateDB(String keyString,
  private void deleteFromChangelogStateDB(Entry<String, ?> entry,
      String methodInvocation) throws ChangelogException
  {
    if (logger.isTraceEnabled())
    {
      debug(methodInvocation + " starting");
    }
    try
    {
      final DatabaseEntry key = new DatabaseEntry(toBytes(keyString));
      final DatabaseEntry key = new DatabaseEntry(toBytes(entry.getKey()));
      final DatabaseEntry data = new DatabaseEntry();
      if (changelogStateDb.get(null, key, data, LockMode.DEFAULT) == SUCCESS)
      {
@@ -511,7 +606,9 @@
          changelogStateDb.delete(txn, key);
          txn.commit(Durability.COMMIT_WRITE_NO_SYNC);
          if (logger.isTraceEnabled())
          {
            debug(methodInvocation + " succeeded");
          }
        }
        catch (RuntimeException dbe)
        {
@@ -523,8 +620,10 @@
      else
      {
        if (logger.isTraceEnabled())
          debug(methodInvocation + " failed: key=[ " + keyString
        {
          debug(methodInvocation + " failed: key=[ " + entry.getKey()
              + "] not found");
        }
      }
    }
    catch (RuntimeException dbe)
@@ -564,7 +663,9 @@
        try
        {
          if (txn != null)
          {
            txn.abort();
          }
        }
        catch(Exception e)
        { /* do nothing */ }
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnvTest.java
New file
@@ -0,0 +1,129 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *      Copyright 2014 ForgeRock AS
 */
package org.opends.server.replication.server.changelog.je;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.TestCaseUtils;
import org.opends.server.replication.server.ChangelogState;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.types.DN;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import com.sleepycat.je.Database;
import com.sleepycat.je.Environment;
import static java.util.Arrays.*;
import static org.assertj.core.api.Assertions.*;
import static org.opends.server.replication.server.changelog.je.ReplicationDbEnv.*;
@SuppressWarnings("javadoc")
public class ReplicationDbEnvTest extends DirectoryServerTestCase
{
   /**
    * Bypass heavyweight setup.
    */
   private final class TestableReplicationDbEnv extends ReplicationDbEnv
   {
      private TestableReplicationDbEnv() throws ChangelogException
      {
         super(null, null);
      }
      @Override
      protected Environment openJEEnvironment(String path)
      {
         return null;
      }
      @Override
      protected Database openDatabase(String databaseName)
            throws ChangelogException, RuntimeException
      {
         return null;
      }
   }
   @BeforeClass
   public void setup() throws Exception
   {
      TestCaseUtils.startFakeServer();
   }
   @AfterClass
   public void teardown()
   {
      TestCaseUtils.shutdownFakeServer();
   }
   @DataProvider
   public Object[][] changelogStateDataProvider() throws Exception
   {
      return new Object[][] {
         { DN.valueOf("dc=example,dc=com"), 524157415, asList(42, 346) },
         // test with a space in the baseDN (space is the field separator in the DB)
         // FIXME does not work yet (gosh!!)
         // { DN.decode("cn=admin data"), 524157415, asList(42, 346) },
     };
   }
   @Test(dataProvider = "changelogStateDataProvider")
   public void encodeDecodeChangelogState(DN baseDN, long generationId,
         List<Integer> serverIds) throws Exception
   {
      final ReplicationDbEnv changelogStateDB = new TestableReplicationDbEnv();
      // encode data
      final Map<byte[], byte[]> wholeState = new LinkedHashMap<byte[], byte[]>();
      put(wholeState, changelogStateDB.toGenIdEntry(baseDN, generationId));
      for (Integer serverId : serverIds)
      {
         put(wholeState, changelogStateDB.toReplicaEntry(baseDN, serverId));
      }
      // decode data
      final ChangelogState state =
            changelogStateDB.decodeChangelogState(wholeState);
      assertThat(state.getDomainToGenerationId()).containsExactly(
            entry(baseDN, generationId));
      assertThat(state.getDomainToServerIds()).containsExactly(
            entry(baseDN, serverIds));
   }
   private void put(Map<byte[], byte[]> map, Entry<String, String> entry)
   {
      map.put(toBytes(entry.getKey()), toBytes(entry.getValue()));
   }
}