opends/src/server/org/opends/server/replication/common/FirstChangeNumberVirtualAttributeProvider.java
@@ -25,8 +25,8 @@ * Copyright 2009 Sun Microsystems, Inc. */ package org.opends.server.replication.common; import static org.opends.server.loggers.debug.DebugLogger.getTracer; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -39,7 +39,8 @@ import org.opends.server.config.ConfigException; import org.opends.server.core.DirectoryServer; import org.opends.server.core.SearchOperation; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.replication.plugin.MultimasterReplication; import org.opends.server.replication.server.ReplicationServer; import org.opends.server.types.AttributeValue; import org.opends.server.types.AttributeValues; import org.opends.server.types.ByteString; @@ -48,6 +49,7 @@ import org.opends.server.types.InitializationException; import org.opends.server.types.ResultCode; import org.opends.server.types.VirtualAttributeRule; import org.opends.server.util.ServerConstants; import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement; @@ -63,7 +65,6 @@ extends VirtualAttributeProvider<UserDefinedVirtualAttributeCfg> implements ConfigurationChangeListener<UserDefinedVirtualAttributeCfg> { private static final DebugTracer TRACER = getTracer(); // The current configuration for this virtual attribute provider. private UserDefinedVirtualAttributeCfg currentConfig; @@ -137,8 +138,19 @@ DirectoryServer.getWorkflowElement("EXTERNAL CHANGE LOG"); if (eclwe!=null) { first = String.valueOf( eclwe.getReplicationServer().getFirstDraftChangeNumber()); // Set a list of excluded domains (also exclude 'cn=changelog' itself) ArrayList<String> excludedDomains = MultimasterReplication.getPrivateDomains(); if (!excludedDomains.contains( ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT)) excludedDomains.add(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT); ReplicationServer rs = eclwe.getReplicationServer(); int[] limits = rs.getECLDraftCNLimits( rs.getEligibleCN(), excludedDomains); first = String.valueOf(limits[0]); } } catch(Exception e) opends/src/server/org/opends/server/replication/common/LastChangeNumberVirtualAttributeProvider.java
@@ -27,6 +27,7 @@ package org.opends.server.replication.common; import static org.opends.server.loggers.debug.DebugLogger.getTracer; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -40,6 +41,8 @@ import org.opends.server.core.DirectoryServer; import org.opends.server.core.SearchOperation; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.replication.plugin.MultimasterReplication; import org.opends.server.replication.server.ReplicationServer; import org.opends.server.types.AttributeValue; import org.opends.server.types.AttributeValues; import org.opends.server.types.ByteString; @@ -48,6 +51,7 @@ import org.opends.server.types.InitializationException; import org.opends.server.types.ResultCode; import org.opends.server.types.VirtualAttributeRule; import org.opends.server.util.ServerConstants; import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement; @@ -137,8 +141,18 @@ DirectoryServer.getWorkflowElement("EXTERNAL CHANGE LOG"); if (eclwe!=null) { last = String.valueOf( eclwe.getReplicationServer().getLastDraftChangeNumber()); // Set a list of excluded domains (also exclude 'cn=changelog' itself) ArrayList<String> excludedDomains = MultimasterReplication.getPrivateDomains(); if (!excludedDomains.contains( ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT)) excludedDomains.add(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT); ReplicationServer rs = eclwe.getReplicationServer(); int[] limits = rs.getECLDraftCNLimits( rs.getEligibleCN(), excludedDomains); last = String.valueOf(limits[1]); } } catch(Exception e) opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java
@@ -315,28 +315,28 @@ if (protocolVersion == ProtocolVersion.REPLICATION_PROTOCOL_V1) { return "ModifyDNMsg content: " + "\nprotocolVersion: " + protocolVersion + "\ndn: " + dn + "\nchangeNumber: " + changeNumber + "\nuniqueId: " + uniqueId + "\nassuredFlag: " + assuredFlag + "\nnewRDN: " + newRDN + "\nnewSuperior: " + newSuperior + "\ndeleteOldRdn: " + deleteOldRdn; " protocolVersion: " + protocolVersion + " dn: " + dn + " changeNumber: " + changeNumber + " uniqueId: " + uniqueId + " assuredFlag: " + assuredFlag + " newRDN: " + newRDN + " newSuperior: " + newSuperior + " deleteOldRdn: " + deleteOldRdn; } if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V2) { return "ModifyDNMsg content: " + "\nprotocolVersion: " + protocolVersion + "\ndn: " + dn + "\nchangeNumber: " + changeNumber + "\nuniqueId: " + uniqueId + "\nnewRDN: " + newRDN + "\nnewSuperior: " + newSuperior + "\ndeleteOldRdn: " + deleteOldRdn + "\nassuredFlag: " + assuredFlag + "\nassuredMode: " + assuredMode + "\nsafeDataLevel: " + safeDataLevel; " protocolVersion: " + protocolVersion + " dn: " + dn + " changeNumber: " + changeNumber + " uniqueId: " + uniqueId + " newRDN: " + newRDN + " newSuperior: " + newSuperior + " deleteOldRdn: " + deleteOldRdn + " assuredFlag: " + assuredFlag + " assuredMode: " + assuredMode + " safeDataLevel: " + safeDataLevel; } return "!!! Unknown version: " + protocolVersion + "!!!"; } opends/src/server/org/opends/server/replication/protocol/ModifyMsg.java
@@ -197,22 +197,22 @@ if (protocolVersion == ProtocolVersion.REPLICATION_PROTOCOL_V1) { return "ModifyMsg content: " + "\nprotocolVersion: " + protocolVersion + "\ndn: " + dn + "\nchangeNumber: " + changeNumber + "\nuniqueId: " + uniqueId + "\nassuredFlag: " + assuredFlag; " protocolVersion: " + protocolVersion + " dn: " + dn + " changeNumber: " + changeNumber + " uniqueId: " + uniqueId + " assuredFlag: " + assuredFlag; } if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V2) { return "ModifyMsg content: " + "\nprotocolVersion: " + protocolVersion + "\ndn: " + dn + "\nchangeNumber: " + changeNumber + "\nuniqueId: " + uniqueId + "\nassuredFlag: " + assuredFlag + "\nassuredMode: " + assuredMode + "\nsafeDataLevel: " + safeDataLevel; " protocolVersion: " + protocolVersion + " dn: " + dn + " changeNumber: " + changeNumber + " uniqueId: " + uniqueId + " assuredFlag: " + assuredFlag + " assuredMode: " + assuredMode + " safeDataLevel: " + safeDataLevel; } return "!!! Unknown version: " + protocolVersion + "!!!"; } opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java
@@ -43,7 +43,7 @@ import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.replication.common.ChangeNumber; import org.opends.server.replication.common.ServerState; import org.opends.server.replication.server.DraftCNDB.*; import org.opends.server.replication.server.DraftCNDB.DraftCNDBCursor; import org.opends.server.types.Attribute; import org.opends.server.types.Attributes; import org.opends.server.types.InitializationException; @@ -81,7 +81,6 @@ private boolean shutdown = false; private boolean trimDone = false; private DirectoryThread thread = null; private final Object flushLock = new Object(); private ReplicationServer replicationServer; // The maximum number of retries in case of DatabaseDeadlock Exception. @@ -142,7 +141,8 @@ db.addEntry(key, value, serviceID, cn); if (debugEnabled()) TRACER.debugInfo("In DraftCNDbhandler.add, added: " TRACER.debugInfo( "In DraftCNDbhandler.add, added: " + " key=" + key + " value=" + value + " serviceID=" + serviceID opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -105,6 +105,7 @@ * Specifies the excluded DNs (like cn=admin, ...). */ public ArrayList<String> excludedServiceIDs = new ArrayList<String>(); //HashSet<String> excludedServiceIDs = new HashSet<String>(); /** * Eligible changeNumber - only changes older or equal to eligibleCN @@ -565,7 +566,8 @@ // Get the draftLimits (from the eligibleCN got at the beginning of // the operation. int[] limits = getECLDraftCNLimits(eligibleCN); int[] limits = replicationServer.getECLDraftCNLimits( eligibleCN, excludedServiceIDs); if (startDraftCN<=limits[1]) { @@ -630,7 +632,7 @@ continue; // skip the excluded domains if (isServiceIDExcluded(rsd.getBaseDn())) if (excludedServiceIDs.contains(rsd.getBaseDn())) continue; // Creates the new domain context @@ -829,11 +831,6 @@ throws DirectoryException { // //this.following = false; // FIXME:ECL makes no sense for ECLServerHandler ? //this.lateQueue.clear(); // FIXME:ECL makes no sense for ECLServerHandler ? //this.setConsumerActive(true); this.operationId = startECLSessionMsg.getOperationId(); this.setName(this.getClass().getCanonicalName()+ " " + operationId); @@ -1434,126 +1431,4 @@ eligibleCN = replicationServer.getEligibleCN(); } /* * Get first and last DraftCN * @param crossDomainEligibleCN * @return */ private int[] getECLDraftCNLimits(ChangeNumber crossDomainEligibleCN) throws DirectoryException { /* The content of the DraftCNdb depends on the SEARCH operations done before * requesting the DraftCN. If no operations, DraftCNdb is empty. * The limits we want to get are the "potential" limits if a request was * done, the DraftCNdb is probably not complete to do that. * * The first DraftCN is : * - the first record from the DraftCNdb * - if none because DraftCNdb empty, * then * if no change in replchangelog then return 0 * else return 1 (DraftCN that WILL be returned to next search) * * The last DraftCN is : * - initialized with the last record from the DraftCNdb (0 if none) * and consider the genState associated * - to the last DraftCN, we add the count of updates in the replchangelog * FROM that genState TO the crossDomainEligibleCN * (this diff is done domain by domain) */ int firstDraftCN; int lastDraftCN; boolean DraftCNdbIsEmpty; DraftCNDbHandler draftCNDbH = replicationServer.getDraftCNDbHandler(); ReplicationServer rs = replicationServerDomain.getReplicationServer(); // Get the first DraftCN from the DraftCNdb firstDraftCN = draftCNDbH.getFirstKey(); HashMap<String,ServerState> domainsServerStateForLastSeqnum = null; if (firstDraftCN < 1) { DraftCNdbIsEmpty=true; firstDraftCN = 0; lastDraftCN = 0; } else { DraftCNdbIsEmpty=false; // Get the last DraftCN from the DraftCNdb lastDraftCN = draftCNDbH.getLastKey(); // Get the generalized state associated with the current last DraftCN // and initializes from it the startStates table String lastSeqnumGenState = draftCNDbH.getValue(lastDraftCN); if ((lastSeqnumGenState != null) && (lastSeqnumGenState.length()>0)) { domainsServerStateForLastSeqnum = MultiDomainServerState. splitGenStateToServerStates(lastSeqnumGenState); } } // Domain by domain Iterator<ReplicationServerDomain> rsdi = rs.getDomainIterator(); if (rsdi != null) { while (rsdi.hasNext()) { // process a domain ReplicationServerDomain rsd = rsdi.next(); if (isServiceIDExcluded(rsd.getBaseDn())) continue; // for this domain, have the state in the replchangelog // where the last DraftCN update is ServerState domainServerStateForLastSeqnum; if ((domainsServerStateForLastSeqnum == null) || (domainsServerStateForLastSeqnum.get(rsd.getBaseDn())==null)) { domainServerStateForLastSeqnum = new ServerState(); } else { domainServerStateForLastSeqnum = domainsServerStateForLastSeqnum.get(rsd.getBaseDn()); } // Count the number of (eligible) changes from this place // to the eligible CN (cross server) long ec = rsd.getEligibleCount( domainServerStateForLastSeqnum, crossDomainEligibleCN); // ... hum ... if ((ec>0) && (DraftCNdbIsEmpty==false)) ec--; // cumulates on domains lastDraftCN += ec; // DraftCN is empty and there are eligible updates in the repl changelog // then init first DraftCN if ((ec>0) && (firstDraftCN==0)) firstDraftCN = 1; } } return new int[]{firstDraftCN, lastDraftCN}; } private boolean isServiceIDExcluded(String serviceID) { // skip the excluded domains boolean excluded = false; for(String excludedServiceID : this.excludedServiceIDs) { if (excludedServiceID.equalsIgnoreCase(serviceID)) { excluded=true; break; } } return excluded; } } opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -43,6 +43,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -70,6 +71,8 @@ import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.replication.common.ChangeNumber; import org.opends.server.replication.common.ExternalChangeLogSession; import org.opends.server.replication.common.MultiDomainServerState; import org.opends.server.replication.common.ServerState; import org.opends.server.replication.protocol.ProtocolSession; import org.opends.server.replication.protocol.ReplServerStartMsg; import org.opends.server.replication.protocol.ReplSessionSecurity; @@ -1584,4 +1587,116 @@ return ++lastGeneratedDraftCN; } /** * Get first and last DraftCN. * @param crossDomainEligibleCN The provided crossDomainEligibleCN used as * the upper limit for the lastDraftCN * @param excludedServiceIDs The serviceIDs that are excluded from the ECL. * @return The first and last draftCN. * @throws DirectoryException a. */ public int[] getECLDraftCNLimits( ChangeNumber crossDomainEligibleCN, ArrayList<String> excludedServiceIDs) throws DirectoryException { /* The content of the DraftCNdb depends on the SEARCH operations done before * requesting the DraftCN. If no operations, DraftCNdb is empty. * The limits we want to get are the "potential" limits if a request was * done, the DraftCNdb is probably not complete to do that. * * The first DraftCN is : * - the first record from the DraftCNdb * - if none because DraftCNdb empty, * then * if no change in replchangelog then return 0 * else return 1 (DraftCN that WILL be returned to next search) * * The last DraftCN is : * - initialized with the last record from the DraftCNdb (0 if none) * and consider the genState associated * - to the last DraftCN, we add the count of updates in the replchangelog * FROM that genState TO the crossDomainEligibleCN * (this diff is done domain by domain) */ int firstDraftCN; int lastDraftCN; boolean DraftCNdbIsEmpty; DraftCNDbHandler draftCNDbH = this.getDraftCNDbHandler(); // Get the first DraftCN from the DraftCNdb firstDraftCN = draftCNDbH.getFirstKey(); HashMap<String,ServerState> domainsServerStateForLastSeqnum = null; if (firstDraftCN < 1) { DraftCNdbIsEmpty=true; firstDraftCN = 0; lastDraftCN = 0; } else { DraftCNdbIsEmpty=false; // Get the last DraftCN from the DraftCNdb lastDraftCN = draftCNDbH.getLastKey(); // Get the generalized state associated with the current last DraftCN // and initializes from it the startStates table String lastSeqnumGenState = draftCNDbH.getValue(lastDraftCN); if ((lastSeqnumGenState != null) && (lastSeqnumGenState.length()>0)) { domainsServerStateForLastSeqnum = MultiDomainServerState. splitGenStateToServerStates(lastSeqnumGenState); } } // Domain by domain Iterator<ReplicationServerDomain> rsdi = this.getDomainIterator(); if (rsdi != null) { while (rsdi.hasNext()) { // process a domain ReplicationServerDomain rsd = rsdi.next(); if (excludedServiceIDs.contains(rsd.getBaseDn())) continue; // for this domain, have the state in the replchangelog // where the last DraftCN update is ServerState domainServerStateForLastSeqnum; if ((domainsServerStateForLastSeqnum == null) || (domainsServerStateForLastSeqnum.get(rsd.getBaseDn())==null)) { domainServerStateForLastSeqnum = new ServerState(); } else { domainServerStateForLastSeqnum = domainsServerStateForLastSeqnum.get(rsd.getBaseDn()); } // Count the number of (eligible) changes from this place // to the eligible CN (cross server) long ec = rsd.getEligibleCount( domainServerStateForLastSeqnum, crossDomainEligibleCN); // the state from which we started is the one BEFORE the lastdraftCN // so we must decrement 1 to the EligibleCount if ((ec>0) && (DraftCNdbIsEmpty==false)) ec--; // cumulates on domains lastDraftCN += ec; // DraftCN Db is empty and there are eligible updates in the replication // changelog then init first DraftCN if ((ec>0) && (firstDraftCN==0)) firstDraftCN = 1; } } return new int[]{firstDraftCN, lastDraftCN}; } } opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -212,6 +212,7 @@ public void put(UpdateMsg update, ServerHandler sourceHandler) throws IOException { ChangeNumber cn = update.getChangeNumber(); short id = cn.getServerId(); sourceHandler.updateServerState(update); @@ -3089,8 +3090,8 @@ /** * This methods count the changes, server by server : * - from a start point (cn taken from the provided startState) * - to an end point (the provided endCN). * - from a serverState start point * - to (inclusive) an end point (the provided endCN). * @param startState The provided start server state. * @param endCN The provided end change number. * @return The number of changes between startState and endCN. @@ -3116,12 +3117,14 @@ try { ri = h.generateIterator(startState.getMaxChangeNumber(sid)); startCN = ri.getChange().getChangeNumber(); if (ri.next()==true) { startCN = ri.getChange().getChangeNumber(); } } catch(Exception e) { TRACER.debugCaught(DebugLogLevel.ERROR, e); // no change found (purge from CL) startCN = null; } finally @@ -3136,19 +3139,20 @@ if (startCN != null) { // Set on the change related to the endCN ChangeNumber upperCN; ChangeNumber upperCN = null; try { // Build a changenumber for this very server, with the timestamp // of the endCN ChangeNumber f = new ChangeNumber(endCN.getTime(), 0, sid); ri = h.generateIterator(f); upperCN = ri.getChange().getChangeNumber(); if (ri.next()==true) { upperCN = ri.getChange().getChangeNumber(); } } catch(Exception e) { TRACER.debugCaught(DebugLogLevel.ERROR, e); // no new change upperCN = h.getLastChange(); } finally opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java
@@ -200,38 +200,104 @@ @Test(enabled=true) public void ECLReplicationServerTest() { // -- // First set of test are in the cookie mode // Test that private backend is excluded from ECL ECLOnPrivateBackend();replicationServer.clearDb(); // Test remote API (ECL through replication protocol) with empty ECL ECLRemoteEmpty();replicationServer.clearDb(); // Test with empty changelog ECLEmpty();replicationServer.clearDb(); ECLAllOps();replicationServer.clearDb(); ECLRemoteNonEmpty();replicationServer.clearDb(); ECLTwoDomains();replicationServer.clearDb(); ECLPsearch(true, false);replicationServer.clearDb(); ECLPsearch(false, false);replicationServer.clearDb(); ECLSimulPsearches();replicationServer.clearDb(); // Test all types of ops. ECLAllOps(); // Do not clean the db for the next test // First and last should be ok whenever a request has been done or not // in compat mode. ECLCompatTestLimits(1,4);replicationServer.clearDb(); // Test remote API (ECL through replication protocol) with NON empty ECL ECLRemoteNonEmpty();replicationServer.clearDb(); // Test with a mix of domains, a mix of DSes ECLTwoDomains();replicationServer.clearDb(); // Persistent search with changesOnly request ECLPsearch(true, false);replicationServer.clearDb(); // Persistent search with init values request ECLPsearch(false, false);replicationServer.clearDb(); // Simultaneous psearches ECLSimultaneousPsearches();replicationServer.clearDb(); // Test eligible count method. ECLGetEligibleCountTest();replicationServer.clearDb(); // TODO:ECL Test SEARCH abandon and check everything shutdown and cleaned // TODO:ECL Test PSEARCH abandon and check everything shutdown and cleaned // TODO:ECL Test invalid DN in cookie returns UNWILLING + message // TODO:ECL Test notif control returned contains the cookie // TODO:ECL Test the attributes list and values returned in ECL entries // TODO:ECL Test search -s base, -s one // Test directly from the java obect that the changeTimeHeartbeatState // stored are ok. ChangeTimeHeartbeatTest();replicationServer.clearDb(); // Test the different forms of filter that are parsed in order to // optimize the request. ECLFilterTest(); // -- // Second set of test are in the draft compat mode // Empty replication changelog ECLCompatEmpty(); // Request from an invalid draft change number ECLCompatBadSeqnum(); ECLCompatWriteReadAllOps(1); ECLCompatWriteReadAllOps(5); // Write changes and read ECL from start int ts = ECLCompatWriteReadAllOps(1); // Write additional changes and read ECL from a provided draft change number ts = ECLCompatWriteReadAllOps(5); // Test request from a provided change number ECLCompatReadFrom(6); // Test request from a provided change number interval ECLCompatReadFromTo(5,7); // Test first and last draft changenumber ECLCompatTestLimits(1,8); // Test first and last draft changenumber, a dd a new change, do not // search again the ECL, but search fro first and last ECLCompatTestLimitsAndAdd(1,8, ts); // Test DraftCNDb is purged when replication change log is purged ECLCompatPurge(); // Test first and last are updated ECLCompatTestLimits(0,0); // Persistent search in changesOnly mode ECLPsearch(true, true);replicationServer.clearDb(); // Persistent search in init + changes mode ECLPsearch(false, true); // Test Filter on replication csn // TODO: test with optimization when code done. ECLFilterOnReplicationCsn();replicationServer.clearDb(); ECLSimulPsearches();replicationServer.clearDb(); // Test simultaneous persistent searches in draft compat mode. ECLSimultaneousPsearches();replicationServer.clearDb(); } @@ -592,6 +658,21 @@ assertTrue(entries != null); assertTrue(entries.size()==0); // // Test lastExternalChangelogCookie attribute of the ECL // /* FIXME: uncomment when fix available ExternalChangeLogSessionImpl session = new ExternalChangeLogSessionImpl(replicationServer); MultiDomainServerState expectedLastCookie = new MultiDomainServerState("o=test:;"); MultiDomainServerState lastCookie = session.getLastCookie(); assertTrue(expectedLastCookie.equalsTo(lastCookie), " ExpectedLastCookie=" + expectedLastCookie + " lastCookie=" + lastCookie); assertLastCookieEquals(tn, expectedLastCookie); */ // Cleaning if (domain2 != null) MultimasterReplication.deleteDomain(baseDn2); @@ -917,47 +998,8 @@ assertTrue(expectedLastCookie.equalsTo(lastCookie), " ExpectedLastCookie=" + expectedLastCookie + " lastCookie=" + lastCookie); // LinkedHashSet<String> lastcookieattribute = new LinkedHashSet<String>(); lastcookieattribute.add("lastExternalChangelogCookie"); searchOp = connection.processSearch( ByteString.valueOf(""), SearchScope.BASE_OBJECT, DereferencePolicy.NEVER_DEREF_ALIASES, 0, // Size limit 0, // Time limit false, // Types only LDAPFilter.decode("(objectclass=*)"), lastcookieattribute, NO_CONTROL, null); assertEquals(searchOp.getResultCode(), ResultCode.SUCCESS, searchOp.getErrorMessage().toString() + searchOp.getAdditionalLogMessage()); cookie = ""; entries = searchOp.getSearchEntries(); if (entries != null) { for (SearchResultEntry resultEntry : entries) { debugInfo(tn, "Result entry=\n" + resultEntry.toLDIFString()); ldifWriter.writeEntry(resultEntry); try { List<Attribute> l = resultEntry.getAttribute("lastexternalchangelogcookie"); cookie = l.get(0).iterator().next().toString(); } catch(NullPointerException e) {} } } assertTrue(expectedLastCookie.equalsTo(new MultiDomainServerState(cookie)), " Expected last cookie attribute value:" + expectedLastCookie + " Read from server: " + cookie + " are equal :"); assertLastCookieEquals(tn, expectedLastCookie); s1test.stop(); s1test2.stop(); s2test.stop(); @@ -973,6 +1015,61 @@ debugInfo(tn, "Ending test successfully"); } private void assertLastCookieEquals(String tn, MultiDomainServerState expectedLastCookie) { String cookie = ""; LDIFWriter ldifWriter = getLDIFWriter(); // LinkedHashSet<String> lastcookieattribute = new LinkedHashSet<String>(); lastcookieattribute.add("lastExternalChangelogCookie"); try { InternalSearchOperation searchOp = connection.processSearch( ByteString.valueOf(""), SearchScope.BASE_OBJECT, DereferencePolicy.NEVER_DEREF_ALIASES, 0, // Size limit 0, // Time limit false, // Types only LDAPFilter.decode("(objectclass=*)"), lastcookieattribute, NO_CONTROL, null); assertEquals(searchOp.getResultCode(), ResultCode.SUCCESS, searchOp.getErrorMessage().toString() + searchOp.getAdditionalLogMessage()); LinkedList<SearchResultEntry> entries = searchOp.getSearchEntries(); if (entries != null) { for (SearchResultEntry resultEntry : entries) { ldifWriter.writeEntry(resultEntry); try { List<Attribute> l = resultEntry.getAttribute("lastexternalchangelogcookie"); cookie = l.get(0).iterator().next().toString(); } catch(NullPointerException e) {} } } } catch(Exception e) { fail("Ending test " + tn + " with exception:\n" + stackTraceToSingleLineString(e)); } assertTrue(expectedLastCookie.equalsTo(new MultiDomainServerState(cookie)), " Expected last cookie attribute value:" + expectedLastCookie + " Read from server: " + cookie + " are equal :"); } // simple update to be received private void ECLAllOps() { @@ -1516,9 +1613,9 @@ /** * Test parallel simultaneous psearch with different filters. */ private void ECLSimulPsearches() private void ECLSimultaneousPsearches() { String tn = "ECLSimulPsearches"; String tn = "ECLSimultaneousPsearches"; debugInfo(tn, "Starting test \n\n"); Socket s1, s2, s3 = null; boolean compatMode = false; @@ -2304,10 +2401,11 @@ } } private void ECLCompatWriteReadAllOps(int firstDraftChangeNumber) private int ECLCompatWriteReadAllOps(int firstDraftChangeNumber) { String tn = "ECLCompatWriteReadAllOps/" + String.valueOf(firstDraftChangeNumber); debugInfo(tn, "Starting test\n\n"); int ts = 1; try { @@ -2318,7 +2416,6 @@ DN.decode(TEST_ROOT_DN_STRING), (short) 1201, 100, replicationServerPort, 1000, true); int ts = 1; String user1entryUUID = "11111111-1112-1113-1114-111111111115"; String baseUUID = "22222222-2222-2222-2222-222222222222"; @@ -2583,6 +2680,7 @@ + stackTraceToSingleLineString(e)); } debugInfo(tn, "Ending test with success"); return ts; } private void ECLCompatReadFrom(int firstDraftChangeNumber) @@ -2987,4 +3085,134 @@ } debugInfo(tn, "Ending test with success"); } private void ECLCompatTestLimitsAndAdd(int expectedFirst, int expectedLast, int ts) { String tn = "ECLCompatTestLimitsAndAdd"; debugInfo(tn, "Starting test\n\n"); try { ECLCompatTestLimits(expectedFirst, expectedLast); // Creates broker on o=test ReplicationBroker server01 = openReplicationSession( DN.decode(TEST_ROOT_DN_STRING), (short) 1201, 100, replicationServerPort, 1000, true); String user1entryUUID = "11111111-1112-1113-1114-111111111115"; // Publish DEL ChangeNumber cn1 = new ChangeNumber(TimeThread.getTime(), ts++, (short)1201); DeleteMsg delMsg = new DeleteMsg("uid="+tn+"1," + TEST_ROOT_DN_STRING, cn1, user1entryUUID); server01.publish(delMsg); debugInfo(tn, " publishes " + delMsg.getChangeNumber()); ECLCompatTestLimits(expectedFirst, expectedLast+1); server01.stop(); } catch(Exception e) { fail("Ending "+tn+" test with exception:\n" + stackTraceToSingleLineString(e)); } debugInfo(tn, "Ending test with success"); } private void ECLGetEligibleCountTest() { String tn = "ECLGetEligibleCountTest"; debugInfo(tn, "Starting test\n\n"); String user1entryUUID = "11111111-1112-1113-1114-111111111115"; try { // The replication changelog is empty ReplicationServerDomain rsdtest = replicationServer.getReplicationServerDomain(TEST_ROOT_DN_STRING, false); long count = rsdtest.getEligibleCount( new ServerState(), new ChangeNumber(TimeThread.getTime(), 1, (short)1201)); assertEquals(count, 0); // Creates broker on o=test ReplicationBroker server01 = openReplicationSession( DN.decode(TEST_ROOT_DN_STRING), (short) 1201, 100, replicationServerPort, 1000, true); // Publish 1 message ChangeNumber cn1 = new ChangeNumber(TimeThread.getTime(), 1, (short)1201); DeleteMsg delMsg = new DeleteMsg("uid="+tn+"1," + TEST_ROOT_DN_STRING, cn1, user1entryUUID); server01.publish(delMsg); debugInfo(tn, " publishes " + delMsg.getChangeNumber()); sleep(300); count = rsdtest.getEligibleCount( new ServerState(), new ChangeNumber(TimeThread.getTime(), 1, (short)1201)); assertEquals(count, 1); // Publish 1 message ChangeNumber cn2 = new ChangeNumber(TimeThread.getTime(), 2, (short)1201); delMsg = new DeleteMsg("uid="+tn+"1," + TEST_ROOT_DN_STRING, cn2, user1entryUUID); server01.publish(delMsg); debugInfo(tn, " publishes " + delMsg.getChangeNumber()); sleep(300); count = rsdtest.getEligibleCount( new ServerState(), new ChangeNumber(TimeThread.getTime(), 1, (short)1201)); assertEquals(count, 2); count = rsdtest.getEligibleCount( new ServerState(), cn1); assertEquals(count, 1); ServerState ss = new ServerState(); ss.update(cn1); count = rsdtest.getEligibleCount(ss, cn1); assertEquals(count, 0); count = rsdtest.getEligibleCount(ss, cn2); assertEquals(count, 1); ss.update(cn2); count = rsdtest.getEligibleCount(ss, new ChangeNumber(TimeThread.getTime(), 4, (short)1201)); assertEquals(count, 0); // Publish 1 message ChangeNumber cn3 = new ChangeNumber(TimeThread.getTime(), 3, (short)1201); delMsg = new DeleteMsg("uid="+tn+"1," + TEST_ROOT_DN_STRING, cn3, user1entryUUID); server01.publish(delMsg); debugInfo(tn, " publishes " + delMsg.getChangeNumber()); sleep(300); ss.update(cn2); count = rsdtest.getEligibleCount(ss, new ChangeNumber(TimeThread.getTime(), 4, (short)1201)); assertEquals(count, 1); server01.stop(); } catch(Exception e) { fail("Ending "+tn+" test with exception:\n" + stackTraceToSingleLineString(e)); } debugInfo(tn, "Ending test with success"); } }