From f2279a32d3187f3903437f1d6fdd6639a0d13b2b Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 30 Apr 2014 12:43:41 +0000
Subject: [PATCH] OPENDJ-1430 Some changes are missing from the external changelog
---
opendj3-server-dev/src/server/org/opends/server/replication/server/ECLServerHandler.java | 296 ++++++++++++++++++++++------------------------------------
1 files changed, 112 insertions(+), 184 deletions(-)
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/ECLServerHandler.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/ECLServerHandler.java
index 673973e..e4a7260 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/ECLServerHandler.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -27,28 +27,32 @@
package org.opends.server.replication.server;
import java.io.IOException;
-import org.forgerock.i18n.slf4j.LocalizedLogger;
import java.util.*;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.forgerock.i18n.LocalizableMessage;
+import org.forgerock.i18n.slf4j.LocalizedLogger;
+import org.forgerock.opendj.ldap.ResultCode;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.protocol.*;
-import org.opends.server.replication.server.changelog.api.*;
-import org.opends.server.types.*;
-import org.forgerock.opendj.ldap.ResultCode;
+import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB;
+import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.types.Attribute;
+import org.opends.server.types.Attributes;
+import org.opends.server.types.DN;
+import org.opends.server.types.DirectoryException;
import org.opends.server.util.ServerConstants;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.replication.protocol.ProtocolVersion.*;
-import static org.opends.server.replication.protocol.StartECLSessionMsg
-.ECLRequestType.*;
-import static org.opends.server.replication.protocol.StartECLSessionMsg
-.Persistent.*;
+import static org.opends.server.replication.protocol.StartECLSessionMsg.ECLRequestType.*;
+import static org.opends.server.replication.protocol.StartECLSessionMsg.Persistent.*;
import static org.opends.server.util.StaticUtils.*;
/**
@@ -117,12 +121,6 @@
private MultiDomainServerState previousCookie = new MultiDomainServerState();
/**
- * Eligible CSN - only changes older or equal to eligibleCSN are published in
- * the ECL.
- */
- private CSN eligibleCSN;
-
- /**
* The global list of contexts by domain for the search currently processed.
*/
private Set<DomainContext> domainCtxts = Collections.emptySet();
@@ -133,16 +131,15 @@
*/
private String dumpState()
{
- return getClass().getCanonicalName() +
- "[" +
- "[draftCompat=" + draftCompat +
- "] [persistent=" + startECLSessionMsg.getPersistent() +
- "] [startChangeNumber=" + startECLSessionMsg.getLastChangeNumber() +
- "] [isEndOfCNIndexDBReached=" + isEndOfCNIndexDBReached +
- "] [searchPhase=" + searchPhase +
- "] [startCookie=" + startCookie +
- "] [previousCookie=" + previousCookie +
- "]]";
+ return getClass().getSimpleName() +
+ " [draftCompat=" + draftCompat +
+ ", persistent=" + startECLSessionMsg.getPersistent() +
+ ", startChangeNumber=" + startECLSessionMsg.getLastChangeNumber() +
+ ", endOfCNIndexDBReached=" + isEndOfCNIndexDBReached +
+ ", searchPhase=" + searchPhase +
+ ", startCookie=" + startCookie +
+ ", previousCookie=" + previousCookie +
+ "]";
}
/**
@@ -155,27 +152,35 @@
*/
private class DomainContext
{
- private ReplicationServerDomain rsDomain;
+ private final ReplicationServerDomain rsDomain;
/**
- * active when there are still changes supposed eligible for the ECL.
+ * Active when there are still changes supposed eligible for the ECL. It is
+ * active by default.
*/
- private boolean active;
+ private boolean active = true;
+ private UpdateMsg nextMsg;
/**
* the message handler from which are reading the changes for this domain.
*/
- private MessageHandler mh;
- private UpdateMsg nextMsg;
- private UpdateMsg nextNonEligibleMsg;
- private ServerState startState;
- private ServerState currentState;
- private ServerState stopState;
- private long domainLatestTrimDate;
+ private final MessageHandler mh;
+ private final ServerState startState;
+ private final ServerState currentState = new ServerState();
+ private final ServerState stopState;
+ private final long domainLatestTrimDate;
- /**
- * {@inheritDoc}
- */
+ public DomainContext(ReplicationServerDomain domain,
+ ServerState startState, ServerState stopState, MessageHandler mh)
+ {
+ this.rsDomain = domain;
+ this.startState = startState;
+ this.stopState = stopState;
+ this.mh = mh;
+ this.domainLatestTrimDate = domain.getLatestDomainTrimDate();
+ }
+
+ /** {@inheritDoc} */
@Override
public String toString()
{
@@ -183,96 +188,38 @@
toString(buffer);
return buffer.toString();
}
- /**
- * Provide a string representation of this object for debug purpose..
- * @param buffer Append to this buffer.
- */
- public void toString(StringBuilder buffer)
+
+ private StringBuilder toString(StringBuilder buffer)
{
- buffer.append("[ [active=").append(active)
- .append("] [rsDomain=").append(rsDomain)
- .append("] [nextMsg=").append(nextMsg).append("(")
- .append(nextMsg != null ? asDate(nextMsg.getCSN()).toString() : "")
- .append(")")
- .append("] [nextNonEligibleMsg=").append(nextNonEligibleMsg)
- .append("] [startState=").append(startState)
- .append("] [currentState=").append(currentState)
- .append("] [stopState=").append(stopState)
- .append("]]");
+ buffer.append(getClass().getSimpleName());
+ buffer.append(" [");
+ buffer.append(active ? "active" : "inactive");
+ buffer.append(", baseDN=\"").append(rsDomain.getBaseDN()).append("\"");
+ if (nextMsg != null)
+ {
+ buffer.append(", csn=").append(nextMsg.getCSN().toStringUI());
+ }
+ buffer.append(", nextMsg=[").append(nextMsg);
+ buffer.append("]")
+ .append(", startState=").append(startState)
+ .append(", currentState=").append(currentState)
+ .append(", stopState=").append(stopState)
+ .append("]");
+ return buffer;
}
/**
- * Computes the next message eligible regarding the crossDomain eligible
- * CSN.
- *
- * @param opId The operation id.
+ * Computes the next available message for this domain context.
*/
- private void computeNextEligibleMessageForDomain(String opId)
+ private void computeNextAvailableMessage()
{
+ nextMsg = getNextMessage();
if (logger.isTraceEnabled())
- debugInfo(opId, "ctxt=" + this);
-
- assert(nextMsg == null);
- try
{
- // Before get a new message from the domain, evaluate in priority
- // a message that has not been published to the ECL because it was
- // not eligible
- if (nextNonEligibleMsg != null)
- {
- final boolean hasBecomeEligible = isEligible(nextNonEligibleMsg);
-
- if (logger.isTraceEnabled())
- debugInfo(opId, "stored nonEligibleMsg " + nextNonEligibleMsg
- + " has now become eligible regarding the eligibleCSN ("
- + eligibleCSN + " ): " + hasBecomeEligible);
-
- if (hasBecomeEligible)
- {
- nextMsg = nextNonEligibleMsg;
- nextNonEligibleMsg = null;
- }
- // else the oldest is still not eligible - let's wait next
- }
- else
- {
- // Here comes a new message !!!
- final UpdateMsg newMsg = getNextMessage();
- if (newMsg == null)
- {
- return;
- }
-
- if (logger.isTraceEnabled())
- debugInfo(opId, "got new message : [newMsg=" + newMsg + "] "
- + dumpState());
-
- final boolean isEligible = isEligible(newMsg);
-
- if (logger.isTraceEnabled())
- debugInfo(opId, "newMsg isEligible=" + isEligible + " since "
- + "newMsg=[" + toString(newMsg.getCSN()) + "] eligibleCSN=["
- + toString(eligibleCSN) + "] " + dumpState());
-
- if (isEligible)
- {
- nextMsg = newMsg;
- }
- else
- {
- nextNonEligibleMsg = newMsg;
- }
- }
+ logger.trace("In ECLServerHandler, for baseDN="
+ + mh.getBaseDNString() + " computeNextAvailableMessage("
+ + getOperationId() + ") : newMsg=[" + nextMsg + "] " + dumpState());
}
- catch(Exception e)
- {
- logger.traceException(e);
- }
- }
-
- private boolean isEligible(UpdateMsg msg)
- {
- return msg.getCSN().getTime() <= eligibleCSN.getTime();
}
private UpdateMsg getNextMessage()
@@ -296,18 +243,6 @@
}
}
- private String toString(CSN csn)
- {
- return csn + " " + asDate(csn);
- }
-
- private void debugInfo(String opId, String message)
- {
- logger.trace("In ECLServerHandler, for baseDN="
- + mh.getBaseDNString() + " getNextEligibleMessageForDomain(" + opId
- + ") " + message);
- }
-
/**
* Unregister the handler from the DomainContext ReplicationDomain.
* @return Whether the handler has been unregistered with success.
@@ -328,11 +263,10 @@
private String domaimCtxtsToString(String msg)
{
- StringBuilder buffer = new StringBuilder();
+ final StringBuilder buffer = new StringBuilder();
buffer.append(msg).append("\n");
for (DomainContext domainCtxt : domainCtxts) {
- domainCtxt.toString(buffer);
- buffer.append("\n");
+ domainCtxt.toString(buffer).append("\n");
}
return buffer.toString();
}
@@ -702,10 +636,12 @@
// Initializes each and every domain with the next(first) eligible message
// from the domain.
for (DomainContext domainCtxt : domainCtxts) {
- domainCtxt.computeNextEligibleMessageForDomain(getOperationId());
+ domainCtxt.computeNextAvailableMessage();
if (domainCtxt.nextMsg == null)
+ {
domainCtxt.active = false;
+ }
}
}
catch(DirectoryException de)
@@ -767,64 +703,48 @@
continue;
// Creates the new domain context
- final DomainContext newDomainCtxt = new DomainContext();
- newDomainCtxt.active = true;
- newDomainCtxt.rsDomain = domain;
- newDomainCtxt.domainLatestTrimDate = domain.getLatestDomainTrimDate();
-
- // Assign the start state for the domain
+ final DomainContext newDomainCtxt;
+ final ServerState domainStartState =
+ startStatesFromProvidedCookie.remove(domain.getBaseDN());
if (startECLSessionMsg.getPersistent() == PERSISTENT_CHANGES_ONLY)
{
- newDomainCtxt.startState = latestState;
- startStatesFromProvidedCookie.remove(domain.getBaseDN());
+ newDomainCtxt = newDomainContext(domain, null, latestState);
}
else
{
// let's take the start state for this domain from the provided cookie
- newDomainCtxt.startState =
- startStatesFromProvidedCookie.remove(domain.getBaseDN());
-
+ ServerState startState = domainStartState;
if (providedCookie == null || providedCookie.length() == 0
|| allowUnknownDomains)
{
// when there is no cookie provided in the request,
// let's start traversing this domain from the beginning of
// what we have in the replication changelog
- if (newDomainCtxt.startState == null)
+ if (startState == null)
{
- newDomainCtxt.startState =
+ startState =
domain.getOldestState().duplicateOnlyOlderThan(
- newDomainCtxt.domainLatestTrimDate);
+ domain.getLatestDomainTrimDate());
}
}
else
{
// when there is a cookie provided in the request,
- if (newDomainCtxt.startState == null)
+ if (startState == null)
{
missingDomains.append(domain.getBaseDN()).append(":;");
continue;
}
- else if (!newDomainCtxt.startState.isEmpty()
- && hasCookieBeenTrimmedFromDB(domain, newDomainCtxt.startState))
+ else if (!startState.isEmpty()
+ && hasCookieBeenTrimmedFromDB(domain, startState))
{
throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
ERR_RESYNC_REQUIRED_TOO_OLD_DOMAIN_IN_PROVIDED_COOKIE.get(
- newDomainCtxt.rsDomain.getBaseDN().toNormalizedString()));
+ domain.getBaseDN().toNormalizedString()));
}
}
-
- newDomainCtxt.stopState = latestState;
+ newDomainCtxt = newDomainContext(domain, startState, latestState);
}
- newDomainCtxt.currentState = new ServerState();
-
- // Creates an unconnected SH for the domain
- MessageHandler mh = new MessageHandler(maxQueueSize, replicationServer);
- mh.setInitialServerState(newDomainCtxt.startState);
- mh.setBaseDNAndDomain(domain.getBaseDN(), false);
- // register the unconnected into the domain
- domain.registerHandler(mh);
- newDomainCtxt.mh = mh;
previousCookie.replace(newDomainCtxt.rsDomain.getBaseDN(),
newDomainCtxt.startState.duplicate());
@@ -832,7 +752,7 @@
results.add(newDomainCtxt);
}
- if (missingDomains.length()>0)
+ if (missingDomains.length() > 0)
{
// If there are domain missing in the provided cookie,
// the request is rejected and a full resync is required.
@@ -851,11 +771,16 @@
*/
if (!startStatesFromProvidedCookie.isEmpty() && allowUnknownDomains)
{
- // JNR: Will the following code trigger a ConcurrentModificationException?
- for (DN providedDomain : startStatesFromProvidedCookie.keySet())
+ final Set<DN> providedDomains = startStatesFromProvidedCookie.keySet();
+ for (Iterator<DN> iter = providedDomains.iterator(); iter.hasNext();)
+ {
+ DN providedDomain = iter.next();
if (rs.getReplicationServerDomain(providedDomain) == null)
+ {
// the domain provided in the cookie is not replicated
- startStatesFromProvidedCookie.remove(providedDomain);
+ iter.remove();
+ }
+ }
}
// Now do the final checking
@@ -880,6 +805,19 @@
return results;
}
+ private DomainContext newDomainContext(ReplicationServerDomain domain,
+ ServerState startState, ServerState stopState) throws DirectoryException
+ {
+ // Create an unconnected MessageHandler for the domain
+ MessageHandler mh = new MessageHandler(maxQueueSize, replicationServer);
+ mh.setInitialServerState(startState);
+ mh.setBaseDNAndDomain(domain.getBaseDN(), false);
+ // register the unconnected into the domain
+ domain.registerHandler(mh);
+
+ return new DomainContext(domain, startState, stopState, mh);
+ }
+
private boolean hasCookieBeenTrimmedFromDB(ReplicationServerDomain rsDomain,
ServerState cookie)
{
@@ -1070,15 +1008,12 @@
if (logger.isTraceEnabled())
logger.trace(getClass().getCanonicalName() + " " + getOperationId()
- + " initialized: " + " " + dumpState() + " " + " "
- + domaimCtxtsToString(""));
+ + " initialized: " + " " + dumpState() + domaimCtxtsToString(""));
}
private void initializeChangelogSearch(StartECLSessionMsg msg)
throws DirectoryException
{
- refreshEligibleCSN();
-
if (msg.getECLRequestType() == REQUEST_TYPE_FROM_COOKIE)
{
initializeCLSearchFromCookie(msg.getCrossDomainServerState());
@@ -1099,7 +1034,6 @@
*/
public ECLUpdateMsg takeECLUpdate() throws DirectoryException
{
- refreshEligibleCSN();
ECLUpdateMsg msg = getNextECLUpdate();
// TODO:ECL We should refactor so that a SH always have a session
@@ -1157,8 +1091,10 @@
public ECLUpdateMsg getNextECLUpdate() throws DirectoryException
{
if (logger.isTraceEnabled())
+ {
logger.trace("In cn=changelog" + this +
" getNextECLUpdate starts: " + dumpState());
+ }
ECLUpdateMsg oldestChange = null;
try
@@ -1221,7 +1157,7 @@
}
if (oldestContext.active)
{
- oldestContext.computeNextEligibleMessageForDomain(getOperationId());
+ oldestContext.computeNextAvailableMessage();
}
oldestChange = change;
}
@@ -1233,8 +1169,9 @@
"In getNextECLUpdate (persistent): "
+ "looking for the generalized oldest change"));
- for (DomainContext domainCtxt : domainCtxts) {
- domainCtxt.computeNextEligibleMessageForDomain(getOperationId());
+ for (DomainContext domainCtxt : domainCtxts)
+ {
+ domainCtxt.computeNextAvailableMessage();
}
final DomainContext oldestContext = findDomainCtxtWithOldestChange();
@@ -1494,13 +1431,4 @@
return this.searchPhase != INIT_PHASE;
}
- /**
- * Refresh the eligibleCSN by requesting the replication server.
- */
- private void refreshEligibleCSN()
- {
- Set<String> excludedBaseDNs = startECLSessionMsg.getExcludedBaseDNs();
- eligibleCSN = replicationServer.getEligibleCSN(excludedBaseDNs);
- }
-
}
--
Gitblit v1.10.0