From c64a67b3d0b51743d9f2a2bf110cb365b8b104af Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 26 Aug 2013 08:41:18 +0000
Subject: [PATCH] OPENDJ-1116 Introduce abstraction for the changelog DB
---
opends/src/server/org/opends/server/replication/server/ReplicationServer.java | 168 ++++++++++++++++++++++++++-----------------------------
1 files changed, 80 insertions(+), 88 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index 01afb43..7b16799 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -52,6 +52,7 @@
import org.opends.server.replication.common.*;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.protocol.*;
+import org.opends.server.replication.server.changelog.api.ChangelogDB;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.je.DbHandler;
import org.opends.server.replication.server.changelog.je.DraftCNDbHandler;
@@ -65,6 +66,7 @@
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.types.ResultCode.*;
import static org.opends.server.util.ServerConstants.*;
import static org.opends.server.util.StaticUtils.*;
@@ -143,23 +145,22 @@
private long monitoringPublisherPeriod = 3000;
/**
- * The handler of the draft change numbers database, the database used to
- * store the relation between a draft change number ('seqnum') and the
- * associated cookie.
+ * The handler of the changelog database, the database stores the relation
+ * between a draft change number ('seqnum') and the associated cookie.
* <p>
- * Guarded by draftCNLock
+ * Guarded by changelogDBLock
*/
- private DraftCNDbHandler draftCNDbHandler;
+ private ChangelogDB changelogDB;
/**
* The last value generated of the draft change number.
* <p>
- * Guarded by draftCNLock
+ * Guarded by changelogDBLock
**/
private int lastGeneratedDraftCN = 0;
- /** Used for protecting draft CN related state. */
- private final Object draftCNLock = new Object();
+ /** Used for protecting changelogDB related state. */
+ private final Object changelogDBLock = new Object();
/**
* The tracer object for the debug logger.
@@ -183,7 +184,7 @@
private long domainTicket = 0L;
/** BaseDNs excluded for ECL. */
- private Collection<String> excludedBaseDNs = new ArrayList<String>();
+ private Set<String> excludedBaseDNs = new HashSet<String>();
/**
* The weight affected to the replication server.
@@ -470,7 +471,7 @@
private Set<String> getConnectedRSUrls(ReplicationServerDomain domain)
{
- Set<String> results = new LinkedHashSet<String>();
+ Set<String> results = new HashSet<String>();
for (ReplicationServerHandler rsHandler : domain.getConnectedRSs().values())
{
results.add(normalizeServerURL(rsHandler.getServerAddressURL()));
@@ -714,11 +715,11 @@
eclwe.finalizeWorkflowElement();
}
- synchronized (draftCNLock)
+ synchronized (changelogDBLock)
{
- if (draftCNDbHandler != null)
+ if (changelogDB != null)
{
- draftCNDbHandler.shutdown();
+ changelogDB.shutdown();
}
}
}
@@ -900,42 +901,39 @@
{
dbEnv.clearGenerationId(baseDn);
}
- catch (Exception e)
+ catch (Exception ignored)
{
- // Ignore.
if (debugEnabled())
{
- TRACER.debugCaught(DebugLogLevel.WARNING, e);
+ TRACER.debugCaught(DebugLogLevel.WARNING, ignored);
}
}
- synchronized (draftCNLock)
+ synchronized (changelogDBLock)
{
- if (draftCNDbHandler != null)
+ if (changelogDB != null)
{
try
{
- draftCNDbHandler.clear(baseDn);
+ changelogDB.clear(baseDn);
}
- catch (Exception e)
+ catch (Exception ignored)
{
- // Ignore.
if (debugEnabled())
{
- TRACER.debugCaught(DebugLogLevel.WARNING, e);
+ TRACER.debugCaught(DebugLogLevel.WARNING, ignored);
}
}
try
{
- lastGeneratedDraftCN = draftCNDbHandler.getLastKey();
+ lastGeneratedDraftCN = changelogDB.getLastKey();
}
- catch (Exception e)
+ catch (Exception ignored)
{
- // Ignore.
if (debugEnabled())
{
- TRACER.debugCaught(DebugLogLevel.WARNING, e);
+ TRACER.debugCaught(DebugLogLevel.WARNING, ignored);
}
}
}
@@ -1352,12 +1350,10 @@
public void processExportBegin(Backend backend, LDIFExportConfig config)
{
if (debugEnabled())
- TRACER.debugInfo("RS " +getMonitorInstanceName()+
- " Export starts");
+ TRACER.debugInfo("RS " + getMonitorInstanceName() + " Export starts");
if (backend.getBackendID().equals(backendId))
{
// Retrieves the backend related to this replicationServerDomain
- // backend =
ReplicationBackend b =
(ReplicationBackend)DirectoryServer.getBackend(backendId);
b.setServer(this);
@@ -1394,38 +1390,36 @@
rsd.clearDbs();
}
- synchronized (draftCNLock)
+ synchronized (changelogDBLock)
{
- if (draftCNDbHandler != null)
+ if (changelogDB != null)
{
try
{
- draftCNDbHandler.clear();
+ changelogDB.clear();
}
- catch (Exception e)
+ catch (Exception ignored)
{
- // Ignore.
if (debugEnabled())
{
- TRACER.debugCaught(DebugLogLevel.WARNING, e);
+ TRACER.debugCaught(DebugLogLevel.WARNING, ignored);
}
}
try
{
- draftCNDbHandler.shutdown();
+ changelogDB.shutdown();
}
- catch (Exception e)
+ catch (Exception ignored)
{
- // Ignore.
if (debugEnabled())
{
- TRACER.debugCaught(DebugLogLevel.WARNING, e);
+ TRACER.debugCaught(DebugLogLevel.WARNING, ignored);
}
}
lastGeneratedDraftCN = 0;
- draftCNDbHandler = null;
+ changelogDB = null;
}
}
}
@@ -1614,67 +1608,70 @@
ChangeNumber eligibleCN = null;
for (ReplicationServerDomain domain : getReplicationServerDomains())
{
- if ((excludedBaseDNs != null) &&
- excludedBaseDNs.contains(domain.getBaseDn()))
+ if (contains(excludedBaseDNs, domain.getBaseDn()))
continue;
- ChangeNumber domainEligibleCN = domain.getEligibleCN();
- String dates = "";
- if (domainEligibleCN != null)
+ final ChangeNumber domainEligibleCN = domain.getEligibleCN();
+ if (eligibleCN == null
+ || (domainEligibleCN != null && domainEligibleCN.older(eligibleCN)))
{
- if ((eligibleCN == null) || (domainEligibleCN.older(eligibleCN)))
- {
- eligibleCN = domainEligibleCN;
- }
- dates = new Date(domainEligibleCN.getTime()).toString();
+ eligibleCN = domainEligibleCN;
}
- debugLog += "[dn=" + domain.getBaseDn()
- + "] [eligibleCN=" + domainEligibleCN + ", " + dates + "]";
+
+ if (debugEnabled())
+ {
+ final String dates = domainEligibleCN == null ?
+ "" : new Date(domainEligibleCN.getTime()).toString();
+ debugLog += "[baseDN=" + domain.getBaseDn()
+ + "] [eligibleCN=" + domainEligibleCN + ", " + dates + "]";
+ }
}
- if (eligibleCN==null)
+ if (eligibleCN==null )
{
eligibleCN = new ChangeNumber(TimeThread.getTime(), 0, 0);
}
- if (debugEnabled())
+ if (debugEnabled()) {
TRACER.debugInfo("In " + this + " getEligibleCN() ends with " +
" the following domainEligibleCN for each domain :" + debugLog +
" thus CrossDomainEligibleCN=" + eligibleCN +
" ts=" + new Date(eligibleCN.getTime()).toString());
-
+ }
return eligibleCN;
}
-
+ private boolean contains(Set<String> col, String elem)
+ {
+ return col != null && col.contains(elem);
+ }
/**
- * Get or create a handler on a Db on DraftCN for external changelog.
+ * Get (or create) a handler on the ChangelogDB for external changelog.
*
* @return the handler.
* @throws DirectoryException
* when needed.
*/
- public DraftCNDbHandler getDraftCNDbHandler() throws DirectoryException
+ public ChangelogDB getChangelogDB() throws DirectoryException
{
- synchronized (draftCNLock)
+ synchronized (changelogDBLock)
{
try
{
- if (draftCNDbHandler == null)
+ if (changelogDB == null)
{
- draftCNDbHandler = new DraftCNDbHandler(this, this.dbEnv);
+ changelogDB = new DraftCNDbHandler(this, this.dbEnv);
lastGeneratedDraftCN = getLastDraftChangeNumber();
}
- return draftCNDbHandler;
+ return changelogDB;
}
catch (Exception e)
{
TRACER.debugCaught(DebugLogLevel.ERROR, e);
MessageBuilder mb = new MessageBuilder();
mb.append(ERR_DRAFT_CHANGENUMBER_DATABASE.get(""));
- throw new DirectoryException(ResultCode.OPERATIONS_ERROR,
- mb.toMessage(), e);
+ throw new DirectoryException(OPERATIONS_ERROR, mb.toMessage(), e);
}
}
}
@@ -1685,11 +1682,11 @@
*/
public int getFirstDraftChangeNumber()
{
- synchronized (draftCNLock)
+ synchronized (changelogDBLock)
{
- if (draftCNDbHandler != null)
+ if (changelogDB != null)
{
- return draftCNDbHandler.getFirstKey();
+ return changelogDB.getFirstKey();
}
return 0;
}
@@ -1701,11 +1698,11 @@
*/
public int getLastDraftChangeNumber()
{
- synchronized (draftCNLock)
+ synchronized (changelogDBLock)
{
- if (draftCNDbHandler != null)
+ if (changelogDB != null)
{
- return draftCNDbHandler.getLastKey();
+ return changelogDB.getLastKey();
}
return 0;
}
@@ -1717,7 +1714,7 @@
*/
public int getNewDraftCN()
{
- synchronized (draftCNLock)
+ synchronized (changelogDBLock)
{
return ++lastGeneratedDraftCN;
}
@@ -1756,12 +1753,11 @@
*/
int lastDraftCN;
- Boolean dbEmpty = false;
- Long newestDate = 0L;
- DraftCNDbHandler draftCNDbH = getDraftCNDbHandler();
+ boolean dbEmpty = false;
+ long newestDate = 0L;
+ ChangelogDB changelogDB = getChangelogDB();
- // Get the first DraftCN from the DraftCNdb
- int firstDraftCN = draftCNDbH.getFirstKey();
+ int firstDraftCN = changelogDB.getFirstKey();
Map<String,ServerState> domainsServerStateForLastSeqnum = null;
ChangeNumber changeNumberForLastSeqnum = null;
String domainForLastSeqnum = null;
@@ -1773,12 +1769,11 @@
}
else
{
- // Get the last DraftCN from the DraftCNdb
- lastDraftCN = draftCNDbH.getLastKey();
+ lastDraftCN = changelogDB.getLastKey();
// Get the generalized state associated with the current last DraftCN
// and initializes from it the startStates table
- String lastSeqnumGenState = draftCNDbH.getValue(lastDraftCN);
+ String lastSeqnumGenState = changelogDB.getPreviousCookie(lastDraftCN);
if ((lastSeqnumGenState != null) && (lastSeqnumGenState.length()>0))
{
domainsServerStateForLastSeqnum = MultiDomainServerState.
@@ -1786,16 +1781,16 @@
}
// Get the changeNumber associated with the current last DraftCN
- changeNumberForLastSeqnum = draftCNDbH.getChangeNumber(lastDraftCN);
+ changeNumberForLastSeqnum = changelogDB.getChangeNumber(lastDraftCN);
// Get the domain associated with the current last DraftCN
- domainForLastSeqnum = draftCNDbH.getBaseDN(lastDraftCN);
+ domainForLastSeqnum = changelogDB.getBaseDN(lastDraftCN);
}
// Domain by domain
for (ReplicationServerDomain rsd : getReplicationServerDomains())
{
- if (excludedBaseDNs.contains(rsd.getBaseDn()))
+ if (contains(excludedBaseDNs, rsd.getBaseDn()))
continue;
// for this domain, have the state in the replchangelog
@@ -1860,15 +1855,12 @@
{
disableEligibility(excludedBaseDNs);
+ // Initialize start state for all running domains with empty state
MultiDomainServerState result = new MultiDomainServerState();
- // Initialize start state for all running domains with empty state
for (ReplicationServerDomain rsd : getReplicationServerDomains())
{
- if ((excludedBaseDNs != null)
- && (excludedBaseDNs.contains(rsd.getBaseDn())))
- continue;
-
- if (rsd.getDbServerState().isEmpty())
+ if (contains(excludedBaseDNs, rsd.getBaseDn())
+ || rsd.getDbServerState().isEmpty())
continue;
result.update(rsd.getBaseDn(), rsd.getEligibleState(getEligibleCN()));
--
Gitblit v1.10.0