From 791840ef10ecb9f25b4c3b97eacbf848bf75a261 Mon Sep 17 00:00:00 2001
From: Ludovic Poitou <ludovic.poitou@forgerock.com>
Date: Wed, 24 Apr 2013 12:44:51 +0000
Subject: [PATCH] Replication Cleanup.
---
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ModifyMsg.java | 19
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/AttrHistoricalSingle.java | 4
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java | 293 +++----
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java | 32
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 24
opendj-sdk/opends/src/server/org/opends/server/replication/server/SafeDataExpectedAcksInfo.java | 9
opendj-sdk/opends/src/server/org/opends/server/replication/common/ChangeNumber.java | 30
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java | 25
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/LDAPUpdateMsg.java | 24
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java | 15
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java | 3
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java | 13
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/PersistentServerStateTest.java | 83 --
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationRepairRequestControl.java | 10
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingMatchingRule.java | 4
opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplInputStream.java | 6
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/PersistentServerState.java | 353 +-------
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ServerStartMsg.java | 3
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/UpdateMsg.java | 20
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplayThread.java | 24
opendj-sdk/opends/src/server/org/opends/server/replication/server/MonitoringPublisher.java | 4
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java | 12
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplSessionSecurity.java | 6
opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java | 44
opendj-sdk/opends/src/server/org/opends/server/replication/server/NotAssuredUpdateMsg.java | 13
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/AttrValueHistorical.java | 6
opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java | 589 +++++++-------
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/StartMsg.java | 7
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ServerStartECLMsg.java | 22
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/DeleteMsg.java | 2
opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java | 14
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java | 48
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java | 5
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationBackend.java | 16
opendj-sdk/opends/src/server/org/opends/server/replication/service/ListenerThread.java | 6
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java | 33
opendj-sdk/opends/src/server/org/opends/server/replication/server/LightweightServerHandler.java | 13
opendj-sdk/opends/src/server/org/opends/server/replication/server/DbHandler.java | 24
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDB.java | 19
opendj-sdk/opends/src/messages/messages/replication.properties | 2
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/HistoricalAttributeValue.java | 14
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java | 18
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java | 5
opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNData.java | 13
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationIterator.java | 4
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/StartSessionMsg.java | 12
opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java | 5
opendj-sdk/opends/src/server/org/opends/server/replication/server/MonitorData.java | 126 --
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/StartECLSessionMsg.java | 13
opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java | 61 -
opendj-sdk/opends/src/server/org/opends/server/replication/server/ExpectedAcksInfo.java | 9
opendj-sdk/opends/src/server/org/opends/server/replication/common/FirstChangeNumberVirtualAttributeProvider.java | 5
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/AttrHistoricalMultiple.java | 10
opendj-sdk/opends/src/server/org/opends/server/replication/common/LastChangeNumberVirtualAttributeProvider.java | 5
opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNDB.java | 7
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/AddMsg.java | 5
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/EntryMsg.java | 6
opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java | 16
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/AckMsg.java | 31
59 files changed, 818 insertions(+), 1,426 deletions(-)
diff --git a/opendj-sdk/opends/src/messages/messages/replication.properties b/opendj-sdk/opends/src/messages/messages/replication.properties
index 0f6a92d..066a26f 100644
--- a/opendj-sdk/opends/src/messages/messages/replication.properties
+++ b/opendj-sdk/opends/src/messages/messages/replication.properties
@@ -541,3 +541,5 @@
replication server in the topology and distribute load more equally
SEVERE_WARN_INVALID_SYNC_HIST_VALUE_214=The attribute value '%s' is not a valid \
synchronization history value
+SEVERE_ERR_REPLICATIONDB_CANNOT_PROCESS_CHANGE_RECORD_215=Replication server RS(%d) \
+ failed to parse change record with changenumber %s from the database. Error: %s
\ No newline at end of file
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/common/ChangeNumber.java b/opendj-sdk/opends/src/server/org/opends/server/replication/common/ChangeNumber.java
index 03315d3..27347b7 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/common/ChangeNumber.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/common/ChangeNumber.java
@@ -23,6 +23,7 @@
*
*
* Copyright 2006-2009 Sun Microsystems, Inc.
+ * Portions Copyright 2013 ForgeRock AS.
*/
package org.opends.server.replication.common;
@@ -121,12 +122,9 @@
if (obj instanceof ChangeNumber)
{
ChangeNumber cn = (ChangeNumber) obj;
- if ((this.seqnum == cn.seqnum) &&
- (this.serverId == cn.serverId) &&
- (this.timeStamp == cn.timeStamp) )
- return true;
- else
- return false;
+ return this.seqnum == cn.seqnum &&
+ this.serverId == cn.serverId &&
+ this.timeStamp == cn.timeStamp;
}
else
return false;
@@ -274,10 +272,7 @@
*/
public Boolean older(ChangeNumber CN)
{
- if (compare(this, CN) < 0)
- return true;
-
- return false;
+ return compare(this, CN) < 0;
}
/**
@@ -288,10 +283,7 @@
*/
public Boolean olderOrEqual(ChangeNumber CN)
{
- if (compare(this, CN) <= 0)
- return true;
-
- return false;
+ return compare(this, CN) <= 0;
}
/**
@@ -301,10 +293,7 @@
*/
public boolean newerOrEquals(ChangeNumber CN)
{
- if (compare(this, CN) >= 0)
- return true;
-
- return false;
+ return compare(this, CN) >= 0;
}
/**
@@ -314,10 +303,7 @@
*/
public boolean newer(ChangeNumber CN)
{
- if (compare(this, CN) > 0)
- return true;
-
- return false;
+ return compare(this, CN) > 0;
}
/**
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/common/FirstChangeNumberVirtualAttributeProvider.java b/opendj-sdk/opends/src/server/org/opends/server/replication/common/FirstChangeNumberVirtualAttributeProvider.java
index 1219700..632457d 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/common/FirstChangeNumberVirtualAttributeProvider.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/common/FirstChangeNumberVirtualAttributeProvider.java
@@ -23,7 +23,7 @@
*
*
* Copyright 2009 Sun Microsystems, Inc.
- * Portions Copyright 2011-2012 ForgeRock AS
+ * Portions Copyright 2011-2013 ForgeRock AS
*/
package org.opends.server.replication.common;
@@ -172,8 +172,7 @@
AttributeValues.create(
ByteString.valueOf(first),
ByteString.valueOf(first));
- Set<AttributeValue> values=Collections.singleton(value);
- return values;
+ return Collections.singleton(value);
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/common/LastChangeNumberVirtualAttributeProvider.java b/opendj-sdk/opends/src/server/org/opends/server/replication/common/LastChangeNumberVirtualAttributeProvider.java
index 5efb18f..0afcee7 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/common/LastChangeNumberVirtualAttributeProvider.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/common/LastChangeNumberVirtualAttributeProvider.java
@@ -23,7 +23,7 @@
*
*
* Copyright 2009 Sun Microsystems, Inc.
- * Portions Copyright 2011-2012 ForgeRock AS
+ * Portions Copyright 2011-2013 ForgeRock AS
*/
package org.opends.server.replication.common;
@@ -171,8 +171,7 @@
AttributeValues.create(
ByteString.valueOf(last),
ByteString.valueOf(last));
- Set<AttributeValue> values =Collections.singleton(value);
- return values;
+ return Collections.singleton(value);
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/AttrHistoricalMultiple.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/AttrHistoricalMultiple.java
index e406df3..43e9e37 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/AttrHistoricalMultiple.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/AttrHistoricalMultiple.java
@@ -23,7 +23,7 @@
*
*
* Copyright 2006-2010 Sun Microsystems, Inc.
- * Portions Copyright 2011 ForgeRock AS
+ * Portions Copyright 2011-2013 ForgeRock AS
*/
package org.opends.server.replication.plugin;
@@ -105,13 +105,13 @@
* Duplicate an object.
* ChangeNumber are duplicated by references
* @return the duplicated object.
+ *
+ * Method only called in tests
*/
AttrHistoricalMultiple duplicate()
{
- AttrHistoricalMultiple dup =
- new AttrHistoricalMultiple(this.deleteTime, this.lastUpdateTime,
- this.valuesHist);
- return dup;
+ return new AttrHistoricalMultiple(this.deleteTime, this.lastUpdateTime,
+ this.valuesHist);
}
/**
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/AttrHistoricalSingle.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/AttrHistoricalSingle.java
index b775d66..8cc59ea 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/AttrHistoricalSingle.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/AttrHistoricalSingle.java
@@ -158,9 +158,7 @@
case DELETE:
if (changeNumber.newer(addTime))
{
- if ((newValue == null) ||
- ((newValue != null) && (newValue.equals(value))) ||
- (value == null))
+ if (newValue == null || newValue.equals(value) || value == null)
{
if (changeNumber.newer(deleteTime))
{
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/AttrValueHistorical.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/AttrValueHistorical.java
index bf3d14d..ca2bf84 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/AttrValueHistorical.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/AttrValueHistorical.java
@@ -23,6 +23,7 @@
*
*
* Copyright 2006-2010 Sun Microsystems, Inc.
+ * Portions Copyright 2013 ForgeRock AS.
*/
package org.opends.server.replication.plugin;
@@ -118,9 +119,6 @@
*/
public boolean isUpdate()
{
- if (valueUpdateTime != null)
- return true;
- else
- return false;
+ return valueUpdateTime != null;
}
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/HistoricalAttributeValue.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/HistoricalAttributeValue.java
index 8948cf4..0b27028 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/HistoricalAttributeValue.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/HistoricalAttributeValue.java
@@ -23,6 +23,7 @@
*
*
* Copyright 2006-2010 Sun Microsystems, Inc.
+ * Portions Copyright 2013 ForgeRock AS.
*/
package org.opends.server.replication.plugin;
@@ -196,15 +197,6 @@
}
/**
- * Get the String form of the attribute.
- * @return The String form of the attribute.
- */
- public String getStringValue()
- {
- return stringValue;
- }
-
- /**
* Get the Attribute Value.
* @return The Attribute Value.
*/
@@ -261,7 +253,7 @@
*/
public boolean isADDOperation()
{
- return ((attrType == null) && (ismodDN == false));
+ return attrType == null && !ismodDN;
}
/**
@@ -273,6 +265,6 @@
*/
public boolean isMODDNOperation()
{
- return ((attrType == null) && (ismodDN == true));
+ return attrType == null && ismodDN;
}
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingMatchingRule.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingMatchingRule.java
index eb9a014..675cc1f 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingMatchingRule.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingMatchingRule.java
@@ -23,7 +23,7 @@
*
*
* Copyright 2006-2010 Sun Microsystems, Inc.
- * Portions copyright 2012 ForgeRock AS.
+ * Portions copyright 2012-2013 ForgeRock AS.
*/
package org.opends.server.replication.plugin;
@@ -144,7 +144,7 @@
String csn = value.subSequence(csnIndex, csnIndex + 28).toString();
ByteStringBuilder builder = new ByteStringBuilder(14);
builder.append(hexStringToByteArray(csn.substring(16, 20)));
- builder.append(hexStringToByteArray(csn.substring(00, 16)));
+ builder.append(hexStringToByteArray(csn.substring(0, 16)));
builder.append(hexStringToByteArray(csn.substring(20, 28)));
return builder.toByteString();
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
index 86df5e6..f73fe0d 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -115,7 +115,6 @@
import org.opends.server.types.operation.PreOperationDeleteOperation;
import org.opends.server.types.operation.PreOperationModifyDNOperation;
import org.opends.server.types.operation.PreOperationModifyOperation;
-import org.opends.server.types.operation.PreOperationOperation;
import org.opends.server.util.LDIFReader;
import org.opends.server.util.TimeThread;
import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
@@ -277,7 +276,7 @@
*/
private volatile boolean done = true;
- private ServerStateFlush flushThread;
+ private final ServerStateFlush flushThread;
/**
* The attribute name used to store the generation id in the backend.
@@ -400,7 +399,7 @@
{
done = false;
- while (shutdown == false)
+ while (!shutdown)
{
try
{
@@ -584,21 +583,11 @@
/*
* Create a new Persistent Server State that will be used to store
- * the last ChangeNmber seen from all LDAP servers in the topology.
+ * the last ChangeNumber seen from all LDAP servers in the topology.
*/
state = new PersistentServerState(baseDn, serverId, getServerState());
- /* Check if a ReplicaUpdateVector entry is present
- * if so, and no state is already initialized
- * translate the ruv into a serverState and
- * a generationId
- */
- Long compatGenId = state.checkRUVCompat();
- if (compatGenId != null)
- {
- generationId = compatGenId;
- saveGenerationId(generationId);
- }
+ flushThread = new ServerStateFlush();
/*
* ChangeNumberGenerator is used to create new unique ChangeNumbers
@@ -620,7 +609,7 @@
// listen for changes on the configuration
configuration.addChangeListener(this);
- // register as an AltertGenerator
+ // register as an AlertGenerator
DirectoryServer.registerAlertGenerator(this);
}
@@ -638,8 +627,8 @@
boolean needReconnection = false;
byte newSdLevel = (byte) configuration.getAssuredSdLevel();
- if ((isAssured() && (getAssuredMode() == AssuredMode.SAFE_DATA_MODE)) &&
- (newSdLevel != getAssuredSdLevel()))
+ if (isAssured() && getAssuredMode() == AssuredMode.SAFE_DATA_MODE &&
+ newSdLevel != getAssuredSdLevel())
{
needReconnection = true;
}
@@ -654,15 +643,13 @@
}
break;
case SAFE_DATA:
- if (!isAssured() ||
- (isAssured() && (getAssuredMode() == AssuredMode.SAFE_READ_MODE)))
+ if (!isAssured() || getAssuredMode() == AssuredMode.SAFE_READ_MODE)
{
needReconnection = true;
}
break;
case SAFE_READ:
- if (!isAssured() ||
- (isAssured() && (getAssuredMode() == AssuredMode.SAFE_DATA_MODE)))
+ if (!isAssured() || getAssuredMode() == AssuredMode.SAFE_DATA_MODE)
{
needReconnection = true;
}
@@ -882,17 +869,12 @@
if (!found)
{
- // The backend is probably empty: if there is some fractional
- // configuration in memory, we do not let the domain being connected,
- // otherwise, it's ok
- if (fractionalConfig.isFractional())
- {
- return false;
- }
- else
- {
- return true;
- }
+ /*
+ The backend is probably empty: if there is some fractional
+ configuration in memory, we do not let the domain being connected,
+ otherwise, it's ok
+ */
+ return !fractionalConfig.isFractional();
}
/*
@@ -1275,7 +1257,8 @@
* @param performFiltering Tells if the effective modifications should
* be performed or if the call is just to analyze if there are some
* inconsistency with fractional configuration
- * @return true if the operation is inconsistent with fractional configuration
+ * @return true if the operation is inconsistent with fractional
+ * configuration
*/
public boolean fractionalFilterOperation(
PreOperationModifyDNOperation modifyDNOperation, boolean performFiltering)
@@ -1287,8 +1270,8 @@
{
if (modifyDNOperation.deleteOldRDN())
{
- // The core will remove any occurence of attribute that was part of the
- // old RDN, nothing more to do.
+ // The core will remove any occurrence of attribute that was part
+ // of the old RDN, nothing more to do.
return true; // Will not be used as analyze was not requested
}
}
@@ -1307,9 +1290,9 @@
// No attributes to filter
return false;
- /**
+ /*
* Analyze the old and new rdn to see if they are some attributes to be
- * removed: if the oldnRDN contains some forbidden attributes (for instance
+ * removed: if the oldRDN contains some forbidden attributes (for instance
* it is possible if the entry was created with an add operation and the
* RDN used contains a forbidden attribute: in this case the attribute value
* has been kept to be consistent with the dn of the entry.) that are no
@@ -1342,10 +1325,12 @@
!newRdn.hasAttributeType(attributeType) &&
!modifyDNOperation.deleteOldRDN())
{
- // A forbidden attribute is in the old RDN and no more in the new RDN,
- // and it has not been requested to remove attributes from old RDN:
- // remove ourself the attribute from the entry to stay consistent with
- // fractional configuration
+ /*
+ * A forbidden attribute is in the old RDN and no more in the new RDN,
+ * and it has not been requested to remove attributes from old RDN:
+ * let's remove the attribute from the entry to stay consistent with
+ * fractional configuration
+ */
Modification modification = new Modification(ModificationType.DELETE,
Attributes.empty(attributeType));
modifyDNOperation.addModification(modification);
@@ -1451,11 +1436,13 @@
// entry as it is forbidden
if (entryRdn.hasAttributeType(attributeType))
{
- // We must remove all values of the attributes map for this
- // attribute type but the one that has the value which is in the RDN
- // of the entry. In fact the (underlying )attribute list does not
- // suppot remove so we have to create a new list, keeping only the
- // attribute value which is the same as in the RDN
+ /*
+ We must remove all values of the attributes map for this
+ attribute type but the one that has the value which is in the RDN
+ of the entry. In fact the (underlying )attribute list does not
+ support remove so we have to create a new list, keeping only the
+ attribute value which is the same as in the RDN
+ */
AttributeValue rdnAttributeValue =
entryRdn.getAttributeValue(attributeType);
List<Attribute> attrList = attributesMap.get(attributeType);
@@ -1467,17 +1454,11 @@
Attribute attr = attrIt.next();
if (attr.contains(rdnAttributeValue))
{
- Iterator<AttributeValue> attrValues = attr.iterator();
- while(attrValues.hasNext())
- {
- AttributeValue attrValue = attrValues.next();
- if (rdnAttributeValue.equals(attrValue))
- {
+ for (AttributeValue attrValue : attr) {
+ if (rdnAttributeValue.equals(attrValue)) {
// Keep the value we want
sameAttrValue = attrValue;
- }
- else
- {
+ } else {
hasSomeAttributesToFilter = true;
}
}
@@ -1492,17 +1473,19 @@
// Paranoia check: should never be the case as we should always
// find the attribute/value pair matching the pair in the RDN
{
- // Construct and store new atribute list
+ // Construct and store new attribute list
List<Attribute> newRdnAttrList = new ArrayList<Attribute>();
AttributeBuilder attrBuilder =
new AttributeBuilder(attributeType);
attrBuilder.add(sameAttrValue);
newRdnAttrList.add(attrBuilder.toAttribute());
newRdnAttrLists.add(newRdnAttrList);
- // Store matching attribute type
- // The mapping will be done using object from rdnAttrTypes as key
- // and object from newRdnAttrLists (at same index) as value in
- // the user attribute map to be modified
+ /*
+ Store matching attribute type
+ The mapping will be done using object from rdnAttrTypes as key
+ and object from newRdnAttrLists (at same index) as value in
+ the user attribute map to be modified
+ */
rdnAttrTypes.add(attributeType);
}
}
@@ -1516,7 +1499,7 @@
else
{
// The call was just to check : at least one attribute to filter
- // found, return immediatly the answer;
+ // found, return immediately the answer;
return true;
}
}
@@ -1684,7 +1667,7 @@
continue;
}
// Is the current attribute part of the established list ?
- boolean foundAttribute =
+ boolean foundAttribute = attributeName != null &&
fractionalConcernedAttributes.contains(attributeName.toLowerCase());
if (!foundAttribute)
{
@@ -1713,7 +1696,7 @@
else
{
// The call was just to check : at least one attribute to filter
- // found, return immediatly the answer;
+ // found, return immediately the answer;
return FRACTIONAL_HAS_FRACTIONAL_FILTERED_ATTRIBUTES;
}
}
@@ -1806,7 +1789,7 @@
PreOperationDeleteOperation deleteOperation)
{
if ((!deleteOperation.isSynchronizationOperation())
- && (!brokerIsConnected(deleteOperation)))
+ && (!brokerIsConnected()))
{
Message msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(baseDn.toString());
return new SynchronizationProviderResult.StopProcessing(
@@ -1834,7 +1817,7 @@
* Probably the original entry was renamed and replaced with
* another entry.
* We must not let the change proceed, return a negative
- * result and set the result code to NO_SUCH_OBJET.
+ * result and set the result code to NO_SUCH_OBJECT.
* When the operation will return, the thread that started the
* operation will try to find the correct entry and restart a new
* operation.
@@ -1882,7 +1865,7 @@
PreOperationAddOperation addOperation)
{
if ((!addOperation.isSynchronizationOperation())
- && (!brokerIsConnected(addOperation)))
+ && (!brokerIsConnected()))
{
Message msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(baseDn.toString());
return new SynchronizationProviderResult.StopProcessing(
@@ -1980,16 +1963,14 @@
* Check that the broker associated to this ReplicationDomain has found
* a Replication Server and that this LDAP server is therefore able to
* process operations.
- * If not set the ResultCode and the response message,
+ * If not, set the ResultCode, the response message,
* interrupt the operation, and return false
*
- * @param op The Operation that needs to be checked.
- *
* @return true when it OK to process the Operation, false otherwise.
- * When false is returned the resultCode and the reponse message
+ * When false is returned the resultCode and the response message
* is also set in the Operation.
*/
- private boolean brokerIsConnected(PreOperationOperation op)
+ private boolean brokerIsConnected()
{
if (isolationPolicy.equals(IsolationPolicy.ACCEPT_ALL_UPDATES))
{
@@ -2020,7 +2001,7 @@
PreOperationModifyDNOperation modifyDNOperation)
{
if ((!modifyDNOperation.isSynchronizationOperation())
- && (!brokerIsConnected(modifyDNOperation)))
+ && (!brokerIsConnected()))
{
Message msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(baseDn.toString());
return new SynchronizationProviderResult.StopProcessing(
@@ -2076,7 +2057,7 @@
* Probably the original entry was renamed and replaced with
* another entry.
* We must not let the change proceed, return a negative
- * result and set the result code to NO_SUCH_OBJET.
+ * result and set the result code to NO_SUCH_OBJECT.
* When the operation will return, the thread that started the
* operation will try to find the correct entry and restart a new
* operation.
@@ -2140,7 +2121,7 @@
PreOperationModifyOperation modifyOperation)
{
if ((!modifyOperation.isSynchronizationOperation())
- && (!brokerIsConnected(modifyOperation)))
+ && (!brokerIsConnected()))
{
Message msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(baseDn.toString());
return new SynchronizationProviderResult.StopProcessing(
@@ -2198,8 +2179,8 @@
Entry modifiedEntry = modifyOperation.getModifiedEntry();
if (ctx == null)
{
- // No replication ctxt attached => not a replicated operation
- // - create a ctxt with : changeNumber, entryUUID
+ // No replication ctx attached => not a replicated operation
+ // - create a ctx with : changeNumber, entryUUID
// - attach the context to the op
ChangeNumber changeNumber = generateChangeNumber(modifyOperation);
@@ -2210,7 +2191,7 @@
}
else
{
- // Replication ctxt attached => this is a replicated operation being
+ // Replication ctx attached => this is a replicated operation being
// replayed here, it is necessary to
// - check if the entry has been renamed
// - check for conflicts
@@ -2225,7 +2206,7 @@
* Probably the original entry was renamed and replaced with
* another entry.
* We must not let the modification proceed, return a negative
- * result and set the result code to NO_SUCH_OBJET.
+ * result and set the result code to NO_SUCH_OBJECT.
* When the operation will return, the thread that started the
* operation will try to find the correct entry and restart a new
* operation.
@@ -2347,7 +2328,7 @@
catch (NoSuchElementException e)
{
Message message = ERR_OPERATION_NOT_FOUND_IN_PENDING.get(
- curChangeNumber.toString(), op.toString());
+ op.toString(), curChangeNumber.toString());
logError(message);
return;
}
@@ -2361,7 +2342,7 @@
generationIdSavedStatus = false;
}
- if (generationIdSavedStatus != true)
+ if (!generationIdSavedStatus)
{
this.saveGenerationId(generationId);
}
@@ -2448,27 +2429,27 @@
attrs, null);
LinkedList<SearchResultEntry> entries = searchOp.getSearchEntries();
- Entry entrytoRename = null;
- ChangeNumber entrytoRenameDate = null;
+ Entry entryToRename = null;
+ ChangeNumber entryToRenameCN = null;
for (SearchResultEntry entry : entries)
{
EntryHistorical history = EntryHistorical.newInstanceFromEntry(entry);
- if (entrytoRename == null)
+ if (entryToRename == null)
{
- entrytoRename = entry;
- entrytoRenameDate = history.getDNDate();
+ entryToRename = entry;
+ entryToRenameCN = history.getDNDate();
}
- else if (!history.addedOrRenamedAfter(entrytoRenameDate))
+ else if (!history.addedOrRenamedAfter(entryToRenameCN))
{
// this conflict is older than the previous, keep it.
- entrytoRename = entry;
- entrytoRenameDate = history.getDNDate();
+ entryToRename = entry;
+ entryToRenameCN = history.getDNDate();
}
}
- if (entrytoRename != null)
+ if (entryToRename != null)
{
- DN entryDN = entrytoRename.getDN();
+ DN entryDN = entryToRename.getDN();
ModifyDNOperationBasis newOp = renameEntry(
entryDN, freedDN.getRDN(), freedDN.getParent(), false);
@@ -2601,7 +2582,7 @@
}
} catch (InterruptedException e)
{
- // stop waiting when interrupted.
+ Thread.currentThread().interrupt();
}
}
@@ -2924,12 +2905,12 @@
* search if the entry has been renamed, and return the new dn
* of the entry.
*/
- DN newdn = findEntryDN(entryUUID);
- if (newdn != null)
+ DN newDN = findEntryDN(entryUUID);
+ if (newDN != null)
{
// There is an entry with the same unique id as this modify operation
// replay the modify using the current dn of this entry.
- msg.setDn(newdn.toString());
+ msg.setDn(newDN.toString());
numResolvedNamingConflicts.incrementAndGet();
return false;
}
@@ -2974,15 +2955,7 @@
// current RDN value(s);
mod.setModificationType(ModificationType.REPLACE);
Attribute newAttribute = mod.getAttribute();
- AttributeBuilder attrBuilder;
- if (newAttribute == null)
- {
- attrBuilder = new AttributeBuilder(modAttrType);
- }
- else
- {
- attrBuilder = new AttributeBuilder(newAttribute);
- }
+ AttributeBuilder attrBuilder = new AttributeBuilder(newAttribute);
attrBuilder.add(currentRDN.getAttributeValue(modAttrType));
mod.setAttribute(attrBuilder.toAttribute());
}
@@ -3224,8 +3197,9 @@
/*
* This entry is the base dn of the backend.
* It is quite surprising that the operation result be NO_SUCH_OBJECT.
- * There is nothing more we can do except TODO log a
+ * There is nothing more we can do except log a
* message for the repair tool to look at this problem.
+ * TODO : Log the message
*/
return true;
}
@@ -3265,7 +3239,7 @@
* - two adds are done on different servers but with the
* same target DN.
* - the same ADD is being replayed for the second time on this server.
- * if the nsunique ID already exist, assume this is a replay and
+ * if the entryUUID already exist, assume this is a replay and
* don't do anything
* if the entry unique id do not exist, generate conflict.
*/
@@ -3315,11 +3289,10 @@
attrs.add(ENTRYUUID_ATTRIBUTE_NAME);
attrs.add(EntryHistorical.HISTORICAL_ATTRIBUTE_NAME);
- SearchFilter ALLMATCH;
- ALLMATCH = SearchFilter.createFilterFromString("(objectClass=*)");
InternalSearchOperation op =
conn.processSearch(entryDN, SearchScope.SINGLE_LEVEL,
- DereferencePolicy.NEVER_DEREF_ALIASES, 0, 0, false, ALLMATCH,
+ DereferencePolicy.NEVER_DEREF_ALIASES, 0, 0, false,
+ SearchFilter.createFilterFromString("(objectClass=*)"),
attrs);
if (op.getResultCode() == ResultCode.SUCCESS)
@@ -3501,7 +3474,6 @@
* @param dn The original DN of the entry.
*
* @return The generated RDN for a conflicting entry.
- * @throws DirectoryException
*/
private RDN generateDeleteConflictDn(String entryUUID, DN dn)
{
@@ -3584,19 +3556,10 @@
state.clearInMemory();
state.loadState();
- // Check to see if a Ruv needs to be translated
- Long compatGenId = state.checkRUVCompat();
-
generator.adjust(state.getMaxChangeNumber(serverId));
// Retrieves the generation ID associated with the data imported
- if (compatGenId != null)
- {
- generationId = compatGenId;
- saveGenerationId(generationId);
- }
- else
- generationId = loadGenerationId();
+ generationId = loadGenerationId();
}
/**
@@ -3780,14 +3743,8 @@
}
if (search.getResultCode() != ResultCode.SUCCESS)
{
- if (search.getResultCode() == ResultCode.NO_SUCH_OBJECT)
- {
- // nothing initialized yet
- // don't log an error generationID will be computed.
- }
- else
- {
- //
+ if (search.getResultCode() != ResultCode.NO_SUCH_OBJECT)
+ { // This is an error.
Message message = ERR_SEARCHING_GENERATION_ID.get(
search.getResultCode().getResultCodeName() + " " +
search.getErrorMessage(),
@@ -4042,16 +3999,11 @@
}
catch (DirectoryException de)
{
- if ((ros != null) &&
- (ros.getNumExportedEntries() >= entryCount))
- {
- // This is the normal end when computing the generationId
- // We can interrupt the export only by an IOException
- }
- else
+ if (ros == null ||
+ ros.getNumExportedEntries() < entryCount)
{
Message message =
- ERR_LDIFEXPORT_ERROR_DURING_EXPORT.get(de.getMessageObject());
+ ERR_LDIFEXPORT_ERROR_DURING_EXPORT.get(de.getMessageObject());
logError(message);
throw new DirectoryException(
ResultCode.OTHER, message, null);
@@ -4077,10 +4029,10 @@
}
// Release the shared lock on the backend.
+ String lockFile = LockFileManager.getBackendLockFileName(backend);
+ StringBuilder failureReason = new StringBuilder();
try
{
- String lockFile = LockFileManager.getBackendLockFileName(backend);
- StringBuilder failureReason = new StringBuilder();
if (! LockFileManager.releaseLock(lockFile, failureReason))
{
Message message = WARN_LDIFEXPORT_CANNOT_UNLOCK_BACKEND.get(
@@ -4291,9 +4243,9 @@
}
// From the domainDN retrieves the replication domain
- LDAPReplicationDomain sdomain =
+ LDAPReplicationDomain domain =
MultimasterReplication.findDomain(baseDn, null);
- if (sdomain == null)
+ if (domain == null)
{
break;
}
@@ -4304,7 +4256,7 @@
throw new DirectoryException(ResultCode.OTHER,
message);
}
- replicationDomain = sdomain;
+ replicationDomain = domain;
}
if (replicationDomain == null)
@@ -4430,14 +4382,8 @@
* This has no negative impact because the changes on schema should
* not produce conflicts.
*/
- if (baseDn.compareTo(DirectoryServer.getSchemaDN()) == 0)
- {
- solveConflictFlag = false;
- }
- else
- {
- solveConflictFlag = configuration.isSolveConflicts();
- }
+ solveConflictFlag = baseDn.compareTo(DirectoryServer.getSchemaDN()) != 0 &&
+ configuration.isSolveConflicts();
try
{
@@ -4517,7 +4463,6 @@
public void start()
{
// Create the ServerStateFlush thread
- flushThread = new ServerStateFlush();
flushThread.start();
startListenService();
@@ -4553,7 +4498,7 @@
/**
* Store the provided ECL configuration for the domain.
* @param domCfg The provided configuration.
- * @throws ConfigException When an error occured.
+ * @throws ConfigException When an error occurred.
*/
public void storeECLConfiguration(ReplicationDomainCfg domCfg)
throws ConfigException
@@ -4568,7 +4513,7 @@
{
try
{ eclDomCfg = domCfg.getExternalChangelogDomain();
- } catch(Exception e) {}
+ } catch(Exception e) { /* do nothing */ }
// domain with no config entry only when running unit tests
if (eclDomCfg == null)
{
@@ -4697,7 +4642,7 @@
// normally the RS should have been updated by other RSes except for
// very last changes lost if the local connection was broken
// ... hence the RS we are connected to should not be empty
- // ... or if it is empty, it is due to a volontary reset
+ // ... or if it is empty, it is due to a voluntary reset
// and we don't want to update it with our changes that could be huge.
if ((replServerMaxChangeNumber != null) &&
(replServerMaxChangeNumber.getSeqnum()!=0))
@@ -5041,10 +4986,10 @@
}
/**
- * Called by synchronize post op plugin in order to add the entry historized
+ * Called by synchronize post op plugin in order to add the entry historical
* attributes to the UpdateMsg.
- * @param msg
- * @param op
+ * @param msg an replication update message
+ * @param op the operation in progress
* @throws DirectoryException
*/
private void addEntryAttributesForCL(UpdateMsg msg,
@@ -5103,14 +5048,14 @@
{
// Potential fast-path for delete operations.
LinkedList<Attribute> attributes = new LinkedList<Attribute>();
- for (List<Attribute> alist : entry.getUserAttributes().values())
+ for (List<Attribute> attributeList : entry.getUserAttributes().values())
{
- attributes.addAll(alist);
+ attributes.addAll(attributeList);
}
- Attribute ocattr = entry.getObjectClassAttribute();
- if (ocattr != null)
+ Attribute objectClassAttribute = entry.getObjectClassAttribute();
+ if (objectClassAttribute != null)
{
- attributes.add(ocattr);
+ attributes.add(objectClassAttribute);
}
return attributes;
}
@@ -5422,11 +5367,11 @@
Map<String, List<String>> fractionalSpecificClassesAttributes,
List<String> fractionalAllClassesAttributes) throws ConfigException
{
- int fractional_mode = NOT_FRACTIONAL;
+ int fractionalMode;
// Determine if fractional-exclude or fractional-include property is used
// : only one of them is allowed
- Iterator<String> fracConfIt = null;
+ Iterator<String> iterator;
// Deduce the wished fractional mode
if ((exclIt != null) && exclIt.hasNext())
@@ -5438,16 +5383,16 @@
}
else
{
- fractional_mode = EXCLUSIVE_FRACTIONAL;
- fracConfIt = exclIt;
+ fractionalMode = EXCLUSIVE_FRACTIONAL;
+ iterator = exclIt;
}
}
else
{
if ((inclIt != null) && inclIt.hasNext())
{
- fractional_mode = INCLUSIVE_FRACTIONAL;
- fracConfIt = inclIt;
+ fractionalMode = INCLUSIVE_FRACTIONAL;
+ iterator = inclIt;
}
else
{
@@ -5455,11 +5400,11 @@
}
}
- while (fracConfIt.hasNext())
+ while (iterator.hasNext())
{
// Parse a value with the form class:attr1,attr2...
// or *:attr1,attr2...
- String fractCfgStr = fracConfIt.next();
+ String fractCfgStr = iterator.next();
StringTokenizer st = new StringTokenizer(fractCfgStr, ":");
int nTokens = st.countTokens();
if (nTokens < 2)
@@ -5505,7 +5450,7 @@
}
}
}
- return fractional_mode;
+ return fractionalMode;
}
// Return type of the parseFractionalConfig method
@@ -5546,7 +5491,7 @@
FractionalConfig fractionalConfig1, FractionalConfig fractionalConfig2)
throws ConfigException
{
- // Comapre base DNs just to be consistent
+ // Compare base DNs just to be consistent
if (!fractionalConfig1.getBaseDn().equals(fractionalConfig2.getBaseDn()))
return false;
@@ -5574,9 +5519,9 @@
if (specificClassesAttributes1.size() !=
specificClassesAttributes2.size())
return false;
-
- // Check consistency of specific classes attributes
/*
+ * Check consistency of specific classes attributes
+ *
* For each class in specificClassesAttributes1, check that the attribute
* list is equivalent to specificClassesAttributes2 attribute list
*/
@@ -5702,7 +5647,7 @@
for (SearchResultEntry entry : entries)
{
long maxTimeToRun = endDate - TimeThread.getTime();
- if (maxTimeToRun<0)
+ if (maxTimeToRun < 0)
{
Message errMsg = Message.raw(Category.SYNC, Severity.NOTICE,
" end date reached");
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
index 22772cc..c92c682 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
@@ -23,7 +23,7 @@
*
*
* Copyright 2006-2010 Sun Microsystems, Inc.
- * Portions Copyright 2011-2012 ForgeRock AS
+ * Portions Copyright 2011-2013 ForgeRock AS
*/
package org.opends.server.replication.plugin;
@@ -85,7 +85,7 @@
*
* It also extends the SynchronizationProvider class in order to have some
* replication code running during the operation process
- * as pre-op, conflictRsolution, and post-op.
+ * as pre-op, conflictResolution, and post-op.
*/
public class MultimasterReplication
extends SynchronizationProvider<ReplicationSynchronizationProviderCfg>
@@ -140,7 +140,7 @@
* Don't run the special replication code on Operation that are
* specifically marked as don't synchronize.
*/
- if ((pluginOp != null) && (pluginOp instanceof Operation))
+ if (pluginOp != null && pluginOp instanceof Operation)
{
Operation op = ((Operation) pluginOp);
@@ -154,7 +154,7 @@
* so that the core server let the operation modify the entryuuid
* and ds-sync-hist attributes.
* They are also tagged as dontSynchronize so that the replication
- * code running later do not generate ChnageNumber, solve conflicts
+ * code running later do not generate ChangeNumber, solve conflicts
* and forward the operation to the replication server.
*/
for (Control c : op.getRequestControls())
@@ -163,10 +163,12 @@
{
op.setSynchronizationOperation(true);
op.setDontSynchronize(true);
- // remove this control from the list of controls since
- // it has now been processed and the local backend will
- // fail if it finds a control that it does not know about and
- // that is marked as critical.
+ /*
+ remove this control from the list of controls since
+ it has now been processed and the local backend will
+ fail if it finds a control that it does not know about and
+ that is marked as critical.
+ */
List<Control> controls = op.getRequestControls();
controls.remove(c);
return null;
@@ -175,7 +177,7 @@
}
- LDAPReplicationDomain domain = null;
+ LDAPReplicationDomain domain;
DN temp = dn;
do
{
@@ -333,7 +335,7 @@
}
/**
- * Stope the threads that are waiting for incoming update messages.
+ * Stop the threads that are waiting for incoming update messages.
*/
private synchronized static void stopReplayThreads()
{
@@ -345,7 +347,14 @@
for (ReplayThread replayThread : replayThreads)
{
- replayThread.waitForShutdown();
+ try
+ {
+ replayThread.join();
+ }
+ catch(InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
}
replayThreads.clear();
}
@@ -377,7 +386,7 @@
} catch (ConfigException e)
{
// we should never get to this point because the configEntry has
- // already been validated in configAddisAcceptable
+ // already been validated in isConfigurationAddAcceptable()
return new ConfigChangeResult(ResultCode.CONSTRAINT_VIOLATION, false);
}
}
@@ -544,7 +553,7 @@
DN operationDN = modifyDNOperation.getEntryDN();
LDAPReplicationDomain domain = findDomain(operationDN, modifyDNOperation);
- if ((domain == null) || (!domain.solveConflict()))
+ if (domain == null || !domain.solveConflict())
return new SynchronizationProviderResult.ContinueProcessing();
// The historical object is retrieved from the attachment created
@@ -563,7 +572,7 @@
historicalInformation.setPurgeDelay(domain.getHistoricalPurgeDelay());
- // Add to the operation the historical attribute : "dn:changeNumger:moddn"
+ // Add to the operation the historical attribute : "dn:changeNumber:moddn"
historicalInformation.setHistoricalAttrToOperation(modifyDNOperation);
return new SynchronizationProviderResult.ContinueProcessing();
@@ -586,7 +595,7 @@
if (!addOperation.isSynchronizationOperation())
domain.doPreOperation(addOperation);
- // Add to the operation the historical attribute : "dn:changeNumger:add"
+ // Add to the operation the historical attribute : "dn:changeNumber:add"
EntryHistorical.setHistoricalAttrToOperation(addOperation);
return new SynchronizationProviderResult.ContinueProcessing();
@@ -777,12 +786,9 @@
private void genericPostOperation(PostOperationOperation operation, DN dn)
{
LDAPReplicationDomain domain = findDomain(dn, operation);
- if (domain == null)
- return;
-
- domain.synchronize(operation);
-
- return;
+ if (domain != null) {
+ domain.synchronize(operation);
+ }
}
/**
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/PersistentServerState.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/PersistentServerState.java
index 729b122..1eb9573 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/PersistentServerState.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/PersistentServerState.java
@@ -23,7 +23,7 @@
*
*
* Copyright 2006-2010 Sun Microsystems, Inc.
- * Portions copyright 2012 ForgeRock AS.
+ * Portions copyright 2012-2013 ForgeRock AS.
*/
package org.opends.server.replication.plugin;
import org.opends.messages.Message;
@@ -35,15 +35,12 @@
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
-import java.util.Iterator;
-import org.opends.server.core.DeleteOperationBasis;
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.ModifyOperationBasis;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.protocols.ldap.LDAPAttribute;
-import org.opends.server.protocols.ldap.LDAPFilter;
import org.opends.server.protocols.ldap.LDAPModification;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ServerState;
@@ -55,7 +52,6 @@
import org.opends.server.types.DN;
import org.opends.server.types.DereferencePolicy;
import org.opends.server.types.DirectoryException;
-import org.opends.server.types.LDAPException;
import org.opends.server.types.ModificationType;
import org.opends.server.types.RawModification;
import org.opends.server.types.ResultCode;
@@ -64,7 +60,7 @@
import org.opends.server.types.SearchScope;
/**
- * This class implements a ServerState that is stored on the backends
+ * This class implements a ServerState that is stored in the backend
* used to store the synchronized data and that is therefore persistent
* across server reboot.
*/
@@ -73,7 +69,6 @@
private final DN baseDn;
private final InternalClientConnection conn =
InternalClientConnection.getRootConnection();
- private final ByteString asn1BaseDn;
private final int serverId;
private final ServerState state;
@@ -83,16 +78,6 @@
*/
protected static final String REPLICATION_STATE = "ds-sync-state";
- /**
- * The attribute name used to store the entryUUID.
- */
- private static final String ENTRY_UUID = "entryUUID";
-
- /**
- * The attribute name used to store the RUV elements.
- */
- private static final String REPLICATION_RUV_ELEMENT = "nsds50ruv";
-
/**
* create a new ServerState.
* @param baseDn The baseDN for which the ServerState is created
@@ -103,12 +88,12 @@
this.baseDn = baseDn;
this.serverId = serverId;
this.state = new ServerState();
- this.asn1BaseDn = ByteString.valueOf(baseDn.toString());
loadState();
}
/**
- * Create a new PersistenServerState based on an already existing ServerState.
+ * Create a new PersistentServerState based on an already existing
+ * ServerState.
*
* @param baseDn The baseDN for which the ServerState is created.
* @param serverId The serverId.
@@ -119,7 +104,6 @@
this.baseDn = baseDn;
this.serverId = serverId;
this.state = state;
- this.asn1BaseDn = ByteString.valueOf(baseDn.toString());
loadState();
}
@@ -170,16 +154,14 @@
*/
public void loadState()
{
- SearchResultEntry stateEntry = null;
-
// try to load the state from the base entry.
- stateEntry = searchBaseEntry();
-
+ SearchResultEntry stateEntry = searchBaseEntry();
if (stateEntry == null)
{
- // The base entry does not exist yet
- // in the database or was deleted. Try to read the ServerState
- // from the configuration instead.
+ /*
+ The base entry does not exist yet in the database or was deleted.
+ Try to read the ServerState from the configuration instead.
+ */
stateEntry = searchConfigEntry();
}
@@ -205,50 +187,47 @@
*/
private SearchResultEntry searchBaseEntry()
{
- LDAPFilter filter;
-
try
{
- filter = LDAPFilter.decode("objectclass=*");
- } catch (LDAPException e)
- {
- // can not happen
- return null;
- }
-
- /*
- * Search the database entry that is used to periodically
- * save the ServerState
- */
- LinkedHashSet<String> attributes = new LinkedHashSet<String>(1);
- attributes.add(REPLICATION_STATE);
- InternalSearchOperation search = conn.processSearch(asn1BaseDn,
- SearchScope.BASE_OBJECT,
- DereferencePolicy.DEREF_ALWAYS, 0, 0, false,
- filter,attributes);
- if (((search.getResultCode() != ResultCode.SUCCESS)) &&
- ((search.getResultCode() != ResultCode.NO_SUCH_OBJECT)))
- {
- Message message = ERR_ERROR_SEARCHING_RUV.
- get(search.getResultCode().getResultCodeName(), search.toString(),
- search.getErrorMessage(), baseDn.toString());
- logError(message);
- return null;
- }
-
- SearchResultEntry stateEntry = null;
- if (search.getResultCode() == ResultCode.SUCCESS)
- {
+ SearchFilter filter =
+ SearchFilter.createFilterFromString("objectclass=*");
/*
- * Read the serverState from the REPLICATION_STATE attribute
+ * Search the database entry that is used to periodically
+ * save the ServerState
*/
- LinkedList<SearchResultEntry> result = search.getSearchEntries();
- if (!result.isEmpty())
+ LinkedHashSet<String> attributes = new LinkedHashSet<String>(1);
+ attributes.add(REPLICATION_STATE);
+ InternalSearchOperation search = conn.processSearch(baseDn,
+ SearchScope.BASE_OBJECT,
+ DereferencePolicy.NEVER_DEREF_ALIASES,
+ 0, 0, false, filter, attributes);
+ if (((search.getResultCode() != ResultCode.SUCCESS)) &&
+ ((search.getResultCode() != ResultCode.NO_SUCH_OBJECT)))
{
- stateEntry = result.getFirst();
+ Message message = ERR_ERROR_SEARCHING_RUV.
+ get(search.getResultCode().getResultCodeName(), search.toString(),
+ search.getErrorMessage(), baseDn.toString());
+ logError(message);
+ return null;
}
+
+ SearchResultEntry stateEntry = null;
+ if (search.getResultCode() == ResultCode.SUCCESS)
+ {
+ // Read the serverState from the REPLICATION_STATE attribute
+ LinkedList<SearchResultEntry> result = search.getSearchEntries();
+ if (!result.isEmpty())
+ {
+ stateEntry = result.getFirst();
+ }
+ }
+ return stateEntry;
}
- return stateEntry;
+ catch (DirectoryException e)
+ {
+ // cannot happen
+ return null;
+ }
}
/**
@@ -276,15 +255,12 @@
if (op.getResultCode() == ResultCode.SUCCESS)
{
- /*
- * Read the serverState from the REPLICATION_STATE attribute
- */
+ // Read the serverState from the REPLICATION_STATE attribute
LinkedList<SearchResultEntry> resultEntries =
op.getSearchEntries();
if (!resultEntries.isEmpty())
{
- SearchResultEntry resultEntry = resultEntries.getFirst();
- return resultEntry;
+ return resultEntries.getFirst();
}
}
return null;
@@ -455,31 +431,23 @@
dbMaxCn = serverStateMaxCn;
for (SearchResultEntry resEntry : op.getSearchEntries())
{
- List<Attribute> attrs = resEntry.getAttribute(histType);
- Iterator<AttributeValue> iav = attrs.get(0).iterator();
- try
+ for (AttributeValue attrValue :
+ resEntry.getAttribute(histType).get(0))
{
- while (true)
- {
- AttributeValue attrVal = iav.next();
- HistoricalAttributeValue histVal =
- new HistoricalAttributeValue(attrVal.toString());
- ChangeNumber cn = histVal.getCn();
+ HistoricalAttributeValue histVal =
+ new HistoricalAttributeValue(attrValue.toString());
+ ChangeNumber cn = histVal.getCn();
- if ((cn != null) && (cn.getServerId() == serverId))
+ if ((cn != null) && (cn.getServerId() == serverId))
+ {
+ // compare the csn regarding the maxCn we know and
+ // store the biggest
+ if (ChangeNumber.compare(dbMaxCn, cn) < 0)
{
- // compare the csn regarding the maxCn we know and
- // store the biggest
- if (ChangeNumber.compare(dbMaxCn, cn) < 0)
- {
- dbMaxCn = cn;
- }
+ dbMaxCn = cn;
}
}
}
- catch(Exception e)
- {
- }
}
if (ChangeNumber.compare(dbMaxCn, serverStateMaxCn) > 0)
@@ -495,215 +463,6 @@
}
}
-/**
- * Check if a ReplicaUpdateVector entry is present
- * if so, translate the ruv into a serverState and
- * a generationId.
- * @return the generationId translated from the RUV
- * entry, 0 if no RUV is present
- */
- public Long checkRUVCompat() {
-
- Long genId = null;
- SearchResultEntry ruvEntry = null;
-
- try
- {
-
- // Search the RUV in the DB
- ruvEntry = searchRUVEntry();
-
- if (ruvEntry == null)
- return null;
-
- // Check if the serverState is already initialized
-
- if( !isServerStateInitilized())
- {
- // Translate the ruv to serverState
- // and GenerationId
- genId = initializeStateWithRUVEntry(ruvEntry);
- }
- }
- catch (Exception e)
- {
- Message message = NOTE_ERR_WHILE_TRYING_TO_DECODE_RUV_IN_STATE.get(
- baseDn.toString());
- logError(message);
- }
- // In any case, remove the RUV entry
- // if it exists
- DeleteOperationBasis del = new DeleteOperationBasis(conn,
- InternalClientConnection.nextOperationID(),
- InternalClientConnection.nextMessageID(), null,
- ByteString.valueOf(ruvEntry.getDN().toNormalizedString()));
-
- // Run the internal operation
- del.setInternalOperation(true);
- del.setSynchronizationOperation(true);
- del.setDontSynchronize(true);
- del.run();
-
- return genId;
- }
-
- /**
- * Initialize the serverState and the GenerationId based on a RUV
- * entry.
- * @param ruvEntry the entry to translate into a serverState.
- * @return the generationId translated from the RUV entry.
- */
- private Long initializeStateWithRUVEntry(SearchResultEntry ruvEntry) {
-
- Long genId = null;
- String value = null;
- String csn = null;
-
- AttributeType ruvElementType =
- DirectoryServer.getAttributeType(REPLICATION_RUV_ELEMENT);
-
- if (ruvElementType == null)
- return null;
-
- for (Attribute attr : ruvEntry.getAttribute(ruvElementType))
- {
- Iterator<AttributeValue> it = attr.iterator();
- while (it.hasNext())
- {
- value = it.next().toString();
- // Search for the GenerationId
- if (value.startsWith("{replicageneration} "))
- {
- // Get only the timestamp present in the CSN
- String replicaGen = value.substring(20, 28);
- genId = Long.parseLong(replicaGen,16);
- }
- else
- {
- // Translate the other elements into serverState
- if (value.startsWith("{replica "))
- {
- String[] bits = value.split(" ");
-
- // Need to take into account when a purl is empty
- if (bits.length > 3)
- {
- if (bits[2].contains("ldap"))
- {
- // the ldap url is present so the max csn is the 5th element
- // Example :
- // {replica 5 ldap://host:port} 494b6635000000050000 4aeab8f300
- // 0000050000
- csn = bits[4];
- }
- else
- {
- // no ldap url so the max csn is the 4th element
- // Example :
- // {replica 31842} 4a0d1ff700017c620000 4a926b6500007c620000
- csn = bits[3];
- }
-
- String temp = csn.substring(0, 8);
- Long timeStamp = Long.parseLong(temp, 16);
-
- temp = csn.substring(8, 12);
- Integer seqNum = Integer.parseInt(temp, 16);
-
- temp = csn.substring(12, 16);
- Integer replicaId = Integer.parseInt(temp, 16);
-
- // No need to take into account the subSeqNum
- ChangeNumber cn =
- new ChangeNumber(timeStamp*1000, seqNum, replicaId);
-
- this.update(cn);
- }
- }
- }
- }
- }
-
- return genId;
-}
-
- /**
- * Check if the server State is initialized by searching
- * the attribute type REPLICATION_STATE in the root entry.
- * @return true if the serverState is initialized, false
- * otherwise
- */
- private boolean isServerStateInitilized() {
- SearchResultEntry resultEntry = searchBaseEntry();
-
- AttributeType synchronizationStateType =
- DirectoryServer.getAttributeType(REPLICATION_STATE);
- List<Attribute> attrs =
- resultEntry.getAttribute(synchronizationStateType);
-
- return (attrs != null);
- }
-
-/**
- * Search the database entry that represent a serverState
- * using the RUV format (compatibility mode).
- * @return the corresponding RUV entry, null otherwise
- */
- private SearchResultEntry searchRUVEntry() {
- LDAPFilter filter;
- SearchResultEntry ruvEntry = null;
-
- // Search the RUV entry
- try
- {
- filter = LDAPFilter.decode("objectclass=ldapSubEntry");
- } catch (LDAPException e)
- {
- // can not happen
- return null;
- }
-
- LinkedHashSet<String> attributes = new LinkedHashSet<String>(1);
- attributes.add(ENTRY_UUID);
- attributes.add(REPLICATION_RUV_ELEMENT);
- InternalSearchOperation search = conn.processSearch(asn1BaseDn,
- SearchScope.SUBORDINATE_SUBTREE,
- DereferencePolicy.DEREF_ALWAYS, 0, 0, false,
- filter,attributes);
- if (((search.getResultCode() != ResultCode.SUCCESS)) &&
- ((search.getResultCode() != ResultCode.NO_SUCH_OBJECT)))
- return null;
-
- if (search.getResultCode() == ResultCode.SUCCESS)
- {
- /*
- * Search the ldapSubEntry with the entryUUID equals
- * to "ffffffff-ffff-ffff-ffff-ffffffffffff"
- */
- LinkedList<SearchResultEntry> result = search.getSearchEntries();
- if (!result.isEmpty())
- {
- for (SearchResultEntry ldapSubEntry : result)
- {
- List<Attribute> attrs =
- ldapSubEntry.getAttribute(ENTRY_UUID.toLowerCase());
- if (attrs != null)
- {
- Iterator<AttributeValue> iav = attrs.get(0).iterator();
- AttributeValue attrVal = iav.next();
- if (attrVal.toString().
- equalsIgnoreCase("ffffffff-ffff-ffff-ffff-ffffffffffff"))
- {
- ruvEntry = ldapSubEntry;
- break;
- }
- }
- }
- }
- }
- return ruvEntry;
- }
-
/**
* Get the largest ChangeNumber seen for a given LDAP server ID.
*
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java
index a19cf59..6da2771 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java
@@ -23,6 +23,7 @@
*
*
* Copyright 2007-2009 Sun Microsystems, Inc.
+ * Portions Copyright 2013 ForgeRock AS.
*/
package org.opends.server.replication.plugin;
@@ -58,7 +59,7 @@
* One of this object is instantiated for each ReplicationDomain.
*
*/
-public class RemotePendingChanges
+public final class RemotePendingChanges
{
/**
* A map used to store the pending changes.
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplayThread.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplayThread.java
index cb24c37..1304346 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplayThread.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplayThread.java
@@ -23,7 +23,7 @@
*
*
* Copyright 2006-2008 Sun Microsystems, Inc.
- * Portions Copyright 2011 ForgeRock AS
+ * Portions Copyright 2011-2013 ForgeRock AS
*/
package org.opends.server.replication.plugin;
import org.opends.server.replication.protocol.LDAPUpdateMsg;
@@ -57,7 +57,6 @@
private final BlockingQueue<UpdateToReplay> updateToReplayQueue;
private volatile boolean shutdown = false;
- private volatile boolean done = false;
private static int count = 0;
/**
@@ -91,12 +90,11 @@
TRACER.debugInfo("Replication Replay thread starting.");
}
- UpdateToReplay updateToreplay = null;
-
while (!shutdown)
{
try
{
+ UpdateToReplay updateToreplay;
// Loop getting an updateToReplayQueue from the update message queue and
// replaying matching changes
while ( (!shutdown) &&
@@ -119,27 +117,9 @@
logError(message);
}
}
- done = true;
if (debugEnabled())
{
TRACER.debugInfo("Replication Replay thread stopping.");
}
}
-
- /**
- * Wait for the completion of this thread.
- */
- public void waitForShutdown()
- {
- try
- {
- while ((done == false) && (this.isAlive()))
- {
- Thread.sleep(50);
- }
- } catch (InterruptedException e)
- {
- // exit the loop if this thread is interrupted.
- }
- }
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationRepairRequestControl.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationRepairRequestControl.java
index 4b5e5cf..22772ba 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationRepairRequestControl.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationRepairRequestControl.java
@@ -23,6 +23,7 @@
*
*
* Copyright 2008 Sun Microsystems, Inc.
+ * Portions Copyright 2013 ForgeRock AS.
*/
package org.opends.server.replication.plugin;
@@ -47,7 +48,7 @@
public class ReplicationRepairRequestControl extends Control
{
/**
- * ControlDecoder implentation to decode this control from a ByteString.
+ * ControlDecoder implementation to decode this control from a ByteString.
*/
private final static class Decoder
implements ControlDecoder<ReplicationRepairRequestControl>
@@ -84,7 +85,6 @@
public static final String
OID_REPLICATION_REPAIR_CONTROL = "1.3.6.1.4.1.26027.1.5.2";
-
/**
* Creates a new instance of the replication repair request control with the
* default settings.
@@ -95,8 +95,6 @@
}
-
-
/**
* Creates a new instance of the replication repair control with the
* provided information.
@@ -111,7 +109,7 @@
}
/**
- * Writes this control's value to an ASN.1 writer. The value (if any) must be
+ * Writes this control value to an ASN.1 writer. The value (if any) must be
* written as an ASN1OctetString.
*
* @param writer The ASN.1 writer to use.
@@ -122,8 +120,6 @@
// No value element
}
-
-
/**
* Appends a string representation of this replication repair request control
* to the provided buffer.
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/AckMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/AckMsg.java
index b461272..544e680 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/AckMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/AckMsg.java
@@ -23,6 +23,7 @@
*
*
* Copyright 2006-2009 Sun Microsystems, Inc.
+ * Portions Copyright 2013 ForgeRock AS.
*/
package org.opends.server.replication.protocol;
@@ -176,31 +177,13 @@
pos += length + 1;
/* Read the hasTimeout flag */
- if (in[pos++] == 1)
- {
- hasTimeout = true;
- } else
- {
- hasTimeout = false;
- }
+ hasTimeout = in[pos++] == 1;
/* Read the hasWrongStatus flag */
- if (in[pos++] == 1)
- {
- hasWrongStatus = true;
- } else
- {
- hasWrongStatus = false;
- }
+ hasWrongStatus = in[pos++] == 1;
/* Read the hasReplayError flag */
- if (in[pos++] == 1)
- {
- hasReplayError = true;
- } else
- {
- hasReplayError = false;
- }
+ hasReplayError = in[pos++] == 1;
/* Read the list of failed server ids */
while (pos < in.length)
@@ -318,7 +301,7 @@
*/
public String errorsToString()
{
- String idList = null;
+ String idList;
if (failedServers.size() > 0)
{
idList = "[";
@@ -334,12 +317,10 @@
idList="none";
}
- String ackErrorStr = "hasTimeout: " + (hasTimeout ? "yes" : "no") + ", " +
+ return "hasTimeout: " + (hasTimeout ? "yes" : "no") + ", " +
"hasWrongStatus: " + (hasWrongStatus ? "yes" : "no") + ", " +
"hasReplayError: " + (hasReplayError ? "yes" : "no") + ", " +
"concerned server ids: " + idList;
-
- return ackErrorStr;
}
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/AddMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/AddMsg.java
index a624d6b..fb6bb4c 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/AddMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/AddMsg.java
@@ -23,7 +23,7 @@
*
*
* Copyright 2006-2010 Sun Microsystems, Inc.
- * Portions Copyright 2011-2012 ForgeRock AS
+ * Portions Copyright 2011-2013 ForgeRock AS
*/
package org.opends.server.replication.protocol;
@@ -552,8 +552,7 @@
*/
public List<Attribute> getAttributes() throws LDAPException, ASN1Exception
{
- List<Attribute> attrs = decodeAttributes(encodedAttributes);
- return attrs;
+ return decodeAttributes(encodedAttributes);
}
/**
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/DeleteMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/DeleteMsg.java
index b036620..517130d 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/DeleteMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/DeleteMsg.java
@@ -65,7 +65,7 @@
isSubtreeDelete = true;
}
catch(Exception e)
- {}
+ {/* do nothing */}
}
/**
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/EntryMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/EntryMsg.java
index 475bdd7..344f2db 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/EntryMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/EntryMsg.java
@@ -23,6 +23,7 @@
*
*
* Copyright 2006-2010 Sun Microsystems, Inc.
+ * Portions Copyright 2013 ForgeRock AS.
*/
package org.opends.server.replication.protocol;
@@ -128,10 +129,7 @@
// data
length = in.length - (pos + 1);
this.entryByteArray = new byte[length];
- for (int i=0; i<length; i++)
- {
- entryByteArray[i] = in[pos+i];
- }
+ System.arraycopy(in, pos, entryByteArray, 0, length);
}
catch (UnsupportedEncodingException e)
{
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/LDAPUpdateMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/LDAPUpdateMsg.java
index 0f9b776..bce6fcf 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/LDAPUpdateMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/LDAPUpdateMsg.java
@@ -87,7 +87,7 @@
}
/**
- * Creates a new UpdateMsg with the given informations.
+ * Creates a new UpdateMsg with the given information.
*
* @param ctx The replication Context of the operation for which the
* update message must be created,.
@@ -103,7 +103,7 @@
}
/**
- * Creates a new UpdateMessage with the given informations.
+ * Creates a new UpdateMessage with the given information.
*
* @param cn The ChangeNumber of the operation for which the
* UpdateMessage is created.
@@ -491,9 +491,9 @@
/* Read the changeNumber */
int pos = 2;
int length = getNextLength(encodedMsg, pos);
- String changenumberStr = new String(encodedMsg, pos, length, "UTF-8");
+ String changeNumberStr = new String(encodedMsg, pos, length, "UTF-8");
pos += length + 1;
- changeNumber = new ChangeNumber(changenumberStr);
+ changeNumber = new ChangeNumber(changeNumberStr);
/* Read the dn */
length = getNextLength(encodedMsg, pos);
@@ -506,10 +506,7 @@
pos += length + 1;
/* Read the assured information */
- if (encodedMsg[pos++] == 1)
- assuredFlag = true;
- else
- assuredFlag = false;
+ assuredFlag = encodedMsg[pos++] == 1;
/* Read the assured mode */
assuredMode = AssuredMode.valueOf(encodedMsg[pos++]);
@@ -556,15 +553,12 @@
/* read the changeNumber */
int pos = 1;
int length = getNextLength(encodedMsg, pos);
- String changenumberStr = new String(encodedMsg, pos, length, "UTF-8");
+ String changeNumberStr = new String(encodedMsg, pos, length, "UTF-8");
pos += length + 1;
- changeNumber = new ChangeNumber(changenumberStr);
+ changeNumber = new ChangeNumber(changeNumberStr);
/* read the assured information */
- if (encodedMsg[pos++] == 1)
- assuredFlag = true;
- else
- assuredFlag = false;
+ assuredFlag = encodedMsg[pos++] == 1;
/* read the dn */
length = getNextLength(encodedMsg, pos);
@@ -628,7 +622,7 @@
/**
* Decode a provided byte array as a list of RawAttribute.
* @param in The provided byte array.
- * @return The list of Rawattribute objects.
+ * @return The list of RawAttribute objects.
* @throws LDAPException when it occurs.
* @throws ASN1Exception when it occurs.
*/
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java
index 2b7a27b..0cb208c 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java
@@ -331,14 +331,13 @@
public byte[] getBytes_V45(short reqProtocolVersion)
throws UnsupportedEncodingException
{
- int bodyLength = 0;
byte[] byteNewSuperior = null;
byte[] byteNewSuperiorId = null;
// calculate the length necessary to encode the parameters
byte[] byteNewRdn = newRDN.getBytes("UTF-8");
- bodyLength = byteNewRdn.length + 1 + 1;
+ int bodyLength = byteNewRdn.length + 1 + 1;
if (newSuperior != null)
{
@@ -438,10 +437,7 @@
pos += length + 1;
/* get the deleteoldrdn flag */
- if (in[pos] == 0)
- deleteOldRdn = false;
- else
- deleteOldRdn = true;
+ deleteOldRdn = in[pos] != 0;
pos++;
// For easiness (no additional method), simply compare PDU type to
@@ -501,10 +497,7 @@
pos += length + 1;
/* get the deleteoldrdn flag */
- if (in[pos] == 0)
- deleteOldRdn = false;
- else
- deleteOldRdn = true;
+ deleteOldRdn = in[pos] != 0;
pos++;
// Read mods len
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ModifyMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ModifyMsg.java
index 92be511..65c031d 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ModifyMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ModifyMsg.java
@@ -168,21 +168,6 @@
}
if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V2)
{
- String mods = "";
- try
- {
- ArrayList<RawModification> ldapmods = decodeRawMods(encodedMods);
-
- for (RawModification mod : ldapmods)
- {
- mods += mod.toString();
- }
- } catch (LDAPException e)
- {
- } catch (ASN1Exception e)
- {
- }
-
return "ModifyMsg content: " +
" protocolVersion: " + protocolVersion +
" dn: " + dn +
@@ -191,8 +176,8 @@
" assuredFlag: " + assuredFlag +
" assuredMode: " + assuredMode +
" safeDataLevel: " + safeDataLevel +
- " size: " + encodedMods.length +
- mods;
+ " size: " + encodedMods.length;
+ /* Do not append mods, they can be too long */
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
index 138b2b1..c20d33a 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
@@ -23,6 +23,7 @@
*
*
* Copyright 2009 Sun Microsystems, Inc.
+ * Portions Copyright 2013 ForgeRock AS.
*/
package org.opends.server.replication.protocol;
@@ -45,7 +46,7 @@
/**
* This message is part of the replication protocol.
* RS1 sends a MonitorRequestMessage to RS2 to requests its monitoring
- * informations.
+ * information.
* When RS2 receives a MonitorRequestMessage from RS1, RS2 responds with a
* MonitorMsg.
*/
@@ -64,7 +65,7 @@
/**
* Data structure to manage the state of this replication server
- * and the state informations for the servers connected to it.
+ * and the state information for the servers connected to it.
*
*/
class SubTopoMonitorData
@@ -120,7 +121,7 @@
}
/**
- * Sets the informations of an LDAP server.
+ * Sets the information of an LDAP server.
* @param serverId The serverID.
* @param state The server state.
* @param approxFirstMissingDate The approximation of the date
@@ -313,8 +314,7 @@
}
asn1Reader.readEndSequence();
} catch(Exception e)
- {
-
+ { /* do nothing */
}
}
@@ -374,7 +374,7 @@
}
writer.writeEndSequence();
- // then the LDAP server datas
+ // then the LDAP server data
Set<Integer> servers = data.ldapStates.keySet();
for (Integer sid : servers)
{
@@ -512,11 +512,10 @@
sd.state.toString() + "]" +
" afmd=" + sd.approxFirstMissingDate + "]";
}
- String me = this.getClass().getCanonicalName() +
+ return this.getClass().getCanonicalName() +
"[ sender=" + this.senderID +
" destination=" + this.destination +
" data=[" + stateS + "]" +
"]";
- return me;
}
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java
index 2c95416..97f4b1c 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java
@@ -23,7 +23,7 @@
*
*
* Copyright 2006-2010 Sun Microsystems, Inc.
- * Portions Copyright 2011 ForgeRock AS
+ * Portions Copyright 2011-2013 ForgeRock AS
*/
package org.opends.server.replication.protocol;
@@ -121,8 +121,7 @@
*/
public static short minWithCurrent(short version)
{
- short newVersion = (version < currentVersion ? version : currentVersion);
- return newVersion;
+ return (version < currentVersion ? version : currentVersion);
}
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplSessionSecurity.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplSessionSecurity.java
index 97409cb..3c185a2 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplSessionSecurity.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplSessionSecurity.java
@@ -23,7 +23,7 @@
*
*
* Copyright 2008 Sun Microsystems, Inc.
- * Portions copyright 2011 ForgeRock AS
+ * Portions copyright 2011-2013 ForgeRock AS
*/
package org.opends.server.replication.protocol;
@@ -136,8 +136,8 @@
}
else
{
- this.sslCipherSuites = new String[sslProtocols.size()];
- sslProtocols.toArray(this.sslCipherSuites);
+ this.sslCipherSuites = new String[sslCipherSuites.size()];
+ sslCipherSuites.toArray(this.sslCipherSuites);
}
this.sslEncryption = sslEncryption;
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
index 144a478..56a0215 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
@@ -23,6 +23,7 @@
*
*
* Copyright 2006-2010 Sun Microsystems, Inc.
+ * Portions Copyright 2013 ForgeRock AS.
*/
package org.opends.server.replication.protocol;
@@ -140,7 +141,7 @@
* @param buffer The encode form of the ReplicationMsg.
* @param version The version to use to decode the msg.
*
- * @return The generated SycnhronizationMessage.
+ * @return The generated SynchronizationMessage.
*
* @throws DataFormatException If the encoded form was not a valid msg.
* @throws UnsupportedEncodingException If UTF8 is not supported.
@@ -153,7 +154,7 @@
throws DataFormatException, UnsupportedEncodingException,
NotSupportedOldVersionPDUException
{
- ReplicationMsg msg = null;
+ ReplicationMsg msg;
switch (buffer[0])
{
case MSG_TYPE_SERVER_START_V1:
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ServerStartECLMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ServerStartECLMsg.java
index 3081c12..b1ec977 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ServerStartECLMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ServerStartECLMsg.java
@@ -23,6 +23,7 @@
*
*
* Copyright 2009 Sun Microsystems, Inc.
+ * Portions Copyright 2013 ForgeRock AS.
*/
package org.opends.server.replication.protocol;
@@ -66,7 +67,6 @@
* Server after being connected to a replication server for a given
* replication domain.
*
- * @param baseDn The base DN.
* @param maxReceiveDelay The max receive delay for this server.
* @param maxReceiveQueue The max receive Queue for this server.
* @param maxSendDelay The max Send Delay from this server.
@@ -77,18 +77,18 @@
* @param protocolVersion The replication protocol version of the creator.
* @param generationId The generationId for this server.
* @param sslEncryption Whether to continue using SSL to encrypt messages
- * after the start messages have been exchanged.
+* after the start messages have been exchanged.
* @param groupId The group id of the DS for this DN
*/
- public ServerStartECLMsg(String baseDn, int maxReceiveDelay,
- int maxReceiveQueue, int maxSendDelay,
- int maxSendQueue, int windowSize,
- long heartbeatInterval,
- ServerState serverState,
- short protocolVersion,
- long generationId,
- boolean sslEncryption,
- byte groupId)
+ public ServerStartECLMsg(int maxReceiveDelay,
+ int maxReceiveQueue, int maxSendDelay,
+ int maxSendQueue, int windowSize,
+ long heartbeatInterval,
+ ServerState serverState,
+ short protocolVersion,
+ long generationId,
+ boolean sslEncryption,
+ byte groupId)
{
super(protocolVersion, generationId);
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ServerStartMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ServerStartMsg.java
index d93f93b..abcd8af 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ServerStartMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ServerStartMsg.java
@@ -23,6 +23,7 @@
*
*
* Copyright 2006-2009 Sun Microsystems, Inc.
+ * Portions Copyright 2013 ForgeRock AS.
*/
package org.opends.server.replication.protocol;
@@ -405,7 +406,7 @@
"\nprotocolVersion: " + protocolVersion +
"\ngenerationId: " + generationId +
"\ngroupId: " + groupId +
- "\nbaseDn: " + baseDn.toString() +
+ "\nbaseDn: " + baseDn +
"\nheartbeatInterval: " + heartbeatInterval +
"\nmaxReceiveDelay: " + maxReceiveDelay +
"\nmaxReceiveQueue: " + maxReceiveQueue +
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/StartECLSessionMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/StartECLSessionMsg.java
index 050b6d9..99c16d2 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/StartECLSessionMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/StartECLSessionMsg.java
@@ -23,12 +23,14 @@
*
*
* Copyright 2009-2010 Sun Microsystems, Inc.
+ * Portions Copyright 2013 ForgeRock AS.
*/
package org.opends.server.replication.protocol;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.zip.DataFormatException;
import org.opends.server.replication.common.ChangeNumber;
@@ -63,8 +65,6 @@
*/
public final static short REQUEST_TYPE_EQUALS_REPL_CHANGE_NUMBER = 2;
-
-
/**
* This specifies that the request on the ECL is a PERSISTENT search
* with changesOnly = false.
@@ -82,8 +82,6 @@
*/
public final static short PERSISTENT_CHANGES_ONLY = 2;
-
-
// The type of request as defined by REQUEST_TYPE_...
private short eclRequestType;
@@ -182,10 +180,7 @@
if (excludedDNsString.length()>0)
{
String[] excludedDNsStr = excludedDNsString.split(";");
- for (String excludedDNStr : excludedDNsStr)
- {
- this.excludedServiceIDs.add(excludedDNStr);
- }
+ Collections.addAll(this.excludedServiceIDs, excludedDNsStr);
}
pos += length + 1;
@@ -219,7 +214,7 @@
@Override
public byte[] getBytes()
{
- String excludedSIDsString = new String();
+ String excludedSIDsString = "";
for (String excludedServiceID : excludedServiceIDs)
{
excludedSIDsString = excludedSIDsString.concat(excludedServiceID+";");
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/StartMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/StartMsg.java
index 2ec19b7..b621cae 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/StartMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/StartMsg.java
@@ -23,6 +23,7 @@
*
*
* Copyright 2006-2009 Sun Microsystems, Inc.
+ * Portions Copyright 2013 ForgeRock AS.
*/
package org.opends.server.replication.protocol;
@@ -176,10 +177,8 @@
{
/* first byte is the type */
boolean foundMatchingType = false;
- for (int i = 0; i < types.length; i++)
- {
- if (types[i] == encodedMsg[0])
- {
+ for (byte type : types) {
+ if (type == encodedMsg[0]) {
foundMatchingType = true;
break;
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/StartSessionMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/StartSessionMsg.java
index f456025..21e9d8d 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/StartSessionMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/StartSessionMsg.java
@@ -23,7 +23,7 @@
*
*
* Copyright 2008-2009 Sun Microsystems, Inc.
- * Portions copyright 2011 ForgeRock AS
+ * Portions copyright 2011-2013 ForgeRock AS
*/
package org.opends.server.replication.protocol;
@@ -389,13 +389,7 @@
status = ServerStatus.valueOf(in[1]);
/* Read the assured flag */
- if (in[2] == 1)
- {
- assuredFlag = true;
- } else
- {
- assuredFlag = false;
- }
+ assuredFlag = in[2] == 1;
/* Read the assured mode */
assuredMode = AssuredMode.valueOf(in[3]);
@@ -403,7 +397,7 @@
/* Read the safe data level */
safeDataLevel = in[4];
- /* Read the refferals URLs */
+ /* Read the referrals URLs */
int pos = 5;
referralsURLs = new ArrayList<String>();
while (pos < in.length)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java
index 16be10e..132238e 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java
@@ -23,7 +23,7 @@
*
*
* Copyright 2007-2010 Sun Microsystems, Inc.
- * Portions Copyright 2011 ForgeRock AS
+ * Portions Copyright 2011-2013 ForgeRock AS
*/
package org.opends.server.replication.protocol;
@@ -117,13 +117,7 @@
/* Read DS assured flag */
boolean assuredFlag;
- if (in[pos++] == 1)
- {
- assuredFlag = true;
- } else
- {
- assuredFlag = false;
- }
+ assuredFlag = in[pos++] == 1;
/* Read DS assured mode */
AssuredMode assuredMode = AssuredMode.valueOf(in[pos++]);
@@ -188,7 +182,7 @@
}
/* Read Protocol version */
- protocolVersion = Short.valueOf(in[pos++]);
+ protocolVersion = (short)in[pos++];
}
/* Now create DSInfo and store it in list */
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/UpdateMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/UpdateMsg.java
index 8b7b081..1c30cc8 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/UpdateMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/UpdateMsg.java
@@ -23,6 +23,7 @@
*
*
* Copyright 2006-2009 Sun Microsystems, Inc.
+ * Portions Copyright 2013 ForgeRock AS.
*/
package org.opends.server.replication.protocol;
@@ -77,7 +78,7 @@
{}
/**
- * Creates a new UpdateMsg with the given informations.
+ * Creates a new UpdateMsg with the given information.
*
* @param bytes A Byte Array with the encoded form of the message.
*
@@ -157,16 +158,8 @@
@Override
public boolean equals(Object obj)
{
- if (obj != null)
- {
- if (obj.getClass() != this.getClass())
- return false;
- return changeNumber.equals(((UpdateMsg) obj).changeNumber);
- }
- else
- {
- return false;
- }
+ return obj != null && obj.getClass() == this.getClass() &&
+ changeNumber.equals(((UpdateMsg) obj).changeNumber);
}
/**
@@ -343,10 +336,7 @@
changeNumber = new ChangeNumber(changenumberStr);
/* Read the assured information */
- if (encodedMsg[pos++] == 1)
- assuredFlag = true;
- else
- assuredFlag = false;
+ assuredFlag = encodedMsg[pos++] == 1;
/* Read the assured mode */
assuredMode = AssuredMode.valueOf(encodedMsg[pos++]);
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DbHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DbHandler.java
index 5f478fd..f0646c0 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DbHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -23,7 +23,7 @@
*
*
* Copyright 2006-2010 Sun Microsystems, Inc.
- * Portions copyright 2011-2012 ForgeRock AS
+ * Portions copyright 2011-2013 ForgeRock AS
*/
package org.opends.server.replication.server;
import org.opends.messages.MessageBuilder;
@@ -106,7 +106,7 @@
private DbMonitorProvider dbMonitor = new DbMonitorProvider();
private boolean shutdown = false;
private boolean done = false;
- private DirectoryThread thread = null;
+ private DirectoryThread thread;
private final Object flushLock = new Object();
private ReplicationServer replicationServer;
@@ -141,7 +141,7 @@
this.baseDn = baseDn;
trimAge = replicationServer.getTrimAge();
queueMaxSize = queueSize;
- queueLowmark = queueSize * 1 / 5;
+ queueLowmark = queueSize / 5;
queueHimark = queueSize * 4 / 5;
queueMaxBytes = 200 * queueMaxSize;
queueLowmarkBytes = 200 * queueLowmark;
@@ -281,9 +281,7 @@
{
flush();
}
- ReplicationIterator it =
- new ReplicationIterator(serverId, db, changeNumber, this);
- return it;
+ return new ReplicationIterator(serverId, db, changeNumber, this);
}
/**
@@ -313,7 +311,7 @@
*/
public void shutdown()
{
- if (shutdown == true)
+ if (shutdown)
{
return;
}
@@ -325,14 +323,14 @@
}
synchronized (this)
- {
- while (done == false)
+ { /* Can this be replaced with thread.join() ? */
+ while (!done)
{
try
{
this.wait();
} catch (Exception e)
- {}
+ { /* do nothing */}
}
}
@@ -351,7 +349,7 @@
*/
public void run()
{
- while (shutdown == false)
+ while (!shutdown)
{
try
{
@@ -367,7 +365,9 @@
{
msgQueue.wait(1000);
} catch (InterruptedException e)
- { }
+ {
+ Thread.currentThread().interrupt();
+ }
}
}
} catch (Exception end)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNDB.java
index aaae7b8..26685d8 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNDB.java
@@ -23,7 +23,7 @@
*
*
* Copyright 2009 Sun Microsystems, Inc.
- * Portions Copyright 2011 ForgeRock AS
+ * Portions Copyright 2011-2013 ForgeRock AS
*/
package org.opends.server.replication.server;
import static org.opends.messages.ReplicationMessages.*;
@@ -95,7 +95,7 @@
try
{
DatabaseEntry key = new ReplicationDraftCNKey(draftCN);
- DatabaseEntry data = new DraftCNData(draftCN,
+ DatabaseEntry data = new DraftCNData(
value, domainBaseDN, changeNumber);
// Use a transaction so that we can override durability.
@@ -625,8 +625,7 @@
try
{
String str = decodeUTF8(key.getData());
- int draftCN = new Integer(str);
- return draftCN;
+ return Integer.valueOf(str);
}
catch (Exception e)
{
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNData.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNData.java
index 47ce17e..06d10e4 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNData.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNData.java
@@ -23,7 +23,7 @@
*
*
* Copyright 2009 Sun Microsystems, Inc.
- * Portions Copyright 2010-2011 ForgeRock AS.
+ * Portions Copyright 2010-2013 ForgeRock AS.
*/
package org.opends.server.replication.server;
@@ -51,13 +51,12 @@
/**
* Creates a record to be stored in the DraftCNDB.
- * @param draftCN The DraftCN key.
* @param value The value (cookie).
* @param serviceID The serviceID (domain DN).
* @param changeNumber The replication change number.
*/
- public DraftCNData(int draftCN, String value,
- String serviceID, ChangeNumber changeNumber)
+ public DraftCNData(String value,
+ String serviceID, ChangeNumber changeNumber)
{
String record = value
+ FIELD_SEPARATOR + serviceID
@@ -156,8 +155,8 @@
*/
public void toString(StringBuilder buffer)
{
- buffer.append("DraftCNData : [value=" + value);
- buffer.append("] [serviceID=" + serviceID);
- buffer.append("] [changeNumber=" + changeNumber + "]");
+ buffer.append("DraftCNData : [value=").append(value);
+ buffer.append("] [serviceID=").append(serviceID);
+ buffer.append("] [changeNumber=").append(changeNumber).append("]");
}
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java
index 67b864c..0cbc1ea 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java
@@ -23,7 +23,7 @@
*
*
* Copyright 2009-2010 Sun Microsystems, Inc.
- * Portions Copyright 2011 ForgeRock AS
+ * Portions Copyright 2011-2013 ForgeRock AS
*/
package org.opends.server.replication.server;
import static org.opends.messages.ReplicationMessages.*;
@@ -70,10 +70,6 @@
* The tracer object for the debug logger.
*/
private static final DebugTracer TRACER = getTracer();
- // A dedicated thread loops trim().
- // trim() : deletes from the DB a number of changes that are older than a
- // certain date.
- //
static int NO_KEY = 0;
private DraftCNDB db;
@@ -82,17 +78,21 @@
private DbMonitorProvider dbMonitor = new DbMonitorProvider();
private boolean shutdown = false;
private boolean trimDone = false;
- private DirectoryThread thread = null;
- private ReplicationServer replicationServer;
-
+ /*
+ A dedicated thread loops trim().
+ trim() : deletes from the DB a number of changes that are older than a
+ certain date.
+ */
+ private DirectoryThread thread;
/**
- *
* The trim age in milliseconds. Changes record in the change DB that
* are older than this age are removed.
- *
*/
private long trimAge;
+ private ReplicationServer replicationServer;
+
+
/**
* Creates a new dbHandler associated to a given LDAP server.
*
@@ -113,7 +113,7 @@
firstkey = db.readFirstDraftCN();
lastkey = db.readLastDraftCN();
- // Triming thread
+ // Trimming thread
thread = new DirectoryThread(this, "Replication DraftCN db ");
thread.start();
@@ -204,7 +204,7 @@
cursor.close();
}
catch(Exception e)
- {
+ { /* do nothing */
}
}
@@ -226,9 +226,7 @@
public DraftCNDbIterator generateIterator(int startDraftCN)
throws DatabaseException, Exception
{
- DraftCNDbIterator it =
- new DraftCNDbIterator(db, startDraftCN);
- return it;
+ return new DraftCNDbIterator(db, startDraftCN);
}
/**
@@ -236,7 +234,7 @@
*/
public void shutdown()
{
- if (shutdown == true)
+ if (shutdown)
{
return;
}
@@ -248,14 +246,14 @@
}
synchronized (this)
- {
- while (trimDone == false)
+ { /* Can we just do a thread.join() ? */
+ while (!trimDone)
{
try
{
this.wait();
} catch (Exception e)
- {}
+ { /* do nothing */ }
}
}
@@ -271,7 +269,7 @@
*/
public void run()
{
- while (shutdown == false)
+ while (!shutdown)
{
try {
trim();
@@ -282,7 +280,9 @@
{
this.wait(1000);
} catch (InterruptedException e)
- { }
+ {
+ Thread.currentThread().interrupt();
+ }
}
} catch (Exception end)
{
@@ -393,7 +393,7 @@
continue;
}
- ServerState cnVector = null;
+ ServerState cnVector;
try
{
Map<String,ServerState> cnStartStates =
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java
index 9f87191..5388c55 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java
@@ -23,7 +23,7 @@
*
*
* Copyright 2009 Sun Microsystems, Inc.
- * Portions copyright 2011-2012 ForgeRock AS
+ * Portions copyright 2011-2013 ForgeRock AS
*/
package org.opends.server.replication.server;
import static org.opends.messages.ReplicationMessages.*;
@@ -145,7 +145,7 @@
{
while (true)
{
- // wait to be resumed or shutdowned
+ // wait to be resumed or shutdown
if ((suspended) && (!shutdown))
{
synchronized(this)
@@ -224,12 +224,7 @@
ECLUpdateMsg update = null;
while (true)
{
- if (shutdown)
- {
- return;
- }
-
- if (suspended)
+ if (shutdown || suspended)
{
return;
}
@@ -267,14 +262,13 @@
{
// except if we are in persistent search
Thread.sleep(200);
- continue;
}
}
else
{
// Publish the update to the remote server using a protocol version he
// it supports
- publish(update, protocolVersion);
+ publish(update);
update = null;
}
}
@@ -292,7 +286,7 @@
/**
* Publish a change either on the protocol session or to a persistent search.
*/
- private void publish(ECLUpdateMsg msg, short reqProtocolVersion)
+ private void publish(ECLUpdateMsg msg)
throws IOException
{
if (debugEnabled())
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ExpectedAcksInfo.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ExpectedAcksInfo.java
index 37a9ca0..b483b04 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ExpectedAcksInfo.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ExpectedAcksInfo.java
@@ -23,6 +23,7 @@
*
*
* Copyright 2008-2009 Sun Microsystems, Inc.
+ * Portions Copyright 2013 ForgeRock AS.
*/
package org.opends.server.replication.server;
@@ -47,10 +48,10 @@
public abstract class ExpectedAcksInfo
{
// The server handler of the server that sent the assured update message and
- // to whow we want to return the final ack
+ // to who we want to return the final ack
private ServerHandler requesterServerHandler = null;
- // The requested assured mode of matcching update message
+ // The requested assured mode of matching update message
private AssuredMode assuredMode = null;
/**
@@ -63,8 +64,8 @@
* This is used for concurrent access to this object by either the assured
* timeout task or the code for processing an ack for the matching update
* message. This should be set to true when the treatment of the expected
- * acks is completed or an ack timeout has occured and we are going to remove
- * this object from the map where it is stored.
+ * acks is completed or an ack timeout has occurred and we are going to
+ * remove this object from the map where it is stored.
*/
private boolean completed = false;
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/LightweightServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/LightweightServerHandler.java
index 26bcd5f..5e562f6 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/LightweightServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/LightweightServerHandler.java
@@ -23,7 +23,7 @@
*
*
* Copyright 2008-2010 Sun Microsystems, Inc.
- * Portions copyright 2011 ForgeRock AS
+ * Portions copyright 2011-2013 ForgeRock AS
*/
package org.opends.server.replication.server;
@@ -87,15 +87,16 @@
private AssuredMode assuredMode = AssuredMode.SAFE_DATA_MODE;
// DS safe data level (relevant if assured mode is safe data)
private byte safeDataLevel = (byte) -1;
- // The prococol version
+ // The protocol version
private short protocolVersion = -1;
private Set<String> eclInclude = new HashSet<String>();
private Set<String> eclIncludeForDeletes = new HashSet<String>();
/**
- * Creates a new LighweightServerHandler with the provided serverid, connected
- * to the remote Replication Server represented by replServerHandler.
+ * Creates a new LightweightServerHandler with the provided serverid,
+ * connected to the remote Replication Server represented by
+ * replServerHandler.
*
* @param replServerHandler The server handler of the RS this remote DS is
* connected to
@@ -150,11 +151,9 @@
*/
public DSInfo toDSInfo()
{
- DSInfo dsInfo = new DSInfo(serverId, replicationServerId, generationId,
+ return new DSInfo(serverId, replicationServerId, generationId,
status, assuredFlag, assuredMode, safeDataLevel, groupId, refUrls,
eclInclude, eclIncludeForDeletes, protocolVersion);
-
- return dsInfo;
}
/**
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java
index 7a924c6..036cc3c 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -23,7 +23,7 @@
*
*
* Copyright 2009-2010 Sun Microsystems, Inc.
- * Portions copyright 2011-2012 ForgeRock AS
+ * Portions copyright 2011-2013 ForgeRock AS
*/
package org.opends.server.replication.server;
@@ -180,7 +180,7 @@
while ((msgQueue.count() > maxQueueSize) ||
(msgQueue.bytesCount() > maxQueueBytesSize))
{
- setFollowing(false);
+ following = false;
msgQueue.removeFirst();
}
}
@@ -272,9 +272,9 @@
protected UpdateMsg getNextMessage(boolean synchronous)
{
UpdateMsg msg;
- while (activeConsumer == true)
+ while (activeConsumer)
{
- if (following == false)
+ if (!following)
{
/* this server is late with regard to some other masters
* in the topology or just joined the topology.
@@ -376,7 +376,7 @@
if ((msgQueue.count() < maxQueueSize) &&
(msgQueue.bytesCount() < maxQueueBytesSize))
{
- setFollowing(true);
+ following = true;
}
}
} else
@@ -392,7 +392,7 @@
if (msgQueue.contains(msg))
{
/* we finally catch up with the regular queue */
- setFollowing(true);
+ following = true;
lateQueue.clear();
UpdateMsg msg1;
do
@@ -417,11 +417,11 @@
}
synchronized (msgQueue)
{
- if (following == true)
+ if (following)
{
try
{
- while (msgQueue.isEmpty() && (following == true))
+ while (msgQueue.isEmpty() && following)
{
if (!synchronous)
return null;
@@ -465,7 +465,7 @@
ChangeNumber result = null;
synchronized (msgQueue)
{
- if (isFollowing())
+ if (following)
{
if (msgQueue.isEmpty())
{
@@ -479,13 +479,14 @@
{
if (lateQueue.isEmpty())
{
- // isFollowing is false AND lateQueue is empty
- // We may be at the very moment when the writer has emptyed the
- // lateQueue when it sent the last update. The writer will fill again
- // the lateQueue when it will send the next update but we are not yet
- // there. So let's take the last change not sent directly from
- // the db.
-
+ /*
+ following is false AND lateQueue is empty
+ We may be at the very moment when the writer has emptied the
+ lateQueue when it sent the last update. The writer will fill again
+ the lateQueue when it will send the next update but we are not yet
+ there. So let's take the last change not sent directly from
+ the db.
+ */
ReplicationIteratorComparator comparator =
new ReplicationIteratorComparator();
SortedSet<ReplicationIterator> iteratorSortedSet =
@@ -500,9 +501,11 @@
// get an iterator in this server db from that last change
ReplicationIterator iterator =
replicationServerDomain.getChangelogIterator(serverId, lastCsn);
- // if that iterator has changes, then it is a candidate
- // it is added in the sorted list at a position given by its
- // current change (see ReplicationIteratorComparator).
+ /*
+ if that iterator has changes, then it is a candidate
+ it is added in the sorted list at a position given by its
+ current change (see ReplicationIteratorComparator).
+ */
if (iterator != null)
{
if (iterator.getChange() != null)
@@ -558,7 +561,7 @@
* When the server is up to date or close to be up to date,
* the number of updates to be sent is the size of the receive queue.
*/
- if (isFollowing())
+ if (following)
return msgQueue.count();
else
{
@@ -622,16 +625,6 @@
}
/**
- * Check if the LDAP server can follow the speed of the other servers.
- * @return true when the server has all the not yet sent changes
- * in its queue.
- */
- public boolean isFollowing()
- {
- return following;
- }
-
- /**
* Set that the consumer is now becoming inactive and thus getNextMessage
* should not return any UpdateMsg any more.
* @param active the provided state of the consumer.
@@ -641,14 +634,6 @@
this.activeConsumer = active;
}
- /**
- * Set the following flag of this server.
- * @param following the value that should be set.
- */
- private void setFollowing(boolean following)
- {
- this.following = following;
- }
/**
* Set the initial value of the serverState for this handler.
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/MonitorData.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/MonitorData.java
index d894aec..adb7e8b 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/MonitorData.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/MonitorData.java
@@ -23,7 +23,7 @@
*
*
* Copyright 2009-2010 Sun Microsystems, Inc.
- * Portions Copyright 2012 ForgeRock AS
+ * Portions Copyright 2012-2013 ForgeRock AS
*/
package org.opends.server.replication.server;
@@ -84,11 +84,6 @@
private ConcurrentHashMap<Integer, Long> missingChanges =
new ConcurrentHashMap<Integer, Long>();
- // For each RS server, an approximation of the date of the first missing
- // change
- private ConcurrentHashMap<Integer, Long> fmRSDate =
- new ConcurrentHashMap<Integer, Long>();
-
private ConcurrentHashMap<Integer, Long> missingChangesRS =
new ConcurrentHashMap<Integer, Long>();
@@ -162,47 +157,38 @@
// Regarding each other LSj
// Sum the difference : max(LSj) - state(LSi)
- Iterator<Integer> lsiStateItr = this.LDAPStates.keySet().iterator();
- while (lsiStateItr.hasNext())
- {
- Integer lsiSid = lsiStateItr.next();
+ for (Integer lsiSid : this.LDAPStates.keySet()) {
ServerState lsiState = this.LDAPStates.get(lsiSid);
- Long lsiMissingChanges = (long)0;
- if (lsiState != null)
- {
- Iterator<Integer> lsjMaxItr = this.maxCNs.keySet().iterator();
- while (lsjMaxItr.hasNext())
- {
- Integer lsjSid = lsjMaxItr.next();
+ Long lsiMissingChanges = (long) 0;
+ if (lsiState != null) {
+ for (Integer lsjSid : this.maxCNs.keySet()) {
ChangeNumber lsjMaxCN = this.maxCNs.get(lsjSid);
ChangeNumber lsiLastCN = lsiState.getMaxChangeNumber(lsjSid);
int missingChangesLsiLsj =
- ChangeNumber.diffSeqNum(lsjMaxCN, lsiLastCN);
+ ChangeNumber.diffSeqNum(lsjMaxCN, lsiLastCN);
- if (debugEnabled())
- {
+ if (debugEnabled()) {
mds +=
- "+ diff("+lsjMaxCN+"-"
- +lsiLastCN+")="+missingChangesLsiLsj;
+ "+ diff(" + lsjMaxCN + "-"
+ + lsiLastCN + ")=" + missingChangesLsiLsj;
}
-
- // Regarding a DS that is generating changes. If it is a local DS1,
- // we get its server state, store it, then retrieve server states of
- // remote DSs. When a remote server state is coming, it may contain
- // a change number for DS1 which is newer than the one we locally
- // stored in the server state of DS1. To prevent seeing DS1 has
- // missing changes whereas it is wrong, we replace the value with 0
- // if it is a low value. We cannot overwrite big values as they may be
- // useful for a local server retrieving changes it generated earlier,
- // when it is recovering from an old snapshot and the local RS is
- // sending him the changes it is missing.
+ /*
+ Regarding a DS that is generating changes. If it is a local DS1,
+ we get its server state, store it, then retrieve server states of
+ remote DSs. When a remote server state is coming, it may contain
+ a change number for DS1 which is newer than the one we locally
+ stored in the server state of DS1. To prevent seeing DS1 has
+ missing changes whereas it is wrong, we replace the value with 0
+ if it is a low value. We cannot overwrite big values as they may be
+ useful for a local server retrieving changes it generated earlier,
+ when it is recovering from an old snapshot and the local RS is
+ sending him the changes it is missing.
+ */
if (lsjSid.equals(lsiSid)) {
- if (missingChangesLsiLsj <= 50)
- {
+ if (missingChangesLsiLsj <= 50) {
missingChangesLsiLsj = 0;
- if (debugEnabled())
- {
+ if (debugEnabled()) {
mds += " (diff replaced by 0 as for server id " + lsiSid + ")";
}
}
@@ -211,11 +197,10 @@
lsiMissingChanges += missingChangesLsiLsj;
}
}
- if (debugEnabled())
- {
+ if (debugEnabled()) {
mds += "=" + lsiMissingChanges;
}
- this.missingChanges.put(lsiSid,lsiMissingChanges);
+ this.missingChanges.put(lsiSid, lsiMissingChanges);
}
// Computes the missing changes counters for RS :
@@ -227,21 +212,17 @@
Long lsiMissingChanges = (long)0;
if (lsiState != null)
{
- Iterator<Integer> lsjMaxItr = this.maxCNs.keySet().iterator();
- while (lsjMaxItr.hasNext())
- {
- int lsjSid = lsjMaxItr.next();
+ for (Integer lsjSid : this.maxCNs.keySet()) {
ChangeNumber lsjMaxCN = this.maxCNs.get(lsjSid);
ChangeNumber lsiLastCN = lsiState.getMaxChangeNumber(lsjSid);
int missingChangesLsiLsj =
- ChangeNumber.diffSeqNum(lsjMaxCN, lsiLastCN);
+ ChangeNumber.diffSeqNum(lsjMaxCN, lsiLastCN);
- if (debugEnabled())
- {
+ if (debugEnabled()) {
mds +=
- "+ diff("+lsjMaxCN+"-"
- +lsiLastCN+")="+missingChangesLsiLsj;
+ "+ diff(" + lsjMaxCN + "-"
+ + lsiLastCN + ")=" + missingChangesLsiLsj;
}
lsiMissingChanges += missingChangesLsiLsj;
}
@@ -269,42 +250,25 @@
{
String mds = "Monitor data=\n";
- // RS data
- Iterator<Integer> rsite = fmRSDate.keySet().iterator();
- while (rsite.hasNext())
- {
- Integer sid = rsite.next();
- mds += "\nfmRSDate(" + sid + ")=\t "+ "afmd=" + fmRSDate.get(sid);
- }
-
// maxCNs
- Iterator<Integer> itc = maxCNs.keySet().iterator();
- while (itc.hasNext())
- {
- Integer sid = itc.next();
+ for (Integer sid : maxCNs.keySet()) {
ChangeNumber cn = maxCNs.get(sid);
mds += "\nmaxCNs(" + sid + ")= " + cn.toStringUI();
}
// LDAP data
- Iterator<Integer> lsite = LDAPStates.keySet().iterator();
- while (lsite.hasNext())
- {
- Integer sid = lsite.next();
+ for (Integer sid : LDAPStates.keySet()) {
ServerState ss = LDAPStates.get(sid);
mds += "\nLSData(" + sid + ")=\t" + "state=[" + ss.toString()
- + "] afmd=" + this.getApproxFirstMissingDate(sid);
+ + "] afmd=" + this.getApproxFirstMissingDate(sid);
mds += " missingDelay=" + this.getApproxDelay(sid);
- mds +=" missingCount=" + missingChanges.get(sid);
+ mds += " missingCount=" + missingChanges.get(sid);
}
// RS data
- rsite = RSStates.keySet().iterator();
- while (rsite.hasNext())
- {
- Integer sid = rsite.next();
+ for (Integer sid : RSStates.keySet()) {
ServerState ss = RSStates.get(sid);
mds += "\nRSData(" + sid + ")=\t" + "state=[" + ss.toString()
+ "] missingCount=" + missingChangesRS.get(sid);
@@ -321,10 +285,7 @@
*/
public void setMaxCNs(ServerState state)
{
- Iterator<Integer> it = state.iterator();
- while (it.hasNext())
- {
- int sid = it.next();
+ for (Integer sid : state) {
ChangeNumber newCN = state.getMaxChangeNumber(sid);
setMaxCN(sid, newCN);
}
@@ -352,17 +313,6 @@
}
/**
- * Get the highest know change number of the LDAP server with the provided
- * serverId.
- * @param serverId The server ID.
- * @return The highest change number.
- */
- public ChangeNumber getMaxCN(int serverId)
- {
- return maxCNs.get(serverId);
- }
-
- /**
* Get the state of the LDAP server with the provided serverId.
* @param serverId The server ID.
* @return The server state.
@@ -454,9 +404,7 @@
*/
public long getRSApproxFirstMissingDate(int serverId)
{
- Long res;
- if ((res = fmRSDate.get(serverId)) != null)
- return res;
+ // For now, we do store RS first missing change date
return 0;
}
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/MonitoringPublisher.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/MonitoringPublisher.java
index 1d4c14f..0f10959 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/MonitoringPublisher.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/MonitoringPublisher.java
@@ -23,7 +23,7 @@
*
*
* Copyright 2009-2010 Sun Microsystems, Inc.
- * Portions Copyright 2011-2012 ForgeRock AS
+ * Portions Copyright 2011-2013 ForgeRock AS
*/
package org.opends.server.replication.server;
@@ -184,7 +184,7 @@
{
int FACTOR = 40; // Wait for 2 seconds before interrupting the thread
int n = 0;
- while ((done == false) && (this.isAlive()))
+ while ((!done) && (this.isAlive()))
{
Thread.sleep(50);
n++;
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/NotAssuredUpdateMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/NotAssuredUpdateMsg.java
index 68f8a28..ae96d4e 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/NotAssuredUpdateMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/NotAssuredUpdateMsg.java
@@ -23,6 +23,7 @@
*
*
* Copyright 2008-2009 Sun Microsystems, Inc.
+ * Portions Copyright 2013 ForgeRock AS.
*/
package org.opends.server.replication.server;
@@ -93,7 +94,7 @@
System.arraycopy(origBytes, 0, bytes, 0, origBytes.length);
int maxLen = bytes.length;
- int pos = -1;
+ int pos;
int nZeroFound = 0; // Number of 0 value found
boolean found = false;
@@ -143,7 +144,6 @@
System.arraycopy(origBytes, 0, bytes, 0, origBytes.length);
maxLen = bytes.length;
- pos = -1;
nZeroFound = 0; // Number of 0 value found
found = false;
@@ -183,13 +183,6 @@
} else
{
- if (!(realUpdateMsg instanceof UpdateMsg))
- {
- // Should never happen
- throw new UnsupportedEncodingException(
- "Unknown underlying real message type.");
- }
-
/**
* Prepare VLATEST serialized form of the message:
* Get the encoding form of the real message then overwrite the assured
@@ -204,7 +197,7 @@
System.arraycopy(origBytes, 0, bytes, 0, origBytes.length);
int maxLen = bytes.length;
- int pos = -1;
+ int pos;
int nZeroFound = 0; // Number of 0 value found
boolean found = false;
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationBackend.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationBackend.java
index aa2fa28..b7c58a7 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationBackend.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationBackend.java
@@ -151,11 +151,6 @@
private ReplicationServer server;
/**
- * The configuration of this backend.
- */
- private BackendCfg cfg;
-
- /**
* The number of milliseconds between job progress reports.
*/
private long progressInterval = 10000;
@@ -214,7 +209,7 @@
if (config != null)
{
Validator.ensureTrue(config instanceof BackendCfg);
- cfg = (BackendCfg)config;
+ BackendCfg cfg = (BackendCfg) config;
DN[] newBaseDNs = new DN[cfg.getBaseDN().size()];
cfg.getBaseDN().toArray(newBaseDNs);
setBaseDNs(newBaseDNs);
@@ -655,7 +650,7 @@
attrs);
ldifWriter.writeChangeRecord(changeRecord);
}
- catch (Exception e) {}
+ catch (Exception e) { /* do nothing */ }
for (ReplicationServerDomain exportContainer : exportContainers)
{
@@ -718,7 +713,7 @@
for (int serverId : rsd.getServers())
{
if (exportConfig != null && exportConfig.isCancelled())
- {
+ { // Abort if cancelled
break;
}
@@ -764,7 +759,7 @@
while (ri.getChange() != null)
{
if (exportConfig != null && exportConfig.isCancelled())
- {
+ { // abort if cancelled
break;
}
if (searchOperation != null)
@@ -988,8 +983,7 @@
LDIFWriter ldifWriter2 = writer.getLDIFWriter();
ldifWriter2.writeChangeRecord(changeRecord);
LDIFReader reader = writer.getLDIFReader();
- Entry modDNEntry = reader.readEntry();
- entry = modDNEntry;
+ entry = reader.readEntry();
}
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
index fe5b442..2536033 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
@@ -23,9 +23,11 @@
*
*
* Copyright 2006-2010 Sun Microsystems, Inc.
- * Portions Copyright 2011-2012 ForgeRock AS
+ * Portions Copyright 2011-2013 ForgeRock AS
*/
package org.opends.server.replication.server;
+
+import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import static org.opends.server.loggers.ErrorLogger.logError;
@@ -75,7 +77,7 @@
//
// A counter record has to follow the order of the db, so it needs to have
// a changenumber key that follow the order.
- // A counter record must have its own chagenumber key since the Db does not
+ // A counter record must have its own changenumber key since the Db does not
// support duplicate key (it is a compatibility breaker character of the DB).
//
// We define 2 conditions to store a counter record :
@@ -813,9 +815,10 @@
return null;
}
+ ChangeNumber cn = null;
try
{
- ChangeNumber cn = new ChangeNumber(
+ cn = new ChangeNumber(
decodeUTF8(key.getData()));
if (ReplicationDB.isaCounter(cn))
{
@@ -833,9 +836,14 @@
* happen if the database is corrupted. There is not much more that we
* can do at this point except trying to continue with the next
* record. In such case, it is therefore possible that we miss some
- * changes. TODO. log an error message. TODO : REPAIR : Such problem
- * should be handled by the repair functionality.
+ * changes.
+ * TODO : This should be handled by the repair functionality.
*/
+ Message message = ERR_REPLICATIONDB_CANNOT_PROCESS_CHANGE_RECORD
+ .get(replicationServer.getServerId(),
+ (cn == null ? "" : cn.toString()),
+ e.getMessage());
+ logError(message);
}
}
return currentChange;
@@ -1085,7 +1093,6 @@
/**
* Encode the provided counter value in a database entry.
- * @param entry The provided entry.
* @return The database entry with the counter value encoded inside.
*/
static private DatabaseEntry encodeCounterValue(int value)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
index ecd0b73..f14be05 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
@@ -23,7 +23,7 @@
*
*
* Copyright 2006-2009 Sun Microsystems, Inc.
- * Portions Copyright 2011-2012 ForgeRock AS
+ * Portions Copyright 2011-2013 ForgeRock AS
*/
package org.opends.server.replication.server;
import org.opends.messages.*;
@@ -168,9 +168,6 @@
if (str[0].equals(GENERATION_ID_TAG))
{
long generationId;
-
- String baseDn;
-
try
{
// <generationId>
@@ -186,7 +183,7 @@
+ "<" + str[1] + ">"));
}
- baseDn = str[2];
+ String baseDn = str[2];
if (debugEnabled())
TRACER.debugInfo(
@@ -213,7 +210,7 @@
status = cursor.getFirst(key, data, LockMode.DEFAULT);
while (status == OperationStatus.SUCCESS)
{
- String stringData = null;
+ String stringData;
try
{
stringData = new String(data.getData(), "UTF-8");
@@ -234,7 +231,7 @@
String[] str = stringData.split(FIELD_SEPARATOR, 2);
if (!str[0].equals(GENERATION_ID_TAG))
{
- int serverId = -1;
+ int serverId;
try
{
// <serverId>
@@ -561,14 +558,14 @@
txn.abort();
}
catch(Exception e)
- {}
+ { /* do nothing */ }
}
}
/**
* Get or create a db to manage integer change number associated
* to multidomain server state.
- * TODO:ECL how to manage compatibilty of this db with new domains
+ * TODO:ECL how to manage compatibility of this db with new domains
* added or removed ?
* @return the retrieved or created db.
* @throws DatabaseException when a problem occurs.
@@ -583,8 +580,7 @@
DatabaseConfig dbConfig = new DatabaseConfig();
dbConfig.setAllowCreate(true);
dbConfig.setTransactional(true);
- Database db = dbEnvironment.openDatabase(null, stringId, dbConfig);
- return db;
+ return dbEnvironment.openDatabase(null, stringId, dbConfig);
}
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationIterator.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationIterator.java
index e659974..29c086b 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationIterator.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationIterator.java
@@ -23,7 +23,7 @@
*
*
* Copyright 2006-2009 Sun Microsystems, Inc.
- * Portions Copyright 2011 ForgeRock AS
+ * Portions Copyright 2011-2013 ForgeRock AS
*/
package org.opends.server.replication.server;
@@ -106,7 +106,7 @@
*/
public boolean next()
{
- boolean hasNext = false;
+ boolean hasNext;
currentChange = cursor.next(); // can return null
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index 614efdd..0e75c5e 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -23,7 +23,7 @@
*
*
* Copyright 2006-2010 Sun Microsystems, Inc.
- * Portions Copyright 2011-2012 ForgeRock AS
+ * Portions Copyright 2011-2013 ForgeRock AS
*/
package org.opends.server.replication.server;
@@ -75,7 +75,7 @@
/**
* ReplicationServer Listener. This singleton is the main object of the
- * replication server It waits for the incoming connections and create listener
+ * replication server. It waits for the incoming connections and create listener
* and publisher objects for connection with LDAP servers and with replication
* servers It is responsible for creating the replication server
* replicationServerDomain and managing it
@@ -251,7 +251,7 @@
{
backendConfigEntryDN = DN.decode(
"ds-cfg-backend-id=" + backendId + ",cn=Backends,cn=config");
- } catch (Exception e) {}
+ } catch (Exception e) { /* do nothing */ }
// Creates the backend associated to this ReplicationServer
// if it does not exist.
@@ -293,7 +293,7 @@
listenSocket.getLocalPort());
logError(listenMsg);
- while ((shutdown == false) && (stopListen == false))
+ while (!shutdown && !stopListen)
{
// Wait on the replicationServer port.
// Read incoming messages and create LDAP or ReplicationServer listener
@@ -365,7 +365,7 @@
{
TRACER.debugCaught(DebugLogLevel.ERROR, e);
}
- if (shutdown == false) {
+ if (!shutdown) {
Message message =
ERR_EXCEPTION_LISTENING.get(e.getLocalizedMessage());
logError(message);
@@ -1561,9 +1561,7 @@
public ExternalChangeLogSession createECLSession(StartECLSessionMsg msg)
throws DirectoryException
{
- ExternalChangeLogSessionImpl session =
- new ExternalChangeLogSessionImpl(this, msg);
- return session;
+ return new ExternalChangeLogSessionImpl(this, msg);
}
/**
@@ -1623,16 +1621,9 @@
{
InetAddress localAddr = InetAddress.getLocalHost();
- if (localPorts.contains(port)
+ return localPorts.contains(port)
&& (InetAddress.getByName(hostname).isLoopbackAddress() ||
- InetAddress.getByName(hostname).equals(localAddr)))
- {
- return true;
- }
- else
- {
- return false;
- }
+ InetAddress.getByName(hostname).equals(localAddr));
} catch (UnknownHostException e)
{
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 3505af8..88c9487 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -677,20 +677,14 @@
sourceHandler.sendAck(ack);
} else
{
- if (safeDataLevel != (byte) 0)
- {
- /**
- * level > 1 : We need further acks
- * The message will be posted in assured mode to eligible
- * servers. The embedded safe data level is not changed, and his
- * value will be used by a remote RS to determine if he must send
- * an ack (level > 1) or not (level = 1)
- */
- interestedInAcks = true;
- } else
- {
- // Should never happen
- }
+ /**
+ * level > 1 : We need further acks
+ * The message will be posted in assured mode to eligible
+ * servers. The embedded safe data level is not changed, and his
+ * value will be used by a remote RS to determine if he must send
+ * an ack (level > 1) or not (level = 1)
+ */
+ interestedInAcks = true;
}
} else
{ // A RS sent us the safe data message, for sure no further ack to wait
@@ -1752,7 +1746,7 @@
monitorData.getApproxFirstMissingDate(replicaId), true);
}
- // Add the informations about the Replication Servers
+ // Add the information about the Replication Servers
// currently in the topology.
it = monitorData.rsIterator();
while (it.hasNext())
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
index c62e7a6..77853a8 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
@@ -70,8 +70,9 @@
* @return Whether the remote server requires encryption or not.
* @throws DirectoryException When a problem occurs.
*/
- public boolean processStartFromRemote(ReplServerStartMsg inReplServerStartMsg)
- throws DirectoryException
+ private boolean processStartFromRemote(
+ ReplServerStartMsg inReplServerStartMsg)
+ throws DirectoryException
{
try
{
@@ -358,7 +359,7 @@
{
/*
Only protocol version above V1 has a phase 2 handshake
- NOW PROCEDE WITH SECOND PHASE OF HANDSHAKE:
+ NOW PROCEED WITH SECOND PHASE OF HANDSHAKE:
TopologyMsg then TopologyMsg (with a RS)
wait and process Topo from remote RS
*/
@@ -524,21 +525,15 @@
// Remote RS sent his topo msg
TopologyMsg inTopoMsg = (TopologyMsg) msg;
- // Store remore RS weight if it has one
+ /* Store remote RS weight if it has one.
+ * For protocol version < 4, use default value of 1 for weight
+ */
if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
{
// List should only contain RS info for sender
RSInfo rsInfo = inTopoMsg.getRsList().get(0);
weight = rsInfo.getWeight();
}
- else
- {
- /*
- Remote RS uses protocol version prior to 4 : use default value for
- weight: 1
- */
- }
-
/*
if the remote RS and the local RS have the same genID
then it's ok and nothing else to do
@@ -569,11 +564,10 @@
private void checkGenerationId()
{
if (localGenerationId > 0)
- {
- // if the local RS is initialized
+ { // the local RS is initialized
if (generationId > 0)
- {
- // if the remote RS is initialized
+ { // the remote RS is initialized.
+ // If not, there's nothing to do anyway.
if (generationId != localGenerationId)
{
// if the 2 RS have different generationID
@@ -621,13 +615,6 @@
}
}
}
- else
- {
- /*
- The remote RS has no genId. We don't change anything for the
- current RS.
- */
- }
}
else
{
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/SafeDataExpectedAcksInfo.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/SafeDataExpectedAcksInfo.java
index 5f55904..305196b 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/SafeDataExpectedAcksInfo.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/SafeDataExpectedAcksInfo.java
@@ -23,6 +23,7 @@
*
*
* Copyright 2008-2009 Sun Microsystems, Inc.
+ * Portions Copyright 2013 ForgeRock AS.
*/
package org.opends.server.replication.server;
@@ -106,14 +107,10 @@
return false;
} else
{
-
// Mark this ack received for the server
expectedServersAckStatus.put(ackingServerId, true);
numReceivedAcks++;
- if (numReceivedAcks == safeDataLevel)
- return true;
- else
- return false;
+ return numReceivedAcks == safeDataLevel;
}
}
@@ -128,7 +125,7 @@
{
// Fill collected errors info
ack.setHasTimeout(true);
- // Tell wich servers did not send an ack in time
+ // Tell which servers did not send an ack in time
List<Integer> failedServers = new ArrayList<Integer>();
Set<Integer> serverIds = expectedServersAckStatus.keySet();
serversInTimeout = new ArrayList<Integer>(); // Use next loop to fill it
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index 20d7bd3..af1f1ba 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -23,7 +23,7 @@
*
*
* Copyright 2006-2010 Sun Microsystems, Inc.
- * Portions copyright 2011-2012 ForgeRock AS
+ * Portions copyright 2011-2013 ForgeRock AS
*/
package org.opends.server.replication.server;
@@ -170,11 +170,12 @@
/**
* The associated ServerWriter that sends messages to the remote server.
*/
- protected ServerReader reader;
+ protected ServerWriter writer = null;
+
/**
* The associated ServerReader that receives messages from the remote server.
*/
- protected ServerWriter writer = null;
+ protected ServerReader reader;
// window
private int rcvWindow;
@@ -366,7 +367,7 @@
session.setSoTimeout(0);
}
catch(Exception e)
- {
+ { /* do nothing */
}
// sendWindow MUST be created before starting the writer
@@ -1148,10 +1149,9 @@
*/
public RSInfo toRSInfo()
{
- RSInfo rsInfo = new RSInfo(serverId, serverURL, generationId, groupId,
- weight);
- return rsInfo;
+ return new RSInfo(serverId, serverURL, generationId, groupId,
+ weight);
}
/**
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ListenerThread.java b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ListenerThread.java
index 38f8465..5d94ac5 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ListenerThread.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ListenerThread.java
@@ -23,7 +23,7 @@
*
*
* Copyright 2006-2008 Sun Microsystems, Inc.
- * Portions Copyright 2011 ForgeRock AS
+ * Portions Copyright 2011-2013 ForgeRock AS
*/
package org.opends.server.replication.service;
import org.opends.messages.Message;
@@ -96,7 +96,7 @@
// queue
while ((!shutdown) && ((updateMsg = repDomain.receive()) != null))
{
- if (repDomain.processUpdate(updateMsg) == true)
+ if (repDomain.processUpdate(updateMsg))
{
repDomain.processUpdateDoneSynchronous(updateMsg);
}
@@ -138,7 +138,7 @@
{
int FACTOR = 40; // Wait for 2 seconds before interrupting the thread
int n = 0;
- while ((done == false) && (this.isAlive()))
+ while (!done && this.isAlive())
{
Thread.sleep(50);
n++;
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplInputStream.java b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplInputStream.java
index 37b0882..c1a5452 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplInputStream.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplInputStream.java
@@ -23,6 +23,7 @@
*
*
* Copyright 2006-2008 Sun Microsystems, Inc.
+ * Portions Copyright 2013 ForgeRock AS.
*/
package org.opends.server.replication.service;
@@ -124,10 +125,7 @@
copiedLength = len;
}
- for (int i =0; i<copiedLength; i++)
- {
- b[off+i] = bytes[index+i];
- }
+ System.arraycopy(bytes, index, b, off, copiedLength);
index += copiedLength;
if (index == bytes.length)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index 346e137..5dea96a 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -1234,7 +1234,7 @@
}
else
{
- serverStartMsg = new ServerStartECLMsg(baseDn, 0, 0, 0, 0,
+ serverStartMsg = new ServerStartECLMsg(0, 0, 0, 0,
maxRcvWindow, heartbeatInterval, state,
ProtocolVersion.getCurrentVersion(),
this.getGenerationID(), isSslEncryption, groupId);
@@ -2605,13 +2605,14 @@
{
synchronized (monitorResponse)
{
- if (monitorResponse.get() == false)
+ if (!monitorResponse.get())
{
monitorResponse.wait(10000);
}
}
} catch (InterruptedException e)
{
+ Thread.currentThread().interrupt();
}
return replicaStates;
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
index f9ab3eb..8ceab31 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -23,7 +23,7 @@
*
*
* Copyright 2008-2010 Sun Microsystems, Inc.
- * Portions Copyright 2011-2012 ForgeRock AS
+ * Portions Copyright 2011-2013 ForgeRock AS
*/
package org.opends.server.replication.service;
@@ -39,6 +39,7 @@
import java.io.OutputStream;
import java.net.SocketTimeoutException;
import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
@@ -165,8 +166,8 @@
* to be able to correlate all the coming back acks to the original
* operation.
*/
- private final SortedMap<ChangeNumber, UpdateMsg> waitingAckMsgs =
- new TreeMap<ChangeNumber, UpdateMsg>();
+ private final Map<ChangeNumber, UpdateMsg> waitingAckMsgs =
+ new ConcurrentHashMap<ChangeNumber, UpdateMsg>();
/**
@@ -243,7 +244,7 @@
// that have not been successfully acknowledged (either because of timeout,
// wrong status or error at replay) for a particular server (DS or RS). String
// format: <server id>:<number of failed updates>
- private Map<Integer, Integer> assuredSrServerNotAcknowledgedUpdates =
+ private final Map<Integer, Integer> assuredSrServerNotAcknowledgedUpdates =
new HashMap<Integer,Integer>();
// Number of updates received in Assured Mode, Safe Read request
private AtomicInteger assuredSrReceivedUpdates = new AtomicInteger(0);
@@ -264,7 +265,7 @@
// Multiple values allowed: number of updates sent in Assured Mode, Safe Data,
// that have not been successfully acknowledged because of timeout for a
// particular RS. String format: <server id>:<number of failed updates>
- private Map<Integer, Integer> assuredSdServerTimeoutUpdates =
+ private final Map<Integer, Integer> assuredSdServerTimeoutUpdates =
new HashMap<Integer,Integer>();
/**
@@ -278,10 +279,12 @@
/* Status related monitoring fields */
- // Indicates the date when the status changed. This may be used to indicate
- // the date the session with the current replication server started (when
- // status is NORMAL for instance). All the above assured monitoring fields
- // are also reset each time the status is changed
+ /**
+ * Indicates the date when the status changed. This may be used to indicate
+ * the date the session with the current replication server started (when
+ * status is NORMAL for instance). All the above assured monitoring fields
+ * are also reset each time the status is changed
+ */
private Date lastStatusChangeDate = new Date();
/**
@@ -583,7 +586,7 @@
}
/**
- * Returns informations about the DS server related to the provided serverId.
+ * Returns information about the DS server related to the provided serverId.
* based on the TopologyMsg we received when the remote replica connected or
* disconnected. Return null when no server with the provided serverId is
* connected.
@@ -696,8 +699,7 @@
*/
public void setURLs(Set<String> referralsUrl)
{
- for (String url : referralsUrl)
- this.refUrls.add(url);
+ this.refUrls.addAll(referralsUrl);
}
/**
@@ -793,11 +795,12 @@
// Another server is exporting its entries to us
InitializeTargetMsg initTargetMsg = (InitializeTargetMsg) msg;
- // This must be done while we are still holding the
- // broker lock because we are now going to receive a
- // bunch of entries from the remote server and we
- // want the import thread to catch them and
- // not the ListenerThread.
+ /*
+ This must be done while we are still holding the broker lock
+ because we are now going to receive a bunch of entries from the
+ remote server and we want the import thread to catch them and
+ not the ListenerThread.
+ */
initialize(initTargetMsg, initTargetMsg.getSenderID());
}
else if (msg instanceof ErrorMsg)
@@ -805,15 +808,16 @@
ErrorMsg errorMsg = (ErrorMsg)msg;
if (ieContext != null)
{
- // This is an error termination for the 2 following cases :
- // - either during an export
- // - or before an import really started
- // For example, when we publish a request and the
- // replicationServer did not find the import source.
- //
- // A remote error during the import will be received in the
- // receiveEntryBytes() method.
- //
+ /*
+ This is an error termination for the 2 following cases :
+ - either during an export
+ - or before an import really started
+ For example, when we publish a request and the
+ replicationServer did not find the import source.
+
+ A remote error during the import will be received in the
+ receiveEntryBytes() method.
+ */
if (debugEnabled())
TRACER.debugInfo(
"[IE] processErrorMsg:" + this.serverID +
@@ -827,9 +831,11 @@
}
else
{
- // Simply log - happen when the ErrorMsg relates to a previous
- // attempt of initialization while we have started a new one
- // on this side.
+ /*
+ Simply log - happen when the ErrorMsg relates to a previous
+ attempt of initialization while we have started a new one
+ on this side.
+ */
logError(ERR_ERROR_MSG_RECEIVED.get(errorMsg.getDetails()));
}
}
@@ -864,15 +870,17 @@
{
// just retry
}
- // Test if we have received and export request message and
- // if that's the case handle it now.
- // This must be done outside of the portion of code protected
- // by the broker lock so that we keep receiveing update
- // when we are doing and export and so that a possible
- // closure of the socket happening when we are publishing the
- // entries to the remote can be handled by the other
- // replay thread when they call this method and therefore the
- // broker.receive() method.
+ /*
+ Test if we have received and export request message and
+ if that's the case handle it now.
+ This must be done outside of the portion of code protected
+ by the broker lock so that we keep receiving update
+ when we are doing and export and so that a possible
+ closure of the socket happening when we are publishing the
+ entries to the remote can be handled by the other
+ replay thread when they call this method and therefore the
+ broker.receive() method.
+ */
if (initReqMsg != null)
{
// Do this work in a thread to allow replay thread continue working
@@ -898,8 +906,8 @@
* particular server in the list. This increments the counter of error for the
* passed server, or creates an initial value of 1 error for it if the server
* is not yet present in the map.
- * @param errorList
- * @param sid
+ * @param errorsByServer map of number of errors per serverID
+ * @param sid the ID of the server which produced an error
*/
private void updateAssuredErrorsByServer(Map<Integer,Integer> errorsByServer,
Integer sid)
@@ -916,7 +924,7 @@
{
// Server already present in list, just increment number of
// errors for the server
- int val = serverErrCount.intValue();
+ int val = serverErrCount;
val++;
errorsByServer.put(sid, val);
}
@@ -935,10 +943,7 @@
// Remove the message for pending ack list (this may already make the thread
// that is waiting for the ack be aware of its reception)
- synchronized (waitingAckMsgs)
- {
- update = waitingAckMsgs.remove(changeNumber);
- }
+ update = waitingAckMsgs.remove(changeNumber);
// Signal waiting thread ack has been received
if (update != null)
@@ -957,8 +962,10 @@
if ( hasTimeout || hasReplayErrors || hasWrongStatus)
{
- // Some problems detected: message not correclty reached every requested
- // servers. Log problem
+ /*
+ Some problems detected: message did not correctly reach every
+ requested servers. Log problem
+ */
Message errorMsg = NOTE_DS_RECEIVED_ACK_ERROR.get(
serviceID, Integer.toString(serverID),
update.toString(), ack.errorsToString());
@@ -1021,27 +1028,6 @@
}
}
- /**
- * Retrieves a replication domain based on the baseDn.
- *
- * @param serviceID The identifier of the domain to retrieve.
- *
- * @return The domain retrieved.
- *
- * @throws DirectoryException When an error occurred or no domain
- * match the provided baseDn.
- */
- static ReplicationDomain retrievesReplicationDomain(String serviceID)
- throws DirectoryException
- {
- ReplicationDomain replicationDomain = domains.get(serviceID);
- if (replicationDomain == null)
- {
- throw new DirectoryException(ResultCode.OTHER,
- ERR_NO_MATCHING_DOMAIN.get(serviceID));
- }
- return replicationDomain;
- }
/*
* After this point the code is related to the Total Update.
@@ -1051,7 +1037,7 @@
* This thread is launched when we want to export data to another server.
*
* When a task is created locally (so this local server is the initiator)
- * of the export (Exemple: dsreplication initialize-all),
+ * of the export (Example: dsreplication initialize-all),
* this thread is NOT used but the task thread is running the export instead).
*/
private class ExportThread extends DirectoryThread
@@ -1095,9 +1081,11 @@
initWindow);
} catch (DirectoryException de)
{
- // An error message has been sent to the peer
- // This server is not the initiator of the export so there is
- // nothing more to do locally.
+ /*
+ An error message has been sent to the peer
+ This server is not the initiator of the export so there is
+ nothing more to do locally.
+ */
}
if (debugEnabled())
@@ -1211,29 +1199,6 @@
/**
* Update the counters of the task for each entry processed during
* an import or export.
- * @throws DirectoryException if an error occurred.
- */
- public void updateCounters()
- throws DirectoryException
- {
- entryLeftCount--;
-
- if (initializeTask != null)
- {
- if (initializeTask instanceof InitializeTask)
- {
- ((InitializeTask)initializeTask).setLeft(entryLeftCount);
- }
- else if (initializeTask instanceof InitializeTargetTask)
- {
- ((InitializeTargetTask)initializeTask).setLeft(entryLeftCount);
- }
- }
- }
-
- /**
- * Update the counters of the task for each entry processed during
- * an import or export.
*
* @param entriesDone The number of entries that were processed
* since the last time this method was called.
@@ -1379,7 +1344,7 @@
* on this server, and the {@code importBackend(InputStream)}
* will be called on the remote server.
* <p>
- * The InputStream and OutpuStream given as a parameter to those
+ * The InputStream and OutputStream given as a parameter to those
* methods will be connected through the replication protocol.
*
* @param target The server-id of the server that should be initialized.
@@ -1394,10 +1359,7 @@
public void initializeRemote(int target, Task initTask)
throws DirectoryException
{
-
initializeRemote(target, this.serverID, initTask, this.initWindow);
-
-
}
/**
@@ -1426,10 +1388,12 @@
// Acquire and initialize the export context
acquireIEContext(false);
- // We manage the list of servers to initialize in order :
- // - to test at the end that all expected servers have reconnected
- // after their import and with the right genId
- // - to update the task with the server(s) where this test failed
+ /*
+ We manage the list of servers to initialize in order :
+ - to test at the end that all expected servers have reconnected
+ after their import and with the right genId
+ - to update the task with the server(s) where this test failed
+ */
if (serverToInitialize == RoutableMsg.ALL_SERVERS)
{
@@ -1526,14 +1490,15 @@
{
try
{
- // Handling the errors during export
+ /*
+ Handling the errors during export
- // Note: we could have lost the connection and another thread
- // the listener one) has already managed to reconnect.
- // So we MUST rely on the test broker.isConnected()
- // ONLY to do 'wait to be reconnected by another thread'
- // (if not yet reconnected already).
-
+ Note: we could have lost the connection and another thread
+ the listener one) has already managed to reconnect.
+ So we MUST rely on the test broker.isConnected()
+ ONLY to do 'wait to be reconnected by another thread'
+ (if not yet reconnected already).
+ */
if (!broker.isConnected())
{
// We are still disconnected, so we wait for the listener thread
@@ -1550,14 +1515,16 @@
if ((initTask != null) && broker.isConnected() &&
(serverToInitialize != RoutableMsg.ALL_SERVERS))
{
- // NewAttempt case : In the case where
- // - it's not an InitializeAll
- // - AND the previous export attempt failed
- // - AND we are (now) connected
- // - and we own the task and this task is not an InitializeAll
- // Let's :
- // - sleep to let time to the other peer to reconnect if needed
- // - and launch another attempt
+ /*
+ NewAttempt case : In the case where
+ - it's not an InitializeAll
+ - AND the previous export attempt failed
+ - AND we are (now) connected
+ - and we own the task and this task is not an InitializeAll
+ Let's :
+ - sleep to let time to the other peer to reconnect if needed
+ - and launch another attempt
+ */
try { Thread.sleep(1000); } catch(Exception e){}
logError(NOTE_RESENDING_INIT_TARGET.get(
exportRootException.getLocalizedMessage()));
@@ -1632,8 +1599,7 @@
int waitResultAttempt = 0;
Set<Integer> replicasWeAreWaitingFor = new HashSet<Integer>(0);
- for (Integer sid : ieContext.startList)
- replicasWeAreWaitingFor.add(sid);
+ replicasWeAreWaitingFor.addAll(ieContext.startList);
if (debugEnabled())
TRACER.debugInfo(
@@ -1657,8 +1623,11 @@
{
// this one is still not doing the Full Update ... retry later
done = false;
- try
- { Thread.sleep(100); } catch (InterruptedException e) {}
+ try { Thread.sleep(100);
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
waitResultAttempt++;
break;
}
@@ -1673,8 +1642,7 @@
while ((!done) && (waitResultAttempt<1200) // 2mn
&& (!broker.shuttingDown()));
- ieContext.failureList.addAll(
- Arrays.asList(replicasWeAreWaitingFor.toArray(new Integer[0])));
+ ieContext.failureList.addAll(replicasWeAreWaitingFor);
if (debugEnabled())
TRACER.debugInfo(
@@ -1697,9 +1665,11 @@
TRACER.debugInfo(
"[IE] wait for end replicasWeAreWaitingFor=" + replicasWeAreWaitingFor);
- // In case some new servers appear during the init, we want them to be
- // considered in the processing of sorting the successfully initialized
- // and the others
+ /*
+ In case some new servers appear during the init, we want them to be
+ considered in the processing of sorting the successfully initialized
+ and the others
+ */
for (DSInfo dsi : getReplicasList())
replicasWeAreWaitingFor.add(dsi.getDsId());
@@ -1709,22 +1679,25 @@
done = true;
short reconnectMaxDelayInSec = 10;
short reconnectWait = 0;
- Integer[] servers = replicasWeAreWaitingFor.toArray(new Integer[0]);
- for (int serverId : servers)
+ for (int serverId : replicasWeAreWaitingFor)
{
if (ieContext.failureList.contains(serverId))
{
- // this server has already been in error during initialization
- // dont't wait for it
+ /*
+ this server has already been in error during initialization
+ don't wait for it
+ */
continue;
}
DSInfo dsInfo = isRemoteDSConnected(serverId);
if (dsInfo == null)
{
- // this server is disconnected
- // may be for a long time if it crashed or had been stopped
- // may be just the time to reconnect after import : should be short
+ /*
+ this server is disconnected
+ may be for a long time if it crashed or had been stopped
+ may be just the time to reconnect after import : should be short
+ */
if (++reconnectWait<reconnectMaxDelayInSec)
{
// let's still wait to give a chance to this server to reconnect
@@ -1764,8 +1737,7 @@
}
while ((!done) && (!broker.shuttingDown())); // infinite wait
- ieContext.failureList.addAll(
- Arrays.asList(replicasWeAreWaitingFor.toArray(new Integer[0])));
+ ieContext.failureList.addAll(replicasWeAreWaitingFor);
if (debugEnabled())
TRACER.debugInfo(
@@ -1839,8 +1811,10 @@
}
else
{
- // When we are the exporter in the case of initializeAll
- // exporting must not be stopped on the first error.
+ /*
+ When we are the exporter in the case of initializeAll
+ exporting must not be stopped on the first error.
+ */
}
}
}
@@ -1889,7 +1863,7 @@
}
}
- // Check good sequentiality of msg received
+ // Check good ordering of msg received
if (msg instanceof EntryMsg)
{
EntryMsg entryMsg = (EntryMsg)msg;
@@ -1899,7 +1873,7 @@
if (ieContext.exporterProtocolVersion >=
ProtocolVersion.REPLICATION_PROTOCOL_V4)
{
- // check the msgCnt of the msg received to check sequenciality
+ // check the msgCnt of the msg received to check ordering
if (++ieContext.msgCnt != entryMsg.getMsgId())
{
if (ieContext.getException() == null)
@@ -1928,16 +1902,20 @@
}
else if (msg instanceof DoneMsg)
{
- // This is the normal termination of the import
- // No error is stored and the import is ended
- // by returning null
+ /*
+ This is the normal termination of the import
+ No error is stored and the import is ended
+ by returning null
+ */
return null;
}
else if (msg instanceof ErrorMsg)
{
- // This is an error termination during the import
- // The error is stored and the import is ended
- // by returning null
+ /*
+ This is an error termination during the import
+ The error is stored and the import is ended
+ by returning null
+ */
if (ieContext.getException() == null)
{
ErrorMsg errMsg = (ErrorMsg)msg;
@@ -1971,7 +1949,6 @@
}
catch(Exception e)
{
- // TODO: i18n
if (ieContext.getException() == null)
ieContext.setException(new DirectoryException(ResultCode.OTHER,
ERR_INIT_IMPORT_FAILURE.get(e.getLocalizedMessage())));
@@ -1984,7 +1961,7 @@
* This is based on the hypothesis that the entries are separated
* by a "\n\n" String.
*
- * @param entryBytes
+ * @param entryBytes the set of bytes containing one or more entries.
* @return The number of entries in the provided byte[].
*/
private int countEntryLimits(byte[] entryBytes)
@@ -1997,7 +1974,7 @@
* This is based on the hypothesis that the entries are separated
* by a "\n\n" String.
*
- * @param entryBytes
+ * @param entryBytes the set of bytes containing one or more entries.
* @return The number of entries in the provided byte[].
*/
private int countEntryLimits(byte[] entryBytes, int pos, int length)
@@ -2029,7 +2006,8 @@
throws IOException
{
if (debugEnabled())
- TRACER.debugInfo("[IE] Entering exportLDIFEntry entry=" + lDIFEntry);
+ TRACER.debugInfo("[IE] Entering exportLDIFEntry entry=" +
+ Arrays.toString(lDIFEntry));
// build the message
EntryMsg entryMessage = new EntryMsg(
@@ -2039,9 +2017,11 @@
// Waiting the slowest loop
while (!broker.shuttingDown())
{
- // If an error was raised - like receiving an ErrorMsg from a remote
- // server that have been stored by the listener thread in the ieContext,
- // we just abandon the export by throwing an exception.
+ /*
+ If an error was raised - like receiving an ErrorMsg from a remote
+ server that have been stored by the listener thread in the ieContext,
+ we just abandon the export by throwing an exception.
+ */
if (ieContext.getException() != null)
throw(new IOException(ieContext.getException().getMessage()));
@@ -2094,7 +2074,8 @@
} // Waiting the slowest loop
if (debugEnabled())
- TRACER.debugInfo("[IE] Entering exportLDIFEntry pub entry=" + lDIFEntry);
+ TRACER.debugInfo("[IE] Entering exportLDIFEntry pub entry="
+ + Arrays.toString(lDIFEntry));
// publish the message
boolean sent = broker.publish(entryMessage, false);
@@ -2212,18 +2193,22 @@
errMsg = ERR_INITIALIZATION_FAILED_NOCONN.get(getServiceID());
}
- // We must not test here whether the remote source is connected to
- // the topology by testing if it stands in the replicas list since.
- // In the case of a re-attempt of initialization, the listener thread is
- // running this method directly coming from initailize() method and did
- // not processed any topology message in between the failure and the
- // new attempt.
+ /*
+ We must not test here whether the remote source is connected to
+ the topology by testing if it stands in the replicas list since.
+ In the case of a re-attempt of initialization, the listener thread is
+ running this method directly coming from initialize() method and did
+ not processed any topology message in between the failure and the
+ new attempt.
+ */
try
{
- // We must immediatly acquire a context to store the task inside
- // The context will be used when we (the listener thread) will receive
- // the InitializeTargetMsg, process the import, and at the end
- // update the task.
+ /*
+ We must immediately acquire a context to store the task inside
+ The context will be used when we (the listener thread) will receive
+ the InitializeTargetMsg, process the import, and at the end
+ update the task.
+ */
acquireIEContext(true); //test and set if no import already in progress
ieContext.initializeTask = initTask;
@@ -2234,11 +2219,13 @@
// Publish Init request msg
broker.publish(ieContext.initReqMsgSent);
- // The normal success processing is now to receive InitTargetMsg then
- // entries from the remote server.
- // The error cases are :
- // - either local error immediatly caught below
- // - a remote error we will receive as an ErrorMsg
+ /*
+ The normal success processing is now to receive InitTargetMsg then
+ entries from the remote server.
+ The error cases are :
+ - either local error immediately caught below
+ - a remote error we will receive as an ErrorMsg
+ */
}
catch(DirectoryException de)
{
@@ -2272,15 +2259,15 @@
*
* @param initTargetMsgReceived The message received from the remote server.
*
- * @param requestorServerId The serverId of the server that requested the
+ * @param requesterServerId The serverId of the server that requested the
* initialization meaning the server where the
* task has initially been created (this server,
* or the remote server).
*/
void initialize(InitializeTargetMsg initTargetMsgReceived,
- int requestorServerId)
+ int requesterServerId)
{
- InitializeTask initFromtask = null;
+ InitializeTask initFromTask = null;
if (debugEnabled())
TRACER.debugInfo("[IE] Entering initialize - domain=" + this);
@@ -2300,16 +2287,20 @@
// Acquire an import context if no already done (and initialize).
if (initTargetMsgReceived.getInitiatorID() == this.serverID)
{
- // The initTargetMsgReceived received is the answer to a request that
- // we (this server) sent previously. In this case, so the IEContext
- // has been already acquired when the request was published in order
- // to store the task (to be updated with the status at the end).
+ /*
+ The initTargetMsgReceived received is the answer to a request that
+ we (this server) sent previously. In this case, so the IEContext
+ has been already acquired when the request was published in order
+ to store the task (to be updated with the status at the end).
+ */
}
else
{
- // The initTargetMsgReceived is for an import initiated by the remote
- // server.
- // Test and set if no import already in progress
+ /*
+ The initTargetMsgReceived is for an import initiated by the remote
+ server.
+ Test and set if no import already in progress
+ */
acquireIEContext(true);
}
@@ -2319,16 +2310,18 @@
ieContext.initWindow = initTargetMsgReceived.getInitWindow();
// Protocol version is -1 when not known.
ieContext.exporterProtocolVersion = getProtocolVersion(source);
- initFromtask = (InitializeTask)ieContext.initializeTask;
+ initFromTask = (InitializeTask)ieContext.initializeTask;
- // Lauch the import
+ // Launch the import
importBackend(new ReplInputStream(this));
}
catch (DirectoryException e)
{
- // Store the exception raised. It will be considered if no other exception
- // has been previously stored in the context
+ /*
+ Store the exception raised. It will be considered if no other exception
+ has been previously stored in the context
+ */
if (ieContext.getException() == null)
ieContext.setException(e);
}
@@ -2339,30 +2332,37 @@
+ " ends import with exception=" + ieContext.getException()
+ " connected=" + broker.isConnected());
- // It is necessary to restart (reconnect to RS) for different reasons
- // - when everything went well, reconnect in order to exchange
- // new state, new generation ID
- // - when we have connection failure, reconnect to retry a new import
- // right here, right now
- // we never want retryOnFailure if we fails reconnecting in the restart.
+ /*
+ It is necessary to restart (reconnect to RS) for different reasons
+ - when everything went well, reconnect in order to exchange
+ new state, new generation ID
+ - when we have connection failure, reconnect to retry a new import
+ right here, right now
+ we never want retryOnFailure if we fails reconnecting in the restart.
+ */
broker.reStart(false);
if (ieContext.getException() != null)
{
- if (broker.isConnected() && (initFromtask != null)
+ if (broker.isConnected() && (initFromTask != null)
&& (++ieContext.attemptCnt<2))
{
- // Worth a new attempt
- // since initFromtask is in this server, connection is ok
+ /*
+ Worth a new attempt
+ since initFromTask is in this server, connection is ok
+ */
try
{
-
- // Wait for the exporter to stabilize - eventually reconnect as
- // well if it was connected to the same RS than the one we lost ...
+ /*
+ Wait for the exporter to stabilize - eventually reconnect as
+ well if it was connected to the same RS than the one we lost ...
+ */
Thread.sleep(1000);
- // Restart the whole import protocol exchange by sending again
- // the request
+ /*
+ Restart the whole import protocol exchange by sending again
+ the request
+ */
logError(NOTE_RESENDING_INIT_FROM_REMOTE_REQUEST.get(
ieContext.getException().getLocalizedMessage()));
@@ -2378,8 +2378,10 @@
}
catch(Exception e)
{
- // An error occurs when sending a new request for a new import.
- // This error is not stored, prefering to keep the initial one.
+ /*
+ An error occurs when sending a new request for a new import.
+ This error is not stored, prefering to keep the initial one.
+ */
logError(ERR_SENDING_NEW_ATTEMPT_INIT_REQUEST.get(
e.getLocalizedMessage(),
ieContext.getException().getLocalizedMessage()));
@@ -2394,7 +2396,7 @@
TRACER.debugInfo("[IE] Domain=" + this
+ " ends initialization with exception=" + ieContext.getException()
+ " connected=" + broker.isConnected()
- + " task=" + initFromtask
+ + " task=" + initFromTask
+ " attempt=" + ieContext.attemptCnt);
try
@@ -2402,24 +2404,28 @@
if (broker.isConnected() && (ieContext.getException() != null))
{
// Let's notify the exporter
- ErrorMsg errorMsg = new ErrorMsg(requestorServerId,
+ ErrorMsg errorMsg = new ErrorMsg(requesterServerId,
ieContext.getException().getMessageObject());
broker.publish(errorMsg);
}
else // !broker.isConnected()
{
- // Don't try to reconnect here.
- // The current running thread is the listener thread and will loop on
- // receive() that is expected to manage reconnects attempt.
+ /*
+ Don't try to reconnect here.
+ The current running thread is the listener thread and will loop on
+ receive() that is expected to manage reconnects attempt.
+ */
}
- // Update the task that initiated the import must be the last thing.
- // Particularly, broker.restart() after import success must be done
- // before some other operations/tasks to be launched,
- // like resetting the generation ID.
- if (initFromtask != null)
+ /*
+ Update the task that initiated the import must be the last thing.
+ Particularly, broker.restart() after import success must be done
+ before some other operations/tasks to be launched,
+ like resetting the generation ID.
+ */
+ if (initFromTask != null)
{
- initFromtask.updateTaskCompletionState(ieContext.getException());
+ initFromTask.updateTaskCompletionState(ieContext.getException());
}
}
finally
@@ -2435,10 +2441,10 @@
}
/**
- * Return the protocol version of the DS related to the provided serverid.
+ * Return the protocol version of the DS related to the provided serverId.
* Returns -1 when the protocol version is not known.
- * @param dsServerId The provided serverid.
- * @return The procotol version.
+ * @param dsServerId The provided serverId.
+ * @return The protocol version.
*/
short getProtocolVersion(int dsServerId)
{
@@ -2515,11 +2521,11 @@
private void checkGenerationID(long generationID)
throws DirectoryException
{
- boolean allset = true;
+ boolean allSet = true;
for (int i = 0; i< 50; i++)
{
- allset = true;
+ allSet = true;
for (RSInfo rsInfo : getRsList())
{
// the 'empty' RSes (generationId==-1) are considered as good citizens
@@ -2532,16 +2538,16 @@
} catch (InterruptedException e)
{
}
- allset = false;
+ allSet = false;
break;
}
}
- if (allset)
+ if (allSet)
{
break;
}
}
- if (!allset)
+ if (!allSet)
{
ResultCode resultCode = ResultCode.OTHER;
Message message = ERR_RESET_GENERATION_ID_FAILED.get(serviceID);
@@ -2735,9 +2741,11 @@
*/
void processUpdateDoneSynchronous(UpdateMsg msg)
{
- // Warning: in synchronous mode, no way to tell the replay of an update went
- // wrong Just put null in processUpdateDone so that if assured replication
- // is used the ack is sent without error at replay flag.
+ /*
+ Warning: in synchronous mode, no way to tell the replay of an update went
+ wrong Just put null in processUpdateDone so that if assured replication
+ is used the ack is sent without error at replay flag.
+ */
processUpdateDone(msg, null);
state.update(msg.getChangeNumber());
}
@@ -2749,10 +2757,7 @@
*/
public boolean isConnected()
{
- if (broker != null)
- return broker.isConnected();
- else
- return false;
+ return broker != null && broker.isConnected();
}
/**
@@ -2764,10 +2769,7 @@
*/
public boolean hasConnectionError()
{
- if (broker != null)
- return broker.hasConnectionError();
- else
- return true;
+ return broker == null || broker.hasConnectionError();
}
/**
@@ -2852,24 +2854,16 @@
/**
* Gets the number of updates sent in assured safe read mode that have not
* been acknowledged per server.
- * @return The number of updates sent in assured safe read mode that have not
- * been acknowledged per server.
+ * @return A copy of the map that contains the number of updates sent in
+ * assured safe read mode that have not been acknowledged per server.
*/
public Map<Integer, Integer> getAssuredSrServerNotAcknowledgedUpdates()
{
- // Clone a snapshot with synchronized section to have a consistent view in
- // monitoring
- Map<Integer, Integer> snapshot = new HashMap<Integer, Integer>();
synchronized(assuredSrServerNotAcknowledgedUpdates)
{
- Set<Integer> keySet = assuredSrServerNotAcknowledgedUpdates.keySet();
- for (Integer serverId : keySet)
- {
- Integer i = assuredSrServerNotAcknowledgedUpdates.get(serverId);
- snapshot.put(serverId, i);
- }
+ return new HashMap<Integer, Integer>(
+ assuredSrServerNotAcknowledgedUpdates);
}
- return snapshot;
}
/**
@@ -2937,24 +2931,16 @@
/**
* Gets the number of updates sent in assured safe data mode that have not
* been acknowledged due to timeout error per server.
- * @return The number of updates sent in assured safe data mode that have not
- * been acknowledged due to timeout error per server.
+ * @return A copy of the map that contains the number of updates sent in
+ * assured safe data mode that have not been acknowledged due to timeout
+ * error per server.
*/
public Map<Integer, Integer> getAssuredSdServerTimeoutUpdates()
{
- // Clone a snapshot with synchronized section to have a consistent view in
- // monitoring
- Map<Integer, Integer> snapshot = new HashMap<Integer, Integer>();
synchronized(assuredSdServerTimeoutUpdates)
{
- Set<Integer> keySet = assuredSdServerTimeoutUpdates.keySet();
- for (Integer serverId : keySet)
- {
- Integer i = assuredSdServerTimeoutUpdates.get(serverId);
- snapshot.put(serverId, i);
- }
+ return new HashMap<Integer, Integer>(assuredSdServerTimeoutUpdates);
}
- return snapshot;
}
/**
@@ -2981,14 +2967,20 @@
assuredSrTimeoutUpdates = new AtomicInteger(0);
assuredSrWrongStatusUpdates = new AtomicInteger(0);
assuredSrReplayErrorUpdates = new AtomicInteger(0);
- assuredSrServerNotAcknowledgedUpdates = new HashMap<Integer,Integer>();
+ synchronized (assuredSrServerNotAcknowledgedUpdates)
+ {
+ assuredSrServerNotAcknowledgedUpdates.clear();
+ }
assuredSrReceivedUpdates = new AtomicInteger(0);
assuredSrReceivedUpdatesAcked = new AtomicInteger(0);
assuredSrReceivedUpdatesNotAcked = new AtomicInteger(0);
assuredSdSentUpdates = new AtomicInteger(0);
assuredSdAcknowledgedUpdates = new AtomicInteger(0);
assuredSdTimeoutUpdates = new AtomicInteger(0);
- assuredSdServerTimeoutUpdates = new HashMap<Integer,Integer>();
+ synchronized (assuredSdServerTimeoutUpdates)
+ {
+ assuredSdServerTimeoutUpdates.clear();
+ }
}
/*
@@ -3080,8 +3072,10 @@
{
synchronized (sessionLock)
{
- // Stop the broker first in order to prevent the listener from
- // reconnecting - see OPENDJ-457.
+ /*
+ Stop the broker first in order to prevent the listener from
+ reconnecting - see OPENDJ-457.
+ */
if (broker != null)
{
broker.stop();
@@ -3263,8 +3257,10 @@
{
broker.updateWindowAfterReplay();
- // Send an ack if it was requested and the group id is the same of the RS
- // one. Only Safe Read mode makes sense in DS for returning an ack.
+ /*
+ Send an ack if it was requested and the group id is the same of the RS
+ one. Only Safe Read mode makes sense in DS for returning an ack.
+ */
byte rsGroupId = broker.getRsGroupId();
if (msg.isAssured())
{
@@ -3282,9 +3278,9 @@
if (replayErrorMsg != null)
{
// Mark the error in the ack
- // -> replay error occured
+ // -> replay error occurred
ackMsg.setHasReplayError(true);
- // -> replay error occured in our server
+ // -> replay error occurred in our server
List<Integer> idList = new ArrayList<Integer>();
idList.add(serverID);
ackMsg.setFailedServers(idList);
@@ -3306,8 +3302,10 @@
logError(errorMsg);
} else
{
- // In safe data mode assured update that comes up to a DS requires no
- // ack from a destinator DS. Safe data mode is based on RS acks only
+ /*
+ In safe data mode assured update that comes up to a DS requires no
+ ack from a recipient DS. Safe data mode is based on RS acks only
+ */
}
}
}
@@ -3343,7 +3341,7 @@
* If assured configured, set message accordingly to request an ack in the
* right assured mode.
* No ack requested for a RS with a different group id. Assured
- * replication suported for the same locality, i.e: a topology working in
+ * replication supported for the same locality, i.e: a topology working in
* the same
* geographical location). If we are connected to a RS which is not in our
* locality, no need to ask for an ack.
@@ -3355,12 +3353,11 @@
if (assuredMode == AssuredMode.SAFE_DATA_MODE)
msg.setSafeDataLevel(assuredSdLevel);
- // Add the assured message to the list of update that are
- // waiting for acks
- synchronized (waitingAckMsgs)
- {
- waitingAckMsgs.put(msg.getChangeNumber(), msg);
- }
+ /*
+ Add the assured message to the list of update that are
+ waiting for acks
+ */
+ waitingAckMsgs.put(msg.getChangeNumber(), msg);
}
}
@@ -3410,8 +3407,10 @@
{
try
{
- // WARNING: this timeout may be difficult to optimize: too low, it
- // may use too much CPU, too high, it may penalize performance...
+ /*
+ WARNING: this timeout may be difficult to optimize: too low, it
+ may use too much CPU, too high, it may penalize performance...
+ */
msg.wait(10);
} catch (InterruptedException e)
{
@@ -3425,14 +3424,13 @@
// Timeout ?
if ( (System.currentTimeMillis() - startTime) >= assuredTimeout )
{
- // Timeout occured, be sure that ack is not being received and if so,
- // remove the update from the wait list, log the timeout error and
- // also update assured monitoring counters
+ /*
+ Timeout occurred, be sure that ack is not being received and if so,
+ remove the update from the wait list, log the timeout error and
+ also update assured monitoring counters
+ */
UpdateMsg update;
- synchronized (waitingAckMsgs)
- {
- update = waitingAckMsgs.remove(cn);
- }
+ update = waitingAckMsgs.remove(cn);
if (update != null)
{
@@ -3490,9 +3488,9 @@
}
/**
- * Publish informations to the Replication Service (not assured mode).
+ * Publish information to the Replication Service (not assured mode).
*
- * @param msg The byte array containing the informations that should
+ * @param msg The byte array containing the information that should
* be sent to the remote entities.
*/
public void publish(byte[] msg)
@@ -3501,10 +3499,11 @@
synchronized (this)
{
update = new UpdateMsg(generator.newChangeNumber(), msg);
-
- // If assured replication is configured, this will prepare blocking
- // mechanism. If assured replication is disabled, this returns
- // immediately
+ /*
+ If assured replication is configured, this will prepare blocking
+ mechanism. If assured replication is disabled, this returns
+ immediately
+ */
prepareWaitForAckIfAssuredEnabled(update);
publish(update);
@@ -3512,16 +3511,18 @@
try
{
- // If assured replication is enabled, this will wait for the matching
- // ack or time out. If assured replication is disabled, this returns
- // immediately
+ /*
+ If assured replication is enabled, this will wait for the matching
+ ack or time out. If assured replication is disabled, this returns
+ immediately
+ */
waitForAckIfAssuredEnabled(update);
} catch (TimeoutException ex)
{
// This exception may only be raised if assured replication is
// enabled
Message errorMsg = NOTE_DS_ACK_TIMEOUT.get(serviceID, Long.toString(
- assuredTimeout), msg.toString());
+ assuredTimeout), update.toString());
logError(errorMsg);
}
}
@@ -3557,10 +3558,7 @@
*/
public boolean importInProgress()
{
- if (ieContext == null)
- return false;
- else
- return ieContext.importInProgress;
+ return ieContext != null && ieContext.importInProgress;
}
/**
@@ -3572,10 +3570,7 @@
*/
public boolean exportInProgress()
{
- if (ieContext == null)
- return false;
- else
- return !ieContext.importInProgress;
+ return ieContext != null && !ieContext.importInProgress;
}
/**
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/PersistentServerStateTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/PersistentServerStateTest.java
index dfbc0cc..8e65c5d 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/PersistentServerStateTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/PersistentServerStateTest.java
@@ -23,26 +23,20 @@
*
*
* Copyright 2006-2009 Sun Microsystems, Inc.
+ * Portions Copyright 2013 ForgeRock AS.
*/
package org.opends.server.replication.plugin;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-
-import org.opends.server.TestCaseUtils;
-import org.opends.server.core.AddOperationBasis;
-import org.opends.server.protocols.internal.InternalClientConnection;
-import org.opends.server.replication.common.ServerState;
-
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ChangeNumberGenerator;
+import org.opends.server.replication.common.ServerState;
import org.opends.server.types.DN;
-import org.opends.server.types.Entry;
-import org.opends.server.types.ResultCode;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
-import static org.opends.server.TestCaseUtils.*;
+
+import static org.opends.server.TestCaseUtils.TEST_ROOT_DN_STRING;
+import static org.testng.Assert.assertEquals;
/**
* Test the PersistentServerState class.
@@ -65,11 +59,11 @@
* retrieve ServerState to persistent storage.
*/
@Test(dataProvider = "suffix")
- public void persistenServerStateTest(String dn)
+ public void persistentServerStateTest(String dn)
throws Exception
{
/*
- * Create a new PersitentServerState,
+ * Create a new PersistentServerState,
* update it with 2 new ChangeNumbers with 2 different server Ids
* save it
*
@@ -109,67 +103,4 @@
"cn1 has not been saved after clear for " + dn);
}
-
- /**
- * Ensures that the Directory Server is able to
- * translate a ruv entry to a sever state.
- *
- * @throws Exception If an unexpected problem occurs.
- */
- @SuppressWarnings("unchecked")
- @Test
- public void translateRuvEntryTest()
- throws Exception
- {
- LDAPReplicationDomain replDomain = null;
-
- try
- {
- String RuvString =
- "dn: nsuniqueid=ffffffff-ffffffff-ffffffff-ffffffff, o=test\n"
- +"objectClass: top\n"
- +"objectClass: ldapsubentry\n"
- +"objectClass: extensibleobject\n"
- +"nsds50ruv: {replicageneration} 49098853000000010000\n"
- +"nsds50ruv: {replica 3 ldap://kawax:3389} 491d517b000000030000 "
- +"491d564a000000030000\n"
- +"nsds50ruv: {replica 1 ldap://kawax:1389} 490989e8000000010000 "
- +"490989e8000000010000\n"
- +"ds6ruv: {PRIO 3 ldap://kawax:3389}\n"
- +"ds6ruv: {PRIO 1 ldap://kawax:1389}\n"
- +"entryUUID: ffffffff-ffff-ffff-ffff-ffffffffffff\n";
-
- Entry RuvEntry = TestCaseUtils.entryFromLdifString(RuvString);
- AddOperationBasis addOp = new AddOperationBasis(InternalClientConnection.
- getRootConnection(), InternalClientConnection.nextOperationID(),
- InternalClientConnection.nextMessageID(), null, RuvEntry.getDN(),
- RuvEntry.getObjectClasses(), RuvEntry.getUserAttributes(),
- RuvEntry.getOperationalAttributes());
-
- addOp.setInternalOperation(true);
- addOp.run();
-
- assertTrue(addOp.getResultCode() == ResultCode.SUCCESS);
-
- DomainFakeCfg domainConf =
- new DomainFakeCfg("o=test", 1, "localhost:3389");
- replDomain = MultimasterReplication.createNewDomain(domainConf);
- replDomain.start();
-
- // Then check serverSate and GenId
- assertTrue(replDomain.getGenerationID() == 1225361491);
-
- ServerState state = replDomain.getServerState();
- assertTrue(state.getMaxChangeNumber( 1).
- compareTo(new ChangeNumber("0000011d4d42b240000100000000")) == 0);
- assertTrue(state.getMaxChangeNumber( 3).
- compareTo(new ChangeNumber("0000011d9a991110000300000000")) == 0);
-
- }
- finally
- {
- if (replDomain != null)
- MultimasterReplication.deleteDomain(DN.decode("o=test"));
- }
- }
}
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
index b3b2f94..415b317 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -23,7 +23,7 @@
*
*
* Copyright 2006-2010 Sun Microsystems, Inc.
- * Portions copyright 2011-2012 ForgeRock AS
+ * Portions copyright 2011-2013 ForgeRock AS
*/
package org.opends.server.replication.protocol;
@@ -681,21 +681,21 @@
ChangeNumber cn3 = new ChangeNumber(TimeThread.getTime(), 1234567, 45678);
ArrayList<Integer> fservers1 = new ArrayList<Integer>();
- fservers1.add(new Integer(12345));
- fservers1.add(new Integer(-12345));
- fservers1.add(new Integer(31657));
- fservers1.add(new Integer(-28456));
- fservers1.add(new Integer(0));
+ fservers1.add(12345);
+ fservers1.add(-12345);
+ fservers1.add(31657);
+ fservers1.add(-28456);
+ fservers1.add(0);
ArrayList<Integer> fservers2 = new ArrayList<Integer>();
ArrayList<Integer> fservers3 = new ArrayList<Integer>();
- fservers3.add(new Integer(0));
+ fservers3.add(0);
ArrayList<Integer> fservers4 = new ArrayList<Integer>();
- fservers4.add(new Integer(100));
- fservers4.add(new Integer(2000));
- fservers4.add(new Integer(30000));
- fservers4.add(new Integer(-100));
- fservers4.add(new Integer(-2000));
- fservers4.add(new Integer(-30000));
+ fservers4.add(100);
+ fservers4.add(2000);
+ fservers4.add(30000);
+ fservers4.add(-100);
+ fservers4.add(-2000);
+ fservers4.add(-30000);
return new Object[][] {
{cn1, true, false, false, fservers1},
@@ -1421,7 +1421,7 @@
public void startECLMsgTest(int serverId, String baseDN, int window,
ServerState state, long genId, boolean sslEncryption, byte groupId) throws Exception
{
- ServerStartECLMsg msg = new ServerStartECLMsg(baseDN,
+ ServerStartECLMsg msg = new ServerStartECLMsg(
window, window, window, window, window, window, state,
ProtocolVersion.getCurrentVersion(), genId, sslEncryption, groupId);
ServerStartECLMsg newMsg = new ServerStartECLMsg(msg.getBytes());
@@ -1447,13 +1447,13 @@
{
// data
ChangeNumber changeNumber = new ChangeNumber(TimeThread.getTime(), 123, 45);
- String generalizedState = new String("fakegenstate");
+ String generalizedState = "fakegenstate";
ServerState state = new ServerState();
assertTrue(state.update(new ChangeNumber((long)75, 5,263)));
short mode = 3;
int firstDraftChangeNumber = 13;
int lastDraftChangeNumber = 14;
- String myopid = new String("fakeopid");
+ String myopid = "fakeopid";
// create original
StartECLSessionMsg msg = new StartECLSessionMsg();
msg.setChangeNumber(changeNumber);
--
Gitblit v1.10.0