From b4f8838b15342670c31753a484abf0129e3c9653 Mon Sep 17 00:00:00 2001
From: jcduff <jcduff@localhost>
Date: Thu, 23 Oct 2008 14:04:24 +0000
Subject: [PATCH] The commit will bring the following features : - An updated version of the underlying database. BDB JE 3.3 is now used. - Attribute API refactoring providing a better abstraction and offering improved performances. - A new GUI called the Control-Panel to replace the Status-Panel: the specifications for this GUI are available on OpenDS Wiki and contains a link to a mockup. See <https://www.opends.org/wiki/page/ControlPanelUISpecification>. - Some changes in the replication protocol to implement "Assured Replication Mode". The specifications are on OpenDS Wiki at <https://www.opends.org/wiki/page/AssuredMode> and section 7 described some of the replication changes required to support this. Assured Replication is not finished, but the main replication protocol changes to support it are done. As explained by Gilles on an email on the Dev mailing list (http://markmail.org/message/46rgo3meq3vriy4a), with these changes the newer versions of OpenDS may not be able to replicate with OpenDS 1.0 instances. - Support for Service Tags on the platforms where the functionality is available and enabled. Specifications are published at <https://www.opends.org/wiki/page/OpenDSServiceTagEnabled>. For more information on Service Tags see <http://wikis.sun.com/display/ServiceTag/Sun+Service+Tag+FAQ>. - The Admin Connector service. In order to provide agentry of the OpenDS server at any time, a new service has been added, dedicated to the administration, configuration and monitoring of the server. An overview of the Admin Connector service and it's use is available on the OpenDS wiki <https://www.opends.org/wiki/page/ManagingAdministrationTrafficToTheServer> - Updates to the various command line tools to support the Admin Connector service. - Some internal re-architecting of the server to put the foundation of future developments such as virtual directory services. The new NetworkGroups and WorkFlow internal services which have been specified in <https://www.opends.org/wiki/page/BasicOperationRoutingThroughNetworkGroup> are now implemented. - Many bug fixes...
---
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationBackend.java | 275 ++++++++++++++++++++++++++++++++++++++----------------
1 files changed, 191 insertions(+), 84 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationBackend.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationBackend.java
index 0e8cd78..7f3f00a 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationBackend.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationBackend.java
@@ -44,7 +44,6 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
-import java.util.LinkedHashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
@@ -72,18 +71,22 @@
import org.opends.server.core.SearchOperation;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.protocols.internal.InternalClientConnection;
+import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.plugin.ReplicationServerListener;
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.DeleteMsg;
import org.opends.server.replication.protocol.ModifyDNMsg;
import org.opends.server.replication.protocol.ModifyMsg;
-import org.opends.server.replication.protocol.UpdateMessage;
+import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.types.Attribute;
+import org.opends.server.types.AttributeBuilder;
import org.opends.server.types.AttributeType;
-import org.opends.server.types.AttributeValue;
+import org.opends.server.types.Attributes;
import org.opends.server.types.BackupConfig;
import org.opends.server.types.BackupDirectory;
+import org.opends.server.types.ByteString;
+import org.opends.server.types.CanceledOperationException;
import org.opends.server.types.ConditionResult;
import org.opends.server.types.Control;
import org.opends.server.types.DN;
@@ -91,6 +94,7 @@
import org.opends.server.types.DirectoryException;
import org.opends.server.types.DereferencePolicy;
import org.opends.server.types.Entry;
+import org.opends.server.types.FilterType;
import org.opends.server.types.IndexType;
import org.opends.server.types.InitializationException;
import org.opends.server.types.LDIFExportConfig;
@@ -131,6 +135,8 @@
public class ReplicationBackend
extends Backend
{
+ private static final String CHANGE_NUMBER = "replicationChangeNumber";
+
/**
* The tracer object for the debug logger.
*/
@@ -273,15 +279,11 @@
DirectoryServer.getObjectClass(ATTR_OBJECTCLASSES_LC, true);
rootObjectclasses.put(objectclassOC, ATTR_OBJECTCLASSES_LC);
attributes = new LinkedHashMap<AttributeType,List<Attribute>>();
- AttributeType changeType =
- DirectoryServer.getAttributeType("changetype", true);
- LinkedHashSet<AttributeValue> valueSet =
- new LinkedHashSet<AttributeValue>(1);
- valueSet.add(new AttributeValue(changeType, "add"));
- Attribute a = new Attribute(changeType, "changetype", valueSet);
+
+ Attribute a = Attributes.create("changetype", "add");
ArrayList<Attribute> attrList = new ArrayList<Attribute>(1);
attrList.add(a);
- attributes.put(changeType, attrList);
+ attributes.put(a.getAttributeType(), attrList);
operationalAttributes = new LinkedHashMap<AttributeType,List<Attribute>>();
}
@@ -465,7 +467,7 @@
* {@inheritDoc}
*/
@Override()
- public synchronized void replaceEntry(Entry entry,
+ public synchronized void replaceEntry(Entry oldEntry, Entry newEntry,
ModifyOperation modifyOperation)
throws DirectoryException
{
@@ -649,13 +651,11 @@
new HashMap<AttributeType,List<Attribute>>();
ArrayList<Attribute> ldapAttrList = new ArrayList<Attribute>();
- AttributeType ocType=
- DirectoryServer.getAttributeType("objectclass", true);
- LinkedHashSet<AttributeValue> ocValues =
- new LinkedHashSet<AttributeValue>();
- ocValues.add(new AttributeValue(ocType, "top"));
- ocValues.add(new AttributeValue(ocType, "domain"));
- Attribute ocAttr = new Attribute(ocType, "objectclass", ocValues);
+ AttributeType ocType = DirectoryServer.getObjectClassAttributeType();
+ AttributeBuilder builder = new AttributeBuilder(ocType);
+ builder.add("top");
+ builder.add("domain");
+ Attribute ocAttr = builder.toAttribute();
ldapAttrList.add(ocAttr);
attributes.put(ocType, ldapAttrList);
@@ -676,31 +676,25 @@
}
attributes.clear();
+
ldapAttrList.clear();
-
ldapAttrList.add(ocAttr);
+ attributes.put(ocType, ldapAttrList);
- AttributeType stateType=
- DirectoryServer.getAttributeType("state", true);
- LinkedHashSet<AttributeValue> stateValues =
- new LinkedHashSet<AttributeValue>();
- stateValues.add(new AttributeValue(stateType,
- exportContainer.getDbServerState().toString()));
TRACER.debugInfo("State=" +
exportContainer.getDbServerState().toString());
- Attribute stateAttr = new Attribute(ocType, "state", stateValues);
+ Attribute stateAttr = Attributes.create("state", exportContainer
+ .getDbServerState().toString());
+ ldapAttrList.clear();
ldapAttrList.add(stateAttr);
+ attributes.put(stateAttr.getAttributeType(), ldapAttrList);
- AttributeType genidType=
- DirectoryServer.getAttributeType("generation-id", true);
- LinkedHashSet<AttributeValue> genidValues =
- new LinkedHashSet<AttributeValue>();
- genidValues.add(new AttributeValue(genidType,
- String.valueOf(exportContainer.getGenerationId())+
- exportContainer.getBaseDn()));
- Attribute genidAttr = new Attribute(ocType, "generation-id", genidValues);
+ Attribute genidAttr = Attributes.create("generation-id", String
+ .valueOf(exportContainer.getGenerationId())
+ + exportContainer.getBaseDn());
+ ldapAttrList.clear();
ldapAttrList.add(genidAttr);
- attributes.put(genidType, ldapAttrList);
+ attributes.put(genidAttr.getAttributeType(), ldapAttrList);
try
{
@@ -739,13 +733,44 @@
break;
}
+ ChangeNumber previousChangeNumber = null;
+ if (searchOperation != null)
+ {
+ // Try to optimize for filters like replicationChangeNumber>=xxxxx
+ // or replicationChangeNumber=xxxxx :
+ // If the search filter is one of these 2 filters, move directly to
+ // ChangeNumber=xxxx before starting the iteration.
+ SearchFilter filter = searchOperation.getFilter();
+ previousChangeNumber = extractChangeNumber(filter);
+
+ if ((previousChangeNumber == null) &&
+ (filter.getFilterType().equals(FilterType.AND)))
+ {
+ for (SearchFilter filterComponents: filter.getFilterComponents())
+ {
+ previousChangeNumber = extractChangeNumber(filterComponents);
+ if (previousChangeNumber != null)
+ break;
+ }
+ }
+ }
+
+
ReplicationIterator ri = rsd.getChangelogIterator(serverId,
- null);
+ previousChangeNumber);
if (ri != null)
{
try
{
+ int lookthroughCount = 0;
+ int lookthroughLimit = 0;
+ if (searchOperation != null)
+ {
+ lookthroughLimit =
+ searchOperation.getClientConnection().getLookthroughLimit();
+ }
+
// Walk through the changes
while (ri.getChange() != null)
{
@@ -753,8 +778,32 @@
{
break;
}
- UpdateMessage msg = ri.getChange();
- processChange(msg, exportConfig, ldifWriter, searchOperation);
+ if (searchOperation != null)
+ {
+ try
+ {
+ if (lookthroughLimit > 0 && lookthroughCount > lookthroughLimit)
+ {
+ // Lookthrough limit exceeded
+ searchOperation.setResultCode(
+ ResultCode.ADMIN_LIMIT_EXCEEDED);
+ searchOperation.setErrorMessage(null);
+ break;
+ }
+
+ searchOperation.checkIfCanceled(false);
+ } catch (CanceledOperationException e)
+ {
+ searchOperation.setResultCode(ResultCode.CANCELED);
+ searchOperation.setErrorMessage(null);
+ break;
+ }
+ }
+ lookthroughCount++;
+ UpdateMsg msg = ri.getChange();
+ processChange(
+ msg, exportConfig, ldifWriter, searchOperation,
+ rsd.getBaseDn().toString());
if (!ri.next())
break;
}
@@ -768,17 +817,60 @@
}
/**
+ * Attempt to extract a ChangeNumber from searchFilter like
+ * ReplicationChangeNumber=xxxx or ReplicationChangeNumber>=xxxx.
+ *
+ * @param filter The filter to evaluate.
+ *
+ * @return The extracted ChangeNumber or null if no ChangeNumber
+ * was found.
+ */
+ private ChangeNumber extractChangeNumber(SearchFilter filter)
+ {
+ AttributeType changeNumberAttrType =
+ DirectoryServer.getDefaultAttributeType(CHANGE_NUMBER);
+
+ FilterType filterType = filter.getFilterType();
+
+ if ( (filterType.equals(FilterType.GREATER_OR_EQUAL) ||
+ filterType.equals(FilterType.EQUALITY) ) &&
+ (filter.getAttributeType().equals(changeNumberAttrType)))
+ {
+ try
+ {
+ ChangeNumber startingChangeNumber =
+ new ChangeNumber(filter.getAssertionValue().getStringValue());
+ return new ChangeNumber(
+ startingChangeNumber.getTime(),
+ startingChangeNumber.getSeqnum()-1,
+ startingChangeNumber.getServerId());
+ }
+ catch (Exception e)
+ {
+ // don't try to optimize the search if we the ChangeNumber is
+ // not a valid replication ChangeNumber.
+ }
+ }
+ return null;
+ }
+
+
+ /**
* Export one change.
*/
- private void processChange(UpdateMessage msg,
+ private void processChange(UpdateMsg msg,
LDIFExportConfig exportConfig, LDIFWriter ldifWriter,
- SearchOperation searchOperation)
+ SearchOperation searchOperation, String baseDN)
{
InternalClientConnection conn =
InternalClientConnection.getRootConnection();
Entry entry = null;
DN dn = null;
+ ObjectClass objectclass =
+ DirectoryServer.getDefaultObjectClass("extensibleObject");
+
+
try
{
if (msg instanceof AddMsg)
@@ -786,43 +878,51 @@
AddMsg addMsg = (AddMsg)msg;
AddOperation addOperation = (AddOperation)msg.createOperation(conn);
- dn = DN.decode("puid=" + addMsg.getParentUid() + "," +
- "changeNumber=" + msg.getChangeNumber().toString() + "," +
- msg.getDn() +","+ BASE_DN);
+ dn = DN.decode("puid=" + addMsg.getParentUid() + "+" +
+ CHANGE_NUMBER + "=" + msg.getChangeNumber().toString() + "+" +
+ msg.getDn() + "," + BASE_DN);
Map<AttributeType,List<Attribute>> attributes =
new HashMap<AttributeType,List<Attribute>>();
+ Map<ObjectClass, String> objectclasses =
+ new HashMap<ObjectClass, String>();
for (RawAttribute a : addOperation.getRawAttributes())
{
Attribute attr = a.toAttribute();
- AttributeType attrType = attr.getAttributeType();
- List<Attribute> attrs = attributes.get(attrType);
- if (attrs == null)
+ if (attr.getAttributeType().isObjectClassType())
{
- attrs = new ArrayList<Attribute>(1);
- attrs.add(attr);
- attributes.put(attrType, attrs);
+ for (ByteString os : a.getValues())
+ {
+ String ocName = os.toString();
+ ObjectClass oc =
+ DirectoryServer.getObjectClass(toLowerCase(ocName));
+ if (oc == null)
+ {
+ oc = DirectoryServer.getDefaultObjectClass(ocName);
+ }
+
+ objectclasses.put(oc,ocName);
+ }
}
else
{
- attrs.add(attr);
+ addAttribute(attributes, attr);
}
}
- AddChangeRecordEntry changeRecord =
- new AddChangeRecordEntry(dn, attributes);
+ Attribute changetype = Attributes.create("changetype", "add");
+ addAttribute(attributes, changetype);
+
if (exportConfig != null)
{
+ AddChangeRecordEntry changeRecord =
+ new AddChangeRecordEntry(dn, attributes);
ldifWriter.writeChangeRecord(changeRecord);
}
else
{
- Writer writer = new Writer();
- LDIFWriter ldifWriter2 = writer.getLDIFWriter();
- ldifWriter2.writeChangeRecord(changeRecord);
- LDIFReader reader = writer.getLDIFReader();
- entry = reader.readEntry();
+ entry = new Entry(dn, objectclasses, attributes, null);
}
}
else if (msg instanceof DeleteMsg)
@@ -830,7 +930,7 @@
DeleteMsg delMsg = (DeleteMsg)msg;
dn = DN.decode("uuid=" + msg.getUniqueId() + "," +
- "changeNumber=" + delMsg.getChangeNumber().toString()+ "," +
+ CHANGE_NUMBER + "=" + delMsg.getChangeNumber().toString()+ "," +
msg.getDn() +","+ BASE_DN);
DeleteChangeRecordEntry changeRecord =
@@ -853,7 +953,7 @@
ModifyOperation op = (ModifyOperation)msg.createOperation(conn);
dn = DN.decode("uuid=" + msg.getUniqueId() + "," +
- "changeNumber=" + msg.getChangeNumber().toString()+ "," +
+ CHANGE_NUMBER + "=" + msg.getChangeNumber().toString()+ "," +
msg.getDn() +","+ BASE_DN);
op.setInternalOperation(true);
@@ -877,7 +977,7 @@
ModifyDNOperation op = (ModifyDNOperation)msg.createOperation(conn);
dn = DN.decode("uuid=" + msg.getUniqueId() + "," +
- "changeNumber=" + msg.getChangeNumber().toString()+ "," +
+ CHANGE_NUMBER + "=" + msg.getChangeNumber().toString()+ "," +
msg.getDn() +","+ BASE_DN);
op.setInternalOperation(true);
@@ -906,6 +1006,15 @@
}
else
{
+ // Add extensibleObject objectclass and the ChangeNumber
+ // in the entry.
+ entry.addObjectClass(objectclass);
+ Attribute changeNumber =
+ Attributes.create(CHANGE_NUMBER, msg.getChangeNumber().toStringUI());
+ addAttribute(entry.getUserAttributes(), changeNumber);
+ Attribute domain = Attributes.create("replicationDomain", baseDN);
+ addAttribute(entry.getUserAttributes(), domain);
+
// Get the base DN, scope, and filter for the search.
DN searchBaseDN = searchOperation.getBaseDN();
SearchScope scope = searchOperation.getScope();
@@ -941,7 +1050,28 @@
}
}
-
+ /**
+ * Add an attribute to a provided Map of attribute.
+ *
+ * @param attributes The Map that should be updated.
+ * @param attribute The attribute that should be added to the Map.
+ */
+ private void addAttribute(
+ Map<AttributeType,List<Attribute>> attributes, Attribute attribute)
+ {
+ AttributeType attrType = attribute.getAttributeType();
+ List<Attribute> attrs = attributes.get(attrType);
+ if (attrs == null)
+ {
+ attrs = new ArrayList<Attribute>(1);
+ attrs.add(attribute);
+ attributes.put(attrType, attrs);
+ }
+ else
+ {
+ attrs.add(attribute);
+ }
+ }
/**
* {@inheritDoc}
@@ -1189,17 +1319,6 @@
{
if (baseDNSet.contains(searchBaseDN))
{
- // Get the base DN, scope, and filter for the search.
- SearchScope scope = searchOperation.getScope();
- SearchFilter filter = searchOperation.getFilter();
- Entry re = new Entry(searchBaseDN, rootObjectclasses, attributes,
- operationalAttributes);
-
- if (re.matchesBaseAndScope(searchBaseDN, scope) &&
- filter.matchesEntry(re))
- {
- searchOperation.returnEntry(re, new LinkedList<Control>());
- }
return;
}
else
@@ -1212,18 +1331,6 @@
}
}
- // Get the base DN, scope, and filter for the search.
- SearchScope scope = searchOperation.getScope();
- SearchFilter filter = searchOperation.getFilter();
- Entry re = new Entry(searchBaseDN, rootObjectclasses, attributes,
- operationalAttributes);
-
- if (re.matchesBaseAndScope(searchBaseDN, scope) &&
- filter.matchesEntry(re))
- {
- searchOperation.returnEntry(re, new LinkedList<Control>());
- }
-
// Walk through all entries and send the ones that match.
Iterator<ReplicationServerDomain> rsdi = server.getCacheIterator();
if (rsdi != null)
--
Gitblit v1.10.0