opendj-sdk/opends/src/server/org/opends/server/replication/common/ExternalChangeLogSession.java
@@ -50,10 +50,4 @@ public abstract void close() throws DirectoryException; /** * Returns the last (newest) cookie value. * @return the last cookie value. */ public abstract MultiDomainServerState getLastCookie(); } opendj-sdk/opends/src/server/org/opends/server/replication/common/LastCookieVirtualProvider.java
@@ -27,6 +27,7 @@ package org.opends.server.replication.common; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -39,7 +40,8 @@ import org.opends.server.config.ConfigException; import org.opends.server.core.DirectoryServer; import org.opends.server.core.SearchOperation; import org.opends.server.replication.server.ExternalChangeLogSessionImpl; import org.opends.server.replication.plugin.MultimasterReplication; import org.opends.server.replication.server.ReplicationServer; import org.opends.server.types.AttributeValue; import org.opends.server.types.AttributeValues; import org.opends.server.types.ByteString; @@ -48,6 +50,7 @@ import org.opends.server.types.InitializationException; import org.opends.server.types.ResultCode; import org.opends.server.types.VirtualAttributeRule; import org.opends.server.util.ServerConstants; import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement; /** @@ -133,15 +136,21 @@ DirectoryServer.getWorkflowElement("EXTERNAL CHANGE LOG"); if (eclwe!=null) { ExternalChangeLogSessionImpl eclsession = new ExternalChangeLogSessionImpl(eclwe.getReplicationServer()); // Set a list of excluded domains (also exclude 'cn=changelog' itself) ArrayList<String> excludedDomains = MultimasterReplication.getPrivateDomains(); if (!excludedDomains.contains( ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT)) excludedDomains.add(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT); String lastCookie = eclsession.getLastCookie().toString(); ReplicationServer rs = eclwe.getReplicationServer(); MultiDomainServerState lastCookie = rs.getLastECLCookie(excludedDomains); AttributeValue value = AttributeValues.create( ByteString.valueOf(lastCookie), ByteString.valueOf(lastCookie)); ByteString.valueOf(lastCookie.toString()), ByteString.valueOf(lastCookie.toString())); values=Collections.singleton(value); } return values; opendj-sdk/opends/src/server/org/opends/server/replication/server/ExternalChangeLogSessionImpl.java
@@ -26,14 +26,11 @@ */ package org.opends.server.replication.server; import java.util.Iterator; import org.opends.server.replication.common.ExternalChangeLogSession; import org.opends.server.replication.common.MultiDomainServerState; import org.opends.server.replication.protocol.ECLUpdateMsg; import org.opends.server.replication.protocol.StartECLSessionMsg; import org.opends.server.types.DirectoryException; import org.opends.server.util.ServerConstants; /** * This interface defines a session used to search the external changelog @@ -97,29 +94,4 @@ { handler.getDomain().stopServer(handler); } /** * Returns the last (newest) cookie value. * @return the last cookie value. */ public MultiDomainServerState getLastCookie() { MultiDomainServerState result = new MultiDomainServerState(); // Initialize start state for all running domains with empty state Iterator<ReplicationServerDomain> rsdk = rs.getDomainIterator(); if (rsdk != null) { while (rsdk.hasNext()) { // process a domain ReplicationServerDomain rsd = rsdk.next(); if (rsd.getBaseDn().compareToIgnoreCase( ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT)==0) continue; result.update(rsd.getBaseDn(), rsd.getEligibleState( rs.getEligibleCN())); } } return result; } } opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -1699,4 +1699,32 @@ return new int[]{firstDraftCN, lastDraftCN}; } /** * Returns the last (newest) cookie value. * @param excludedServiceIDs The list of serviceIDs excluded from ECL. * @return the last cookie value. */ public MultiDomainServerState getLastECLCookie( ArrayList<String> excludedServiceIDs) { MultiDomainServerState result = new MultiDomainServerState(); // Initialize start state for all running domains with empty state Iterator<ReplicationServerDomain> rsdk = this.getDomainIterator(); if (rsdk != null) { while (rsdk.hasNext()) { // process a domain ReplicationServerDomain rsd = rsdk.next(); if ((excludedServiceIDs!=null) && (excludedServiceIDs.contains(rsd.getBaseDn()))) continue; result.update(rsd.getBaseDn(), rsd.getEligibleState( getEligibleCN())); } } return result; } } opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java
@@ -578,6 +578,20 @@ debugInfo(tn, "Starting test"); try { ReplicationBroker server01 = openReplicationSession( DN.decode(TEST_ROOT_DN_STRING), (short) 1201, 100, replicationServerPort, 1000, true); // create and publish 1 change on each suffix long time = TimeThread.getTime(); int ts = 1; ChangeNumber cn1 = new ChangeNumber(time, ts++, (short)1201); DeleteMsg delMsg1 = new DeleteMsg("o=" + tn + "1," + TEST_ROOT_DN_STRING, cn1, "ECLBasicMsg1uid"); server01.publish(delMsg1); debugInfo(tn, "publishes:" + delMsg1); // Initialize a second test backend o=test2, in addtion to o=test // Configure replication on this backend // Add the root entry in the backend @@ -628,9 +642,10 @@ for (SearchResultEntry resultEntry : entries) { // Expect debugInfo(tn, "Entry returned=" + resultEntry.toLDIFString()); debugInfo(tn, "Entry returned when test2 is public =" + resultEntry.toLDIFString()); } assertEquals(entries.size(),1, "Entries number returned by search"); assertEquals(entries.size(),2, "Entries number returned by search"); // // Set the backend private and do again a search on ECL that should @@ -651,27 +666,26 @@ controls, null); // Expect success but no entry returned // Expect success and only entry from o=test returned assertEquals(searchOp.getResultCode(), ResultCode.SUCCESS, searchOp.getErrorMessage().toString() + searchOp.getAdditionalLogMessage()); entries = searchOp.getSearchEntries(); assertTrue(entries != null); assertTrue(entries.size()==0); assertTrue(entries.size()==1); if (entries != null) for (SearchResultEntry resultEntry : entries) { // Expect debugInfo(tn, "Entry returned when test2 is private =" + resultEntry.toLDIFString()); } // // Test lastExternalChangelogCookie attribute of the ECL // /* FIXME: uncomment when fix available ExternalChangeLogSessionImpl session = new ExternalChangeLogSessionImpl(replicationServer); // (does only refer to non private backend) MultiDomainServerState expectedLastCookie = new MultiDomainServerState("o=test:;"); MultiDomainServerState lastCookie = session.getLastCookie(); assertTrue(expectedLastCookie.equalsTo(lastCookie), " ExpectedLastCookie=" + expectedLastCookie + " lastCookie=" + lastCookie); new MultiDomainServerState("o=test:"+cn1+";"); assertLastCookieEquals(tn, expectedLastCookie); */ // Cleaning if (domain2 != null) @@ -679,6 +693,8 @@ if (replicationPlugin != null) DirectoryServer.deregisterSynchronizationProvider(replicationPlugin); removeTestBackend2(backend2); server01.stop(); } catch(Exception e) { @@ -990,14 +1006,8 @@ // // Test lastExternalChangelogCookie attribute of the ECL // ExternalChangeLogSessionImpl session = new ExternalChangeLogSessionImpl(replicationServer); MultiDomainServerState expectedLastCookie = new MultiDomainServerState("o=test:"+cn5+" "+cn9+";o=test2:"+cn3+" "+cn8+";"); MultiDomainServerState lastCookie = session.getLastCookie(); assertTrue(expectedLastCookie.equalsTo(lastCookie), " ExpectedLastCookie=" + expectedLastCookie + " lastCookie=" + lastCookie); assertLastCookieEquals(tn, expectedLastCookie); s1test.stop(); @@ -1745,21 +1755,21 @@ s1 = new Socket("127.0.0.1", TestCaseUtils.getServerLdapPort()); org.opends.server.tools.LDAPReader r1 = new org.opends.server.tools.LDAPReader(s1); LDAPWriter w1 = new LDAPWriter(s1); s1.setSoTimeout(5000); s1.setSoTimeout(15000); bindAsManager(w1, r1); // Connects and bind s2 = new Socket("127.0.0.1", TestCaseUtils.getServerLdapPort()); org.opends.server.tools.LDAPReader r2 = new org.opends.server.tools.LDAPReader(s2); LDAPWriter w2 = new LDAPWriter(s2); s2.setSoTimeout(5000); s2.setSoTimeout(15000); bindAsManager(w2, r2); // Connects and bind s3 = new Socket("127.0.0.1", TestCaseUtils.getServerLdapPort()); org.opends.server.tools.LDAPReader r3 = new org.opends.server.tools.LDAPReader(s3); LDAPWriter w3 = new LDAPWriter(s3); s3.setSoTimeout(5000); s3.setSoTimeout(15000); bindAsManager(w3, r3); // Since we are going to be watching the post-response count, we need to