opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -114,7 +114,7 @@ */ public String dumpState() { return this.getClass().getCanonicalName() + return getClass().getCanonicalName() + "[" + "[draftCompat=" + draftCompat + "] [persistent=" + isPersistent + @@ -170,7 +170,8 @@ .append(rsd).append("] [nextMsg=").append(nextMsg).append("(") .append(nextMsg != null ? new Date(nextMsg.getChangeNumber().getTime()).toString():"") .append(")" + "] [nextNonEligibleMsg=").append(nextNonEligibleMsg) .append(")") .append("] [nextNonEligibleMsg=").append(nextNonEligibleMsg) .append("] [startState=").append(startState).append("] [stopState=") .append(stopState).append("] [currentState=").append(currentState) .append("]]"); @@ -248,9 +249,9 @@ + " getNextEligibleMessageForDomain(" + opid+ ") " + "newMsg isEligible=" + isEligible + " since " + "newMsg=[" + newMsg.getChangeNumber() + " " + new Date(newMsg.getChangeNumber().getTime()).toString() + " " + new Date(newMsg.getChangeNumber().getTime()) + "] eligibleCN=[" + eligibleCN + " " + new Date(eligibleCN.getTime()).toString()+"]" + " " + new Date(eligibleCN.getTime())+"]" + dumpState()); if (isEligible) @@ -430,7 +431,7 @@ { // no chance to have a bad domain set here } this.initialize(startECLSessionMsg); initialize(startECLSessionMsg); } /** @@ -668,7 +669,7 @@ TRACER.debugCaught(DebugLogLevel.ERROR, de); if (draftCNDbIter != null) draftCNDbIter.releaseCursor(); throw(de); throw de; } catch(Exception e) { @@ -800,8 +801,8 @@ rsd.getStartState().getMaxChangeNumber(aServerId); ChangeNumber providedChange = newDomainCtxt.startState.getMaxChangeNumber(aServerId); if ((providedChange != null) && (providedChange.older(dbOldestChange))) if (providedChange != null && providedChange.older(dbOldestChange)) { cookieTooOld=true; } @@ -848,7 +849,7 @@ throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, ERR_RESYNC_REQUIRED_MISSING_DOMAIN_IN_PROVIDED_COOKIE.get( missingDomains, "<" + (providedCookie + missingDomains)+ ">")); "<" + providedCookie + missingDomains + ">")); } domainCtxts = tmpSet.toArray(new DomainContext[tmpSet.size()]); @@ -1201,7 +1202,7 @@ { // loop until not interrupted } } while (((interrupted) || (!acquired)) && (!shutdownWriter)); } while ((interrupted || !acquired) && !shutdownWriter); if (msg != null) { incrementOutCount(); @@ -1271,7 +1272,7 @@ int iDom; boolean continueLooping = true; while ((continueLooping) && (searchPhase == INIT_PHASE)) while (continueLooping && searchPhase == INIT_PHASE) { // Step 1 & 2 if (searchPhase == INIT_PHASE) @@ -1590,7 +1591,7 @@ int oldest = -1; for (int i=0; i<domainCtxts.length; i++) { if ((domainCtxts[i].active)) if (domainCtxts[i].active) { // on the first loop, oldest==-1 // .msg is null when the previous (non blocking) nextMessage did opends/src/server/org/opends/server/replication/server/ReplicationBackend.java
@@ -666,7 +666,7 @@ attrs.put(ocType, ldapAttrList); TRACER.debugInfo("State=" + exportContainer.getDbServerState().toString()); exportContainer.getDbServerState()); Attribute stateAttr = Attributes.create("state", exportContainer .getDbServerState().toString()); ldapAttrList.clear(); @@ -727,8 +727,8 @@ SearchFilter filter = searchOperation.getFilter(); previousChangeNumber = extractChangeNumber(filter); if ((previousChangeNumber == null) && (filter.getFilterType().equals(FilterType.AND))) if (previousChangeNumber == null && filter.getFilterType().equals(FilterType.AND)) { for (SearchFilter filterComponents: filter.getFilterComponents()) { @@ -818,7 +818,7 @@ if ( (filterType.equals(FilterType.GREATER_OR_EQUAL) || filterType.equals(FilterType.EQUALITY) ) && (filter.getAttributeType().equals(changeNumberAttrType))) filter.getAttributeType().equals(changeNumberAttrType)) { try { @@ -867,7 +867,7 @@ AddOperation addOperation = (AddOperation)msg.createOperation(conn); dn = DN.decode("puid=" + addMsg.getParentEntryUUID() + "+" + CHANGE_NUMBER + "=" + msg.getChangeNumber().toString() + "+" + CHANGE_NUMBER + "=" + msg.getChangeNumber() + "+" + msg.getDn() + "," + BASE_DN); Map<AttributeType,List<Attribute>> attrs = @@ -918,7 +918,7 @@ DeleteMsg delMsg = (DeleteMsg)msg; dn = DN.decode("uuid=" + msg.getEntryUUID() + "," + CHANGE_NUMBER + "=" + delMsg.getChangeNumber().toString()+ "," + CHANGE_NUMBER + "=" + delMsg.getChangeNumber() + "," + msg.getDn() +","+ BASE_DN); DeleteChangeRecordEntry changeRecord = @@ -941,7 +941,7 @@ ModifyOperation op = (ModifyOperation)msg.createOperation(conn); dn = DN.decode("uuid=" + msg.getEntryUUID() + "," + CHANGE_NUMBER + "=" + msg.getChangeNumber().toString()+ "," + CHANGE_NUMBER + "=" + msg.getChangeNumber() + "," + msg.getDn() +","+ BASE_DN); op.setInternalOperation(true); @@ -965,7 +965,7 @@ ModifyDNOperation op = (ModifyDNOperation)msg.createOperation(conn); dn = DN.decode("uuid=" + msg.getEntryUUID() + "," + CHANGE_NUMBER + "=" + msg.getChangeNumber().toString()+ "," + CHANGE_NUMBER + "=" + msg.getChangeNumber() + "," + msg.getDn() +","+ BASE_DN); op.setInternalOperation(true); @@ -1286,7 +1286,7 @@ { for (Control c : requestControls) { if (c.getOID().equals(OID_INTERNAL_GROUP_MEMBERSHIP_UPDATE)) if (OID_INTERNAL_GROUP_MEMBERSHIP_UPDATE.equals(c.getOID())) { return; } @@ -1298,8 +1298,8 @@ try { DN backendBaseDN = DN.decode(BASE_DN); if ( (searchOperation.getScope().equals(SearchScope.BASE_OBJECT)) && (backendBaseDN.equals(searchOperation.getBaseDN())) ) if ( searchOperation.getScope().equals(SearchScope.BASE_OBJECT) && backendBaseDN.equals(searchOperation.getBaseDN()) ) { return; } opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -28,10 +28,9 @@ package org.opends.server.replication.server; import static org.opends.messages.ReplicationMessages.*; import static org.opends.server.loggers.ErrorLogger.logError; import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; import static org.opends.server.loggers.debug.DebugLogger.getTracer; import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; import static org.opends.server.loggers.ErrorLogger.*; import static org.opends.server.loggers.debug.DebugLogger.*; import static org.opends.server.util.StaticUtils.*; import java.io.IOException; import java.util.*; @@ -67,7 +66,6 @@ * are removed and should they be needed again must be read from the backing * file * * * it runs a thread that is responsible for saving the messages * received to the disk and for trimming them * Decision to trim can be based on disk space or age of the message @@ -382,13 +380,11 @@ TRACER.debugInfo("In " + "Replication Server " + replicationServer.getReplicationPort() + " " + baseDn + " " + replicationServer.getServerId() + " for dn " + baseDn + ", update " + update.getChangeNumber().toString() + " for dn " + baseDn + ", update " + update.getChangeNumber() + " will not be sent to replication server " + Integer.toString(handler.getServerId()) + " with generation id " + Long.toString(handler.getGenerationId()) + " different from local " + "generation id " + Long.toString(generationId)); handler.getServerId() + " with generation id " + handler.getGenerationId() + " different from local " + "generation id " + generationId); continue; } @@ -447,20 +443,16 @@ { if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS) TRACER.debugInfo("In " + this + " for dn " + baseDn + ", update " + update.getChangeNumber().toString() + " for dn " + baseDn + ", update " + update.getChangeNumber() + " will not be sent to directory server " + Integer.toString(handler.getServerId()) + " with generation id " + Long.toString(handler.getGenerationId()) + " different from local " + "generation id " + Long.toString(generationId)); handler.getServerId() + " with generation id " + handler.getGenerationId() + " different from local " + "generation id " + generationId); if (dsStatus == ServerStatus.FULL_UPDATE_STATUS) TRACER.debugInfo("In RS " + replicationServer.getServerId() + " for dn " + baseDn + ", update " + update.getChangeNumber().toString() + " will not be sent to directory server " + Integer.toString(handler.getServerId()) + " for dn " + baseDn + ", update " + update.getChangeNumber() + " will not be sent to directory server " + handler.getServerId() + " as it is in full update"); } @@ -697,26 +689,20 @@ } List<Integer> expectedServers = new ArrayList<Integer>(); if (interestedInAcks) { if (sourceHandler.isDataServer()) if (interestedInAcks && sourceHandler.isDataServer()) { // Look for RS eligible for assured for (ReplicationServerHandler handler : replicationServers.values()) { if (handler.getGroupId() == groupId) if (handler.getGroupId() == groupId // No ack expected from a RS with different group id { if ((generationId > 0) && (generationId == handler.getGenerationId())) && generationId > 0 && (generationId == handler.getGenerationId())) // No ack expected from a RS with bad gen id { expectedServers.add(handler.getServerId()); } } } } } // Return computed structures PreparedAssuredInfo preparedAssuredInfo = new PreparedAssuredInfo(); @@ -835,6 +821,7 @@ * Run when the assured timeout for an assured update message we are waiting * acks for occurs. */ @Override public void run() { ExpectedAcksInfo expectedAcksInfo = waitingAcks.get(cn); @@ -857,11 +844,9 @@ ServerHandler origServer = expectedAcksInfo.getRequesterServer(); if (debugEnabled()) TRACER.debugInfo( "In RS " + Integer.toString(replicationServer.getServerId()) + " for " + baseDn + "In RS " + replicationServer.getServerId() + " for " + baseDn + ", sending timeout for assured update with change " + " number " + cn.toString() + " to server id " + Integer.toString(origServer.getServerId())); cn + " to server id " + origServer.getServerId()); try { origServer.send(finalAck); @@ -882,7 +867,7 @@ } // Increment assured counters boolean safeRead = (expectedAcksInfo instanceof SafeReadExpectedAcksInfo); expectedAcksInfo instanceof SafeReadExpectedAcksInfo; if (safeRead) { origServer.incrementAssuredSrReceivedUpdatesTimeout(); @@ -1246,9 +1231,8 @@ " has servers connected to it - will not reset generationId"); } if ((!lDAPServersConnectedInTheTopology) && (!this.generationIdSavedStatus) && (generationId != -1)) if (!lDAPServersConnectedInTheTopology && !this.generationIdSavedStatus && generationId != -1) { changeGenerationId(-1, false); } @@ -1265,7 +1249,7 @@ { ReplicationServerHandler oldHandler = replicationServers.get(handler.getServerId()); if ((oldHandler != null)) if (oldHandler != null) { if (oldHandler.getServerAddressURL().equals( handler.getServerAddressURL())) @@ -1593,7 +1577,7 @@ // We log the error. The requestor will detect a timeout or // any other failure on the connection. logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get( Integer.toString((msg.getDestination())))); Integer.toString(msg.getDestination()))); } } } @@ -1868,14 +1852,13 @@ { for (DataServerHandler handler : directoryServers.values()) { if ((notThisOne == null) || ((handler != notThisOne))) if ((notThisOne == null) || (handler != notThisOne)) // All except passed one { for (int i=1; i<=2; i++) { if (!handler.shuttingDown()) { if (handler.getStatus() != ServerStatus.NOT_CONNECTED_STATUS) if (!handler.shuttingDown() && handler.getStatus() != ServerStatus.NOT_CONNECTED_STATUS) { TopologyMsg topoMsg=createTopologyMsgForDS(handler.getServerId()); try @@ -1887,16 +1870,14 @@ { if (i==2) { Message message = ERR_EXCEPTION_SENDING_TOPO_INFO.get( baseDn, "directory", Integer.toString(handler.getServerId()), e.getMessage()); Message message = ERR_EXCEPTION_SENDING_TOPO_INFO .get(baseDn, "directory", Integer.toString(handler .getServerId()), e.getMessage()); logError(message); } } } } try { Thread.sleep(100); } catch(Exception e) {} } } @@ -1914,9 +1895,8 @@ { for (int i=1; i<=2; i++) { if (!handler.shuttingDown()) { if (handler.getStatus() != ServerStatus.NOT_CONNECTED_STATUS) if (!handler.shuttingDown() && handler.getStatus() != ServerStatus.NOT_CONNECTED_STATUS) { try { @@ -1927,16 +1907,13 @@ { if (i==2) { Message message = ERR_EXCEPTION_SENDING_TOPO_INFO.get( baseDn, "replication", Integer.toString(handler.getServerId()), e.getMessage()); Message message = ERR_EXCEPTION_SENDING_TOPO_INFO.get(baseDn, "replication", Integer.toString(handler.getServerId()), e.getMessage()); logError(message); } } } } try { Thread.sleep(100); } catch(Exception e) {} } } @@ -2852,7 +2829,7 @@ */ public boolean hasLock() { return (lock.getHoldCount() > 0); return lock.getHoldCount() > 0; } /** @@ -2920,7 +2897,7 @@ */ public boolean isRunningStatusAnalyzer() { return (statusAnalyzer != null); return statusAnalyzer != null; } /** @@ -2971,7 +2948,7 @@ */ public boolean isRunningMonitoringPublisher() { return (monitoringPublisher != null); return monitoringPublisher != null; } /** @@ -3229,16 +3206,13 @@ } ChangeNumber changelogLastCN = db.getLastChange(); if (changelogLastCN != null) { if ((eligibleCN == null) || (changelogLastCN.newer(eligibleCN))) if (changelogLastCN != null && (eligibleCN == null || changelogLastCN.newer(eligibleCN))) { eligibleCN = changelogLastCN; } } if ((heartbeatLastDN != null) && ((eligibleCN == null) || (heartbeatLastDN.newer(eligibleCN)))) if (heartbeatLastDN != null && (eligibleCN == null || heartbeatLastDN.newer(eligibleCN))) { eligibleCN = heartbeatLastDN; } opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java
@@ -145,7 +145,7 @@ if (debugEnabled()) { TRACER.debugInfo(getName() + "could not send a heartbeat." + e.getMessage() + e.toString()); e.getMessage() + e); } // This will be caught in another thread. } opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -540,7 +540,7 @@ // Unsupported message type: should not happen throw new IllegalArgumentException("Unexpected PDU type: " + msg.getClass().getName() + " :\n" + msg.toString()); msg.getClass().getName() + " :\n" + msg); } /** @@ -1167,9 +1167,8 @@ if (debugEnabled()) { TRACER.debugInfo("RB for dn " + baseDn + " and with server id " + Integer.toString(serverId) + " computed " + Integer.toString(nChanges) + " changes late."); TRACER.debugInfo("RB for dn " + baseDn + " and with server id " + serverId + " computed " + nChanges + " changes late."); } /* @@ -1276,8 +1275,7 @@ if (debugEnabled()) { TRACER.debugInfo("In RB for " + baseDn + "\nRB HANDSHAKE SENT:\n" + serverStartMsg.toString() + "\nAND RECEIVED:\n" + msg.toString()); + serverStartMsg + "\nAND RECEIVED:\n" + msg); } // Wrap received message in a server info object @@ -1286,7 +1284,7 @@ // Sanity check String repDn = replServerInfo.getBaseDn(); if (!(this.baseDn.equals(repDn))) if (!this.baseDn.equals(repDn)) { errorMessage = ERR_DS_DN_DOES_NOT_MATCH.get(repDn, this.baseDn); @@ -1412,8 +1410,8 @@ */ if (debugEnabled()) { TRACER.debugInfo("In RB for " + baseDn + "\nRB HANDSHAKE SENT:\n" + startECLSessionMsg.toString()); TRACER.debugInfo("In RB for " + baseDn + "\nRB HANDSHAKE SENT:\n" + startECLSessionMsg); } // Alright set the timeout to the desired value @@ -1484,9 +1482,8 @@ if (debugEnabled()) { TRACER.debugInfo("In RB for " + baseDn + "\nRB HANDSHAKE SENT:\n" + startSessionMsg.toString() + "\nAND RECEIVED:\n" + topologyMsg.toString()); TRACER.debugInfo("In RB for " + baseDn + "\nRB HANDSHAKE SENT:\n" + startSessionMsg + "\nAND RECEIVED:\n" + topologyMsg); } // Alright set the timeout to the desired value @@ -2368,7 +2365,7 @@ } } } if ((!credit) && (currentWindowSemaphore.availablePermits() == 0)) if (!credit && currentWindowSemaphore.availablePermits() == 0) { synchronized (connectPhaseLock) { @@ -2829,19 +2826,15 @@ // These parameters needs to be renegotiated with the ReplicationServer // so if they have changed, that requires restarting the session with // the ReplicationServer. Boolean needToRestartSession = false; // A new session is necessary only when information regarding // the connection is modified if (this.replicationServerUrls == null boolean needToRestartSession = this.replicationServerUrls == null || replicationServers.size() != this.replicationServerUrls.size() || !replicationServers.containsAll(this.replicationServerUrls) || window != this.maxRcvWindow || heartbeatInterval != this.heartbeatInterval || groupId != this.groupId) { needToRestartSession = true; } || groupId != this.groupId; this.replicationServerUrls = replicationServers; this.rcvWindow = window;