mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
29.06.2014 6fde75c6138d9f81009935d8d782209d8426e4de
Added unit tests for encoding/decoding changelog state DB entries.


ReplicationDbEnv.java:
Split readChangelogState() into decodeChangelogState() and readWholeState().
Changed buildServerIdKey() into toReplicaEntry().
Combined buildGenIdKey() and buildGenIdData() into toGenIdEntry().
Made some methods package private for testing.
Made some methods protected for easing unit testing.
Extracted openJEEnvironment() from ctor.
Renamed getOrAddDb() to getOrAddReplicationDB().

ReplicationDbEnvTest.java: ADDED

ReplicationDB.java:
Consequence of the change to ReplicationDbEnv.
1 files added
2 files modified
490 ■■■■ changed files
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java 20 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java 341 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnvTest.java 129 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
@@ -60,7 +60,7 @@
{
  private Database db;
  private ReplicationDbEnv dbenv;
  private ReplicationDbEnv dbEnv;
  private ReplicationServer replicationServer;
  private int serverId;
  private DN baseDN;
@@ -117,22 +117,22 @@
   * @param serverId The identifier of the LDAP server.
   * @param baseDN The baseDN of the replication domain.
   * @param replicationServer The ReplicationServer that needs to be shutdown.
   * @param dbenv The Db environment to use to create the db.
   * @param dbEnv The Db environment to use to create the db.
   * @throws ChangelogException If a database problem happened
   */
  public ReplicationDB(int serverId, DN baseDN,
      ReplicationServer replicationServer, ReplicationDbEnv dbenv)
      ReplicationServer replicationServer, ReplicationDbEnv dbEnv)
      throws ChangelogException
  {
    this.serverId = serverId;
    this.baseDN = baseDN;
    this.dbenv = dbenv;
    this.dbEnv = dbEnv;
    this.replicationServer = replicationServer;
    // Get or create the associated ReplicationServerDomain and Db.
    final ReplicationServerDomain domain =
        replicationServer.getReplicationServerDomain(baseDN, true);
    db = dbenv.getOrAddDb(serverId, baseDN, domain.getGenerationId());
    db = dbEnv.getOrAddReplicationDB(serverId, baseDN, domain.getGenerationId());
    intializeCounters();
  }
@@ -649,7 +649,7 @@
        // Create the transaction that will protect whatever done with this
        // write cursor.
        localTxn = dbenv.beginTransaction();
        localTxn = dbEnv.beginTransaction();
        localCursor = db.openCursor(localTxn, null);
        txn = localTxn;
@@ -701,7 +701,7 @@
        }
        catch (DatabaseException e)
        {
          dbenv.shutdownOnException(e);
          dbEnv.shutdownOnException(e);
        }
      }
    }
