From abe3bce25e7f6ecd0ce8b90a14036d3380739e9e Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Mon, 24 Aug 2009 16:11:46 +0000
Subject: [PATCH] Fix 4183 - ECL (draft mode): first and last ChangeNumber are 0 until first search
---
opends/src/server/org/opends/server/replication/server/ReplicationServer.java | 115 +++++++++
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java | 336 +++++++++++++++++++++++----
opends/src/server/org/opends/server/replication/common/FirstChangeNumberVirtualAttributeProvider.java | 22 +
opends/src/server/org/opends/server/replication/common/LastChangeNumberVirtualAttributeProvider.java | 18 +
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java | 133 ----------
opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java | 6
opends/src/server/org/opends/server/replication/protocol/ModifyMsg.java | 24 +-
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 20 +
opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java | 36 +-
9 files changed, 479 insertions(+), 231 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/common/FirstChangeNumberVirtualAttributeProvider.java b/opends/src/server/org/opends/server/replication/common/FirstChangeNumberVirtualAttributeProvider.java
index a8a4320..0c332e2 100644
--- a/opends/src/server/org/opends/server/replication/common/FirstChangeNumberVirtualAttributeProvider.java
+++ b/opends/src/server/org/opends/server/replication/common/FirstChangeNumberVirtualAttributeProvider.java
@@ -25,8 +25,8 @@
* Copyright 2009 Sun Microsystems, Inc.
*/
package org.opends.server.replication.common;
-import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@@ -39,7 +39,8 @@
import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.SearchOperation;
-import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.replication.plugin.MultimasterReplication;
+import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.types.AttributeValue;
import org.opends.server.types.AttributeValues;
import org.opends.server.types.ByteString;
@@ -48,6 +49,7 @@
import org.opends.server.types.InitializationException;
import org.opends.server.types.ResultCode;
import org.opends.server.types.VirtualAttributeRule;
+import org.opends.server.util.ServerConstants;
import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
@@ -63,7 +65,6 @@
extends VirtualAttributeProvider<UserDefinedVirtualAttributeCfg>
implements ConfigurationChangeListener<UserDefinedVirtualAttributeCfg>
{
- private static final DebugTracer TRACER = getTracer();
// The current configuration for this virtual attribute provider.
private UserDefinedVirtualAttributeCfg currentConfig;
@@ -137,8 +138,19 @@
DirectoryServer.getWorkflowElement("EXTERNAL CHANGE LOG");
if (eclwe!=null)
{
- first = String.valueOf(
- eclwe.getReplicationServer().getFirstDraftChangeNumber());
+ // Set a list of excluded domains (also exclude 'cn=changelog' itself)
+ ArrayList<String> excludedDomains =
+ MultimasterReplication.getPrivateDomains();
+ if (!excludedDomains.contains(
+ ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT))
+ excludedDomains.add(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT);
+
+ ReplicationServer rs = eclwe.getReplicationServer();
+ int[] limits = rs.getECLDraftCNLimits(
+ rs.getEligibleCN(), excludedDomains);
+
+ first = String.valueOf(limits[0]);
+
}
}
catch(Exception e)
diff --git a/opends/src/server/org/opends/server/replication/common/LastChangeNumberVirtualAttributeProvider.java b/opends/src/server/org/opends/server/replication/common/LastChangeNumberVirtualAttributeProvider.java
index a2a44b8..c8a068b 100644
--- a/opends/src/server/org/opends/server/replication/common/LastChangeNumberVirtualAttributeProvider.java
+++ b/opends/src/server/org/opends/server/replication/common/LastChangeNumberVirtualAttributeProvider.java
@@ -27,6 +27,7 @@
package org.opends.server.replication.common;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@@ -40,6 +41,8 @@
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.SearchOperation;
import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.replication.plugin.MultimasterReplication;
+import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.types.AttributeValue;
import org.opends.server.types.AttributeValues;
import org.opends.server.types.ByteString;
@@ -48,6 +51,7 @@
import org.opends.server.types.InitializationException;
import org.opends.server.types.ResultCode;
import org.opends.server.types.VirtualAttributeRule;
+import org.opends.server.util.ServerConstants;
import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
@@ -137,8 +141,18 @@
DirectoryServer.getWorkflowElement("EXTERNAL CHANGE LOG");
if (eclwe!=null)
{
- last = String.valueOf(
- eclwe.getReplicationServer().getLastDraftChangeNumber());
+ // Set a list of excluded domains (also exclude 'cn=changelog' itself)
+ ArrayList<String> excludedDomains =
+ MultimasterReplication.getPrivateDomains();
+ if (!excludedDomains.contains(
+ ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT))
+ excludedDomains.add(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT);
+
+ ReplicationServer rs = eclwe.getReplicationServer();
+ int[] limits = rs.getECLDraftCNLimits(
+ rs.getEligibleCN(), excludedDomains);
+
+ last = String.valueOf(limits[1]);
}
}
catch(Exception e)
diff --git a/opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java b/opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java
index c7bff36..5beda7e 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java
@@ -315,28 +315,28 @@
if (protocolVersion == ProtocolVersion.REPLICATION_PROTOCOL_V1)
{
return "ModifyDNMsg content: " +
- "\nprotocolVersion: " + protocolVersion +
- "\ndn: " + dn +
- "\nchangeNumber: " + changeNumber +
- "\nuniqueId: " + uniqueId +
- "\nassuredFlag: " + assuredFlag +
- "\nnewRDN: " + newRDN +
- "\nnewSuperior: " + newSuperior +
- "\ndeleteOldRdn: " + deleteOldRdn;
+ " protocolVersion: " + protocolVersion +
+ " dn: " + dn +
+ " changeNumber: " + changeNumber +
+ " uniqueId: " + uniqueId +
+ " assuredFlag: " + assuredFlag +
+ " newRDN: " + newRDN +
+ " newSuperior: " + newSuperior +
+ " deleteOldRdn: " + deleteOldRdn;
}
if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V2)
{
return "ModifyDNMsg content: " +
- "\nprotocolVersion: " + protocolVersion +
- "\ndn: " + dn +
- "\nchangeNumber: " + changeNumber +
- "\nuniqueId: " + uniqueId +
- "\nnewRDN: " + newRDN +
- "\nnewSuperior: " + newSuperior +
- "\ndeleteOldRdn: " + deleteOldRdn +
- "\nassuredFlag: " + assuredFlag +
- "\nassuredMode: " + assuredMode +
- "\nsafeDataLevel: " + safeDataLevel;
+ " protocolVersion: " + protocolVersion +
+ " dn: " + dn +
+ " changeNumber: " + changeNumber +
+ " uniqueId: " + uniqueId +
+ " newRDN: " + newRDN +
+ " newSuperior: " + newSuperior +
+ " deleteOldRdn: " + deleteOldRdn +
+ " assuredFlag: " + assuredFlag +
+ " assuredMode: " + assuredMode +
+ " safeDataLevel: " + safeDataLevel;
}
return "!!! Unknown version: " + protocolVersion + "!!!";
}
diff --git a/opends/src/server/org/opends/server/replication/protocol/ModifyMsg.java b/opends/src/server/org/opends/server/replication/protocol/ModifyMsg.java
index 507461a..2b50944 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ModifyMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ModifyMsg.java
@@ -197,22 +197,22 @@
if (protocolVersion == ProtocolVersion.REPLICATION_PROTOCOL_V1)
{
return "ModifyMsg content: " +
- "\nprotocolVersion: " + protocolVersion +
- "\ndn: " + dn +
- "\nchangeNumber: " + changeNumber +
- "\nuniqueId: " + uniqueId +
- "\nassuredFlag: " + assuredFlag;
+ " protocolVersion: " + protocolVersion +
+ " dn: " + dn +
+ " changeNumber: " + changeNumber +
+ " uniqueId: " + uniqueId +
+ " assuredFlag: " + assuredFlag;
}
if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V2)
{
return "ModifyMsg content: " +
- "\nprotocolVersion: " + protocolVersion +
- "\ndn: " + dn +
- "\nchangeNumber: " + changeNumber +
- "\nuniqueId: " + uniqueId +
- "\nassuredFlag: " + assuredFlag +
- "\nassuredMode: " + assuredMode +
- "\nsafeDataLevel: " + safeDataLevel;
+ " protocolVersion: " + protocolVersion +
+ " dn: " + dn +
+ " changeNumber: " + changeNumber +
+ " uniqueId: " + uniqueId +
+ " assuredFlag: " + assuredFlag +
+ " assuredMode: " + assuredMode +
+ " safeDataLevel: " + safeDataLevel;
}
return "!!! Unknown version: " + protocolVersion + "!!!";
}
diff --git a/opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java b/opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java
index f884b31..13dd7cd 100644
--- a/opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java
@@ -43,7 +43,7 @@
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ServerState;
-import org.opends.server.replication.server.DraftCNDB.*;
+import org.opends.server.replication.server.DraftCNDB.DraftCNDBCursor;
import org.opends.server.types.Attribute;
import org.opends.server.types.Attributes;
import org.opends.server.types.InitializationException;
@@ -81,7 +81,6 @@
private boolean shutdown = false;
private boolean trimDone = false;
private DirectoryThread thread = null;
- private final Object flushLock = new Object();
private ReplicationServer replicationServer;
// The maximum number of retries in case of DatabaseDeadlock Exception.
@@ -142,7 +141,8 @@
db.addEntry(key, value, serviceID, cn);
if (debugEnabled())
- TRACER.debugInfo("In DraftCNDbhandler.add, added: "
+ TRACER.debugInfo(
+ "In DraftCNDbhandler.add, added: "
+ " key=" + key
+ " value=" + value
+ " serviceID=" + serviceID
diff --git a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java b/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
index c5effda..4346763 100644
--- a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -105,6 +105,7 @@
* Specifies the excluded DNs (like cn=admin, ...).
*/
public ArrayList<String> excludedServiceIDs = new ArrayList<String>();
+ //HashSet<String> excludedServiceIDs = new HashSet<String>();
/**
* Eligible changeNumber - only changes older or equal to eligibleCN
@@ -565,7 +566,8 @@
// Get the draftLimits (from the eligibleCN got at the beginning of
// the operation.
- int[] limits = getECLDraftCNLimits(eligibleCN);
+ int[] limits = replicationServer.getECLDraftCNLimits(
+ eligibleCN, excludedServiceIDs);
if (startDraftCN<=limits[1])
{
@@ -630,7 +632,7 @@
continue;
// skip the excluded domains
- if (isServiceIDExcluded(rsd.getBaseDn()))
+ if (excludedServiceIDs.contains(rsd.getBaseDn()))
continue;
// Creates the new domain context
@@ -829,11 +831,6 @@
throws DirectoryException
{
- //
- //this.following = false; // FIXME:ECL makes no sense for ECLServerHandler ?
- //this.lateQueue.clear(); // FIXME:ECL makes no sense for ECLServerHandler ?
- //this.setConsumerActive(true);
-
this.operationId = startECLSessionMsg.getOperationId();
this.setName(this.getClass().getCanonicalName()+ " " + operationId);
@@ -1434,126 +1431,4 @@
eligibleCN = replicationServer.getEligibleCN();
}
- /*
- * Get first and last DraftCN
- * @param crossDomainEligibleCN
- * @return
- */
- private int[] getECLDraftCNLimits(ChangeNumber crossDomainEligibleCN)
- throws DirectoryException
- {
- /* The content of the DraftCNdb depends on the SEARCH operations done before
- * requesting the DraftCN. If no operations, DraftCNdb is empty.
- * The limits we want to get are the "potential" limits if a request was
- * done, the DraftCNdb is probably not complete to do that.
- *
- * The first DraftCN is :
- * - the first record from the DraftCNdb
- * - if none because DraftCNdb empty,
- * then
- * if no change in replchangelog then return 0
- * else return 1 (DraftCN that WILL be returned to next search)
- *
- * The last DraftCN is :
- * - initialized with the last record from the DraftCNdb (0 if none)
- * and consider the genState associated
- * - to the last DraftCN, we add the count of updates in the replchangelog
- * FROM that genState TO the crossDomainEligibleCN
- * (this diff is done domain by domain)
- */
-
- int firstDraftCN;
- int lastDraftCN;
- boolean DraftCNdbIsEmpty;
- DraftCNDbHandler draftCNDbH = replicationServer.getDraftCNDbHandler();
-
- ReplicationServer rs = replicationServerDomain.getReplicationServer();
-
- // Get the first DraftCN from the DraftCNdb
- firstDraftCN = draftCNDbH.getFirstKey();
- HashMap<String,ServerState> domainsServerStateForLastSeqnum = null;
- if (firstDraftCN < 1)
- {
- DraftCNdbIsEmpty=true;
- firstDraftCN = 0;
- lastDraftCN = 0;
- }
- else
- {
- DraftCNdbIsEmpty=false;
-
- // Get the last DraftCN from the DraftCNdb
- lastDraftCN = draftCNDbH.getLastKey();
-
- // Get the generalized state associated with the current last DraftCN
- // and initializes from it the startStates table
- String lastSeqnumGenState = draftCNDbH.getValue(lastDraftCN);
- if ((lastSeqnumGenState != null) && (lastSeqnumGenState.length()>0))
- {
- domainsServerStateForLastSeqnum = MultiDomainServerState.
- splitGenStateToServerStates(lastSeqnumGenState);
- }
- }
-
- // Domain by domain
- Iterator<ReplicationServerDomain> rsdi = rs.getDomainIterator();
- if (rsdi != null)
- {
- while (rsdi.hasNext())
- {
- // process a domain
- ReplicationServerDomain rsd = rsdi.next();
-
- if (isServiceIDExcluded(rsd.getBaseDn()))
- continue;
-
- // for this domain, have the state in the replchangelog
- // where the last DraftCN update is
- ServerState domainServerStateForLastSeqnum;
- if ((domainsServerStateForLastSeqnum == null) ||
- (domainsServerStateForLastSeqnum.get(rsd.getBaseDn())==null))
- {
- domainServerStateForLastSeqnum = new ServerState();
- }
- else
- {
- domainServerStateForLastSeqnum =
- domainsServerStateForLastSeqnum.get(rsd.getBaseDn());
- }
-
- // Count the number of (eligible) changes from this place
- // to the eligible CN (cross server)
- long ec = rsd.getEligibleCount(
- domainServerStateForLastSeqnum, crossDomainEligibleCN);
-
- // ... hum ...
- if ((ec>0) && (DraftCNdbIsEmpty==false))
- ec--;
-
- // cumulates on domains
- lastDraftCN += ec;
-
- // DraftCN is empty and there are eligible updates in the repl changelog
- // then init first DraftCN
- if ((ec>0) && (firstDraftCN==0))
- firstDraftCN = 1;
- }
- }
- return new int[]{firstDraftCN, lastDraftCN};
- }
-
- private boolean isServiceIDExcluded(String serviceID)
- {
- // skip the excluded domains
- boolean excluded = false;
- for(String excludedServiceID : this.excludedServiceIDs)
- {
- if (excludedServiceID.equalsIgnoreCase(serviceID))
- {
- excluded=true;
- break;
- }
- }
- return excluded;
- }
}
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index 0b2ae52..4c98bbe 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -43,6 +43,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -70,6 +71,8 @@
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ExternalChangeLogSession;
+import org.opends.server.replication.common.MultiDomainServerState;
+import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ReplServerStartMsg;
import org.opends.server.replication.protocol.ReplSessionSecurity;
@@ -1584,4 +1587,116 @@
return ++lastGeneratedDraftCN;
}
+ /**
+ * Get first and last DraftCN.
+ * @param crossDomainEligibleCN The provided crossDomainEligibleCN used as
+ * the upper limit for the lastDraftCN
+ * @param excludedServiceIDs The serviceIDs that are excluded from the ECL.
+ * @return The first and last draftCN.
+ * @throws DirectoryException a.
+ */
+ public int[] getECLDraftCNLimits(
+ ChangeNumber crossDomainEligibleCN,
+ ArrayList<String> excludedServiceIDs)
+ throws DirectoryException
+ {
+ /* The content of the DraftCNdb depends on the SEARCH operations done before
+ * requesting the DraftCN. If no operations, DraftCNdb is empty.
+ * The limits we want to get are the "potential" limits if a request was
+ * done, the DraftCNdb is probably not complete to do that.
+ *
+ * The first DraftCN is :
+ * - the first record from the DraftCNdb
+ * - if none because DraftCNdb empty,
+ * then
+ * if no change in replchangelog then return 0
+ * else return 1 (DraftCN that WILL be returned to next search)
+ *
+ * The last DraftCN is :
+ * - initialized with the last record from the DraftCNdb (0 if none)
+ * and consider the genState associated
+ * - to the last DraftCN, we add the count of updates in the replchangelog
+ * FROM that genState TO the crossDomainEligibleCN
+ * (this diff is done domain by domain)
+ */
+
+ int firstDraftCN;
+ int lastDraftCN;
+ boolean DraftCNdbIsEmpty;
+ DraftCNDbHandler draftCNDbH = this.getDraftCNDbHandler();
+
+ // Get the first DraftCN from the DraftCNdb
+ firstDraftCN = draftCNDbH.getFirstKey();
+ HashMap<String,ServerState> domainsServerStateForLastSeqnum = null;
+ if (firstDraftCN < 1)
+ {
+ DraftCNdbIsEmpty=true;
+ firstDraftCN = 0;
+ lastDraftCN = 0;
+ }
+ else
+ {
+ DraftCNdbIsEmpty=false;
+
+ // Get the last DraftCN from the DraftCNdb
+ lastDraftCN = draftCNDbH.getLastKey();
+
+ // Get the generalized state associated with the current last DraftCN
+ // and initializes from it the startStates table
+ String lastSeqnumGenState = draftCNDbH.getValue(lastDraftCN);
+ if ((lastSeqnumGenState != null) && (lastSeqnumGenState.length()>0))
+ {
+ domainsServerStateForLastSeqnum = MultiDomainServerState.
+ splitGenStateToServerStates(lastSeqnumGenState);
+ }
+ }
+
+ // Domain by domain
+ Iterator<ReplicationServerDomain> rsdi = this.getDomainIterator();
+ if (rsdi != null)
+ {
+ while (rsdi.hasNext())
+ {
+ // process a domain
+ ReplicationServerDomain rsd = rsdi.next();
+
+ if (excludedServiceIDs.contains(rsd.getBaseDn()))
+ continue;
+
+ // for this domain, have the state in the replchangelog
+ // where the last DraftCN update is
+ ServerState domainServerStateForLastSeqnum;
+ if ((domainsServerStateForLastSeqnum == null) ||
+ (domainsServerStateForLastSeqnum.get(rsd.getBaseDn())==null))
+ {
+ domainServerStateForLastSeqnum = new ServerState();
+ }
+ else
+ {
+ domainServerStateForLastSeqnum =
+ domainsServerStateForLastSeqnum.get(rsd.getBaseDn());
+ }
+
+ // Count the number of (eligible) changes from this place
+ // to the eligible CN (cross server)
+ long ec = rsd.getEligibleCount(
+ domainServerStateForLastSeqnum, crossDomainEligibleCN);
+
+ // the state from which we started is the one BEFORE the lastdraftCN
+ // so we must decrement 1 to the EligibleCount
+ if ((ec>0) && (DraftCNdbIsEmpty==false))
+ ec--;
+
+ // cumulates on domains
+ lastDraftCN += ec;
+
+ // DraftCN Db is empty and there are eligible updates in the replication
+ // changelog then init first DraftCN
+ if ((ec>0) && (firstDraftCN==0))
+ firstDraftCN = 1;
+ }
+ }
+ return new int[]{firstDraftCN, lastDraftCN};
+ }
+
}
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 4f844d4..46ed7dd 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -212,6 +212,7 @@
public void put(UpdateMsg update, ServerHandler sourceHandler)
throws IOException
{
+
ChangeNumber cn = update.getChangeNumber();
short id = cn.getServerId();
sourceHandler.updateServerState(update);
@@ -3089,8 +3090,8 @@
/**
* This methods count the changes, server by server :
- * - from a start point (cn taken from the provided startState)
- * - to an end point (the provided endCN).
+ * - from a serverState start point
+ * - to (inclusive) an end point (the provided endCN).
* @param startState The provided start server state.
* @param endCN The provided end change number.
* @return The number of changes between startState and endCN.
@@ -3116,12 +3117,14 @@
try
{
ri = h.generateIterator(startState.getMaxChangeNumber(sid));
- startCN = ri.getChange().getChangeNumber();
+ if (ri.next()==true)
+ {
+ startCN = ri.getChange().getChangeNumber();
+ }
}
catch(Exception e)
{
TRACER.debugCaught(DebugLogLevel.ERROR, e);
- // no change found (purge from CL)
startCN = null;
}
finally
@@ -3136,19 +3139,20 @@
if (startCN != null)
{
// Set on the change related to the endCN
- ChangeNumber upperCN;
+ ChangeNumber upperCN = null;
try
{
// Build a changenumber for this very server, with the timestamp
// of the endCN
ChangeNumber f = new ChangeNumber(endCN.getTime(), 0, sid);
ri = h.generateIterator(f);
- upperCN = ri.getChange().getChangeNumber();
+ if (ri.next()==true)
+ {
+ upperCN = ri.getChange().getChangeNumber();
+ }
}
catch(Exception e)
{
- TRACER.debugCaught(DebugLogLevel.ERROR, e);
- // no new change
upperCN = h.getLastChange();
}
finally
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java
index 85ff760..a2339f1 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java
@@ -200,38 +200,104 @@
@Test(enabled=true)
public void ECLReplicationServerTest()
{
+ // --
+ // First set of test are in the cookie mode
+
+ // Test that private backend is excluded from ECL
ECLOnPrivateBackend();replicationServer.clearDb();
+
+ // Test remote API (ECL through replication protocol) with empty ECL
ECLRemoteEmpty();replicationServer.clearDb();
+
+ // Test with empty changelog
ECLEmpty();replicationServer.clearDb();
- ECLAllOps();replicationServer.clearDb();
- ECLRemoteNonEmpty();replicationServer.clearDb();
- ECLTwoDomains();replicationServer.clearDb();
- ECLPsearch(true, false);replicationServer.clearDb();
- ECLPsearch(false, false);replicationServer.clearDb();
- ECLSimulPsearches();replicationServer.clearDb();
+
+ // Test all types of ops.
+ ECLAllOps(); // Do not clean the db for the next test
+ // First and last should be ok whenever a request has been done or not
+ // in compat mode.
+ ECLCompatTestLimits(1,4);replicationServer.clearDb();
+
+ // Test remote API (ECL through replication protocol) with NON empty ECL
+ ECLRemoteNonEmpty();replicationServer.clearDb();
+
+ // Test with a mix of domains, a mix of DSes
+ ECLTwoDomains();replicationServer.clearDb();
+
+ // Persistent search with changesOnly request
+ ECLPsearch(true, false);replicationServer.clearDb();
+
+ // Persistent search with init values request
+ ECLPsearch(false, false);replicationServer.clearDb();
+
+ // Simultaneous psearches
+ ECLSimultaneousPsearches();replicationServer.clearDb();
+
+ // Test eligible count method.
+ ECLGetEligibleCountTest();replicationServer.clearDb();
+
// TODO:ECL Test SEARCH abandon and check everything shutdown and cleaned
// TODO:ECL Test PSEARCH abandon and check everything shutdown and cleaned
// TODO:ECL Test invalid DN in cookie returns UNWILLING + message
// TODO:ECL Test notif control returned contains the cookie
// TODO:ECL Test the attributes list and values returned in ECL entries
// TODO:ECL Test search -s base, -s one
+
+ // Test directly from the java obect that the changeTimeHeartbeatState
+ // stored are ok.
ChangeTimeHeartbeatTest();replicationServer.clearDb();
+
+ // Test the different forms of filter that are parsed in order to
+ // optimize the request.
ECLFilterTest();
+ // --
+ // Second set of test are in the draft compat mode
+
+ // Empty replication changelog
ECLCompatEmpty();
+
+ // Request from an invalid draft change number
ECLCompatBadSeqnum();
- ECLCompatWriteReadAllOps(1);
- ECLCompatWriteReadAllOps(5);
+
+ // Write changes and read ECL from start
+ int ts = ECLCompatWriteReadAllOps(1);
+
+ // Write additional changes and read ECL from a provided draft change number
+ ts = ECLCompatWriteReadAllOps(5);
+
+ // Test request from a provided change number
ECLCompatReadFrom(6);
+
+ // Test request from a provided change number interval
ECLCompatReadFromTo(5,7);
+
+ // Test first and last draft changenumber
ECLCompatTestLimits(1,8);
+
+ // Test first and last draft changenumber, a dd a new change, do not
+ // search again the ECL, but search fro first and last
+ ECLCompatTestLimitsAndAdd(1,8, ts);
+
+ // Test DraftCNDb is purged when replication change log is purged
ECLCompatPurge();
+
+ // Test first and last are updated
ECLCompatTestLimits(0,0);
+
+ // Persistent search in changesOnly mode
ECLPsearch(true, true);replicationServer.clearDb();
+
+ // Persistent search in init + changes mode
ECLPsearch(false, true);
+
+ // Test Filter on replication csn
+ // TODO: test with optimization when code done.
ECLFilterOnReplicationCsn();replicationServer.clearDb();
- ECLSimulPsearches();replicationServer.clearDb();
+
+ // Test simultaneous persistent searches in draft compat mode.
+ ECLSimultaneousPsearches();replicationServer.clearDb();
}
@@ -592,6 +658,21 @@
assertTrue(entries != null);
assertTrue(entries.size()==0);
+ //
+ // Test lastExternalChangelogCookie attribute of the ECL
+ //
+ /* FIXME: uncomment when fix available
+ ExternalChangeLogSessionImpl session =
+ new ExternalChangeLogSessionImpl(replicationServer);
+ MultiDomainServerState expectedLastCookie =
+ new MultiDomainServerState("o=test:;");
+ MultiDomainServerState lastCookie = session.getLastCookie();
+ assertTrue(expectedLastCookie.equalsTo(lastCookie),
+ " ExpectedLastCookie=" + expectedLastCookie +
+ " lastCookie=" + lastCookie);
+ assertLastCookieEquals(tn, expectedLastCookie);
+ */
+
// Cleaning
if (domain2 != null)
MultimasterReplication.deleteDomain(baseDn2);
@@ -917,47 +998,8 @@
assertTrue(expectedLastCookie.equalsTo(lastCookie),
" ExpectedLastCookie=" + expectedLastCookie +
" lastCookie=" + lastCookie);
-
- //
- LinkedHashSet<String> lastcookieattribute = new LinkedHashSet<String>();
- lastcookieattribute.add("lastExternalChangelogCookie");
-
- searchOp = connection.processSearch(
- ByteString.valueOf(""),
- SearchScope.BASE_OBJECT,
- DereferencePolicy.NEVER_DEREF_ALIASES,
- 0, // Size limit
- 0, // Time limit
- false, // Types only
- LDAPFilter.decode("(objectclass=*)"),
- lastcookieattribute,
- NO_CONTROL,
- null);
-
- assertEquals(searchOp.getResultCode(), ResultCode.SUCCESS,
- searchOp.getErrorMessage().toString()
- + searchOp.getAdditionalLogMessage());
- cookie = "";
- entries = searchOp.getSearchEntries();
- if (entries != null)
- {
- for (SearchResultEntry resultEntry : entries)
- {
- debugInfo(tn, "Result entry=\n" + resultEntry.toLDIFString());
- ldifWriter.writeEntry(resultEntry);
- try
- {
- List<Attribute> l = resultEntry.getAttribute("lastexternalchangelogcookie");
- cookie = l.get(0).iterator().next().toString();
- }
- catch(NullPointerException e)
- {}
- }
- }
-
- assertTrue(expectedLastCookie.equalsTo(new MultiDomainServerState(cookie)),
- " Expected last cookie attribute value:" + expectedLastCookie +
- " Read from server: " + cookie + " are equal :");
+ assertLastCookieEquals(tn, expectedLastCookie);
+
s1test.stop();
s1test2.stop();
s2test.stop();
@@ -973,6 +1015,61 @@
debugInfo(tn, "Ending test successfully");
}
+ private void assertLastCookieEquals(String tn,
+ MultiDomainServerState expectedLastCookie)
+ {
+ String cookie = "";
+ LDIFWriter ldifWriter = getLDIFWriter();
+
+ //
+ LinkedHashSet<String> lastcookieattribute = new LinkedHashSet<String>();
+ lastcookieattribute.add("lastExternalChangelogCookie");
+
+ try
+ {
+ InternalSearchOperation searchOp =
+ connection.processSearch(
+ ByteString.valueOf(""),
+ SearchScope.BASE_OBJECT,
+ DereferencePolicy.NEVER_DEREF_ALIASES,
+ 0, // Size limit
+ 0, // Time limit
+ false, // Types only
+ LDAPFilter.decode("(objectclass=*)"),
+ lastcookieattribute,
+ NO_CONTROL,
+ null);
+
+ assertEquals(searchOp.getResultCode(), ResultCode.SUCCESS,
+ searchOp.getErrorMessage().toString()
+ + searchOp.getAdditionalLogMessage());
+ LinkedList<SearchResultEntry> entries = searchOp.getSearchEntries();
+ if (entries != null)
+ {
+ for (SearchResultEntry resultEntry : entries)
+ {
+ ldifWriter.writeEntry(resultEntry);
+ try
+ {
+ List<Attribute> l = resultEntry.getAttribute("lastexternalchangelogcookie");
+ cookie = l.get(0).iterator().next().toString();
+ }
+ catch(NullPointerException e)
+ {}
+ }
+
+ }
+ }
+ catch(Exception e)
+ {
+ fail("Ending test " + tn + " with exception:\n"
+ + stackTraceToSingleLineString(e));
+ }
+ assertTrue(expectedLastCookie.equalsTo(new MultiDomainServerState(cookie)),
+ " Expected last cookie attribute value:" + expectedLastCookie +
+ " Read from server: " + cookie + " are equal :");
+ }
+
// simple update to be received
private void ECLAllOps()
{
@@ -1516,9 +1613,9 @@
/**
* Test parallel simultaneous psearch with different filters.
*/
- private void ECLSimulPsearches()
+ private void ECLSimultaneousPsearches()
{
- String tn = "ECLSimulPsearches";
+ String tn = "ECLSimultaneousPsearches";
debugInfo(tn, "Starting test \n\n");
Socket s1, s2, s3 = null;
boolean compatMode = false;
@@ -2304,10 +2401,11 @@
}
}
- private void ECLCompatWriteReadAllOps(int firstDraftChangeNumber)
+ private int ECLCompatWriteReadAllOps(int firstDraftChangeNumber)
{
String tn = "ECLCompatWriteReadAllOps/" + String.valueOf(firstDraftChangeNumber);
debugInfo(tn, "Starting test\n\n");
+ int ts = 1;
try
{
@@ -2318,7 +2416,6 @@
DN.decode(TEST_ROOT_DN_STRING), (short) 1201,
100, replicationServerPort,
1000, true);
- int ts = 1;
String user1entryUUID = "11111111-1112-1113-1114-111111111115";
String baseUUID = "22222222-2222-2222-2222-222222222222";
@@ -2583,6 +2680,7 @@
+ stackTraceToSingleLineString(e));
}
debugInfo(tn, "Ending test with success");
+ return ts;
}
private void ECLCompatReadFrom(int firstDraftChangeNumber)
@@ -2987,4 +3085,134 @@
}
debugInfo(tn, "Ending test with success");
}
+
+ private void ECLCompatTestLimitsAndAdd(int expectedFirst, int expectedLast,
+ int ts)
+ {
+ String tn = "ECLCompatTestLimitsAndAdd";
+ debugInfo(tn, "Starting test\n\n");
+ try
+ {
+ ECLCompatTestLimits(expectedFirst, expectedLast);
+
+ // Creates broker on o=test
+ ReplicationBroker server01 = openReplicationSession(
+ DN.decode(TEST_ROOT_DN_STRING), (short) 1201,
+ 100, replicationServerPort,
+ 1000, true);
+
+ String user1entryUUID = "11111111-1112-1113-1114-111111111115";
+
+ // Publish DEL
+ ChangeNumber cn1 = new ChangeNumber(TimeThread.getTime(), ts++, (short)1201);
+ DeleteMsg delMsg =
+ new DeleteMsg("uid="+tn+"1," + TEST_ROOT_DN_STRING, cn1,
+ user1entryUUID);
+ server01.publish(delMsg);
+ debugInfo(tn, " publishes " + delMsg.getChangeNumber());
+
+ ECLCompatTestLimits(expectedFirst, expectedLast+1);
+
+ server01.stop();
+ }
+ catch(Exception e)
+ {
+ fail("Ending "+tn+" test with exception:\n"
+ + stackTraceToSingleLineString(e));
+ }
+ debugInfo(tn, "Ending test with success");
+ }
+
+ private void ECLGetEligibleCountTest()
+ {
+ String tn = "ECLGetEligibleCountTest";
+ debugInfo(tn, "Starting test\n\n");
+ String user1entryUUID = "11111111-1112-1113-1114-111111111115";
+ try
+ {
+ // The replication changelog is empty
+ ReplicationServerDomain rsdtest =
+ replicationServer.getReplicationServerDomain(TEST_ROOT_DN_STRING, false);
+ long count = rsdtest.getEligibleCount(
+ new ServerState(),
+ new ChangeNumber(TimeThread.getTime(), 1, (short)1201));
+ assertEquals(count, 0);
+
+ // Creates broker on o=test
+ ReplicationBroker server01 = openReplicationSession(
+ DN.decode(TEST_ROOT_DN_STRING), (short) 1201,
+ 100, replicationServerPort,
+ 1000, true);
+
+ // Publish 1 message
+ ChangeNumber cn1 = new ChangeNumber(TimeThread.getTime(), 1, (short)1201);
+ DeleteMsg delMsg =
+ new DeleteMsg("uid="+tn+"1," + TEST_ROOT_DN_STRING, cn1,
+ user1entryUUID);
+ server01.publish(delMsg);
+ debugInfo(tn, " publishes " + delMsg.getChangeNumber());
+ sleep(300);
+
+ count = rsdtest.getEligibleCount(
+ new ServerState(),
+ new ChangeNumber(TimeThread.getTime(), 1, (short)1201));
+ assertEquals(count, 1);
+
+ // Publish 1 message
+ ChangeNumber cn2 = new ChangeNumber(TimeThread.getTime(), 2, (short)1201);
+ delMsg =
+ new DeleteMsg("uid="+tn+"1," + TEST_ROOT_DN_STRING, cn2,
+ user1entryUUID);
+ server01.publish(delMsg);
+ debugInfo(tn, " publishes " + delMsg.getChangeNumber());
+ sleep(300);
+
+ count = rsdtest.getEligibleCount(
+ new ServerState(),
+ new ChangeNumber(TimeThread.getTime(), 1, (short)1201));
+ assertEquals(count, 2);
+
+ count = rsdtest.getEligibleCount(
+ new ServerState(), cn1);
+ assertEquals(count, 1);
+
+ ServerState ss = new ServerState();
+ ss.update(cn1);
+ count = rsdtest.getEligibleCount(ss, cn1);
+ assertEquals(count, 0);
+
+ count = rsdtest.getEligibleCount(ss, cn2);
+ assertEquals(count, 1);
+
+ ss.update(cn2);
+ count = rsdtest.getEligibleCount(ss,
+ new ChangeNumber(TimeThread.getTime(), 4, (short)1201));
+ assertEquals(count, 0);
+
+ // Publish 1 message
+ ChangeNumber cn3 = new ChangeNumber(TimeThread.getTime(), 3, (short)1201);
+ delMsg =
+ new DeleteMsg("uid="+tn+"1," + TEST_ROOT_DN_STRING, cn3,
+ user1entryUUID);
+ server01.publish(delMsg);
+ debugInfo(tn, " publishes " + delMsg.getChangeNumber());
+ sleep(300);
+
+ ss.update(cn2);
+ count = rsdtest.getEligibleCount(ss,
+ new ChangeNumber(TimeThread.getTime(), 4, (short)1201));
+ assertEquals(count, 1);
+
+
+ server01.stop();
+
+ }
+ catch(Exception e)
+ {
+ fail("Ending "+tn+" test with exception:\n"
+ + stackTraceToSingleLineString(e));
+ }
+ debugInfo(tn, "Ending test with success");
+ }
+
}
\ No newline at end of file
--
Gitblit v1.10.0