From 6c5c0af35aabc59a56c71e9c9296a7398a3e9176 Mon Sep 17 00:00:00 2001
From: Jean-Noël Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 25 Nov 2015 15:09:53 +0000
Subject: [PATCH] OPENDJ-2337 Remove JE changelog code
---
opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/TopologyViewTest.java | 19 +
opendj-server-legacy/src/test/java/org/opends/server/replication/service/ReplicationDomainTest.java | 6
opendj-server-legacy/src/test/java/org/opends/server/replication/ReSyncTest.java | 1
opendj-server-legacy/src/test/java/org/opends/server/replication/InitOnLineTest.java | 8
opendj-server-legacy/src/test/java/org/opends/server/backends/ChangelogBackendTestCase.java | 1
opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/ReplicationServerLoadBalancingTest.java | 23 +-
opendj-server-legacy/resource/schema/02-config.ldif | 7
opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/HistoricalTest.java | 1
opendj-server-legacy/src/test/java/org/opends/server/replication/server/ReplServerFakeConfiguration.java | 48 ----
opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/ReplicationServerFailoverTest.java | 15 -
opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationServer.java | 13 -
opendj-maven-plugin/src/main/resources/config/xml/org/forgerock/opendj/server/config/ReplicationServerConfiguration.xml | 26 --
opendj-server-legacy/src/test/java/org/opends/server/replication/ProtocolWindowTest.java | 3
opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/StateMachineTest.java | 2
opendj-server-legacy/src/test/java/org/opends/server/replication/UpdateOperationTest.java | 1
opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBTest.java | 57 ++---
opendj-server-legacy/src/test/java/org/opends/server/replication/server/ReplicationServerTest.java | 12
opendj-server-legacy/src/test/java/org/opends/server/replication/ReplicationTestCase.java | 23 --
opendj-server-legacy/src/test/java/org/opends/server/replication/GenerationIdTest.java | 5
opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/GroupIdHandshakeTest.java | 21 -
opendj-server-legacy/src/test/java/org/opends/server/replication/DependencyTest.java | 5
opendj-server-legacy/src/test/java/org/opends/server/replication/ChangeNumberControlPluginTestCase.java | 3
opendj-server-legacy/src/test/java/org/opends/server/replication/server/MonitorTest.java | 18 +
opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java | 6
opendj-server-legacy/src/test/java/org/opends/server/replication/SchemaReplicationTest.java | 1
opendj-server-legacy/src/test/java/org/opends/server/DirectoryServerTestCase.java | 8
opendj-server-legacy/src/test/java/org/opends/server/replication/server/ReplicationServerDynamicConfTest.java | 15 -
opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java | 7
opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/FractionalReplicationTest.java | 6
/dev/null | 161 -----------------
opendj-server-legacy/src/test/java/org/opends/server/replication/server/AssuredReplicationServerTest.java | 4
opendj-server-legacy/src/main/java/org/opends/server/tools/upgrade/Upgrade.java | 6
32 files changed, 114 insertions(+), 418 deletions(-)
diff --git a/opendj-maven-plugin/src/main/resources/config/xml/org/forgerock/opendj/server/config/ReplicationServerConfiguration.xml b/opendj-maven-plugin/src/main/resources/config/xml/org/forgerock/opendj/server/config/ReplicationServerConfiguration.xml
index 860b641..e998421 100644
--- a/opendj-maven-plugin/src/main/resources/config/xml/org/forgerock/opendj/server/config/ReplicationServerConfiguration.xml
+++ b/opendj-maven-plugin/src/main/resources/config/xml/org/forgerock/opendj/server/config/ReplicationServerConfiguration.xml
@@ -183,32 +183,6 @@
</ldap:attribute>
</adm:profile>
</adm:property>
- <adm:property name="replication-db-implementation" advanced="true">
- <adm:synopsis>
- The <adm:user-friendly-name /> database implementation
- that stores all persistent information.
- </adm:synopsis>
- <adm:default-behavior>
- <adm:defined>
- <adm:value>log</adm:value>
- </adm:defined>
- </adm:default-behavior>
- <adm:syntax>
- <adm:enumeration>
- <adm:value name="je">
- <adm:synopsis>Implementation based on Berkeley DB JE database.</adm:synopsis>
- </adm:value>
- <adm:value name="log">
- <adm:synopsis>Implementation based on log file.</adm:synopsis>
- </adm:value>
- </adm:enumeration>
- </adm:syntax>
- <adm:profile name="ldap">
- <ldap:attribute>
- <ldap:name>ds-cfg-replication-db-implementation</ldap:name>
- </ldap:attribute>
- </adm:profile>
- </adm:property>
<adm:property name="replication-purge-delay">
<adm:synopsis>
The time (in seconds) after which the
diff --git a/opendj-server-legacy/resource/schema/02-config.ldif b/opendj-server-legacy/resource/schema/02-config.ldif
index 10063c4..b7e9552 100644
--- a/opendj-server-legacy/resource/schema/02-config.ldif
+++ b/opendj-server-legacy/resource/schema/02-config.ldif
@@ -3765,12 +3765,6 @@
SYNTAX 1.3.6.1.4.1.1466.115.121.1.7
SINGLE-VALUE
X-ORIGIN 'OpenDJ Directory Server' )
-attributeTypes: ( 1.3.6.1.4.1.36733.2.1.1.142
- NAME 'ds-cfg-replication-db-implementation'
- EQUALITY caseExactMatch
- SYNTAX 1.3.6.1.4.1.1466.115.121.1.15
- SINGLE-VALUE
- X-ORIGIN 'OpenDJ Directory Server' )
attributeTypes: ( 1.3.6.1.4.1.36733.2.1.1.143
NAME 'ds-cfg-source-address' EQUALITY caseIgnoreMatch
SYNTAX 1.3.6.1.4.1.1466.115.121.1.15
@@ -4534,7 +4528,6 @@
ds-cfg-window-size $
ds-cfg-queue-size $
ds-cfg-replication-db-directory $
- ds-cfg-replication-db-implementation $
ds-cfg-replication-purge-delay $
ds-cfg-group-id $
ds-cfg-assured-timeout $
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationServer.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationServer.java
index cc76193..9f244c0 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationServer.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationServer.java
@@ -43,7 +43,6 @@
import org.forgerock.opendj.ldap.ResultCode;
import org.forgerock.opendj.ldap.SearchScope;
import org.opends.server.admin.server.ConfigurationChangeListener;
-import org.opends.server.admin.std.meta.ReplicationServerCfgDefn.ReplicationDBImplementation;
import org.opends.server.admin.std.meta.VirtualAttributeCfgDefn.ConflictBehavior;
import org.opends.server.admin.std.server.ReplicationServerCfg;
import org.opends.server.admin.std.server.UserDefinedVirtualAttributeCfg;
@@ -65,7 +64,6 @@
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.file.ECLEnabledDomainPredicate;
import org.opends.server.replication.server.changelog.file.FileChangelogDB;
-import org.opends.server.replication.server.changelog.je.JEChangelogDB;
import org.opends.server.replication.service.DSRSShutdownSync;
import org.opends.server.types.AttributeType;
import org.opends.server.types.DN;
@@ -173,16 +171,7 @@
this.domainPredicate = predicate;
enableExternalChangeLog();
- ReplicationDBImplementation dbImpl = cfg.getReplicationDBImplementation();
- logger.trace("Using %s as DB implementation for changelog DB", dbImpl);
- if (dbImpl == ReplicationDBImplementation.JE)
- {
- this.changelogDB = new JEChangelogDB(this, cfg);
- }
- else
- {
- this.changelogDB = new FileChangelogDB(this, cfg);
- }
+ this.changelogDB = new FileChangelogDB(this, cfg);
replSessionSecurity = new ReplSessionSecurity();
initialize();
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/DraftCNDB.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/DraftCNDB.java
deleted file mode 100644
index 0b2ae1c..0000000
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/DraftCNDB.java
+++ /dev/null
@@ -1,636 +0,0 @@
-/*
- * CDDL HEADER START
- *
- * The contents of this file are subject to the terms of the
- * Common Development and Distribution License, Version 1.0 only
- * (the "License"). You may not use this file except in compliance
- * with the License.
- *
- * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
- * or http://forgerock.org/license/CDDLv1.0.html.
- * See the License for the specific language governing permissions
- * and limitations under the License.
- *
- * When distributing Covered Code, include this CDDL HEADER in each
- * file and include the License file at legal-notices/CDDLv1_0.txt.
- * If applicable, add the following below this CDDL HEADER, with the
- * fields enclosed by brackets "[]" replaced with your own identifying
- * information:
- * Portions Copyright [yyyy] [name of copyright owner]
- *
- * CDDL HEADER END
- *
- *
- * Copyright 2009 Sun Microsystems, Inc.
- * Portions Copyright 2011-2015 ForgeRock AS
- */
-package org.opends.server.replication.server.changelog.je;
-
-import static com.sleepycat.je.LockMode.*;
-import static com.sleepycat.je.OperationStatus.*;
-
-import static org.opends.messages.ReplicationMessages.*;
-import static org.opends.server.util.StaticUtils.*;
-
-import java.io.Closeable;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.forgerock.i18n.slf4j.LocalizedLogger;
-import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
-import org.opends.server.replication.server.changelog.api.ChangelogException;
-
-import com.sleepycat.je.*;
-
-/**
- * This class implements the interface between the underlying database
- * and the {@link JEChangeNumberIndexDB} class.
- * This is the only class that should have code using the BDB interfaces.
- */
-public class DraftCNDB
-{
- private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
-
- private Database db;
- private ReplicationDbEnv dbenv;
-
- /**
- * The lock used to provide exclusive access to the thread that close the db
- * (shutdown or clear).
- */
- private final ReadWriteLock dbCloseLock = new ReentrantReadWriteLock(true);
-
- /**
- * Creates a new database or open existing database that will be used
- * to store and retrieve changes from an LDAP server.
- * @param dbenv The Db environment to use to create the db.
- * @throws ChangelogException If a database problem happened
- */
- public DraftCNDB(ReplicationDbEnv dbenv) throws ChangelogException
- {
- this.dbenv = dbenv;
-
- // Get or create the associated ReplicationServerDomain and Db.
- db = dbenv.getOrCreateCNIndexDB();
- }
-
- /**
- * Add a record to the database.
- *
- * @param record
- * the provided {@link ChangeNumberIndexRecord} to be stored.
- * @throws ChangelogException
- * If a database problem happened
- */
- public void addRecord(ChangeNumberIndexRecord record)
- throws ChangelogException
- {
- try
- {
- final long changeNumber = record.getChangeNumber();
- DatabaseEntry key = new ReplicationDraftCNKey(changeNumber);
- DatabaseEntry data = new DraftCNData(changeNumber, record.getBaseDN(), record.getCSN());
-
- // Use a transaction so that we can override durability.
- Transaction txn = null;
- dbCloseLock.readLock().lock();
- try
- {
- // If the DB has been closed then return immediately.
- if (isDBClosed())
- {
- return;
- }
-
- txn = dbenv.beginTransaction();
- db.put(txn, key, data);
- txn.commit(Durability.COMMIT_WRITE_NO_SYNC);
- }
- finally
- {
- JEUtils.abort(txn);
- dbCloseLock.readLock().unlock();
- }
- }
- catch (DatabaseException e)
- {
- throw new ChangelogException(e);
- }
- }
-
- /**
- * Shutdown the database.
- */
- public void shutdown()
- {
- dbCloseLock.writeLock().lock();
- try
- {
- db.close();
- db = null;
- }
- catch (DatabaseException e)
- {
- logger.info(NOTE_EXCEPTION_CLOSING_DATABASE, this, stackTraceToSingleLineString(e));
- }
- finally
- {
- dbCloseLock.writeLock().unlock();
- }
- }
-
- /**
- * Create a cursor that can be used to search or iterate on this DB.
- *
- * @param changeNumber The change number from which the cursor must start.
- * @return The ReplServerDBCursor
- * @throws ChangelogException If a database error prevented the cursor
- * creation.
- */
- public DraftCNDBCursor openReadCursor(long changeNumber)
- throws ChangelogException
- {
- return new DraftCNDBCursor(changeNumber);
- }
-
- /**
- * Create a cursor that can be used to delete some record from this
- * ReplicationServer database.
- *
- * @return The ReplServerDBCursor
- * @throws ChangelogException If a database error prevented the cursor
- * creation.
- */
- public DraftCNDBCursor openDeleteCursor() throws ChangelogException
- {
- return new DraftCNDBCursor();
- }
-
- private void closeLockedCursor(Cursor cursor)
- {
- try
- {
- close(cursor);
- }
- finally
- {
- dbCloseLock.readLock().unlock();
- }
- }
-
- /**
- * Read the first Change from the database, 0 when none.
- *
- * @return the first change number.
- * @throws ChangelogException
- * if a database problem occurred
- */
- public ChangeNumberIndexRecord readFirstRecord() throws ChangelogException
- {
- try
- {
- dbCloseLock.readLock().lock();
- Cursor cursor = null;
- try
- {
- // If the DB has been closed then return immediately.
- if (isDBClosed())
- {
- return null;
- }
-
- cursor = db.openCursor(null, null);
- ReplicationDraftCNKey key = new ReplicationDraftCNKey();
- DatabaseEntry entry = new DatabaseEntry();
- if (cursor.getFirst(key, entry, LockMode.DEFAULT) != SUCCESS)
- {
- return null;
- }
-
- return newCNIndexRecord(key, entry);
- }
- finally
- {
- closeLockedCursor(cursor);
- }
- }
- catch (DatabaseException e)
- {
- throw new ChangelogException(e);
- }
- }
-
- private ChangeNumberIndexRecord newCNIndexRecord(ReplicationDraftCNKey key,
- DatabaseEntry data) throws ChangelogException
- {
- return new DraftCNData(key.getChangeNumber(), data.getData()).getRecord();
- }
-
- /**
- * Return the record count.
- * @return the record count.
- */
- public long count()
- {
- dbCloseLock.readLock().lock();
- try
- {
- // If the DB has been closed then return immediately.
- if (isDBClosed())
- {
- return 0;
- }
-
- return db.count();
- }
- catch (DatabaseException e)
- {
- logger.traceException(e);
- }
- finally
- {
- dbCloseLock.readLock().unlock();
- }
- return 0;
- }
-
- /**
- * Read the last change number from the database.
- *
- * @return the last change number.
- * @throws ChangelogException
- * if a database problem occurred
- */
- public ChangeNumberIndexRecord readLastRecord() throws ChangelogException
- {
- try
- {
- dbCloseLock.readLock().lock();
- Cursor cursor = null;
- try
- {
- // If the DB has been closed then return immediately.
- if (isDBClosed())
- {
- return null;
- }
-
- cursor = db.openCursor(null, null);
- ReplicationDraftCNKey key = new ReplicationDraftCNKey();
- DatabaseEntry entry = new DatabaseEntry();
- if (cursor.getLast(key, entry, LockMode.DEFAULT) != SUCCESS)
- {
- return null;
- }
-
- return newCNIndexRecord(key, entry);
- }
- finally
- {
- closeLockedCursor(cursor);
- }
- }
- catch (DatabaseException e)
- {
- throw new ChangelogException(e);
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public String toString()
- {
- return getClass().getSimpleName();
- }
-
- /**
- * This Class implements a cursor that can be used to browse the database.
- */
- public class DraftCNDBCursor implements Closeable
- {
- private final Cursor cursor;
-
- /**
- * The transaction that will protect the actions done with the cursor.
- * Will be let null for a read cursor.
- * Will be set non null for a write cursor.
- */
- private final Transaction txn;
- private final ReplicationDraftCNKey key;
- private final DatabaseEntry entry = new DatabaseEntry();
- private ChangeNumberIndexRecord record;
- private boolean isClosed;
-
-
- /**
- * Creates a cursor that can be used for browsing the db.
- *
- * @param startChangeNumber
- * the change number from which the cursor must start.
- * @throws ChangelogException
- * If a database problem happened
- */
- private DraftCNDBCursor(long startChangeNumber) throws ChangelogException
- {
- this.key = new ReplicationDraftCNKey(startChangeNumber);
-
- // Take the lock. From now on, whatever error that happen in the life
- // of this cursor should end by unlocking that lock. We must also
- // unlock it when throwing an exception.
- dbCloseLock.readLock().lock();
-
- boolean cursorHeld = false;
- Cursor localCursor = null;
- try
- {
- // If the DB has been closed then create empty cursor.
- if (isDBClosed())
- {
- isClosed = true;
- txn = null;
- cursor = null;
- return;
- }
-
- localCursor = db.openCursor(null, null);
- if (startChangeNumber >= 0)
- {
- if (localCursor.getSearchKey(key, entry, LockMode.DEFAULT) != SUCCESS)
- {
- // We could not move the cursor to the expected startChangeNumber
- if (localCursor.getSearchKeyRange(key, entry, DEFAULT) != SUCCESS)
- {
- // We could not even move the cursor close to it
- // => return an empty cursor
- isClosed = true;
- txn = null;
- cursor = null;
- return;
- }
-
- if (localCursor.getPrev(key, entry, LockMode.DEFAULT) != SUCCESS)
- {
- localCursor.close();
- localCursor = db.openCursor(null, null);
- }
- else
- {
- record = newCNIndexRecord(this.key, entry);
- }
- }
- else
- {
- record = newCNIndexRecord(this.key, entry);
- }
- }
-
- this.txn = null;
- this.cursor = localCursor;
- cursorHeld = true;
- }
- catch (DatabaseException e)
- {
- throw new ChangelogException(e);
- }
- finally
- {
- if (!cursorHeld)
- {
- // Do not keep a readLock on the DB when this class does not hold a DB
- // cursor. Either an exception was thrown or no cursor could be opened
- // for some reason.
- closeLockedCursor(localCursor);
- }
- }
- }
-
- private DraftCNDBCursor() throws ChangelogException
- {
- Transaction localTxn = null;
- Cursor localCursor = null;
-
- this.key = new ReplicationDraftCNKey();
-
- // We'll go on only if no close or no clear is running
- boolean cursorHeld = false;
- dbCloseLock.readLock().lock();
- try
- {
- // If the DB has been closed then create empty cursor.
- if (isDBClosed())
- {
- isClosed = true;
- txn = null;
- cursor = null;
- return;
- }
-
- // Create the transaction that will protect whatever done with this
- // write cursor.
- localTxn = dbenv.beginTransaction();
- localCursor = db.openCursor(localTxn, null);
-
- this.txn = localTxn;
- this.cursor = localCursor;
- cursorHeld = true;
- }
- catch (DatabaseException e)
- {
- logger.traceException(e);
- JEUtils.abort(localTxn);
- throw new ChangelogException(e);
- }
- catch (ChangelogException e)
- {
- logger.traceException(e);
- JEUtils.abort(localTxn);
- throw e;
- }
- finally
- {
- if (!cursorHeld)
- {
- // Do not keep a readLock on the DB when this class does not hold a DB
- // cursor. Either an exception was thrown or no cursor could be opened
- // for some reason.
- closeLockedCursor(localCursor);
- }
- }
- }
-
- /**
- * Close the ReplicationServer Cursor.
- */
- @Override
- public void close()
- {
- synchronized (this)
- {
- if (isClosed)
- {
- return;
- }
- isClosed = true;
- }
-
- closeLockedCursor(cursor);
-
- if (txn != null)
- {
- try
- {
- txn.commit();
- }
- catch (DatabaseException e)
- {
- dbenv.shutdownOnException(e);
- }
- }
- }
-
- /**
- * Abort the Cursor after a DatabaseException.
- * This method catch and ignore the DatabaseException because
- * this must be done when aborting a cursor after a DatabaseException
- * (per the Cursor documentation).
- * This should not be used in any other case.
- */
- public void abort()
- {
- synchronized (this)
- {
- if (isClosed)
- {
- return;
- }
- isClosed = true;
- }
-
- closeLockedCursor(cursor);
- JEUtils.abort(txn);
- }
-
- /**
- * Returns the {@link ChangeNumberIndexRecord} at the current position of
- * the cursor.
- *
- * @return The current {@link ChangeNumberIndexRecord}.
- */
- public ChangeNumberIndexRecord currentRecord()
- {
- if (isClosed)
- {
- return null;
- }
- return record;
- }
-
- /**
- * Go to the next record on the cursor.
- *
- * @return the next record on this cursor.
- * @throws ChangelogException
- * If a database problem happened
- */
- public boolean next() throws ChangelogException
- {
- // first wipe old entry
- record = null;
- if (isClosed)
- {
- return false;
- }
-
- try {
- OperationStatus status = cursor.getNext(key, entry, LockMode.DEFAULT);
- if (status == OperationStatus.SUCCESS)
- {
- record = newCNIndexRecord(this.key, entry);
- return true;
- }
- return false;
- }
- catch (DatabaseException e)
- {
- throw new ChangelogException(e);
- }
- }
-
- /**
- * Delete the record at the current cursor position.
- *
- * @throws ChangelogException
- * If a database problem happened
- */
- public void delete() throws ChangelogException
- {
- if (isClosed)
- {
- throw new IllegalStateException("DraftCNDB already closed");
- }
-
- try
- {
- cursor.delete();
- }
- catch (DatabaseException e)
- {
- throw new ChangelogException(e);
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public String toString()
- {
- return getClass().getSimpleName() + " currentRecord=" + record;
- }
- }
-
- /**
- * Clears this change DB from the changes it contains.
- *
- * @throws ChangelogException
- * If a database problem happened
- */
- public void clear() throws ChangelogException
- {
- // The coming users will be blocked until the clear is done
- dbCloseLock.writeLock().lock();
- try
- {
- // If the DB has been closed then return immediately.
- if (isDBClosed())
- {
- return;
- }
-
- final Database oldDb = db;
- db = null; // In case there's a failure between here and recreation.
- dbenv.clearDb(oldDb);
-
- // RE-create the db
- db = dbenv.getOrCreateCNIndexDB();
- }
- catch(Exception e)
- {
- logger.error(ERR_ERROR_CLEARING_DB, this, e.getMessage() + " " + stackTraceToSingleLineString(e));
- }
- finally
- {
- // Relax the waiting users
- dbCloseLock.writeLock().unlock();
- }
- }
-
- /**
- * Returns {@code true} if the DB is closed. This method assumes that either
- * the db read/write lock has been taken.
- *
- * @return {@code true} if the DB is closed.
- */
- private boolean isDBClosed()
- {
- return db == null || !db.getEnvironment().isValid();
- }
-}
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/DraftCNData.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/DraftCNData.java
deleted file mode 100644
index 4eb3848..0000000
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/DraftCNData.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * CDDL HEADER START
- *
- * The contents of this file are subject to the terms of the
- * Common Development and Distribution License, Version 1.0 only
- * (the "License"). You may not use this file except in compliance
- * with the License.
- *
- * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
- * or http://forgerock.org/license/CDDLv1.0.html.
- * See the License for the specific language governing permissions
- * and limitations under the License.
- *
- * When distributing Covered Code, include this CDDL HEADER in each
- * file and include the License file at legal-notices/CDDLv1_0.txt.
- * If applicable, add the following below this CDDL HEADER, with the
- * fields enclosed by brackets "[]" replaced with your own identifying
- * information:
- * Portions Copyright [yyyy] [name of copyright owner]
- *
- * CDDL HEADER END
- *
- *
- * Copyright 2009 Sun Microsystems, Inc.
- * Portions Copyright 2010-2015 ForgeRock AS.
- */
-package org.opends.server.replication.server.changelog.je;
-
-import static org.opends.server.util.StaticUtils.*;
-
-import java.io.UnsupportedEncodingException;
-
-import org.forgerock.i18n.LocalizableMessage;
-import org.opends.server.replication.common.CSN;
-import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
-import org.opends.server.replication.server.changelog.api.ChangelogException;
-import org.opends.server.types.DN;
-import org.opends.server.types.DirectoryException;
-
-import com.sleepycat.je.DatabaseEntry;
-
-/**
- * Subclass of DatabaseEntry used for data stored in the DraftCNDB.
- */
-public class DraftCNData extends DatabaseEntry
-{
- private static final String FIELD_SEPARATOR = "!";
- private static final String EMPTY_STRING_PREVIOUS_COOKIE = "";
- private static final long serialVersionUID = 1L;
-
- private long changeNumber;
- private ChangeNumberIndexRecord record;
-
- /**
- * Creates a record to be stored in the DraftCNDB.
- *
- * @param changeNumber
- * the change number
- * @param baseDN
- * The baseDN (domain DN)
- * @param csn
- * The replication CSN
- */
- public DraftCNData(long changeNumber, DN baseDN, CSN csn)
- {
- this.changeNumber = changeNumber;
- // Although the previous cookie is not used any more, we need
- // to keep it in database for compatibility with previous versions
- String record = EMPTY_STRING_PREVIOUS_COOKIE + FIELD_SEPARATOR + baseDN + FIELD_SEPARATOR + csn;
- setData(getBytes(record));
- }
-
- /**
- * Creates a record to be stored in the DraftCNDB from the provided byte[].
- *
- * @param changeNumber
- * the change number
- * @param data
- * the provided byte[]
- * @throws ChangelogException
- * if a database problem occurred
- */
- public DraftCNData(long changeNumber, byte[] data) throws ChangelogException
- {
- this.changeNumber = changeNumber;
- this.record = decodeData(changeNumber, data);
- }
-
- /**
- * Decode and returns a {@link ChangeNumberIndexRecord}.
- *
- * @param changeNumber
- * @param data
- * the provided byte array.
- * @return the decoded {@link ChangeNumberIndexRecord}
- * @throws ChangelogException
- * when a problem occurs.
- */
- private ChangeNumberIndexRecord decodeData(long changeNumber, byte[] data)
- throws ChangelogException
- {
- try
- {
- // Although the previous cookie is not used any more, we need
- // to keep it in database for compatibility with previous versions
- String stringData = new String(data, "UTF-8");
- String[] str = stringData.split(FIELD_SEPARATOR, 3);
- // str[0] contains previous cookie and is ignored
- final DN baseDN = DN.valueOf(str[1]);
- final CSN csn = new CSN(str[2]);
- return new ChangeNumberIndexRecord(changeNumber, baseDN, csn);
- }
- catch (UnsupportedEncodingException e)
- {
- // should never happens
- // TODO: i18n
- throw new ChangelogException(LocalizableMessage.raw("need UTF-8 support"));
- }
- catch (DirectoryException e)
- {
- throw new ChangelogException(e);
- }
- }
-
- /**
- * Getter for the decoded record.
- *
- * @return the {@link ChangeNumberIndexRecord} record.
- * @throws ChangelogException
- * when a problem occurs.
- */
- public ChangeNumberIndexRecord getRecord() throws ChangelogException
- {
- if (record == null)
- {
- record = decodeData(changeNumber, getData());
- }
- return record;
- }
-
- /**
- * Provide a string representation of these data.
- * @return the string representation of these data.
- */
- @Override
- public String toString()
- {
- return "DraftCNData : [" + record + "]";
- }
-
-}
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
deleted file mode 100644
index b247f04..0000000
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
+++ /dev/null
@@ -1,386 +0,0 @@
-/*
- * CDDL HEADER START
- *
- * The contents of this file are subject to the terms of the
- * Common Development and Distribution License, Version 1.0 only
- * (the "License"). You may not use this file except in compliance
- * with the License.
- *
- * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
- * or http://forgerock.org/license/CDDLv1.0.html.
- * See the License for the specific language governing permissions
- * and limitations under the License.
- *
- * When distributing Covered Code, include this CDDL HEADER in each
- * file and include the License file at legal-notices/CDDLv1_0.txt.
- * If applicable, add the following below this CDDL HEADER, with the
- * fields enclosed by brackets "[]" replaced with your own identifying
- * information:
- * Portions Copyright [yyyy] [name of copyright owner]
- *
- * CDDL HEADER END
- *
- *
- * Copyright 2009-2010 Sun Microsystems, Inc.
- * Portions Copyright 2011-2015 ForgeRock AS
- */
-package org.opends.server.replication.server.changelog.je;
-
-import static org.opends.messages.ReplicationMessages.*;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.forgerock.i18n.slf4j.LocalizedLogger;
-import org.forgerock.opendj.config.server.ConfigException;
-import org.opends.server.admin.std.server.MonitorProviderCfg;
-import org.opends.server.api.MonitorProvider;
-import org.opends.server.core.DirectoryServer;
-import org.opends.server.replication.common.CSN;
-import org.opends.server.replication.server.changelog.api.*;
-import org.opends.server.replication.server.changelog.je.DraftCNDB.*;
-import org.opends.server.types.*;
-
-/**
- * This class is used for managing the replicationServer database for each
- * server in the topology. It is responsible for efficiently saving the updates
- * that is received from each master server into stable storage. This class is
- * also able to generate a {@link DBCursor} that can be used to read all changes
- * from a given change number.
- * <p>
- * This class publishes some monitoring information below <code>
- * cn=monitor</code>.
- */
-public class JEChangeNumberIndexDB implements ChangeNumberIndexDB
-{
- private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
- private static final int NO_KEY = 0;
-
- private DraftCNDB db;
- /**
- * The newest changenumber stored in the DB. It is used to avoid purging the
- * record with the newest changenumber. The newest record in the changenumber
- * index DB is used to persist the {@link #lastGeneratedChangeNumber} which is
- * then retrieved on server startup.
- */
- private volatile long newestChangeNumber = NO_KEY;
- /**
- * The last generated value for the change number. It is kept separate from
- * the {@link #newestChangeNumber} because there is an opportunity for a race
- * condition between:
- * <ol>
- * <li>this atomic long being incremented for a new record ('recordB')</li>
- * <li>the current newest record ('recordA') being purged from the DB</li>
- * <li>'recordB' failing to be inserted in the DB</li>
- * </ol>
- */
- private final AtomicLong lastGeneratedChangeNumber;
- private DbMonitorProvider dbMonitor = new DbMonitorProvider();
- private final AtomicBoolean shutdown = new AtomicBoolean(false);
-
-
- /**
- * Creates a new JEChangeNumberIndexDB associated to a given LDAP server.
- *
- * @param dbEnv the Database Env to use to create the ReplicationServer DB.
- * server for this domain.
- * @throws ChangelogException If a database problem happened
- */
- public JEChangeNumberIndexDB(ReplicationDbEnv dbEnv) throws ChangelogException
- {
- db = new DraftCNDB(dbEnv);
- final ChangeNumberIndexRecord newestRecord = db.readLastRecord();
- newestChangeNumber = getChangeNumber(newestRecord);
- // initialization of the lastGeneratedChangeNumber from the DB content
- // if DB is empty => last record does not exist => default to 0
- lastGeneratedChangeNumber = new AtomicLong(newestChangeNumber);
-
- // Monitoring registration
- DirectoryServer.deregisterMonitorProvider(dbMonitor);
- DirectoryServer.registerMonitorProvider(dbMonitor);
- }
-
- private long getChangeNumber(ChangeNumberIndexRecord record)
- throws ChangelogException
- {
- if (record != null)
- {
- return record.getChangeNumber();
- }
- return NO_KEY;
- }
-
- /** {@inheritDoc} */
- @Override
- public long addRecord(ChangeNumberIndexRecord record)
- throws ChangelogException
- {
- long changeNumber = nextChangeNumber();
- final ChangeNumberIndexRecord newRecord =
- new ChangeNumberIndexRecord(changeNumber, record.getBaseDN(), record.getCSN());
- db.addRecord(newRecord);
- newestChangeNumber = changeNumber;
-
- logger.trace("In JEChangeNumberIndexDB.add, added: %s", newRecord);
- return changeNumber;
- }
-
- /** {@inheritDoc} */
- @Override
- public ChangeNumberIndexRecord getOldestRecord() throws ChangelogException
- {
- return db.readFirstRecord();
- }
-
- /** {@inheritDoc} */
- @Override
- public ChangeNumberIndexRecord getNewestRecord() throws ChangelogException
- {
- return db.readLastRecord();
- }
-
- private long nextChangeNumber()
- {
- return lastGeneratedChangeNumber.incrementAndGet();
- }
-
- /** {@inheritDoc} */
- @Override
- public long getLastGeneratedChangeNumber()
- {
- return lastGeneratedChangeNumber.get();
- }
-
- /**
- * Get the number of changes.
- * @return Returns the number of changes.
- */
- public long count()
- {
- return db.count();
- }
-
- /**
- * Returns whether this database is empty.
- *
- * @return <code>true</code> if this database is empty, <code>false</code>
- * otherwise
- * @throws ChangelogException
- * if a database problem occurs.
- */
- public boolean isEmpty() throws ChangelogException
- {
- return getNewestRecord() == null;
- }
-
- /** {@inheritDoc} */
- @Override
- public DBCursor<ChangeNumberIndexRecord> getCursorFrom(long startChangeNumber)
- throws ChangelogException
- {
- return new JEChangeNumberIndexDBCursor(db, startChangeNumber);
- }
-
- /**
- * Shutdown this DB.
- */
- public void shutdown()
- {
- if (shutdown.compareAndSet(false, true))
- {
- db.shutdown();
- DirectoryServer.deregisterMonitorProvider(dbMonitor);
- }
- }
-
- /**
- * Synchronously purges the change number index DB up to and excluding the
- * provided timestamp.
- *
- * @param purgeCSN
- * the timestamp up to which purging must happen
- * @return the oldest non purged CSN.
- * @throws ChangelogException
- * if a database problem occurs.
- */
- public CSN purgeUpTo(CSN purgeCSN) throws ChangelogException
- {
- if (isEmpty() || purgeCSN == null)
- {
- return null;
- }
-
- final DraftCNDBCursor cursor = db.openDeleteCursor();
- try
- {
- while (!mustShutdown(shutdown) && cursor.next())
- {
- final ChangeNumberIndexRecord record = cursor.currentRecord();
- if (record.getChangeNumber() != newestChangeNumber
- && record.getCSN().isOlderThan(purgeCSN))
- {
- cursor.delete();
- }
- else
- {
- // 1- Current record is not old enough to purge.
- // 2- Do not purge the newest record to avoid having the last
- // generated changenumber dropping back to 0 when the server restarts
- return record.getCSN();
- }
- }
-
- return null;
- }
- catch (ChangelogException e)
- {
- cursor.abort();
- throw e;
- }
- catch (Exception e)
- {
- cursor.abort();
- throw new ChangelogException(e);
- }
- finally
- {
- cursor.close();
- }
- }
-
- /**
- * Clear the changes from this DB (from both memory cache and DB storage) for
- * the provided baseDN.
- *
- * @param baseDNToClear
- * The baseDN for which we want to remove all records from this DB,
- * null means all.
- * @throws ChangelogException
- * if a database problem occurs.
- */
- public void removeDomain(DN baseDNToClear) throws ChangelogException
- {
- if (isEmpty())
- {
- return;
- }
-
- final DraftCNDBCursor cursor = db.openDeleteCursor();
- try
- {
- while (!mustShutdown(shutdown) && cursor.next())
- {
- final ChangeNumberIndexRecord record = cursor.currentRecord();
- if (record.getChangeNumber() == newestChangeNumber)
- {
- // do not purge the newest record to avoid having the last generated
- // changenumber dropping back to 0 if the server restarts
- return;
- }
-
- if (baseDNToClear == null || record.getBaseDN().equals(baseDNToClear))
- {
- cursor.delete();
- }
- }
- }
- catch (ChangelogException e)
- {
- cursor.abort();
- throw e;
- }
- finally
- {
- cursor.close();
- }
- }
-
- private boolean mustShutdown(AtomicBoolean shutdown)
- {
- return shutdown != null && shutdown.get();
- }
-
- /**
- * This internal class is used to implement the Monitoring capabilities of the
- * JEChangeNumberIndexDB.
- */
- private class DbMonitorProvider extends MonitorProvider<MonitorProviderCfg>
- {
- /** {@inheritDoc} */
- @Override
- public List<Attribute> getMonitorData()
- {
- List<Attribute> attributes = new ArrayList<>();
- attributes.add(createChangeNumberAttribute(true));
- attributes.add(createChangeNumberAttribute(false));
- attributes.add(Attributes.create("count", Long.toString(count())));
- return attributes;
- }
-
- private Attribute createChangeNumberAttribute(boolean isFirst)
- {
- final String attributeName =
- isFirst ? "first-draft-changenumber" : "last-draft-changenumber";
- final String changeNumber = String.valueOf(readChangeNumber(isFirst));
- return Attributes.create(attributeName, changeNumber);
- }
-
- private long readChangeNumber(boolean isFirst)
- {
- try
- {
- return getChangeNumber(
- isFirst ? db.readFirstRecord() : db.readLastRecord());
- }
- catch (ChangelogException e)
- {
- logger.traceException(e);
- return NO_KEY;
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public String getMonitorInstanceName()
- {
- return "ChangeNumber Index Database";
- }
-
- /** {@inheritDoc} */
- @Override
- public void initializeMonitorProvider(MonitorProviderCfg configuration)
- throws ConfigException,InitializationException
- {
- // Nothing to do for now
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public String toString()
- {
- return getClass().getSimpleName() + ", newestChangeNumber="
- + newestChangeNumber;
- }
-
- /**
- * Clear the changes from this DB (from both memory cache and DB storage).
- *
- * @throws ChangelogException
- * if a database problem occurs.
- */
- public void clear() throws ChangelogException
- {
- db.clear();
- newestChangeNumber = NO_KEY;
- }
-
- @Override
- public void resetChangeNumberTo(long newFirstCN, DN baseDN, CSN newFirstCSN) throws ChangelogException
- {
- throw new ChangelogException(ERR_CHANGELOG_RESET_CHANGE_NUMBER_UNSUPPORTED.get());
- }
-
-}
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBCursor.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBCursor.java
deleted file mode 100644
index 174d0f5..0000000
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBCursor.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * CDDL HEADER START
- *
- * The contents of this file are subject to the terms of the
- * Common Development and Distribution License, Version 1.0 only
- * (the "License"). You may not use this file except in compliance
- * with the License.
- *
- * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
- * or http://forgerock.org/license/CDDLv1.0.html.
- * See the License for the specific language governing permissions
- * and limitations under the License.
- *
- * When distributing Covered Code, include this CDDL HEADER in each
- * file and include the License file at legal-notices/CDDLv1_0.txt.
- * If applicable, add the following below this CDDL HEADER, with the
- * fields enclosed by brackets "[]" replaced with your own identifying
- * information:
- * Portions Copyright [yyyy] [name of copyright owner]
- *
- * CDDL HEADER END
- *
- *
- * Copyright 2009 Sun Microsystems, Inc.
- * Portions Copyright 2011-2015 ForgeRock AS
- */
-package org.opends.server.replication.server.changelog.je;
-
-import org.forgerock.i18n.slf4j.LocalizedLogger;
-import org.opends.server.replication.server.changelog.api.*;
-import org.opends.server.replication.server.changelog.je.DraftCNDB.*;
-
-/**
- * This class allows to iterate through the changes comming from the change number index DB.
- *
- * \@NotThreadSafe
- */
-public class JEChangeNumberIndexDBCursor implements
- DBCursor<ChangeNumberIndexRecord>
-{
- private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
- private DraftCNDBCursor draftCNDbCursor;
-
- /**
- * As underlying cursor is already pointing to a record at start, this
- * indicator allow to shift the pointed record at initialization time.
- */
- private boolean isInitialized;
-
- /**
- * Creates a new DB Cursor. All created iterator must be released by
- * the caller using the {@link #close()} method.
- *
- * @param db
- * The db where the iterator must be created.
- * @param startChangeNumber
- * The change number after which the iterator must start.
- * @throws ChangelogException
- * If a database problem happened.
- */
- public JEChangeNumberIndexDBCursor(DraftCNDB db, long startChangeNumber)
- throws ChangelogException
- {
- draftCNDbCursor = db.openReadCursor(startChangeNumber);
- }
-
- /** {@inheritDoc} */
- @Override
- public ChangeNumberIndexRecord getRecord()
- {
- try
- {
- return isInitialized ? draftCNDbCursor.currentRecord() : null;
- }
- catch (Exception e)
- {
- logger.traceException(e);
- return null;
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public boolean next() throws ChangelogException
- {
- if (draftCNDbCursor != null)
- {
- if (!isInitialized)
- {
- isInitialized = true;
- return draftCNDbCursor.currentRecord() != null;
- }
- else
- {
- return draftCNDbCursor.next();
- }
- }
- return false;
- }
-
- /** {@inheritDoc} */
- @Override
- public void close()
- {
- synchronized (this)
- {
- if (draftCNDbCursor != null)
- {
- draftCNDbCursor.close();
- draftCNDbCursor = null;
- }
- }
- }
-
- /**
- * Called by the Gc when the object is garbage collected
- * Release the cursor in case the iterator was badly used and releaseCursor
- * was never called.
- */
- @Override
- protected void finalize()
- {
- close();
- }
-
-}
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
deleted file mode 100644
index 864060c..0000000
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ /dev/null
@@ -1,1028 +0,0 @@
-/*
- * CDDL HEADER START
- *
- * The contents of this file are subject to the terms of the
- * Common Development and Distribution License, Version 1.0 only
- * (the "License"). You may not use this file except in compliance
- * with the License.
- *
- * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
- * or http://forgerock.org/license/CDDLv1.0.html.
- * See the License for the specific language governing permissions
- * and limitations under the License.
- *
- * When distributing Covered Code, include this CDDL HEADER in each
- * file and include the License file at legal-notices/CDDLv1_0.txt.
- * If applicable, add the following below this CDDL HEADER, with the
- * fields enclosed by brackets "[]" replaced with your own identifying
- * information:
- * Portions Copyright [yyyy] [name of copyright owner]
- *
- * CDDL HEADER END
- *
- *
- * Portions Copyright 2013-2015 ForgeRock AS
- */
-package org.opends.server.replication.server.changelog.je;
-
-import static org.opends.messages.ReplicationMessages.*;
-import static org.opends.server.util.StaticUtils.*;
-
-import java.io.File;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.forgerock.i18n.LocalizableMessageBuilder;
-import org.forgerock.i18n.slf4j.LocalizedLogger;
-import org.forgerock.opendj.config.server.ConfigException;
-import org.forgerock.util.Pair;
-import org.opends.server.admin.std.server.ReplicationServerCfg;
-import org.opends.server.api.DirectoryThread;
-import org.opends.server.backends.ChangelogBackend;
-import org.opends.server.replication.common.CSN;
-import org.opends.server.replication.common.MultiDomainServerState;
-import org.opends.server.replication.common.ServerState;
-import org.opends.server.replication.protocol.UpdateMsg;
-import org.opends.server.replication.server.ChangelogState;
-import org.opends.server.replication.server.ReplicationServer;
-import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB;
-import org.opends.server.replication.server.changelog.api.ChangelogDB;
-import org.opends.server.replication.server.changelog.api.ChangelogException;
-import org.opends.server.replication.server.changelog.api.DBCursor;
-import org.opends.server.replication.server.changelog.api.DBCursor.CursorOptions;
-import org.opends.server.replication.server.changelog.api.ReplicaId;
-import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
-import org.opends.server.replication.server.changelog.file.ChangeNumberIndexer;
-import org.opends.server.replication.server.changelog.file.DomainDBCursor;
-import org.opends.server.replication.server.changelog.file.MultiDomainDBCursor;
-import org.opends.server.replication.server.changelog.file.ReplicaCursor;
-import org.opends.server.types.DN;
-import org.opends.server.util.StaticUtils;
-import org.opends.server.util.TimeThread;
-
-/**
- * JE implementation of the ChangelogDB interface.
- */
-public class JEChangelogDB implements ChangelogDB, ReplicationDomainDB
-{
- /** The tracer object for the debug logger. */
- protected static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
-
- /**
- * This map contains the List of updates received from each LDAP server.
- * <p>
- * When removing a domainMap, code:
- * <ol>
- * <li>first get the domainMap</li>
- * <li>synchronized on the domainMap</li>
- * <li>remove the domainMap</li>
- * <li>then check it's not null</li>
- * <li>then close all inside</li>
- * </ol>
- * When creating a replicaDB, synchronize on the domainMap to avoid
- * concurrent shutdown.
- */
- private final ConcurrentMap<DN, ConcurrentMap<Integer, JEReplicaDB>> domainToReplicaDBs =
- new ConcurrentHashMap<>();
- private final ConcurrentSkipListMap<DN, CopyOnWriteArrayList<DomainDBCursor>> registeredDomainCursors =
- new ConcurrentSkipListMap<>();
- private final CopyOnWriteArrayList<MultiDomainDBCursor> registeredMultiDomainCursors = new CopyOnWriteArrayList<>();
- private final ConcurrentSkipListMap<ReplicaId, CopyOnWriteArrayList<ReplicaCursor>> replicaCursors =
- new ConcurrentSkipListMap<>();
- private ReplicationDbEnv replicationEnv;
- private final ReplicationServerCfg config;
- private final File dbDirectory;
-
- /**
- * The handler of the changelog database, the database stores the relation
- * between a change number and the associated cookie.
- * <p>
- * @GuardedBy("cnIndexDBLock")
- */
- private JEChangeNumberIndexDB cnIndexDB;
- private final AtomicReference<ChangeNumberIndexer> cnIndexer = new AtomicReference<>();
-
- /** Used for protecting {@link ChangeNumberIndexDB} related state. */
- private final Object cnIndexDBLock = new Object();
-
- /**
- * The purge delay (in milliseconds). Records in the changelog DB that are
- * older than this delay might be removed.
- */
- private volatile long purgeDelayInMillis;
- private final AtomicReference<ChangelogDBPurger> cnPurger = new AtomicReference<>();
-
- /** The local replication server. */
- private final ReplicationServer replicationServer;
- private final AtomicBoolean shutdown = new AtomicBoolean();
-
- private static final DBCursor<UpdateMsg> EMPTY_CURSOR_REPLICA_DB =
- new DBCursor<UpdateMsg>()
- {
-
- @Override
- public boolean next()
- {
- return false;
- }
-
- @Override
- public UpdateMsg getRecord()
- {
- return null;
- }
-
- @Override
- public void close()
- {
- // empty
- }
-
- @Override
- public String toString()
- {
- return "EmptyDBCursor<UpdateMsg>";
- }
- };
-
- /**
- * Creates a new changelog DB.
- *
- * @param replicationServer
- * the local replication server.
- * @param config
- * the replication server configuration
- * @throws ConfigException
- * if a problem occurs opening the supplied directory
- */
- public JEChangelogDB(final ReplicationServer replicationServer, final ReplicationServerCfg config)
- throws ConfigException
- {
- this.config = config;
- this.replicationServer = replicationServer;
- this.dbDirectory = makeDir(config.getReplicationDBDirectory());
- }
-
- private File makeDir(final String dbDirName) throws ConfigException
- {
- // Check that this path exists or create it.
- final File dbDirectory = getFileForPath(dbDirName);
- try
- {
- if (!dbDirectory.exists())
- {
- dbDirectory.mkdir();
- }
- return dbDirectory;
- }
- catch (Exception e)
- {
- final LocalizableMessageBuilder mb = new LocalizableMessageBuilder(
- e.getLocalizedMessage()).append(" ").append(String.valueOf(dbDirectory));
- throw new ConfigException(ERR_FILE_CHECK_CREATE_FAILED.get(mb.toString()), e);
- }
- }
-
- private Map<Integer, JEReplicaDB> getDomainMap(final DN baseDN)
- {
- final Map<Integer, JEReplicaDB> domainMap = domainToReplicaDBs.get(baseDN);
- if (domainMap != null)
- {
- return domainMap;
- }
- return Collections.emptyMap();
- }
-
- private JEReplicaDB getReplicaDB(final DN baseDN, final int serverId)
- {
- return getDomainMap(baseDN).get(serverId);
- }
-
- /**
- * Returns a {@link JEReplicaDB}, possibly creating it.
- *
- * @param baseDN
- * the baseDN for which to create a ReplicaDB
- * @param serverId
- * the serverId for which to create a ReplicaDB
- * @param server
- * the ReplicationServer
- * @return a Pair with the JEReplicaDB and a boolean indicating whether it has been created
- * @throws ChangelogException
- * if a problem occurred with the database
- */
- Pair<JEReplicaDB, Boolean> getOrCreateReplicaDB(final DN baseDN, final int serverId,
- final ReplicationServer server) throws ChangelogException
- {
- while (!shutdown.get())
- {
- final ConcurrentMap<Integer, JEReplicaDB> domainMap = getExistingOrNewDomainMap(baseDN);
- final Pair<JEReplicaDB, Boolean> result = getExistingOrNewReplicaDB(domainMap, serverId, baseDN, server);
- if (result != null)
- {
- final Boolean dbWasCreated = result.getSecond();
- if (dbWasCreated)
- { // new replicaDB => update all cursors with it
- final List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN);
- if (cursors != null && !cursors.isEmpty())
- {
- for (DomainDBCursor cursor : cursors)
- {
- cursor.addReplicaDB(serverId, null);
- }
- }
- }
-
- return result;
- }
- }
- throw new ChangelogException(ERR_CANNOT_CREATE_REPLICA_DB_BECAUSE_CHANGELOG_DB_SHUTDOWN.get());
- }
-
- private ConcurrentMap<Integer, JEReplicaDB> getExistingOrNewDomainMap(final DN baseDN)
- {
- // happy path: the domainMap already exists
- final ConcurrentMap<Integer, JEReplicaDB> currentValue = domainToReplicaDBs.get(baseDN);
- if (currentValue != null)
- {
- return currentValue;
- }
-
- // unlucky, the domainMap does not exist: take the hit and create the
- // newValue, even though the same could be done concurrently by another thread
- final ConcurrentMap<Integer, JEReplicaDB> newValue = new ConcurrentHashMap<>();
- final ConcurrentMap<Integer, JEReplicaDB> previousValue = domainToReplicaDBs.putIfAbsent(baseDN, newValue);
- if (previousValue != null)
- {
- // there was already a value associated to the key, let's use it
- return previousValue;
- }
-
- // we just created a new domain => update all cursors
- for (MultiDomainDBCursor cursor : registeredMultiDomainCursors)
- {
- cursor.addDomain(baseDN, null);
- }
- return newValue;
- }
-
- private Pair<JEReplicaDB, Boolean> getExistingOrNewReplicaDB(final ConcurrentMap<Integer, JEReplicaDB> domainMap,
- final int serverId, final DN baseDN, final ReplicationServer server) throws ChangelogException
- {
- // happy path: the replicaDB already exists
- JEReplicaDB currentValue = domainMap.get(serverId);
- if (currentValue != null)
- {
- return Pair.of(currentValue, false);
- }
-
- // unlucky, the replicaDB does not exist: take the hit and synchronize
- // on the domainMap to create a new ReplicaDB
- synchronized (domainMap)
- {
- // double-check
- currentValue = domainMap.get(serverId);
- if (currentValue != null)
- {
- return Pair.of(currentValue, false);
- }
-
- if (domainToReplicaDBs.get(baseDN) != domainMap)
- {
- // The domainMap could have been concurrently removed because
- // 1) a shutdown was initiated or 2) an initialize was called.
- // Return will allow the code to:
- // 1) shutdown properly or 2) lazily recreate the replicaDB
- return null;
- }
-
- final JEReplicaDB newDB = new JEReplicaDB(serverId, baseDN, server, replicationEnv);
- domainMap.put(serverId, newDB);
- return Pair.of(newDB, true);
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public void initializeDB()
- {
- try
- {
- final File dbDir = getFileForPath(config.getReplicationDBDirectory());
- replicationEnv = new ReplicationDbEnv(dbDir.getAbsolutePath(), replicationServer);
- final ChangelogState changelogState = replicationEnv.getChangelogState();
- initializeToChangelogState(changelogState);
- if (config.isComputeChangeNumber())
- {
- startIndexer();
- }
- setPurgeDelay(replicationServer.getPurgeDelay());
- }
- catch (ChangelogException e)
- {
- logger.traceException(e);
- logger.error(ERR_COULD_NOT_READ_DB, this.dbDirectory.getAbsolutePath(), e.getLocalizedMessage());
- }
- }
-
- private void initializeToChangelogState(final ChangelogState changelogState)
- throws ChangelogException
- {
- for (Map.Entry<DN, Long> entry : changelogState.getDomainToGenerationId().entrySet())
- {
- replicationServer.getReplicationServerDomain(entry.getKey(), true).initGenerationID(entry.getValue());
- }
- for (Map.Entry<DN, Set<Integer>> entry : changelogState.getDomainToServerIds().entrySet())
- {
- for (int serverId : entry.getValue())
- {
- getOrCreateReplicaDB(entry.getKey(), serverId, replicationServer);
- }
- }
- }
-
- private void shutdownChangeNumberIndexDB() throws ChangelogException
- {
- synchronized (cnIndexDBLock)
- {
- if (cnIndexDB != null)
- {
- cnIndexDB.shutdown();
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public void shutdownDB() throws ChangelogException
- {
- if (!this.shutdown.compareAndSet(false, true))
- { // shutdown has already been initiated
- return;
- }
-
- // Remember the first exception because :
- // - we want to try to remove everything we want to remove
- // - then throw the first encountered exception
- ChangelogException firstException = null;
-
- final ChangeNumberIndexer indexer = cnIndexer.getAndSet(null);
- if (indexer != null)
- {
- indexer.initiateShutdown();
- }
- final ChangelogDBPurger purger = cnPurger.getAndSet(null);
- if (purger != null)
- {
- purger.initiateShutdown();
- }
-
- try
- {
- shutdownChangeNumberIndexDB();
- }
- catch (ChangelogException e)
- {
- firstException = e;
- }
-
- for (Iterator<ConcurrentMap<Integer, JEReplicaDB>> it =
- this.domainToReplicaDBs.values().iterator(); it.hasNext();)
- {
- final ConcurrentMap<Integer, JEReplicaDB> domainMap = it.next();
- synchronized (domainMap)
- {
- it.remove();
- for (JEReplicaDB replicaDB : domainMap.values())
- {
- replicaDB.shutdown();
- }
- }
- }
-
- if (replicationEnv != null)
- {
- // wait for shutdown of the threads holding cursors
- try
- {
- if (indexer != null)
- {
- indexer.join();
- }
- if (purger != null)
- {
- purger.join();
- }
- }
- catch (InterruptedException e)
- {
- // do nothing: we are already shutting down
- }
-
- replicationEnv.shutdown();
- }
-
- if (firstException != null)
- {
- throw firstException;
- }
- }
-
- /**
- * Clears all records from the changelog (does not remove the changelog itself).
- *
- * @throws ChangelogException
- * If an error occurs when clearing the changelog.
- */
- public void clearDB() throws ChangelogException
- {
- if (!dbDirectory.exists())
- {
- return;
- }
-
- // Remember the first exception because :
- // - we want to try to remove everything we want to remove
- // - then throw the first encountered exception
- ChangelogException firstException = null;
-
- for (DN baseDN : this.domainToReplicaDBs.keySet())
- {
- removeDomain(baseDN);
- }
-
- synchronized (cnIndexDBLock)
- {
- if (cnIndexDB != null)
- {
- try
- {
- cnIndexDB.clear();
- }
- catch (ChangelogException e)
- {
- firstException = e;
- }
-
- try
- {
- shutdownChangeNumberIndexDB();
- }
- catch (ChangelogException e)
- {
- if (firstException == null)
- {
- firstException = e;
- }
- else
- {
- logger.traceException(e);
- }
- }
-
- cnIndexDB = null;
- }
- }
-
- if (firstException != null)
- {
- throw firstException;
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public void removeDB() throws ChangelogException
- {
- shutdownDB();
- StaticUtils.recursiveDelete(dbDirectory);
- }
-
- /** {@inheritDoc} */
- @Override
- public ServerState getDomainOldestCSNs(DN baseDN)
- {
- final ServerState result = new ServerState();
- for (JEReplicaDB replicaDB : getDomainMap(baseDN).values())
- {
- result.update(replicaDB.getOldestCSN());
- }
- return result;
- }
-
- /** {@inheritDoc} */
- @Override
- public ServerState getDomainNewestCSNs(DN baseDN)
- {
- final ServerState result = new ServerState();
- for (JEReplicaDB replicaDB : getDomainMap(baseDN).values())
- {
- result.update(replicaDB.getNewestCSN());
- }
- return result;
- }
-
- /** {@inheritDoc} */
- @Override
- public void removeDomain(DN baseDN) throws ChangelogException
- {
- // Remember the first exception because :
- // - we want to try to remove everything we want to remove
- // - then throw the first encountered exception
- ChangelogException firstException = null;
-
- // 1- clear the replica DBs
- Map<Integer, JEReplicaDB> domainMap = domainToReplicaDBs.get(baseDN);
- if (domainMap != null)
- {
- final ChangeNumberIndexer indexer = this.cnIndexer.get();
- if (indexer != null)
- {
- indexer.clear(baseDN);
- }
- synchronized (domainMap)
- {
- domainMap = domainToReplicaDBs.remove(baseDN);
- for (JEReplicaDB replicaDB : domainMap.values())
- {
- try
- {
- replicaDB.clear();
- }
- catch (ChangelogException e)
- {
- firstException = e;
- }
- replicaDB.shutdown();
- }
- }
- }
-
- // 2- clear the ChangeNumber index DB
- synchronized (cnIndexDBLock)
- {
- if (cnIndexDB != null)
- {
- try
- {
- cnIndexDB.removeDomain(baseDN);
- }
- catch (ChangelogException e)
- {
- if (firstException == null)
- {
- firstException = e;
- }
- else
- {
- logger.traceException(e);
- }
- }
- }
- }
-
- // 3- clear the changelogstate DB
- try
- {
- replicationEnv.clearGenerationId(baseDN);
- }
- catch (ChangelogException e)
- {
- if (firstException == null)
- {
- firstException = e;
- }
- else
- {
- logger.traceException(e);
- }
- }
-
- if (firstException != null)
- {
- throw firstException;
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public void setPurgeDelay(final long purgeDelayInMillis)
- {
- this.purgeDelayInMillis = purgeDelayInMillis;
- if (purgeDelayInMillis > 0)
- {
- final ChangelogDBPurger newPurger = new ChangelogDBPurger();
- if (cnPurger.compareAndSet(null, newPurger))
- { // no purger was running, run this new one
- newPurger.start();
- }
- else
- { // a purger was already running, just wake that one up
- // to verify if some entries can be purged with the new purge delay
- final ChangelogDBPurger currentPurger = cnPurger.get();
- synchronized (currentPurger)
- {
- currentPurger.notify();
- }
- }
- }
- else
- {
- final ChangelogDBPurger purgerToStop = cnPurger.getAndSet(null);
- if (purgerToStop != null)
- { // stop this purger
- purgerToStop.initiateShutdown();
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public void setComputeChangeNumber(final boolean computeChangeNumber)
- throws ChangelogException
- {
- if (computeChangeNumber)
- {
- startIndexer();
- }
- else
- {
- final ChangeNumberIndexer indexer = cnIndexer.getAndSet(null);
- if (indexer != null)
- {
- indexer.initiateShutdown();
- }
- }
- }
-
- private void startIndexer()
- {
- final ChangeNumberIndexer indexer = new ChangeNumberIndexer(this, replicationEnv);
- if (cnIndexer.compareAndSet(null, indexer))
- {
- indexer.start();
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public ChangeNumberIndexDB getChangeNumberIndexDB()
- {
- synchronized (cnIndexDBLock)
- {
- if (cnIndexDB == null)
- {
- try
- {
- cnIndexDB = new JEChangeNumberIndexDB(replicationEnv);
- }
- catch (Exception e)
- {
- logger.traceException(e);
- logger.error(ERR_CHANGENUMBER_DATABASE, e.getLocalizedMessage());
- }
- }
- return cnIndexDB;
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public ReplicationDomainDB getReplicationDomainDB()
- {
- return this;
- }
-
- /** {@inheritDoc} */
- @Override
- public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState, CursorOptions options)
- throws ChangelogException
- {
- final Set<DN> excludedDomainDns = Collections.emptySet();
- return getCursorFrom(startState, options, excludedDomainDns);
- }
-
- /** {@inheritDoc} */
- @Override
- public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState,
- CursorOptions options, final Set<DN> excludedDomainDns) throws ChangelogException
- {
- final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this, options);
- registeredMultiDomainCursors.add(cursor);
- for (DN baseDN : domainToReplicaDBs.keySet())
- {
- if (!excludedDomainDns.contains(baseDN))
- {
- cursor.addDomain(baseDN, startState.getServerState(baseDN));
- }
- }
- return cursor;
- }
-
- /** {@inheritDoc} */
- @Override
- public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startState, CursorOptions options)
- throws ChangelogException
- {
- final DomainDBCursor cursor = newDomainDBCursor(baseDN, options);
- for (int serverId : getDomainMap(baseDN).keySet())
- {
- // get the last already sent CSN from that server to get a cursor
- final CSN lastCSN = startState != null ? startState.getCSN(serverId) : null;
- cursor.addReplicaDB(serverId, lastCSN);
- }
- return cursor;
- }
-
- private DomainDBCursor newDomainDBCursor(final DN baseDN, CursorOptions options)
- {
- final DomainDBCursor cursor = new DomainDBCursor(baseDN, this, options);
- putCursor(registeredDomainCursors, baseDN, cursor);
- return cursor;
- }
-
- private CSN getOfflineCSN(DN baseDN, int serverId, CSN startAfterCSN)
- {
- final MultiDomainServerState offlineReplicas =
- replicationEnv.getChangelogState().getOfflineReplicas();
- final CSN offlineCSN = offlineReplicas.getCSN(baseDN, serverId);
- if (offlineCSN != null
- && (startAfterCSN == null || startAfterCSN.isOlderThan(offlineCSN)))
- {
- return offlineCSN;
- }
- return null;
- }
-
- @Override
- public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startCSN,
- CursorOptions options) throws ChangelogException
- {
- final JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
- if (replicaDB != null)
- {
- final CSN actualStartCSN = startCSN != null ? startCSN : options.getDefaultCSN();
- final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(
- actualStartCSN, options.getKeyMatchingStrategy(), options.getPositionStrategy());
- final CSN offlineCSN = getOfflineCSN(baseDN, serverId, actualStartCSN);
- final ReplicaId replicaId = ReplicaId.of(baseDN, serverId);
- final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaId, this);
-
- putCursor(replicaCursors, replicaId, replicaCursor);
-
- return replicaCursor;
- }
- return EMPTY_CURSOR_REPLICA_DB;
- }
-
- private <K, V> void putCursor(ConcurrentSkipListMap<K, CopyOnWriteArrayList<V>> map, final K key, final V cursor)
- {
- CopyOnWriteArrayList<V> cursors = map.get(key);
- if (cursors == null)
- {
- cursors = new CopyOnWriteArrayList<>();
- CopyOnWriteArrayList<V> previousValue = map.putIfAbsent(key, cursors);
- if (previousValue != null)
- {
- cursors = previousValue;
- }
- }
- cursors.add(cursor);
- }
-
- /** {@inheritDoc} */
- @Override
- public void unregisterCursor(final DBCursor<?> cursor)
- {
- if (cursor instanceof MultiDomainDBCursor)
- {
- registeredMultiDomainCursors.remove(cursor);
- }
- else if (cursor instanceof DomainDBCursor)
- {
- final DomainDBCursor domainCursor = (DomainDBCursor) cursor;
- final List<DomainDBCursor> cursors = registeredDomainCursors.get(domainCursor.getBaseDN());
- if (cursors != null)
- {
- cursors.remove(cursor);
- }
- }
- else if (cursor instanceof ReplicaCursor)
- {
- final ReplicaCursor replicaCursor = (ReplicaCursor) cursor;
- final List<ReplicaCursor> cursors = replicaCursors.get(replicaCursor.getReplicaId());
- if (cursors != null)
- {
- cursors.remove(cursor);
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public boolean publishUpdateMsg(final DN baseDN, final UpdateMsg updateMsg) throws ChangelogException
- {
- final CSN csn = updateMsg.getCSN();
- final Pair<JEReplicaDB, Boolean> pair = getOrCreateReplicaDB(baseDN,
- csn.getServerId(), replicationServer);
- final JEReplicaDB replicaDB = pair.getFirst();
- replicaDB.add(updateMsg);
-
- ChangelogBackend.getInstance().notifyCookieEntryAdded(baseDN, updateMsg);
-
- final ChangeNumberIndexer indexer = cnIndexer.get();
- if (indexer != null)
- {
- notifyReplicaOnline(indexer, baseDN, csn.getServerId());
- indexer.publishUpdateMsg(baseDN, updateMsg);
- }
- return pair.getSecond(); // replica DB was created
- }
-
- /** {@inheritDoc} */
- @Override
- public void replicaHeartbeat(final DN baseDN, final CSN heartbeatCSN) throws ChangelogException
- {
- final ChangeNumberIndexer indexer = cnIndexer.get();
- if (indexer != null)
- {
- notifyReplicaOnline(indexer, baseDN, heartbeatCSN.getServerId());
- indexer.publishHeartbeat(baseDN, heartbeatCSN);
- }
- }
-
- private void notifyReplicaOnline(final ChangeNumberIndexer indexer, final DN baseDN, final int serverId)
- throws ChangelogException
- {
- if (indexer.isReplicaOffline(baseDN, serverId))
- {
- replicationEnv.notifyReplicaOnline(baseDN, serverId);
- }
- updateCursorsWithOfflineCSN(baseDN, serverId, null);
- }
-
- /** {@inheritDoc} */
- @Override
- public void notifyReplicaOffline(final DN baseDN, final CSN offlineCSN) throws ChangelogException
- {
- replicationEnv.notifyReplicaOffline(baseDN, offlineCSN);
- final ChangeNumberIndexer indexer = cnIndexer.get();
- if (indexer != null)
- {
- indexer.replicaOffline(baseDN, offlineCSN);
- }
- updateCursorsWithOfflineCSN(baseDN, offlineCSN.getServerId(), offlineCSN);
- }
-
- private void updateCursorsWithOfflineCSN(final DN baseDN, final int serverId, final CSN offlineCSN)
- {
- final List<ReplicaCursor> cursors = replicaCursors.get(ReplicaId.of(baseDN, serverId));
- if (cursors != null)
- {
- for (ReplicaCursor cursor : cursors)
- {
- cursor.setOfflineCSN(offlineCSN);
- }
- }
- }
-
- /**
- * The thread purging the changelogDB on a regular interval. Records are
- * purged from the changelogDB if they are older than a delay specified in
- * seconds. The purge process works in two steps:
- * <ol>
- * <li>first purge the changeNumberIndexDB and retrieve information to drive
- * replicaDBs purging</li>
- * <li>proceed to purge each replicaDBs based on the information collected
- * when purging the changeNumberIndexDB</li>
- * </ol>
- */
- private final class ChangelogDBPurger extends DirectoryThread
- {
- private static final int DEFAULT_SLEEP = 500;
-
- protected ChangelogDBPurger()
- {
- super("changelog DB purger");
- }
-
- /** {@inheritDoc} */
- @Override
- public void run()
- {
- // initialize CNIndexDB
- getChangeNumberIndexDB();
- while (!isShutdownInitiated())
- {
- try
- {
- final long purgeTimestamp = TimeThread.getTime() - purgeDelayInMillis;
- final CSN purgeCSN = new CSN(purgeTimestamp, 0, 0);
- final CSN oldestNotPurgedCSN;
-
- // next code assumes that the compute-change-number config
- // never changes during the life time of an RS
- if (!config.isComputeChangeNumber())
- {
- oldestNotPurgedCSN = purgeCSN;
- }
- else
- {
- final JEChangeNumberIndexDB localCNIndexDB = cnIndexDB;
- if (localCNIndexDB == null)
- { // shutdown has been initiated
- return;
- }
-
- oldestNotPurgedCSN = localCNIndexDB.purgeUpTo(purgeCSN);
- if (oldestNotPurgedCSN == null)
- { // shutdown may have been initiated...
- // ... or the change number index DB is empty,
- // wait for new changes to come in.
-
- // Note we cannot sleep for as long as the purge delay
- // (3 days default), because we might receive late updates
- // that will have to be purged before the purge delay elapses.
- // This can particularly happen in case of network partitions.
- if (!isShutdownInitiated())
- {
- synchronized (this)
- {
- if (!isShutdownInitiated())
- {
- wait(DEFAULT_SLEEP);
- }
- }
- }
- continue;
- }
- }
-
- for (final Map<Integer, JEReplicaDB> domainMap : domainToReplicaDBs.values())
- {
- for (final JEReplicaDB replicaDB : domainMap.values())
- {
- replicaDB.purgeUpTo(oldestNotPurgedCSN);
- }
- }
-
- if (!isShutdownInitiated())
- {
- synchronized (this)
- {
- if (!isShutdownInitiated())
- {
- wait(computeSleepTimeUntilNextPurge(oldestNotPurgedCSN));
- }
- }
- }
- }
- catch (InterruptedException e)
- {
- // shutdown initiated?
- }
- catch (Exception e)
- {
- logger.error(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH, stackTraceToSingleLineString(e));
- if (replicationServer != null)
- {
- replicationServer.shutdown();
- }
- }
- }
- }
-
- private long computeSleepTimeUntilNextPurge(CSN notPurgedCSN)
- {
- final long nextPurgeTime = notPurgedCSN.getTime();
- final long currentPurgeTime = TimeThread.getTime() - purgeDelayInMillis;
- if (currentPurgeTime <= nextPurgeTime)
- {
- // sleep till the next CSN to purge,
- return nextPurgeTime - currentPurgeTime;
- }
- // wait a bit before purging more
- return DEFAULT_SLEEP;
- }
-
- /** {@inheritDoc} */
- @Override
- public void initiateShutdown()
- {
- super.initiateShutdown();
- synchronized (this)
- {
- notify(); // wake up the purger thread for faster shutdown
- }
- }
- }
-}
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/JEReplicaDB.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
deleted file mode 100644
index a83bb0d..0000000
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
+++ /dev/null
@@ -1,390 +0,0 @@
-/*
- * CDDL HEADER START
- *
- * The contents of this file are subject to the terms of the
- * Common Development and Distribution License, Version 1.0 only
- * (the "License"). You may not use this file except in compliance
- * with the License.
- *
- * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
- * or http://forgerock.org/license/CDDLv1.0.html.
- * See the License for the specific language governing permissions
- * and limitations under the License.
- *
- * When distributing Covered Code, include this CDDL HEADER in each
- * file and include the License file at legal-notices/CDDLv1_0.txt.
- * If applicable, add the following below this CDDL HEADER, with the
- * fields enclosed by brackets "[]" replaced with your own identifying
- * information:
- * Portions Copyright [yyyy] [name of copyright owner]
- *
- * CDDL HEADER END
- *
- *
- * Copyright 2006-2010 Sun Microsystems, Inc.
- * Portions Copyright 2011-2015 ForgeRock AS
- */
-package org.opends.server.replication.server.changelog.je;
-
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.forgerock.opendj.config.server.ConfigException;
-import org.opends.server.admin.std.server.MonitorProviderCfg;
-import org.opends.server.api.MonitorProvider;
-import org.opends.server.core.DirectoryServer;
-import org.opends.server.replication.common.CSN;
-import org.opends.server.replication.protocol.UpdateMsg;
-import org.opends.server.replication.server.ReplicationServer;
-import org.opends.server.replication.server.ReplicationServerDomain;
-import org.opends.server.replication.server.changelog.api.ChangelogException;
-import org.opends.server.replication.server.changelog.api.DBCursor;
-import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
-import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
-import org.opends.server.replication.server.changelog.je.ReplicationDB.ReplServerDBCursor;
-import org.opends.server.types.Attribute;
-import org.opends.server.types.Attributes;
-import org.opends.server.types.DN;
-import org.opends.server.types.InitializationException;
-
-import static org.opends.messages.ReplicationMessages.*;
-
-/**
- * Represents a replication server database for one server in the topology.
- * <p>
- * It is responsible for efficiently saving the updates that is received from
- * each master server into stable storage.
- * <p>
- * It is also able to generate a {@link DBCursor} that can be used to
- * read all changes from a given {@link CSN}.
- * <p>
- * It publishes some monitoring information below cn=monitor.
- */
-class JEReplicaDB
-{
-
- /**
- * Class that allows atomically setting oldest and newest CSNs without
- * synchronization.
- *
- * @Immutable
- */
- private static final class CSNLimits
- {
- private final CSN oldestCSN;
- private final CSN newestCSN;
-
- public CSNLimits(CSN oldestCSN, CSN newestCSN)
- {
- this.oldestCSN = oldestCSN;
- this.newestCSN = newestCSN;
- }
- }
-
- private final AtomicBoolean shutdown = new AtomicBoolean(false);
- /**
- * Holds the oldest and newest CSNs for this replicaDB for fast retrieval.
- *
- * @NonNull
- */
- private volatile CSNLimits csnLimits;
- private final int serverId;
- private final DN baseDN;
- private final DbMonitorProvider dbMonitor = new DbMonitorProvider();
- private final ReplicationServer replicationServer;
- private final ReplicationDB db;
-
- /**
- * Creates a new ReplicaDB associated to a given LDAP server.
- *
- * @param serverId
- * Id of this server.
- * @param baseDN
- * the replication domain baseDN.
- * @param replicationServer
- * The ReplicationServer that creates this ReplicaDB.
- * @param replicationEnv
- * the Database Env to use to create the ReplicationServer DB. server
- * for this domain.
- * @throws ChangelogException
- * If a database problem happened
- */
- JEReplicaDB(final int serverId, final DN baseDN, final ReplicationServer replicationServer,
- final ReplicationDbEnv replicationEnv) throws ChangelogException
- {
- this.serverId = serverId;
- this.baseDN = baseDN;
- this.replicationServer = replicationServer;
- this.db = new ReplicationDB(serverId, baseDN, replicationServer, replicationEnv);
- this.csnLimits = new CSNLimits(db.readOldestCSN(), db.readNewestCSN());
-
- DirectoryServer.deregisterMonitorProvider(dbMonitor);
- DirectoryServer.registerMonitorProvider(dbMonitor);
- }
-
- /**
- * Add an update to the list of messages that must be saved to the db managed
- * by this db handler. This method is blocking if the size of the list of
- * message is larger than its maximum.
- *
- * @param updateMsg
- * The update message that must be saved to the db managed by this db
- * handler.
- * @throws ChangelogException
- * If a database problem happened
- */
- public void add(UpdateMsg updateMsg) throws ChangelogException
- {
- if (shutdown.get())
- {
- throw new ChangelogException(
- ERR_COULD_NOT_ADD_CHANGE_TO_SHUTTING_DOWN_REPLICA_DB.get(updateMsg, baseDN, serverId));
- }
-
- db.addEntry(updateMsg);
-
- final CSNLimits limits = csnLimits;
- final boolean updateNew = limits.newestCSN == null || limits.newestCSN.isOlderThan(updateMsg.getCSN());
- final boolean updateOld = limits.oldestCSN == null;
- if (updateOld || updateNew)
- {
- csnLimits = new CSNLimits(
- updateOld ? updateMsg.getCSN() : limits.oldestCSN,
- updateNew ? updateMsg.getCSN() : limits.newestCSN);
- }
- }
-
- /**
- * Get the oldest CSN that has not been purged yet.
- *
- * @return the oldest CSN that has not been purged yet.
- */
- CSN getOldestCSN()
- {
- return csnLimits.oldestCSN;
- }
-
- /**
- * Get the newest CSN that has not been purged yet.
- *
- * @return the newest CSN that has not been purged yet.
- */
- CSN getNewestCSN()
- {
- return csnLimits.newestCSN;
- }
-
- /**
- * Generate a new {@link DBCursor} that allows to browse the db managed by
- * this ReplicaDB and starting at the position defined by a given CSN.
- *
- * @param startCSN
- * The position where the cursor must start. If null, start from the
- * oldest CSN
- * @param matchingStrategy
- * Cursor key matching strategy
- * @param positionStrategy
- * Cursor position strategy
- * @return a new {@link DBCursor} that allows to browse the db managed by this
- * ReplicaDB and starting at the position defined by a given CSN.
- * @throws ChangelogException
- * if a database problem happened
- */
- DBCursor<UpdateMsg> generateCursorFrom(final CSN startCSN, final KeyMatchingStrategy matchingStrategy,
- final PositionStrategy positionStrategy) throws ChangelogException
- {
- CSN actualStartCSN = (startCSN != null && startCSN.getServerId() == serverId) ? startCSN : null;
- return new JEReplicaDBCursor(db, actualStartCSN, matchingStrategy, positionStrategy, this);
- }
-
- /** Shutdown this ReplicaDB. */
- void shutdown()
- {
- if (shutdown.compareAndSet(false, true))
- {
- db.shutdown();
- DirectoryServer.deregisterMonitorProvider(dbMonitor);
- }
- }
-
- /**
- * Synchronously purge changes older than purgeCSN from this replicaDB.
- *
- * @param purgeCSN
- * The CSN up to which changes can be purged. No purging happens when
- * it is {@code null}.
- * @throws ChangelogException
- * In case of database problem.
- */
- void purgeUpTo(final CSN purgeCSN) throws ChangelogException
- {
- if (purgeCSN == null)
- {
- return;
- }
-
- for (int i = 0; i < 100; i++)
- {
- /*
- * the purge is done by group in order to save some CPU, IO bandwidth and
- * DB caches: start the transaction then do a bunch of remove then commit.
- */
- /*
- * Matt wrote: The record removal is done as a DB transaction and the
- * deleted records are only "deleted" on commit. While the txn/cursor is
- * open the records to be deleted will, I think, be pinned in the DB
- * cache. In other words, the larger the transaction (the more records
- * deleted during a single batch) the more DB cache will be used to
- * process the transaction.
- */
- final ReplServerDBCursor cursor = db.openDeleteCursor();
- try
- {
- for (int j = 0; j < 50; j++)
- {
- if (shutdown.get())
- {
- return;
- }
-
- CSN csn = cursor.nextCSN();
- if (csn == null)
- {
- return;
- }
-
- if (!csn.equals(csnLimits.newestCSN) && csn.isOlderThan(purgeCSN))
- {
- cursor.delete();
- }
- else
- {
- csnLimits = new CSNLimits(csn, csnLimits.newestCSN);
- return;
- }
- }
- }
- catch (ChangelogException e)
- {
- // mark shutdown for this db so that we don't try again to
- // stop it from cursor.close() or methods called by cursor.close()
- cursor.abort();
- shutdown.set(true);
- throw e;
- }
- finally
- {
- cursor.close();
- }
- }
- }
-
- /**
- * Implements monitoring capabilities of the ReplicaDB.
- */
- private class DbMonitorProvider extends MonitorProvider<MonitorProviderCfg>
- {
- /** {@inheritDoc} */
- @Override
- public List<Attribute> getMonitorData()
- {
- final List<Attribute> attributes = new ArrayList<>();
- create(attributes, "replicationServer-database",String.valueOf(serverId));
- create(attributes, "domain-name", baseDN.toString());
- final CSNLimits limits = csnLimits;
- if (limits.oldestCSN != null)
- {
- create(attributes, "first-change", encode(limits.oldestCSN));
- }
- if (limits.newestCSN != null)
- {
- create(attributes, "last-change", encode(limits.newestCSN));
- }
- return attributes;
- }
-
- private void create(final List<Attribute> attributes, final String name, final String value)
- {
- attributes.add(Attributes.create(name, value));
- }
-
- private String encode(final CSN csn)
- {
- return csn + " " + new Date(csn.getTime());
- }
-
- /** {@inheritDoc} */
- @Override
- public String getMonitorInstanceName()
- {
- ReplicationServerDomain domain = replicationServer.getReplicationServerDomain(baseDN);
- return "Changelog for DS(" + serverId + "),cn=" + domain.getMonitorInstanceName();
- }
-
- /** {@inheritDoc} */
- @Override
- public void initializeMonitorProvider(MonitorProviderCfg configuration)
- throws ConfigException,InitializationException
- {
- // Nothing to do for now
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public String toString()
- {
- final CSNLimits limits = csnLimits;
- return getClass().getSimpleName() + " " + baseDN + " " + serverId + " "
- + limits.oldestCSN + " " + limits.newestCSN;
- }
-
- /**
- * Clear the changes from this DB (from both memory cache and DB storage).
- * @throws ChangelogException When an exception occurs while removing the
- * changes from the DB.
- */
- void clear() throws ChangelogException
- {
- db.clear();
- csnLimits = new CSNLimits(null, null);
- }
-
- /**
- * Getter for the serverID of the server for which this database is managed.
- *
- * @return the serverId.
- */
- public int getServerId()
- {
- return this.serverId;
- }
-
- /**
- * Return the number of records of this replicaDB.
- * <p>
- * For test purpose.
- *
- * @return The number of records of this replicaDB.
- */
- long getNumberRecords()
- {
- return db.getNumberRecords();
- }
-
- /**
- * Set the window size for writing counter records in the DB.
- * <p>
- * for unit tests only!!
- *
- * @param size
- * window size in number of records.
- */
- void setCounterRecordWindowSize(int size)
- {
- db.setCounterRecordWindowSize(size);
- }
-
-}
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java
deleted file mode 100644
index dd9afea..0000000
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * CDDL HEADER START
- *
- * The contents of this file are subject to the terms of the
- * Common Development and Distribution License, Version 1.0 only
- * (the "License"). You may not use this file except in compliance
- * with the License.
- *
- * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
- * or http://forgerock.org/license/CDDLv1.0.html.
- * See the License for the specific language governing permissions
- * and limitations under the License.
- *
- * When distributing Covered Code, include this CDDL HEADER in each
- * file and include the License file at legal-notices/CDDLv1_0.txt.
- * If applicable, add the following below this CDDL HEADER, with the
- * fields enclosed by brackets "[]" replaced with your own identifying
- * information:
- * Portions Copyright [yyyy] [name of copyright owner]
- *
- * CDDL HEADER END
- *
- *
- * Copyright 2006-2009 Sun Microsystems, Inc.
- * Portions Copyright 2011-2015 ForgeRock AS
- */
-package org.opends.server.replication.server.changelog.je;
-
-import org.opends.server.replication.common.CSN;
-import org.opends.server.replication.protocol.UpdateMsg;
-import org.opends.server.replication.server.changelog.api.ChangelogException;
-import org.opends.server.replication.server.changelog.api.DBCursor;
-import org.opends.server.replication.server.changelog.je.ReplicationDB.ReplServerDBCursor;
-
-import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
-import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
-
-/**
- * Berkeley DB JE implementation of {@link DBCursor}.
- *
- * \@NotThreadSafe
- */
-class JEReplicaDBCursor implements DBCursor<UpdateMsg>
-{
- private final ReplicationDB db;
- private PositionStrategy positionStrategy;
- private KeyMatchingStrategy matchingStrategy;
- private JEReplicaDB replicaDB;
- private final CSN startCSN;
- private CSN lastNonNullCurrentCSN;
- /**
- * The underlying replica DB cursor.
- * <p>
- * Initially <code>null</code>, the first call to {@link #next()} will
- * populate it. A call to {@link #close()} will set it to null again.
- */
- private ReplServerDBCursor cursor;
-
- /**
- * Creates a new {@link JEReplicaDBCursor}. All created cursor must be
- * released by the caller using the {@link #close()} method.
- *
- * @param db
- * The db where the cursor must be created.
- * @param startCSN
- * The CSN after which the cursor must start.If null, start from the
- * oldest CSN
- * @param matchingStrategy
- * Cursor key matching strategy
- * @param positionStrategy
- * Cursor position strategy
- * @param replicaDB
- * The associated JEReplicaDB.
- * @throws ChangelogException
- * if a database problem happened.
- */
- public JEReplicaDBCursor(ReplicationDB db, CSN startCSN, KeyMatchingStrategy matchingStrategy,
- PositionStrategy positionStrategy, JEReplicaDB replicaDB) throws ChangelogException
- {
- this.db = db;
- this.matchingStrategy = matchingStrategy;
- this.positionStrategy = positionStrategy;
- this.replicaDB = replicaDB;
- this.startCSN = startCSN;
- this.lastNonNullCurrentCSN = startCSN;
- }
-
- /** {@inheritDoc} */
- @Override
- public UpdateMsg getRecord()
- {
- if (!isClosed() && cursor != null)
- {
- return cursor.getRecord();
- }
- return null;
- }
-
- /** {@inheritDoc} */
- @Override
- public boolean next() throws ChangelogException
- {
- if (isClosed())
- {
- return false;
- }
-
- final ReplServerDBCursor previousCursor = cursor;
- if (getRecord() == null)
- {
- synchronized (this)
- {
- closeCursor();
- // Previously exhausted cursor must be able to reinitialize themselves.
- // There is a risk of readLock never being unlocked
- // if following code is called while the cursor is closed.
- // It is better to let the deadlock happen to help quickly identifying
- // and fixing such issue with unit tests.
- if (lastNonNullCurrentCSN != startCSN)
- {
- // re-initialize to further CSN, take care to use appropriate strategies
- matchingStrategy = GREATER_THAN_OR_EQUAL_TO_KEY;
- positionStrategy = AFTER_MATCHING_KEY;
- }
- cursor = db.openReadCursor(lastNonNullCurrentCSN, matchingStrategy, positionStrategy);
- }
- }
-
- // For ON_MATCHING_KEY, do not call next() if the cursor has just been initialized.
- if ((positionStrategy == ON_MATCHING_KEY && previousCursor != null)
- || positionStrategy == AFTER_MATCHING_KEY)
- {
- cursor.next();
- }
-
- final UpdateMsg currentRecord = cursor.getRecord();
- if (currentRecord != null)
- {
- lastNonNullCurrentCSN = currentRecord.getCSN();
- return true;
- }
- return false;
- }
-
- /** {@inheritDoc} */
- @Override
- public void close()
- {
- synchronized (this)
- {
- closeCursor();
- replicaDB = null;
- }
- }
-
- private boolean isClosed()
- {
- return replicaDB == null;
- }
-
- private void closeCursor()
- {
- if (cursor != null)
- {
- cursor.close();
- cursor = null;
- }
- }
-
- /**
- * Called by the GC when the object is garbage collected. Release the internal
- * cursor in case the cursor was badly used and {@link #close()} was never
- * called.
- */
- @Override
- protected void finalize()
- {
- close();
- }
-
- /** {@inheritDoc} */
- @Override
- public String toString()
- {
- return getClass().getSimpleName()
- + " currentChange=" + cursor.getRecord()
- + " positionStrategy=" + positionStrategy
- + " matchingStrategy=" + matchingStrategy
- + " replicaDB=" + replicaDB;
- }
-}
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/JEUtils.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/JEUtils.java
deleted file mode 100644
index ea88431..0000000
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/JEUtils.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * CDDL HEADER START
- *
- * The contents of this file are subject to the terms of the
- * Common Development and Distribution License, Version 1.0 only
- * (the "License"). You may not use this file except in compliance
- * with the License.
- *
- * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
- * or http://forgerock.org/license/CDDLv1.0.html.
- * See the License for the specific language governing permissions
- * and limitations under the License.
- *
- * When distributing Covered Code, include this CDDL HEADER in each
- * file and include the License file at legal-notices/CDDLv1_0.txt.
- * If applicable, add the following below this CDDL HEADER, with the
- * fields enclosed by brackets "[]" replaced with your own identifying
- * information:
- * Portions Copyright [yyyy] [name of copyright owner]
- *
- * CDDL HEADER END
- *
- * Portions Copyright 2013-2014 ForgeRock AS
- */
-package org.opends.server.replication.server.changelog.je;
-
-import org.forgerock.i18n.slf4j.LocalizedLogger;
-
-import com.sleepycat.je.DatabaseException;
-import com.sleepycat.je.Transaction;
-
-/**
- * Utility class for JE.
- */
-public final class JEUtils
-{
-
- /** The tracer object for the debug logger. */
- private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
-
- private JEUtils()
- {
- // Utility class
- }
-
- /**
- * Aborts the current transaction. It has no effect if the transaction has
- * committed.
- * <p>
- * This method should only be used after an exception was caught and is about
- * to be rethrown .
- *
- * @param txn
- * the transaction to abort
- */
- public static void abort(Transaction txn)
- {
- if (txn != null)
- {
- try
- {
- txn.abort();
- }
- catch (DatabaseException ignored)
- {
- // Ignored because code is already throwing an exception
- logger.traceException(ignored);
- }
- }
- }
-}
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/ReplicationDB.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/ReplicationDB.java
deleted file mode 100644
index 2f7cdf0..0000000
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/ReplicationDB.java
+++ /dev/null
@@ -1,956 +0,0 @@
-/*
- * CDDL HEADER START
- *
- * The contents of this file are subject to the terms of the
- * Common Development and Distribution License, Version 1.0 only
- * (the "License"). You may not use this file except in compliance
- * with the License.
- *
- * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
- * or http://forgerock.org/license/CDDLv1.0.html.
- * See the License for the specific language governing permissions
- * and limitations under the License.
- *
- * When distributing Covered Code, include this CDDL HEADER in each
- * file and include the License file at legal-notices/CDDLv1_0.txt.
- * If applicable, add the following below this CDDL HEADER, with the
- * fields enclosed by brackets "[]" replaced with your own identifying
- * information:
- * Portions Copyright [yyyy] [name of copyright owner]
- *
- * CDDL HEADER END
- *
- *
- * Copyright 2006-2010 Sun Microsystems, Inc.
- * Portions Copyright 2011-2015 ForgeRock AS
- */
-package org.opends.server.replication.server.changelog.je;
-
-import org.forgerock.i18n.slf4j.LocalizedLogger;
-import java.io.UnsupportedEncodingException;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.opends.server.replication.common.CSN;
-import org.opends.server.replication.protocol.ProtocolVersion;
-import org.opends.server.replication.protocol.ReplicationMsg;
-import org.opends.server.replication.protocol.UpdateMsg;
-import org.opends.server.replication.server.ReplicationServer;
-import org.opends.server.replication.server.ReplicationServerDomain;
-import org.opends.server.replication.server.changelog.api.ChangelogException;
-import org.opends.server.replication.server.changelog.api.DBCursor;
-import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
-import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
-import org.opends.server.types.DN;
-import org.opends.server.util.StaticUtils;
-
-import com.sleepycat.je.*;
-
-import static com.sleepycat.je.LockMode.*;
-import static com.sleepycat.je.OperationStatus.*;
-
-import static org.opends.messages.ReplicationMessages.*;
-import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
-import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
-import static org.opends.server.util.StaticUtils.*;
-
-/**
- * This class implements the interface between the underlying database
- * and the JEReplicaDB class.
- * <p>
- * This is the only class that should have code using the BDB interfaces.
- */
-class ReplicationDB
-{
-
- private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
-
- private Database db;
- private final ReplicationDbEnv dbEnv;
- private final ReplicationServer replicationServer;
- private final int serverId;
- private final DN baseDN;
-
- /**
- * The lock used to provide exclusive access to the thread that close the db
- * (shutdown or clear).
- */
- private final ReadWriteLock dbCloseLock = new ReentrantReadWriteLock(true);
-
- // Change counter management
- // The Db itself does not allow to count records between a start and an end
- // change. And we cannot rely on the replication seqnum that is part of the
- // CSN, since there can be holes (when an operation is canceled).
- // And traversing all the records from the start one to the end one works
- // fine but can be very long (ECL:lastChangeNumber).
- //
- // So we are storing special records in the DB (called counter records),
- // that contain the number of changes since the previous counter record.
- // One special record is :
- // - a special key : changetime , serverid=0 seqnum=0
- // - a counter value : count of changes since previous counter record.
- //
- // A counter record has to follow the order of the db, so it needs to have
- // a CSN key that follows the order.
- // A counter record must have its own CSN key since the Db does not support
- // duplicate keys (it is a compatibility breaker character of the DB).
- //
- // We define 2 conditions to store a counter record :
- // 1/- at least 'counterWindowSize' changes have been stored in the Db
- // since the previous counter record
- // 2/- the change to be stored has a new timestamp - so that the counter
- // record is the first record for this timestamp.
-
-
- /** Current value of the counter. */
- private int counterCurrValue = 1;
-
- /**
- * When not null, the next change with a ts different from
- * tsForNewCounterRecord will lead to store a new counterRecord.
- */
- private long counterTsLimit;
-
- /**
- * The counter record will never be written to the db more often than each
- * counterWindowSize changes.
- */
- private int counterWindowSize = 1000;
-
- /**
- * Creates a new database or open existing database that will be used
- * to store and retrieve changes from an LDAP server.
- * @param serverId The identifier of the LDAP server.
- * @param baseDN The baseDN of the replication domain.
- * @param replicationServer The ReplicationServer that needs to be shutdown.
- * @param dbEnv The Db environment to use to create the db.
- * @throws ChangelogException If a database problem happened
- */
- ReplicationDB(int serverId, DN baseDN,
- ReplicationServer replicationServer, ReplicationDbEnv dbEnv)
- throws ChangelogException
- {
- this.serverId = serverId;
- this.baseDN = baseDN;
- this.dbEnv = dbEnv;
- this.replicationServer = replicationServer;
-
- // Get or create the associated ReplicationServerDomain and Db.
- final ReplicationServerDomain domain =
- replicationServer.getReplicationServerDomain(baseDN, true);
- db = dbEnv.getOrAddReplicationDB(serverId, baseDN, domain.getGenerationId());
-
- intializeCounters();
- }
-
- private void intializeCounters() throws ChangelogException
- {
- this.counterCurrValue = 1;
-
- Cursor cursor = null;
- try
- {
- cursor = db.openCursor(null, null);
-
- int distBackToCounterRecord = 0;
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry data = new DatabaseEntry();
- OperationStatus status = cursor.getLast(key, data, LockMode.DEFAULT);
- while (status == OperationStatus.SUCCESS)
- {
- CSN csn = toCSN(key.getData());
- if (isACounterRecord(csn))
- {
- counterCurrValue = decodeCounterValue(data.getData()) + 1;
- counterTsLimit = csn.getTime();
- break;
- }
-
- status = cursor.getPrev(key, data, LockMode.DEFAULT);
- distBackToCounterRecord++;
- }
- counterCurrValue += distBackToCounterRecord;
- }
- catch (DatabaseException e)
- {
- throw new ChangelogException(e);
- }
- finally
- {
- close(cursor);
- }
- }
-
- private static CSN toCSN(byte[] data)
- {
- return new CSN(decodeUTF8(data));
- }
-
- /**
- * Add one change to the underlying db.
- *
- * @param change
- * The change to add to the underlying db.
- * @throws ChangelogException
- * If a database problem happened
- */
- void addEntry(UpdateMsg change) throws ChangelogException
- {
- dbCloseLock.readLock().lock();
- try
- {
- // If the DB has been closed then return immediately.
- if (isDBClosed())
- {
- return;
- }
-
- final DatabaseEntry key = createReplicationKey(change.getCSN());
- // Always keep messages in the replication DB with the current protocol
- // version
- final DatabaseEntry data = new DatabaseEntry(change.getBytes());
-
- insertCounterRecordIfNeeded(change.getCSN());
- db.put(null, key, data);
- counterCurrValue++;
- }
- catch (DatabaseException e)
- {
- throw new ChangelogException(
- ERR_EXCEPTION_COULD_NOT_ADD_CHANGE_TO_REPLICA_DB.get(
- change, baseDN, serverId, stackTraceToSingleLineString(e)));
- }
- finally
- {
- dbCloseLock.readLock().unlock();
- }
- }
-
- private void insertCounterRecordIfNeeded(CSN csn) throws DatabaseException
- {
- if (counterCurrValue != 0 && (counterCurrValue % counterWindowSize == 0))
- {
- // enough changes to generate a counter record
- // wait for the next change of time
- counterTsLimit = csn.getTime();
- }
- if (counterTsLimit != 0 && csn.getTime() != counterTsLimit)
- {
- // Write the counter record
- final CSN counterRecord = newCounterRecord(csn);
- DatabaseEntry counterKey = createReplicationKey(counterRecord);
- DatabaseEntry counterValue = encodeCounterValue(counterCurrValue - 1);
- db.put(null, counterKey, counterValue);
- counterTsLimit = 0;
- }
- }
-
- private DatabaseEntry createReplicationKey(CSN csn)
- {
- final DatabaseEntry key = new DatabaseEntry();
- if (csn != null)
- {
- try
- {
- key.setData(csn.toString().getBytes("UTF-8"));
- }
- catch (UnsupportedEncodingException e)
- {
- // Should never happens, UTF-8 is always supported
- // TODO : add better logging
- }
- }
- return key;
- }
-
- /**
- * Shutdown the database.
- */
- void shutdown()
- {
- dbCloseLock.writeLock().lock();
- try
- {
- db.close();
- db = null;
- }
- catch (DatabaseException e)
- {
- logger.info(NOTE_EXCEPTION_CLOSING_DATABASE, this, stackTraceToSingleLineString(e));
- }
- finally
- {
- dbCloseLock.writeLock().unlock();
- }
- }
-
- /**
- * Create a cursor that can be used to search or iterate on this
- * ReplicationServer DB.
- *
- * @param startCSN
- * The CSN from which the cursor must start.If null, start from the
- * oldest CSN
- * @param matchingStrategy
- * Cursor key matching strategy
- * @param positionStrategy
- * Cursor position strategy
- * @return The ReplServerDBCursor.
- * @throws ChangelogException
- * If a database problem happened
- */
- ReplServerDBCursor openReadCursor(CSN startCSN, KeyMatchingStrategy matchingStrategy,
- PositionStrategy positionStrategy) throws ChangelogException
- {
- return new ReplServerDBCursor(startCSN, matchingStrategy, positionStrategy);
- }
-
- /**
- * Create a cursor that can be used to delete some record from this
- * ReplicationServer database.
- *
- * @throws ChangelogException If a database error prevented the cursor
- * creation.
- *
- * @return The ReplServerDBCursor.
- */
- ReplServerDBCursor openDeleteCursor() throws ChangelogException
- {
- return new ReplServerDBCursor();
- }
-
- private void closeAndReleaseReadLock(Cursor cursor)
- {
- try
- {
- StaticUtils.close(cursor);
- }
- finally
- {
- dbCloseLock.readLock().unlock();
- }
- }
-
- /**
- * Read the oldest CSN present in the database.
- *
- * @return the oldest CSN in the DB, null if the DB is empty or closed
- * @throws ChangelogException
- * If a database problem happened
- */
- CSN readOldestCSN() throws ChangelogException
- {
- dbCloseLock.readLock().lock();
-
- Cursor cursor = null;
- try
- {
- // If the DB has been closed then return immediately.
- if (isDBClosed())
- {
- return null;
- }
-
- cursor = db.openCursor(null, null);
-
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry data = new DatabaseEntry();
- if (cursor.getFirst(key, data, LockMode.DEFAULT) != SUCCESS)
- {
- // database is empty
- return null;
- }
-
- final CSN csn = toCSN(key.getData());
- if (!isACounterRecord(csn))
- {
- return csn;
- }
-
- // First record is a counter record .. go next
- if (cursor.getNext(key, data, LockMode.DEFAULT) != SUCCESS)
- {
- // DB contains only a counter record
- return null;
- }
- // There cannot be 2 counter record next to each other,
- // it is safe to return this record
- return toCSN(key.getData());
- }
- catch (DatabaseException e)
- {
- throw new ChangelogException(e);
- }
- finally
- {
- closeAndReleaseReadLock(cursor);
- }
- }
-
- /**
- * Read the newest CSN present in the database.
- *
- * @return the newest CSN in the DB, null if the DB is empty or closed
- * @throws ChangelogException
- * If a database problem happened
- */
- CSN readNewestCSN() throws ChangelogException
- {
- dbCloseLock.readLock().lock();
-
- Cursor cursor = null;
- try
- {
- // If the DB has been closed then return immediately.
- if (isDBClosed())
- {
- return null;
- }
-
- cursor = db.openCursor(null, null);
-
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry data = new DatabaseEntry();
- if (cursor.getLast(key, data, LockMode.DEFAULT) != SUCCESS)
- {
- // database is empty
- return null;
- }
-
- final CSN csn = toCSN(key.getData());
- if (!isACounterRecord(csn))
- {
- return csn;
- }
-
- if (cursor.getPrev(key, data, LockMode.DEFAULT) != SUCCESS)
- {
- /*
- * database only contain a counter record - don't know how much it can
- * be possible but ...
- */
- return null;
- }
- // There cannot be 2 counter record next to each other,
- // it is safe to return this record
- return toCSN(key.getData());
- }
- catch (DatabaseException e)
- {
- throw new ChangelogException(e);
- }
- finally
- {
- closeAndReleaseReadLock(cursor);
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public String toString()
- {
- return serverId + " " + baseDN;
- }
-
- /** Hold a cursor and an indicator of wether the cursor should be considered as empty. */
- private static class CursorWithEmptyIndicator
- {
- private Cursor cursor;
- private boolean isEmpty;
-
- private CursorWithEmptyIndicator(Cursor localCursor, boolean isEmpty)
- {
- this.cursor = localCursor;
- this.isEmpty = isEmpty;
- }
-
- /** Creates cursor considered as empty. */
- static CursorWithEmptyIndicator createEmpty(Cursor cursor)
- {
- return new CursorWithEmptyIndicator(cursor, true);
- }
-
- /** Creates cursor considered as non-empty. */
- static CursorWithEmptyIndicator createNonEmpty(Cursor cursor)
- {
- return new CursorWithEmptyIndicator(cursor, false);
- }
- }
-
- /**
- * This Class implements a cursor that can be used to browse a
- * replicationServer database.
- */
- class ReplServerDBCursor implements DBCursor<UpdateMsg>
- {
- /**
- * The transaction that will protect the actions done with the cursor.
- * <p>
- * Will be let null for a read cursor
- * <p>
- * Will be set non null for a write cursor
- */
- private Cursor cursor;
- private final DatabaseEntry key;
- private final DatabaseEntry data;
- /** \@Null for read cursors, \@NotNull for deleting cursors. */
- private final Transaction txn;
- private UpdateMsg currentRecord;
-
- private boolean isClosed;
-
- /**
- * Creates a ReplServerDBCursor that can be used for browsing a
- * replicationServer db.
- *
- * @param startCSN
- * The CSN from which the cursor must start.
- * @param matchingStrategy
- * Cursor key matching strategy, which allow to indicates how key
- * is matched
- * @param positionStrategy
- * indicates at which exact position the cursor must start
- * @throws ChangelogException
- * When the startCSN does not exist.
- */
- private ReplServerDBCursor(CSN startCSN, KeyMatchingStrategy matchingStrategy, PositionStrategy positionStrategy)
- throws ChangelogException
- {
- key = createReplicationKey(startCSN);
- data = new DatabaseEntry();
- txn = null;
-
- // Take the lock. From now on, whatever error that happen in the life
- // of this cursor should end by unlocking that lock. We must also
- // unlock it when throwing an exception.
- dbCloseLock.readLock().lock();
-
- CursorWithEmptyIndicator maybeEmptyCursor = null;
- try
- {
- // If the DB has been closed then create empty cursor.
- if (isDBClosed())
- {
- isClosed = true;
- cursor = null;
- return;
- }
-
- maybeEmptyCursor = generateCursor(startCSN, matchingStrategy, positionStrategy);
- if (maybeEmptyCursor.isEmpty)
- {
- isClosed = true;
- cursor = null;
- return;
- }
-
- cursor = maybeEmptyCursor.cursor;
- if (key.getData() != null)
- {
- computeCurrentRecord();
- }
- }
- catch (DatabaseException e)
- {
- throw new ChangelogException(e);
- }
- finally
- {
- if (maybeEmptyCursor != null && maybeEmptyCursor.isEmpty)
- {
- closeAndReleaseReadLock(maybeEmptyCursor.cursor);
- }
- }
- }
-
- /** Generate a possibly empty cursor with the provided start CSN and strategies. */
- private CursorWithEmptyIndicator generateCursor(CSN startCSN, KeyMatchingStrategy matchingStrategy,
- PositionStrategy positionStrategy)
- {
- Cursor cursor = db.openCursor(txn, null);
- boolean isCsnFound = startCSN == null || cursor.getSearchKey(key, data, LockMode.DEFAULT) == SUCCESS;
- if (!isCsnFound)
- {
- if (matchingStrategy == EQUAL_TO_KEY)
- {
- return CursorWithEmptyIndicator.createEmpty(cursor);
- }
-
- boolean isGreaterCsnFound = cursor.getSearchKeyRange(key, data, DEFAULT) == SUCCESS;
- if (isGreaterCsnFound)
- {
- if (matchingStrategy == GREATER_THAN_OR_EQUAL_TO_KEY && positionStrategy == AFTER_MATCHING_KEY)
- {
- // Move backward so that the first call to next() points to this greater csn
- key.setData(null);
- if (cursor.getPrev(key, data, LockMode.DEFAULT) != SUCCESS)
- {
- // Edge case: we're at the beginning of the database
- cursor.close();
- cursor = db.openCursor(txn, null);
- }
- }
- else if (matchingStrategy == LESS_THAN_OR_EQUAL_TO_KEY)
- {
- // Move backward to point on the lower csn
- key.setData(null);
- if (cursor.getPrev(key, data, LockMode.DEFAULT) != SUCCESS)
- {
- // Edge case: we're at the beginning of the log, there is no lower csn
- return CursorWithEmptyIndicator.createEmpty(cursor);
- }
- }
- }
- else
- {
- if (matchingStrategy == GREATER_THAN_OR_EQUAL_TO_KEY)
- {
- // There is no greater csn
- return CursorWithEmptyIndicator.createEmpty(cursor);
- }
- // LESS_THAN_OR_EQUAL_TO_KEY case : the lower csn is the highest csn available
- key.setData(null);
- boolean isLastKeyFound = cursor.getLast(key, data, LockMode.DEFAULT) == SUCCESS;
- if (!isLastKeyFound)
- {
- // Edge case: empty database
- cursor.close();
- cursor = db.openCursor(txn, null);
- }
- }
- }
- return CursorWithEmptyIndicator.createNonEmpty(cursor);
- }
-
- private ReplServerDBCursor() throws ChangelogException
- {
- key = new DatabaseEntry();
- data = new DatabaseEntry();
-
- // We'll go on only if no close or no clear is running
- dbCloseLock.readLock().lock();
-
- boolean cursorHeld = false;
- Transaction localTxn = null;
- Cursor localCursor = null;
- try
- {
- // If the DB has been closed then create empty cursor.
- if (isDBClosed())
- {
- isClosed = true;
- txn = null;
- cursor = null;
- return;
- }
-
- // Create the transaction that will protect whatever done with this
- // write cursor.
- localTxn = dbEnv.beginTransaction();
- localCursor = db.openCursor(localTxn, null);
-
- txn = localTxn;
- cursor = localCursor;
- cursorHeld = cursor != null;
- }
- catch (ChangelogException e)
- {
- JEUtils.abort(localTxn);
- throw e;
- }
- catch (Exception e)
- {
- JEUtils.abort(localTxn);
- throw new ChangelogException(e);
- }
- finally
- {
- if (!cursorHeld)
- {
- closeAndReleaseReadLock(localCursor);
- }
- }
- }
-
- /**
- * Close the ReplicationServer Cursor.
- */
- @Override
- public void close()
- {
- synchronized (this)
- {
- if (isClosed)
- {
- return;
- }
- isClosed = true;
- currentRecord = null;
- }
-
- closeAndReleaseReadLock(cursor);
-
- if (txn != null)
- {
- try
- {
- // No need for durability when purging.
- txn.commit(Durability.COMMIT_NO_SYNC);
- }
- catch (DatabaseException e)
- {
- dbEnv.shutdownOnException(e);
- }
- }
- }
-
- /**
- * Abort the cursor after an Exception.
- * This method catch and ignore the DatabaseException because
- * this must be done when aborting a cursor after a DatabaseException
- * (per the Cursor documentation).
- * This should not be used in any other case.
- */
- void abort()
- {
- synchronized (this)
- {
- if (isClosed)
- {
- return;
- }
- isClosed = true;
- }
-
- closeAndReleaseReadLock(cursor);
- JEUtils.abort(txn);
- }
-
- /**
- * Get the next CSN in the database from this Cursor.
- *
- * @return The next CSN in the database from this cursor.
- * @throws ChangelogException
- * In case of underlying database problem.
- */
- CSN nextCSN() throws ChangelogException
- {
- if (isClosed)
- {
- return null;
- }
-
- currentRecord = null;
- try
- {
- if (cursor.getNext(key, data, LockMode.DEFAULT) != SUCCESS)
- {
- return null;
- }
- return toCSN(key.getData());
- }
- catch (DatabaseException e)
- {
- throw new ChangelogException(e);
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public boolean next() throws ChangelogException
- {
- if (isClosed)
- {
- return false;
- }
-
- currentRecord = null;
- while (currentRecord == null)
- {
- try
- {
- if (cursor.getNext(key, data, LockMode.DEFAULT) != SUCCESS)
- {
- return false;
- }
- }
- catch (DatabaseException e)
- {
- throw new ChangelogException(e);
- }
- computeCurrentRecord();
- }
- return currentRecord != null;
- }
-
- private void computeCurrentRecord()
- {
- CSN csn = null;
- try
- {
- csn = toCSN(key.getData());
- if (isACounterRecord(csn))
- {
- return;
- }
- currentRecord = toRecord(data.getData());
- }
- catch (Exception e)
- {
- /*
- * An error happening trying to convert the data from the
- * replicationServer database to an Update Message. This can only
- * happen if the database is corrupted. There is not much more that we
- * can do at this point except trying to continue with the next
- * record. In such case, it is therefore possible that we miss some
- * changes.
- * TODO : This should be handled by the repair functionality.
- */
- logger.error(ERR_REPLICATIONDB_CANNOT_PROCESS_CHANGE_RECORD, replicationServer.getServerId(),
- csn, e.getMessage());
- }
- }
-
- private UpdateMsg toRecord(final byte[] data) throws Exception
- {
- final short currentVersion = ProtocolVersion.getCurrentVersion();
- return (UpdateMsg) ReplicationMsg.generateMsg(data, currentVersion);
- }
-
- /** {@inheritDoc} */
- @Override
- public UpdateMsg getRecord()
- {
- return currentRecord;
- }
-
- /**
- * Delete the record at the current cursor position.
- *
- * @throws ChangelogException In case of database problem.
- */
- void delete() throws ChangelogException
- {
- if (isClosed)
- {
- throw new IllegalStateException("ReplServerDBCursor already closed");
- }
-
- try
- {
- cursor.delete();
- }
- catch (DatabaseException e)
- {
- throw new ChangelogException(e);
- }
- }
- }
-
- /**
- * Clears this change DB from the changes it contains.
- *
- * @throws ChangelogException In case of database problem.
- */
- void clear() throws ChangelogException
- {
- // The coming users will be blocked until the clear is done
- dbCloseLock.writeLock().lock();
- try
- {
- // If the DB has been closed then return immediately.
- if (isDBClosed())
- {
- return;
- }
-
- // Clears the reference to this serverID
- dbEnv.clearServerId(baseDN, serverId);
-
- final Database oldDb = db;
- db = null; // In case there's a failure between here and recreation.
- dbEnv.clearDb(oldDb);
-
- // RE-create the db
- db = dbEnv.getOrAddReplicationDB(serverId, baseDN, -1);
- }
- catch (Exception e)
- {
- logger.error(ERR_ERROR_CLEARING_DB, this, e.getMessage() + " " + stackTraceToSingleLineString(e));
- }
- finally
- {
- // Relax the waiting users
- dbCloseLock.writeLock().unlock();
- }
- }
-
- /**
- * Whether a provided CSN represents a counter record. A counter record is
- * used to store the time.
- *
- * @param csn
- * The CSN to test
- * @return true if the provided CSN is a counter record, false if the change
- * is a regular/normal change that was performed on the replica.
- */
- private static boolean isACounterRecord(CSN csn)
- {
- return csn.getServerId() == 0 && csn.getSeqnum() == 0;
- }
-
- private static CSN newCounterRecord(CSN csn)
- {
- return new CSN(csn.getTime(), 0, 0);
- }
-
- /**
- * Decode the provided database entry as a the value of a counter.
- * @param entry The provided entry.
- * @return The counter value.
- */
- private static int decodeCounterValue(byte[] entry)
- {
- String numAckStr = decodeUTF8(entry);
- return Integer.parseInt(numAckStr);
- }
-
- /**
- * Encode the provided counter value in a database entry.
- * @return The database entry with the counter value encoded inside.
- */
- private static DatabaseEntry encodeCounterValue(int value)
- {
- DatabaseEntry entry = new DatabaseEntry();
- entry.setData(getBytes(String.valueOf(value)));
- return entry;
- }
-
- /**
- * Set the counter writing window size (public method for unit tests only).
- * @param size Size in number of record.
- */
- public void setCounterRecordWindowSize(int size)
- {
- this.counterWindowSize = size;
- }
-
- /**
- * Returns {@code true} if the DB is closed. This method assumes that either
- * the db read/write lock has been taken.
- */
- private boolean isDBClosed()
- {
- return db == null || !db.getEnvironment().isValid();
- }
-
- /**
- * Returns the number of records in this DB.
- *
- * @return the number of records in this DB.
- */
- long getNumberRecords()
- {
- return db.count();
- }
-}
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
deleted file mode 100644
index d9f80bd..0000000
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
+++ /dev/null
@@ -1,895 +0,0 @@
-/*
- * CDDL HEADER START
- *
- * The contents of this file are subject to the terms of the
- * Common Development and Distribution License, Version 1.0 only
- * (the "License"). You may not use this file except in compliance
- * with the License.
- *
- * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
- * or http://forgerock.org/license/CDDLv1.0.html.
- * See the License for the specific language governing permissions
- * and limitations under the License.
- *
- * When distributing Covered Code, include this CDDL HEADER in each
- * file and include the License file at legal-notices/CDDLv1_0.txt.
- * If applicable, add the following below this CDDL HEADER, with the
- * fields enclosed by brackets "[]" replaced with your own identifying
- * information:
- * Portions Copyright [yyyy] [name of copyright owner]
- *
- * CDDL HEADER END
- *
- *
- * Copyright 2006-2009 Sun Microsystems, Inc.
- * Portions Copyright 2011-2015 ForgeRock AS
- */
-package org.opends.server.replication.server.changelog.je;
-
-import java.io.File;
-import java.io.UnsupportedEncodingException;
-import java.util.AbstractMap.SimpleImmutableEntry;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.forgerock.i18n.LocalizableMessage;
-import org.forgerock.i18n.slf4j.LocalizedLogger;
-import org.opends.server.replication.common.CSN;
-import org.opends.server.replication.server.ChangelogState;
-import org.opends.server.replication.server.ReplicationServer;
-import org.opends.server.replication.server.changelog.api.ChangelogException;
-import org.opends.server.replication.server.changelog.api.ChangelogStateProvider;
-import org.opends.server.types.DN;
-import org.opends.server.types.DirectoryException;
-
-import com.sleepycat.je.*;
-
-import static com.sleepycat.je.EnvironmentConfig.*;
-import static com.sleepycat.je.OperationStatus.*;
-
-import static org.opends.messages.BackendMessages.*;
-import static org.opends.messages.ReplicationMessages.*;
-import static org.opends.server.util.StaticUtils.*;
-
-/**
- * This class represents a DB environment that acts as a factory for
- * ReplicationDBs.
- */
-public class ReplicationDbEnv implements ChangelogStateProvider
-{
- private Environment dbEnvironment;
- private Database changelogStateDb;
- /**
- * The current changelogState. This is in-memory version of what is inside the
- * on-disk changelogStateDB. It improves performances in case the
- * changelogState is read often.
- *
- * @GuardedBy("stateLock")
- */
- private final ChangelogState changelogState;
- /** Exclusive lock to synchronize updates to in-memory and on-disk changelogState. */
- private final Object stateLock = new Object();
- private final List<Database> allDbs = new CopyOnWriteArrayList<>();
- private ReplicationServer replicationServer;
- private final AtomicBoolean isShuttingDown = new AtomicBoolean(false);
- private static final String GENERATION_ID_TAG = "GENID";
- private static final String OFFLINE_TAG = "OFFLINE";
- private static final String FIELD_SEPARATOR = " ";
- /** The tracer object for the debug logger. */
- private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
-
- /**
- * Initialize this class.
- * Creates Db environment that will be used to create databases.
- * It also reads the currently known databases from the "changelogstate"
- * database.
- * @param path Path where the backing files must be created.
- * @param replicationServer the ReplicationServer that creates this
- * ReplicationDbEnv.
- * @throws ChangelogException If an Exception occurred that prevented
- * the initialization to happen.
- */
- public ReplicationDbEnv(String path, ReplicationServer replicationServer)
- throws ChangelogException
- {
- this.replicationServer = replicationServer;
-
- try
- {
- dbEnvironment = openJEEnvironment(path);
-
- /*
- * One database is created to store the update from each LDAP server in
- * the topology. The database "changelogstate" is used to store the list
- * of all the servers that have been seen in the past.
- */
- changelogStateDb = openDatabase("changelogstate");
- changelogState = readOnDiskChangelogState();
- }
- catch (RuntimeException e)
- {
- throw new ChangelogException(e);
- }
- }
-
- /**
- * Open a JE environment.
- * <p>
- * protected so it can be overridden by tests.
- *
- * @param path
- * the path to the JE environment in the filesystem
- * @return the opened JE environment
- */
- protected Environment openJEEnvironment(String path)
- {
- final EnvironmentConfig envConfig = new EnvironmentConfig();
-
- /*
- * Create the DB Environment that will be used for all the
- * ReplicationServer activities related to the db
- */
- envConfig.setAllowCreate(true);
- envConfig.setTransactional(true);
- envConfig.setConfigParam(STATS_COLLECT, "false");
- envConfig.setConfigParam(CLEANER_THREADS, "2");
- envConfig.setConfigParam(CHECKPOINTER_HIGH_PRIORITY, "true");
- /*
- * Tests have shown that since the parsing of the Replication log is
- * always done sequentially, it is not necessary to use a large DB cache.
- */
- if (Runtime.getRuntime().maxMemory() > 256 * 1024 * 1024)
- {
- /*
- * If the JVM is reasonably large then we can safely default to bigger
- * read buffers. This will result in more scalable checkpointer and
- * cleaner performance.
- */
- envConfig.setConfigParam(CLEANER_LOOK_AHEAD_CACHE_SIZE, mb(2));
- envConfig.setConfigParam(LOG_ITERATOR_READ_SIZE, mb(2));
- envConfig.setConfigParam(LOG_FAULT_READ_SIZE, kb(4));
-
- /*
- * The cache size must be bigger in order to accommodate the larger
- * buffers - see OPENDJ-943.
- */
- envConfig.setConfigParam(MAX_MEMORY, mb(16));
- }
- else
- {
- /*
- * Use 5M so that the replication can be used with 64M total for the
- * JVM.
- */
- envConfig.setConfigParam(MAX_MEMORY, mb(5));
- }
-
- // Since records are always added at the end of the Replication log and
- // deleted at the beginning of the Replication log, this should never
- // cause any deadlock.
- envConfig.setTxnTimeout(0, TimeUnit.SECONDS);
- envConfig.setLockTimeout(0, TimeUnit.SECONDS);
-
- // Since replication provides durability, we can reduce the DB durability
- // level so that we are immune to application / JVM crashes.
- envConfig.setDurability(Durability.COMMIT_WRITE_NO_SYNC);
-
- return new Environment(new File(path), envConfig);
- }
-
- private String kb(int sizeInKb)
- {
- return String.valueOf(sizeInKb * 1024);
- }
-
- private String mb(int sizeInMb)
- {
- return String.valueOf(sizeInMb * 1024 * 1024);
- }
-
- /**
- * Open a JE database.
- * <p>
- * protected so it can be overridden by tests.
- *
- * @param databaseName
- * the databaseName to open
- * @return the opened JE database
- * @throws ChangelogException
- * if a problem happened opening the database
- * @throws RuntimeException
- * if a problem happened with the JE database
- */
- protected Database openDatabase(String databaseName)
- throws ChangelogException, RuntimeException
- {
- if (isShuttingDown.get())
- {
- throw new ChangelogException(
- WARN_CANNOT_OPEN_DATABASE_BECAUSE_SHUTDOWN_WAS_REQUESTED.get(
- databaseName, replicationServer.getServerId()));
- }
- final DatabaseConfig dbConfig = new DatabaseConfig();
- dbConfig.setAllowCreate(true);
- dbConfig.setTransactional(true);
- final Database db =
- dbEnvironment.openDatabase(null, databaseName, dbConfig);
- if (isShuttingDown.get())
- {
- closeDB(db);
- throw new ChangelogException(
- WARN_CANNOT_OPEN_DATABASE_BECAUSE_SHUTDOWN_WAS_REQUESTED.get(
- databaseName, replicationServer.getServerId()));
- }
- allDbs.add(db);
- return db;
- }
-
- @Override
- public ChangelogState getChangelogState()
- {
- return changelogState;
- }
-
- /**
- * Read and return the changelog state from the database.
- *
- * @return the {@link ChangelogState} read from the changelogState DB
- * @throws ChangelogException
- * if a database problem occurs
- */
- protected ChangelogState readOnDiskChangelogState() throws ChangelogException
- {
- return decodeChangelogState(readWholeState());
- }
-
- /**
- * Decode the whole changelog state DB.
- *
- * @param wholeState
- * the whole changelog state DB as a Map.
- * The Map is only used as a convenient collection of key => data objects
- * @return the decoded changelog state
- * @throws ChangelogException
- * if a problem occurred while decoding
- */
- ChangelogState decodeChangelogState(Map<byte[], byte[]> wholeState)
- throws ChangelogException
- {
- try
- {
- final ChangelogState result = new ChangelogState();
- for (Entry<byte[], byte[]> entry : wholeState.entrySet())
- {
- final String stringKey = toString(entry.getKey());
- final String stringData = toString(entry.getValue());
-
- if (logger.isTraceEnabled())
- {
- debug("read (key, data)=(" + stringKey + ", " + stringData + ")");
- }
-
- final String prefix = stringKey.split(FIELD_SEPARATOR)[0];
- if (prefix.equals(GENERATION_ID_TAG))
- {
- final String[] str = stringData.split(FIELD_SEPARATOR, 3);
- final long generationId = toLong(str[1]);
- final DN baseDN = DN.valueOf(str[2]);
-
- if (logger.isTraceEnabled())
- {
- debug("has read generationId: baseDN=" + baseDN + " generationId="
- + generationId);
- }
- result.setDomainGenerationId(baseDN, generationId);
- }
- else if (prefix.equals(OFFLINE_TAG))
- {
- final String[] str = stringData.split(FIELD_SEPARATOR, 3);
- long timestamp = toLong(str[0]);
- final int serverId = toInt(str[1]);
- final DN baseDN = DN.valueOf(str[2]);
- if (logger.isTraceEnabled())
- {
- debug("has read replica offline: baseDN=" + baseDN + " serverId="
- + serverId);
- }
- result.addOfflineReplica(baseDN, new CSN(timestamp, 0, serverId));
- }
- else
- {
- final String[] str = stringData.split(FIELD_SEPARATOR, 2);
- final int serverId = toInt(str[0]);
- final DN baseDN = DN.valueOf(str[1]);
-
- if (logger.isTraceEnabled())
- {
- debug("has read replica: baseDN=" + baseDN + " serverId="
- + serverId);
- }
- result.addServerIdToDomain(serverId, baseDN);
- }
- }
- return result;
- }
- catch (DirectoryException e)
- {
- throw new ChangelogException(e.getMessageObject(), e);
- }
- }
-
- private Map<byte[], byte[]> readWholeState() throws ChangelogException
- {
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry data = new DatabaseEntry();
- Cursor cursor = changelogStateDb.openCursor(null, null);
-
- try
- {
- final Map<byte[], byte[]> results = new LinkedHashMap<>();
-
- OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT);
- while (status == OperationStatus.SUCCESS)
- {
- results.put(key.getData(), data.getData());
- status = cursor.getNext(key, data, LockMode.DEFAULT);
- }
-
- return results;
- }
- catch (RuntimeException e)
- {
- throw new ChangelogException(ERR_DATABASE_EXCEPTION.get(e.getMessage()), e);
- }
- finally
- {
- close(cursor);
- }
- }
-
- private int toInt(String data) throws ChangelogException
- {
- try
- {
- return Integer.parseInt(data);
- }
- catch (NumberFormatException e)
- {
- // should never happen
- // TODO: i18n
- throw new ChangelogException(LocalizableMessage.raw(
- "replicationServer state database has a wrong format: "
- + e.getLocalizedMessage() + "<" + data + ">"));
- }
- }
-
- private long toLong(String data) throws ChangelogException
- {
- try
- {
- return Long.parseLong(data);
- }
- catch (NumberFormatException e)
- {
- // should never happen
- // TODO: i18n
- throw new ChangelogException(LocalizableMessage.raw(
- "replicationServer state database has a wrong format: "
- + e.getLocalizedMessage() + "<" + data + ">"));
- }
- }
-
- private String toString(byte[] data) throws ChangelogException
- {
- try
- {
- return new String(data, "UTF-8");
- }
- catch (UnsupportedEncodingException e)
- {
- // should never happens
- // TODO: i18n
- throw new ChangelogException(LocalizableMessage.raw("need UTF-8 support"));
- }
- }
-
- /**
- * Converts the string to a UTF8-encoded byte array.
- *
- * @param s
- * the string to convert
- * @return the byte array representation of the UTF8-encoded string
- */
- static byte[] toBytes(String s)
- {
- try
- {
- return s.getBytes("UTF-8");
- }
- catch (UnsupportedEncodingException e)
- {
- // can't happen
- return null;
- }
- }
-
- /**
- * Finds or creates the database used to store changes for a replica with the
- * given baseDN and serverId.
- *
- * @param serverId
- * The server id that identifies the server.
- * @param baseDN
- * The baseDN that identifies the domain.
- * @param generationId
- * The generationId associated to this domain.
- * @return the Database.
- * @throws ChangelogException
- * in case of underlying Exception.
- */
- public Database getOrAddReplicationDB(int serverId, DN baseDN, long generationId)
- throws ChangelogException
- {
- if (logger.isTraceEnabled())
- {
- debug("ReplicationDbEnv.getOrAddDb(" + serverId + ", " + baseDN + ", "
- + generationId + ")");
- }
- try
- {
- // JNR: redundant info is stored between the key and data down below.
- // It is probably ok since "changelogstate" DB does not receive a high
- // volume of inserts.
- Entry<String, String> replicaEntry = toReplicaEntry(baseDN, serverId);
-
- // Opens the DB for the changes received from this server on this domain.
- final Database replicaDB = openDatabase(replicaEntry.getKey());
-
- synchronized (stateLock)
- {
- putInChangelogStateDBIfNotExist(toByteArray(replicaEntry));
- changelogState.addServerIdToDomain(serverId, baseDN);
- putInChangelogStateDBIfNotExist(toGenIdEntry(baseDN, generationId));
- changelogState.setDomainGenerationId(baseDN, generationId);
- }
- return replicaDB;
- }
- catch (RuntimeException e)
- {
- throw new ChangelogException(e);
- }
- }
-
- /**
- * Return an entry to store in the changelog state database representing a
- * replica in the topology.
- *
- * @param baseDN
- * the replica's baseDN
- * @param serverId
- * the replica's serverId
- * @return a database entry for the replica
- */
- static Entry<String, String> toReplicaEntry(DN baseDN, int serverId)
- {
- final String key = serverId + FIELD_SEPARATOR + baseDN.toNormalizedUrlSafeString();
- final String value = serverId + FIELD_SEPARATOR + baseDN;
- return toEntry(key, value);
- }
-
- /**
- * Return an entry to store in the changelog state database representing the
- * domain generation id.
- *
- * @param baseDN
- * the domain's baseDN
- * @param generationId
- * the domain's generationId
- * @return a database entry for the generationId
- */
- static Entry<byte[], byte[]> toGenIdEntry(DN baseDN, long generationId)
- {
- final String key = GENERATION_ID_TAG + FIELD_SEPARATOR + baseDN.toNormalizedUrlSafeString();
- final String data = GENERATION_ID_TAG + FIELD_SEPARATOR + generationId + FIELD_SEPARATOR + baseDN;
- return toEntry(toBytes(key), toBytes(data));
- }
-
- /**
- * Converts an Entry<String, String> to an Entry<byte[], byte[]>.
- *
- * @param entry
- * the entry to convert
- * @return the converted entry
- */
- static Entry<byte[], byte[]> toByteArray(Entry<String, String> entry)
- {
- return toEntry(toBytes(entry.getKey()), toBytes(entry.getValue()));
- }
-
- /**
- * Return an entry to store in the changelog state database representing the
- * time a replica went offline.
- *
- * @param baseDN
- * the replica's baseDN
- * @param offlineCSN
- * the replica's serverId and offline timestamp
- * @return a database entry representing the time a replica went offline
- */
- static Entry<byte[], byte[]> toReplicaOfflineEntry(DN baseDN, CSN offlineCSN)
- {
- final int serverId = offlineCSN.getServerId();
- final byte[] key = toReplicaOfflineKey(baseDN, serverId);
- final byte[] data = toBytes(offlineCSN.getTime() + FIELD_SEPARATOR + serverId + FIELD_SEPARATOR + baseDN);
- return toEntry(key, data);
- }
-
- /**
- * Return the key for a replica offline entry in the changelog state database.
- *
- * @param baseDN
- * the replica's baseDN
- * @param serverId
- * the replica's serverId
- * @return the key used in the database to store offline time of the replica
- */
- private static byte[] toReplicaOfflineKey(DN baseDN, int serverId)
- {
- return toBytes(OFFLINE_TAG + FIELD_SEPARATOR + serverId + FIELD_SEPARATOR + baseDN.toNormalizedUrlSafeString());
- }
-
- /** Returns an entry with the provided key and a null value. */
- private SimpleImmutableEntry<byte[], byte[]> toEntryWithNullValue(byte[] key)
- {
- return toEntry(key, null);
- }
-
- private static <K, V> SimpleImmutableEntry<K, V> toEntry(final K key, final V value)
- {
- return new SimpleImmutableEntry<>(key, value);
- }
-
- private void putInChangelogStateDBIfNotExist(Entry<byte[], byte[]> entry)
- throws ChangelogException, RuntimeException
- {
- DatabaseEntry key = new DatabaseEntry(entry.getKey());
- DatabaseEntry data = new DatabaseEntry();
- if (changelogStateDb.get(null, key, data, LockMode.DEFAULT) == NOTFOUND)
- {
- Transaction txn = dbEnvironment.beginTransaction(null, null);
- try
- {
- data.setData(entry.getValue());
- if (logger.isTraceEnabled())
- {
- debug("putting record in the changelogstate Db key=["
- + toString(entry.getKey()) + "] value=["
- + toString(entry.getValue()) + "]");
- }
- changelogStateDb.put(txn, key, data);
- txn.commit(Durability.COMMIT_WRITE_NO_SYNC);
- }
- catch (DatabaseException dbe)
- {
- // Abort the txn and propagate the Exception to the caller
- txn.abort();
- throw dbe;
- }
- }
- }
-
- /**
- * Creates a new transaction.
- *
- * @return the transaction.
- * @throws ChangelogException in case of underlying exception
- */
- public Transaction beginTransaction() throws ChangelogException
- {
- try
- {
- return dbEnvironment.beginTransaction(null, null);
- }
- catch (RuntimeException e)
- {
- throw new ChangelogException(e);
- }
- }
-
- /**
- * Shutdown the Db environment.
- */
- public void shutdown()
- {
- isShuttingDown.set(true);
- // CopyOnWriteArrayList iterator never throw ConcurrentModificationException
- // This code rely on openDatabase() to close databases opened concurrently
- // with this code
- final Database[] allDbsCopy = allDbs.toArray(new Database[0]);
- allDbs.clear();
- for (Database db : allDbsCopy)
- {
- closeDB(db);
- }
-
- try
- {
- dbEnvironment.close();
- }
- catch (DatabaseException e)
- {
- logger.error(closeDBErrorMessage(null, e));
- }
- }
-
- private void closeDB(Database db)
- {
- allDbs.remove(db);
- try
- {
- db.close();
- }
- catch (DatabaseException e)
- {
- logger.error(closeDBErrorMessage(db.getDatabaseName(), e));
- }
- }
-
- private LocalizableMessage closeDBErrorMessage(String dbName, DatabaseException e)
- {
- if (dbName != null)
- {
- return NOTE_EXCEPTION_CLOSING_DATABASE.get(dbName,
- stackTraceToSingleLineString(e));
- }
- return ERR_ERROR_CLOSING_CHANGELOG_ENV.get(stackTraceToSingleLineString(e));
- }
-
- /**
- * Clears the provided generationId associated to the provided baseDN from the
- * state Db.
- *
- * @param baseDN
- * The baseDN for which the generationID must be cleared.
- * @throws ChangelogException
- * If a database problem happened
- */
- public void clearGenerationId(DN baseDN) throws ChangelogException
- {
- synchronized (stateLock)
- {
- final int unusedGenId = 0;
- deleteFromChangelogStateDB(toGenIdEntry(baseDN, unusedGenId),
- "clearGenerationId(baseDN=" + baseDN + ")");
- changelogState.setDomainGenerationId(baseDN, unusedGenId);
- }
- }
-
- /**
- * Clears the provided serverId associated to the provided baseDN from the
- * state Db.
- *
- * @param baseDN
- * The baseDN for which the serverId must be cleared.
- * @param serverId
- * The serverId to remove from the Db.
- * @throws ChangelogException
- * If a database problem happened
- */
- public void clearServerId(DN baseDN, int serverId) throws ChangelogException
- {
- synchronized (stateLock)
- {
- deleteFromChangelogStateDB(toByteArray(toReplicaEntry(baseDN, serverId)),
- "clearServerId(baseDN=" + baseDN + " , serverId=" + serverId + ")");
- changelogState.setDomainGenerationId(baseDN, -1);
- }
- }
-
- private void deleteFromChangelogStateDB(Entry<byte[], ?> entry,
- String methodInvocation) throws ChangelogException
- {
- if (logger.isTraceEnabled())
- {
- debug(methodInvocation + " starting");
- }
-
- try
- {
- final DatabaseEntry key = new DatabaseEntry(entry.getKey());
- final DatabaseEntry data = new DatabaseEntry();
- if (changelogStateDb.get(null, key, data, LockMode.DEFAULT) == SUCCESS)
- {
- Transaction txn = dbEnvironment.beginTransaction(null, null);
- try
- {
- changelogStateDb.delete(txn, key);
- txn.commit(Durability.COMMIT_WRITE_NO_SYNC);
- if (logger.isTraceEnabled())
- {
- debug(methodInvocation + " succeeded");
- }
- }
- catch (RuntimeException dbe)
- {
- // Abort the txn and propagate the Exception to the caller
- txn.abort();
- throw dbe;
- }
- }
- else if (logger.isTraceEnabled())
- {
- debug(methodInvocation + " failed: key not found");
- }
- }
- catch (RuntimeException e)
- {
- if (logger.isTraceEnabled())
- {
- debug(methodInvocation + " error: " + stackTraceToSingleLineString(e));
- }
- throw new ChangelogException(e);
- }
- }
-
- /**
- * Notify that replica is offline.
- * <p>
- * This information is stored in the changelog state DB.
- *
- * @param baseDN
- * the domain of the offline replica
- * @param offlineCSN
- * the offline replica serverId and offline timestamp
- * @throws ChangelogException
- * if a database problem occurred
- */
- public void notifyReplicaOffline(DN baseDN, CSN offlineCSN)
- throws ChangelogException
- {
- synchronized (stateLock)
- {
- // just overwrite any older entry as it is assumed a newly received offline
- // CSN is newer than the previous one
- putInChangelogStateDB(toReplicaOfflineEntry(baseDN, offlineCSN),
- "replicaOffline(baseDN=" + baseDN + ", offlineCSN=" + offlineCSN + ")");
- changelogState.addOfflineReplica(baseDN, offlineCSN);
- }
- }
-
- /**
- * Notify that replica is online.
- * <p>
- * Update the changelog state DB if necessary (ie, replica was known to be
- * offline).
- *
- * @param baseDN
- * the domain of replica
- * @param serverId
- * the serverId of replica
- * @throws ChangelogException
- * if a database problem occurred
- */
- public void notifyReplicaOnline(DN baseDN, int serverId) throws ChangelogException
- {
- deleteFromChangelogStateDB(toEntryWithNullValue(toReplicaOfflineKey(baseDN, serverId)),
- "removeOfflineReplica(baseDN=" + baseDN + ", serverId=" + serverId + ")");
- }
-
- private void putInChangelogStateDB(Entry<byte[], byte[]> entry,
- String methodInvocation) throws ChangelogException
- {
- if (logger.isTraceEnabled())
- {
- debug(methodInvocation + " starting");
- }
-
- try
- {
- final DatabaseEntry key = new DatabaseEntry(entry.getKey());
- final DatabaseEntry data = new DatabaseEntry(entry.getValue());
- changelogStateDb.put(null, key, data);
- if (logger.isTraceEnabled())
- {
- debug(methodInvocation + " succeeded");
- }
- }
- catch (RuntimeException e)
- {
- if (logger.isTraceEnabled())
- {
- debug(methodInvocation + " error: " + stackTraceToSingleLineString(e));
- }
- throw new ChangelogException(e);
- }
- }
-
- /**
- * Clears the database.
- *
- * @param db
- * The database to clear.
- */
- public final void clearDb(Database db)
- {
- String databaseName = db.getDatabaseName();
-
- // Closing is requested by Berkeley JE before truncate
- db.close();
-
- Transaction txn = null;
- try
- {
- txn = dbEnvironment.beginTransaction(null, null);
- dbEnvironment.truncateDatabase(txn, databaseName, false);
- txn.commit(Durability.COMMIT_WRITE_NO_SYNC);
- txn = null;
- }
- catch (RuntimeException e)
- {
- logger.error(ERR_ERROR_CLEARING_DB, databaseName,
- e.getMessage() + " " + stackTraceToSingleLineString(e));
- }
- finally
- {
- try
- {
- if (txn != null)
- {
- txn.abort();
- }
- }
- catch(Exception e)
- { /* do nothing */ }
- }
- }
-
- /**
- * Get or create a db to manage integer change number associated
- * to multidomain server state.
- * TODO:ECL how to manage compatibility of this db with new domains
- * added or removed ?
- * @return the retrieved or created db.
- * @throws ChangelogException when a problem occurs.
- */
- public Database getOrCreateCNIndexDB() throws ChangelogException
- {
- try
- {
- // Opens the database for change number associated to this domain.
- // Create it if it does not already exist.
- return openDatabase("draftcndb");
- }
- catch (RuntimeException e)
- {
- throw new ChangelogException(e);
- }
- }
-
- /**
- * Shuts down replication when an unexpected database exception occurs. Note
- * that we do not expect lock timeouts or txn timeouts because the replication
- * databases are deadlock free, thus all operations should complete
- * eventually.
- *
- * @param e
- * The unexpected database exception.
- */
- void shutdownOnException(DatabaseException e)
- {
- logger.error(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR, stackTraceToSingleLineString(e));
- replicationServer.shutdown();
- }
-
- private void debug(String message)
- {
- // replication server may be null in tests
- logger.trace("In %s, %s",
- replicationServer != null ? replicationServer.getMonitorInstanceName() : "[test]",
- message);
- }
-
-}
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/ReplicationDraftCNKey.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/ReplicationDraftCNKey.java
deleted file mode 100644
index 92da371..0000000
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/ReplicationDraftCNKey.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * CDDL HEADER START
- *
- * The contents of this file are subject to the terms of the
- * Common Development and Distribution License, Version 1.0 only
- * (the "License"). You may not use this file except in compliance
- * with the License.
- *
- * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
- * or http://forgerock.org/license/CDDLv1.0.html.
- * See the License for the specific language governing permissions
- * and limitations under the License.
- *
- * When distributing Covered Code, include this CDDL HEADER in each
- * file and include the License file at legal-notices/CDDLv1_0.txt.
- * If applicable, add the following below this CDDL HEADER, with the
- * fields enclosed by brackets "[]" replaced with your own identifying
- * information:
- * Portions Copyright [yyyy] [name of copyright owner]
- *
- * CDDL HEADER END
- *
- *
- * Copyright 2009 Sun Microsystems, Inc.
- * Portions Copyright 2010-2013 ForgeRock AS.
- */
-package org.opends.server.replication.server.changelog.je;
-
-import java.io.UnsupportedEncodingException;
-
-import com.sleepycat.je.DatabaseEntry;
-
-import static org.opends.server.util.StaticUtils.*;
-
-/**
- * Useful to create ReplicationServer keys from sequence numbers.
- */
-public class ReplicationDraftCNKey extends DatabaseEntry
-{
- private static final long serialVersionUID = 1L;
-
- /**
- * Creates a ReplicationDraftCNKey that can start anywhere in the DB.
- */
- public ReplicationDraftCNKey()
- {
- super();
- }
-
- /**
- * Creates a new ReplicationKey from the given change number.
- *
- * @param changeNumber
- * The change number to use.
- */
- public ReplicationDraftCNKey(long changeNumber)
- {
- try
- {
- // Should it use StaticUtils.getBytes() to increase performances?
- setData(String.format("%016d", changeNumber).getBytes("UTF-8"));
- }
- catch (UnsupportedEncodingException e)
- {
- // Should never happens, UTF-8 is always supported
- // TODO : add better logging
- }
- }
-
- /**
- * Getter for the change number associated with this key.
- *
- * @return the change number associated with this key.
- */
- public long getChangeNumber()
- {
- return Long.valueOf(decodeUTF8(getData()));
- }
-}
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/package-info.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/package-info.java
deleted file mode 100644
index bd7b15c..0000000
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/package-info.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * CDDL HEADER START
- *
- * The contents of this file are subject to the terms of the
- * Common Development and Distribution License, Version 1.0 only
- * (the "License"). You may not use this file except in compliance
- * with the License.
- *
- * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
- * or http://forgerock.org/license/CDDLv1.0.html.
- * See the License for the specific language governing permissions
- * and limitations under the License.
- *
- * When distributing Covered Code, include this CDDL HEADER in each
- * file and include the License file at legal-notices/CDDLv1_0.txt.
- * If applicable, add the following below this CDDL HEADER, with the
- * fields enclosed by brackets "[]" replaced with your own identifying
- * information:
- * Portions Copyright [yyyy] [name of copyright owner]
- *
- * CDDL HEADER END
- *
- *
- * Portions Copyright 2013 ForgeRock AS
- */
-
-/**
- * This package contains the Berkeley DB JE implementation for the changelog
- * database API.
- */
-@org.opends.server.types.PublicAPI(
- stability = org.opends.server.types.StabilityLevel.PRIVATE)
-package org.opends.server.replication.server.changelog.je;
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/tools/upgrade/Upgrade.java b/opendj-server-legacy/src/main/java/org/opends/server/tools/upgrade/Upgrade.java
index bb66001..25a9ca8 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/tools/upgrade/Upgrade.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/tools/upgrade/Upgrade.java
@@ -562,11 +562,7 @@
/** See OPENDJ-1742 */
register("3.0.0",
- clearReplicationDbDirectory(),
- modifyConfigEntry(INFO_UPGRADE_TASK_ENABLED_FILE_BASED_CHANGELOG.get(),
- "(objectClass=ds-cfg-replication-server)",
- "replace: ds-cfg-replication-db-implementation",
- "ds-cfg-replication-db-implementation: log"));
+ clearReplicationDbDirectory());
/**
* All upgrades will refresh the server configuration schema and generate a new upgrade folder.
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/DirectoryServerTestCase.java b/opendj-server-legacy/src/test/java/org/opends/server/DirectoryServerTestCase.java
index 013569b..4344c90 100644
--- a/opendj-server-legacy/src/test/java/org/opends/server/DirectoryServerTestCase.java
+++ b/opendj-server-legacy/src/test/java/org/opends/server/DirectoryServerTestCase.java
@@ -56,10 +56,6 @@
@BeforeSuite
public final void suppressOutput() {
- System.out.println("Replication DB implementation used in tests: '" +
- ReplicationTestCase.replicationDbImplementation + "'.");
- System.out.flush();
-
TestCaseUtils.suppressOutput();
}
@@ -147,9 +143,7 @@
while (DirectoryServerTestCase.class.isAssignableFrom(cls) &&
!DirectoryServerTestCase.class.equals(cls))
{
- Field fields[] = cls.getDeclaredFields();
- for (int i = 0; i < fields.length; i++) {
- Field field = fields[i];
+ for (Field field : cls.getDeclaredFields()) {
int modifiers = field.getModifiers();
Class<?> fieldClass = field.getType();
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/backends/ChangelogBackendTestCase.java b/opendj-server-legacy/src/test/java/org/opends/server/backends/ChangelogBackendTestCase.java
index 1581ded..21ee482 100644
--- a/opendj-server-legacy/src/test/java/org/opends/server/backends/ChangelogBackendTestCase.java
+++ b/opendj-server-legacy/src/test/java/org/opends/server/backends/ChangelogBackendTestCase.java
@@ -198,7 +198,6 @@
ReplServerFakeConfiguration config = new ReplServerFakeConfiguration(
replicationServerPort,
"ChangelogBackendTestDB",
- replicationDbImplementation,
0, // purge delay
71, // server id
0, // queue size
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/replication/ChangeNumberControlPluginTestCase.java b/opendj-server-legacy/src/test/java/org/opends/server/replication/ChangeNumberControlPluginTestCase.java
index c767e26..1e448cf 100644
--- a/opendj-server-legacy/src/test/java/org/opends/server/replication/ChangeNumberControlPluginTestCase.java
+++ b/opendj-server-legacy/src/test/java/org/opends/server/replication/ChangeNumberControlPluginTestCase.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2006-2008 Sun Microsystems, Inc.
- * Portions Copyright 2013-2014 ForgeRock AS
+ * Portions Copyright 2013-2015 ForgeRock AS
*/
package org.opends.server.replication;
@@ -79,7 +79,6 @@
+ "cn: Replication Server\n"
+ "ds-cfg-replication-port: " + replServerPort + "\n"
+ "ds-cfg-replication-db-directory: ChangeNumberControlDbTest\n"
- + "ds-cfg-replication-db-implementation: " + replicationDbImplementation + "\n"
+ "ds-cfg-replication-server-id: 103\n";
// suffix synchronized
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/replication/DependencyTest.java b/opendj-server-legacy/src/test/java/org/opends/server/replication/DependencyTest.java
index 5aaf401..4b0231a 100644
--- a/opendj-server-legacy/src/test/java/org/opends/server/replication/DependencyTest.java
+++ b/opendj-server-legacy/src/test/java/org/opends/server/replication/DependencyTest.java
@@ -420,9 +420,8 @@
private ReplicationServer newReplicationServer(int replServerId, int windowSize, String dirName) throws Exception
{
int replServerPort = TestCaseUtils.findFreePort();
- ReplServerFakeConfiguration conf = new ReplServerFakeConfiguration(
- replServerPort, dirName, replicationDbImplementation, 0, replServerId, 0, windowSize, null);
- return new ReplicationServer(conf);
+ return new ReplicationServer(new ReplServerFakeConfiguration(
+ replServerPort, dirName, 0, replServerId, 0, windowSize, null));
}
private LDAPReplicationDomain startNewLDAPReplicationDomain(ReplicationServer replServer, DN baseDN, int serverId,
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/replication/GenerationIdTest.java b/opendj-server-legacy/src/test/java/org/opends/server/replication/GenerationIdTest.java
index 74c9e7a..76aa3c3 100644
--- a/opendj-server-legacy/src/test/java/org/opends/server/replication/GenerationIdTest.java
+++ b/opendj-server-legacy/src/test/java/org/opends/server/replication/GenerationIdTest.java
@@ -343,9 +343,8 @@
}
int rsPort = getRSPort(replServerId);
String rsDir = "generationIdTest" + replServerId + testCase + "Db";
- ReplServerFakeConfiguration conf =
- new ReplServerFakeConfiguration(rsPort, rsDir, replicationDbImplementation, 0, replServerId, 0, 100, servers);
- ReplicationServer replicationServer = new ReplicationServer(conf);
+ ReplicationServer replicationServer = new ReplicationServer(
+ new ReplServerFakeConfiguration(rsPort, rsDir, 0, replServerId, 0, 100, servers));
Thread.sleep(1000);
return replicationServer;
}
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/replication/InitOnLineTest.java b/opendj-server-legacy/src/test/java/org/opends/server/replication/InitOnLineTest.java
index e120fcc..05709ae 100644
--- a/opendj-server-legacy/src/test/java/org/opends/server/replication/InitOnLineTest.java
+++ b/opendj-server-legacy/src/test/java/org/opends/server/replication/InitOnLineTest.java
@@ -491,12 +491,8 @@
}
final int port = getReplServerPort(replServerId);
- ReplServerFakeConfiguration conf =
- new ReplServerFakeConfiguration(
- port,
- "initOnlineTest" + port + testCase + "Db",
- replicationDbImplementation,
- 0, replServerId, 0, 100, servers);
+ ReplServerFakeConfiguration conf = new ReplServerFakeConfiguration(
+ port, "initOnlineTest" + port + testCase + "Db", 0, replServerId, 0, 100, servers);
ReplicationServer replicationServer = new ReplicationServer(conf);
Thread.sleep(1000);
return replicationServer;
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/replication/ProtocolWindowTest.java b/opendj-server-legacy/src/test/java/org/opends/server/replication/ProtocolWindowTest.java
index 5609ef5..cd074ae 100644
--- a/opendj-server-legacy/src/test/java/org/opends/server/replication/ProtocolWindowTest.java
+++ b/opendj-server-legacy/src/test/java/org/opends/server/replication/ProtocolWindowTest.java
@@ -258,8 +258,7 @@
// configure the replication Server.
replicationServer = new ReplicationServer(new ReplServerFakeConfiguration(
- replServerPort, "protocolWindowTestDb", replicationDbImplementation,
- 0, 1, REPLICATION_QUEUE_SIZE, WINDOW_SIZE, null));
+ replServerPort, "protocolWindowTestDb", 0, 1, REPLICATION_QUEUE_SIZE, WINDOW_SIZE, null));
String personLdif = "dn: uid=user.windowTest," + TEST_ROOT_DN_STRING + "\n"
+ "objectClass: top\n" + "objectClass: person\n"
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/replication/ReSyncTest.java b/opendj-server-legacy/src/test/java/org/opends/server/replication/ReSyncTest.java
index 847beb4..89b72b0 100644
--- a/opendj-server-legacy/src/test/java/org/opends/server/replication/ReSyncTest.java
+++ b/opendj-server-legacy/src/test/java/org/opends/server/replication/ReSyncTest.java
@@ -91,7 +91,6 @@
+ "cn: Replication Server\n"
+ "ds-cfg-replication-port:" + replServerPort + "\n"
+ "ds-cfg-replication-db-directory: ReSyncTest\n"
- + "ds-cfg-replication-db-implementation: " + replicationDbImplementation + "\n"
+ "ds-cfg-replication-server-id: 104\n";
// suffix synchronized
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/replication/ReplicationTestCase.java b/opendj-server-legacy/src/test/java/org/opends/server/replication/ReplicationTestCase.java
index f0b7868..cbbcfb8 100644
--- a/opendj-server-legacy/src/test/java/org/opends/server/replication/ReplicationTestCase.java
+++ b/opendj-server-legacy/src/test/java/org/opends/server/replication/ReplicationTestCase.java
@@ -44,7 +44,6 @@
import org.forgerock.opendj.ldap.SearchScope;
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.TestCaseUtils;
-import org.opends.server.admin.std.meta.ReplicationServerCfgDefn.ReplicationDBImplementation;
import org.opends.server.admin.std.server.ReplicationDomainCfg;
import org.opends.server.backends.task.TaskState;
import org.opends.server.core.AddOperation;
@@ -64,7 +63,6 @@
import org.opends.server.replication.protocol.Session;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.changelog.file.FileChangelogDB;
-import org.opends.server.replication.server.changelog.je.JEChangelogDB;
import org.opends.server.replication.service.ReplicationBroker;
import org.opends.server.types.Attribute;
import org.opends.server.types.Attributes;
@@ -122,9 +120,6 @@
private static final String REPLICATION_DB_IMPL_PROPERTY = "org.opends.test.replicationDbImpl";
- public static ReplicationDBImplementation replicationDbImplementation = ReplicationDBImplementation.valueOf(
- System.getProperty(REPLICATION_DB_IMPL_PROPERTY, ReplicationDBImplementation.LOG.name()));
-
/** Replication monitor stats. */
private DN monitorDN;
private String monitorAttr;
@@ -376,16 +371,7 @@
protected void clearChangelogDB(ReplicationServer rs) throws Exception
{
- if (rs == null)
- {
- return;
- }
-
- if (replicationDbImplementation == ReplicationDBImplementation.JE)
- {
- ((JEChangelogDB) rs.getChangelogDB()).clearDB();
- }
- else
+ if (rs != null)
{
((FileChangelogDB) rs.getChangelogDB()).clearDB();
}
@@ -581,7 +567,7 @@
return null;
}
});
-
+
Entry entry = DirectoryServer.getEntry(dn);
if (entry != null)
{
@@ -934,9 +920,4 @@
}
});
}
-
- protected static void setReplicationDBImplementation(ReplicationDBImplementation impl)
- {
- replicationDbImplementation = impl;
- }
}
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/replication/SchemaReplicationTest.java b/opendj-server-legacy/src/test/java/org/opends/server/replication/SchemaReplicationTest.java
index 70c23a9..d4d93dd 100644
--- a/opendj-server-legacy/src/test/java/org/opends/server/replication/SchemaReplicationTest.java
+++ b/opendj-server-legacy/src/test/java/org/opends/server/replication/SchemaReplicationTest.java
@@ -97,7 +97,6 @@
+ "cn: Replication Server\n"
+ "ds-cfg-replication-port: " + replServerPort + "\n"
+ "ds-cfg-replication-db-directory: SchemaReplicationTest\n"
- + "ds-cfg-replication-db-implementation: " + replicationDbImplementation + "\n"
+ "ds-cfg-replication-server-id: 105\n";
// suffix synchronized
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/replication/UpdateOperationTest.java b/opendj-server-legacy/src/test/java/org/opends/server/replication/UpdateOperationTest.java
index 0bee1a1..c8e9306 100644
--- a/opendj-server-legacy/src/test/java/org/opends/server/replication/UpdateOperationTest.java
+++ b/opendj-server-legacy/src/test/java/org/opends/server/replication/UpdateOperationTest.java
@@ -130,7 +130,6 @@
+ "cn: Replication Server\n"
+ "ds-cfg-replication-port: " + replServerPort + "\n"
+ "ds-cfg-replication-db-directory: UpdateOperationTest\n"
- + "ds-cfg-replication-db-implementation: " + replicationDbImplementation + "\n"
+ "ds-cfg-replication-server-id: 107\n";
// suffix synchronized
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/FractionalReplicationTest.java b/opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/FractionalReplicationTest.java
index 47a96d5..11fa1dc 100644
--- a/opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/FractionalReplicationTest.java
+++ b/opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/FractionalReplicationTest.java
@@ -525,10 +525,8 @@
SortedSet<String> replServers = new TreeSet<>();
String dir = testName + RS_ID + testCase + "Db";
- ReplServerFakeConfiguration conf =
- new ReplServerFakeConfiguration(replServerPort, dir, replicationDbImplementation, 0, RS_ID, 0,
- 100, replServers);
- replicationServer = new ReplicationServer(conf);
+ replicationServer = new ReplicationServer(
+ new ReplServerFakeConfiguration(replServerPort, dir, 0, RS_ID, 0, 100, replServers));
}
private static final String REPLICATION_GENERATION_ID =
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/GroupIdHandshakeTest.java b/opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/GroupIdHandshakeTest.java
index 68d9d8e..62df3c3 100644
--- a/opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/GroupIdHandshakeTest.java
+++ b/opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/GroupIdHandshakeTest.java
@@ -26,6 +26,9 @@
*/
package org.opends.server.replication.plugin;
+import static org.opends.server.TestCaseUtils.*;
+import static org.testng.Assert.*;
+
import java.io.IOException;
import java.util.SortedSet;
import java.util.TreeSet;
@@ -33,7 +36,6 @@
import org.forgerock.i18n.LocalizableMessage;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.opends.server.TestCaseUtils;
-import org.opends.server.admin.std.meta.ReplicationServerCfgDefn.ReplicationDBImplementation;
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.replication.server.ReplicationServer;
@@ -41,9 +43,6 @@
import org.opends.server.types.HostPort;
import org.testng.annotations.Test;
-import static org.opends.server.TestCaseUtils.*;
-import static org.testng.Assert.*;
-
/**
* Some real connections from clients that should end up with a server with
* the right groupId if available.
@@ -297,14 +296,11 @@
String dir = "groupIdHandshakeTest" + serverId + testCase + "Db";
ReplServerFakeConfiguration conf =
- new ReplServerFakeConfiguration(port, dir, replicationDbImplementation, 0, serverId, 0,
- 100, replServers, groupId, 1000, 5000);
+ new ReplServerFakeConfiguration(port, dir, 0, serverId, 0, 100, replServers, groupId, 1000, 5000);
return new ReplicationServer(conf);
}
- /**
- * Creates a new ReplicationDomain.
- */
+ /** Creates a new ReplicationDomain. */
private LDAPReplicationDomain createReplicationDomain(int serverId,
int groupId, String testCase) throws Exception
{
@@ -506,8 +502,7 @@
otherReplServers.add("localhost:" + rs2Port);
String dir = "groupIdHandshakeTest" + RS3_ID + testCase + "Db";
ReplServerFakeConfiguration rsConfWithNewGid =
- new ReplServerFakeConfiguration(rs3Port, dir, replicationDbImplementation, 0, RS3_ID, 0,
- 100, otherReplServers, 1, 1000, 5000);
+ new ReplServerFakeConfiguration(rs3Port, dir, 0, RS3_ID, 0, 100, otherReplServers, 1, 1000, 5000);
rs3.applyConfigurationChange(rsConfWithNewGid);
/**
@@ -517,8 +512,8 @@
otherReplServers.add("localhost:" + rs2Port);
otherReplServers.add("localhost:" + rs3Port);
dir = "groupIdHandshakeTest" + RS1_ID + testCase + "Db";
- rsConfWithNewGid = new ReplServerFakeConfiguration(rs1Port, dir, ReplicationDBImplementation.JE, 0,
- RS1_ID, 0, 100, otherReplServers, 3, 1000, 5000);
+ rsConfWithNewGid =
+ new ReplServerFakeConfiguration(rs1Port, dir, 0, RS1_ID, 0, 100, otherReplServers, 3, 1000, 5000);
rs1.applyConfigurationChange(rsConfWithNewGid);
checkConnection(30, DS1_ID, RS3_ID,
"Change GID of RS3 to 1 and RS1 to 3, DS1 should reconnect to RS3 with GID=1");
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java b/opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java
index 3ab149e..ccf6518 100644
--- a/opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java
+++ b/opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java
@@ -334,10 +334,8 @@
int rsPort = TestCaseUtils.findFreePort();
replServers.add("localhost:" + rsPort);
- ReplServerFakeConfiguration conf =
- new ReplServerFakeConfiguration(rsPort, "HistoricalCsnOrdering", replicationDbImplementation, 0,
- 1, 0, 100, replServers, 1, 1000, 5000);
- ReplicationServer replicationServer = new ReplicationServer(conf);
+ ReplicationServer replicationServer = new ReplicationServer(
+ new ReplServerFakeConfiguration(rsPort, "HistoricalCsnOrdering", 0, 1, 0, 100, replServers, 1, 1000, 5000));
clearChangelogDB(replicationServer);
return replicationServer;
}
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/HistoricalTest.java b/opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/HistoricalTest.java
index d59774f..2bf9192 100644
--- a/opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/HistoricalTest.java
+++ b/opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/HistoricalTest.java
@@ -86,7 +86,6 @@
+ "cn: replication Server\n"
+ "ds-cfg-replication-port: " + replServerPort + "\n"
+ "ds-cfg-replication-db-directory: HistoricalTest\n"
- + "ds-cfg-replication-db-implementation: " + replicationDbImplementation + "\n"
+ "ds-cfg-replication-server-id: 102\n";
// The suffix to be synchronized.
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/ReplicationServerFailoverTest.java b/opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/ReplicationServerFailoverTest.java
index 837dde2..2b192da 100644
--- a/opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/ReplicationServerFailoverTest.java
+++ b/opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/ReplicationServerFailoverTest.java
@@ -26,14 +26,17 @@
*/
package org.opends.server.replication.plugin;
+import static org.opends.server.TestCaseUtils.*;
+import static org.testng.Assert.*;
+
import java.io.IOException;
import java.util.SortedSet;
import java.util.TreeSet;
import org.forgerock.i18n.LocalizableMessage;
-import org.opends.server.TestCaseUtils;
-import org.forgerock.opendj.config.server.ConfigException;
import org.forgerock.i18n.slf4j.LocalizedLogger;
+import org.forgerock.opendj.config.server.ConfigException;
+import org.opends.server.TestCaseUtils;
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.replication.server.ReplicationServer;
@@ -41,9 +44,6 @@
import org.opends.server.types.HostPort;
import org.testng.annotations.Test;
-import static org.opends.server.TestCaseUtils.*;
-import static org.testng.Assert.*;
-
/**
* Test if the replication domain is able to switch of replication server
* if there is some replication server failure.
@@ -360,10 +360,7 @@
}
String dir = "replicationServerFailoverTest" + serverId + suffix + "Db";
- ReplServerFakeConfiguration conf =
- new ReplServerFakeConfiguration(port, dir, replicationDbImplementation, 0, serverId, 0,
- 100, replServers);
- return new ReplicationServer(conf);
+ return new ReplicationServer(new ReplServerFakeConfiguration(port, dir, 0, serverId, 0, 100, replServers));
}
/**
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/ReplicationServerLoadBalancingTest.java b/opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/ReplicationServerLoadBalancingTest.java
index 533f29b..7794c5e 100644
--- a/opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/ReplicationServerLoadBalancingTest.java
+++ b/opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/ReplicationServerLoadBalancingTest.java
@@ -26,13 +26,20 @@
*/
package org.opends.server.replication.plugin;
-import java.util.*;
+import static org.opends.server.TestCaseUtils.*;
+import static org.testng.Assert.*;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
import org.assertj.core.api.Assertions;
import org.forgerock.i18n.LocalizableMessage;
+import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.opends.server.TestCaseUtils;
import org.opends.server.admin.std.server.ReplicationServerCfg;
-import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.replication.server.ReplicationServer;
@@ -40,9 +47,6 @@
import org.opends.server.types.DN;
import org.testng.annotations.Test;
-import static org.opends.server.TestCaseUtils.*;
-import static org.testng.Assert.*;
-
/**
* Test in real situations the algorithm for load balancing the DSs connections
* to the RSs. This uses the weights of the RSs. We concentrate the tests on
@@ -154,9 +158,8 @@
}
String dir = "replicationServerLoadBalancingTest" + rsIndex + testCase + "Db";
- ReplServerFakeConfiguration conf =
- new ReplServerFakeConfiguration(rsPort[rsIndex], dir, replicationDbImplementation, 0, rsIndex+501, 0,
- 100, replServers, 1, 1000, 5000, weight);
+ ReplServerFakeConfiguration conf = new ReplServerFakeConfiguration(
+ rsPort[rsIndex], dir, 0, rsIndex + 501, 0, 100, replServers, 1, 1000, 5000, weight);
return new ReplicationServer(conf);
}
@@ -186,8 +189,8 @@
}
String dir = "replicationServerLoadBalancingTest" + rsIndex + testCase + "Db";
- return new ReplServerFakeConfiguration(rsPort[rsIndex], dir, replicationDbImplementation,
- 0, rsIndex + 501, 0, 100, replServers, 1, 1000, 5000, weight);
+ return new ReplServerFakeConfiguration(
+ rsPort[rsIndex], dir, 0, rsIndex + 501, 0, 100, replServers, 1, 1000, 5000, weight);
}
/**
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/StateMachineTest.java b/opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/StateMachineTest.java
index 0a7b55e..1173dc6 100644
--- a/opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/StateMachineTest.java
+++ b/opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/StateMachineTest.java
@@ -190,7 +190,7 @@
String dir = "stateMachineTest" + RS1_ID + testCase + "Db";
ReplServerFakeConfiguration conf =
- new ReplServerFakeConfiguration(rs1Port, dir, replicationDbImplementation, 0, RS1_ID, 0,
+ new ReplServerFakeConfiguration(rs1Port, dir, 0, RS1_ID, 0,
100, replServers, 1, 1000, degradedStatusThreshold);
return new ReplicationServer(conf);
}
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/TopologyViewTest.java b/opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/TopologyViewTest.java
index b7d2941..1f2e0f2 100644
--- a/opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/TopologyViewTest.java
+++ b/opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/TopologyViewTest.java
@@ -26,15 +26,23 @@
*/
package org.opends.server.replication.plugin;
+import static org.opends.server.TestCaseUtils.*;
+import static org.testng.Assert.*;
+
import java.net.InetAddress;
import java.net.UnknownHostException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
import org.forgerock.i18n.LocalizableMessage;
+import org.forgerock.i18n.slf4j.LocalizedLogger;
+import org.forgerock.opendj.config.server.ConfigException;
import org.opends.server.TestCaseUtils;
import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.AssuredType;
-import org.forgerock.opendj.config.server.ConfigException;
-import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.common.AssuredMode;
import org.opends.server.replication.common.DSInfo;
@@ -47,9 +55,6 @@
import org.opends.server.types.HostPort;
import org.testng.annotations.Test;
-import static org.opends.server.TestCaseUtils.*;
-import static org.testng.Assert.*;
-
/**
* Some tests to know if at any time the view DSs and RSs have of the current
* topology is accurate, even after some connections, disconnections and
@@ -367,7 +372,7 @@
String dir = "topologyViewTest" + rsId + testCase + "Db";
ReplServerFakeConfiguration conf =
- new ReplServerFakeConfiguration(rsPort, dir, replicationDbImplementation, 0, rsId, 0,
+ new ReplServerFakeConfiguration(rsPort, dir, 0, rsId, 0,
100, replServers, groupId, 1000, 5000);
return new ReplicationServer(conf);
}
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/replication/server/AssuredReplicationServerTest.java b/opendj-server-legacy/src/test/java/org/opends/server/replication/server/AssuredReplicationServerTest.java
index 4ac3c70..10bda4d 100644
--- a/opendj-server-legacy/src/test/java/org/opends/server/replication/server/AssuredReplicationServerTest.java
+++ b/opendj-server-legacy/src/test/java/org/opends/server/replication/server/AssuredReplicationServerTest.java
@@ -391,7 +391,7 @@
String dir = testName + serverId + testCase + "Db";
ReplServerFakeConfiguration conf =
- new ReplServerFakeConfiguration(port, dir, replicationDbImplementation, 0, serverId, 0,
+ new ReplServerFakeConfiguration(port, dir, 0, serverId, 0,
100, otherRsUrls, groupId, assuredTimeout, 5000);
// No monitoring publisher to not interfere with some SocketTimeoutException
// expected at some points in these tests
@@ -2992,7 +2992,7 @@
// Create real RS
String dir = testName + RS1_ID + testCase + "Db";
ReplServerFakeConfiguration conf =
- new ReplServerFakeConfiguration(rsPorts[0], dir, replicationDbImplementation, 0, RS1_ID, 0,
+ new ReplServerFakeConfiguration(rsPorts[0], dir, 0, RS1_ID, 0,
100, new TreeSet<String>(), DEFAULT_GID, SMALL_TIMEOUT, 1);
rs1 = new ReplicationServer(conf);
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/replication/server/MonitorTest.java b/opendj-server-legacy/src/test/java/org/opends/server/replication/server/MonitorTest.java
index 23bd6f8..87ad23b 100644
--- a/opendj-server-legacy/src/test/java/org/opends/server/replication/server/MonitorTest.java
+++ b/opendj-server-legacy/src/test/java/org/opends/server/replication/server/MonitorTest.java
@@ -26,13 +26,20 @@
*/
package org.opends.server.replication.server;
+import static org.opends.server.TestCaseUtils.*;
+import static org.testng.Assert.*;
+
import java.io.ByteArrayOutputStream;
import java.net.SocketException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.UUID;
import org.forgerock.i18n.LocalizableMessage;
-import org.opends.server.TestCaseUtils;
import org.forgerock.i18n.slf4j.LocalizedLogger;
+import org.opends.server.TestCaseUtils;
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.CSNGenerator;
@@ -49,9 +56,6 @@
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import static org.opends.server.TestCaseUtils.*;
-import static org.testng.Assert.*;
-
/**
* Tests for the replicationServer code.
*/
@@ -173,9 +177,7 @@
}
int chPort = getChangelogPort(changelogId);
String chDir = "monitorTest" + changelogId + suffix + "Db";
- ReplServerFakeConfiguration conf =
- new ReplServerFakeConfiguration(chPort, chDir, replicationDbImplementation, 0, changelogId, 0,
- 100, servers);
+ ReplServerFakeConfiguration conf = new ReplServerFakeConfiguration(chPort, chDir, 0, changelogId, 0, 100, servers);
final DN testBaseDN = this.baseDN;
ReplicationServer replicationServer = new ReplicationServer(conf, new DSRSShutdownSync(), new ECLEnabledDomainPredicate()
{
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/replication/server/ReplServerFakeConfiguration.java b/opendj-server-legacy/src/test/java/org/opends/server/replication/server/ReplServerFakeConfiguration.java
index 1482746..ca033be 100644
--- a/opendj-server-legacy/src/test/java/org/opends/server/replication/server/ReplServerFakeConfiguration.java
+++ b/opendj-server-legacy/src/test/java/org/opends/server/replication/server/ReplServerFakeConfiguration.java
@@ -33,7 +33,6 @@
import org.opends.server.admin.Configuration;
import org.opends.server.admin.server.ConfigurationChangeListener;
import org.opends.server.admin.server.ServerManagedObject;
-import org.opends.server.admin.std.meta.ReplicationServerCfgDefn.ReplicationDBImplementation;
import org.opends.server.admin.std.server.ReplicationServerCfg;
import org.opends.server.types.DN;
@@ -70,19 +69,12 @@
/** The monitoring publisher period. */
private long monitoringPeriod = 3000;
private boolean computeChangenumber;
-
- /** The DB implementation to use for replication changelog. */
- private final ReplicationDBImplementation dbImpl;
- /**
- * Constructor without group id, assured info and weight.
- */
+ /** Constructor without group id, assured info and weight. */
public ReplServerFakeConfiguration(
- int port, String dirName, ReplicationDBImplementation dbImpl, int purgeDelay,
- int serverId, int queueSize, int windowSize, SortedSet<String> servers)
+ int port, String dirName, int purgeDelay, int serverId, int queueSize, int windowSize, SortedSet<String> servers)
{
this.port = port;
- this.dbImpl = dbImpl;
this.dirName = dirName != null ? dirName : "changelogDb";
if (purgeDelay == 0)
@@ -121,30 +113,25 @@
* Constructor with group id and assured info.
*/
public ReplServerFakeConfiguration(
- int port, String dirName, ReplicationDBImplementation dbImpl, int purgeDelay,
- int serverId, int queueSize, int windowSize,
+ int port, String dirName, int purgeDelay, int serverId, int queueSize, int windowSize,
SortedSet<String> servers, int groupId, long assuredTimeout, int degradedStatusThreshold)
{
- this(port, dirName, dbImpl, purgeDelay, serverId, queueSize, windowSize, servers);
+ this(port, dirName, purgeDelay, serverId, queueSize, windowSize, servers);
this.groupId = groupId;
this.assuredTimeout = assuredTimeout;
this.degradedStatusThreshold = degradedStatusThreshold;
}
- /**
- * Constructor with group id, assured info and weight.
- */
+ /** Constructor with group id, assured info and weight. */
public ReplServerFakeConfiguration(
- int port, String dirName, ReplicationDBImplementation dbImpl, int purgeDelay,
- int serverId, int queueSize, int windowSize,
+ int port, String dirName, int purgeDelay, int serverId, int queueSize, int windowSize,
SortedSet<String> servers, int groupId, long assuredTimeout, int degradedStatusThreshold, int weight)
{
- this(port, dirName, dbImpl, purgeDelay, serverId, queueSize, windowSize,
+ this(port, dirName, purgeDelay, serverId, queueSize, windowSize,
servers, groupId, assuredTimeout, degradedStatusThreshold);
this.weight = weight;
}
- /** {@inheritDoc} */
@Override
public void addChangeListener(
ConfigurationChangeListener<ReplicationServerCfg> listener)
@@ -152,82 +139,69 @@
// not supported
}
- /** {@inheritDoc} */
@Override
public Class<? extends ReplicationServerCfg> configurationClass()
{
return null;
}
- /** {@inheritDoc} */
@Override
public String getReplicationDBDirectory()
{
return dirName;
}
- /** {@inheritDoc} */
@Override
public int getReplicationPort()
{
return port;
}
- /** {@inheritDoc} */
@Override
public long getReplicationPurgeDelay()
{
return purgeDelay;
}
- /** {@inheritDoc} */
@Override
public SortedSet<String> getReplicationServer()
{
return servers;
}
- /** {@inheritDoc} */
@Override
public int getReplicationServerId()
{
return serverId;
}
- /** {@inheritDoc} */
@Override
public InetAddress getSourceAddress() { return null; }
- /** {@inheritDoc} */
@Override
public int getQueueSize()
{
return queueSize;
}
- /** {@inheritDoc} */
@Override
public int getWindowSize()
{
return windowSize;
}
- /** {@inheritDoc} */
@Override
- public void removeChangeListener(
- ConfigurationChangeListener<ReplicationServerCfg> listener)
+ public void removeChangeListener(ConfigurationChangeListener<ReplicationServerCfg> listener)
{
// not supported
}
- /** {@inheritDoc} */
@Override
public DN dn()
{
return null;
}
- /** {@inheritDoc} */
public ServerManagedObject<? extends Configuration> managedObject() {
return null;
}
@@ -282,10 +256,4 @@
{
this.computeChangenumber = computeChangenumber;
}
-
- @Override
- public ReplicationDBImplementation getReplicationDBImplementation()
- {
- return dbImpl;
- }
}
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/replication/server/ReplicationServerDynamicConfTest.java b/opendj-server-legacy/src/test/java/org/opends/server/replication/server/ReplicationServerDynamicConfTest.java
index c3a47fd..c141d18 100644
--- a/opendj-server-legacy/src/test/java/org/opends/server/replication/server/ReplicationServerDynamicConfTest.java
+++ b/opendj-server-legacy/src/test/java/org/opends/server/replication/server/ReplicationServerDynamicConfTest.java
@@ -26,15 +26,15 @@
*/
package org.opends.server.replication.server;
+import static org.opends.server.TestCaseUtils.*;
+import static org.testng.Assert.*;
+
import org.opends.server.TestCaseUtils;
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.service.ReplicationBroker;
import org.opends.server.types.DN;
import org.testng.annotations.Test;
-import static org.opends.server.TestCaseUtils.*;
-import static org.testng.Assert.*;
-
/**
* Tests that we can dynamically modify the configuration of replicationServer.
*/
@@ -55,9 +55,7 @@
int[] ports = TestCaseUtils.findFreePorts(2);
// instantiate a Replication server using the first port number.
- ReplServerFakeConfiguration conf =
- new ReplServerFakeConfiguration(
- ports[0], null, replicationDbImplementation, 0, 1, 0, 0, null);
+ ReplServerFakeConfiguration conf = new ReplServerFakeConfiguration(ports[0], null, 0, 1, 0, 0, null);
replicationServer = new ReplicationServer(conf);
// Most of the configuration change are trivial to apply.
@@ -65,10 +63,7 @@
// build a new ReplServerFakeConfiguration with a new server port
// apply this new configuration and check that it is now possible to
// connect to this new portnumber.
- ReplServerFakeConfiguration newconf =
- new ReplServerFakeConfiguration(
- ports[1], null, replicationDbImplementation, 0, 1, 0, 0, null);
-
+ ReplServerFakeConfiguration newconf = new ReplServerFakeConfiguration(ports[1], null, 0, 1, 0, 0, null);
replicationServer.applyConfigurationChange(newconf);
ReplicationBroker broker = openReplicationSession(
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/replication/server/ReplicationServerTest.java b/opendj-server-legacy/src/test/java/org/opends/server/replication/server/ReplicationServerTest.java
index e0123f9..f13b46b 100644
--- a/opendj-server-legacy/src/test/java/org/opends/server/replication/server/ReplicationServerTest.java
+++ b/opendj-server-legacy/src/test/java/org/opends/server/replication/server/ReplicationServerTest.java
@@ -115,7 +115,6 @@
"--provider-name", "Multimaster Synchronization",
"--set", "replication-db-directory:" + "replicationServerTestConfigureDb",
"--set", "replication-port:" + replicationServerPort,
- "--set", "replication-db-implementation:" + replicationDbImplementation,
"--set", "replication-server-id:71");
for (SynchronizationProvider<?> provider : DirectoryServer
@@ -668,7 +667,7 @@
"localhost:" + ((i == 0) ? changelogPorts[1] : changelogPorts[0]));
ReplServerFakeConfiguration conf =
new ReplServerFakeConfiguration(changelogPorts[i], "replicationServerTestChangelogChainingDb"+i,
- replicationDbImplementation, 0, changelogIds[i], 0, 100, servers);
+ 0, changelogIds[i], 0, 100, servers);
changelogs[i] = new ReplicationServer(conf);
}
@@ -761,7 +760,7 @@
SortedSet<String> servers = new TreeSet<>();
servers.add("localhost:" + changelogPorts[1]);
ReplServerFakeConfiguration conf =
- new ReplServerFakeConfiguration(changelogPorts[0], "replicationServerTestChangelogChainingDb"+0, replicationDbImplementation,
+ new ReplServerFakeConfiguration(changelogPorts[0], "replicationServerTestChangelogChainingDb" + 0,
0, changelogIds[0], 0, 100, servers);
changelogs[0] = new ReplicationServer(conf);
}
@@ -813,7 +812,7 @@
SortedSet<String> servers = new TreeSet<>();
servers.add("localhost:"+changelogPorts[0]);
ReplServerFakeConfiguration conf = new ReplServerFakeConfiguration(
- changelogPorts[1], null, replicationDbImplementation, 0, changelogIds[1], 0, 100, null);
+ changelogPorts[1], null, 0, changelogIds[1], 0, 100, null);
changelogs[1] = new ReplicationServer(conf);
// Connect broker 2 to changelog2
@@ -1140,7 +1139,7 @@
servers.add("localhost:" + changelogPorts[1]);
}
ReplServerFakeConfiguration conf =
- new ReplServerFakeConfiguration(changelogPorts[i], "replicationServerTestReplicationServerConnectedDb"+i, replicationDbImplementation,
+ new ReplServerFakeConfiguration(changelogPorts[i], "replicationServerTestReplicationServerConnectedDb" + i,
0, changelogIds[i], 0, 100, servers);
changelogs[i] = new ReplicationServer(conf);
}
@@ -1184,8 +1183,7 @@
SortedSet<String> servers = new TreeSet<>();
// Configure replicationServer[0] to be disconnected from ReplicationServer[1]
ReplServerFakeConfiguration conf =
- new ReplServerFakeConfiguration(changelogPorts[0], "changelogDb0", replicationDbImplementation,
- 0, changelogIds[0], 0, 100, servers);
+ new ReplServerFakeConfiguration(changelogPorts[0], "changelogDb0", 0, changelogIds[0], 0, 100, servers);
changelogs[0].applyConfigurationChange(conf) ;
// The link between RS[0] & RS[1] should be destroyed by the new configuration.
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBTest.java b/opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBTest.java
index 64484b6..2f8a9c8 100644
--- a/opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBTest.java
+++ b/opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBTest.java
@@ -25,9 +25,12 @@
*/
package org.opends.server.replication.server.changelog.file;
+import static org.assertj.core.api.Assertions.*;
+import static org.opends.server.replication.server.changelog.file.FileReplicaDBTest.*;
+import static org.testng.Assert.*;
+
import org.forgerock.opendj.ldap.ByteString;
import org.opends.server.TestCaseUtils;
-import org.opends.server.admin.std.meta.ReplicationServerCfgDefn.ReplicationDBImplementation;
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.server.ReplServerFakeConfiguration;
@@ -40,10 +43,6 @@
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
-import static org.assertj.core.api.Assertions.*;
-import static org.opends.server.replication.server.changelog.file.FileReplicaDBTest.*;
-import static org.testng.Assert.*;
-
@SuppressWarnings("javadoc")
public class FileChangeNumberIndexDBTest extends ReplicationTestCase
{
@@ -96,14 +95,18 @@
assertEquals(cnIndexDB.count(), 3, "Db count");
assertFalse(cnIndexDB.isEmpty());
- DBCursor<ChangeNumberIndexRecord> cursor = cnIndexDB.getCursorFrom(cn1);
- assertCursorReadsInOrder(cursor, cn1, cn2, cn3);
-
- cursor = cnIndexDB.getCursorFrom(cn2);
- assertCursorReadsInOrder(cursor, cn2, cn3);
-
- cursor = cnIndexDB.getCursorFrom(cn3);
- assertCursorReadsInOrder(cursor, cn3);
+ try (DBCursor<ChangeNumberIndexRecord> cursor = cnIndexDB.getCursorFrom(cn1))
+ {
+ assertCursorReadsInOrder(cursor, cn1, cn2, cn3);
+ }
+ try (DBCursor<ChangeNumberIndexRecord> cursor = cnIndexDB.getCursorFrom(cn2))
+ {
+ assertCursorReadsInOrder(cursor, cn2, cn3);
+ }
+ try (DBCursor<ChangeNumberIndexRecord> cursor = cnIndexDB.getCursorFrom(cn3))
+ {
+ assertCursorReadsInOrder(cursor, cn3);
+ }
}
finally
{
@@ -205,12 +208,6 @@
return cnIndexDB.addRecord(new ChangeNumberIndexRecord(baseDN, csn));
}
- private void assertEqualTo(ChangeNumberIndexRecord record, CSN csn, DN baseDN)
- {
- assertEquals(record.getCSN(), csn);
- assertEquals(record.getBaseDN(), baseDN);
- }
-
private FileChangeNumberIndexDB getCNIndexDB(ReplicationServer rs) throws ChangelogException
{
final FileChangelogDB changelogDB = (FileChangelogDB) rs.getChangelogDB();
@@ -241,27 +238,19 @@
{
TestCaseUtils.startServer();
final int port = TestCaseUtils.findFreePort();
- final ReplServerFakeConfiguration cfg =
- new ReplServerFakeConfiguration(port, null, ReplicationDBImplementation.LOG, 0, 2, 0, 100, null);
+ final ReplServerFakeConfiguration cfg = new ReplServerFakeConfiguration(port, null, 0, 2, 0, 100, null);
cfg.setComputeChangeNumber(true);
return new ReplicationServer(cfg);
}
- private void assertCursorReadsInOrder(DBCursor<ChangeNumberIndexRecord> cursor,
- long... cns) throws ChangelogException
+ private void assertCursorReadsInOrder(DBCursor<ChangeNumberIndexRecord> cursor, long... cns)
+ throws ChangelogException
{
- try
+ for (long cn : cns)
{
- for (long cn : cns)
- {
- assertTrue(cursor.next());
- assertEquals(cursor.getRecord().getChangeNumber(), cn);
- }
- assertFalse(cursor.next());
+ assertTrue(cursor.next());
+ assertEquals(cursor.getRecord().getChangeNumber(), cn);
}
- finally
- {
- cursor.close();
- }
+ assertFalse(cursor.next());
}
}
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java b/opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java
index 8d72446..9b493e0 100644
--- a/opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java
+++ b/opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java
@@ -35,8 +35,6 @@
import org.forgerock.opendj.ldap.ByteString;
import org.forgerock.util.time.TimeService;
import org.opends.server.TestCaseUtils;
-import org.opends.server.admin.std.meta.ReplicationServerCfgDefn.ReplicationDBImplementation;
-import org.opends.server.admin.std.server.ReplicationServerCfg;
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.CSNGenerator;
@@ -533,9 +531,8 @@
throws IOException, ConfigException
{
final int changelogPort = findFreePort();
- final ReplicationServerCfg conf = new ReplServerFakeConfiguration(
- changelogPort, null, ReplicationDBImplementation.LOG, 0, 2, queueSize, windowSize, null);
- return new ReplicationServer(conf);
+ return new ReplicationServer(
+ new ReplServerFakeConfiguration(changelogPort, null, 0, 2, queueSize, windowSize, null));
}
private FileReplicaDB newReplicaDB(ReplicationServer rs) throws Exception
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java b/opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java
deleted file mode 100644
index 8f5a6d7..0000000
--- a/opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/*
- * CDDL HEADER START
- *
- * The contents of this file are subject to the terms of the
- * Common Development and Distribution License, Version 1.0 only
- * (the "License"). You may not use this file except in compliance
- * with the License.
- *
- * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
- * or http://forgerock.org/license/CDDLv1.0.html.
- * See the License for the specific language governing permissions
- * and limitations under the License.
- *
- * When distributing Covered Code, include this CDDL HEADER in each
- * file and include the License file at legal-notices/CDDLv1_0.txt.
- * If applicable, add the following below this CDDL HEADER, with the
- * fields enclosed by brackets "[]" replaced with your own identifying
- * information:
- * Portions Copyright [yyyy] [name of copyright owner]
- *
- * CDDL HEADER END
- *
- *
- * Copyright 2009-2010 Sun Microsystems, Inc.
- * Portions Copyright 2011-2014 ForgeRock AS
- */
-package org.opends.server.replication.server.changelog.je;
-
-import org.opends.server.TestCaseUtils;
-import org.opends.server.admin.std.meta.ReplicationServerCfgDefn.ReplicationDBImplementation;
-import org.opends.server.replication.ReplicationTestCase;
-import org.opends.server.replication.common.CSN;
-import org.opends.server.replication.common.MultiDomainServerState;
-import org.opends.server.replication.server.ReplServerFakeConfiguration;
-import org.opends.server.replication.server.ReplicationServer;
-import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
-import org.opends.server.replication.server.changelog.api.ChangelogDB;
-import org.opends.server.replication.server.changelog.api.ChangelogException;
-import org.opends.server.replication.server.changelog.api.DBCursor;
-import org.opends.server.types.DN;
-import org.opends.server.util.StaticUtils;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import static org.opends.server.replication.server.changelog.je.JEReplicaDBTest.*;
-import static org.testng.Assert.*;
-
-/**
- * Test the JEChangeNumberIndexDB class.
- */
-@SuppressWarnings("javadoc")
-public class JEChangeNumberIndexDBTest extends ReplicationTestCase
-{
- private final MultiDomainServerState previousCookie =
- new MultiDomainServerState();
-
- private static final ReplicationDBImplementation previousDBImpl = replicationDbImplementation;
-
- @BeforeClass
- public void setDBImpl()
- {
- setReplicationDBImplementation(ReplicationDBImplementation.JE);
- }
-
- @AfterClass
- public void resetDBImplToPrevious()
- {
- setReplicationDBImplementation(previousDBImpl);
- }
-
- /**
- * This test makes basic operations of a JEChangeNumberIndexDB:
- * <ol>
- * <li>create the db</li>
- * <li>add records</li>
- * <li>read them with a cursor</li>
- * <li>set a very short purge period</li>
- * <li>wait for the db to be purged / here since the changes are not stored
- * in the replication changelog, the ChangeNumberIndexDB will be cleared.</li>
- * </ol>
- */
- @Test
- public void testPurge() throws Exception
- {
- ReplicationServer replicationServer = null;
- try
- {
- replicationServer = newReplicationServer();
- final ChangelogDB changelogDB = replicationServer.getChangelogDB();
- changelogDB.setPurgeDelay(0); // disable purging
-
- // Prepare data to be stored in the db
- DN baseDN1 = DN.valueOf("o=baseDN1");
- DN baseDN2 = DN.valueOf("o=baseDN2");
- DN baseDN3 = DN.valueOf("o=baseDN3");
-
- CSN[] csns = generateCSNs(1, 0, 3);
-
- // Add records
- final JEChangeNumberIndexDB cnIndexDB = getCNIndexDB(replicationServer);
- long cn1 = addRecord(cnIndexDB, baseDN1, csns[0]);
- addRecord(cnIndexDB, baseDN2, csns[1]);
- long cn3 = addRecord(cnIndexDB, baseDN3, csns[2]);
-
- // The ChangeNumber should not get purged
- final long oldestCN = cnIndexDB.getOldestRecord().getChangeNumber();
- assertEquals(oldestCN, cn1);
- assertEquals(cnIndexDB.getNewestRecord().getChangeNumber(), cn3);
-
- DBCursor<ChangeNumberIndexRecord> cursor = cnIndexDB.getCursorFrom(oldestCN);
- try
- {
- assertTrue(cursor.next());
- assertEqualTo(cursor.getRecord(), csns[0], baseDN1);
- assertTrue(cursor.next());
- assertEqualTo(cursor.getRecord(), csns[1], baseDN2);
- assertTrue(cursor.next());
- assertEqualTo(cursor.getRecord(), csns[2], baseDN3);
- assertFalse(cursor.next());
- }
- finally
- {
- StaticUtils.close(cursor);
- }
-
- // Now test that purging removes all changes bar the last one
- changelogDB.setPurgeDelay(1);
- int count = 0;
- while (cnIndexDB.count() > 1 && count < 100)
- {
- Thread.sleep(10);
- count++;
- }
- assertOnlyNewestRecordIsLeft(cnIndexDB, 3);
- }
- finally
- {
- remove(replicationServer);
- }
- }
-
- private long addRecord(JEChangeNumberIndexDB cnIndexDB, DN baseDN, CSN csn) throws ChangelogException
- {
- return cnIndexDB.addRecord(new ChangeNumberIndexRecord(baseDN, csn));
- }
-
- private void assertEqualTo(ChangeNumberIndexRecord record, CSN csn, DN baseDN)
- {
- assertEquals(record.getCSN(), csn);
- assertEquals(record.getBaseDN(), baseDN);
- }
-
- private JEChangeNumberIndexDB getCNIndexDB(ReplicationServer rs) throws ChangelogException
- {
- final JEChangelogDB changelogDB = (JEChangelogDB) rs.getChangelogDB();
- final JEChangeNumberIndexDB cnIndexDB =
- (JEChangeNumberIndexDB) changelogDB.getChangeNumberIndexDB();
- assertTrue(cnIndexDB.isEmpty());
- return cnIndexDB;
- }
-
- /**
- * This test makes basic operations of a JEChangeNumberIndexDB and explicitly
- * calls the clear() method instead of waiting for the periodic trim to clear
- * it.
- * <ol>
- * <li>create the db</li>
- * <li>add records</li>
- * <li>read them with a cursor</li>
- * <li>clear the db</li>
- * </ol>
- */
- @Test
- public void testClear() throws Exception
- {
- ReplicationServer replicationServer = null;
- try
- {
- replicationServer = newReplicationServer();
- final ChangelogDB changelogDB = replicationServer.getChangelogDB();
- changelogDB.setPurgeDelay(0);
-
- // Prepare data to be stored in the db
-
- DN baseDN1 = DN.valueOf("o=baseDN1");
- DN baseDN2 = DN.valueOf("o=baseDN2");
- DN baseDN3 = DN.valueOf("o=baseDN3");
-
- CSN[] csns = generateCSNs(1, 0, 3);
-
- // Add records
- final JEChangeNumberIndexDB cnIndexDB = getCNIndexDB(replicationServer);
- long cn1 = addRecord(cnIndexDB, baseDN1, csns[0]);
- long cn2 = addRecord(cnIndexDB, baseDN2, csns[1]);
- long cn3 = addRecord(cnIndexDB, baseDN3, csns[2]);
-
- // Checks
- assertEquals(cnIndexDB.getOldestRecord().getChangeNumber(), cn1);
- assertEquals(cnIndexDB.getNewestRecord().getChangeNumber(), cn3);
-
- assertEquals(cnIndexDB.count(), 3, "Db count");
- assertFalse(cnIndexDB.isEmpty());
-
- DBCursor<ChangeNumberIndexRecord> cursor = cnIndexDB.getCursorFrom(cn1);
- assertCursorReadsInOrder(cursor, cn1, cn2, cn3);
-
- cursor = cnIndexDB.getCursorFrom(cn2);
- assertCursorReadsInOrder(cursor, cn2, cn3);
-
- cursor = cnIndexDB.getCursorFrom(cn3);
- assertCursorReadsInOrder(cursor, cn3);
-
- cnIndexDB.removeDomain(null);
- assertOnlyNewestRecordIsLeft(cnIndexDB, 3);
-
- // Check the db is cleared.
- cnIndexDB.clear();
- assertNull(cnIndexDB.getOldestRecord());
- assertNull(cnIndexDB.getNewestRecord());
- assertEquals(cnIndexDB.count(), 0);
- assertTrue(cnIndexDB.isEmpty());
- }
- finally
- {
- remove(replicationServer);
- }
- }
-
- /**
- * The newest record is no longer cleared to ensure persistence to the last
- * generated change number across server restarts.
- */
- private void assertOnlyNewestRecordIsLeft(JEChangeNumberIndexDB cnIndexDB,
- int newestChangeNumber) throws ChangelogException
- {
- assertEquals(cnIndexDB.count(), 1);
- assertFalse(cnIndexDB.isEmpty());
- final ChangeNumberIndexRecord oldest = cnIndexDB.getOldestRecord();
- final ChangeNumberIndexRecord newest = cnIndexDB.getNewestRecord();
- assertEquals(oldest.getChangeNumber(), newestChangeNumber);
- assertEquals(oldest.getChangeNumber(), newest.getChangeNumber());
- assertEquals(oldest.getBaseDN(), newest.getBaseDN());
- assertEquals(oldest.getCSN(), newest.getCSN());
- }
-
- private ReplicationServer newReplicationServer() throws Exception
- {
- TestCaseUtils.startServer();
- final int port = TestCaseUtils.findFreePort();
- final ReplServerFakeConfiguration cfg =
- new ReplServerFakeConfiguration(port, null, ReplicationDBImplementation.JE, 0, 2, 0, 100, null);
- cfg.setComputeChangeNumber(true);
- return new ReplicationServer(cfg);
- }
-
- private void assertCursorReadsInOrder(DBCursor<ChangeNumberIndexRecord> cursor,
- long... cns) throws ChangelogException
- {
- try
- {
- for (long cn : cns)
- {
- assertTrue(cursor.next());
- assertEquals(cursor.getRecord().getChangeNumber(), cn);
- }
- assertFalse(cursor.next());
- }
- finally
- {
- cursor.close();
- }
- }
-}
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java b/opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java
deleted file mode 100644
index b2ea0c4..0000000
--- a/opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java
+++ /dev/null
@@ -1,570 +0,0 @@
-/*
- * CDDL HEADER START
- *
- * The contents of this file are subject to the terms of the
- * Common Development and Distribution License, Version 1.0 only
- * (the "License"). You may not use this file except in compliance
- * with the License.
- *
- * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
- * or http://forgerock.org/license/CDDLv1.0.html.
- * See the License for the specific language governing permissions
- * and limitations under the License.
- *
- * When distributing Covered Code, include this CDDL HEADER in each
- * file and include the License file at legal-notices/CDDLv1_0.txt.
- * If applicable, add the following below this CDDL HEADER, with the
- * fields enclosed by brackets "[]" replaced with your own identifying
- * information:
- * Portions Copyright [yyyy] [name of copyright owner]
- *
- * CDDL HEADER END
- *
- *
- * Copyright 2006-2010 Sun Microsystems, Inc.
- * Portions Copyright 2011-2015 ForgeRock AS
- */
-package org.opends.server.replication.server.changelog.je;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.List;
-
-import org.assertj.core.api.SoftAssertions;
-import org.forgerock.i18n.slf4j.LocalizedLogger;
-import org.forgerock.opendj.config.server.ConfigException;
-import org.opends.server.TestCaseUtils;
-import org.opends.server.admin.std.meta.ReplicationServerCfgDefn.ReplicationDBImplementation;
-import org.opends.server.admin.std.server.ReplicationServerCfg;
-import org.opends.server.replication.ReplicationTestCase;
-import org.opends.server.replication.common.CSN;
-import org.opends.server.replication.common.CSNGenerator;
-import org.opends.server.replication.protocol.DeleteMsg;
-import org.opends.server.replication.protocol.UpdateMsg;
-import org.opends.server.replication.server.ReplServerFakeConfiguration;
-import org.opends.server.replication.server.ReplicationServer;
-import org.opends.server.replication.server.changelog.api.ChangelogException;
-import org.opends.server.replication.server.changelog.api.DBCursor;
-import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
-import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
-import org.opends.server.types.DN;
-import org.opends.server.util.CollectionUtils;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-
-import static org.assertj.core.api.Assertions.*;
-import static org.opends.server.TestCaseUtils.*;
-import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
-import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
-import static org.testng.Assert.*;
-
-/** Test the JEReplicaDB class. */
-@SuppressWarnings("javadoc")
-public class JEReplicaDBTest extends ReplicationTestCase
-{
- /** The tracer object for the debug logger. */
- private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
- private DN TEST_ROOT_DN;
-
- private static final ReplicationDBImplementation previousDBImpl = replicationDbImplementation;
- private ReplicationServer replicationServer;
- private JEReplicaDB replicaDB;
-
- @BeforeClass
- public void setDBImpl()
- {
- setReplicationDBImplementation(ReplicationDBImplementation.JE);
- }
-
- @AfterClass
- public void resetDBImplToPrevious()
- {
- setReplicationDBImplementation(previousDBImpl);
- }
-
- /**
- * Utility - log debug message - highlight it is from the test and not
- * from the server code. Makes easier to observe the test steps.
- */
- private void debugInfo(String tn, String s)
- {
- if (logger.isTraceEnabled())
- {
- logger.trace("** TEST " + tn + " ** " + s);
- }
- }
-
- @BeforeClass
- public void setup() throws Exception
- {
- TEST_ROOT_DN = DN.valueOf(TEST_ROOT_DN_STRING);
- }
-
- @DataProvider
- Object[][] cursorData()
- {
- // create 7 csns
- final CSN[] sevenCsns = generateCSNs(1, System.currentTimeMillis(), 7);
- CSN beforeCsn = sevenCsns[0];
- CSN middleCsn = sevenCsns[3]; // will be between csns[1] and csns[2]
- CSN afterCsn = sevenCsns[6];
-
- // but use only 4 of them for update msg
- // beforeCsn, middleCsn and afterCsn are not used
- // in order to test cursor generation from a key not present in the log (before, in the middle, after)
- final List<CSN> usedCsns = CollectionUtils.newArrayList(sevenCsns);
- usedCsns.remove(beforeCsn);
- usedCsns.remove(middleCsn);
- usedCsns.remove(afterCsn);
- final CSN[] csns = usedCsns.toArray(new CSN[4]);
-
- return new Object[][] {
- // equal matching
- { csns, beforeCsn, EQUAL_TO_KEY, ON_MATCHING_KEY, -1, -1 },
- { csns, csns[0], EQUAL_TO_KEY, ON_MATCHING_KEY, 0, 3 },
- { csns, csns[1], EQUAL_TO_KEY, ON_MATCHING_KEY, 1, 3 },
- { csns, middleCsn, EQUAL_TO_KEY, ON_MATCHING_KEY, -1, -1 },
- { csns, csns[2], EQUAL_TO_KEY, ON_MATCHING_KEY, 2, 3 },
- { csns, csns[3], EQUAL_TO_KEY, ON_MATCHING_KEY, 3, 3 },
- { csns, afterCsn, EQUAL_TO_KEY, ON_MATCHING_KEY, -1, -1 },
-
- { csns, beforeCsn, EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
- { csns, csns[0], EQUAL_TO_KEY, AFTER_MATCHING_KEY, 1, 3 },
- { csns, csns[1], EQUAL_TO_KEY, AFTER_MATCHING_KEY, 2, 3 },
- { csns, middleCsn, EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
- { csns, csns[2], EQUAL_TO_KEY, AFTER_MATCHING_KEY, 3, 3 },
- { csns, csns[3], EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
- { csns, afterCsn, EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
-
- // less than or equal matching
- { csns, beforeCsn, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, -1, -1 },
- { csns, csns[0], LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 0, 3 },
- { csns, csns[1], LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 1, 3 },
- { csns, middleCsn, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 1, 3 },
- { csns, csns[2], LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 2, 3 },
- { csns, csns[3], LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 3, 3 },
- { csns, afterCsn, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 3, 3 },
-
- { csns, beforeCsn, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
- { csns, csns[0], LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 1, 3 },
- { csns, csns[1], LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 2, 3 },
- { csns, middleCsn, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 2, 3 },
- { csns, csns[2], LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 3, 3 },
- { csns, csns[3], LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
- { csns, afterCsn, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
-
- // greater than or equal matching
- { csns, beforeCsn, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 0, 3 },
- { csns, csns[0], GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 0, 3 },
- { csns, csns[1], GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 1, 3 },
- { csns, middleCsn, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 2, 3 },
- { csns, csns[2], GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 2, 3 },
- { csns, csns[3], GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 3, 3 },
- { csns, afterCsn, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, -1, -1 },
-
- { csns, beforeCsn, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 0, 3 },
- { csns, csns[0], GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 1, 3 },
- { csns, csns[1], GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 2, 3 },
- { csns, middleCsn, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 2, 3 },
- { csns, csns[2], GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 3, 3 },
- { csns, csns[3], GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
- { csns, afterCsn, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
- { null, null, null, null, -1, -1 } // stop line
- };
- }
-
- /**
- * Test the cursor with all acceptable strategies combination.
- * Creation of a replication server is costly so it is created only once on first test and cleaned after the
- * last test using the stop line in data to do so.
- */
- @Test(dataProvider="cursorData")
- public void testGenerateCursor(CSN[] csns, CSN startCsn, KeyMatchingStrategy matchingStrategy,
- PositionStrategy positionStrategy, int startIndex, int endIndex) throws Exception
- {
- try
- {
- if (replicationServer == null)
- {
- // initialize only once
- TestCaseUtils.startServer();
- replicationServer = configureReplicationServer(100000, 10);
- replicaDB = newReplicaDB(replicationServer);
- for (CSN csn : csns)
- {
- replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csn, "uid"));
- }
- }
- if (csns == null)
- {
- return; // stop line, time to clean replication artifacts
- }
-
- try (DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCsn, matchingStrategy, positionStrategy))
- {
- if (startIndex != -1)
- {
- assertThatCursorCanBeFullyReadFromStart(cursor, csns, startIndex, endIndex);
- }
- else
- {
- assertThatCursorIsExhausted(cursor);
- }
- }
- }
- finally
- {
- if (csns == null)
- {
- // stop line, stop and remove replication
- shutdown(replicaDB);
- remove(replicationServer);
- }
- }
- }
-
- @Test
- public void testTrim() throws Exception
- {
- ReplicationServer replicationServer = null;
- JEReplicaDB replicaDB = null;
- try
- {
- TestCaseUtils.startServer();
- replicationServer = configureReplicationServer(100, 5000);
- replicaDB = newReplicaDB(replicationServer);
-
- CSN[] csns = generateCSNs(1, 0, 5);
-
- replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[0], "uid"));
- replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[1], "uid"));
- replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[2], "uid"));
- DeleteMsg update4 = new DeleteMsg(TEST_ROOT_DN, csns[3], "uid");
-
- //--
- // Iterator tests with changes persisted
- assertFoundInOrder(replicaDB, csns[0], csns[1], csns[2]);
- assertNotFound(replicaDB, csns[4], AFTER_MATCHING_KEY);
-
- assertEquals(replicaDB.getOldestCSN(), csns[0]);
- assertEquals(replicaDB.getNewestCSN(), csns[2]);
-
- //--
- // Cursor tests with changes persisted
- replicaDB.add(update4);
-
- assertFoundInOrder(replicaDB, csns[0], csns[1], csns[2], csns[3]);
- // Test cursor from existing CSN
- assertFoundInOrder(replicaDB, csns[2], csns[3]);
- assertFoundInOrder(replicaDB, csns[3]);
- assertNotFound(replicaDB, csns[4], AFTER_MATCHING_KEY);
-
- replicaDB.purgeUpTo(new CSN(Long.MAX_VALUE, 0, 0));
-
- int count = 0;
- boolean purgeSucceeded = false;
- final CSN expectedNewestCSN = csns[3];
- do
- {
- Thread.sleep(10);
-
- final CSN oldestCSN = replicaDB.getOldestCSN();
- final CSN newestCSN = replicaDB.getNewestCSN();
- purgeSucceeded =
- oldestCSN.equals(expectedNewestCSN)
- && newestCSN.equals(expectedNewestCSN);
- count++;
- }
- while (!purgeSucceeded && count < 100);
- assertTrue(purgeSucceeded);
- }
- finally
- {
- shutdown(replicaDB);
- remove(replicationServer);
- }
- }
-
- static CSN[] newCSNs(int serverId, long timestamp, int number)
- {
- CSNGenerator gen = new CSNGenerator(serverId, timestamp);
- CSN[] csns = new CSN[number];
- for (int i = 0; i < csns.length; i++)
- {
- csns[i] = gen.newCSN();
- }
- return csns;
- }
-
- private ReplicationServer configureReplicationServer(int windowSize, int queueSize)
- throws IOException, ConfigException
- {
- final int changelogPort = findFreePort();
- final ReplicationServerCfg conf =
- new ReplServerFakeConfiguration(changelogPort, null, ReplicationDBImplementation.JE, 0, 2, queueSize, windowSize, null);
- return new ReplicationServer(conf);
- }
-
- private JEReplicaDB newReplicaDB(ReplicationServer rs) throws Exception
- {
- final JEChangelogDB changelogDB = (JEChangelogDB) rs.getChangelogDB();
- return changelogDB.getOrCreateReplicaDB(TEST_ROOT_DN, 1, rs).getFirst();
- }
-
- private File createCleanDir() throws IOException
- {
- String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT);
- String path = System.getProperty(TestCaseUtils.PROPERTY_BUILD_DIR, buildRoot
- + File.separator + "build");
- path = path + File.separator + "unit-tests" + File.separator + "JEReplicaDB";
- final File testRoot = new File(path);
- TestCaseUtils.deleteDirectory(testRoot);
- testRoot.mkdirs();
- return testRoot;
- }
-
- private void assertFoundInOrder(JEReplicaDB replicaDB, CSN... csns) throws Exception
- {
- if (csns.length == 0)
- {
- return;
- }
-
- assertFoundInOrder(replicaDB, AFTER_MATCHING_KEY, csns);
- assertFoundInOrder(replicaDB, ON_MATCHING_KEY, csns);
- }
-
- /**
- * Test the feature of clearing a JEReplicaDB used by a replication server.
- * The clear feature is used when a replication server receives a request to
- * reset the generationId of a given domain.
- */
- @Test
- public void testClear() throws Exception
- {
- ReplicationServer replicationServer = null;
- JEReplicaDB replicaDB = null;
- try
- {
- TestCaseUtils.startServer();
- replicationServer = configureReplicationServer(100, 5000);
- replicaDB = newReplicaDB(replicationServer);
-
- CSN[] csns = generateCSNs(1, 0, 3);
-
- // Add the changes and check they are here
- replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[0], "uid"));
- replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[1], "uid"));
- replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[2], "uid"));
-
- assertEquals(csns[0], replicaDB.getOldestCSN());
- assertEquals(csns[2], replicaDB.getNewestCSN());
-
- // Clear DB and check it is cleared.
- replicaDB.clear();
-
- assertNull(replicaDB.getOldestCSN());
- assertNull(replicaDB.getNewestCSN());
- }
- finally
- {
- shutdown(replicaDB);
- remove(replicationServer);
- }
- }
-
- private void advanceCursorUpTo(DBCursor<UpdateMsg> cursor, CSN[] csns, int startIndex, int endIndex)
- throws Exception
- {
- for (int i = startIndex; i <= endIndex; i++)
- {
- assertThat(cursor.next()).as("next() value when i=" + i).isTrue();
- assertThat(cursor.getRecord().getCSN()).isEqualTo(csns[i]);
- }
- }
-
- private void assertThatCursorIsExhausted(DBCursor<UpdateMsg> cursor) throws Exception
- {
- final SoftAssertions softly = new SoftAssertions();
- softly.assertThat(cursor.next()).isFalse();
- softly.assertThat(cursor.getRecord()).isNull();
- softly.assertAll();
- }
-
- private void assertThatCursorCanBeFullyRead(DBCursor<UpdateMsg> cursor, CSN[] csns, int startIndex, int endIndex)
- throws Exception
- {
- advanceCursorUpTo(cursor, csns, startIndex, endIndex);
- assertThatCursorIsExhausted(cursor);
- }
-
- private void assertThatCursorCanBeFullyReadFromStart(DBCursor<UpdateMsg> cursor, CSN[] csns, int startIndex,
- int endIndex) throws Exception
- {
- assertThat(cursor.getRecord()).isNull();
- assertThatCursorCanBeFullyRead(cursor, csns, startIndex, endIndex);
- }
-
- private void assertNotFound(JEReplicaDB replicaDB, final CSN startCSN,
- final PositionStrategy positionStrategy) throws ChangelogException
- {
- try (DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(
- startCSN, GREATER_THAN_OR_EQUAL_TO_KEY, positionStrategy))
- {
- final SoftAssertions softly = new SoftAssertions();
- softly.assertThat(cursor.next()).isFalse();
- softly.assertThat(cursor.getRecord()).isNull();
- softly.assertAll();
- }
- }
-
- /**
- * Test the logic that manages counter records in the JEReplicaDB in order to
- * optimize the oldest and newest records in the replication changelog db.
- */
- @Test(groups = { "opendj-256" })
- public void testGetOldestNewestCSNs() throws Exception
- {
- // It's worth testing with 2 different setting for counterRecord
- // - a counter record is put every 10 Update msg in the db - just a unit
- // setting.
- // - a counter record is put every 1000 Update msg in the db - something
- // closer to real setting.
- // In both cases, we want to test the counting algorithm,
- // - when start and stop are before the first counter record,
- // - when start and stop are before and after the first counter record,
- // - when start and stop are after the first counter record,
- // - when start and stop are before and after more than one counter record,
- // After a purge.
- // After shutting down/closing and reopening the db.
- testGetOldestNewestCSNs(40, 10);
- testGetOldestNewestCSNs(4000, 1000);
- }
-
- private void testGetOldestNewestCSNs(final int max, final int counterWindow) throws Exception
- {
- String tn = "testDBCount("+max+","+counterWindow+")";
- debugInfo(tn, "Starting test");
-
- File testRoot = null;
- ReplicationServer replicationServer = null;
- ReplicationDbEnv dbEnv = null;
- JEReplicaDB replicaDB = null;
- try
- {
- TestCaseUtils.startServer();
- replicationServer = configureReplicationServer(100000, 10);
-
- testRoot = createCleanDir();
- dbEnv = new ReplicationDbEnv(testRoot.getPath(), replicationServer);
- replicaDB = new JEReplicaDB(1, TEST_ROOT_DN, replicationServer, dbEnv);
- replicaDB.setCounterRecordWindowSize(counterWindow);
-
- // Populate the db with 'max' msg
- int mySeqnum = 1;
- CSN csns[] = new CSN[2 * (max + 1)];
- long now = System.currentTimeMillis();
- for (int i=1; i<=max; i++)
- {
- csns[i] = new CSN(now + i, mySeqnum, 1);
- replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid"));
- mySeqnum+=2;
- }
-
- assertEquals(replicaDB.getOldestCSN(), csns[1], "Wrong oldest CSN");
- assertEquals(replicaDB.getNewestCSN(), csns[max], "Wrong newest CSN");
-
- // Now we want to test that after closing and reopening the db, the
- // counting algo is well reinitialized and when new messages are added
- // the new counter are correctly generated.
- debugInfo(tn, "SHUTDOWN replicaDB and recreate");
- replicaDB.shutdown();
-
- replicaDB = new JEReplicaDB(1, TEST_ROOT_DN, replicationServer, dbEnv);
- replicaDB.setCounterRecordWindowSize(counterWindow);
-
- assertEquals(replicaDB.getOldestCSN(), csns[1], "Wrong oldest CSN");
- assertEquals(replicaDB.getNewestCSN(), csns[max], "Wrong newest CSN");
-
- // Populate the db with 'max' msg
- for (int i=max+1; i<=2 * max; i++)
- {
- csns[i] = new CSN(now + i, mySeqnum, 1);
- replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid"));
- mySeqnum+=2;
- }
-
- assertEquals(replicaDB.getOldestCSN(), csns[1], "Wrong oldest CSN");
- assertEquals(replicaDB.getNewestCSN(), csns[2 * max], "Wrong newest CSN");
-
- replicaDB.purgeUpTo(new CSN(Long.MAX_VALUE, 0, 0));
-
- String testcase = "AFTER PURGE (oldest, newest)=";
- debugInfo(tn, testcase + replicaDB.getOldestCSN() + replicaDB.getNewestCSN());
- assertEquals(replicaDB.getNewestCSN(), csns[2 * max], "Newest=");
-
- // Clear ...
- debugInfo(tn,"clear:");
- replicaDB.clear();
-
- // Check the db is cleared.
- assertNull(replicaDB.getOldestCSN());
- assertNull(replicaDB.getNewestCSN());
- debugInfo(tn,"Success");
- }
- finally
- {
- shutdown(replicaDB);
- if (dbEnv != null)
- {
- dbEnv.shutdown();
- }
- remove(replicationServer);
- TestCaseUtils.deleteDirectory(testRoot);
- }
- }
-
- private void shutdown(JEReplicaDB replicaDB)
- {
- if (replicaDB != null)
- {
- replicaDB.shutdown();
- }
- }
-
- static CSN[] generateCSNs(int serverId, long timestamp, int number)
- {
- CSNGenerator gen = new CSNGenerator(serverId, timestamp);
- CSN[] csns = new CSN[number];
- for (int i = 0; i < csns.length; i++)
- {
- csns[i] = gen.newCSN();
- }
- return csns;
- }
-
- private void assertFoundInOrder(JEReplicaDB replicaDB,
- final PositionStrategy positionStrategy, CSN... csns) throws ChangelogException
- {
- try (DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(
- csns[0], GREATER_THAN_OR_EQUAL_TO_KEY, positionStrategy))
- {
- assertNull(cursor.getRecord(), "Cursor should point to a null record initially");
-
- for (int i = positionStrategy == ON_MATCHING_KEY ? 0 : 1; i < csns.length; i++)
- {
- final String msg = "i=" + i + ", csns[i]=" + csns[i].toStringUI();
- final SoftAssertions softly = new SoftAssertions();
- softly.assertThat(cursor.next()).as(msg).isTrue();
- softly.assertThat(cursor.getRecord().getCSN()).as(msg).isEqualTo(csns[i]);
- softly.assertAll();
- }
- final SoftAssertions softly = new SoftAssertions();
- softly.assertThat(cursor.next()).isFalse();
- softly.assertThat(cursor.getRecord()).isNull();
- softly.assertAll();
- }
- }
-}
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/je/ReplicationDbEnvTest.java b/opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/je/ReplicationDbEnvTest.java
deleted file mode 100644
index b3c4b7f..0000000
--- a/opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/je/ReplicationDbEnvTest.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * CDDL HEADER START
- *
- * The contents of this file are subject to the terms of the
- * Common Development and Distribution License, Version 1.0 only
- * (the "License"). You may not use this file except in compliance
- * with the License.
- *
- * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
- * or http://forgerock.org/license/CDDLv1.0.html.
- * See the License for the specific language governing permissions
- * and limitations under the License.
- *
- * When distributing Covered Code, include this CDDL HEADER in each
- * file and include the License file at legal-notices/CDDLv1_0.txt.
- * If applicable, add the following below this CDDL HEADER, with the
- * fields enclosed by brackets "[]" replaced with your own identifying
- * information:
- * Portions Copyright [yyyy] [name of copyright owner]
- *
- * CDDL HEADER END
- *
- * Copyright 2014-2015 ForgeRock AS
- */
-package org.opends.server.replication.server.changelog.je;
-
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.HashSet;
-
-import org.opends.server.DirectoryServerTestCase;
-import org.opends.server.TestCaseUtils;
-import org.opends.server.replication.common.CSN;
-import org.opends.server.replication.server.ChangelogState;
-import org.opends.server.replication.server.changelog.api.ChangelogException;
-import org.opends.server.types.DN;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-
-import com.sleepycat.je.Database;
-import com.sleepycat.je.Environment;
-
-import static java.util.Arrays.*;
-import static java.util.Collections.*;
-
-import static org.assertj.core.api.Assertions.*;
-import static org.opends.server.replication.server.changelog.je.ReplicationDbEnv.*;
-
-@SuppressWarnings("javadoc")
-public class ReplicationDbEnvTest extends DirectoryServerTestCase
-{
-
- /**
- * Bypass heavyweight setup.
- */
- private final class TestableReplicationDbEnv extends ReplicationDbEnv
- {
- private TestableReplicationDbEnv() throws ChangelogException
- {
- super(null, null);
- }
-
- @Override
- protected Environment openJEEnvironment(String path)
- {
- return null;
- }
-
- @Override
- protected Database openDatabase(String databaseName) throws ChangelogException, RuntimeException
- {
- return null;
- }
-
- @Override
- protected ChangelogState readOnDiskChangelogState() throws ChangelogException
- {
- return new ChangelogState();
- }
- }
-
- @BeforeClass
- public void setup() throws Exception
- {
- TestCaseUtils.startFakeServer();
- }
-
- @AfterClass
- public void teardown()
- {
- TestCaseUtils.shutdownFakeServer();
- }
-
- @DataProvider
- public Object[][] changelogStateDataProvider() throws Exception
- {
- final int genId = 524157415;
- final int id1 = 42;
- final int id2 = 346;
- final int t1 = 1956245524;
- return new Object[][] {
- { DN.valueOf("dc=example,dc=com"), genId, EMPTY_LIST, EMPTY_LIST },
- { DN.valueOf("dc=example,dc=com"), genId, asList(id1, id2),
- asList(new CSN(id2, 0, t1)) },
- // test with a space in the baseDN (space is the field separator in the DB)
- { DN.valueOf("cn=admin data"), genId, asList(id1, id2), EMPTY_LIST }, };
- }
-
- @Test(dataProvider = "changelogStateDataProvider")
- public void encodeDecodeChangelogState(DN baseDN, long generationId,
- List<Integer> replicas, List<CSN> offlineReplicas) throws Exception
- {
- final ReplicationDbEnv changelogStateDB = new TestableReplicationDbEnv();
-
- // encode data
- final Map<byte[], byte[]> wholeState = new LinkedHashMap<>();
- put(wholeState, toGenIdEntry(baseDN, generationId));
- for (Integer serverId : replicas)
- {
- put(wholeState, toByteArray(toReplicaEntry(baseDN, serverId)));
- }
- for (CSN offlineCSN : offlineReplicas)
- {
- put(wholeState, toReplicaOfflineEntry(baseDN, offlineCSN));
- }
-
- // decode data
- final ChangelogState state =
- changelogStateDB.decodeChangelogState(wholeState);
- assertThat(state.getDomainToGenerationId()).containsExactly(
- entry(baseDN, generationId));
- if (!replicas.isEmpty())
- {
- assertThat(state.getDomainToServerIds())
- .containsExactly(entry(baseDN, new HashSet<Integer>(replicas)));
- }
- else
- {
- assertThat(state.getDomainToServerIds()).isEmpty();
- }
- if (!offlineReplicas.isEmpty())
- {
- assertThat(state.getOfflineReplicas().getSnapshot())
- .containsExactly(entry(baseDN, offlineReplicas));
- }
- else
- {
- assertThat(state.getOfflineReplicas()).isEmpty();
- }
- }
-
- private void put(Map<byte[], byte[]> map, Entry<byte[], byte[]> entry)
- {
- map.put(entry.getKey(), entry.getValue());
- }
-
-}
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/replication/service/ReplicationDomainTest.java b/opendj-server-legacy/src/test/java/org/opends/server/replication/service/ReplicationDomainTest.java
index 138e255..a5bab98 100644
--- a/opendj-server-legacy/src/test/java/org/opends/server/replication/service/ReplicationDomainTest.java
+++ b/opendj-server-legacy/src/test/java/org/opends/server/replication/service/ReplicationDomainTest.java
@@ -307,10 +307,8 @@
int replicationPort, String dirName, int windowSize,
SortedSet<String> replServers) throws Exception
{
- ReplServerFakeConfiguration cfg =
- new ReplServerFakeConfiguration(replicationPort, dirName, replicationDbImplementation, 0,
- serverId, 0, windowSize, replServers);
- return new ReplicationServer(cfg);
+ return new ReplicationServer(
+ new ReplServerFakeConfiguration(replicationPort, dirName, 0, serverId, 0, windowSize, replServers));
}
private void disable(ReplicationDomain... domains)
--
Gitblit v1.10.0