@@ -855,14 +855,14 @@
      }
      // Clears the reference to this serverID
      dbenv.clearServerId(baseDN, serverId);
      dbEnv.clearServerId(baseDN, serverId);
      final Database oldDb = db;
      db = null; // In case there's a failure between here and recreation.
      dbenv.clearDb(oldDb);
      dbEnv.clearDb(oldDb);
      // RE-create the db
      db = dbenv.getOrAddDb(serverId, baseDN, -1);
      db = dbEnv.getOrAddReplicationDB(serverId, baseDN, -1);
    }
    catch (Exception e)
    {
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
@@ -28,7 +28,9 @@
import java.io.File;
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -87,58 +89,7 @@
    try
    {
      EnvironmentConfig envConfig = new EnvironmentConfig();
      /*
       * Create the DB Environment that will be used for all the
       * ReplicationServer activities related to the db
       */
      envConfig.setAllowCreate(true);
      envConfig.setTransactional(true);
      envConfig.setConfigParam(STATS_COLLECT, "false");
      envConfig.setConfigParam(CLEANER_THREADS, "2");
      envConfig.setConfigParam(CHECKPOINTER_HIGH_PRIORITY, "true");
      /*
       * Tests have shown that since the parsing of the Replication log is
       * always done sequentially, it is not necessary to use a large DB cache.
       */
      if (Runtime.getRuntime().maxMemory() > 256 * 1024 * 1024)
      {
        /*
         * If the JVM is reasonably large then we can safely default to bigger
         * read buffers. This will result in more scalable checkpointer and
         * cleaner performance.
         */
        envConfig.setConfigParam(CLEANER_LOOK_AHEAD_CACHE_SIZE, mb(2));
        envConfig.setConfigParam(LOG_ITERATOR_READ_SIZE, mb(2));
        envConfig.setConfigParam(LOG_FAULT_READ_SIZE, kb(4));
        /*
         * The cache size must be bigger in order to accommodate the larger
         * buffers - see OPENDJ-943.
         */
        envConfig.setConfigParam(MAX_MEMORY, mb(16));
      }
      else
      {
        /*
         * Use 5M so that the replication can be used with 64M total for the
         * JVM.
         */
        envConfig.setConfigParam(MAX_MEMORY, mb(5));
      }
      // Since records are always added at the end of the Replication log and
      // deleted at the beginning of the Replication log, this should never
      // cause any deadlock.
      envConfig.setTxnTimeout(0, TimeUnit.SECONDS);
      envConfig.setLockTimeout(0, TimeUnit.SECONDS);
      // Since replication provides durability, we can reduce the DB durability
      // level so that we are immune to application / JVM crashes.
      envConfig.setDurability(Durability.COMMIT_WRITE_NO_SYNC);
      dbEnvironment = new Environment(new File(path), envConfig);
      dbEnvironment = openJEEnvironment(path);
      /*
       * One database is created to store the update from each LDAP server in
@@ -153,6 +104,71 @@
    }
  }
  /**
   * Open a JE environment.
   * <p>
   * protected so it can be overridden by tests.
   *
   * @param path
   *          the path to the JE environment in the filesystem
   * @return the opened JE environment
   */
  protected Environment openJEEnvironment(String path)
  {
    final EnvironmentConfig envConfig = new EnvironmentConfig();
    /*
     * Create the DB Environment that will be used for all the
     * ReplicationServer activities related to the db
     */
    envConfig.setAllowCreate(true);
    envConfig.setTransactional(true);
    envConfig.setConfigParam(STATS_COLLECT, "false");
    envConfig.setConfigParam(CLEANER_THREADS, "2");
    envConfig.setConfigParam(CHECKPOINTER_HIGH_PRIORITY, "true");
    /*
     * Tests have shown that since the parsing of the Replication log is
     * always done sequentially, it is not necessary to use a large DB cache.
     */
    if (Runtime.getRuntime().maxMemory() > 256 * 1024 * 1024)
    {
      /*
       * If the JVM is reasonably large then we can safely default to bigger
       * read buffers. This will result in more scalable checkpointer and
       * cleaner performance.
       */
      envConfig.setConfigParam(CLEANER_LOOK_AHEAD_CACHE_SIZE, mb(2));
      envConfig.setConfigParam(LOG_ITERATOR_READ_SIZE, mb(2));
      envConfig.setConfigParam(LOG_FAULT_READ_SIZE, kb(4));
      /*
       * The cache size must be bigger in order to accommodate the larger
       * buffers - see OPENDJ-943.
       */
      envConfig.setConfigParam(MAX_MEMORY, mb(16));
    }
    else
    {
      /*
       * Use 5M so that the replication can be used with 64M total for the
       * JVM.
       */
      envConfig.setConfigParam(MAX_MEMORY, mb(5));
    }
    // Since records are always added at the end of the Replication log and
    // deleted at the beginning of the Replication log, this should never
    // cause any deadlock.
    envConfig.setTxnTimeout(0, TimeUnit.SECONDS);
    envConfig.setLockTimeout(0, TimeUnit.SECONDS);
    // Since replication provides durability, we can reduce the DB durability
    // level so that we are immune to application / JVM crashes.
    envConfig.setDurability(Durability.COMMIT_WRITE_NO_SYNC);
    return new Environment(new File(path), envConfig);
  }
  private String kb(int sizeInKb)
  {
    return String.valueOf(sizeInKb * 1024);
@@ -163,8 +179,21 @@
    return String.valueOf(sizeInMb * 1024 * 1024);
  }
  private Database openDatabase(String databaseName) throws ChangelogException,
      RuntimeException
  /**
   * Open a JE database.
   * <p>
   * protected so it can be overridden by tests.
   *
   * @param databaseName
   *          the databaseName to open
   * @return the opened JE database
   * @throws ChangelogException
   *           if a problem happened opening the database
   * @throws RuntimeException
   *           if a problem happened with the JE database
   */
  protected Database openDatabase(String databaseName)
      throws ChangelogException, RuntimeException
  {
    if (isShuttingDown.get())
    {
@@ -197,59 +226,90 @@
   */
  public ChangelogState readChangelogState() throws ChangelogException
  {
    return decodeChangelogState(readWholeState());
  }
  /**
   * Decode the whole changelog state DB.
   *
   * @param wholeState
   *          the whole changelog state DB as a Map.
   *          The Map is only used as a convenient collection of key => data objects
   * @return the decoded changelog state
   * @throws ChangelogException
   *           if a problem occurred while decoding
   */
  ChangelogState decodeChangelogState(Map<byte[], byte[]> wholeState)
      throws ChangelogException
  {
    try
    {
      final ChangelogState result = new ChangelogState();
      for (Entry<byte[], byte[]> entry : wholeState.entrySet())
      {
        final String stringKey = toString(entry.getKey());
        final String stringData = toString(entry.getValue());
        if (debugEnabled())
        {
          debug("read (key, value)=(" + stringKey + ", " + stringData + ")");
        }
        final String[] str = stringData.split(FIELD_SEPARATOR, 3);
        if (str[0].equals(GENERATION_ID_TAG))
        {
          final long generationId = toLong(str[1]);
          final DN baseDN = DN.decode(str[2]);
          if (debugEnabled())
          {
            debug("has read generationId: baseDN=" + baseDN + " generationId="
                + generationId);
          }
          result.setDomainGenerationId(baseDN, generationId);
        }
        else
        {
          final int serverId = toInt(str[0]);
          final DN baseDN = DN.decode(str[1]);
          if (debugEnabled())
          {
            debug("has read replica: baseDN=" + baseDN + " serverId=" + serverId);
          }
          result.addServerIdToDomain(serverId, baseDN);
        }
      }
      return result;
    }
    catch (DirectoryException e)
    {
      throw new ChangelogException(e.getMessageObject(), e);
    }
  }
  private Map<byte[], byte[]> readWholeState() throws ChangelogException
  {
    DatabaseEntry key = new DatabaseEntry();
    DatabaseEntry data = new DatabaseEntry();
    Cursor cursor = changelogStateDb.openCursor(null, null);
    try
    {
      final ChangelogState result = new ChangelogState();
      final Map<byte[], byte[]> results = new LinkedHashMap<byte[], byte[]>();
      OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT);
      while (status == OperationStatus.SUCCESS)
      {
        final String stringData = toString(data.getData());
        if (debugEnabled())
          debug("read (" + GENERATION_ID_TAG + " generationId baseDN) OR "
              + "(serverId baseDN): " + stringData);
        final String[] str = stringData.split(FIELD_SEPARATOR, 3);
        if (str[0].equals(GENERATION_ID_TAG))
        {
          long generationId = toLong(str[1]);
          DN baseDN = DN.decode(str[2]);
          if (debugEnabled())
            debug("has read baseDN=" + baseDN + " generationId=" +generationId);
          result.setDomainGenerationId(baseDN, generationId);
        }
        else
        {
          int serverId = toInt(str[0]);
          DN baseDN = DN.decode(str[1]);
          if (debugEnabled())
            debug("has read: baseDN=" + baseDN + " serverId=" + serverId);
          result.addServerIdToDomain(serverId, baseDN);
        }
        results.put(key.getData(), data.getData());
        status = cursor.getNext(key, data, LockMode.DEFAULT);
      }
      return result;
      return results;
    }
    catch (RuntimeException e)
    {
      final Message message = ERR_JEB_DATABASE_EXCEPTION.get(e.getMessage());
      throw new ChangelogException(message, e);
    }
    catch (DirectoryException e)
    {
      throw new ChangelogException(e.getMessageObject(), e);
    }
    finally
    {
      close(cursor);
@@ -261,7 +321,8 @@
    try
    {
      return Integer.parseInt(data);
    } catch (NumberFormatException e)
    }
    catch (NumberFormatException e)
    {
      // should never happen
      // TODO: i18n
@@ -301,7 +362,14 @@
    }
  }
  private byte[] toBytes(String s)
  /**
   * Converts the string to a UTF8-encoded byte array.
   *
   * @param s
   *          the string to convert
   * @return the byte array representation of the UTF8-encoded string
   */
  static byte[] toBytes(String s)
  {
    try
    {
@@ -315,8 +383,8 @@
  }
  /**
   * Finds or creates the database used to store changes from the server with
   * the given serverId and the given baseDN.
   * Finds or creates the database used to store changes for a replica with the
   * given baseDN and serverId.
   *
   * @param serverId
   *          The server id that identifies the server.
@@ -328,26 +396,27 @@
   * @throws ChangelogException
   *           in case of underlying Exception.
   */
  public Database getOrAddDb(int serverId, DN baseDN, long generationId)
  public Database getOrAddReplicationDB(int serverId, DN baseDN, long generationId)
      throws ChangelogException
  {
    if (debugEnabled())
    {
      debug("ReplicationDbEnv.getOrAddDb(" + serverId + ", " + baseDN + ", "
          + generationId + ")");
    }
    try
    {
      // JNR: redundant info is stored between the key and data down below.
      // It is probably ok since "changelogstate" DB does not receive a high
      // volume of inserts.
      final String serverIdToBaseDn = buildServerIdKey(baseDN, serverId);
      Entry<String, String> replicaEntry = toReplicaEntry(baseDN, serverId);
      // Opens the DB for the changes received from this server on this domain.
      Database db = openDatabase(serverIdToBaseDn);
      final Database replicaDB = openDatabase(replicaEntry.getKey());
      putInChangelogStateDBIfNotExist(serverIdToBaseDn, serverIdToBaseDn);
      putInChangelogStateDBIfNotExist(buildGenIdKey(baseDN),
                                      buildGenIdData(baseDN, generationId));
      return db;
      putInChangelogStateDBIfNotExist(replicaEntry);
      putInChangelogStateDBIfNotExist(toGenIdEntry(baseDN, generationId));
      return replicaDB;
    }
    catch (RuntimeException e)
    {
@@ -355,36 +424,57 @@
    }
  }
  private String buildGenIdKey(DN baseDN)
  /**
   * Return an entry to store in the changelog state database representing a
   * replica in the topology.
   *
   * @param baseDN
   *          the replica's baseDN
   * @param serverId
   *          the replica's serverId
   * @return a database entry for the replica
   */
  Entry<String, String> toReplicaEntry(DN baseDN, int serverId)
  {
    return GENERATION_ID_TAG + FIELD_SEPARATOR + baseDN.toNormalizedString();
    final String key = serverId + FIELD_SEPARATOR + baseDN.toNormalizedString();
    return new SimpleImmutableEntry<String, String>(key, key);
  }
  private String buildServerIdKey(DN baseDN, int serverId)
  /**
   * Return an entry to store in the changelog state database representing the
   * domain generation id.
   *
   * @param baseDN
   *          the domain's baseDN
   * @param generationId
   *          the domain's generationId
   * @return a database entry for the generationId
   */
  Entry<String, String> toGenIdEntry(DN baseDN, long generationId)
  {
    return serverId + FIELD_SEPARATOR + baseDN.toNormalizedString();
    final String normDn = baseDN.toNormalizedString();
    final String key = GENERATION_ID_TAG + FIELD_SEPARATOR + normDn;
    final String data = GENERATION_ID_TAG + FIELD_SEPARATOR + generationId
        + FIELD_SEPARATOR + normDn;
    return new SimpleImmutableEntry<String, String>(key, data);
  }
  private String buildGenIdData(DN baseDN, long generationId)
  private void putInChangelogStateDBIfNotExist(Entry<String, String> entry)
      throws RuntimeException
  {
    return GENERATION_ID_TAG + FIELD_SEPARATOR + generationId + FIELD_SEPARATOR
        + baseDN.toNormalizedString();
  }
  private void putInChangelogStateDBIfNotExist(String keyString,
      String dataString) throws RuntimeException
  {
    DatabaseEntry key = new DatabaseEntry(toBytes(keyString));
    DatabaseEntry key = new DatabaseEntry(toBytes(entry.getKey()));
    DatabaseEntry data = new DatabaseEntry();
    if (changelogStateDb.get(null, key, data, LockMode.DEFAULT) == NOTFOUND)
    {
      Transaction txn = dbEnvironment.beginTransaction(null, null);
      try
      {
        data.setData(toBytes(dataString));
        data.setData(toBytes(entry.getValue()));
        if (debugEnabled())
          debug("putting record in the changelogstate Db key=[" + keyString
              + "] value=[" + dataString + "]");
        {
          debug("putting record in the changelogstate Db key=["
              + entry.getKey() + "] value=[" + entry.getValue() + "]");
        }
        changelogStateDb.put(txn, key, data);
        txn.commit(Durability.COMMIT_WRITE_NO_SYNC);
      }
@@ -475,7 +565,8 @@
   */
  public void clearGenerationId(DN baseDN) throws ChangelogException
  {
    deleteFromChangelogStateDB(buildGenIdKey(baseDN),
    final int unusedGenId = 0;
    deleteFromChangelogStateDB(toGenIdEntry(baseDN, unusedGenId),
        "clearGenerationId(baseDN=" + baseDN + ")");
  }
@@ -492,19 +583,21 @@
   */
  public void clearServerId(DN baseDN, int serverId) throws ChangelogException
  {
    deleteFromChangelogStateDB(buildServerIdKey(baseDN, serverId),
    deleteFromChangelogStateDB(toReplicaEntry(baseDN, serverId),
        "clearServerId(baseDN=" + baseDN + " , serverId=" + serverId + ")");
  }
  private void deleteFromChangelogStateDB(String keyString,
  private void deleteFromChangelogStateDB(Entry<String, ?> entry,
      String methodInvocation) throws ChangelogException
  {
    if (debugEnabled())
    {
      debug(methodInvocation + " starting");
    }
    try
    {
      final DatabaseEntry key = new DatabaseEntry(toBytes(keyString));
      final DatabaseEntry key = new DatabaseEntry(toBytes(entry.getKey()));
      final DatabaseEntry data = new DatabaseEntry();
      if (changelogStateDb.get(null, key, data, LockMode.DEFAULT) == SUCCESS)
      {
@@ -514,7 +607,9 @@
          changelogStateDb.delete(txn, key);
          txn.commit(Durability.COMMIT_WRITE_NO_SYNC);
          if (debugEnabled())
          {
            debug(methodInvocation + " succeeded");
          }
        }
        catch (RuntimeException dbe)
        {
@@ -526,8 +621,10 @@
      else
      {
        if (debugEnabled())
          debug(methodInvocation + " failed: key=[ " + keyString
        {
          debug(methodInvocation + " failed: key=[ " + entry.getKey()
              + "] not found");
        }
      }
    }
    catch (RuntimeException dbe)
@@ -570,7 +667,9 @@
        try
        {
          if (txn != null)
          {
            txn.abort();
          }
        }
        catch(Exception e)
        { /* do nothing */ }
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnvTest.java
New file
@@ -0,0 +1,129 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *      Copyright 2014 ForgeRock AS
 */
package org.opends.server.replication.server.changelog.je;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.TestCaseUtils;
import org.opends.server.replication.server.ChangelogState;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.types.DN;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import com.sleepycat.je.Database;
import com.sleepycat.je.Environment;
import static java.util.Arrays.*;
import static org.assertj.core.api.Assertions.*;
import static org.opends.server.replication.server.changelog.je.ReplicationDbEnv.*;
@SuppressWarnings("javadoc")
public class ReplicationDbEnvTest extends DirectoryServerTestCase
{
    /**
     * Bypass heavyweight setup.
     */
    private final class TestableReplicationDbEnv extends ReplicationDbEnv
    {
        private TestableReplicationDbEnv() throws ChangelogException
        {
            super(null, null);
        }
        @Override
        protected Environment openJEEnvironment(String path)
        {
            return null;
        }
        @Override
        protected Database openDatabase(String databaseName)
                throws ChangelogException, RuntimeException
        {
            return null;
        }
    }
    @BeforeClass
    public void setup() throws Exception
    {
        TestCaseUtils.startFakeServer();
    }
    @AfterClass
    public void teardown()
    {
        TestCaseUtils.shutdownFakeServer();
    }
    @DataProvider
    public Object[][] changelogStateDataProvider() throws Exception
    {
        return new Object[][] {
            { DN.decode("dc=example,dc=com"), 524157415, asList(42, 346) },
            // test with a space in the baseDN (space is the field separator in the DB)
            // FIXME does not work yet (gosh!!)
            // { DN.decode("cn=admin data"), 524157415, asList(42, 346) },
      };
    }
    @Test(dataProvider = "changelogStateDataProvider")
    public void encodeDecodeChangelogState(DN baseDN, long generationId,
            List<Integer> serverIds) throws Exception
    {
        final ReplicationDbEnv changelogStateDB = new TestableReplicationDbEnv();
        // encode data
        final Map<byte[], byte[]> wholeState = new LinkedHashMap<byte[], byte[]>();
        put(wholeState, changelogStateDB.toGenIdEntry(baseDN, generationId));
        for (Integer serverId : serverIds)
        {
            put(wholeState, changelogStateDB.toReplicaEntry(baseDN, serverId));
        }
        // decode data
        final ChangelogState state =
                changelogStateDB.decodeChangelogState(wholeState);
        assertThat(state.getDomainToGenerationId()).containsExactly(
                entry(baseDN, generationId));
        assertThat(state.getDomainToServerIds()).containsExactly(
                entry(baseDN, serverIds));
    }
    private void put(Map<byte[], byte[]> map, Entry<String, String> entry)
    {
        map.put(toBytes(entry.getKey()), toBytes(entry.getValue()));
    }
}