opends/src/messages/messages/replication.properties
@@ -541,3 +541,5 @@ replication server in the topology and distribute load more equally
 SEVERE_WARN_INVALID_SYNC_HIST_VALUE_214=The attribute value '%s' is not a valid \
  synchronization history value
+SEVERE_ERR_REPLICATIONDB_CANNOT_PROCESS_CHANGE_RECORD_215=Replication server RS(%d) \
+ failed to parse change record with changenumber %s from the database. Error: %s
opends/src/server/org/opends/server/replication/common/ChangeNumber.java
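// The ChangeNumber hunks below collapse "if (cond) return true; else return false;"
// branches into direct boolean returns. A minimal, self-contained sketch of the same
// pattern (the SequenceId class and its fields are illustrative, not OpenDS code):
final class SequenceId
{
  private final long timeStamp;
  private final int seqnum;

  SequenceId(long timeStamp, int seqnum)
  {
    this.timeStamp = timeStamp;
    this.seqnum = seqnum;
  }

  /** Returns true when this id was generated strictly before the other one. */
  boolean older(SequenceId other)
  {
    // Return the boolean expression directly instead of branching on it.
    return timeStamp < other.timeStamp
        || (timeStamp == other.timeStamp && seqnum < other.seqnum);
  }

  @Override
  public boolean equals(Object obj)
  {
    if (!(obj instanceof SequenceId))
    {
      return false;
    }
    SequenceId other = (SequenceId) obj;
    return timeStamp == other.timeStamp && seqnum == other.seqnum;
  }

  @Override
  public int hashCode()
  {
    return 31 * (int) (timeStamp ^ (timeStamp >>> 32)) + seqnum;
  }
}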
@@ -23,6 +23,7 @@ * * * Copyright 2006-2009 Sun Microsystems, Inc. * Portions Copyright 2013 ForgeRock AS. */ package org.opends.server.replication.common; @@ -121,12 +122,9 @@ if (obj instanceof ChangeNumber) { ChangeNumber cn = (ChangeNumber) obj; if ((this.seqnum == cn.seqnum) && (this.serverId == cn.serverId) && (this.timeStamp == cn.timeStamp) ) return true; else return false; return this.seqnum == cn.seqnum && this.serverId == cn.serverId && this.timeStamp == cn.timeStamp; } else return false; @@ -274,10 +272,7 @@ */ public Boolean older(ChangeNumber CN) { if (compare(this, CN) < 0) return true; return false; return compare(this, CN) < 0; } /** @@ -288,10 +283,7 @@ */ public Boolean olderOrEqual(ChangeNumber CN) { if (compare(this, CN) <= 0) return true; return false; return compare(this, CN) <= 0; } /** @@ -301,10 +293,7 @@ */ public boolean newerOrEquals(ChangeNumber CN) { if (compare(this, CN) >= 0) return true; return false; return compare(this, CN) >= 0; } /** @@ -314,10 +303,7 @@ */ public boolean newer(ChangeNumber CN) { if (compare(this, CN) > 0) return true; return false; return compare(this, CN) > 0; } /** opends/src/server/org/opends/server/replication/common/FirstChangeNumberVirtualAttributeProvider.java
@@ -23,7 +23,7 @@
  *
  *
  *      Copyright 2009 Sun Microsystems, Inc.
- *      Portions Copyright 2011-2012 ForgeRock AS
+ *      Portions Copyright 2011-2013 ForgeRock AS
  */
 package org.opends.server.replication.common;
@@ -172,8 +172,7 @@
         AttributeValues.create(
             ByteString.valueOf(first),
             ByteString.valueOf(first));
-    Set<AttributeValue> values=Collections.singleton(value);
-    return values;
+    return Collections.singleton(value);
   }
opends/src/server/org/opends/server/replication/common/LastChangeNumberVirtualAttributeProvider.java
@@ -23,7 +23,7 @@
  *
  *
  *      Copyright 2009 Sun Microsystems, Inc.
- *      Portions Copyright 2011-2012 ForgeRock AS
+ *      Portions Copyright 2011-2013 ForgeRock AS
  */
 package org.opends.server.replication.common;
@@ -171,8 +171,7 @@
         AttributeValues.create(
             ByteString.valueOf(last),
             ByteString.valueOf(last));
-    Set<AttributeValue> values =Collections.singleton(value);
-    return values;
+    return Collections.singleton(value);
  }
opends/src/server/org/opends/server/replication/plugin/AttrHistoricalMultiple.java
@@ -23,7 +23,7 @@ * * * Copyright 2006-2010 Sun Microsystems, Inc. * Portions Copyright 2011 ForgeRock AS * Portions Copyright 2011-2013 ForgeRock AS */ package org.opends.server.replication.plugin; @@ -105,13 +105,13 @@ * Duplicate an object. * ChangeNumber are duplicated by references * @return the duplicated object. * * Method only called in tests */ AttrHistoricalMultiple duplicate() { AttrHistoricalMultiple dup = new AttrHistoricalMultiple(this.deleteTime, this.lastUpdateTime, this.valuesHist); return dup; return new AttrHistoricalMultiple(this.deleteTime, this.lastUpdateTime, this.valuesHist); } /** opends/src/server/org/opends/server/replication/plugin/AttrHistoricalSingle.java
@@ -158,9 +158,7 @@
     case DELETE:
       if (changeNumber.newer(addTime))
       {
-        if ((newValue == null) ||
-            ((newValue != null) && (newValue.equals(value))) ||
-            (value == null))
+        if (newValue == null || newValue.equals(value) || value == null)
         {
           if (changeNumber.newer(deleteTime))
           {
opends/src/server/org/opends/server/replication/plugin/AttrValueHistorical.java
@@ -23,6 +23,7 @@
  *
  *
  *      Copyright 2006-2010 Sun Microsystems, Inc.
+ *      Portions Copyright 2013 ForgeRock AS.
  */
 package org.opends.server.replication.plugin;
@@ -118,9 +119,6 @@
    */
   public boolean isUpdate()
   {
-    if (valueUpdateTime != null)
-      return true;
-    else
-      return false;
+    return valueUpdateTime != null;
   }
 }
opends/src/server/org/opends/server/replication/plugin/HistoricalAttributeValue.java
@@ -23,6 +23,7 @@ * * * Copyright 2006-2010 Sun Microsystems, Inc. * Portions Copyright 2013 ForgeRock AS. */ package org.opends.server.replication.plugin; @@ -196,15 +197,6 @@ } /** * Get the String form of the attribute. * @return The String form of the attribute. */ public String getStringValue() { return stringValue; } /** * Get the Attribute Value. * @return The Attribute Value. */ @@ -261,7 +253,7 @@ */ public boolean isADDOperation() { return ((attrType == null) && (ismodDN == false)); return attrType == null && !ismodDN; } /** @@ -273,6 +265,6 @@ */ public boolean isMODDNOperation() { return ((attrType == null) && (ismodDN == true)); return attrType == null && ismodDN; } } opends/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingMatchingRule.java
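// The HistoricalCsnOrderingMatchingRule hunk below replaces substring(00, 16) with
// substring(0, 16). A leading zero makes a Java integer literal octal; 00 happens to
// equal 0, so the old code behaved the same, but the notation invites bugs. A small
// standalone illustration:
public class OctalLiteralDemo
{
  public static void main(String[] args)
  {
    int looksLikeTen = 010;   // octal literal: value is 8, not 10
    int zero = 00;            // octal literal: still 0, so substring(00, 16) was safe
    System.out.println(looksLikeTen); // prints 8
    System.out.println("0123456789abcdefghij".substring(zero, 16));
  }
}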
@@ -23,7 +23,7 @@ * * * Copyright 2006-2010 Sun Microsystems, Inc. * Portions copyright 2012 ForgeRock AS. * Portions copyright 2012-2013 ForgeRock AS. */ package org.opends.server.replication.plugin; @@ -144,7 +144,7 @@ String csn = value.subSequence(csnIndex, csnIndex + 28).toString(); ByteStringBuilder builder = new ByteStringBuilder(14); builder.append(hexStringToByteArray(csn.substring(16, 20))); builder.append(hexStringToByteArray(csn.substring(00, 16))); builder.append(hexStringToByteArray(csn.substring(0, 16))); builder.append(hexStringToByteArray(csn.substring(20, 28))); return builder.toByteString(); } opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
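// In the LDAPReplicationDomain hunks below, the ServerStateFlush field becomes final:
// the thread object is now created while the domain is initialized (so the reference
// never changes and is published safely), and start() only starts it. A minimal sketch
// of that lifecycle; class, field and timing below are illustrative, not OpenDS code:
class FlushingDomain
{
  // final: the reference is assigned exactly once, during construction.
  private final Thread flushThread;
  private volatile boolean shutdown = false;

  FlushingDomain()
  {
    // Create the worker up front so the field can be final...
    flushThread = new Thread(new Runnable()
    {
      public void run()
      {
        while (!shutdown)
        {
          // periodically persist the server state here
          try
          {
            Thread.sleep(500);
          }
          catch (InterruptedException e)
          {
            return;
          }
        }
      }
    }, "Server state flush thread");
  }

  void start()
  {
    // ...but only start it once the domain is ready to process changes.
    flushThread.start();
  }

  void stop() throws InterruptedException
  {
    shutdown = true;
    flushThread.interrupt();
    flushThread.join();
  }
}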
@@ -115,7 +115,6 @@ import org.opends.server.types.operation.PreOperationDeleteOperation; import org.opends.server.types.operation.PreOperationModifyDNOperation; import org.opends.server.types.operation.PreOperationModifyOperation; import org.opends.server.types.operation.PreOperationOperation; import org.opends.server.util.LDIFReader; import org.opends.server.util.TimeThread; import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement; @@ -277,7 +276,7 @@ */ private volatile boolean done = true; private ServerStateFlush flushThread; private final ServerStateFlush flushThread; /** * The attribute name used to store the generation id in the backend. @@ -400,7 +399,7 @@ { done = false; while (shutdown == false) while (!shutdown) { try { @@ -584,21 +583,11 @@ /* * Create a new Persistent Server State that will be used to store * the last ChangeNmber seen from all LDAP servers in the topology. * the last ChangeNumber seen from all LDAP servers in the topology. */ state = new PersistentServerState(baseDn, serverId, getServerState()); /* Check if a ReplicaUpdateVector entry is present * if so, and no state is already initialized * translate the ruv into a serverState and * a generationId */ Long compatGenId = state.checkRUVCompat(); if (compatGenId != null) { generationId = compatGenId; saveGenerationId(generationId); } flushThread = new ServerStateFlush(); /* * ChangeNumberGenerator is used to create new unique ChangeNumbers @@ -620,7 +609,7 @@ // listen for changes on the configuration configuration.addChangeListener(this); // register as an AltertGenerator // register as an AlertGenerator DirectoryServer.registerAlertGenerator(this); } @@ -638,8 +627,8 @@ boolean needReconnection = false; byte newSdLevel = (byte) configuration.getAssuredSdLevel(); if ((isAssured() && (getAssuredMode() == AssuredMode.SAFE_DATA_MODE)) && (newSdLevel != getAssuredSdLevel())) if (isAssured() && getAssuredMode() == AssuredMode.SAFE_DATA_MODE && newSdLevel != getAssuredSdLevel()) { needReconnection = true; } @@ -654,15 +643,13 @@ } break; case SAFE_DATA: if (!isAssured() || (isAssured() && (getAssuredMode() == AssuredMode.SAFE_READ_MODE))) if (!isAssured() || getAssuredMode() == AssuredMode.SAFE_READ_MODE) { needReconnection = true; } break; case SAFE_READ: if (!isAssured() || (isAssured() && (getAssuredMode() == AssuredMode.SAFE_DATA_MODE))) if (!isAssured() || getAssuredMode() == AssuredMode.SAFE_DATA_MODE) { needReconnection = true; } @@ -882,17 +869,12 @@ if (!found) { // The backend is probably empty: if there is some fractional // configuration in memory, we do not let the domain being connected, // otherwise, it's ok if (fractionalConfig.isFractional()) { return false; } else { return true; } /* The backend is probably empty: if there is some fractional configuration in memory, we do not let the domain being connected, otherwise, it's ok */ return !fractionalConfig.isFractional(); } /* @@ -1275,7 +1257,8 @@ * @param performFiltering Tells if the effective modifications should * be performed or if the call is just to analyze if there are some * inconsistency with fractional configuration * @return true if the operation is inconsistent with fractional configuration * @return true if the operation is inconsistent with fractional * configuration */ public boolean fractionalFilterOperation( PreOperationModifyDNOperation modifyDNOperation, boolean performFiltering) @@ -1287,8 +1270,8 @@ { if (modifyDNOperation.deleteOldRDN()) { // The core will remove any occurence of attribute that was part of 
the // old RDN, nothing more to do. // The core will remove any occurrence of attribute that was part // of the old RDN, nothing more to do. return true; // Will not be used as analyze was not requested } } @@ -1307,9 +1290,9 @@ // No attributes to filter return false; /** /* * Analyze the old and new rdn to see if they are some attributes to be * removed: if the oldnRDN contains some forbidden attributes (for instance * removed: if the oldRDN contains some forbidden attributes (for instance * it is possible if the entry was created with an add operation and the * RDN used contains a forbidden attribute: in this case the attribute value * has been kept to be consistent with the dn of the entry.) that are no @@ -1342,10 +1325,12 @@ !newRdn.hasAttributeType(attributeType) && !modifyDNOperation.deleteOldRDN()) { // A forbidden attribute is in the old RDN and no more in the new RDN, // and it has not been requested to remove attributes from old RDN: // remove ourself the attribute from the entry to stay consistent with // fractional configuration /* * A forbidden attribute is in the old RDN and no more in the new RDN, * and it has not been requested to remove attributes from old RDN: * let's remove the attribute from the entry to stay consistent with * fractional configuration */ Modification modification = new Modification(ModificationType.DELETE, Attributes.empty(attributeType)); modifyDNOperation.addModification(modification); @@ -1451,11 +1436,13 @@ // entry as it is forbidden if (entryRdn.hasAttributeType(attributeType)) { // We must remove all values of the attributes map for this // attribute type but the one that has the value which is in the RDN // of the entry. In fact the (underlying )attribute list does not // suppot remove so we have to create a new list, keeping only the // attribute value which is the same as in the RDN /* We must remove all values of the attributes map for this attribute type but the one that has the value which is in the RDN of the entry. 
In fact the (underlying )attribute list does not support remove so we have to create a new list, keeping only the attribute value which is the same as in the RDN */ AttributeValue rdnAttributeValue = entryRdn.getAttributeValue(attributeType); List<Attribute> attrList = attributesMap.get(attributeType); @@ -1467,17 +1454,11 @@ Attribute attr = attrIt.next(); if (attr.contains(rdnAttributeValue)) { Iterator<AttributeValue> attrValues = attr.iterator(); while(attrValues.hasNext()) { AttributeValue attrValue = attrValues.next(); if (rdnAttributeValue.equals(attrValue)) { for (AttributeValue attrValue : attr) { if (rdnAttributeValue.equals(attrValue)) { // Keep the value we want sameAttrValue = attrValue; } else { } else { hasSomeAttributesToFilter = true; } } @@ -1492,17 +1473,19 @@ // Paranoia check: should never be the case as we should always // find the attribute/value pair matching the pair in the RDN { // Construct and store new atribute list // Construct and store new attribute list List<Attribute> newRdnAttrList = new ArrayList<Attribute>(); AttributeBuilder attrBuilder = new AttributeBuilder(attributeType); attrBuilder.add(sameAttrValue); newRdnAttrList.add(attrBuilder.toAttribute()); newRdnAttrLists.add(newRdnAttrList); // Store matching attribute type // The mapping will be done using object from rdnAttrTypes as key // and object from newRdnAttrLists (at same index) as value in // the user attribute map to be modified /* Store matching attribute type The mapping will be done using object from rdnAttrTypes as key and object from newRdnAttrLists (at same index) as value in the user attribute map to be modified */ rdnAttrTypes.add(attributeType); } } @@ -1516,7 +1499,7 @@ else { // The call was just to check : at least one attribute to filter // found, return immediatly the answer; // found, return immediately the answer; return true; } } @@ -1684,7 +1667,7 @@ continue; } // Is the current attribute part of the established list ? boolean foundAttribute = boolean foundAttribute = attributeName != null && fractionalConcernedAttributes.contains(attributeName.toLowerCase()); if (!foundAttribute) { @@ -1713,7 +1696,7 @@ else { // The call was just to check : at least one attribute to filter // found, return immediatly the answer; // found, return immediately the answer; return FRACTIONAL_HAS_FRACTIONAL_FILTERED_ATTRIBUTES; } } @@ -1806,7 +1789,7 @@ PreOperationDeleteOperation deleteOperation) { if ((!deleteOperation.isSynchronizationOperation()) && (!brokerIsConnected(deleteOperation))) && (!brokerIsConnected())) { Message msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(baseDn.toString()); return new SynchronizationProviderResult.StopProcessing( @@ -1834,7 +1817,7 @@ * Probably the original entry was renamed and replaced with * another entry. * We must not let the change proceed, return a negative * result and set the result code to NO_SUCH_OBJET. * result and set the result code to NO_SUCH_OBJECT. * When the operation will return, the thread that started the * operation will try to find the correct entry and restart a new * operation. 
@@ -1882,7 +1865,7 @@ PreOperationAddOperation addOperation) { if ((!addOperation.isSynchronizationOperation()) && (!brokerIsConnected(addOperation))) && (!brokerIsConnected())) { Message msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(baseDn.toString()); return new SynchronizationProviderResult.StopProcessing( @@ -1980,16 +1963,14 @@ * Check that the broker associated to this ReplicationDomain has found * a Replication Server and that this LDAP server is therefore able to * process operations. * If not set the ResultCode and the response message, * If not, set the ResultCode, the response message, * interrupt the operation, and return false * * @param op The Operation that needs to be checked. * * @return true when it OK to process the Operation, false otherwise. * When false is returned the resultCode and the reponse message * When false is returned the resultCode and the response message * is also set in the Operation. */ private boolean brokerIsConnected(PreOperationOperation op) private boolean brokerIsConnected() { if (isolationPolicy.equals(IsolationPolicy.ACCEPT_ALL_UPDATES)) { @@ -2020,7 +2001,7 @@ PreOperationModifyDNOperation modifyDNOperation) { if ((!modifyDNOperation.isSynchronizationOperation()) && (!brokerIsConnected(modifyDNOperation))) && (!brokerIsConnected())) { Message msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(baseDn.toString()); return new SynchronizationProviderResult.StopProcessing( @@ -2076,7 +2057,7 @@ * Probably the original entry was renamed and replaced with * another entry. * We must not let the change proceed, return a negative * result and set the result code to NO_SUCH_OBJET. * result and set the result code to NO_SUCH_OBJECT. * When the operation will return, the thread that started the * operation will try to find the correct entry and restart a new * operation. @@ -2140,7 +2121,7 @@ PreOperationModifyOperation modifyOperation) { if ((!modifyOperation.isSynchronizationOperation()) && (!brokerIsConnected(modifyOperation))) && (!brokerIsConnected())) { Message msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(baseDn.toString()); return new SynchronizationProviderResult.StopProcessing( @@ -2198,8 +2179,8 @@ Entry modifiedEntry = modifyOperation.getModifiedEntry(); if (ctx == null) { // No replication ctxt attached => not a replicated operation // - create a ctxt with : changeNumber, entryUUID // No replication ctx attached => not a replicated operation // - create a ctx with : changeNumber, entryUUID // - attach the context to the op ChangeNumber changeNumber = generateChangeNumber(modifyOperation); @@ -2210,7 +2191,7 @@ } else { // Replication ctxt attached => this is a replicated operation being // Replication ctx attached => this is a replicated operation being // replayed here, it is necessary to // - check if the entry has been renamed // - check for conflicts @@ -2225,7 +2206,7 @@ * Probably the original entry was renamed and replaced with * another entry. * We must not let the modification proceed, return a negative * result and set the result code to NO_SUCH_OBJET. * result and set the result code to NO_SUCH_OBJECT. * When the operation will return, the thread that started the * operation will try to find the correct entry and restart a new * operation. 
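// The first hunk below swaps the two arguments passed to
// ERR_OPERATION_NOT_FOUND_IN_PENDING, presumably so that they line up with the
// placeholders in the message template. A standalone illustration of why the order
// matters (the format string and values here are hypothetical, not the real template
// from replication.properties):
import java.text.MessageFormat;

public class ArgumentOrderDemo
{
  public static void main(String[] args)
  {
    String template = "Internal Error : Operation {0} change number {1} "
        + "was not found in pending list";
    String op = "ModifyOperation(dn=uid=user.0,ou=People,dc=example,dc=com)";
    String changeNumber = "0000013c5c4b2e1a12340000abcd";

    // Swapped arguments still format without any error, but the message is misleading:
    System.out.println(MessageFormat.format(template, changeNumber, op));
    // Correct order:
    System.out.println(MessageFormat.format(template, op, changeNumber));
  }
}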
@@ -2347,7 +2328,7 @@ catch (NoSuchElementException e) { Message message = ERR_OPERATION_NOT_FOUND_IN_PENDING.get( curChangeNumber.toString(), op.toString()); op.toString(), curChangeNumber.toString()); logError(message); return; } @@ -2361,7 +2342,7 @@ generationIdSavedStatus = false; } if (generationIdSavedStatus != true) if (!generationIdSavedStatus) { this.saveGenerationId(generationId); } @@ -2448,27 +2429,27 @@ attrs, null); LinkedList<SearchResultEntry> entries = searchOp.getSearchEntries(); Entry entrytoRename = null; ChangeNumber entrytoRenameDate = null; Entry entryToRename = null; ChangeNumber entryToRenameCN = null; for (SearchResultEntry entry : entries) { EntryHistorical history = EntryHistorical.newInstanceFromEntry(entry); if (entrytoRename == null) if (entryToRename == null) { entrytoRename = entry; entrytoRenameDate = history.getDNDate(); entryToRename = entry; entryToRenameCN = history.getDNDate(); } else if (!history.addedOrRenamedAfter(entrytoRenameDate)) else if (!history.addedOrRenamedAfter(entryToRenameCN)) { // this conflict is older than the previous, keep it. entrytoRename = entry; entrytoRenameDate = history.getDNDate(); entryToRename = entry; entryToRenameCN = history.getDNDate(); } } if (entrytoRename != null) if (entryToRename != null) { DN entryDN = entrytoRename.getDN(); DN entryDN = entryToRename.getDN(); ModifyDNOperationBasis newOp = renameEntry( entryDN, freedDN.getRDN(), freedDN.getParent(), false); @@ -2601,7 +2582,7 @@ } } catch (InterruptedException e) { // stop waiting when interrupted. Thread.currentThread().interrupt(); } } @@ -2924,12 +2905,12 @@ * search if the entry has been renamed, and return the new dn * of the entry. */ DN newdn = findEntryDN(entryUUID); if (newdn != null) DN newDN = findEntryDN(entryUUID); if (newDN != null) { // There is an entry with the same unique id as this modify operation // replay the modify using the current dn of this entry. msg.setDn(newdn.toString()); msg.setDn(newDN.toString()); numResolvedNamingConflicts.incrementAndGet(); return false; } @@ -2974,15 +2955,7 @@ // current RDN value(s); mod.setModificationType(ModificationType.REPLACE); Attribute newAttribute = mod.getAttribute(); AttributeBuilder attrBuilder; if (newAttribute == null) { attrBuilder = new AttributeBuilder(modAttrType); } else { attrBuilder = new AttributeBuilder(newAttribute); } AttributeBuilder attrBuilder = new AttributeBuilder(newAttribute); attrBuilder.add(currentRDN.getAttributeValue(modAttrType)); mod.setAttribute(attrBuilder.toAttribute()); } @@ -3224,8 +3197,9 @@ /* * This entry is the base dn of the backend. * It is quite surprising that the operation result be NO_SUCH_OBJECT. * There is nothing more we can do except TODO log a * There is nothing more we can do except log a * message for the repair tool to look at this problem. * TODO : Log the message */ return true; } @@ -3265,7 +3239,7 @@ * - two adds are done on different servers but with the * same target DN. * - the same ADD is being replayed for the second time on this server. * if the nsunique ID already exist, assume this is a replay and * if the entryUUID already exist, assume this is a replay and * don't do anything * if the entry unique id do not exist, generate conflict. 
*/ @@ -3315,11 +3289,10 @@ attrs.add(ENTRYUUID_ATTRIBUTE_NAME); attrs.add(EntryHistorical.HISTORICAL_ATTRIBUTE_NAME); SearchFilter ALLMATCH; ALLMATCH = SearchFilter.createFilterFromString("(objectClass=*)"); InternalSearchOperation op = conn.processSearch(entryDN, SearchScope.SINGLE_LEVEL, DereferencePolicy.NEVER_DEREF_ALIASES, 0, 0, false, ALLMATCH, DereferencePolicy.NEVER_DEREF_ALIASES, 0, 0, false, SearchFilter.createFilterFromString("(objectClass=*)"), attrs); if (op.getResultCode() == ResultCode.SUCCESS) @@ -3501,7 +3474,6 @@ * @param dn The original DN of the entry. * * @return The generated RDN for a conflicting entry. * @throws DirectoryException */ private RDN generateDeleteConflictDn(String entryUUID, DN dn) { @@ -3584,19 +3556,10 @@ state.clearInMemory(); state.loadState(); // Check to see if a Ruv needs to be translated Long compatGenId = state.checkRUVCompat(); generator.adjust(state.getMaxChangeNumber(serverId)); // Retrieves the generation ID associated with the data imported if (compatGenId != null) { generationId = compatGenId; saveGenerationId(generationId); } else generationId = loadGenerationId(); generationId = loadGenerationId(); } /** @@ -3780,14 +3743,8 @@ } if (search.getResultCode() != ResultCode.SUCCESS) { if (search.getResultCode() == ResultCode.NO_SUCH_OBJECT) { // nothing initialized yet // don't log an error generationID will be computed. } else { // if (search.getResultCode() != ResultCode.NO_SUCH_OBJECT) { // This is an error. Message message = ERR_SEARCHING_GENERATION_ID.get( search.getResultCode().getResultCodeName() + " " + search.getErrorMessage(), @@ -4042,16 +3999,11 @@ } catch (DirectoryException de) { if ((ros != null) && (ros.getNumExportedEntries() >= entryCount)) { // This is the normal end when computing the generationId // We can interrupt the export only by an IOException } else if (ros == null || ros.getNumExportedEntries() < entryCount) { Message message = ERR_LDIFEXPORT_ERROR_DURING_EXPORT.get(de.getMessageObject()); ERR_LDIFEXPORT_ERROR_DURING_EXPORT.get(de.getMessageObject()); logError(message); throw new DirectoryException( ResultCode.OTHER, message, null); @@ -4077,10 +4029,10 @@ } // Release the shared lock on the backend. String lockFile = LockFileManager.getBackendLockFileName(backend); StringBuilder failureReason = new StringBuilder(); try { String lockFile = LockFileManager.getBackendLockFileName(backend); StringBuilder failureReason = new StringBuilder(); if (! LockFileManager.releaseLock(lockFile, failureReason)) { Message message = WARN_LDIFEXPORT_CANNOT_UNLOCK_BACKEND.get( @@ -4291,9 +4243,9 @@ } // From the domainDN retrieves the replication domain LDAPReplicationDomain sdomain = LDAPReplicationDomain domain = MultimasterReplication.findDomain(baseDn, null); if (sdomain == null) if (domain == null) { break; } @@ -4304,7 +4256,7 @@ throw new DirectoryException(ResultCode.OTHER, message); } replicationDomain = sdomain; replicationDomain = domain; } if (replicationDomain == null) @@ -4430,14 +4382,8 @@ * This has no negative impact because the changes on schema should * not produce conflicts. 
*/ if (baseDn.compareTo(DirectoryServer.getSchemaDN()) == 0) { solveConflictFlag = false; } else { solveConflictFlag = configuration.isSolveConflicts(); } solveConflictFlag = baseDn.compareTo(DirectoryServer.getSchemaDN()) != 0 && configuration.isSolveConflicts(); try { @@ -4517,7 +4463,6 @@ public void start() { // Create the ServerStateFlush thread flushThread = new ServerStateFlush(); flushThread.start(); startListenService(); @@ -4553,7 +4498,7 @@ /** * Store the provided ECL configuration for the domain. * @param domCfg The provided configuration. * @throws ConfigException When an error occured. * @throws ConfigException When an error occurred. */ public void storeECLConfiguration(ReplicationDomainCfg domCfg) throws ConfigException @@ -4568,7 +4513,7 @@ { try { eclDomCfg = domCfg.getExternalChangelogDomain(); } catch(Exception e) {} } catch(Exception e) { /* do nothing */ } // domain with no config entry only when running unit tests if (eclDomCfg == null) { @@ -4697,7 +4642,7 @@ // normally the RS should have been updated by other RSes except for // very last changes lost if the local connection was broken // ... hence the RS we are connected to should not be empty // ... or if it is empty, it is due to a volontary reset // ... or if it is empty, it is due to a voluntary reset // and we don't want to update it with our changes that could be huge. if ((replServerMaxChangeNumber != null) && (replServerMaxChangeNumber.getSeqnum()!=0)) @@ -5041,10 +4986,10 @@ } /** * Called by synchronize post op plugin in order to add the entry historized * Called by synchronize post op plugin in order to add the entry historical * attributes to the UpdateMsg. * @param msg * @param op * @param msg an replication update message * @param op the operation in progress * @throws DirectoryException */ private void addEntryAttributesForCL(UpdateMsg msg, @@ -5103,14 +5048,14 @@ { // Potential fast-path for delete operations. LinkedList<Attribute> attributes = new LinkedList<Attribute>(); for (List<Attribute> alist : entry.getUserAttributes().values()) for (List<Attribute> attributeList : entry.getUserAttributes().values()) { attributes.addAll(alist); attributes.addAll(attributeList); } Attribute ocattr = entry.getObjectClassAttribute(); if (ocattr != null) Attribute objectClassAttribute = entry.getObjectClassAttribute(); if (objectClassAttribute != null) { attributes.add(ocattr); attributes.add(objectClassAttribute); } return attributes; } @@ -5422,11 +5367,11 @@ Map<String, List<String>> fractionalSpecificClassesAttributes, List<String> fractionalAllClassesAttributes) throws ConfigException { int fractional_mode = NOT_FRACTIONAL; int fractionalMode; // Determine if fractional-exclude or fractional-include property is used // : only one of them is allowed Iterator<String> fracConfIt = null; Iterator<String> iterator; // Deduce the wished fractional mode if ((exclIt != null) && exclIt.hasNext()) @@ -5438,16 +5383,16 @@ } else { fractional_mode = EXCLUSIVE_FRACTIONAL; fracConfIt = exclIt; fractionalMode = EXCLUSIVE_FRACTIONAL; iterator = exclIt; } } else { if ((inclIt != null) && inclIt.hasNext()) { fractional_mode = INCLUSIVE_FRACTIONAL; fracConfIt = inclIt; fractionalMode = INCLUSIVE_FRACTIONAL; iterator = inclIt; } else { @@ -5455,11 +5400,11 @@ } } while (fracConfIt.hasNext()) while (iterator.hasNext()) { // Parse a value with the form class:attr1,attr2... // or *:attr1,attr2... 
String fractCfgStr = fracConfIt.next(); String fractCfgStr = iterator.next(); StringTokenizer st = new StringTokenizer(fractCfgStr, ":"); int nTokens = st.countTokens(); if (nTokens < 2) @@ -5505,7 +5450,7 @@ } } } return fractional_mode; return fractionalMode; } // Return type of the parseFractionalConfig method @@ -5546,7 +5491,7 @@ FractionalConfig fractionalConfig1, FractionalConfig fractionalConfig2) throws ConfigException { // Comapre base DNs just to be consistent // Compare base DNs just to be consistent if (!fractionalConfig1.getBaseDn().equals(fractionalConfig2.getBaseDn())) return false; @@ -5574,9 +5519,9 @@ if (specificClassesAttributes1.size() != specificClassesAttributes2.size()) return false; // Check consistency of specific classes attributes /* * Check consistency of specific classes attributes * * For each class in specificClassesAttributes1, check that the attribute * list is equivalent to specificClassesAttributes2 attribute list */ @@ -5702,7 +5647,7 @@ for (SearchResultEntry entry : entries) { long maxTimeToRun = endDate - TimeThread.getTime(); if (maxTimeToRun<0) if (maxTimeToRun < 0) { Message errMsg = Message.raw(Category.SYNC, Severity.NOTICE, " end date reached"); opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
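// In the MultimasterReplication hunks below, stopReplayThreads() now waits for each
// replay thread with Thread.join() and restores the interrupt flag, instead of calling
// the polling waitForShutdown() helper that is removed from ReplayThread further below.
// A minimal sketch of that shutdown pattern (the worker threads are illustrative):
import java.util.ArrayList;
import java.util.List;

public class JoinShutdownDemo
{
  public static void main(String[] args)
  {
    List<Thread> workers = new ArrayList<Thread>();
    for (int i = 0; i < 3; i++)
    {
      Thread t = new Thread(new Runnable()
      {
        public void run()
        {
          // simulate draining a queue of updates
        }
      }, "replay thread " + i);
      t.start();
      workers.add(t);
    }

    for (Thread t : workers)
    {
      try
      {
        // Block until the worker finishes instead of sleeping and re-checking a flag.
        t.join();
      }
      catch (InterruptedException e)
      {
        // Preserve the caller's interrupt status rather than swallowing it.
        Thread.currentThread().interrupt();
      }
    }
    workers.clear();
  }
}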
@@ -23,7 +23,7 @@ * * * Copyright 2006-2010 Sun Microsystems, Inc. * Portions Copyright 2011-2012 ForgeRock AS * Portions Copyright 2011-2013 ForgeRock AS */ package org.opends.server.replication.plugin; @@ -85,7 +85,7 @@ * * It also extends the SynchronizationProvider class in order to have some * replication code running during the operation process * as pre-op, conflictRsolution, and post-op. * as pre-op, conflictResolution, and post-op. */ public class MultimasterReplication extends SynchronizationProvider<ReplicationSynchronizationProviderCfg> @@ -140,7 +140,7 @@ * Don't run the special replication code on Operation that are * specifically marked as don't synchronize. */ if ((pluginOp != null) && (pluginOp instanceof Operation)) if (pluginOp != null && pluginOp instanceof Operation) { Operation op = ((Operation) pluginOp); @@ -154,7 +154,7 @@ * so that the core server let the operation modify the entryuuid * and ds-sync-hist attributes. * They are also tagged as dontSynchronize so that the replication * code running later do not generate ChnageNumber, solve conflicts * code running later do not generate ChangeNumber, solve conflicts * and forward the operation to the replication server. */ for (Control c : op.getRequestControls()) @@ -163,10 +163,12 @@ { op.setSynchronizationOperation(true); op.setDontSynchronize(true); // remove this control from the list of controls since // it has now been processed and the local backend will // fail if it finds a control that it does not know about and // that is marked as critical. /* remove this control from the list of controls since it has now been processed and the local backend will fail if it finds a control that it does not know about and that is marked as critical. */ List<Control> controls = op.getRequestControls(); controls.remove(c); return null; @@ -175,7 +177,7 @@ } LDAPReplicationDomain domain = null; LDAPReplicationDomain domain; DN temp = dn; do { @@ -333,7 +335,7 @@ } /** * Stope the threads that are waiting for incoming update messages. * Stop the threads that are waiting for incoming update messages. 
*/ private synchronized static void stopReplayThreads() { @@ -345,7 +347,14 @@ for (ReplayThread replayThread : replayThreads) { replayThread.waitForShutdown(); try { replayThread.join(); } catch(InterruptedException e) { Thread.currentThread().interrupt(); } } replayThreads.clear(); } @@ -377,7 +386,7 @@ } catch (ConfigException e) { // we should never get to this point because the configEntry has // already been validated in configAddisAcceptable // already been validated in isConfigurationAddAcceptable() return new ConfigChangeResult(ResultCode.CONSTRAINT_VIOLATION, false); } } @@ -544,7 +553,7 @@ DN operationDN = modifyDNOperation.getEntryDN(); LDAPReplicationDomain domain = findDomain(operationDN, modifyDNOperation); if ((domain == null) || (!domain.solveConflict())) if (domain == null || !domain.solveConflict()) return new SynchronizationProviderResult.ContinueProcessing(); // The historical object is retrieved from the attachment created @@ -563,7 +572,7 @@ historicalInformation.setPurgeDelay(domain.getHistoricalPurgeDelay()); // Add to the operation the historical attribute : "dn:changeNumger:moddn" // Add to the operation the historical attribute : "dn:changeNumber:moddn" historicalInformation.setHistoricalAttrToOperation(modifyDNOperation); return new SynchronizationProviderResult.ContinueProcessing(); @@ -586,7 +595,7 @@ if (!addOperation.isSynchronizationOperation()) domain.doPreOperation(addOperation); // Add to the operation the historical attribute : "dn:changeNumger:add" // Add to the operation the historical attribute : "dn:changeNumber:add" EntryHistorical.setHistoricalAttrToOperation(addOperation); return new SynchronizationProviderResult.ContinueProcessing(); @@ -777,12 +786,9 @@ private void genericPostOperation(PostOperationOperation operation, DN dn) { LDAPReplicationDomain domain = findDomain(dn, operation); if (domain == null) return; domain.synchronize(operation); return; if (domain != null) { domain.synchronize(operation); } } /** opends/src/server/org/opends/server/replication/plugin/PersistentServerState.java
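// Further down in the PersistentServerState hunks, the loop that scans historical
// attribute values for the newest change number is rewritten as an enhanced for loop;
// the old code drained an Iterator inside while(true) and relied on a swallowed
// exception to stop. A small sketch of the cleaner shape (illustrative data, not the
// OpenDS AttributeValue API):
import java.util.Arrays;
import java.util.List;

public class MaxScanDemo
{
  /** Returns the largest value in the list, or Long.MIN_VALUE when it is empty. */
  static long max(List<Long> values)
  {
    long max = Long.MIN_VALUE;
    for (long value : values)
    {
      // Compare against the best value seen so far and keep the biggest.
      if (value > max)
      {
        max = value;
      }
    }
    return max;
  }

  public static void main(String[] args)
  {
    System.out.println(max(Arrays.asList(3L, 42L, 7L))); // prints 42
  }
}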
@@ -23,7 +23,7 @@ * * * Copyright 2006-2010 Sun Microsystems, Inc. * Portions copyright 2012 ForgeRock AS. * Portions copyright 2012-2013 ForgeRock AS. */ package org.opends.server.replication.plugin; import org.opends.messages.Message; @@ -35,15 +35,12 @@ import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; import java.util.Iterator; import org.opends.server.core.DeleteOperationBasis; import org.opends.server.core.DirectoryServer; import org.opends.server.core.ModifyOperationBasis; import org.opends.server.protocols.internal.InternalClientConnection; import org.opends.server.protocols.internal.InternalSearchOperation; import org.opends.server.protocols.ldap.LDAPAttribute; import org.opends.server.protocols.ldap.LDAPFilter; import org.opends.server.protocols.ldap.LDAPModification; import org.opends.server.replication.common.ChangeNumber; import org.opends.server.replication.common.ServerState; @@ -55,7 +52,6 @@ import org.opends.server.types.DN; import org.opends.server.types.DereferencePolicy; import org.opends.server.types.DirectoryException; import org.opends.server.types.LDAPException; import org.opends.server.types.ModificationType; import org.opends.server.types.RawModification; import org.opends.server.types.ResultCode; @@ -64,7 +60,7 @@ import org.opends.server.types.SearchScope; /** * This class implements a ServerState that is stored on the backends * This class implements a ServerState that is stored in the backend * used to store the synchronized data and that is therefore persistent * across server reboot. */ @@ -73,7 +69,6 @@ private final DN baseDn; private final InternalClientConnection conn = InternalClientConnection.getRootConnection(); private final ByteString asn1BaseDn; private final int serverId; private final ServerState state; @@ -83,16 +78,6 @@ */ protected static final String REPLICATION_STATE = "ds-sync-state"; /** * The attribute name used to store the entryUUID. */ private static final String ENTRY_UUID = "entryUUID"; /** * The attribute name used to store the RUV elements. */ private static final String REPLICATION_RUV_ELEMENT = "nsds50ruv"; /** * create a new ServerState. * @param baseDn The baseDN for which the ServerState is created @@ -103,12 +88,12 @@ this.baseDn = baseDn; this.serverId = serverId; this.state = new ServerState(); this.asn1BaseDn = ByteString.valueOf(baseDn.toString()); loadState(); } /** * Create a new PersistenServerState based on an already existing ServerState. * Create a new PersistentServerState based on an already existing * ServerState. * * @param baseDn The baseDN for which the ServerState is created. * @param serverId The serverId. @@ -119,7 +104,6 @@ this.baseDn = baseDn; this.serverId = serverId; this.state = state; this.asn1BaseDn = ByteString.valueOf(baseDn.toString()); loadState(); } @@ -170,16 +154,14 @@ */ public void loadState() { SearchResultEntry stateEntry = null; // try to load the state from the base entry. stateEntry = searchBaseEntry(); SearchResultEntry stateEntry = searchBaseEntry(); if (stateEntry == null) { // The base entry does not exist yet // in the database or was deleted. Try to read the ServerState // from the configuration instead. /* The base entry does not exist yet in the database or was deleted. Try to read the ServerState from the configuration instead. 
*/ stateEntry = searchConfigEntry(); } @@ -205,50 +187,47 @@ */ private SearchResultEntry searchBaseEntry() { LDAPFilter filter; try { filter = LDAPFilter.decode("objectclass=*"); } catch (LDAPException e) { // can not happen return null; } /* * Search the database entry that is used to periodically * save the ServerState */ LinkedHashSet<String> attributes = new LinkedHashSet<String>(1); attributes.add(REPLICATION_STATE); InternalSearchOperation search = conn.processSearch(asn1BaseDn, SearchScope.BASE_OBJECT, DereferencePolicy.DEREF_ALWAYS, 0, 0, false, filter,attributes); if (((search.getResultCode() != ResultCode.SUCCESS)) && ((search.getResultCode() != ResultCode.NO_SUCH_OBJECT))) { Message message = ERR_ERROR_SEARCHING_RUV. get(search.getResultCode().getResultCodeName(), search.toString(), search.getErrorMessage(), baseDn.toString()); logError(message); return null; } SearchResultEntry stateEntry = null; if (search.getResultCode() == ResultCode.SUCCESS) { SearchFilter filter = SearchFilter.createFilterFromString("objectclass=*"); /* * Read the serverState from the REPLICATION_STATE attribute * Search the database entry that is used to periodically * save the ServerState */ LinkedList<SearchResultEntry> result = search.getSearchEntries(); if (!result.isEmpty()) LinkedHashSet<String> attributes = new LinkedHashSet<String>(1); attributes.add(REPLICATION_STATE); InternalSearchOperation search = conn.processSearch(baseDn, SearchScope.BASE_OBJECT, DereferencePolicy.NEVER_DEREF_ALIASES, 0, 0, false, filter, attributes); if (((search.getResultCode() != ResultCode.SUCCESS)) && ((search.getResultCode() != ResultCode.NO_SUCH_OBJECT))) { stateEntry = result.getFirst(); Message message = ERR_ERROR_SEARCHING_RUV. get(search.getResultCode().getResultCodeName(), search.toString(), search.getErrorMessage(), baseDn.toString()); logError(message); return null; } SearchResultEntry stateEntry = null; if (search.getResultCode() == ResultCode.SUCCESS) { // Read the serverState from the REPLICATION_STATE attribute LinkedList<SearchResultEntry> result = search.getSearchEntries(); if (!result.isEmpty()) { stateEntry = result.getFirst(); } } return stateEntry; } return stateEntry; catch (DirectoryException e) { // cannot happen return null; } } /** @@ -276,15 +255,12 @@ if (op.getResultCode() == ResultCode.SUCCESS) { /* * Read the serverState from the REPLICATION_STATE attribute */ // Read the serverState from the REPLICATION_STATE attribute LinkedList<SearchResultEntry> resultEntries = op.getSearchEntries(); if (!resultEntries.isEmpty()) { SearchResultEntry resultEntry = resultEntries.getFirst(); return resultEntry; return resultEntries.getFirst(); } } return null; @@ -455,31 +431,23 @@ dbMaxCn = serverStateMaxCn; for (SearchResultEntry resEntry : op.getSearchEntries()) { List<Attribute> attrs = resEntry.getAttribute(histType); Iterator<AttributeValue> iav = attrs.get(0).iterator(); try for (AttributeValue attrValue : resEntry.getAttribute(histType).get(0)) { while (true) { AttributeValue attrVal = iav.next(); HistoricalAttributeValue histVal = new HistoricalAttributeValue(attrVal.toString()); ChangeNumber cn = histVal.getCn(); HistoricalAttributeValue histVal = new HistoricalAttributeValue(attrValue.toString()); ChangeNumber cn = histVal.getCn(); if ((cn != null) && (cn.getServerId() == serverId)) if ((cn != null) && (cn.getServerId() == serverId)) { // compare the csn regarding the maxCn we know and // store the biggest if (ChangeNumber.compare(dbMaxCn, cn) < 0) { // compare the csn regarding the maxCn we 
know and // store the biggest if (ChangeNumber.compare(dbMaxCn, cn) < 0) { dbMaxCn = cn; } dbMaxCn = cn; } } } catch(Exception e) { } } if (ChangeNumber.compare(dbMaxCn, serverStateMaxCn) > 0) @@ -495,215 +463,6 @@ } } /** * Check if a ReplicaUpdateVector entry is present * if so, translate the ruv into a serverState and * a generationId. * @return the generationId translated from the RUV * entry, 0 if no RUV is present */ public Long checkRUVCompat() { Long genId = null; SearchResultEntry ruvEntry = null; try { // Search the RUV in the DB ruvEntry = searchRUVEntry(); if (ruvEntry == null) return null; // Check if the serverState is already initialized if( !isServerStateInitilized()) { // Translate the ruv to serverState // and GenerationId genId = initializeStateWithRUVEntry(ruvEntry); } } catch (Exception e) { Message message = NOTE_ERR_WHILE_TRYING_TO_DECODE_RUV_IN_STATE.get( baseDn.toString()); logError(message); } // In any case, remove the RUV entry // if it exists DeleteOperationBasis del = new DeleteOperationBasis(conn, InternalClientConnection.nextOperationID(), InternalClientConnection.nextMessageID(), null, ByteString.valueOf(ruvEntry.getDN().toNormalizedString())); // Run the internal operation del.setInternalOperation(true); del.setSynchronizationOperation(true); del.setDontSynchronize(true); del.run(); return genId; } /** * Initialize the serverState and the GenerationId based on a RUV * entry. * @param ruvEntry the entry to translate into a serverState. * @return the generationId translated from the RUV entry. */ private Long initializeStateWithRUVEntry(SearchResultEntry ruvEntry) { Long genId = null; String value = null; String csn = null; AttributeType ruvElementType = DirectoryServer.getAttributeType(REPLICATION_RUV_ELEMENT); if (ruvElementType == null) return null; for (Attribute attr : ruvEntry.getAttribute(ruvElementType)) { Iterator<AttributeValue> it = attr.iterator(); while (it.hasNext()) { value = it.next().toString(); // Search for the GenerationId if (value.startsWith("{replicageneration} ")) { // Get only the timestamp present in the CSN String replicaGen = value.substring(20, 28); genId = Long.parseLong(replicaGen,16); } else { // Translate the other elements into serverState if (value.startsWith("{replica ")) { String[] bits = value.split(" "); // Need to take into account when a purl is empty if (bits.length > 3) { if (bits[2].contains("ldap")) { // the ldap url is present so the max csn is the 5th element // Example : // {replica 5 ldap://host:port} 494b6635000000050000 4aeab8f300 // 0000050000 csn = bits[4]; } else { // no ldap url so the max csn is the 4th element // Example : // {replica 31842} 4a0d1ff700017c620000 4a926b6500007c620000 csn = bits[3]; } String temp = csn.substring(0, 8); Long timeStamp = Long.parseLong(temp, 16); temp = csn.substring(8, 12); Integer seqNum = Integer.parseInt(temp, 16); temp = csn.substring(12, 16); Integer replicaId = Integer.parseInt(temp, 16); // No need to take into account the subSeqNum ChangeNumber cn = new ChangeNumber(timeStamp*1000, seqNum, replicaId); this.update(cn); } } } } } return genId; } /** * Check if the server State is initialized by searching * the attribute type REPLICATION_STATE in the root entry. 
* @return true if the serverState is initialized, false * otherwise */ private boolean isServerStateInitilized() { SearchResultEntry resultEntry = searchBaseEntry(); AttributeType synchronizationStateType = DirectoryServer.getAttributeType(REPLICATION_STATE); List<Attribute> attrs = resultEntry.getAttribute(synchronizationStateType); return (attrs != null); } /** * Search the database entry that represent a serverState * using the RUV format (compatibility mode). * @return the corresponding RUV entry, null otherwise */ private SearchResultEntry searchRUVEntry() { LDAPFilter filter; SearchResultEntry ruvEntry = null; // Search the RUV entry try { filter = LDAPFilter.decode("objectclass=ldapSubEntry"); } catch (LDAPException e) { // can not happen return null; } LinkedHashSet<String> attributes = new LinkedHashSet<String>(1); attributes.add(ENTRY_UUID); attributes.add(REPLICATION_RUV_ELEMENT); InternalSearchOperation search = conn.processSearch(asn1BaseDn, SearchScope.SUBORDINATE_SUBTREE, DereferencePolicy.DEREF_ALWAYS, 0, 0, false, filter,attributes); if (((search.getResultCode() != ResultCode.SUCCESS)) && ((search.getResultCode() != ResultCode.NO_SUCH_OBJECT))) return null; if (search.getResultCode() == ResultCode.SUCCESS) { /* * Search the ldapSubEntry with the entryUUID equals * to "ffffffff-ffff-ffff-ffff-ffffffffffff" */ LinkedList<SearchResultEntry> result = search.getSearchEntries(); if (!result.isEmpty()) { for (SearchResultEntry ldapSubEntry : result) { List<Attribute> attrs = ldapSubEntry.getAttribute(ENTRY_UUID.toLowerCase()); if (attrs != null) { Iterator<AttributeValue> iav = attrs.get(0).iterator(); AttributeValue attrVal = iav.next(); if (attrVal.toString(). equalsIgnoreCase("ffffffff-ffff-ffff-ffff-ffffffffffff")) { ruvEntry = ldapSubEntry; break; } } } } } return ruvEntry; } /** * Get the largest ChangeNumber seen for a given LDAP server ID. * opends/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java
@@ -23,6 +23,7 @@
  *
  *
  *      Copyright 2007-2009 Sun Microsystems, Inc.
+ *      Portions Copyright 2013 ForgeRock AS.
  */
 package org.opends.server.replication.plugin;
@@ -58,7 +59,7 @@
  * One of this object is instantiated for each ReplicationDomain.
  *
  */
-public class RemotePendingChanges
+public final class RemotePendingChanges
 {
   /**
    * A map used to store the pending changes.
opends/src/server/org/opends/server/replication/plugin/ReplayThread.java
@@ -23,7 +23,7 @@ * * * Copyright 2006-2008 Sun Microsystems, Inc. * Portions Copyright 2011 ForgeRock AS * Portions Copyright 2011-2013 ForgeRock AS */ package org.opends.server.replication.plugin; import org.opends.server.replication.protocol.LDAPUpdateMsg; @@ -57,7 +57,6 @@ private final BlockingQueue<UpdateToReplay> updateToReplayQueue; private volatile boolean shutdown = false; private volatile boolean done = false; private static int count = 0; /** @@ -91,12 +90,11 @@ TRACER.debugInfo("Replication Replay thread starting."); } UpdateToReplay updateToreplay = null; while (!shutdown) { try { UpdateToReplay updateToreplay; // Loop getting an updateToReplayQueue from the update message queue and // replaying matching changes while ( (!shutdown) && @@ -119,27 +117,9 @@ logError(message); } } done = true; if (debugEnabled()) { TRACER.debugInfo("Replication Replay thread stopping."); } } /** * Wait for the completion of this thread. */ public void waitForShutdown() { try { while ((done == false) && (this.isAlive())) { Thread.sleep(50); } } catch (InterruptedException e) { // exit the loop if this thread is interrupted. } } } opends/src/server/org/opends/server/replication/plugin/ReplicationRepairRequestControl.java
@@ -23,6 +23,7 @@ * * * Copyright 2008 Sun Microsystems, Inc. * Portions Copyright 2013 ForgeRock AS. */ package org.opends.server.replication.plugin; @@ -47,7 +48,7 @@ public class ReplicationRepairRequestControl extends Control { /** * ControlDecoder implentation to decode this control from a ByteString. * ControlDecoder implementation to decode this control from a ByteString. */ private final static class Decoder implements ControlDecoder<ReplicationRepairRequestControl> @@ -84,7 +85,6 @@ public static final String OID_REPLICATION_REPAIR_CONTROL = "1.3.6.1.4.1.26027.1.5.2"; /** * Creates a new instance of the replication repair request control with the * default settings. @@ -95,8 +95,6 @@ } /** * Creates a new instance of the replication repair control with the * provided information. @@ -111,7 +109,7 @@ } /** * Writes this control's value to an ASN.1 writer. The value (if any) must be * Writes this control value to an ASN.1 writer. The value (if any) must be * written as an ASN1OctetString. * * @param writer The ASN.1 writer to use. @@ -122,8 +120,6 @@ // No value element } /** * Appends a string representation of this replication repair request control * to the provided buffer. opends/src/server/org/opends/server/replication/protocol/AckMsg.java
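// The AckMsg hunks below replace the if/else blocks that decoded the hasTimeout,
// hasWrongStatus and hasReplayError bytes with direct comparisons such as
// "hasTimeout = in[pos++] == 1". A self-contained sketch of that decoding style
// (the three-flag layout here is illustrative, not the real AckMsg wire format):
public class FlagDecodingDemo
{
  public static void main(String[] args)
  {
    byte[] in = { 1, 0, 1 };
    int pos = 0;

    // Each flag is one byte: 1 means true, anything else means false.
    boolean hasTimeout = in[pos++] == 1;
    boolean hasWrongStatus = in[pos++] == 1;
    boolean hasReplayError = in[pos++] == 1;

    System.out.println(hasTimeout + " " + hasWrongStatus + " " + hasReplayError);
    // prints: true false true
  }
}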
@@ -23,6 +23,7 @@ * * * Copyright 2006-2009 Sun Microsystems, Inc. * Portions Copyright 2013 ForgeRock AS. */ package org.opends.server.replication.protocol; @@ -176,31 +177,13 @@ pos += length + 1; /* Read the hasTimeout flag */ if (in[pos++] == 1) { hasTimeout = true; } else { hasTimeout = false; } hasTimeout = in[pos++] == 1; /* Read the hasWrongStatus flag */ if (in[pos++] == 1) { hasWrongStatus = true; } else { hasWrongStatus = false; } hasWrongStatus = in[pos++] == 1; /* Read the hasReplayError flag */ if (in[pos++] == 1) { hasReplayError = true; } else { hasReplayError = false; } hasReplayError = in[pos++] == 1; /* Read the list of failed server ids */ while (pos < in.length) @@ -318,7 +301,7 @@ */ public String errorsToString() { String idList = null; String idList; if (failedServers.size() > 0) { idList = "["; @@ -334,12 +317,10 @@ idList="none"; } String ackErrorStr = "hasTimeout: " + (hasTimeout ? "yes" : "no") + ", " + return "hasTimeout: " + (hasTimeout ? "yes" : "no") + ", " + "hasWrongStatus: " + (hasWrongStatus ? "yes" : "no") + ", " + "hasReplayError: " + (hasReplayError ? "yes" : "no") + ", " + "concerned server ids: " + idList; return ackErrorStr; } } opends/src/server/org/opends/server/replication/protocol/AddMsg.java
@@ -23,7 +23,7 @@
  *
  *
  *      Copyright 2006-2010 Sun Microsystems, Inc.
- *      Portions Copyright 2011-2012 ForgeRock AS
+ *      Portions Copyright 2011-2013 ForgeRock AS
  */
 package org.opends.server.replication.protocol;
@@ -552,8 +552,7 @@
    */
   public List<Attribute> getAttributes() throws LDAPException, ASN1Exception
   {
-    List<Attribute> attrs = decodeAttributes(encodedAttributes);
-    return attrs;
+    return decodeAttributes(encodedAttributes);
   }

   /**
opends/src/server/org/opends/server/replication/protocol/DeleteMsg.java
@@ -65,7 +65,7 @@
         isSubtreeDelete = true;
     }
     catch(Exception e)
-    {}
+    {/* do nothing */}
   }

   /**
opends/src/server/org/opends/server/replication/protocol/EntryMsg.java
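// The EntryMsg hunk below replaces a hand-written byte-copy loop with
// System.arraycopy, which states the intent directly and lets the JVM perform a bulk
// copy. A standalone before/after sketch with illustrative data:
import java.util.Arrays;

public class ArrayCopyDemo
{
  public static void main(String[] args)
  {
    byte[] in = { 10, 20, 30, 40, 50 };
    int pos = 2;
    int length = in.length - pos;

    // Manual copy loop (the old shape):
    byte[] manual = new byte[length];
    for (int i = 0; i < length; i++)
    {
      manual[i] = in[pos + i];
    }

    // Bulk copy (the new shape):
    byte[] bulk = new byte[length];
    System.arraycopy(in, pos, bulk, 0, length);

    System.out.println(Arrays.equals(manual, bulk)); // prints true
  }
}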
@@ -23,6 +23,7 @@
  *
  *
  *      Copyright 2006-2010 Sun Microsystems, Inc.
+ *      Portions Copyright 2013 ForgeRock AS.
  */
 package org.opends.server.replication.protocol;
@@ -128,10 +129,7 @@
       // data
       length = in.length - (pos + 1);
       this.entryByteArray = new byte[length];
-      for (int i=0; i<length; i++)
-      {
-        entryByteArray[i] = in[pos+i];
-      }
+      System.arraycopy(in, pos, entryByteArray, 0, length);
     }
     catch (UnsupportedEncodingException e)
     {
opends/src/server/org/opends/server/replication/protocol/LDAPUpdateMsg.java
@@ -87,7 +87,7 @@ } /** * Creates a new UpdateMsg with the given informations. * Creates a new UpdateMsg with the given information. * * @param ctx The replication Context of the operation for which the * update message must be created,. @@ -103,7 +103,7 @@ } /** * Creates a new UpdateMessage with the given informations. * Creates a new UpdateMessage with the given information. * * @param cn The ChangeNumber of the operation for which the * UpdateMessage is created. @@ -491,9 +491,9 @@ /* Read the changeNumber */ int pos = 2; int length = getNextLength(encodedMsg, pos); String changenumberStr = new String(encodedMsg, pos, length, "UTF-8"); String changeNumberStr = new String(encodedMsg, pos, length, "UTF-8"); pos += length + 1; changeNumber = new ChangeNumber(changenumberStr); changeNumber = new ChangeNumber(changeNumberStr); /* Read the dn */ length = getNextLength(encodedMsg, pos); @@ -506,10 +506,7 @@ pos += length + 1; /* Read the assured information */ if (encodedMsg[pos++] == 1) assuredFlag = true; else assuredFlag = false; assuredFlag = encodedMsg[pos++] == 1; /* Read the assured mode */ assuredMode = AssuredMode.valueOf(encodedMsg[pos++]); @@ -556,15 +553,12 @@ /* read the changeNumber */ int pos = 1; int length = getNextLength(encodedMsg, pos); String changenumberStr = new String(encodedMsg, pos, length, "UTF-8"); String changeNumberStr = new String(encodedMsg, pos, length, "UTF-8"); pos += length + 1; changeNumber = new ChangeNumber(changenumberStr); changeNumber = new ChangeNumber(changeNumberStr); /* read the assured information */ if (encodedMsg[pos++] == 1) assuredFlag = true; else assuredFlag = false; assuredFlag = encodedMsg[pos++] == 1; /* read the dn */ length = getNextLength(encodedMsg, pos); @@ -628,7 +622,7 @@ /** * Decode a provided byte array as a list of RawAttribute. * @param in The provided byte array. * @return The list of Rawattribute objects. * @return The list of RawAttribute objects. * @throws LDAPException when it occurs. * @throws ASN1Exception when it occurs. */ opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java
@@ -331,14 +331,13 @@ public byte[] getBytes_V45(short reqProtocolVersion) throws UnsupportedEncodingException { int bodyLength = 0; byte[] byteNewSuperior = null; byte[] byteNewSuperiorId = null; // calculate the length necessary to encode the parameters byte[] byteNewRdn = newRDN.getBytes("UTF-8"); bodyLength = byteNewRdn.length + 1 + 1; int bodyLength = byteNewRdn.length + 1 + 1; if (newSuperior != null) { @@ -438,10 +437,7 @@ pos += length + 1; /* get the deleteoldrdn flag */ if (in[pos] == 0) deleteOldRdn = false; else deleteOldRdn = true; deleteOldRdn = in[pos] != 0; pos++; // For easiness (no additional method), simply compare PDU type to @@ -501,10 +497,7 @@ pos += length + 1; /* get the deleteoldrdn flag */ if (in[pos] == 0) deleteOldRdn = false; else deleteOldRdn = true; deleteOldRdn = in[pos] != 0; pos++; // Read mods len opends/src/server/org/opends/server/replication/protocol/ModifyMsg.java
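// The ModifyMsg hunk below drops the code that decoded and concatenated every
// modification inside toString(); the new version only reports the encoded size, since
// the mods can be arbitrarily large and toString() is mainly used for logging. A
// minimal sketch of that design choice (the class below is illustrative, not the real
// ModifyMsg):
public class PayloadMessage
{
  private final String dn;
  private final byte[] encodedMods;

  public PayloadMessage(String dn, byte[] encodedMods)
  {
    this.dn = dn;
    this.encodedMods = encodedMods;
  }

  @Override
  public String toString()
  {
    // Cheap and bounded: describe the payload instead of expanding it.
    return "PayloadMessage content:" + " dn: " + dn
        + " size: " + encodedMods.length;
  }

  public static void main(String[] args)
  {
    System.out.println(new PayloadMessage("uid=user.0,dc=example,dc=com",
        new byte[2048]));
  }
}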
@@ -168,21 +168,6 @@ } if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V2) { String mods = ""; try { ArrayList<RawModification> ldapmods = decodeRawMods(encodedMods); for (RawModification mod : ldapmods) { mods += mod.toString(); } } catch (LDAPException e) { } catch (ASN1Exception e) { } return "ModifyMsg content: " + " protocolVersion: " + protocolVersion + " dn: " + dn + @@ -191,8 +176,8 @@ " assuredFlag: " + assuredFlag + " assuredMode: " + assuredMode + " safeDataLevel: " + safeDataLevel + " size: " + encodedMods.length + mods; " size: " + encodedMods.length; /* Do not append mods, they can be too long */ } opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
@@ -23,6 +23,7 @@ * * * Copyright 2009 Sun Microsystems, Inc. * Portions Copyright 2013 ForgeRock AS. */ package org.opends.server.replication.protocol; @@ -45,7 +46,7 @@ /** * This message is part of the replication protocol. * RS1 sends a MonitorRequestMessage to RS2 to requests its monitoring * informations. * information. * When RS2 receives a MonitorRequestMessage from RS1, RS2 responds with a * MonitorMsg. */ @@ -64,7 +65,7 @@ /** * Data structure to manage the state of this replication server * and the state informations for the servers connected to it. * and the state information for the servers connected to it. * */ class SubTopoMonitorData @@ -120,7 +121,7 @@ } /** * Sets the informations of an LDAP server. * Sets the information of an LDAP server. * @param serverId The serverID. * @param state The server state. * @param approxFirstMissingDate The approximation of the date @@ -313,8 +314,7 @@ } asn1Reader.readEndSequence(); } catch(Exception e) { { /* do nothing */ } } @@ -374,7 +374,7 @@ } writer.writeEndSequence(); // then the LDAP server datas // then the LDAP server data Set<Integer> servers = data.ldapStates.keySet(); for (Integer sid : servers) { @@ -512,11 +512,10 @@ sd.state.toString() + "]" + " afmd=" + sd.approxFirstMissingDate + "]"; } String me = this.getClass().getCanonicalName() + return this.getClass().getCanonicalName() + "[ sender=" + this.senderID + " destination=" + this.destination + " data=[" + stateS + "]" + "]"; return me; } } opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java
@@ -23,7 +23,7 @@
  *
  *
  *      Copyright 2006-2010 Sun Microsystems, Inc.
- *      Portions Copyright 2011 ForgeRock AS
+ *      Portions Copyright 2011-2013 ForgeRock AS
  */
 package org.opends.server.replication.protocol;
@@ -121,8 +121,7 @@
    */
   public static short minWithCurrent(short version)
   {
-    short newVersion = (version < currentVersion ? version : currentVersion);
-    return newVersion;
+    return (version < currentVersion ? version : currentVersion);
   }
 }
opends/src/server/org/opends/server/replication/protocol/ReplSessionSecurity.java
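// The ReplSessionSecurity hunk below fixes a copy/paste bug: the cipher-suite array was
// sized from, and filled with, the protocol list instead of the cipher-suite list. A
// standalone illustration of the defect and the fix (the sample lists are illustrative):
import java.util.Arrays;
import java.util.List;

public class WrongSourceCopyDemo
{
  public static void main(String[] args)
  {
    List<String> sslProtocols = Arrays.asList("TLSv1");
    List<String> sslCipherSuites =
        Arrays.asList("TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA");

    // Buggy shape: the cipher-suite array ends up holding protocol names,
    // and has the wrong length as well.
    String[] buggy = sslProtocols.toArray(new String[sslProtocols.size()]);

    // Fixed shape: size and contents both come from the cipher-suite list.
    String[] fixed = sslCipherSuites.toArray(new String[sslCipherSuites.size()]);

    System.out.println(Arrays.toString(buggy)); // [TLSv1]
    System.out.println(Arrays.toString(fixed)); // both AES cipher suites
  }
}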
@@ -23,7 +23,7 @@
  *
  *
  *      Copyright 2008 Sun Microsystems, Inc.
- *      Portions copyright 2011 ForgeRock AS
+ *      Portions copyright 2011-2013 ForgeRock AS
  */
 package org.opends.server.replication.protocol;
@@ -136,8 +136,8 @@
     }
     else
     {
-      this.sslCipherSuites = new String[sslProtocols.size()];
-      sslProtocols.toArray(this.sslCipherSuites);
+      this.sslCipherSuites = new String[sslCipherSuites.size()];
+      sslCipherSuites.toArray(this.sslCipherSuites);
     }

     this.sslEncryption = sslEncryption;
opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
@@ -23,6 +23,7 @@
  *
  *
  *      Copyright 2006-2010 Sun Microsystems, Inc.
+ *      Portions Copyright 2013 ForgeRock AS.
  */
 package org.opends.server.replication.protocol;
@@ -140,7 +141,7 @@
    * @param buffer    The encode form of the ReplicationMsg.
    * @param version   The version to use to decode the msg.
    *
-   * @return The generated SycnhronizationMessage.
+   * @return The generated SynchronizationMessage.
    *
    * @throws DataFormatException If the encoded form was not a valid msg.
    * @throws UnsupportedEncodingException If UTF8 is not supported.
@@ -153,7 +154,7 @@
          throws DataFormatException, UnsupportedEncodingException,
                 NotSupportedOldVersionPDUException
   {
-    ReplicationMsg msg = null;
+    ReplicationMsg msg;
     switch (buffer[0])
     {
     case MSG_TYPE_SERVER_START_V1:
opends/src/server/org/opends/server/replication/protocol/ServerStartECLMsg.java
@@ -23,6 +23,7 @@ * * * Copyright 2009 Sun Microsystems, Inc. * Portions Copyright 2013 ForgeRock AS. */ package org.opends.server.replication.protocol; @@ -66,7 +67,6 @@ * Server after being connected to a replication server for a given * replication domain. * * @param baseDn The base DN. * @param maxReceiveDelay The max receive delay for this server. * @param maxReceiveQueue The max receive Queue for this server. * @param maxSendDelay The max Send Delay from this server. @@ -77,18 +77,18 @@ * @param protocolVersion The replication protocol version of the creator. * @param generationId The generationId for this server. * @param sslEncryption Whether to continue using SSL to encrypt messages * after the start messages have been exchanged. * after the start messages have been exchanged. * @param groupId The group id of the DS for this DN */ public ServerStartECLMsg(String baseDn, int maxReceiveDelay, int maxReceiveQueue, int maxSendDelay, int maxSendQueue, int windowSize, long heartbeatInterval, ServerState serverState, short protocolVersion, long generationId, boolean sslEncryption, byte groupId) public ServerStartECLMsg(int maxReceiveDelay, int maxReceiveQueue, int maxSendDelay, int maxSendQueue, int windowSize, long heartbeatInterval, ServerState serverState, short protocolVersion, long generationId, boolean sslEncryption, byte groupId) { super(protocolVersion, generationId); opends/src/server/org/opends/server/replication/protocol/ServerStartMsg.java
@@ -23,6 +23,7 @@ * * * Copyright 2006-2009 Sun Microsystems, Inc. * Portions Copyright 2013 ForgeRock AS. */ package org.opends.server.replication.protocol; @@ -405,7 +406,7 @@ "\nprotocolVersion: " + protocolVersion + "\ngenerationId: " + generationId + "\ngroupId: " + groupId + "\nbaseDn: " + baseDn.toString() + "\nbaseDn: " + baseDn + "\nheartbeatInterval: " + heartbeatInterval + "\nmaxReceiveDelay: " + maxReceiveDelay + "\nmaxReceiveQueue: " + maxReceiveQueue + opends/src/server/org/opends/server/replication/protocol/StartECLSessionMsg.java
@@ -23,12 +23,14 @@ * * * Copyright 2009-2010 Sun Microsystems, Inc. * Portions Copyright 2013 ForgeRock AS. */ package org.opends.server.replication.protocol; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.Collections; import java.util.zip.DataFormatException; import org.opends.server.replication.common.ChangeNumber; @@ -63,8 +65,6 @@ */ public final static short REQUEST_TYPE_EQUALS_REPL_CHANGE_NUMBER = 2; /** * This specifies that the request on the ECL is a PERSISTENT search * with changesOnly = false. @@ -82,8 +82,6 @@ */ public final static short PERSISTENT_CHANGES_ONLY = 2; // The type of request as defined by REQUEST_TYPE_... private short eclRequestType; @@ -182,10 +180,7 @@ if (excludedDNsString.length()>0) { String[] excludedDNsStr = excludedDNsString.split(";"); for (String excludedDNStr : excludedDNsStr) { this.excludedServiceIDs.add(excludedDNStr); } Collections.addAll(this.excludedServiceIDs, excludedDNsStr); } pos += length + 1; @@ -219,7 +214,7 @@ @Override public byte[] getBytes() { String excludedSIDsString = new String(); String excludedSIDsString = ""; for (String excludedServiceID : excludedServiceIDs) { excludedSIDsString = excludedSIDsString.concat(excludedServiceID+";"); opends/src/server/org/opends/server/replication/protocol/StartMsg.java
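As a side note on the Collections.addAll change in StartECLSessionMsg above: the rewrite is behaviour-preserving. A minimal standalone sketch of the equivalence follows; the class name and sample values are illustrative only and not taken from the patch.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class AddAllSketch
{
  public static void main(String[] args)
  {
    String excludedDNsString = "dc=a;dc=b;dc=c";
    String[] excludedDNsStr = excludedDNsString.split(";");

    // Manual loop, as in the code being removed.
    List<String> viaLoop = new ArrayList<String>();
    for (String dn : excludedDNsStr)
    {
      viaLoop.add(dn);
    }

    // Collections.addAll, as in the replacement: same result, one call.
    List<String> viaAddAll = new ArrayList<String>();
    Collections.addAll(viaAddAll, excludedDNsStr);

    System.out.println(viaLoop.equals(viaAddAll)); // prints true
  }
}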
@@ -23,6 +23,7 @@ * * * Copyright 2006-2009 Sun Microsystems, Inc. * Portions Copyright 2013 ForgeRock AS. */ package org.opends.server.replication.protocol; @@ -176,10 +177,8 @@ { /* first byte is the type */ boolean foundMatchingType = false; for (int i = 0; i < types.length; i++) { if (types[i] == encodedMsg[0]) { for (byte type : types) { if (type == encodedMsg[0]) { foundMatchingType = true; break; } opends/src/server/org/opends/server/replication/protocol/StartSessionMsg.java
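The enhanced-for rewrite in StartMsg above keeps the same early-exit semantics as the indexed loop it replaces. A small self-contained sketch, with an invented helper method and made-up byte values:

public class TypeMatchSketch
{
  /** Returns true if the first byte of encodedMsg matches one of the allowed types. */
  static boolean matchesType(byte[] types, byte[] encodedMsg)
  {
    for (byte type : types)     // replaces the indexed for (int i = 0; ...) loop
    {
      if (type == encodedMsg[0])
      {
        return true;            // equivalent to setting foundMatchingType and breaking
      }
    }
    return false;
  }

  public static void main(String[] args)
  {
    byte[] allowed = { 1, 7, 12 };
    System.out.println(matchesType(allowed, new byte[] { 7, 0, 0 })); // true
    System.out.println(matchesType(allowed, new byte[] { 9, 0, 0 })); // false
  }
}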
@@ -23,7 +23,7 @@ * * * Copyright 2008-2009 Sun Microsystems, Inc. * Portions copyright 2011 ForgeRock AS * Portions copyright 2011-2013 ForgeRock AS */ package org.opends.server.replication.protocol; @@ -389,13 +389,7 @@ status = ServerStatus.valueOf(in[1]); /* Read the assured flag */ if (in[2] == 1) { assuredFlag = true; } else { assuredFlag = false; } assuredFlag = in[2] == 1; /* Read the assured mode */ assuredMode = AssuredMode.valueOf(in[3]); @@ -403,7 +397,7 @@ /* Read the safe data level */ safeDataLevel = in[4]; /* Read the refferals URLs */ /* Read the referrals URLs */ int pos = 5; referralsURLs = new ArrayList<String>(); while (pos < in.length) opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java
@@ -23,7 +23,7 @@ * * * Copyright 2007-2010 Sun Microsystems, Inc. * Portions Copyright 2011 ForgeRock AS * Portions Copyright 2011-2013 ForgeRock AS */ package org.opends.server.replication.protocol; @@ -117,13 +117,7 @@ /* Read DS assured flag */ boolean assuredFlag; if (in[pos++] == 1) { assuredFlag = true; } else { assuredFlag = false; } assuredFlag = in[pos++] == 1; /* Read DS assured mode */ AssuredMode assuredMode = AssuredMode.valueOf(in[pos++]); @@ -188,7 +182,7 @@ } /* Read Protocol version */ protocolVersion = Short.valueOf(in[pos++]); protocolVersion = (short)in[pos++]; } /* Now create DSInfo and store it in list */ opends/src/server/org/opends/server/replication/protocol/UpdateMsg.java
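The Short.valueOf change in TopologyMsg above swaps autoboxing for a plain primitive cast. A tiny sketch of the difference, using an arbitrary byte array rather than the real message buffer:

public class CastSketch
{
  public static void main(String[] args)
  {
    byte[] in = { 0, 0, 4 };
    int pos = 2;

    // Old form: the byte widens to short, Short.valueOf boxes it,
    // and the result is auto-unboxed again on assignment.
    short boxed = Short.valueOf(in[pos]);

    // New form: a direct primitive cast, no intermediate Short object.
    short cast = (short) in[pos];

    System.out.println(boxed == cast); // prints true, the values are identical
  }
}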
@@ -23,6 +23,7 @@ * * * Copyright 2006-2009 Sun Microsystems, Inc. * Portions Copyright 2013 ForgeRock AS. */ package org.opends.server.replication.protocol; @@ -77,7 +78,7 @@ {} /** * Creates a new UpdateMsg with the given informations. * Creates a new UpdateMsg with the given information. * * @param bytes A Byte Array with the encoded form of the message. * @@ -157,16 +158,8 @@ @Override public boolean equals(Object obj) { if (obj != null) { if (obj.getClass() != this.getClass()) return false; return changeNumber.equals(((UpdateMsg) obj).changeNumber); } else { return false; } return obj != null && obj.getClass() == this.getClass() && changeNumber.equals(((UpdateMsg) obj).changeNumber); } /** @@ -343,10 +336,7 @@ changeNumber = new ChangeNumber(changenumberStr); /* Read the assured information */ if (encodedMsg[pos++] == 1) assuredFlag = true; else assuredFlag = false; assuredFlag = encodedMsg[pos++] == 1; /* Read the assured mode */ assuredMode = AssuredMode.valueOf(encodedMsg[pos++]); opends/src/server/org/opends/server/replication/server/DbHandler.java
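The UpdateMsg.equals() rewrite above collapses the null check, class check and field comparison into one short-circuiting expression. A compact illustration with a stand-in class; Key is hypothetical, only the pattern mirrors the patch:

public class EqualsSketch
{
  static final class Key
  {
    private final int id;
    Key(int id) { this.id = id; }

    @Override
    public boolean equals(Object obj)
    {
      // Single expression: short-circuits on null, then on class mismatch,
      // then compares the significant field, same logic as the nested ifs.
      return obj != null
          && obj.getClass() == this.getClass()
          && this.id == ((Key) obj).id;
    }

    @Override
    public int hashCode() { return id; }
  }

  public static void main(String[] args)
  {
    System.out.println(new Key(5).equals(new Key(5))); // true
    System.out.println(new Key(5).equals(null));       // false
  }
}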
@@ -23,7 +23,7 @@ * * * Copyright 2006-2010 Sun Microsystems, Inc. * Portions copyright 2011-2012 ForgeRock AS * Portions copyright 2011-2013 ForgeRock AS */ package org.opends.server.replication.server; import org.opends.messages.MessageBuilder; @@ -106,7 +106,7 @@ private DbMonitorProvider dbMonitor = new DbMonitorProvider(); private boolean shutdown = false; private boolean done = false; private DirectoryThread thread = null; private DirectoryThread thread; private final Object flushLock = new Object(); private ReplicationServer replicationServer; @@ -141,7 +141,7 @@ this.baseDn = baseDn; trimAge = replicationServer.getTrimAge(); queueMaxSize = queueSize; queueLowmark = queueSize * 1 / 5; queueLowmark = queueSize / 5; queueHimark = queueSize * 4 / 5; queueMaxBytes = 200 * queueMaxSize; queueLowmarkBytes = 200 * queueLowmark; @@ -281,9 +281,7 @@ { flush(); } ReplicationIterator it = new ReplicationIterator(serverId, db, changeNumber, this); return it; return new ReplicationIterator(serverId, db, changeNumber, this); } /** @@ -313,7 +311,7 @@ */ public void shutdown() { if (shutdown == true) if (shutdown) { return; } @@ -325,14 +323,14 @@ } synchronized (this) { while (done == false) { /* Can this be replaced with thread.join() ? */ while (!done) { try { this.wait(); } catch (Exception e) {} { /* do nothing */} } } @@ -351,7 +349,7 @@ */ public void run() { while (shutdown == false) while (!shutdown) { try { @@ -367,7 +365,9 @@ { msgQueue.wait(1000); } catch (InterruptedException e) { } { Thread.currentThread().interrupt(); } } } } catch (Exception end) opends/src/server/org/opends/server/replication/server/DraftCNDB.java
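The InterruptedException handling added to the DbHandler loop above follows the usual restore-the-flag idiom rather than swallowing the interrupt. A runnable sketch of that idiom under the assumption of a simple bounded-wait worker loop; the worker thread and queue object below are invented for illustration:

public class InterruptSketch
{
  public static void main(String[] args) throws Exception
  {
    Thread worker = new Thread(new Runnable()
    {
      public void run()
      {
        final Object queue = new Object();
        while (!Thread.currentThread().isInterrupted())
        {
          synchronized (queue)
          {
            try
            {
              queue.wait(1000); // same bounded wait as in the handler loop
            }
            catch (InterruptedException e)
            {
              // Do not swallow the interrupt: restore the flag so the
              // enclosing loop (or the caller) can observe it and stop.
              Thread.currentThread().interrupt();
            }
          }
        }
        System.out.println("worker stopped after interrupt");
      }
    });
    worker.start();
    Thread.sleep(100);
    worker.interrupt();
    worker.join();
  }
}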
@@ -23,7 +23,7 @@ * * * Copyright 2009 Sun Microsystems, Inc. * Portions Copyright 2011 ForgeRock AS * Portions Copyright 2011-2013 ForgeRock AS */ package org.opends.server.replication.server; import static org.opends.messages.ReplicationMessages.*; @@ -95,7 +95,7 @@ try { DatabaseEntry key = new ReplicationDraftCNKey(draftCN); DatabaseEntry data = new DraftCNData(draftCN, DatabaseEntry data = new DraftCNData( value, domainBaseDN, changeNumber); // Use a transaction so that we can override durability. @@ -625,8 +625,7 @@ try { String str = decodeUTF8(key.getData()); int draftCN = new Integer(str); return draftCN; return Integer.valueOf(str); } catch (Exception e) { opends/src/server/org/opends/server/replication/server/DraftCNData.java
@@ -23,7 +23,7 @@ * * * Copyright 2009 Sun Microsystems, Inc. * Portions Copyright 2010-2011 ForgeRock AS. * Portions Copyright 2010-2013 ForgeRock AS. */ package org.opends.server.replication.server; @@ -51,13 +51,12 @@ /** * Creates a record to be stored in the DraftCNDB. * @param draftCN The DraftCN key. * @param value The value (cookie). * @param serviceID The serviceID (domain DN). * @param changeNumber The replication change number. */ public DraftCNData(int draftCN, String value, String serviceID, ChangeNumber changeNumber) public DraftCNData(String value, String serviceID, ChangeNumber changeNumber) { String record = value + FIELD_SEPARATOR + serviceID @@ -156,8 +155,8 @@ */ public void toString(StringBuilder buffer) { buffer.append("DraftCNData : [value=" + value); buffer.append("] [serviceID=" + serviceID); buffer.append("] [changeNumber=" + changeNumber + "]"); buffer.append("DraftCNData : [value=").append(value); buffer.append("] [serviceID=").append(serviceID); buffer.append("] [changeNumber=").append(changeNumber).append("]"); } } opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java
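The DraftCNData.toString(StringBuilder) change above chains append() calls instead of concatenating with '+' inside each append. A minimal before/after comparison; the field values are placeholders, not real cookie or change number formats:

public class AppendSketch
{
  public static void main(String[] args)
  {
    String value = "cookie";
    String serviceID = "dc=example,dc=com";
    String changeNumber = "0000013d-00000001-0001";

    // Old style: '+' concatenation builds a throwaway String (via a hidden
    // StringBuilder) for every append call.
    StringBuilder oldStyle = new StringBuilder();
    oldStyle.append("DraftCNData : [value=" + value);
    oldStyle.append("] [serviceID=" + serviceID);
    oldStyle.append("] [changeNumber=" + changeNumber + "]");

    // New style: chained appends write straight into the one builder.
    StringBuilder newStyle = new StringBuilder();
    newStyle.append("DraftCNData : [value=").append(value);
    newStyle.append("] [serviceID=").append(serviceID);
    newStyle.append("] [changeNumber=").append(changeNumber).append("]");

    System.out.println(newStyle.toString().equals(oldStyle.toString())); // true
  }
}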
@@ -23,7 +23,7 @@ * * * Copyright 2009-2010 Sun Microsystems, Inc. * Portions Copyright 2011 ForgeRock AS * Portions Copyright 2011-2013 ForgeRock AS */ package org.opends.server.replication.server; import static org.opends.messages.ReplicationMessages.*; @@ -70,10 +70,6 @@ * The tracer object for the debug logger. */ private static final DebugTracer TRACER = getTracer(); // A dedicated thread loops trim(). // trim() : deletes from the DB a number of changes that are older than a // certain date. // static int NO_KEY = 0; private DraftCNDB db; @@ -82,17 +78,21 @@ private DbMonitorProvider dbMonitor = new DbMonitorProvider(); private boolean shutdown = false; private boolean trimDone = false; private DirectoryThread thread = null; private ReplicationServer replicationServer; /* A dedicated thread loops trim(). trim() : deletes from the DB a number of changes that are older than a certain date. */ private DirectoryThread thread; /** * * The trim age in milliseconds. Changes record in the change DB that * are older than this age are removed. * */ private long trimAge; private ReplicationServer replicationServer; /** * Creates a new dbHandler associated to a given LDAP server. * @@ -113,7 +113,7 @@ firstkey = db.readFirstDraftCN(); lastkey = db.readLastDraftCN(); // Triming thread // Trimming thread thread = new DirectoryThread(this, "Replication DraftCN db "); thread.start(); @@ -204,7 +204,7 @@ cursor.close(); } catch(Exception e) { { /* do nothing */ } } @@ -226,9 +226,7 @@ public DraftCNDbIterator generateIterator(int startDraftCN) throws DatabaseException, Exception { DraftCNDbIterator it = new DraftCNDbIterator(db, startDraftCN); return it; return new DraftCNDbIterator(db, startDraftCN); } /** @@ -236,7 +234,7 @@ */ public void shutdown() { if (shutdown == true) if (shutdown) { return; } @@ -248,14 +246,14 @@ } synchronized (this) { while (trimDone == false) { /* Can we just do a thread.join() ? */ while (!trimDone) { try { this.wait(); } catch (Exception e) {} { /* do nothing */ } } } @@ -271,7 +269,7 @@ */ public void run() { while (shutdown == false) while (!shutdown) { try { trim(); @@ -282,7 +280,9 @@ { this.wait(1000); } catch (InterruptedException e) { } { Thread.currentThread().interrupt(); } } } catch (Exception end) { @@ -393,7 +393,7 @@ continue; } ServerState cnVector = null; ServerState cnVector; try { Map<String,ServerState> cnStartStates = opends/src/server/org/opends/server/replication/server/ECLServerWriter.java
@@ -23,7 +23,7 @@ * * * Copyright 2009 Sun Microsystems, Inc. * Portions copyright 2011-2012 ForgeRock AS * Portions copyright 2011-2013 ForgeRock AS */ package org.opends.server.replication.server; import static org.opends.messages.ReplicationMessages.*; @@ -145,7 +145,7 @@ { while (true) { // wait to be resumed or shutdowned // wait to be resumed or shutdown if ((suspended) && (!shutdown)) { synchronized(this) @@ -224,12 +224,7 @@ ECLUpdateMsg update = null; while (true) { if (shutdown) { return; } if (suspended) if (shutdown || suspended) { return; } @@ -267,14 +262,13 @@ { // except if we are in persistent search Thread.sleep(200); continue; } } else { // Publish the update to the remote server using a protocol version he // it supports publish(update, protocolVersion); publish(update); update = null; } } @@ -292,7 +286,7 @@ /** * Publish a change either on the protocol session or to a persistent search. */ private void publish(ECLUpdateMsg msg, short reqProtocolVersion) private void publish(ECLUpdateMsg msg) throws IOException { if (debugEnabled()) opends/src/server/org/opends/server/replication/server/ExpectedAcksInfo.java
@@ -23,6 +23,7 @@ * * * Copyright 2008-2009 Sun Microsystems, Inc. * Portions Copyright 2013 ForgeRock AS. */ package org.opends.server.replication.server; @@ -47,10 +48,10 @@ public abstract class ExpectedAcksInfo { // The server handler of the server that sent the assured update message and // to whow we want to return the final ack // to whom we want to return the final ack private ServerHandler requesterServerHandler = null; // The requested assured mode of matcching update message // The requested assured mode of matching update message private AssuredMode assuredMode = null; /** * This is used for concurrent access to this object by either the assured * timeout task or the code for processing an ack for the matching update * message. This should be set to true when the treatment of the expected * acks is completed or an ack timeout has occured and we are going to remove * this object from the map where it is stored. * acks is completed or an ack timeout has occurred and we are going to * remove this object from the map where it is stored. */ private boolean completed = false; opends/src/server/org/opends/server/replication/server/LightweightServerHandler.java

@@ -23,7 +23,7 @@ * * * Copyright 2008-2010 Sun Microsystems, Inc. * Portions copyright 2011 ForgeRock AS * Portions copyright 2011-2013 ForgeRock AS */ package org.opends.server.replication.server; @@ -87,15 +87,16 @@ private AssuredMode assuredMode = AssuredMode.SAFE_DATA_MODE; // DS safe data level (relevant if assured mode is safe data) private byte safeDataLevel = (byte) -1; // The prococol version // The protocol version private short protocolVersion = -1; private Set<String> eclInclude = new HashSet<String>(); private Set<String> eclIncludeForDeletes = new HashSet<String>(); /** * Creates a new LighweightServerHandler with the provided serverid, connected * to the remote Replication Server represented by replServerHandler. * Creates a new LightweightServerHandler with the provided serverid, * connected to the remote Replication Server represented by * replServerHandler. * * @param replServerHandler The server handler of the RS this remote DS is * connected to @@ -150,11 +151,9 @@ */ public DSInfo toDSInfo() { DSInfo dsInfo = new DSInfo(serverId, replicationServerId, generationId, return new DSInfo(serverId, replicationServerId, generationId, status, assuredFlag, assuredMode, safeDataLevel, groupId, refUrls, eclInclude, eclIncludeForDeletes, protocolVersion); return dsInfo; } /** opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -23,7 +23,7 @@ * * * Copyright 2009-2010 Sun Microsystems, Inc. * Portions copyright 2011-2012 ForgeRock AS * Portions copyright 2011-2013 ForgeRock AS */ package org.opends.server.replication.server; @@ -180,7 +180,7 @@ while ((msgQueue.count() > maxQueueSize) || (msgQueue.bytesCount() > maxQueueBytesSize)) { setFollowing(false); following = false; msgQueue.removeFirst(); } } @@ -272,9 +272,9 @@ protected UpdateMsg getNextMessage(boolean synchronous) { UpdateMsg msg; while (activeConsumer == true) while (activeConsumer) { if (following == false) if (!following) { /* this server is late with regard to some other masters * in the topology or just joined the topology. @@ -376,7 +376,7 @@ if ((msgQueue.count() < maxQueueSize) && (msgQueue.bytesCount() < maxQueueBytesSize)) { setFollowing(true); following = true; } } } else @@ -392,7 +392,7 @@ if (msgQueue.contains(msg)) { /* we finally catch up with the regular queue */ setFollowing(true); following = true; lateQueue.clear(); UpdateMsg msg1; do @@ -417,11 +417,11 @@ } synchronized (msgQueue) { if (following == true) if (following) { try { while (msgQueue.isEmpty() && (following == true)) while (msgQueue.isEmpty() && following) { if (!synchronous) return null; @@ -465,7 +465,7 @@ ChangeNumber result = null; synchronized (msgQueue) { if (isFollowing()) if (following) { if (msgQueue.isEmpty()) { @@ -479,13 +479,14 @@ { if (lateQueue.isEmpty()) { // isFollowing is false AND lateQueue is empty // We may be at the very moment when the writer has emptyed the // lateQueue when it sent the last update. The writer will fill again // the lateQueue when it will send the next update but we are not yet // there. So let's take the last change not sent directly from // the db. /* following is false AND lateQueue is empty We may be at the very moment when the writer has emptied the lateQueue when it sent the last update. The writer will fill again the lateQueue when it will send the next update but we are not yet there. So let's take the last change not sent directly from the db. */ ReplicationIteratorComparator comparator = new ReplicationIteratorComparator(); SortedSet<ReplicationIterator> iteratorSortedSet = @@ -500,9 +501,11 @@ // get an iterator in this server db from that last change ReplicationIterator iterator = replicationServerDomain.getChangelogIterator(serverId, lastCsn); // if that iterator has changes, then it is a candidate // it is added in the sorted list at a position given by its // current change (see ReplicationIteratorComparator). /* if that iterator has changes, then it is a candidate it is added in the sorted list at a position given by its current change (see ReplicationIteratorComparator). */ if (iterator != null) { if (iterator.getChange() != null) @@ -558,7 +561,7 @@ * When the server is up to date or close to be up to date, * the number of updates to be sent is the size of the receive queue. */ if (isFollowing()) if (following) return msgQueue.count(); else { @@ -622,16 +625,6 @@ } /** * Check if the LDAP server can follow the speed of the other servers. * @return true when the server has all the not yet sent changes * in its queue. */ public boolean isFollowing() { return following; } /** * Set that the consumer is now becoming inactive and thus getNextMessage * should not return any UpdateMsg any more. * @param active the provided state of the consumer. @@ -641,14 +634,6 @@ this.activeConsumer = active; } /** * Set the following flag of this server. * @param following the value that should be set. 
*/ private void setFollowing(boolean following) { this.following = following; } /** * Set the initial value of the serverState for this handler. opends/src/server/org/opends/server/replication/server/MonitorData.java
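The reflowed comment in MessageHandler above describes the fallback used when the lateQueue is momentarily empty: build one iterator per source, order them by the change they currently point to, and take the oldest. A toy model of that selection follows; Cursor and the timestamps are invented stand-ins for ReplicationIterator and ChangeNumber, not the real types.

import java.util.Comparator;
import java.util.SortedSet;
import java.util.TreeSet;

public class OldestChangeSketch
{
  /** Invented stand-in for a per-server ReplicationIterator. */
  static final class Cursor
  {
    final int serverId;
    final long currentChangeTime; // stand-in for the iterator's current ChangeNumber
    Cursor(int serverId, long currentChangeTime)
    {
      this.serverId = serverId;
      this.currentChangeTime = currentChangeTime;
    }
  }

  public static void main(String[] args)
  {
    // Order cursors by the age of the change they currently point to,
    // mirroring what ReplicationIteratorComparator does for iterators.
    SortedSet<Cursor> candidates = new TreeSet<Cursor>(new Comparator<Cursor>()
    {
      public int compare(Cursor c1, Cursor c2)
      {
        if (c1.currentChangeTime != c2.currentChangeTime)
        {
          return c1.currentChangeTime < c2.currentChangeTime ? -1 : 1;
        }
        return c1.serverId - c2.serverId; // tie-break so distinct cursors are kept
      }
    });

    candidates.add(new Cursor(1, 500L));
    candidates.add(new Cursor(2, 300L));
    candidates.add(new Cursor(3, 800L));

    // The oldest not-yet-sent change is simply the first element.
    System.out.println("oldest change is held by server "
        + candidates.first().serverId); // prints 2
  }
}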
@@ -23,7 +23,7 @@ * * * Copyright 2009-2010 Sun Microsystems, Inc. * Portions Copyright 2012 ForgeRock AS * Portions Copyright 2012-2013 ForgeRock AS */ package org.opends.server.replication.server; @@ -84,11 +84,6 @@ private ConcurrentHashMap<Integer, Long> missingChanges = new ConcurrentHashMap<Integer, Long>(); // For each RS server, an approximation of the date of the first missing // change private ConcurrentHashMap<Integer, Long> fmRSDate = new ConcurrentHashMap<Integer, Long>(); private ConcurrentHashMap<Integer, Long> missingChangesRS = new ConcurrentHashMap<Integer, Long>(); @@ -162,47 +157,38 @@ // Regarding each other LSj // Sum the difference : max(LSj) - state(LSi) Iterator<Integer> lsiStateItr = this.LDAPStates.keySet().iterator(); while (lsiStateItr.hasNext()) { Integer lsiSid = lsiStateItr.next(); for (Integer lsiSid : this.LDAPStates.keySet()) { ServerState lsiState = this.LDAPStates.get(lsiSid); Long lsiMissingChanges = (long)0; if (lsiState != null) { Iterator<Integer> lsjMaxItr = this.maxCNs.keySet().iterator(); while (lsjMaxItr.hasNext()) { Integer lsjSid = lsjMaxItr.next(); Long lsiMissingChanges = (long) 0; if (lsiState != null) { for (Integer lsjSid : this.maxCNs.keySet()) { ChangeNumber lsjMaxCN = this.maxCNs.get(lsjSid); ChangeNumber lsiLastCN = lsiState.getMaxChangeNumber(lsjSid); int missingChangesLsiLsj = ChangeNumber.diffSeqNum(lsjMaxCN, lsiLastCN); ChangeNumber.diffSeqNum(lsjMaxCN, lsiLastCN); if (debugEnabled()) { if (debugEnabled()) { mds += "+ diff("+lsjMaxCN+"-" +lsiLastCN+")="+missingChangesLsiLsj; "+ diff(" + lsjMaxCN + "-" + lsiLastCN + ")=" + missingChangesLsiLsj; } // Regarding a DS that is generating changes. If it is a local DS1, // we get its server state, store it, then retrieve server states of // remote DSs. When a remote server state is coming, it may contain // a change number for DS1 which is newer than the one we locally // stored in the server state of DS1. To prevent seeing DS1 has // missing changes whereas it is wrong, we replace the value with 0 // if it is a low value. We cannot overwrite big values as they may be // useful for a local server retrieving changes it generated earlier, // when it is recovering from an old snapshot and the local RS is // sending him the changes it is missing. /* Regarding a DS that is generating changes. If it is a local DS1, we get its server state, store it, then retrieve server states of remote DSs. When a remote server state is coming, it may contain a change number for DS1 which is newer than the one we locally stored in the server state of DS1. To prevent seeing DS1 has missing changes whereas it is wrong, we replace the value with 0 if it is a low value. We cannot overwrite big values as they may be useful for a local server retrieving changes it generated earlier, when it is recovering from an old snapshot and the local RS is sending him the changes it is missing. 
*/ if (lsjSid.equals(lsiSid)) { if (missingChangesLsiLsj <= 50) { if (missingChangesLsiLsj <= 50) { missingChangesLsiLsj = 0; if (debugEnabled()) { if (debugEnabled()) { mds += " (diff replaced by 0 as for server id " + lsiSid + ")"; } } @@ -211,11 +197,10 @@ lsiMissingChanges += missingChangesLsiLsj; } } if (debugEnabled()) { if (debugEnabled()) { mds += "=" + lsiMissingChanges; } this.missingChanges.put(lsiSid,lsiMissingChanges); this.missingChanges.put(lsiSid, lsiMissingChanges); } // Computes the missing changes counters for RS : @@ -227,21 +212,17 @@ Long lsiMissingChanges = (long)0; if (lsiState != null) { Iterator<Integer> lsjMaxItr = this.maxCNs.keySet().iterator(); while (lsjMaxItr.hasNext()) { int lsjSid = lsjMaxItr.next(); for (Integer lsjSid : this.maxCNs.keySet()) { ChangeNumber lsjMaxCN = this.maxCNs.get(lsjSid); ChangeNumber lsiLastCN = lsiState.getMaxChangeNumber(lsjSid); int missingChangesLsiLsj = ChangeNumber.diffSeqNum(lsjMaxCN, lsiLastCN); ChangeNumber.diffSeqNum(lsjMaxCN, lsiLastCN); if (debugEnabled()) { if (debugEnabled()) { mds += "+ diff("+lsjMaxCN+"-" +lsiLastCN+")="+missingChangesLsiLsj; "+ diff(" + lsjMaxCN + "-" + lsiLastCN + ")=" + missingChangesLsiLsj; } lsiMissingChanges += missingChangesLsiLsj; } @@ -269,42 +250,25 @@ { String mds = "Monitor data=\n"; // RS data Iterator<Integer> rsite = fmRSDate.keySet().iterator(); while (rsite.hasNext()) { Integer sid = rsite.next(); mds += "\nfmRSDate(" + sid + ")=\t "+ "afmd=" + fmRSDate.get(sid); } // maxCNs Iterator<Integer> itc = maxCNs.keySet().iterator(); while (itc.hasNext()) { Integer sid = itc.next(); for (Integer sid : maxCNs.keySet()) { ChangeNumber cn = maxCNs.get(sid); mds += "\nmaxCNs(" + sid + ")= " + cn.toStringUI(); } // LDAP data Iterator<Integer> lsite = LDAPStates.keySet().iterator(); while (lsite.hasNext()) { Integer sid = lsite.next(); for (Integer sid : LDAPStates.keySet()) { ServerState ss = LDAPStates.get(sid); mds += "\nLSData(" + sid + ")=\t" + "state=[" + ss.toString() + "] afmd=" + this.getApproxFirstMissingDate(sid); + "] afmd=" + this.getApproxFirstMissingDate(sid); mds += " missingDelay=" + this.getApproxDelay(sid); mds +=" missingCount=" + missingChanges.get(sid); mds += " missingCount=" + missingChanges.get(sid); } // RS data rsite = RSStates.keySet().iterator(); while (rsite.hasNext()) { Integer sid = rsite.next(); for (Integer sid : RSStates.keySet()) { ServerState ss = RSStates.get(sid); mds += "\nRSData(" + sid + ")=\t" + "state=[" + ss.toString() + "] missingCount=" + missingChangesRS.get(sid); @@ -321,10 +285,7 @@ */ public void setMaxCNs(ServerState state) { Iterator<Integer> it = state.iterator(); while (it.hasNext()) { int sid = it.next(); for (Integer sid : state) { ChangeNumber newCN = state.getMaxChangeNumber(sid); setMaxCN(sid, newCN); } @@ -352,17 +313,6 @@ } /** * Get the highest know change number of the LDAP server with the provided * serverId. * @param serverId The server ID. * @return The highest change number. */ public ChangeNumber getMaxCN(int serverId) { return maxCNs.get(serverId); } /** * Get the state of the LDAP server with the provided serverId. * @param serverId The server ID. * @return The server state. @@ -454,9 +404,7 @@ */ public long getRSApproxFirstMissingDate(int serverId) { Long res; if ((res = fmRSDate.get(serverId)) != null) return res; // For now, we do store RS first missing change date return 0; } } opends/src/server/org/opends/server/replication/server/MonitoringPublisher.java
@@ -23,7 +23,7 @@ * * * Copyright 2009-2010 Sun Microsystems, Inc. * Portions Copyright 2011-2012 ForgeRock AS * Portions Copyright 2011-2013 ForgeRock AS */ package org.opends.server.replication.server; @@ -184,7 +184,7 @@ { int FACTOR = 40; // Wait for 2 seconds before interrupting the thread int n = 0; while ((done == false) && (this.isAlive())) while ((!done) && (this.isAlive())) { Thread.sleep(50); n++; opends/src/server/org/opends/server/replication/server/NotAssuredUpdateMsg.java
@@ -23,6 +23,7 @@ * * * Copyright 2008-2009 Sun Microsystems, Inc. * Portions Copyright 2013 ForgeRock AS. */ package org.opends.server.replication.server; @@ -93,7 +94,7 @@ System.arraycopy(origBytes, 0, bytes, 0, origBytes.length); int maxLen = bytes.length; int pos = -1; int pos; int nZeroFound = 0; // Number of 0 value found boolean found = false; @@ -143,7 +144,6 @@ System.arraycopy(origBytes, 0, bytes, 0, origBytes.length); maxLen = bytes.length; pos = -1; nZeroFound = 0; // Number of 0 value found found = false; @@ -183,13 +183,6 @@ } else { if (!(realUpdateMsg instanceof UpdateMsg)) { // Should never happen throw new UnsupportedEncodingException( "Unknown underlying real message type."); } /** * Prepare VLATEST serialized form of the message: * Get the encoding form of the real message then overwrite the assured @@ -204,7 +197,7 @@ System.arraycopy(origBytes, 0, bytes, 0, origBytes.length); int maxLen = bytes.length; int pos = -1; int pos; int nZeroFound = 0; // Number of 0 value found boolean found = false; opends/src/server/org/opends/server/replication/server/ReplicationBackend.java
@@ -151,11 +151,6 @@ private ReplicationServer server; /** * The configuration of this backend. */ private BackendCfg cfg; /** * The number of milliseconds between job progress reports. */ private long progressInterval = 10000; @@ -214,7 +209,7 @@ if (config != null) { Validator.ensureTrue(config instanceof BackendCfg); cfg = (BackendCfg)config; BackendCfg cfg = (BackendCfg) config; DN[] newBaseDNs = new DN[cfg.getBaseDN().size()]; cfg.getBaseDN().toArray(newBaseDNs); setBaseDNs(newBaseDNs); @@ -655,7 +650,7 @@ attrs); ldifWriter.writeChangeRecord(changeRecord); } catch (Exception e) {} catch (Exception e) { /* do nothing */ } for (ReplicationServerDomain exportContainer : exportContainers) { @@ -718,7 +713,7 @@ for (int serverId : rsd.getServers()) { if (exportConfig != null && exportConfig.isCancelled()) { { // Abort if cancelled break; } @@ -764,7 +759,7 @@ while (ri.getChange() != null) { if (exportConfig != null && exportConfig.isCancelled()) { { // abort if cancelled break; } if (searchOperation != null) @@ -988,8 +983,7 @@ LDIFWriter ldifWriter2 = writer.getLDIFWriter(); ldifWriter2.writeChangeRecord(changeRecord); LDIFReader reader = writer.getLDIFReader(); Entry modDNEntry = reader.readEntry(); entry = modDNEntry; entry = reader.readEntry(); } } opends/src/server/org/opends/server/replication/server/ReplicationDB.java
@@ -23,9 +23,11 @@ * * * Copyright 2006-2010 Sun Microsystems, Inc. * Portions Copyright 2011-2012 ForgeRock AS * Portions Copyright 2011-2013 ForgeRock AS */ package org.opends.server.replication.server; import org.opends.messages.Message; import org.opends.messages.MessageBuilder; import static org.opends.server.loggers.ErrorLogger.logError; @@ -75,7 +77,7 @@ // // A counter record has to follow the order of the db, so it needs to have // a changenumber key that follow the order. // A counter record must have its own chagenumber key since the Db does not // A counter record must have its own changenumber key since the Db does not // support duplicate key (it is a compatibility breaker character of the DB). // // We define 2 conditions to store a counter record : @@ -813,9 +815,10 @@ return null; } ChangeNumber cn = null; try { ChangeNumber cn = new ChangeNumber( cn = new ChangeNumber( decodeUTF8(key.getData())); if (ReplicationDB.isaCounter(cn)) { @@ -833,9 +836,14 @@ * happen if the database is corrupted. There is not much more that we * can do at this point except trying to continue with the next * record. In such case, it is therefore possible that we miss some * changes. TODO. log an error message. TODO : REPAIR : Such problem * should be handled by the repair functionality. * changes. * TODO : This should be handled by the repair functionality. */ Message message = ERR_REPLICATIONDB_CANNOT_PROCESS_CHANGE_RECORD .get(replicationServer.getServerId(), (cn == null ? "" : cn.toString()), e.getMessage()); logError(message); } } return currentChange; @@ -1085,7 +1093,6 @@ /** * Encode the provided counter value in a database entry. * @param entry The provided entry. * @return The database entry with the counter value encoded inside. */ static private DatabaseEntry encodeCounterValue(int value) opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
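In the ReplicationDB hunk above, moving the ChangeNumber declaration out of the try block is what lets the new catch clause put the offending key into the ERR_REPLICATIONDB_CANNOT_PROCESS_CHANGE_RECORD message. A reduced sketch of that declare-before-try pattern; parse(), the validation step and the record values are hypothetical:

public class LogOnParseFailureSketch
{
  /** Hypothetical parser standing in for the ChangeNumber constructor. */
  static long parse(String record)
  {
    return Long.parseLong(record);
  }

  static Long readRecord(String record)
  {
    Long cn = null;            // declared before the try so the catch can see it
    try
    {
      cn = parse(record);
      if (cn < 0)              // a later step that may also fail
      {
        throw new IllegalStateException("negative change number");
      }
      return cn;
    }
    catch (Exception e)
    {
      // cn is still null if parsing itself failed; log what we have and
      // continue with the next record instead of aborting the scan.
      System.err.println("cannot process change record "
          + (cn == null ? "<unparsable>" : cn.toString()) + ": " + e.getMessage());
      return null;
    }
  }

  public static void main(String[] args)
  {
    System.out.println(readRecord("42"));      // 42
    System.out.println(readRecord("garbage")); // logs <unparsable>, prints null
    System.out.println(readRecord("-7"));      // logs the parsed key, prints null
  }
}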
@@ -23,7 +23,7 @@ * * * Copyright 2006-2009 Sun Microsystems, Inc. * Portions Copyright 2011-2012 ForgeRock AS * Portions Copyright 2011-2013 ForgeRock AS */ package org.opends.server.replication.server; import org.opends.messages.*; @@ -168,9 +168,6 @@ if (str[0].equals(GENERATION_ID_TAG)) { long generationId; String baseDn; try { // <generationId> @@ -186,7 +183,7 @@ + "<" + str[1] + ">")); } baseDn = str[2]; String baseDn = str[2]; if (debugEnabled()) TRACER.debugInfo( @@ -213,7 +210,7 @@ status = cursor.getFirst(key, data, LockMode.DEFAULT); while (status == OperationStatus.SUCCESS) { String stringData = null; String stringData; try { stringData = new String(data.getData(), "UTF-8"); @@ -234,7 +231,7 @@ String[] str = stringData.split(FIELD_SEPARATOR, 2); if (!str[0].equals(GENERATION_ID_TAG)) { int serverId = -1; int serverId; try { // <serverId> @@ -561,14 +558,14 @@ txn.abort(); } catch(Exception e) {} { /* do nothing */ } } } /** * Get or create a db to manage integer change number associated * to multidomain server state. * TODO:ECL how to manage compatibilty of this db with new domains * TODO:ECL how to manage compatibility of this db with new domains * added or removed ? * @return the retrieved or created db. * @throws DatabaseException when a problem occurs. @@ -583,8 +580,7 @@ DatabaseConfig dbConfig = new DatabaseConfig(); dbConfig.setAllowCreate(true); dbConfig.setTransactional(true); Database db = dbEnvironment.openDatabase(null, stringId, dbConfig); return db; return dbEnvironment.openDatabase(null, stringId, dbConfig); } } opends/src/server/org/opends/server/replication/server/ReplicationIterator.java
@@ -23,7 +23,7 @@ * * * Copyright 2006-2009 Sun Microsystems, Inc. * Portions Copyright 2011 ForgeRock AS * Portions Copyright 2011-2013 ForgeRock AS */ package org.opends.server.replication.server; @@ -106,7 +106,7 @@ */ public boolean next() { boolean hasNext = false; boolean hasNext; currentChange = cursor.next(); // can return null opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -23,7 +23,7 @@ * * * Copyright 2006-2010 Sun Microsystems, Inc. * Portions Copyright 2011-2012 ForgeRock AS * Portions Copyright 2011-2013 ForgeRock AS */ package org.opends.server.replication.server; @@ -75,7 +75,7 @@ /** * ReplicationServer Listener. This singleton is the main object of the * replication server It waits for the incoming connections and create listener * replication server. It waits for the incoming connections and creates listener * and publisher objects for connection with LDAP servers and with replication * servers It is responsible for creating the replication server * replicationServerDomain and managing it @@ -251,7 +251,7 @@ { backendConfigEntryDN = DN.decode( "ds-cfg-backend-id=" + backendId + ",cn=Backends,cn=config"); } catch (Exception e) {} } catch (Exception e) { /* do nothing */ } // Creates the backend associated to this ReplicationServer // if it does not exist. @@ -293,7 +293,7 @@ listenSocket.getLocalPort()); logError(listenMsg); while ((shutdown == false) && (stopListen == false)) while (!shutdown && !stopListen) { // Wait on the replicationServer port. // Read incoming messages and create LDAP or ReplicationServer listener @@ -365,7 +365,7 @@ { TRACER.debugCaught(DebugLogLevel.ERROR, e); } if (shutdown == false) { if (!shutdown) { Message message = ERR_EXCEPTION_LISTENING.get(e.getLocalizedMessage()); logError(message); @@ -1561,9 +1561,7 @@ public ExternalChangeLogSession createECLSession(StartECLSessionMsg msg) throws DirectoryException { ExternalChangeLogSessionImpl session = new ExternalChangeLogSessionImpl(this, msg); return session; return new ExternalChangeLogSessionImpl(this, msg); } /** @@ -1623,16 +1621,9 @@ { InetAddress localAddr = InetAddress.getLocalHost(); if (localPorts.contains(port) return localPorts.contains(port) && (InetAddress.getByName(hostname).isLoopbackAddress() || InetAddress.getByName(hostname).equals(localAddr))) { return true; } else { return false; } InetAddress.getByName(hostname).equals(localAddr)); } catch (UnknownHostException e) {
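The last ReplicationServer hunk above returns the boolean expression directly instead of branching to return true or false. The same shape can be exercised in isolation; the method name, port set and host names below are arbitrary examples, not the server's actual API:

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class LocalAddressSketch
{
  static boolean isLocal(Set<Integer> localPorts, String hostname, int port)
  {
    try
    {
      InetAddress localAddr = InetAddress.getLocalHost();
      // One expression instead of an if/else that returns true or false.
      return localPorts.contains(port)
          && (InetAddress.getByName(hostname).isLoopbackAddress()
              || InetAddress.getByName(hostname).equals(localAddr));
    }
    catch (UnknownHostException e)
    {
      return false;
    }
  }

  public static void main(String[] args)
  {
    Set<Integer> ports = new HashSet<Integer>(Arrays.asList(389, 4444));
    System.out.println(isLocal(ports, "localhost", 4444)); // expected: true
    System.out.println(isLocal(ports, "localhost", 9999)); // false, port not local
  }
}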
@@ -677,20 +677,14 @@ sourceHandler.sendAck(ack); } else { if (safeDataLevel != (byte) 0) { /** * level > 1 : We need further acks * The message will be posted in assured mode to eligible * servers. The embedded safe data level is not changed, and his * value will be used by a remote RS to determine if he must send * an ack (level > 1) or not (level = 1) */ interestedInAcks = true; } else { // Should never happen } /** * level > 1 : We need further acks * The message will be posted in assured mode to eligible * servers. The embedded safe data level is not changed, and its * value will be used by a remote RS to determine if it must send * an ack (level > 1) or not (level = 1) */ interestedInAcks = true; } } else { // A RS sent us the safe data message, for sure no further ack to wait @@ -1752,7 +1746,7 @@ monitorData.getApproxFirstMissingDate(replicaId), true); } // Add the informations about the Replication Servers // Add the information about the Replication Servers // currently in the topology. it = monitorData.rsIterator(); while (it.hasNext()) opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
@@ -70,8 +70,9 @@ * @return Whether the remote server requires encryption or not. * @throws DirectoryException When a problem occurs. */ public boolean processStartFromRemote(ReplServerStartMsg inReplServerStartMsg) throws DirectoryException private boolean processStartFromRemote( ReplServerStartMsg inReplServerStartMsg) throws DirectoryException { try { @@ -358,7 +359,7 @@ { /* Only protocol version above V1 has a phase 2 handshake NOW PROCEDE WITH SECOND PHASE OF HANDSHAKE: NOW PROCEED WITH SECOND PHASE OF HANDSHAKE: TopologyMsg then TopologyMsg (with a RS) wait and process Topo from remote RS */ @@ -524,21 +525,15 @@ // Remote RS sent his topo msg TopologyMsg inTopoMsg = (TopologyMsg) msg; // Store remore RS weight if it has one /* Store remote RS weight if it has one. * For protocol version < 4, use default value of 1 for weight */ if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4) { // List should only contain RS info for sender RSInfo rsInfo = inTopoMsg.getRsList().get(0); weight = rsInfo.getWeight(); } else { /* Remote RS uses protocol version prior to 4 : use default value for weight: 1 */ } /* if the remote RS and the local RS have the same genID then it's ok and nothing else to do @@ -569,11 +564,10 @@ private void checkGenerationId() { if (localGenerationId > 0) { // if the local RS is initialized { // the local RS is initialized if (generationId > 0) { // if the remote RS is initialized { // the remote RS is initialized. // If not, there's nothing to do anyway. if (generationId != localGenerationId) { // if the 2 RS have different generationID @@ -621,13 +615,6 @@ } } } else { /* The remote RS has no genId. We don't change anything for the current RS. */ } } else { opends/src/server/org/opends/server/replication/server/SafeDataExpectedAcksInfo.java
@@ -23,6 +23,7 @@ * * * Copyright 2008-2009 Sun Microsystems, Inc. * Portions Copyright 2013 ForgeRock AS. */ package org.opends.server.replication.server; @@ -106,14 +107,10 @@ return false; } else { // Mark this ack received for the server expectedServersAckStatus.put(ackingServerId, true); numReceivedAcks++; if (numReceivedAcks == safeDataLevel) return true; else return false; return numReceivedAcks == safeDataLevel; } } @@ -128,7 +125,7 @@ { // Fill collected errors info ack.setHasTimeout(true); // Tell wich servers did not send an ack in time // Tell which servers did not send an ack in time List<Integer> failedServers = new ArrayList<Integer>(); Set<Integer> serverIds = expectedServersAckStatus.keySet(); serversInTimeout = new ArrayList<Integer>(); // Use next loop to fill it opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -23,7 +23,7 @@ * * * Copyright 2006-2010 Sun Microsystems, Inc. * Portions copyright 2011-2012 ForgeRock AS * Portions copyright 2011-2013 ForgeRock AS */ package org.opends.server.replication.server; @@ -170,11 +170,12 @@ /** * The associated ServerWriter that sends messages to the remote server. */ protected ServerReader reader; protected ServerWriter writer = null; /** * The associated ServerReader that receives messages from the remote server. */ protected ServerWriter writer = null; protected ServerReader reader; // window private int rcvWindow; @@ -366,7 +367,7 @@ session.setSoTimeout(0); } catch(Exception e) { { /* do nothing */ } // sendWindow MUST be created before starting the writer @@ -1148,10 +1149,9 @@ */ public RSInfo toRSInfo() { RSInfo rsInfo = new RSInfo(serverId, serverURL, generationId, groupId, weight); return rsInfo; return new RSInfo(serverId, serverURL, generationId, groupId, weight); } /** opends/src/server/org/opends/server/replication/service/ListenerThread.java
@@ -23,7 +23,7 @@ * * * Copyright 2006-2008 Sun Microsystems, Inc. * Portions Copyright 2011 ForgeRock AS * Portions Copyright 2011-2013 ForgeRock AS */ package org.opends.server.replication.service; import org.opends.messages.Message; @@ -96,7 +96,7 @@ // queue while ((!shutdown) && ((updateMsg = repDomain.receive()) != null)) { if (repDomain.processUpdate(updateMsg) == true) if (repDomain.processUpdate(updateMsg)) { repDomain.processUpdateDoneSynchronous(updateMsg); } @@ -138,7 +138,7 @@ { int FACTOR = 40; // Wait for 2 seconds before interrupting the thread int n = 0; while ((done == false) && (this.isAlive())) while (!done && this.isAlive()) { Thread.sleep(50); n++; opends/src/server/org/opends/server/replication/service/ReplInputStream.java
@@ -23,6 +23,7 @@ * * * Copyright 2006-2008 Sun Microsystems, Inc. * Portions Copyright 2013 ForgeRock AS. */ package org.opends.server.replication.service; @@ -124,10 +125,7 @@ copiedLength = len; } for (int i =0; i<copiedLength; i++) { b[off+i] = bytes[index+i]; } System.arraycopy(bytes, index, b, off, copiedLength); index += copiedLength; if (index == bytes.length) opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
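The System.arraycopy change in ReplInputStream above is a drop-in replacement for the manual byte copy. A tiny equivalence check with arbitrary buffer contents:

import java.util.Arrays;

public class ArrayCopySketch
{
  public static void main(String[] args)
  {
    byte[] bytes = { 10, 20, 30, 40, 50 };
    int index = 1;        // source offset
    int off = 2;          // destination offset
    int copiedLength = 3; // number of bytes to copy

    // Manual loop, as in the code being removed.
    byte[] viaLoop = new byte[8];
    for (int i = 0; i < copiedLength; i++)
    {
      viaLoop[off + i] = bytes[index + i];
    }

    // Single native call, as in the replacement.
    byte[] viaArrayCopy = new byte[8];
    System.arraycopy(bytes, index, viaArrayCopy, off, copiedLength);

    System.out.println(Arrays.equals(viaLoop, viaArrayCopy)); // prints true
  }
}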
@@ -1234,7 +1234,7 @@ } else { serverStartMsg = new ServerStartECLMsg(baseDn, 0, 0, 0, 0, serverStartMsg = new ServerStartECLMsg(0, 0, 0, 0, maxRcvWindow, heartbeatInterval, state, ProtocolVersion.getCurrentVersion(), this.getGenerationID(), isSslEncryption, groupId); @@ -2605,13 +2605,14 @@ { synchronized (monitorResponse) { if (monitorResponse.get() == false) if (!monitorResponse.get()) { monitorResponse.wait(10000); } } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } return replicaStates; } opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -23,7 +23,7 @@ * * * Copyright 2008-2010 Sun Microsystems, Inc. * Portions Copyright 2011-2012 ForgeRock AS * Portions Copyright 2011-2013 ForgeRock AS */ package org.opends.server.replication.service; @@ -39,6 +39,7 @@ import java.io.OutputStream; import java.net.SocketTimeoutException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; @@ -165,8 +166,8 @@ * to be able to correlate all the coming back acks to the original * operation. */ private final SortedMap<ChangeNumber, UpdateMsg> waitingAckMsgs = new TreeMap<ChangeNumber, UpdateMsg>(); private final Map<ChangeNumber, UpdateMsg> waitingAckMsgs = new ConcurrentHashMap<ChangeNumber, UpdateMsg>(); /** @@ -243,7 +244,7 @@ // that have not been successfully acknowledged (either because of timeout, // wrong status or error at replay) for a particular server (DS or RS). String // format: <server id>:<number of failed updates> private Map<Integer, Integer> assuredSrServerNotAcknowledgedUpdates = private final Map<Integer, Integer> assuredSrServerNotAcknowledgedUpdates = new HashMap<Integer,Integer>(); // Number of updates received in Assured Mode, Safe Read request private AtomicInteger assuredSrReceivedUpdates = new AtomicInteger(0); @@ -264,7 +265,7 @@ // Multiple values allowed: number of updates sent in Assured Mode, Safe Data, // that have not been successfully acknowledged because of timeout for a // particular RS. String format: <server id>:<number of failed updates> private Map<Integer, Integer> assuredSdServerTimeoutUpdates = private final Map<Integer, Integer> assuredSdServerTimeoutUpdates = new HashMap<Integer,Integer>(); /** @@ -278,10 +279,12 @@ /* Status related monitoring fields */ // Indicates the date when the status changed. This may be used to indicate // the date the session with the current replication server started (when // status is NORMAL for instance). All the above assured monitoring fields // are also reset each time the status is changed /** * Indicates the date when the status changed. This may be used to indicate * the date the session with the current replication server started (when * status is NORMAL for instance). All the above assured monitoring fields * are also reset each time the status is changed */ private Date lastStatusChangeDate = new Date(); /** @@ -583,7 +586,7 @@ } /** * Returns informations about the DS server related to the provided serverId. * Returns information about the DS server related to the provided serverId. * based on the TopologyMsg we received when the remote replica connected or * disconnected. Return null when no server with the provided serverId is * connected. @@ -696,8 +699,7 @@ */ public void setURLs(Set<String> referralsUrl) { for (String url : referralsUrl) this.refUrls.add(url); this.refUrls.addAll(referralsUrl); } /** @@ -793,11 +795,12 @@ // Another server is exporting its entries to us InitializeTargetMsg initTargetMsg = (InitializeTargetMsg) msg; // This must be done while we are still holding the // broker lock because we are now going to receive a // bunch of entries from the remote server and we // want the import thread to catch them and // not the ListenerThread. /* This must be done while we are still holding the broker lock because we are now going to receive a bunch of entries from the remote server and we want the import thread to catch them and not the ListenerThread. 
*/ initialize(initTargetMsg, initTargetMsg.getSenderID()); } else if (msg instanceof ErrorMsg) @@ -805,15 +808,16 @@ ErrorMsg errorMsg = (ErrorMsg)msg; if (ieContext != null) { // This is an error termination for the 2 following cases : // - either during an export // - or before an import really started // For example, when we publish a request and the // replicationServer did not find the import source. // // A remote error during the import will be received in the // receiveEntryBytes() method. // /* This is an error termination for the 2 following cases : - either during an export - or before an import really started For example, when we publish a request and the replicationServer did not find the import source. A remote error during the import will be received in the receiveEntryBytes() method. */ if (debugEnabled()) TRACER.debugInfo( "[IE] processErrorMsg:" + this.serverID + @@ -827,9 +831,11 @@ } else { // Simply log - happen when the ErrorMsg relates to a previous // attempt of initialization while we have started a new one // on this side. /* Simply log - happen when the ErrorMsg relates to a previous attempt of initialization while we have started a new one on this side. */ logError(ERR_ERROR_MSG_RECEIVED.get(errorMsg.getDetails())); } } @@ -864,15 +870,17 @@ { // just retry } // Test if we have received and export request message and // if that's the case handle it now. // This must be done outside of the portion of code protected // by the broker lock so that we keep receiveing update // when we are doing and export and so that a possible // closure of the socket happening when we are publishing the // entries to the remote can be handled by the other // replay thread when they call this method and therefore the // broker.receive() method. /* Test if we have received and export request message and if that's the case handle it now. This must be done outside of the portion of code protected by the broker lock so that we keep receiving update when we are doing and export and so that a possible closure of the socket happening when we are publishing the entries to the remote can be handled by the other replay thread when they call this method and therefore the broker.receive() method. */ if (initReqMsg != null) { // Do this work in a thread to allow replay thread continue working @@ -898,8 +906,8 @@ * particular server in the list. This increments the counter of error for the * passed server, or creates an initial value of 1 error for it if the server * is not yet present in the map. * @param errorList * @param sid * @param errorsByServer map of number of errors per serverID * @param sid the ID of the server which produced an error */ private void updateAssuredErrorsByServer(Map<Integer,Integer> errorsByServer, Integer sid) @@ -916,7 +924,7 @@ { // Server already present in list, just increment number of // errors for the server int val = serverErrCount.intValue(); int val = serverErrCount; val++; errorsByServer.put(sid, val); } @@ -935,10 +943,7 @@ // Remove the message for pending ack list (this may already make the thread // that is waiting for the ack be aware of its reception) synchronized (waitingAckMsgs) { update = waitingAckMsgs.remove(changeNumber); } update = waitingAckMsgs.remove(changeNumber); // Signal waiting thread ack has been received if (update != null) @@ -957,8 +962,10 @@ if ( hasTimeout || hasReplayErrors || hasWrongStatus) { // Some problems detected: message not correclty reached every requested // servers. 
Log problem /* Some problems detected: message did not correctly reach every requested servers. Log problem */ Message errorMsg = NOTE_DS_RECEIVED_ACK_ERROR.get( serviceID, Integer.toString(serverID), update.toString(), ack.errorsToString()); @@ -1021,27 +1028,6 @@ } } /** * Retrieves a replication domain based on the baseDn. * * @param serviceID The identifier of the domain to retrieve. * * @return The domain retrieved. * * @throws DirectoryException When an error occurred or no domain * match the provided baseDn. */ static ReplicationDomain retrievesReplicationDomain(String serviceID) throws DirectoryException { ReplicationDomain replicationDomain = domains.get(serviceID); if (replicationDomain == null) { throw new DirectoryException(ResultCode.OTHER, ERR_NO_MATCHING_DOMAIN.get(serviceID)); } return replicationDomain; } /* * After this point the code is related to the Total Update. @@ -1051,7 +1037,7 @@ * This thread is launched when we want to export data to another server. * * When a task is created locally (so this local server is the initiator) * of the export (Exemple: dsreplication initialize-all), * of the export (Example: dsreplication initialize-all), * this thread is NOT used but the task thread is running the export instead). */ private class ExportThread extends DirectoryThread @@ -1095,9 +1081,11 @@ initWindow); } catch (DirectoryException de) { // An error message has been sent to the peer // This server is not the initiator of the export so there is // nothing more to do locally. /* An error message has been sent to the peer This server is not the initiator of the export so there is nothing more to do locally. */ } if (debugEnabled()) @@ -1211,29 +1199,6 @@ /** * Update the counters of the task for each entry processed during * an import or export. * @throws DirectoryException if an error occurred. */ public void updateCounters() throws DirectoryException { entryLeftCount--; if (initializeTask != null) { if (initializeTask instanceof InitializeTask) { ((InitializeTask)initializeTask).setLeft(entryLeftCount); } else if (initializeTask instanceof InitializeTargetTask) { ((InitializeTargetTask)initializeTask).setLeft(entryLeftCount); } } } /** * Update the counters of the task for each entry processed during * an import or export. * * @param entriesDone The number of entries that were processed * since the last time this method was called. @@ -1379,7 +1344,7 @@ * on this server, and the {@code importBackend(InputStream)} * will be called on the remote server. * <p> * The InputStream and OutpuStream given as a parameter to those * The InputStream and OutputStream given as a parameter to those * methods will be connected through the replication protocol. * * @param target The server-id of the server that should be initialized. 
@@ -1394,10 +1359,7 @@ public void initializeRemote(int target, Task initTask) throws DirectoryException { initializeRemote(target, this.serverID, initTask, this.initWindow); } /** @@ -1426,10 +1388,12 @@ // Acquire and initialize the export context acquireIEContext(false); // We manage the list of servers to initialize in order : // - to test at the end that all expected servers have reconnected // after their import and with the right genId // - to update the task with the server(s) where this test failed /* We manage the list of servers to initialize in order : - to test at the end that all expected servers have reconnected after their import and with the right genId - to update the task with the server(s) where this test failed */ if (serverToInitialize == RoutableMsg.ALL_SERVERS) { @@ -1526,14 +1490,15 @@ { try { // Handling the errors during export /* Handling the errors during export // Note: we could have lost the connection and another thread // the listener one) has already managed to reconnect. // So we MUST rely on the test broker.isConnected() // ONLY to do 'wait to be reconnected by another thread' // (if not yet reconnected already). Note: we could have lost the connection and another thread the listener one) has already managed to reconnect. So we MUST rely on the test broker.isConnected() ONLY to do 'wait to be reconnected by another thread' (if not yet reconnected already). */ if (!broker.isConnected()) { // We are still disconnected, so we wait for the listener thread @@ -1550,14 +1515,16 @@ if ((initTask != null) && broker.isConnected() && (serverToInitialize != RoutableMsg.ALL_SERVERS)) { // NewAttempt case : In the case where // - it's not an InitializeAll // - AND the previous export attempt failed // - AND we are (now) connected // - and we own the task and this task is not an InitializeAll // Let's : // - sleep to let time to the other peer to reconnect if needed // - and launch another attempt /* NewAttempt case : In the case where - it's not an InitializeAll - AND the previous export attempt failed - AND we are (now) connected - and we own the task and this task is not an InitializeAll Let's : - sleep to let time to the other peer to reconnect if needed - and launch another attempt */ try { Thread.sleep(1000); } catch(Exception e){} logError(NOTE_RESENDING_INIT_TARGET.get( exportRootException.getLocalizedMessage())); @@ -1632,8 +1599,7 @@ int waitResultAttempt = 0; Set<Integer> replicasWeAreWaitingFor = new HashSet<Integer>(0); for (Integer sid : ieContext.startList) replicasWeAreWaitingFor.add(sid); replicasWeAreWaitingFor.addAll(ieContext.startList); if (debugEnabled()) TRACER.debugInfo( @@ -1657,8 +1623,11 @@ { // this one is still not doing the Full Update ... 
retry later done = false; try { Thread.sleep(100); } catch (InterruptedException e) {} try { Thread.sleep(100); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } waitResultAttempt++; break; } @@ -1673,8 +1642,7 @@ while ((!done) && (waitResultAttempt<1200) // 2mn && (!broker.shuttingDown())); ieContext.failureList.addAll( Arrays.asList(replicasWeAreWaitingFor.toArray(new Integer[0]))); ieContext.failureList.addAll(replicasWeAreWaitingFor); if (debugEnabled()) TRACER.debugInfo( @@ -1697,9 +1665,11 @@ TRACER.debugInfo( "[IE] wait for end replicasWeAreWaitingFor=" + replicasWeAreWaitingFor); // In case some new servers appear during the init, we want them to be // considered in the processing of sorting the successfully initialized // and the others /* In case some new servers appear during the init, we want them to be considered in the processing of sorting the successfully initialized and the others */ for (DSInfo dsi : getReplicasList()) replicasWeAreWaitingFor.add(dsi.getDsId()); @@ -1709,22 +1679,25 @@ done = true; short reconnectMaxDelayInSec = 10; short reconnectWait = 0; Integer[] servers = replicasWeAreWaitingFor.toArray(new Integer[0]); for (int serverId : servers) for (int serverId : replicasWeAreWaitingFor) { if (ieContext.failureList.contains(serverId)) { // this server has already been in error during initialization // dont't wait for it /* this server has already been in error during initialization don't wait for it */ continue; } DSInfo dsInfo = isRemoteDSConnected(serverId); if (dsInfo == null) { // this server is disconnected // may be for a long time if it crashed or had been stopped // may be just the time to reconnect after import : should be short /* this server is disconnected may be for a long time if it crashed or had been stopped may be just the time to reconnect after import : should be short */ if (++reconnectWait<reconnectMaxDelayInSec) { // let's still wait to give a chance to this server to reconnect @@ -1764,8 +1737,7 @@ } while ((!done) && (!broker.shuttingDown())); // infinite wait ieContext.failureList.addAll( Arrays.asList(replicasWeAreWaitingFor.toArray(new Integer[0]))); ieContext.failureList.addAll(replicasWeAreWaitingFor); if (debugEnabled()) TRACER.debugInfo( @@ -1839,8 +1811,10 @@ } else { // When we are the exporter in the case of initializeAll // exporting must not be stopped on the first error. /* When we are the exporter in the case of initializeAll exporting must not be stopped on the first error. 
*/ } } } @@ -1889,7 +1863,7 @@ } } // Check good sequentiality of msg received // Check good ordering of msg received if (msg instanceof EntryMsg) { EntryMsg entryMsg = (EntryMsg)msg; @@ -1899,7 +1873,7 @@ if (ieContext.exporterProtocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4) { // check the msgCnt of the msg received to check sequenciality // check the msgCnt of the msg received to check ordering if (++ieContext.msgCnt != entryMsg.getMsgId()) { if (ieContext.getException() == null) @@ -1928,16 +1902,20 @@ } else if (msg instanceof DoneMsg) { // This is the normal termination of the import // No error is stored and the import is ended // by returning null /* This is the normal termination of the import No error is stored and the import is ended by returning null */ return null; } else if (msg instanceof ErrorMsg) { // This is an error termination during the import // The error is stored and the import is ended // by returning null /* This is an error termination during the import The error is stored and the import is ended by returning null */ if (ieContext.getException() == null) { ErrorMsg errMsg = (ErrorMsg)msg; @@ -1971,7 +1949,6 @@ } catch(Exception e) { // TODO: i18n if (ieContext.getException() == null) ieContext.setException(new DirectoryException(ResultCode.OTHER, ERR_INIT_IMPORT_FAILURE.get(e.getLocalizedMessage()))); @@ -1984,7 +1961,7 @@ * This is based on the hypothesis that the entries are separated * by a "\n\n" String. * * @param entryBytes * @param entryBytes the set of bytes containing one or more entries. * @return The number of entries in the provided byte[]. */ private int countEntryLimits(byte[] entryBytes) @@ -1997,7 +1974,7 @@ * This is based on the hypothesis that the entries are separated * by a "\n\n" String. * * @param entryBytes * @param entryBytes the set of bytes containing one or more entries. * @return The number of entries in the provided byte[]. */ private int countEntryLimits(byte[] entryBytes, int pos, int length) @@ -2029,7 +2006,8 @@ throws IOException { if (debugEnabled()) TRACER.debugInfo("[IE] Entering exportLDIFEntry entry=" + lDIFEntry); TRACER.debugInfo("[IE] Entering exportLDIFEntry entry=" + Arrays.toString(lDIFEntry)); // build the message EntryMsg entryMessage = new EntryMsg( @@ -2039,9 +2017,11 @@ // Waiting the slowest loop while (!broker.shuttingDown()) { // If an error was raised - like receiving an ErrorMsg from a remote // server that have been stored by the listener thread in the ieContext, // we just abandon the export by throwing an exception. /* If an error was raised - like receiving an ErrorMsg from a remote server that have been stored by the listener thread in the ieContext, we just abandon the export by throwing an exception. */ if (ieContext.getException() != null) throw(new IOException(ieContext.getException().getMessage())); @@ -2094,7 +2074,8 @@ } // Waiting the slowest loop if (debugEnabled()) TRACER.debugInfo("[IE] Entering exportLDIFEntry pub entry=" + lDIFEntry); TRACER.debugInfo("[IE] Entering exportLDIFEntry pub entry=" + Arrays.toString(lDIFEntry)); // publish the message boolean sent = broker.publish(entryMessage, false); @@ -2212,18 +2193,22 @@ errMsg = ERR_INITIALIZATION_FAILED_NOCONN.get(getServiceID()); } // We must not test here whether the remote source is connected to // the topology by testing if it stands in the replicas list since. 
// In the case of a re-attempt of initialization, the listener thread is // running this method directly coming from initailize() method and did // not processed any topology message in between the failure and the // new attempt. /* We must not test here whether the remote source is connected to the topology by testing if it stands in the replicas list since, in the case of a re-attempt of initialization, the listener thread is running this method directly coming from the initialize() method and did not process any topology message in between the failure and the new attempt. */ try { // We must immediatly acquire a context to store the task inside // The context will be used when we (the listener thread) will receive // the InitializeTargetMsg, process the import, and at the end // update the task. /* We must immediately acquire a context to store the task inside. The context will be used when we (the listener thread) will receive the InitializeTargetMsg, process the import, and at the end update the task. */ acquireIEContext(true); //test and set if no import already in progress ieContext.initializeTask = initTask; @@ -2234,11 +2219,13 @@ // Publish Init request msg broker.publish(ieContext.initReqMsgSent); // The normal success processing is now to receive InitTargetMsg then // entries from the remote server. // The error cases are : // - either local error immediatly caught below // - a remote error we will receive as an ErrorMsg /* The normal success processing is now to receive InitTargetMsg then entries from the remote server. The error cases are : - either local error immediately caught below - a remote error we will receive as an ErrorMsg */ } catch(DirectoryException de) { @@ -2272,15 +2259,15 @@ * * @param initTargetMsgReceived The message received from the remote server. * * @param requestorServerId The serverId of the server that requested the * @param requesterServerId The serverId of the server that requested the * initialization meaning the server where the * task has initially been created (this server, * or the remote server). */ void initialize(InitializeTargetMsg initTargetMsgReceived, int requestorServerId) int requesterServerId) { InitializeTask initFromtask = null; InitializeTask initFromTask = null; if (debugEnabled()) TRACER.debugInfo("[IE] Entering initialize - domain=" + this); @@ -2300,16 +2287,20 @@ // Acquire an import context if no already done (and initialize). if (initTargetMsgReceived.getInitiatorID() == this.serverID) { // The initTargetMsgReceived received is the answer to a request that // we (this server) sent previously. In this case, so the IEContext // has been already acquired when the request was published in order // to store the task (to be updated with the status at the end). /* The initTargetMsgReceived is the answer to a request that we (this server) sent previously. In this case, the IEContext has already been acquired when the request was published, in order to store the task (to be updated with the status at the end). */ } else { // The initTargetMsgReceived is for an import initiated by the remote // server. // Test and set if no import already in progress /* The initTargetMsgReceived is for an import initiated by the remote server. Test and set if no import already in progress */ acquireIEContext(true); } @@ -2319,16 +2310,18 @@ ieContext.initWindow = initTargetMsgReceived.getInitWindow(); // Protocol version is -1 when not known.
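Editor's note: the comments in the hunks above describe acquireIEContext(true) as a test-and-set that refuses to start an import or export when one is already in progress. As a rough, standalone illustration of that idea only (ImportExportGuard and its field are hypothetical names, not the patch's IEContext handling), a compare-and-set guard could look like this:

import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch of a test-and-set guard; not the OpenDS IEContext code. */
final class ImportExportGuard
{
  // true while an import or export is running
  private final AtomicBoolean inProgress = new AtomicBoolean(false);

  /** Try to acquire the guard; returns false if an operation is already running. */
  boolean tryAcquire()
  {
    // compareAndSet is atomic: only one caller can flip false -> true.
    return inProgress.compareAndSet(false, true);
  }

  /** Release the guard once the import or export has completed. */
  void release()
  {
    inProgress.set(false);
  }
}

A caller would wrap the whole operation in if (!guard.tryAcquire()) report "already in progress"; else try { ... } finally { guard.release(); }, which mirrors the acquire/release pairing visible in the surrounding hunks.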
ieContext.exporterProtocolVersion = getProtocolVersion(source); initFromtask = (InitializeTask)ieContext.initializeTask; initFromTask = (InitializeTask)ieContext.initializeTask; // Lauch the import // Launch the import importBackend(new ReplInputStream(this)); } catch (DirectoryException e) { // Store the exception raised. It will be considered if no other exception // has been previously stored in the context /* Store the exception raised. It will be considered if no other exception has been previously stored in the context */ if (ieContext.getException() == null) ieContext.setException(e); } @@ -2339,30 +2332,37 @@ + " ends import with exception=" + ieContext.getException() + " connected=" + broker.isConnected()); // It is necessary to restart (reconnect to RS) for different reasons // - when everything went well, reconnect in order to exchange // new state, new generation ID // - when we have connection failure, reconnect to retry a new import // right here, right now // we never want retryOnFailure if we fails reconnecting in the restart. /* It is necessary to restart (reconnect to RS) for different reasons - when everything went well, reconnect in order to exchange new state, new generation ID - when we have connection failure, reconnect to retry a new import right here, right now we never want retryOnFailure if we fail reconnecting in the restart. */ broker.reStart(false); if (ieContext.getException() != null) { if (broker.isConnected() && (initFromtask != null) if (broker.isConnected() && (initFromTask != null) && (++ieContext.attemptCnt<2)) { // Worth a new attempt // since initFromtask is in this server, connection is ok /* Worth a new attempt since initFromTask is in this server, connection is ok */ try { // Wait for the exporter to stabilize - eventually reconnect as // well if it was connected to the same RS than the one we lost ... /* Wait for the exporter to stabilize - eventually reconnect as well if it was connected to the same RS as the one we lost ... */ Thread.sleep(1000); // Restart the whole import protocol exchange by sending again // the request /* Restart the whole import protocol exchange by sending again the request */ logError(NOTE_RESENDING_INIT_FROM_REMOTE_REQUEST.get( ieContext.getException().getLocalizedMessage())); @@ -2378,8 +2378,10 @@ } catch(Exception e) { // An error occurs when sending a new request for a new import. // This error is not stored, prefering to keep the initial one. /* An error occurs when sending a new request for a new import. This error is not stored, preferring to keep the initial one. */ logError(ERR_SENDING_NEW_ATTEMPT_INIT_REQUEST.get( e.getLocalizedMessage(), ieContext.getException().getLocalizedMessage())); @@ -2394,7 +2396,7 @@ TRACER.debugInfo("[IE] Domain=" + this + " ends initialization with exception=" + ieContext.getException() + " connected=" + broker.isConnected() + " task=" + initFromtask + " task=" + initFromTask + " attempt=" + ieContext.attemptCnt); try @@ -2402,24 +2404,28 @@ if (broker.isConnected() && (ieContext.getException() != null)) { // Let's notify the exporter ErrorMsg errorMsg = new ErrorMsg(requestorServerId, ErrorMsg errorMsg = new ErrorMsg(requesterServerId, ieContext.getException().getMessageObject()); broker.publish(errorMsg); } else // !broker.isConnected() { // Don't try to reconnect here. // The current running thread is the listener thread and will loop on // receive() that is expected to manage reconnects attempt. /* Don't try to reconnect here.
The current running thread is the listener thread and will loop on receive() that is expected to manage reconnects attempt. */ } // Update the task that initiated the import must be the last thing. // Particularly, broker.restart() after import success must be done // before some other operations/tasks to be launched, // like resetting the generation ID. if (initFromtask != null) /* Update the task that initiated the import must be the last thing. Particularly, broker.restart() after import success must be done before some other operations/tasks to be launched, like resetting the generation ID. */ if (initFromTask != null) { initFromtask.updateTaskCompletionState(ieContext.getException()); initFromTask.updateTaskCompletionState(ieContext.getException()); } } finally @@ -2435,10 +2441,10 @@ } /** * Return the protocol version of the DS related to the provided serverid. * Return the protocol version of the DS related to the provided serverId. * Returns -1 when the protocol version is not known. * @param dsServerId The provided serverid. * @return The procotol version. * @param dsServerId The provided serverId. * @return The protocol version. */ short getProtocolVersion(int dsServerId) { @@ -2515,11 +2521,11 @@ private void checkGenerationID(long generationID) throws DirectoryException { boolean allset = true; boolean allSet = true; for (int i = 0; i< 50; i++) { allset = true; allSet = true; for (RSInfo rsInfo : getRsList()) { // the 'empty' RSes (generationId==-1) are considered as good citizens @@ -2532,16 +2538,16 @@ } catch (InterruptedException e) { } allset = false; allSet = false; break; } } if (allset) if (allSet) { break; } } if (!allset) if (!allSet) { ResultCode resultCode = ResultCode.OTHER; Message message = ERR_RESET_GENERATION_ID_FAILED.get(serviceID); @@ -2735,9 +2741,11 @@ */ void processUpdateDoneSynchronous(UpdateMsg msg) { // Warning: in synchronous mode, no way to tell the replay of an update went // wrong Just put null in processUpdateDone so that if assured replication // is used the ack is sent without error at replay flag. /* Warning: in synchronous mode, no way to tell the replay of an update went wrong Just put null in processUpdateDone so that if assured replication is used the ack is sent without error at replay flag. */ processUpdateDone(msg, null); state.update(msg.getChangeNumber()); } @@ -2749,10 +2757,7 @@ */ public boolean isConnected() { if (broker != null) return broker.isConnected(); else return false; return broker != null && broker.isConnected(); } /** @@ -2764,10 +2769,7 @@ */ public boolean hasConnectionError() { if (broker != null) return broker.hasConnectionError(); else return true; return broker == null || broker.hasConnectionError(); } /** @@ -2852,24 +2854,16 @@ /** * Gets the number of updates sent in assured safe read mode that have not * been acknowledged per server. * @return The number of updates sent in assured safe read mode that have not * been acknowledged per server. * @return A copy of the map that contains the number of updates sent in * assured safe read mode that have not been acknowledged per server. 
*/ public Map<Integer, Integer> getAssuredSrServerNotAcknowledgedUpdates() { // Clone a snapshot with synchronized section to have a consistent view in // monitoring Map<Integer, Integer> snapshot = new HashMap<Integer, Integer>(); synchronized(assuredSrServerNotAcknowledgedUpdates) { Set<Integer> keySet = assuredSrServerNotAcknowledgedUpdates.keySet(); for (Integer serverId : keySet) { Integer i = assuredSrServerNotAcknowledgedUpdates.get(serverId); snapshot.put(serverId, i); } return new HashMap<Integer, Integer>( assuredSrServerNotAcknowledgedUpdates); } return snapshot; } /** @@ -2937,24 +2931,16 @@ /** * Gets the number of updates sent in assured safe data mode that have not * been acknowledged due to timeout error per server. * @return The number of updates sent in assured safe data mode that have not * been acknowledged due to timeout error per server. * @return A copy of the map that contains the number of updates sent in * assured safe data mode that have not been acknowledged due to timeout * error per server. */ public Map<Integer, Integer> getAssuredSdServerTimeoutUpdates() { // Clone a snapshot with synchronized section to have a consistent view in // monitoring Map<Integer, Integer> snapshot = new HashMap<Integer, Integer>(); synchronized(assuredSdServerTimeoutUpdates) { Set<Integer> keySet = assuredSdServerTimeoutUpdates.keySet(); for (Integer serverId : keySet) { Integer i = assuredSdServerTimeoutUpdates.get(serverId); snapshot.put(serverId, i); } return new HashMap<Integer, Integer>(assuredSdServerTimeoutUpdates); } return snapshot; } /** @@ -2981,14 +2967,20 @@ assuredSrTimeoutUpdates = new AtomicInteger(0); assuredSrWrongStatusUpdates = new AtomicInteger(0); assuredSrReplayErrorUpdates = new AtomicInteger(0); assuredSrServerNotAcknowledgedUpdates = new HashMap<Integer,Integer>(); synchronized (assuredSrServerNotAcknowledgedUpdates) { assuredSrServerNotAcknowledgedUpdates.clear(); } assuredSrReceivedUpdates = new AtomicInteger(0); assuredSrReceivedUpdatesAcked = new AtomicInteger(0); assuredSrReceivedUpdatesNotAcked = new AtomicInteger(0); assuredSdSentUpdates = new AtomicInteger(0); assuredSdAcknowledgedUpdates = new AtomicInteger(0); assuredSdTimeoutUpdates = new AtomicInteger(0); assuredSdServerTimeoutUpdates = new HashMap<Integer,Integer>(); synchronized (assuredSdServerTimeoutUpdates) { assuredSdServerTimeoutUpdates.clear(); } } /* @@ -3080,8 +3072,10 @@ { synchronized (sessionLock) { // Stop the broker first in order to prevent the listener from // reconnecting - see OPENDJ-457. /* Stop the broker first in order to prevent the listener from reconnecting - see OPENDJ-457. */ if (broker != null) { broker.stop(); @@ -3263,8 +3257,10 @@ { broker.updateWindowAfterReplay(); // Send an ack if it was requested and the group id is the same of the RS // one. Only Safe Read mode makes sense in DS for returning an ack. /* Send an ack if it was requested and the group id is the same of the RS one. Only Safe Read mode makes sense in DS for returning an ack. 
*/ byte rsGroupId = broker.getRsGroupId(); if (msg.isAssured()) { @@ -3282,9 +3278,9 @@ if (replayErrorMsg != null) { // Mark the error in the ack // -> replay error occured // -> replay error occurred ackMsg.setHasReplayError(true); // -> replay error occured in our server // -> replay error occurred in our server List<Integer> idList = new ArrayList<Integer>(); idList.add(serverID); ackMsg.setFailedServers(idList); @@ -3306,8 +3302,10 @@ logError(errorMsg); } else { // In safe data mode assured update that comes up to a DS requires no // ack from a destinator DS. Safe data mode is based on RS acks only /* In safe data mode assured update that comes up to a DS requires no ack from a recipient DS. Safe data mode is based on RS acks only */ } } } @@ -3343,7 +3341,7 @@ * If assured configured, set message accordingly to request an ack in the * right assured mode. * No ack requested for a RS with a different group id. Assured * replication suported for the same locality, i.e: a topology working in * replication supported for the same locality, i.e: a topology working in * the same * geographical location). If we are connected to a RS which is not in our * locality, no need to ask for an ack. @@ -3355,12 +3353,11 @@ if (assuredMode == AssuredMode.SAFE_DATA_MODE) msg.setSafeDataLevel(assuredSdLevel); // Add the assured message to the list of update that are // waiting for acks synchronized (waitingAckMsgs) { waitingAckMsgs.put(msg.getChangeNumber(), msg); } /* Add the assured message to the list of update that are waiting for acks */ waitingAckMsgs.put(msg.getChangeNumber(), msg); } } @@ -3410,8 +3407,10 @@ { try { // WARNING: this timeout may be difficult to optimize: too low, it // may use too much CPU, too high, it may penalize performance... /* WARNING: this timeout may be difficult to optimize: too low, it may use too much CPU, too high, it may penalize performance... */ msg.wait(10); } catch (InterruptedException e) { @@ -3425,14 +3424,13 @@ // Timeout ? if ( (System.currentTimeMillis() - startTime) >= assuredTimeout ) { // Timeout occured, be sure that ack is not being received and if so, // remove the update from the wait list, log the timeout error and // also update assured monitoring counters /* Timeout occurred, be sure that ack is not being received and if so, remove the update from the wait list, log the timeout error and also update assured monitoring counters */ UpdateMsg update; synchronized (waitingAckMsgs) { update = waitingAckMsgs.remove(cn); } update = waitingAckMsgs.remove(cn); if (update != null) { @@ -3490,9 +3488,9 @@ } /** * Publish informations to the Replication Service (not assured mode). * Publish information to the Replication Service (not assured mode). * * @param msg The byte array containing the informations that should * @param msg The byte array containing the information that should * be sent to the remote entities. */ public void publish(byte[] msg) @@ -3501,10 +3499,11 @@ synchronized (this) { update = new UpdateMsg(generator.newChangeNumber(), msg); // If assured replication is configured, this will prepare blocking // mechanism. If assured replication is disabled, this returns // immediately /* If assured replication is configured, this will prepare blocking mechanism. If assured replication is disabled, this returns immediately */ prepareWaitForAckIfAssuredEnabled(update); publish(update); @@ -3512,16 +3511,18 @@ try { // If assured replication is enabled, this will wait for the matching // ack or time out. 
If assured replication is disabled, this returns // immediately /* If assured replication is enabled, this will wait for the matching ack or time out. If assured replication is disabled, this returns immediately */ waitForAckIfAssuredEnabled(update); } catch (TimeoutException ex) { // This exception may only be raised if assured replication is // enabled Message errorMsg = NOTE_DS_ACK_TIMEOUT.get(serviceID, Long.toString( assuredTimeout), msg.toString()); assuredTimeout), update.toString()); logError(errorMsg); } } @@ -3557,10 +3558,7 @@ */ public boolean importInProgress() { if (ieContext == null) return false; else return ieContext.importInProgress; return ieContext != null && ieContext.importInProgress; } /** @@ -3572,10 +3570,7 @@ */ public boolean exportInProgress() { if (ieContext == null) return false; else return !ieContext.importInProgress; return ieContext != null && !ieContext.importInProgress; } /** opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/PersistentServerStateTest.java
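Editor's note: the monitoring getters rewritten a few hunks above (getAssuredSrServerNotAcknowledgedUpdates, getAssuredSdServerTimeoutUpdates) keep the same behaviour while shortening the code: a copy is still taken inside the synchronized block, but via the HashMap copy constructor instead of a manual key-by-key loop. A minimal sketch of that "snapshot under lock" pattern, using illustrative names rather than the real fields, is:

import java.util.HashMap;
import java.util.Map;

/** Illustrative sketch of the copy-under-lock snapshot used by the monitoring getters. */
final class CounterSnapshotExample
{
  // Per-server counters updated by other threads; guarded by its own monitor.
  private final Map<Integer, Integer> perServerCounters = new HashMap<Integer, Integer>();

  void increment(int serverId)
  {
    synchronized (perServerCounters)
    {
      Integer current = perServerCounters.get(serverId);
      perServerCounters.put(serverId, current == null ? 1 : current + 1);
    }
  }

  /** Returns a consistent copy so monitoring code never iterates the live map. */
  Map<Integer, Integer> snapshot()
  {
    synchronized (perServerCounters)
    {
      // The copy constructor iterates while the lock is held, avoiding
      // ConcurrentModificationException and giving callers a stable view.
      return new HashMap<Integer, Integer>(perServerCounters);
    }
  }
}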
@@ -23,26 +23,20 @@ * * * Copyright 2006-2009 Sun Microsystems, Inc. * Portions Copyright 2013 ForgeRock AS. */ package org.opends.server.replication.plugin; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; import org.opends.server.TestCaseUtils; import org.opends.server.core.AddOperationBasis; import org.opends.server.protocols.internal.InternalClientConnection; import org.opends.server.replication.common.ServerState; import org.opends.server.replication.ReplicationTestCase; import org.opends.server.replication.common.ChangeNumber; import org.opends.server.replication.common.ChangeNumberGenerator; import org.opends.server.replication.common.ServerState; import org.opends.server.types.DN; import org.opends.server.types.Entry; import org.opends.server.types.ResultCode; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import static org.opends.server.TestCaseUtils.*; import static org.opends.server.TestCaseUtils.TEST_ROOT_DN_STRING; import static org.testng.Assert.assertEquals; /** * Test the PersistentServerState class. @@ -65,11 +59,11 @@ * retrieve ServerState to persistent storage. */ @Test(dataProvider = "suffix") public void persistenServerStateTest(String dn) public void persistentServerStateTest(String dn) throws Exception { /* * Create a new PersitentServerState, * Create a new PersistentServerState, * update it with 2 new ChangeNumbers with 2 different server Ids * save it * @@ -109,67 +103,4 @@ "cn1 has not been saved after clear for " + dn); } /** * Ensures that the Directory Server is able to * translate a ruv entry to a sever state. * * @throws Exception If an unexpected problem occurs. */ @SuppressWarnings("unchecked") @Test public void translateRuvEntryTest() throws Exception { LDAPReplicationDomain replDomain = null; try { String RuvString = "dn: nsuniqueid=ffffffff-ffffffff-ffffffff-ffffffff, o=test\n" +"objectClass: top\n" +"objectClass: ldapsubentry\n" +"objectClass: extensibleobject\n" +"nsds50ruv: {replicageneration} 49098853000000010000\n" +"nsds50ruv: {replica 3 ldap://kawax:3389} 491d517b000000030000 " +"491d564a000000030000\n" +"nsds50ruv: {replica 1 ldap://kawax:1389} 490989e8000000010000 " +"490989e8000000010000\n" +"ds6ruv: {PRIO 3 ldap://kawax:3389}\n" +"ds6ruv: {PRIO 1 ldap://kawax:1389}\n" +"entryUUID: ffffffff-ffff-ffff-ffff-ffffffffffff\n"; Entry RuvEntry = TestCaseUtils.entryFromLdifString(RuvString); AddOperationBasis addOp = new AddOperationBasis(InternalClientConnection. getRootConnection(), InternalClientConnection.nextOperationID(), InternalClientConnection.nextMessageID(), null, RuvEntry.getDN(), RuvEntry.getObjectClasses(), RuvEntry.getUserAttributes(), RuvEntry.getOperationalAttributes()); addOp.setInternalOperation(true); addOp.run(); assertTrue(addOp.getResultCode() == ResultCode.SUCCESS); DomainFakeCfg domainConf = new DomainFakeCfg("o=test", 1, "localhost:3389"); replDomain = MultimasterReplication.createNewDomain(domainConf); replDomain.start(); // Then check serverSate and GenId assertTrue(replDomain.getGenerationID() == 1225361491); ServerState state = replDomain.getServerState(); assertTrue(state.getMaxChangeNumber( 1). compareTo(new ChangeNumber("0000011d4d42b240000100000000")) == 0); assertTrue(state.getMaxChangeNumber( 3). 
compareTo(new ChangeNumber("0000011d9a991110000300000000")) == 0); } finally { if (replDomain != null) MultimasterReplication.deleteDomain(DN.decode("o=test")); } } } opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
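Editor's note: the cleaned-up imports in PersistentServerStateTest keep org.testng.Assert.assertEquals and drop assertTrue, matching the style of the remaining persistentServerStateTest. For comparison only (a generic sketch, not code from this patch), assertEquals reports both differing values on failure, whereas assertTrue(a.compareTo(b) == 0) only reports "false":

import static org.testng.Assert.assertEquals;

import org.opends.server.replication.common.ChangeNumber;
import org.testng.annotations.Test;

/** Generic illustration of assertEquals vs. assertTrue for value comparisons. */
public class AssertStyleExampleTest
{
  @Test
  public void changeNumberRoundTrip()
  {
    ChangeNumber cn = new ChangeNumber("0000011d4d42b240000100000000");

    // On failure this prints the actual and expected ChangeNumber values,
    // which assertTrue(cn.compareTo(expected) == 0) would not do.
    assertEquals(cn, new ChangeNumber("0000011d4d42b240000100000000"));
  }
}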
@@ -23,7 +23,7 @@ * * * Copyright 2006-2010 Sun Microsystems, Inc. * Portions copyright 2011-2012 ForgeRock AS * Portions copyright 2011-2013 ForgeRock AS */ package org.opends.server.replication.protocol; @@ -681,21 +681,21 @@ ChangeNumber cn3 = new ChangeNumber(TimeThread.getTime(), 1234567, 45678); ArrayList<Integer> fservers1 = new ArrayList<Integer>(); fservers1.add(new Integer(12345)); fservers1.add(new Integer(-12345)); fservers1.add(new Integer(31657)); fservers1.add(new Integer(-28456)); fservers1.add(new Integer(0)); fservers1.add(12345); fservers1.add(-12345); fservers1.add(31657); fservers1.add(-28456); fservers1.add(0); ArrayList<Integer> fservers2 = new ArrayList<Integer>(); ArrayList<Integer> fservers3 = new ArrayList<Integer>(); fservers3.add(new Integer(0)); fservers3.add(0); ArrayList<Integer> fservers4 = new ArrayList<Integer>(); fservers4.add(new Integer(100)); fservers4.add(new Integer(2000)); fservers4.add(new Integer(30000)); fservers4.add(new Integer(-100)); fservers4.add(new Integer(-2000)); fservers4.add(new Integer(-30000)); fservers4.add(100); fservers4.add(2000); fservers4.add(30000); fservers4.add(-100); fservers4.add(-2000); fservers4.add(-30000); return new Object[][] { {cn1, true, false, false, fservers1}, @@ -1421,7 +1421,7 @@ public void startECLMsgTest(int serverId, String baseDN, int window, ServerState state, long genId, boolean sslEncryption, byte groupId) throws Exception { ServerStartECLMsg msg = new ServerStartECLMsg(baseDN, ServerStartECLMsg msg = new ServerStartECLMsg( window, window, window, window, window, window, state, ProtocolVersion.getCurrentVersion(), genId, sslEncryption, groupId); ServerStartECLMsg newMsg = new ServerStartECLMsg(msg.getBytes()); @@ -1447,13 +1447,13 @@ { // data ChangeNumber changeNumber = new ChangeNumber(TimeThread.getTime(), 123, 45); String generalizedState = new String("fakegenstate"); String generalizedState = "fakegenstate"; ServerState state = new ServerState(); assertTrue(state.update(new ChangeNumber((long)75, 5,263))); short mode = 3; int firstDraftChangeNumber = 13; int lastDraftChangeNumber = 14; String myopid = new String("fakeopid"); String myopid = "fakeopid"; // create original StartECLSessionMsg msg = new StartECLSessionMsg(); msg.setChangeNumber(changeNumber);
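Editor's note: the final hunks replace explicit new Integer(...) wrapper construction and new String("...") with autoboxing and plain literals, the cleanup applied to fservers1/fservers4 and to generalizedState/myopid above. A tiny self-contained sketch of the idiom (variable names are illustrative, not from the patch):

import java.util.ArrayList;
import java.util.List;

/** Stand-alone illustration of the boxing/literal cleanup in the last hunks. */
final class BoxingCleanupExample
{
  public static void main(String[] args)
  {
    // Old style: servers.add(new Integer(12345)); String opId = new String("fakeopid");
    // New style: autoboxing converts the int literal, and the String literal is used directly.
    List<Integer> servers = new ArrayList<Integer>();
    servers.add(12345);
    servers.add(-28456);
    servers.add(0);

    String opId = "fakeopid";

    System.out.println(servers + " / " + opId);
  }
}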