From c56828ecb3b656bb531d313b722bd572eeb10905 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 08 Oct 2013 05:45:15 +0000
Subject: [PATCH] OPENDJ-1116 Introduce abstraction for the changelog DB
---
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java | 380 +++++++++++++++++++++++++-----------------------------
1 files changed, 177 insertions(+), 203 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java b/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
index 1f44467..19f3dc7 100644
--- a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -31,7 +31,6 @@
import java.util.*;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
-import java.util.zip.DataFormatException;
import org.opends.messages.Category;
import org.opends.messages.Message;
@@ -317,7 +316,7 @@
}
}
- private String clDomCtxtsToString(String msg)
+ private String domaimCtxtsToString(String msg)
{
StringBuilder buffer = new StringBuilder();
buffer.append(msg).append("\n");
@@ -483,17 +482,12 @@
/**
* Wait receiving the StartSessionMsg from the remote DS and process it.
+ *
* @return the startSessionMsg received
- * @throws DirectoryException
- * @throws IOException
- * @throws ClassNotFoundException
- * @throws DataFormatException
- * @throws NotSupportedOldVersionPDUException
+ * @throws Exception
*/
private StartECLSessionMsg waitAndProcessStartSessionECLFromRemoteServer()
- throws DirectoryException, IOException, ClassNotFoundException,
- DataFormatException,
- NotSupportedOldVersionPDUException
+ throws Exception
{
ReplicationMsg msg = session.receive();
@@ -505,9 +499,8 @@
}
else if (!(msg instanceof StartECLSessionMsg))
{
- Message message = Message
- .raw("Protocol error: StartECLSessionMsg required." + msg
- + " received.");
+ Message message = Message.raw(
+ "Protocol error: StartECLSessionMsg required." + msg + " received.");
abortStart(message);
return null;
}
@@ -576,7 +569,7 @@
* @param startChangeNumber
* the start change number coming from the request filter.
* @return the cookie corresponding to the passed in startChangeNumber.
- * @throws Exception
+ * @throws ChangelogException
* if a database problem occurred
* @throws DirectoryException
* if a database problem occurred
@@ -677,170 +670,9 @@
private void initializeChangelogDomainCtxts(String providedCookie,
boolean allowUnknownDomains) throws DirectoryException
{
- /*
- This map is initialized from the providedCookie.
- Below, it will be traversed and each domain configured with ECL will be
- checked and removed from the map.
- At the end, normally the map should be empty.
- Depending on allowUnknownDomains provided flag, a non empty map will
- be considered as an error when allowUnknownDomains is false.
- */
- Map<DN, ServerState> startStatesFromProvidedCookie =
- new HashMap<DN, ServerState>();
-
- ReplicationServer rs = this.replicationServer;
-
- // Parse the provided cookie and overwrite startState from it.
- if ((providedCookie != null) && (providedCookie.length()!=0))
- startStatesFromProvidedCookie =
- MultiDomainServerState.splitGenStateToServerStates(providedCookie);
-
try
{
- // Creates the table that will contain the real-time info for each
- // and every domain.
- final Set<DomainContext> tmpSet = new HashSet<DomainContext>();
- final StringBuilder missingDomains = new StringBuilder();
- for (ReplicationServerDomain domain : toIterable(rs.getDomainIterator()))
- {
- // skip the 'unreal' changelog domain
- if (domain == this.replicationServerDomain)
- continue;
-
- // skip the excluded domains
- if (excludedBaseDNs.contains(domain.getBaseDN().toNormalizedString()))
- {
- // this is an excluded domain
- if (allowUnknownDomains)
- startStatesFromProvidedCookie.remove(domain.getBaseDN());
- continue;
- }
-
- // skip unused domains
- final ServerState latestServerState = domain.getLatestServerState();
- if (latestServerState.isEmpty())
- continue;
-
- // Creates the new domain context
- final DomainContext newDomainCtxt = new DomainContext();
- newDomainCtxt.active = true;
- newDomainCtxt.rsDomain = domain;
- newDomainCtxt.domainLatestTrimDate = domain.getLatestDomainTrimDate();
-
- // Assign the start state for the domain
- if (isPersistent == PERSISTENT_CHANGES_ONLY)
- {
- newDomainCtxt.startState = latestServerState;
- startStatesFromProvidedCookie.remove(domain.getBaseDN());
- }
- else
- {
- // let's take the start state for this domain from the provided
- // cookie
- newDomainCtxt.startState =
- startStatesFromProvidedCookie.remove(domain.getBaseDN());
-
- if (providedCookie == null
- || providedCookie.length() == 0
- || allowUnknownDomains)
- {
- // when there is no cookie provided in the request,
- // let's start traversing this domain from the beginning of
- // what we have in the replication changelog
- if (newDomainCtxt.startState == null)
- {
- CSN latestTrimCSN =
- new CSN(newDomainCtxt.domainLatestTrimDate, 0, 0);
- newDomainCtxt.startState =
- domain.getStartState().duplicateOnlyOlderThan(latestTrimCSN);
- }
- }
- else
- {
- // when there is a cookie provided in the request,
- if (newDomainCtxt.startState == null)
- {
- missingDomains.append(domain.getBaseDN()).append(":;");
- continue;
- }
- else if (!newDomainCtxt.startState.isEmpty())
- {
- if (hasCookieBeenTrimmedFromDB(domain, newDomainCtxt.startState))
- {
- throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
- ERR_RESYNC_REQUIRED_TOO_OLD_DOMAIN_IN_PROVIDED_COOKIE.get(
- newDomainCtxt.rsDomain.getBaseDN().toNormalizedString()));
- }
- }
- }
-
- newDomainCtxt.stopState = latestServerState;
- }
- newDomainCtxt.currentState = new ServerState();
-
- // Creates an unconnected SH for the domain
- MessageHandler mh = new MessageHandler(maxQueueSize, replicationServer);
- mh.setInitialServerState(newDomainCtxt.startState);
- mh.setBaseDNAndDomain(domain.getBaseDN(), false);
- // register the unconnected into the domain
- domain.registerHandler(mh);
- newDomainCtxt.mh = mh;
-
- previousCookie.update(newDomainCtxt.rsDomain.getBaseDN(),
- newDomainCtxt.startState);
-
- // store the new context
- tmpSet.add(newDomainCtxt);
- }
-
- if (missingDomains.length()>0)
- {
- // If there are domain missing in the provided cookie,
- // the request is rejected and a full resync is required.
- throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
- ERR_RESYNC_REQUIRED_MISSING_DOMAIN_IN_PROVIDED_COOKIE.get(
- missingDomains,
- "<" + providedCookie + missingDomains + ">"));
- }
-
- domainCtxts = tmpSet;
-
- /*
- When it is valid to have the provided cookie containing unknown domains
- (allowUnknownDomains is true), 2 cases must be considered :
- - if the cookie contains a domain that is replicated but where
- ECL is disabled, then this case is considered above
- - if the cookie contains a domain that is even not replicated
- then this case need to be considered here in another loop.
- */
- if (!startStatesFromProvidedCookie.isEmpty() && allowUnknownDomains)
- {
- for (DN providedDomain : startStatesFromProvidedCookie.keySet())
- if (rs.getReplicationServerDomain(providedDomain) == null)
- // the domain provided in the cookie is not replicated
- startStatesFromProvidedCookie.remove(providedDomain);
- }
-
- // Now do the final checking
- if (!startStatesFromProvidedCookie.isEmpty())
- {
- /*
- After reading all the known domains from the provided cookie, there
- is one (or several) domain that are not currently configured.
- This domain has probably been removed or replication disabled on it.
- The request is rejected and full resync is required.
- */
- StringBuilder sb = new StringBuilder();
- for (DomainContext domainCtxt : domainCtxts) {
- sb.append(domainCtxt.rsDomain.getBaseDN()).append(":")
- .append(domainCtxt.startState).append(";");
- }
- throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
- ERR_RESYNC_REQUIRED_UNKNOWN_DOMAIN_IN_PROVIDED_COOKIE.get(
- startStatesFromProvidedCookie.toString() ,sb.toString()));
- }
-
- // the next record from the CNIndexDB should be the one
+ domainCtxts = buildDomainContexts(providedCookie, allowUnknownDomains);
startCookie = providedCookie;
// Initializes each and every domain with the next(first) eligible message
@@ -867,8 +699,163 @@
e);
}
if (debugEnabled())
- TRACER.debugInfo(
- " initializeCLDomCtxts ends with " + " " + dumpState());
+ TRACER.debugInfo("initializeChangelogDomainCtxts() ends with "
+ + dumpState());
+ }
+
+ private Set<DomainContext> buildDomainContexts(String providedCookie,
+ boolean allowUnknownDomains) throws DirectoryException
+ {
+ final Set<DomainContext> results = new HashSet<DomainContext>();
+ final ReplicationServer rs = this.replicationServer;
+
+ /*
+ This map is initialized from the providedCookie.
+ Below, it will be traversed and each domain configured with ECL will be
+ checked and removed from the map.
+ At the end, normally the map should be empty.
+ Depending on allowUnknownDomains provided flag, a non empty map will
+ be considered as an error when allowUnknownDomains is false.
+ */
+ final Map<DN, ServerState> startStatesFromProvidedCookie =
+ MultiDomainServerState.splitGenStateToServerStates(providedCookie);
+
+ final StringBuilder missingDomains = new StringBuilder();
+ for (ReplicationServerDomain domain : toIterable(rs.getDomainIterator()))
+ {
+ // skip the 'unreal' changelog domain
+ if (domain == this.replicationServerDomain)
+ continue;
+
+ // skip the excluded domains
+ if (excludedBaseDNs.contains(domain.getBaseDN().toNormalizedString()))
+ {
+ // this is an excluded domain
+ if (allowUnknownDomains)
+ startStatesFromProvidedCookie.remove(domain.getBaseDN());
+ continue;
+ }
+
+ // skip unused domains
+ final ServerState latestServerState = domain.getLatestServerState();
+ if (latestServerState.isEmpty())
+ continue;
+
+
+ // Creates the new domain context
+ final DomainContext newDomainCtxt = new DomainContext();
+ newDomainCtxt.active = true;
+ newDomainCtxt.rsDomain = domain;
+ newDomainCtxt.domainLatestTrimDate = domain.getLatestDomainTrimDate();
+
+ // Assign the start state for the domain
+ if (isPersistent == PERSISTENT_CHANGES_ONLY)
+ {
+ newDomainCtxt.startState = latestServerState;
+ startStatesFromProvidedCookie.remove(domain.getBaseDN());
+ }
+ else
+ {
+ // let's take the start state for this domain from the provided
+ // cookie
+ newDomainCtxt.startState =
+ startStatesFromProvidedCookie.remove(domain.getBaseDN());
+
+ if (providedCookie == null || providedCookie.length() == 0
+ || allowUnknownDomains)
+ {
+ // when there is no cookie provided in the request,
+ // let's start traversing this domain from the beginning of
+ // what we have in the replication changelog
+ if (newDomainCtxt.startState == null)
+ {
+ CSN latestTrimCSN =
+ new CSN(newDomainCtxt.domainLatestTrimDate, 0, 0);
+ newDomainCtxt.startState =
+ domain.getStartState().duplicateOnlyOlderThan(latestTrimCSN);
+ }
+ }
+ else
+ {
+ // when there is a cookie provided in the request,
+ if (newDomainCtxt.startState == null)
+ {
+ missingDomains.append(domain.getBaseDN()).append(":;");
+ continue;
+ }
+ else if (!newDomainCtxt.startState.isEmpty()
+ && hasCookieBeenTrimmedFromDB(domain, newDomainCtxt.startState))
+ {
+ throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
+ ERR_RESYNC_REQUIRED_TOO_OLD_DOMAIN_IN_PROVIDED_COOKIE.get(
+ newDomainCtxt.rsDomain.getBaseDN().toNormalizedString()));
+ }
+ }
+
+ newDomainCtxt.stopState = latestServerState;
+ }
+ newDomainCtxt.currentState = new ServerState();
+
+ // Creates an unconnected SH for the domain
+ MessageHandler mh = new MessageHandler(maxQueueSize, replicationServer);
+ mh.setInitialServerState(newDomainCtxt.startState);
+ mh.setBaseDNAndDomain(domain.getBaseDN(), false);
+ // register the unconnected into the domain
+ domain.registerHandler(mh);
+ newDomainCtxt.mh = mh;
+
+ previousCookie.update(newDomainCtxt.rsDomain.getBaseDN(),
+ newDomainCtxt.startState);
+
+ results.add(newDomainCtxt);
+ }
+
+ if (missingDomains.length()>0)
+ {
+ // If there are domain missing in the provided cookie,
+ // the request is rejected and a full resync is required.
+ throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
+ ERR_RESYNC_REQUIRED_MISSING_DOMAIN_IN_PROVIDED_COOKIE.get(
+ missingDomains,
+ "<" + providedCookie + missingDomains + ">"));
+ }
+
+ /*
+ When it is valid to have the provided cookie containing unknown domains
+ (allowUnknownDomains is true), 2 cases must be considered :
+ - if the cookie contains a domain that is replicated but where
+ ECL is disabled, then this case is considered above
+ - if the cookie contains a domain that is even not replicated
+ then this case need to be considered here in another loop.
+ */
+ if (!startStatesFromProvidedCookie.isEmpty() && allowUnknownDomains)
+ {
+ for (DN providedDomain : startStatesFromProvidedCookie.keySet())
+ if (rs.getReplicationServerDomain(providedDomain) == null)
+ // the domain provided in the cookie is not replicated
+ startStatesFromProvidedCookie.remove(providedDomain);
+ }
+
+ // Now do the final checking
+ if (!startStatesFromProvidedCookie.isEmpty())
+ {
+ /*
+ After reading all the known domains from the provided cookie, there
+ is one (or several) domain that are not currently configured.
+ This domain has probably been removed or replication disabled on it.
+ The request is rejected and full resync is required.
+ */
+ StringBuilder sb = new StringBuilder();
+ for (DomainContext domainCtxt : domainCtxts) {
+ sb.append(domainCtxt.rsDomain.getBaseDN()).append(":")
+ .append(domainCtxt.startState).append(";");
+ }
+ throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
+ ERR_RESYNC_REQUIRED_UNKNOWN_DOMAIN_IN_PROVIDED_COOKIE.get(
+ startStatesFromProvidedCookie.toString() ,sb.toString()));
+ }
+
+ return results;
}
private boolean hasCookieBeenTrimmedFromDB(ReplicationServerDomain rsDomain,
@@ -1039,6 +1026,7 @@
try
{
// Disable timeout for next communications
+ // FIXME: why? and where is it reset?
session.setSoTimeout(0);
}
catch(Exception e) { /* do nothing */ }
@@ -1046,18 +1034,15 @@
// sendWindow MUST be created before starting the writer
sendWindow = new Semaphore(sendWindowSize);
- // create reader
reader = new ServerReader(session, this);
reader.start();
if (writer == null)
{
- // create writer
writer = new ECLServerWriter(session,this,replicationServerDomain);
writer.start();
}
- // Resume the writer
((ECLServerWriter)writer).resumeWriter();
// TODO:ECL Potential race condition if writer not yet resumed here
@@ -1073,7 +1058,7 @@
if (debugEnabled())
TRACER.debugInfo(getClass().getCanonicalName() + " " + operationId
+ " initialized: " + " " + dumpState() + " " + " "
- + clDomCtxtsToString(""));
+ + domaimCtxtsToString(""));
}
private void initializeChangelogSearch(StartECLSessionMsg msg)
@@ -1136,18 +1121,17 @@
@Override
protected UpdateMsg getNextMessage(boolean synchronous)
{
- UpdateMsg msg = null;
try
{
ECLUpdateMsg eclMsg = getNextECLUpdate();
- if (eclMsg!=null)
- msg = eclMsg.getUpdateMsg();
+ if (eclMsg != null)
+ return eclMsg.getUpdateMsg();
}
catch(DirectoryException de)
{
TRACER.debugCaught(DebugLogLevel.ERROR, de);
}
- return msg;
+ return null;
}
/**
@@ -1187,7 +1171,7 @@
while (continueLooping && searchPhase == INIT_PHASE)
{
// Step 1 & 2
- DomainContext oldestContext = findOldestChangeFromDomainCtxts();
+ final DomainContext oldestContext = findDomainCtxtWithOldestChange();
if (oldestContext == null)
{ // there is no oldest change to process
closeInitPhase();
@@ -1196,7 +1180,6 @@
return null;
}
- // Build the ECLUpdateMsg to be returned
final ECLUpdateMsg change = newECLUpdateMsg(oldestContext);
// Default is not to loop, with one exception
@@ -1222,8 +1205,6 @@
}
if (oldestContext.active)
{
- // populates the table with the next eligible msg from iDom
- // in non blocking mode, return null when no more eligible msg
oldestContext.computeNextEligibleMessageForDomain(operationId);
}
oldestChange = change;
@@ -1232,7 +1213,7 @@
if (searchPhase == PERSISTENT_PHASE)
{
if (debugEnabled())
- TRACER.debugInfo(clDomCtxtsToString(
+ TRACER.debugInfo(domaimCtxtsToString(
"In getNextECLUpdate (persistent): "
+ "looking for the generalized oldest change"));
@@ -1240,12 +1221,11 @@
domainCtxt.computeNextEligibleMessageForDomain(operationId);
}
- DomainContext oldestContext = findOldestChangeFromDomainCtxts();
+ final DomainContext oldestContext = findDomainCtxtWithOldestChange();
if (oldestContext != null)
{
final ECLUpdateMsg change = newECLUpdateMsg(oldestContext);
oldestContext.currentState.update(change.getUpdateMsg().getCSN());
-
if (draftCompat)
{
assignNewChangeNumberAndStore(change);
@@ -1269,16 +1249,12 @@
if (debugEnabled())
TRACER.debugInfo("getNextECLUpdate updates previousCookie:" + csn);
- // Update the current state
previousCookie.update(oldestChange.getBaseDN(), csn);
-
- // Set the current value of global state in the returned message
oldestChange.setCookie(previousCookie);
if (debugEnabled())
TRACER.debugInfo("getNextECLUpdate returns result oldestChange="
+ oldestChange);
-
}
return oldestChange;
}
@@ -1327,8 +1303,6 @@
return true;
}
-
- // the next change from the CNIndexDB
final CNIndexRecord currentRecord = cnIndexDBCursor.getRecord();
final CSN csnFromCNIndexDB = currentRecord.getCSN();
final DN dnFromCNIndexDB = currentRecord.getBaseDN();
@@ -1454,7 +1428,7 @@
searchPhase = UNDEFINED_PHASE;
}
- // End of INIT_PHASE => always release the iterator
+ // End of INIT_PHASE => always release the cursor
releaseCursor();
}
@@ -1464,13 +1438,13 @@
* @return the domainCtxt of the domain with the oldest change, null when
* none.
*/
- private DomainContext findOldestChangeFromDomainCtxts()
+ private DomainContext findDomainCtxtWithOldestChange()
{
DomainContext oldestCtxt = null;
for (DomainContext domainCtxt : domainCtxts)
{
if (domainCtxt.active
- // .msg is null when the previous (non blocking) nextMessage did
+ // .nextMsg is null when the previous (non blocking) nextMessage did
// not have any eligible msg to return
&& domainCtxt.nextMsg != null
&& (oldestCtxt == null
@@ -1482,7 +1456,7 @@
if (debugEnabled())
TRACER.debugInfo("In cn=changelog," + this
- + " getOldestChangeFromDomainCtxts() returns "
+ + " findDomainCtxtWithOldestChange() returns "
+ ((oldestCtxt != null) ? oldestCtxt.nextMsg : "-1"));
return oldestCtxt;
--
Gitblit v1.10.0