From 6591ed28aac023a77de1564a3b65526a53229fe8 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 23 Aug 2013 12:43:38 +0000
Subject: [PATCH] Fixed the duplicate attribute values returned by "dc=replicationchanges".
---
opends/src/server/org/opends/server/replication/server/ReplicationBackend.java | 421 ++++++++++++++++++++++-----------------------------
1 files changed, 183 insertions(+), 238 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationBackend.java b/opends/src/server/org/opends/server/replication/server/ReplicationBackend.java
index f693b33..e78ee6d 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationBackend.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationBackend.java
@@ -46,6 +46,7 @@
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.replication.common.ChangeNumber;
+import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.plugin.ReplicationServerListener;
import org.opends.server.replication.protocol.*;
@@ -53,27 +54,30 @@
import org.opends.server.types.*;
import org.opends.server.util.*;
+import static java.util.Collections.*;
+
import static org.opends.messages.BackendMessages.*;
import static org.opends.messages.JebMessages.*;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.config.ConfigConstants.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.types.FilterType.*;
import static org.opends.server.util.ServerConstants.*;
import static org.opends.server.util.StaticUtils.*;
/**
- * This class defines a backend that stores its information in an
- * associated replication server object.
+ * This class defines a backend that stores its information in an associated
+ * replication server object.
+ * <p>
* This is primarily intended to take advantage of the backup/restore/
- * import/export of the backend API, and to provide an LDAP access
- * to the replication server database.
- * <BR><BR>
- * Entries stored in this backend are held in the DB associated with
- * the replication server.
- * <BR><BR>
+ * import/export of the backend API, and to provide an LDAP access to the
+ * replication server database.
+ * <p>
+ * Entries stored in this backend are held in the DB associated with the
+ * replication server.
+ * <p>
* Currently are only implemented the create and restore backup features.
- *
*/
public class ReplicationBackend
extends Backend
@@ -137,20 +141,6 @@
// Perform all initialization in initializeBackend.
}
-
- /**
- * Set the base DNs for this backend. This is used by the unit tests
- * to set the base DNs without having to provide a configuration
- * object when initializing the backend.
- * @param baseDNs The set of base DNs to be served by this memory backend.
- */
- public void setBaseDNs(DN[] baseDNs)
- {
- this.baseDNs = baseDNs;
- }
-
-
-
/**
* {@inheritDoc}
*/
@@ -163,7 +153,7 @@
BackendCfg cfg = (BackendCfg) config;
DN[] newBaseDNs = new DN[cfg.getBaseDN().size()];
cfg.getBaseDN().toArray(newBaseDNs);
- setBaseDNs(newBaseDNs);
+ this.baseDNs = newBaseDNs;
}
}
@@ -182,8 +172,7 @@
throw new ConfigException(message);
}
- baseDNSet = new HashSet<DN>();
- baseDNSet.addAll(Arrays.asList(baseDNs));
+ baseDNSet = new HashSet<DN>(Arrays.asList(baseDNs));
supportedControls = new HashSet<String>();
supportedFeatures = new HashSet<String>();
@@ -213,8 +202,8 @@
ObjectClass objectclassOC =
DirectoryServer.getObjectClass(ATTR_OBJECTCLASSES_LC, true);
rootObjectclasses.put(objectclassOC, ATTR_OBJECTCLASSES_LC);
- attributes = new LinkedHashMap<AttributeType,List<Attribute>>();
+ attributes = new LinkedHashMap<AttributeType,List<Attribute>>();
Attribute a = Attributes.create("changetype", "add");
List<Attribute> attrList = new ArrayList<Attribute>(1);
attrList.add(a);
@@ -284,10 +273,8 @@
//This method only returns the number of actual change entries, the
//domain and any baseDN entries are not counted.
long retNum=0;
- for (Iterator<ReplicationServerDomain> iter = server.getDomainIterator();
- iter.hasNext();)
+ for (ReplicationServerDomain rsd : toIterable(server.getDomainIterator()))
{
- ReplicationServerDomain rsd = iter.next();
retNum += rsd.getChangesCount();
}
return retNum;
@@ -507,7 +494,8 @@
{
break;
}
- processContainer(exportContainer, exportConfig, ldifWriter, null);
+ writeChangesAfterChangeNumber(exportContainer, exportConfig,
+ ldifWriter, null, null);
}
}
finally
@@ -540,23 +528,22 @@
for (Iterator<ReplicationServerDomain> iter = server.getDomainIterator();
iter.hasNext();)
{
- ReplicationServerDomain rc = iter.next();
+ ReplicationServerDomain rsd = iter.next();
// Skip containers that are not covered by the include branches.
- DN baseDN = DN.decode(rc.getBaseDn() + "," + BASE_DN);
-
if (includeBranches == null || includeBranches.isEmpty())
{
- exportContainers.add(rc);
+ exportContainers.add(rsd);
}
else
{
+ DN baseDN = DN.decode(rsd.getBaseDn() + "," + BASE_DN);
for (DN includeBranch : includeBranches)
{
if (includeBranch.isDescendantOf(baseDN)
|| includeBranch.isAncestorOf(baseDN))
{
- exportContainers.add(rc);
+ exportContainers.add(rsd);
}
}
}
@@ -576,12 +563,9 @@
builder.add("domain");
Attribute ocAttr = builder.toAttribute();
- List<Attribute> ldapAttrList = new ArrayList<Attribute>();
- ldapAttrList.add(ocAttr);
-
Map<AttributeType, List<Attribute>> attrs =
new HashMap<AttributeType, List<Attribute>>();
- attrs.put(ocType, ldapAttrList);
+ attrs.put(ocType, singletonList(ocAttr));
try
{
@@ -603,31 +587,22 @@
break;
}
- attrs.clear();
-
- // TODO JNR these multiple calls to clear() are more than suspect!
- ldapAttrList.clear();
- ldapAttrList.add(ocAttr);
- attrs.put(ocType, ldapAttrList);
-
- TRACER.debugInfo("State=" +
- exportContainer.getDbServerState());
- Attribute stateAttr = Attributes.create("state",
- exportContainer.getDbServerState().toString());
- ldapAttrList.clear();
- ldapAttrList.add(stateAttr);
- attrs.put(stateAttr.getAttributeType(), ldapAttrList);
-
+ final ServerState serverState = exportContainer.getDbServerState();
+ TRACER.debugInfo("State=" + serverState);
+ Attribute stateAttr = Attributes.create("state", serverState.toString());
Attribute genidAttr = Attributes.create("generation-id",
exportContainer.getGenerationId() + exportContainer.getBaseDn());
- ldapAttrList.clear();
- ldapAttrList.add(genidAttr);
- attrs.put(genidAttr.getAttributeType(), ldapAttrList);
+ attrs.clear();
+ attrs.put(ocType, singletonList(ocAttr));
+ attrs.put(stateAttr.getAttributeType(), singletonList(stateAttr));
+ attrs.put(genidAttr.getAttributeType(), singletonList(genidAttr));
+
+ final String dnString = exportContainer.getBaseDn() + "," + BASE_DN;
try
{
- ChangeRecordEntry changeRecord = new AddChangeRecordEntry(
- DN.decode(exportContainer.getBaseDn() + "," + BASE_DN), attrs);
+ DN dn = DN.decode(dnString);
+ ChangeRecordEntry changeRecord = new AddChangeRecordEntry(dn, attrs);
ldifWriter.writeChangeRecord(changeRecord);
}
catch (Exception e)
@@ -636,100 +611,47 @@
{
TRACER.debugCaught(DebugLogLevel.ERROR, e);
}
- Message message = ERR_BACKEND_EXPORT_ENTRY.get(
- exportContainer.getBaseDn() + "," + BASE_DN,
- String.valueOf(e));
- logError(message);
+ logError(ERR_BACKEND_EXPORT_ENTRY.get(dnString, String.valueOf(e)));
}
}
}
/**
- * Processes the changes for a given ReplicationServerDomain.
+ * Exports or returns all the changes from a ReplicationServerDomain coming
+ * after the changeNumber specified in the searchOperation.
*/
- private void processContainer(ReplicationServerDomain rsd,
- LDIFExportConfig exportConfig, LDIFWriter ldifWriter,
- SearchOperation searchOperation)
+ private void writeChangesAfterChangeNumber(ReplicationServerDomain rsd,
+ final LDIFExportConfig exportConfig, LDIFWriter ldifWriter,
+ SearchOperation searchOperation, final ChangeNumber previousCN)
{
for (int serverId : rsd.getServers())
{
if (exportConfig != null && exportConfig.isCancelled())
{ // Abort if cancelled
- break;
+ return;
}
- ChangeNumber previousChangeNumber = null;
- if (searchOperation != null)
- {
- // Try to optimize for filters like replicationChangeNumber>=xxxxx
- // or replicationChangeNumber=xxxxx :
- // If the search filter is one of these 2 filters, move directly to
- // ChangeNumber=xxxx before starting the iteration.
- SearchFilter filter = searchOperation.getFilter();
- previousChangeNumber = extractChangeNumber(filter);
-
- if (previousChangeNumber == null &&
- filter.getFilterType().equals(FilterType.AND))
- {
- for (SearchFilter filterComponents: filter.getFilterComponents())
- {
- previousChangeNumber = extractChangeNumber(filterComponents);
- if (previousChangeNumber != null)
- break;
- }
- }
- }
-
-
- ReplicationIterator ri = rsd.getChangelogIterator(serverId,
- previousChangeNumber);
+ ReplicationIterator ri = rsd.getChangelogIterator(serverId, previousCN);
if (ri != null)
{
try
{
int lookthroughCount = 0;
- int lookthroughLimit = 0;
- if (searchOperation != null)
- {
- lookthroughLimit =
- searchOperation.getClientConnection().getLookthroughLimit();
- }
// Walk through the changes
while (ri.getChange() != null)
{
if (exportConfig != null && exportConfig.isCancelled())
{ // abort if cancelled
- break;
+ return;
}
- if (searchOperation != null)
+ if (!canContinue(searchOperation, lookthroughCount))
{
- try
- {
- if (lookthroughLimit > 0 && lookthroughCount > lookthroughLimit)
- {
- // Lookthrough limit exceeded
- searchOperation.setResultCode(
- ResultCode.ADMIN_LIMIT_EXCEEDED);
- searchOperation.setErrorMessage(null);
- break;
- }
-
- searchOperation.checkIfCanceled(false);
- } catch (CanceledOperationException e)
- {
- searchOperation.setResultCode(ResultCode.CANCELED);
- searchOperation.setErrorMessage(null);
- break;
- }
+ break;
}
lookthroughCount++;
- UpdateMsg msg = ri.getChange();
- processChange(
- msg, exportConfig, ldifWriter, searchOperation,
- rsd.getBaseDn());
- if (!ri.next())
- break;
+ writeChange(ri.getChange(), ldifWriter, searchOperation,
+ rsd.getBaseDn(), exportConfig != null);
}
}
finally
@@ -740,6 +662,45 @@
}
}
+ private boolean canContinue(SearchOperation searchOperation,
+ int lookthroughCount)
+ {
+ if (searchOperation == null)
+ {
+ return true;
+ }
+
+ int limit = searchOperation.getClientConnection().getLookthroughLimit();
+ if (lookthroughCount > limit && limit > 0)
+ {
+ // lookthrough limit exceeded
+ searchOperation.setResultCode(ResultCode.ADMIN_LIMIT_EXCEEDED);
+ searchOperation.setErrorMessage(null);
+ return false;
+ }
+
+ try
+ {
+ searchOperation.checkIfCanceled(false);
+ return true;
+ }
+ catch (CanceledOperationException e)
+ {
+ searchOperation.setResultCode(ResultCode.CANCELED);
+ searchOperation.setErrorMessage(null);
+ return false;
+ }
+ }
+
+ private ChangeNumber extractChangeNumber(SearchOperation searchOperation)
+ {
+ if (searchOperation != null)
+ {
+ return extractChangeNumber(searchOperation.getFilter());
+ }
+ return null;
+ }
+
/**
* Attempt to extract a ChangeNumber from searchFilter like
* ReplicationChangeNumber=xxxx or ReplicationChangeNumber>=xxxx.
@@ -751,27 +712,42 @@
*/
private ChangeNumber extractChangeNumber(SearchFilter filter)
{
- AttributeType changeNumberAttrType =
- DirectoryServer.getDefaultAttributeType(CHANGE_NUMBER);
-
- FilterType filterType = filter.getFilterType();
- if ( (filterType.equals(FilterType.GREATER_OR_EQUAL) ||
- filterType.equals(FilterType.EQUALITY) ) &&
- filter.getAttributeType().equals(changeNumberAttrType))
+ // Try to optimize for filters like replicationChangeNumber>=xxxxx
+ // or replicationChangeNumber=xxxxx :
+ // If the search filter is one of these 2 filters, move directly to
+ // ChangeNumber=xxxx before starting the iteration.
+ final FilterType filterType = filter.getFilterType();
+ if (GREATER_OR_EQUAL.equals(filterType) || EQUALITY.equals(filterType))
{
- try
+ AttributeType changeNumberAttrType =
+ DirectoryServer.getDefaultAttributeType(CHANGE_NUMBER);
+ if (filter.getAttributeType().equals(changeNumberAttrType))
{
- ChangeNumber startingChangeNumber =
- new ChangeNumber(filter.getAssertionValue().getValue().toString());
- return new ChangeNumber(
- startingChangeNumber.getTime(),
- startingChangeNumber.getSeqnum()-1,
- startingChangeNumber.getServerId());
+ try
+ {
+ ChangeNumber startingCN =
+ new ChangeNumber(filter.getAssertionValue().getValue().toString());
+ return new ChangeNumber(startingCN.getTime(),
+ startingCN.getSeqnum() - 1, startingCN.getServerId());
+ }
+ catch (Exception e)
+ {
+ // don't try to optimize the search if the ChangeNumber is
+ // not a valid replication ChangeNumber.
+ }
}
- catch (Exception e)
+ }
+ else if (AND.equals(filterType))
+ {
+ for (SearchFilter filterComponent : filter.getFilterComponents())
{
- // don't try to optimize the search if we the ChangeNumber is
- // not a valid replication ChangeNumber.
+ // This code does not expect more than one CN in the search filter.
+ // It is ok, since it is only used by developers/testers for debugging.
+ final ChangeNumber previousCN = extractChangeNumber(filterComponent);
+ if (previousCN != null)
+ {
+ return previousCN;
+ }
}
}
return null;
@@ -779,21 +755,19 @@
/**
- * Export one change.
+ * Exports one change.
*/
- private void processChange(UpdateMsg updateMsg,
- LDIFExportConfig exportConfig, LDIFWriter ldifWriter,
- SearchOperation searchOperation, String baseDN)
+ private void writeChange(UpdateMsg updateMsg, LDIFWriter ldifWriter,
+ SearchOperation searchOperation, String baseDN, boolean isExport)
{
InternalClientConnection conn =
InternalClientConnection.getRootConnection();
Entry entry = null;
DN dn = null;
- ObjectClass objectclass =
+ ObjectClass extensibleObjectOC =
DirectoryServer.getDefaultObjectClass("extensibleObject");
-
try
{
if (updateMsg instanceof LDAPUpdateMsg)
@@ -837,11 +811,9 @@
addAttribute(attrs, attr);
}
}
+ addAttribute(attrs, "changetype", "add");
- Attribute changetype = Attributes.create("changetype", "add");
- addAttribute(attrs, changetype);
-
- if (exportConfig != null)
+ if (isExport)
{
ChangeRecordEntry changeRecord =
new AddChangeRecordEntry(dn, attrs);
@@ -855,51 +827,45 @@
else if (msg instanceof DeleteMsg)
{
dn = computeDN(msg);
-
ChangeRecordEntry changeRecord = new DeleteChangeRecordEntry(dn);
- entry = writeChangeRecord(exportConfig, ldifWriter, changeRecord);
+ entry = writeChangeRecord(ldifWriter, changeRecord, isExport);
}
else if (msg instanceof ModifyMsg)
{
ModifyOperation op = (ModifyOperation)msg.createOperation(conn);
- op.setInternalOperation(true);
dn = computeDN(msg);
ChangeRecordEntry changeRecord =
new ModifyChangeRecordEntry(dn, op.getRawModifications());
- entry = writeChangeRecord(exportConfig, ldifWriter, changeRecord);
+ entry = writeChangeRecord(ldifWriter, changeRecord, isExport);
}
else if (msg instanceof ModifyDNMsg)
{
ModifyDNOperation op = (ModifyDNOperation)msg.createOperation(conn);
- op.setInternalOperation(true);
dn = computeDN(msg);
- ChangeRecordEntry changeRecord =
- new ModifyDNChangeRecordEntry(dn, op.getNewRDN(), op.deleteOldRDN(),
- op.getNewSuperior());
- entry = writeChangeRecord(exportConfig, ldifWriter, changeRecord);
+ ChangeRecordEntry changeRecord = new ModifyDNChangeRecordEntry(
+ dn, op.getNewRDN(), op.deleteOldRDN(), op.getNewSuperior());
+ entry = writeChangeRecord(ldifWriter, changeRecord, isExport);
}
- if (exportConfig != null)
+
+ if (isExport)
{
this.exportedCount++;
}
else
{
- // Add extensibleObject objectclass and the ChangeNumber
- // in the entry.
- if (!entry.getObjectClasses().containsKey(objectclass))
- entry.addObjectClass(objectclass);
- Attribute changeNumber =
- Attributes.create(CHANGE_NUMBER,
- msg.getChangeNumber().toString());
- addAttribute(entry.getUserAttributes(), changeNumber);
- Attribute domain = Attributes.create("replicationDomain", baseDN);
- addAttribute(entry.getUserAttributes(), domain);
+ // Add extensibleObject objectclass and the ChangeNumber in the entry.
+ if (!entry.getObjectClasses().containsKey(extensibleObjectOC))
+ entry.addObjectClass(extensibleObjectOC);
+
+ addAttribute(entry.getUserAttributes(), CHANGE_NUMBER,
+ msg.getChangeNumber().toString());
+ addAttribute(entry.getUserAttributes(), "replicationDomain", baseDN);
// Get the base DN, scope, and filter for the search.
- DN searchBaseDN = searchOperation.getBaseDN();
+ DN searchBaseDN = searchOperation.getBaseDN();
SearchScope scope = searchOperation.getScope();
SearchFilter filter = searchOperation.getFilter();
@@ -918,52 +884,47 @@
{
TRACER.debugCaught(DebugLogLevel.ERROR, e);
}
- String dnStr;
- if (dn == null)
- {
- dnStr = "Unkown";
- }
- else
- {
- dnStr = dn.toNormalizedString();
- }
+
+ final String dnStr = (dn != null) ? dn.toNormalizedString() : "Unknown";
+
Message message;
- if (exportConfig != null)
+ if (isExport)
{
- message = ERR_BACKEND_EXPORT_ENTRY.get(
- dnStr, String.valueOf(e));
+ message = ERR_BACKEND_EXPORT_ENTRY.get(dnStr, String.valueOf(e));
}
else
{
- message = ERR_BACKEND_SEARCH_ENTRY.get(
- dnStr, e.getLocalizedMessage());
+ message = ERR_BACKEND_SEARCH_ENTRY.get(dnStr, e.getLocalizedMessage());
}
logError(message);
}
}
-
private DN computeDN(LDAPUpdateMsg msg) throws DirectoryException
{
return DN.decode("uuid=" + msg.getEntryUUID() + "," + CHANGE_NUMBER + "="
+ msg.getChangeNumber() + "," + msg.getDn() + "," + BASE_DN);
}
- private Entry writeChangeRecord(LDIFExportConfig exportConfig,
- LDIFWriter ldifWriter, ChangeRecordEntry changeRecord)
- throws IOException, LDIFException
+ private Entry writeChangeRecord(LDIFWriter ldifWriter,
+ ChangeRecordEntry changeRecord, boolean isExport) throws IOException,
+ LDIFException
{
- if (exportConfig != null)
+ if (isExport)
{
ldifWriter.writeChangeRecord(changeRecord);
return null;
}
final Writer writer = new Writer();
- final LDIFWriter ldifWriter2 = writer.getLDIFWriter();
- ldifWriter2.writeChangeRecord(changeRecord);
- final LDIFReader ldifReader = writer.getLDIFReader();
- return ldifReader.readEntry();
+ writer.getLDIFWriter().writeChangeRecord(changeRecord);
+ return writer.getLDIFReader().readEntry();
+ }
+
+ private void addAttribute(Map<AttributeType, List<Attribute>> attributes,
+ String attrName, String attrValue)
+ {
+ addAttribute(attributes, Attributes.create(attrName, attrValue));
}
/**
@@ -1035,37 +996,39 @@
return true;
}
-
-
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override()
public void createBackup(BackupConfig backupConfig)
throws DirectoryException
{
- BackupManager backupManager = new BackupManager(getBackendID());
- File backendDir = getFileForPath(getReplicationServerCfg()
- .getReplicationDBDirectory());
- backupManager.createBackup(backendDir, backupConfig);
+ createBackupManager().createBackup(getBackendDir(), backupConfig);
}
-
-
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override()
- public void removeBackup(BackupDirectory backupDirectory,
- String backupID)
+ public void restoreBackup(RestoreConfig restoreConfig)
throws DirectoryException
{
- BackupManager backupManager =
- new BackupManager(getBackendID());
- backupManager.removeBackup(backupDirectory, backupID);
+ createBackupManager().restoreBackup(getBackendDir(), restoreConfig);
}
+ /** {@inheritDoc} */
+ @Override()
+ public void removeBackup(BackupDirectory backupDirectory, String backupID)
+ throws DirectoryException
+ {
+ createBackupManager().removeBackup(backupDirectory, backupID);
+ }
+ private BackupManager createBackupManager()
+ {
+ return new BackupManager(getBackendID());
+ }
+
+ private File getBackendDir() throws DirectoryException
+ {
+ return getFileForPath(getReplicationServerCfg().getReplicationDBDirectory());
+ }
/**
* {@inheritDoc}
@@ -1076,24 +1039,6 @@
return true;
}
-
-
- /**
- * {@inheritDoc}
- */
- @Override()
- public void restoreBackup(RestoreConfig restoreConfig)
- throws DirectoryException
- {
- BackupManager backupManager =
- new BackupManager(getBackendID());
- File backendDir = getFileForPath(getReplicationServerCfg()
- .getReplicationDBDirectory());
- backupManager.restoreBackup(backendDir, restoreConfig);
- }
-
-
-
/**
* {@inheritDoc}
*/
@@ -1204,8 +1149,7 @@
}
}
- // don't do anything if the search is a base search on
- // the backend suffix.
+ // don't do anything if the search is a base search on the backend suffix.
try
{
DN backendBaseDN = DN.decode(BASE_DN);
@@ -1261,7 +1205,9 @@
findSearchContainers(searchBaseDN);
for (ReplicationServerDomain exportContainer : searchContainers)
{
- processContainer(exportContainer, null, null, searchOperation);
+ final ChangeNumber previousCN = extractChangeNumber(searchOperation);
+ writeChangesAfterChangeNumber(exportContainer, null, null,
+ searchOperation, previousCN);
}
}
@@ -1413,4 +1359,3 @@
throw new UnsupportedOperationException("Operation not supported.");
}
}
-
--
Gitblit v1.10.0