opends/src/server/org/opends/server/replication/protocol/StartECLSessionMsg.java
@@ -44,10 +44,16 @@ { /** * Type of request made to the External Changelog. */ public enum ECLRequestType { /** * This specifies that the ECL is requested from a provided cookie value * defined as a MultiDomainServerState. */ public final static short REQUEST_TYPE_FROM_COOKIE = 0; REQUEST_TYPE_FROM_COOKIE, /** * This specifies that the ECL is requested from a provided interval @@ -55,32 +61,39 @@ * and NOT replication CSNs). * TODO: not yet implemented */ public final static short REQUEST_TYPE_FROM_CHANGE_NUMBER = 1; REQUEST_TYPE_FROM_CHANGE_NUMBER, /** * This specifies that the ECL is requested only for the entry that have * a CSN matching the provided one. * This specifies that the ECL is requested only for the entry that have a * CSN matching the provided one. * TODO: not yet implemented */ public final static short REQUEST_TYPE_EQUALS_REPL_CHANGE_NUMBER = 2; REQUEST_TYPE_EQUALS_REPL_CHANGE_NUMBER } /** * Whether the current External Changelog search is persistent and requires to * receive only new changes or already existing changes as well. */ public enum Persistent { /** * This specifies that the request on the ECL is a PERSISTENT search with * changesOnly = false. * <p> * It will return the content of the changelog DB as it is now, plus any * subsequent changes. */ public final static short PERSISTENT = 0; PERSISTENT, /** * This specifies that the request on the ECL is a NOT a PERSISTENT search. * <p> * It will only return the content of the changelog DB as it is now, and stop. * It will NOT be turned into a persistent search that can return subsequent * changes. * It will only return the content of the changelog DB as it is now, and * stop. It will NOT be turned into a persistent search that can return * subsequent changes. */ public final static short NON_PERSISTENT = 1; NON_PERSISTENT, /** * This specifies that the request on the ECL is a PERSISTENT search with @@ -89,10 +102,11 @@ * It will only return subsequent changes that do not exist yet in the * changelog DB. */ public final static short PERSISTENT_CHANGES_ONLY = 2; PERSISTENT_CHANGES_ONLY } /** The type of request as defined by REQUEST_TYPE_... */ private short eclRequestType; private ECLRequestType eclRequestType; /** * When eclRequestType = FROM_COOKIE, specifies the provided cookie value. @@ -114,15 +128,14 @@ /** * Specifies whether the search is persistent and changesOnly. * * @see #NON_PERSISTENT * @see #PERSISTENT * @see #PERSISTENT_CHANGES_ONLY */ private short isPersistent = NON_PERSISTENT; private Persistent isPersistent = Persistent.NON_PERSISTENT; /** * A string helping debugging and tracing the client operation related when * This is a string identifying the operation, provided by the client part of * the ECL, used to help interpretation of messages logged. * <p> * It helps debugging and tracing the client operation related when * processing, on the RS side, a request on the ECL. */ private String operationId = ""; @@ -160,7 +173,8 @@ // start mode int length = getNextLength(in, pos); eclRequestType = Short.valueOf(new String(in, pos, length, "UTF-8")); int requestType = Integer.parseInt(new String(in, pos, length, "UTF-8")); eclRequestType = ECLRequestType.values()[requestType]; pos += length +1; length = getNextLength(in, pos); @@ -177,7 +191,8 @@ // persistentSearch mode length = getNextLength(in, pos); isPersistent = Short.valueOf(new String(in, pos, length, "UTF-8")); int persistent = Integer.parseInt(new String(in, pos, length, "UTF-8")); isPersistent = Persistent.values()[persistent]; pos += length + 1; // generalized state @@ -213,12 +228,12 @@ */ public StartECLSessionMsg() { eclRequestType = REQUEST_TYPE_FROM_COOKIE; eclRequestType = ECLRequestType.REQUEST_TYPE_FROM_COOKIE; crossDomainServerState = ""; firstChangeNumber = -1; lastChangeNumber = -1; csn = new CSN(0, 0, 0); isPersistent = NON_PERSISTENT; isPersistent = Persistent.NON_PERSISTENT; operationId = "-1"; excludedBaseDNs = new HashSet<String>(); } @@ -234,13 +249,13 @@ try { byte[] byteMode = toBytes(eclRequestType); byte[] byteMode = toBytes(eclRequestType.ordinal()); // FIXME JNR Changing the lines below to use long would require a protocol // version change. Leave it like this for now until the need arises. byte[] byteChangeNumber = toBytes((int) firstChangeNumber); byte[] byteStopChangeNumber = toBytes((int) lastChangeNumber); byte[] byteCSN = csn.toString().getBytes("UTF-8"); byte[] bytePsearch = toBytes(isPersistent); byte[] bytePsearch = toBytes(isPersistent.ordinal()); byte[] byteGeneralizedState = toBytes(crossDomainServerState); byte[] byteOperationId = toBytes(operationId); byte[] byteExcludedDNs = toBytes(excludedBaseDNsString); @@ -291,7 +306,7 @@ @Override public String toString() { return getClass().getCanonicalName() + " [" + return getClass().getSimpleName() + " [" + " requestType="+ eclRequestType + " persistentSearch=" + isPersistent + " csn=" + csn + @@ -312,8 +327,9 @@ } /** * Getter on the changer number stop. * @return the change number stop. * Specifies the last changer number requested. * * @return the last change number requested. */ public long getLastChangeNumber() { @@ -359,7 +375,7 @@ * Getter on the type of request. * @return the type of request. */ public short getECLRequestType() public ECLRequestType getECLRequestType() { return eclRequestType; } @@ -368,7 +384,7 @@ * Setter on the type of request. * @param eclRequestType the provided type of request. */ public void setECLRequestType(short eclRequestType) public void setECLRequestType(ECLRequestType eclRequestType) { this.eclRequestType = eclRequestType; } @@ -377,7 +393,7 @@ * Getter on the persistent property of the search request on the ECL. * @return the persistent property. */ public short isPersistent() public Persistent getPersistent() { return this.isPersistent; } @@ -386,7 +402,7 @@ * Setter on the persistent property of the search request on the ECL. * @param isPersistent the provided persistent property. */ public void setPersistent(short isPersistent) public void setPersistent(Persistent isPersistent) { this.isPersistent = isPersistent; } @@ -428,7 +444,7 @@ } /** * Getter on the list of excluded baseDNs. * Getter on the list of excluded baseDNs (like cn=admin, ...). * * @return the list of excluded baseDNs. */ opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -47,7 +47,10 @@ import static org.opends.server.loggers.ErrorLogger.*; import static org.opends.server.loggers.debug.DebugLogger.*; import static org.opends.server.replication.protocol.ProtocolVersion.*; import static org.opends.server.replication.protocol.StartECLSessionMsg.*; import static org.opends.server.replication.protocol.StartECLSessionMsg .ECLRequestType.*; import static org.opends.server.replication.protocol.StartECLSessionMsg .Persistent.*; import static org.opends.server.util.StaticUtils.*; /** @@ -78,7 +81,7 @@ * * @see #getSearchPhase() */ public static int INIT_PHASE = 1; private static int INIT_PHASE = 1; /** * The persistent phase is only used for persistent searches on the External * ChangeLog. It comes after the {@link #INIT_PHASE} and sends back to the @@ -86,30 +89,17 @@ */ private static int PERSISTENT_PHASE = 2; /** * This is a string identifying the operation, provided by the client part of * the ECL, used to help interpretation of messages logged. */ private String operationId; private StartECLSessionMsg startECLSessionMsg; /** Cursor on the {@link ChangeNumberIndexDB}. */ private DBCursor<ChangeNumberIndexRecord> cnIndexDBCursor; private boolean draftCompat = false; /** * Specifies the last changer number requested. */ private long lastChangeNumber = 0; /** * Specifies whether the change number db has been read until its end. */ private boolean isEndOfCNIndexDBReached = false; /** * Specifies whether the current search has been requested to be persistent * or not. */ private short isPersistent; /** * Specifies the current search phase : INIT or PERSISTENT. */ private int searchPhase = INIT_PHASE; @@ -124,10 +114,6 @@ * (thus becoming the "current" cookie just before the change is returned. */ private MultiDomainServerState previousCookie = new MultiDomainServerState(); /** * Specifies the excluded DNs (like cn=admin, ...). */ private Set<String> excludedBaseDNs = new HashSet<String>(); /** * Eligible CSN - only changes older or equal to eligibleCSN are published in @@ -144,13 +130,13 @@ * Provides a string representation of this object. * @return the string representation. */ public String dumpState() private String dumpState() { return getClass().getCanonicalName() + "[" + "[draftCompat=" + draftCompat + "] [persistent=" + isPersistent + "] [startChangeNumber=" + lastChangeNumber + "] [persistent=" + startECLSessionMsg.getPersistent() + "] [startChangeNumber=" + startECLSessionMsg.getLastChangeNumber() + "] [isEndOfCNIndexDBReached=" + isEndOfCNIndexDBReached + "] [searchPhase=" + searchPhase + "] [startCookie=" + startCookie + @@ -716,7 +702,7 @@ // Initializes each and every domain with the next(first) eligible message // from the domain. for (DomainContext domainCtxt : domainCtxts) { domainCtxt.computeNextEligibleMessageForDomain(operationId); domainCtxt.computeNextEligibleMessageForDomain(getOperationId()); if (domainCtxt.nextMsg == null) domainCtxt.active = false; @@ -766,6 +752,7 @@ continue; // skip the excluded domains Set<String> excludedBaseDNs = startECLSessionMsg.getExcludedBaseDNs(); if (excludedBaseDNs.contains(domain.getBaseDN().toNormalizedString())) { // this is an excluded domain @@ -786,7 +773,7 @@ newDomainCtxt.domainLatestTrimDate = domain.getLatestDomainTrimDate(); // Assign the start state for the domain if (isPersistent == PERSISTENT_CHANGES_ONLY) if (startECLSessionMsg.getPersistent() == PERSISTENT_CHANGES_ONLY) { newDomainCtxt.startState = latestState; startStatesFromProvidedCookie.remove(domain.getBaseDN()); @@ -999,9 +986,9 @@ if (this.serverId != 0) { return eclServer + serverId + " " + serverURL + " " + getBaseDNString() + " " + operationId; + getBaseDNString() + " " + getOperationId(); } return eclServer + getClass().getCanonicalName() + " " + operationId; return eclServer + getClass().getCanonicalName() + " " + getOperationId(); } /** @@ -1032,10 +1019,8 @@ private void initialize(StartECLSessionMsg startECLSessionMsg) throws DirectoryException { this.operationId = startECLSessionMsg.getOperationId(); this.startECLSessionMsg = startECLSessionMsg; isPersistent = startECLSessionMsg.isPersistent(); lastChangeNumber = startECLSessionMsg.getLastChangeNumber(); searchPhase = INIT_PHASE; final String cookie = startECLSessionMsg.getCrossDomainServerState(); try @@ -1049,9 +1034,6 @@ ERR_INVALID_COOKIE_SYNTAX.get(cookie)); } excludedBaseDNs = startECLSessionMsg.getExcludedBaseDNs(); refreshEligibleCSN(); initializeChangelogSearch(startECLSessionMsg); if (session != null) @@ -1081,7 +1063,7 @@ // TODO:ECL Potential race condition if writer not yet resumed here } if (isPersistent == PERSISTENT_CHANGES_ONLY) if (startECLSessionMsg.getPersistent() == PERSISTENT_CHANGES_ONLY) { closeInitPhase(); } @@ -1089,7 +1071,7 @@ registerIntoDomain(); if (debugEnabled()) TRACER.debugInfo(getClass().getCanonicalName() + " " + operationId TRACER.debugInfo(getClass().getCanonicalName() + " " + getOperationId() + " initialized: " + " " + dumpState() + " " + " " + domaimCtxtsToString("")); } @@ -1097,12 +1079,13 @@ private void initializeChangelogSearch(StartECLSessionMsg msg) throws DirectoryException { short requestType = msg.getECLRequestType(); if (requestType == REQUEST_TYPE_FROM_COOKIE) refreshEligibleCSN(); if (msg.getECLRequestType() == REQUEST_TYPE_FROM_COOKIE) { initializeCLSearchFromCookie(msg.getCrossDomainServerState()); } else if (requestType == REQUEST_TYPE_FROM_CHANGE_NUMBER) else if (msg.getECLRequestType() == REQUEST_TYPE_FROM_CHANGE_NUMBER) { initializeCLSearchFromChangeNumber(msg.getFirstChangeNumber()); } @@ -1240,7 +1223,7 @@ } if (oldestContext.active) { oldestContext.computeNextEligibleMessageForDomain(operationId); oldestContext.computeNextEligibleMessageForDomain(getOperationId()); } oldestChange = change; } @@ -1253,7 +1236,7 @@ + "looking for the generalized oldest change")); for (DomainContext domainCtxt : domainCtxts) { domainCtxt.computeNextEligibleMessageForDomain(operationId); domainCtxt.computeNextEligibleMessageForDomain(getOperationId()); } final DomainContext oldestContext = findDomainCtxtWithOldestChange(); @@ -1292,6 +1275,7 @@ private boolean isBeyondLastRequestedChangeNumber(final ECLUpdateMsg change) { final long lastChangeNumber = startECLSessionMsg.getLastChangeNumber(); return draftCompat && 0 < lastChangeNumber && lastChangeNumber < change.getChangeNumber(); @@ -1431,7 +1415,7 @@ // go to persistent phase if one for (DomainContext domainCtxt : domainCtxts) domainCtxt.active = true; if (this.isPersistent != NON_PERSISTENT) if (startECLSessionMsg.getPersistent() != NON_PERSISTENT) { // INIT_PHASE is done AND search is persistent => goto PERSISTENT_PHASE searchPhase = PERSISTENT_PHASE; @@ -1488,23 +1472,27 @@ */ public String getOperationId() { return operationId; return startECLSessionMsg.getOperationId(); } /** * Getter for the persistent property of the current search. * @return Whether the current search is persistent or not. * Returns whether the current search is a persistent search . * * @return true if the current search is a persistent search, false otherwise */ public short isPersistent() { return this.isPersistent; boolean isNonPersistent() { return startECLSessionMsg.getPersistent() == NON_PERSISTENT; } /** * Getter for the current search phase (INIT or PERSISTENT). * @return Whether the current search is persistent or not. * Returns whether the initialization phase has completed. * * @return true the initialization phase has completed, false otherwise */ public int getSearchPhase() { return this.searchPhase; boolean isInitPhaseDone() { return this.searchPhase != INIT_PHASE; } /** @@ -1512,6 +1500,7 @@ */ private void refreshEligibleCSN() { Set<String> excludedBaseDNs = startECLSessionMsg.getExcludedBaseDNs(); eligibleCSN = replicationServer.getEligibleCSN(excludedBaseDNs); } opends/src/server/org/opends/server/replication/server/ECLServerWriter.java
@@ -35,7 +35,6 @@ import org.opends.server.replication.protocol.DoneMsg; import org.opends.server.replication.protocol.ECLUpdateMsg; import org.opends.server.replication.protocol.Session; import org.opends.server.replication.protocol.StartECLSessionMsg; import org.opends.server.types.DebugLogLevel; import org.opends.server.types.DirectoryException; import org.opends.server.types.Entry; @@ -209,8 +208,7 @@ if (update == null) { if (session != null && handler.getSearchPhase() != ECLServerHandler.INIT_PHASE) if (session != null && handler.isInitPhaseDone()) { // session is null in pusherOnly mode // Done is used to end phase 1 @@ -218,7 +216,7 @@ handler.getReplicationServerId(), handler.getServerId())); } if (handler.isPersistent() == StartECLSessionMsg.NON_PERSISTENT) if (handler.isNonPersistent()) { // publishing is normally stopped here... break; } opends/src/server/org/opends/server/workflowelement/externalchangelog/ECLSearchOperation.java
@@ -55,6 +55,10 @@ import static org.opends.server.config.ConfigConstants.*; import static org.opends.server.loggers.ErrorLogger.*; import static org.opends.server.loggers.debug.DebugLogger.*; import static org.opends.server.replication.protocol.StartECLSessionMsg .ECLRequestType.*; import static org.opends.server.replication.protocol.StartECLSessionMsg .Persistent.*; import static org.opends.server.util.LDIFWriter.*; import static org.opends.server.util.ServerConstants.*; import static org.opends.server.util.StaticUtils.*; @@ -205,8 +209,7 @@ // Set default behavior as "from change number". // "from cookie" is set only when cookie is provided. startECLSessionMsg.setECLRequestType( StartECLSessionMsg.REQUEST_TYPE_FROM_CHANGE_NUMBER); startECLSessionMsg.setECLRequestType(REQUEST_TYPE_FROM_CHANGE_NUMBER); // Set a string operationId that will help correlate any error message // logged for this operation with the 'real' client operation. @@ -397,8 +400,7 @@ returnECLControl = true; if (cookie != null) { startECLSessionMsg.setECLRequestType( StartECLSessionMsg.REQUEST_TYPE_FROM_COOKIE); startECLSessionMsg.setECLRequestType(REQUEST_TYPE_FROM_COOKIE); startECLSessionMsg.setCrossDomainServerState(cookie.toString()); } } @@ -523,11 +525,9 @@ // If we're only interested in changes, then we don't actually want // to process the search now. if (psearchControl.getChangesOnly()) startECLSessionMsg.setPersistent( StartECLSessionMsg.PERSISTENT_CHANGES_ONLY); startECLSessionMsg.setPersistent(PERSISTENT_CHANGES_ONLY); else startECLSessionMsg.setPersistent( StartECLSessionMsg.PERSISTENT); startECLSessionMsg.setPersistent(PERSISTENT); } else if (OID_LDAP_SUBENTRIES.equals(oid)) { opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -36,6 +36,8 @@ import org.opends.server.protocols.internal.InternalClientConnection; import org.opends.server.replication.ReplicationTestCase; import org.opends.server.replication.common.*; import org.opends.server.replication.protocol.StartECLSessionMsg.ECLRequestType; import org.opends.server.replication.protocol.StartECLSessionMsg.Persistent; import org.opends.server.types.*; import org.opends.server.util.TimeThread; import org.opends.server.workflowelement.localbackend.LocalBackendAddOperation; @@ -850,7 +852,7 @@ new WindowProbeMsg(msg.getBytes(getCurrentVersion())); } @DataProvider(name="createTopologyData") @DataProvider public Object [][] createTopologyData() throws Exception { List<String> urls1 = newList( @@ -893,13 +895,13 @@ List<RSInfo> rsList2 = newList(rsInfo1, rsInfo2, rsInfo3, rsInfo4); return new Object [][] { {dsList1, rsList1, a1}, {dsList2, rsList2, a2}, {dsList3, rsList1, a3}, {dsList3, null, null}, {null, rsList1, a1}, {null, null, a2}, {dsList4, rsList2, a3} {dsList1, rsList1}, {dsList2, rsList2}, {dsList3, rsList1}, {dsList3, null}, {null, rsList1}, {null, null}, {dsList4, rsList2} }; } @@ -907,7 +909,7 @@ * Test TopologyMsg encoding and decoding. */ @Test(enabled=true,dataProvider = "createTopologyData") public void topologyMsgTest(List<DSInfo> dsList, List<RSInfo> rsList, Set<String> attrs) public void topologyMsgTest(List<DSInfo> dsList, List<RSInfo> rsList) throws Exception { TopologyMsg msg = new TopologyMsg(dsList, rsList); @@ -1287,10 +1289,10 @@ StartECLSessionMsg msg = new StartECLSessionMsg(); msg.setCSN(csn); msg.setCrossDomainServerState("fakegenstate"); msg.setPersistent(StartECLSessionMsg.PERSISTENT); msg.setPersistent(Persistent.PERSISTENT); msg.setFirstChangeNumber(13); msg.setLastChangeNumber(14); msg.setECLRequestType((short) 3); msg.setECLRequestType(ECLRequestType.REQUEST_TYPE_EQUALS_REPL_CHANGE_NUMBER); msg.setOperationId("fakeopid"); String dn1 = "cn=admin data"; String dn2 = "cn=config"; @@ -1300,7 +1302,7 @@ StartECLSessionMsg newMsg = new StartECLSessionMsg(msg.getBytes(getCurrentVersion())); // test equality between the two copies assertEquals(msg.getCSN(), newMsg.getCSN()); assertEquals(msg.isPersistent(), newMsg.isPersistent()); assertEquals(msg.getPersistent(), newMsg.getPersistent()); assertEquals(msg.getFirstChangeNumber(), newMsg.getFirstChangeNumber()); assertEquals(msg.getECLRequestType(), newMsg.getECLRequestType()); assertEquals(msg.getLastChangeNumber(), newMsg.getLastChangeNumber());