From e54104537c0b558b6625eb1b6b5f56b1e69cc8f3 Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Fri, 01 Sep 2006 12:04:47 +0000
Subject: [PATCH] issue 604 : solve the naming conflict that might happen when several masters are used there are 3 main parts in this commit : - attach the replication context in an OperationContext - if operation replay fails then fix the problem - in the pre-op checks for conflict and cause failure if necessary most of the time there should be no conflict and the operation should be processed normally
---
opendj-sdk/opends/src/server/org/opends/server/synchronization/MultimasterSynchronization.java | 137 +-
opendj-sdk/opends/src/server/org/opends/server/synchronization/ModifyContext.java | 46 +
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/ModifyConflictTest.java | 101 +-
opendj-sdk/opends/src/server/org/opends/server/synchronization/ModifyDnContext.java | 63 +
opendj-sdk/opends/src/server/org/opends/server/synchronization/SynchronizationDomain.java | 948 ++++++++++++++++++---
opendj-sdk/opends/src/server/org/opends/server/synchronization/ModifyFakeOperation.java | 7
opendj-sdk/opends/src/server/org/opends/server/synchronization/DeleteMsg.java | 91 -
opendj-sdk/opends/src/server/org/opends/server/synchronization/UpdateMessage.java | 225 +++++
opendj-sdk/opends/src/server/org/opends/server/synchronization/AddMsg.java | 121 +-
opendj-sdk/opends/src/server/org/opends/server/changelog/ChangelogDB.java | 5
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationMsgTest.java | 22
opendj-sdk/opends/src/server/org/opends/server/synchronization/Historical.java | 75 +
opendj-sdk/opends/src/server/org/opends/server/changelog/ChangelogCache.java | 5
opendj-sdk/opends/src/server/org/opends/server/synchronization/ListenerThread.java | 108 --
opendj-sdk/opends/src/server/org/opends/server/synchronization/ModifyDNMsg.java | 184 +--
opendj-sdk/opends/src/server/org/opends/server/core/ModifyDNOperation.java | 61
opendj-sdk/opends/src/server/org/opends/server/synchronization/DeleteContext.java | 44 +
opendj-sdk/opends/resource/config/config.ldif | 15
opendj-sdk/opends/src/server/org/opends/server/synchronization/OperationContext.java | 121 ++
opendj-sdk/opends/src/server/org/opends/server/synchronization/ModifyMsg.java | 102 -
opendj-sdk/opends/src/server/org/opends/server/synchronization/AddContext.java | 62 +
opendj-sdk/opends/src/server/org/opends/server/synchronization/SynchMessages.java | 18
22 files changed, 1,824 insertions(+), 737 deletions(-)
diff --git a/opendj-sdk/opends/resource/config/config.ldif b/opendj-sdk/opends/resource/config/config.ldif
index 3d505af..5b128f4 100644
--- a/opendj-sdk/opends/resource/config/config.ldif
+++ b/opendj-sdk/opends/resource/config/config.ldif
@@ -126,7 +126,6 @@
ds-cfg-index-type: presence
ds-cfg-index-type: equality
ds-cfg-index-type: substring
-ds-cfg-index-entry-limit: 4000
dn: ds-cfg-index-attribute=givenName,cn=Index,ds-cfg-backend-id=userRoot,cn=Backends,cn=config
objectClass: top
@@ -135,7 +134,6 @@
ds-cfg-index-type: presence
ds-cfg-index-type: equality
ds-cfg-index-type: substring
-ds-cfg-index-entry-limit: 4000
dn: ds-cfg-index-attribute=mail,cn=Index,ds-cfg-backend-id=userRoot,cn=Backends,cn=config
objectClass: top
@@ -144,14 +142,12 @@
ds-cfg-index-type: presence
ds-cfg-index-type: equality
ds-cfg-index-type: substring
-ds-cfg-index-entry-limit: 4000
dn: ds-cfg-index-attribute=member,cn=Index,ds-cfg-backend-id=userRoot,cn=Backends,cn=config
objectClass: top
objectClass: ds-cfg-je-index
ds-cfg-index-attribute: member
ds-cfg-index-type: equality
-ds-cfg-index-entry-limit: 4000
dn: ds-cfg-index-attribute=sn,cn=Index,ds-cfg-backend-id=userRoot,cn=Backends,cn=config
objectClass: top
@@ -160,7 +156,6 @@
ds-cfg-index-type: presence
ds-cfg-index-type: equality
ds-cfg-index-type: substring
-ds-cfg-index-entry-limit: 4000
dn: ds-cfg-index-attribute=telephoneNumber,cn=Index,ds-cfg-backend-id=userRoot,cn=Backends,cn=config
objectClass: top
@@ -169,22 +164,24 @@
ds-cfg-index-type: presence
ds-cfg-index-type: equality
ds-cfg-index-type: substring
-ds-cfg-index-entry-limit: 4000
dn: ds-cfg-index-attribute=uid,cn=Index,ds-cfg-backend-id=userRoot,cn=Backends,cn=config
objectClass: top
objectClass: ds-cfg-je-index
ds-cfg-index-attribute: uid
ds-cfg-index-type: equality
-ds-cfg-index-entry-limit: 4000
dn: ds-cfg-index-attribute=ds-sync-hist,cn=Index,ds-cfg-backend-id=userRoot,cn=Backends,cn=config
objectClass: top
objectClass: ds-cfg-je-index
-objectClass: extensibleObject
ds-cfg-index-attribute: ds-sync-hist
ds-cfg-index-type: ordering
-ds-cfg-index-entry-limit: 4000
+
+dn: ds-cfg-index-attribute=entryuuid,cn=Index,ds-cfg-backend-id=userRoot,cn=Backends,cn=config
+objectClass: top
+objectClass: ds-cfg-je-index
+ds-cfg-index-attribute: entryuuid
+ds-cfg-index-type: equality
dn: cn=JE Database,ds-cfg-backend-id=userRoot,cn=Backends,cn=config
objectClass: top
diff --git a/opendj-sdk/opends/src/server/org/opends/server/changelog/ChangelogCache.java b/opendj-sdk/opends/src/server/org/opends/server/changelog/ChangelogCache.java
index 5db10e3..460d4bc 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/changelog/ChangelogCache.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/changelog/ChangelogCache.java
@@ -265,8 +265,6 @@
{
/*
* create the balanced tree that will be used to forward changes
- * TODO initialize it will all the previous changes that this replicaID
- * has not seen
*/
synchronized (connectedServers)
{
@@ -305,8 +303,6 @@
{
/*
* create the balanced tree that will be used to forward changes
- * TODO initialize it will all the previous changes that this replicaID
- * has not seen
* TODO throw proper exception
*/
synchronized (changelogServers)
@@ -482,7 +478,6 @@
public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver,
short serverId)
{
- // TODO Auto-generated method stub
ServerHandler handler;
if (isLDAPserver)
handler = connectedServers.get(serverId);
diff --git a/opendj-sdk/opends/src/server/org/opends/server/changelog/ChangelogDB.java b/opendj-sdk/opends/src/server/org/opends/server/changelog/ChangelogDB.java
index d9e09a8..e03e7b6 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/changelog/ChangelogDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/changelog/ChangelogDB.java
@@ -349,7 +349,7 @@
catch (DatabaseException dbe)
{
}
- /* database is faulty : TODO : log better message */
+ /* database is faulty */
int msgID = MSGID_CHANGELOG_SHUTDOWN_DATABASE_ERROR;
String message = getMessage(msgID) + stackTraceToSingleLineString(e);
logError(ErrorLogCategory.SYNCHRONIZATION,
@@ -553,7 +553,8 @@
* to continue with the next record.
* In such case, it is therefore possible that we miss some changes.
* TODO. log an error message.
- * TODO. Such problem should be handled by the repair functionality.
+ * TODO : REPAIR : Such problem should be handled by the
+ * repair functionality.
*/
}
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/core/ModifyDNOperation.java b/opendj-sdk/opends/src/server/org/opends/server/core/ModifyDNOperation.java
index 2a91f81..bdb7f6a 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/core/ModifyDNOperation.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/core/ModifyDNOperation.java
@@ -1137,36 +1137,6 @@
}
- // Invoke any conflict resolution processing that might be needed by the
- // synchronization provider.
- for (SynchronizationProvider provider :
- DirectoryServer.getSynchronizationProviders())
- {
- try
- {
- SynchronizationProviderResult result =
- provider.handleConflictResolution(this);
- if (! result.continueOperationProcessing())
- {
- break modifyDNProcessing;
- }
- }
- catch (DirectoryException de)
- {
- assert debugException(CLASS_NAME, "run", de);
-
- logError(ErrorLogCategory.SYNCHRONIZATION,
- ErrorLogSeverity.SEVERE_ERROR,
- MSGID_MODDN_SYNCH_CONFLICT_RESOLUTION_FAILED,
- getConnectionID(), getOperationID(),
- stackTraceToSingleLineString(de));
-
- setResponseData(de);
- break modifyDNProcessing;
- }
- }
-
-
// Get the current entry from the appropriate backend. If it doesn't
// exist, then fail.
try
@@ -1215,6 +1185,37 @@
break modifyDNProcessing;
}
+
+ // Invoke any conflict resolution processing that might be needed by the
+ // synchronization provider.
+ for (SynchronizationProvider provider :
+ DirectoryServer.getSynchronizationProviders())
+ {
+ try
+ {
+ SynchronizationProviderResult result =
+ provider.handleConflictResolution(this);
+ if (! result.continueOperationProcessing())
+ {
+ break modifyDNProcessing;
+ }
+ }
+ catch (DirectoryException de)
+ {
+ assert debugException(CLASS_NAME, "run", de);
+
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.SEVERE_ERROR,
+ MSGID_MODDN_SYNCH_CONFLICT_RESOLUTION_FAILED,
+ getConnectionID(), getOperationID(),
+ stackTraceToSingleLineString(de));
+
+ setResponseData(de);
+ break modifyDNProcessing;
+ }
+ }
+
+
// Check to see if the client has permission to perform the
// modify DN.
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/AddContext.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/AddContext.java
new file mode 100644
index 0000000..c736429
--- /dev/null
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/AddContext.java
@@ -0,0 +1,62 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Portions Copyright 2006 Sun Microsystems, Inc.
+ */
+package org.opends.server.synchronization;
+
+/**
+ * This class describe the Synchronization contexte that is attached to
+ * Add Operation.
+ */
+public class AddContext extends OperationContext
+{
+ /**
+ * The Unique Id of the parent entry od the added entry.
+ */
+ private String parentUid;
+
+ /**
+ * Creates a new AddContext with the provided information.
+ *
+ * @param changeNumber The change number of the add operation.
+ * @param uid the Unique Id of the added entry.
+ * @param parentUid The unique Id of the parent of the added entry.
+ */
+ public AddContext(ChangeNumber changeNumber, String uid, String parentUid)
+ {
+ super(changeNumber, uid);
+ this.parentUid = parentUid;
+ }
+
+ /**
+ * Get the Unique Id of the parent of the added entry.
+ *
+ * @return Returns the Unique Id of the parent of the added entry.
+ */
+ public String getParentUid()
+ {
+ return parentUid;
+ }
+}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/AddMsg.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/AddMsg.java
index 31aa4c9..d177029 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/AddMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/AddMsg.java
@@ -45,7 +45,7 @@
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeValue;
-import static org.opends.server.synchronization.SynchMessages.*;
+import static org.opends.server.synchronization.OperationContext.*;
import static org.opends.server.util.StaticUtils.toLowerCase;
/**
@@ -55,8 +55,8 @@
public class AddMsg extends UpdateMessage
{
private static final long serialVersionUID = -4905520652801395185L;
- private String dn;
private byte[] encodedAttributes ;
+ private String parentUniqueId;
/**
* Creates a new AddMessage.
@@ -64,6 +64,12 @@
*/
public AddMsg(AddOperation op)
{
+ super((AddContext) op.getAttachment(SYNCHROCONTEXT),
+ op.getRawEntryDN().stringValue());
+
+ AddContext ctx = (AddContext) op.getAttachment(SYNCHROCONTEXT);
+ this.parentUniqueId = ctx.getParentUid();
+
// Encode the object classes (SET OF LDAPString).
LinkedHashSet<AttributeValue> ocValues =
new LinkedHashSet<AttributeValue>(op.getObjectClasses().size());
@@ -99,12 +105,8 @@
}
}
- dn = op.getRawEntryDN().stringValue();
-
// Encode the sequence.
encodedAttributes = ASN1Element.encodeValue(elems);
-
- changeNumber = (ChangeNumber) op.getAttachment(SYNCHRONIZATION);
}
/**
@@ -112,18 +114,22 @@
*
* @param cn ChangeNumber of the add.
* @param dn DN of the added entry.
+ * @param uniqueId The Unique identifier of the added entry.
+ * @param parentId The unique Id of the parent of the added entry.
* @param objectClass objectclass of the added entry.
* @param userAttributes user attributes of the added entry.
* @param operationalAttributes operational attributes of the added entry.
*/
public AddMsg(ChangeNumber cn,
String dn,
+ String uniqueId,
+ String parentId,
Attribute objectClass,
Collection<Attribute> userAttributes,
Collection<Attribute> operationalAttributes)
{
- this.dn = dn;
- this.changeNumber = cn;
+ super (new AddContext(cn, uniqueId, parentId), dn);
+ this.parentUniqueId = parentId;
ArrayList<ASN1Element> elems = new ArrayList<ASN1Element>();
elems.add(new LDAPAttribute(objectClass).encode());
@@ -142,40 +148,29 @@
*
* @param in The byte[] from which the operation must be read.
* @throws DataFormatException The input byte[] is not a valid AddMsg
+ * @throws UnsupportedEncodingException If UTF8 is not supported by the jvm
*/
- public AddMsg(byte[] in) throws DataFormatException
+ public AddMsg(byte[] in) throws DataFormatException,
+ UnsupportedEncodingException
{
- /* first byte is the type */
- if (in[0] != MSG_TYPE_ADD_REQUEST)
- throw new DataFormatException("byte[] is not a valid add msg");
- int pos = 1;
+ super(in);
- /* read the dn
- * first calculate the length then construct the string
- */
- int length = 0;
- int offset = pos;
- while (in[pos++] != 0)
+ int pos = decodeHeader(MSG_TYPE_ADD_REQUEST, in);
+
+ // read the parent unique Id
+ int length = getNextLength(in, pos);
+ if (length != 0)
{
- if (pos > in.length)
- throw new DataFormatException("byte[] is not a valid add msg");
- length++;
+ parentUniqueId = new String(in, pos, length, "UTF-8");
+ pos += length + 1;
}
- try
+ else
{
- dn = new String(in, offset, length, "UTF-8");
-
- /* read the changeNumber
- * it is always 24 characters long
- */
- String changenumberStr = new String(in, pos, 24, "UTF-8");
- changeNumber = new ChangeNumber(changenumberStr);
- pos +=24;
- } catch (UnsupportedEncodingException e ) {
- throw new DataFormatException("UTF-8 is not supported by this jvm.");
+ parentUniqueId = null;
+ pos += 1;
}
- /* Read the attributes : all the remaining bytes */
+ // Read the attributes : all the remaining bytes
encodedAttributes = new byte[in.length-pos];
int i =0;
while (pos<in.length)
@@ -185,14 +180,11 @@
}
/**
- * Create and return an AddOperation from a received ADD message.
- * @param connection The connection where we received the message.
- * @return The created operation.
- * @throws LDAPException In case msg encoding was not valid.
- * @throws ASN1Exception In case msg encoding was not valid.
+ * {@inheritDoc}
*/
@Override
- public AddOperation createOperation(InternalClientConnection connection)
+ public AddOperation createOperation(InternalClientConnection connection,
+ String newDn)
throws LDAPException, ASN1Exception
{
ArrayList<LDAPAttribute> attr = new ArrayList<LDAPAttribute>();
@@ -207,9 +199,10 @@
AddOperation add = new AddOperation(connection,
InternalClientConnection.nextOperationID(),
InternalClientConnection.nextMessageID(), null,
- new ASN1OctetString(dn), attr);
-
- add.setAttachment(SYNCHRONIZATION, getChangeNumber());
+ new ASN1OctetString(newDn), attr);
+ AddContext ctx = new AddContext(getChangeNumber(), getUniqueId(),
+ parentUniqueId);
+ add.setAttachment(SYNCHROCONTEXT, ctx);
return add;
}
@@ -220,36 +213,30 @@
@Override
public byte[] getBytes()
{
- byte[] byteDn;
try
{
- byteDn = dn.getBytes("UTF-8");
-
- /* The ad message is stored in the form :
- * <operation type><dn><changenumber><attributes>
- * the length of result byte array is therefore :
- * 1 + dn length + 1 + 24 + attribute length
- */
- int length = 1 + byteDn.length + 1 + 24 + encodedAttributes.length;
- byte[] resultByteArray = new byte[length];
- int pos = 1;
-
- /* put the type of the operation */
- resultByteArray[0] = MSG_TYPE_ADD_REQUEST;
- /* put the DN and a terminating 0 */
- for (int i = 0; i< byteDn.length; i++,pos++)
+ int length = encodedAttributes.length;
+ byte[] byteParentId = null;
+ if (parentUniqueId != null)
{
- resultByteArray[pos] = byteDn[i];
+ byteParentId = parentUniqueId.getBytes("UTF-8");
+ length += byteParentId.length + 1;
}
- resultByteArray[pos++] = 0;
- /* put the ChangeNumber */
- byte[] changeNumberByte =
- this.getChangeNumber().toString().getBytes("UTF-8");
- for (int i=0; i<24; i++,pos++)
+ else
{
- resultByteArray[pos] = changeNumberByte[i];
+ length += 1;
}
+ /* encode the header in a byte[] large enough to also contain the mods */
+ byte [] resultByteArray = encodeHeader(MSG_TYPE_ADD_REQUEST, length);
+
+ int pos = resultByteArray.length - length;
+
+ if (byteParentId != null)
+ pos = addByteArray(byteParentId, resultByteArray, pos);
+ else
+ resultByteArray[pos++] = 0;
+
/* put the attributes */
for (int i=0; i<encodedAttributes.length; i++,pos++)
{
@@ -271,6 +258,6 @@
@Override
public String toString()
{
- return ("ADD " + dn + " " + getChangeNumber());
+ return ("ADD " + getDn() + " " + getChangeNumber());
}
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/DeleteContext.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/DeleteContext.java
new file mode 100644
index 0000000..f99ff0f
--- /dev/null
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/DeleteContext.java
@@ -0,0 +1,44 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Portions Copyright 2006 Sun Microsystems, Inc.
+ */
+package org.opends.server.synchronization;
+
+/**
+ * This class is used to describe the context attached to a Delete Operation.
+ */
+public class DeleteContext extends OperationContext
+{
+ /**
+ * Creates a new DeleteContext with the provided information.
+ *
+ * @param changeNumber The change number of the Delete Operation.
+ * @param uid The unique Id of the deleted entry.
+ */
+ public DeleteContext(ChangeNumber changeNumber, String uid)
+ {
+ super(changeNumber, uid);
+ }
+}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/DeleteMsg.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/DeleteMsg.java
index 3e7388f..3fec9b3 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/DeleteMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/DeleteMsg.java
@@ -26,7 +26,7 @@
*/
package org.opends.server.synchronization;
-import static org.opends.server.synchronization.SynchMessages.SYNCHRONIZATION;
+import static org.opends.server.synchronization.OperationContext.*;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
@@ -41,7 +41,6 @@
*/
public class DeleteMsg extends UpdateMessage
{
- private String dn;
private static final long serialVersionUID = -4905520652801395185L;
/**
@@ -50,8 +49,8 @@
*/
public DeleteMsg(DeleteOperation op)
{
- dn = op.getRawEntryDN().stringValue();
- changeNumber = (ChangeNumber) op.getAttachment(SYNCHRONIZATION);
+ super((OperationContext) op.getAttachment(SYNCHROCONTEXT),
+ op.getRawEntryDN().stringValue());
}
/**
@@ -59,55 +58,29 @@
*
* @param in The byte[] from which the operation must be read.
* @throws DataFormatException The input byte[] is not a valid AddMsg
+ * @throws UnsupportedEncodingException If UTF8 is not supported by the jvm
*/
- public DeleteMsg(byte[] in) throws DataFormatException
+ public DeleteMsg(byte[] in) throws DataFormatException,
+ UnsupportedEncodingException
{
- /* first byte is the type */
- if (in[0] != MSG_TYPE_DELETE_REQUEST)
- throw new DataFormatException("byte[] is not a valid delete msg");
- int pos = 1;
-
- /* read the dn
- * first calculate the length then construct the string
- */
- int length = 0;
- int offset = pos;
- while (in[pos++] != 0)
- {
- if (pos > in.length)
- throw new DataFormatException("byte[] is not a valid delete msg");
- length++;
- }
- try
- {
- dn = new String(in, offset, length, "UTF-8");
-
- /* read the changeNumber
- * it is always 24 characters long
- */
- String changenumberStr = new String(in, pos, 24, "UTF-8");
- changeNumber = new ChangeNumber(changenumberStr);
- } catch (UnsupportedEncodingException e)
- {
- throw new DataFormatException("UTF-8 is not supported by this jvm.");
- }
+ super(in);
+ decodeHeader(MSG_TYPE_DELETE_REQUEST, in);
}
/**
- * Create an Operation from a delete Message.
- *
- * @param connection the connection
- * @return the Operation from which the message was received
+ * {@inheritDoc}
*/
@Override
- public Operation createOperation(InternalClientConnection connection)
+ public Operation createOperation(InternalClientConnection connection,
+ String newDn)
{
DeleteOperation del = new DeleteOperation(connection,
InternalClientConnection.nextOperationID(),
InternalClientConnection.nextMessageID(), null,
- new ASN1OctetString(dn));
- del.setAttachment(SYNCHRONIZATION, getChangeNumber());
+ new ASN1OctetString(newDn));
+ DeleteContext ctx = new DeleteContext(getChangeNumber(), getUniqueId());
+ del.setAttachment(SYNCHROCONTEXT, ctx);
return del;
}
@@ -119,44 +92,14 @@
@Override
public byte[] getBytes()
{
- byte[] byteDn;
try
{
- byteDn = dn.getBytes("UTF-8");
-
- /* The Delete message is stored in the form :
- * <operation type><dn><changenumber>
- * the length of result byte array is therefore :
- * 1 + dn length + 1 + 24
- */
- int length = 1 + byteDn.length + 1 + 24;
- byte[] resultByteArray = new byte[length];
- int pos = 1;
-
- /* put the type of the operation */
- resultByteArray[0] = MSG_TYPE_DELETE_REQUEST;
-
- /* put the DN and a terminating 0 */
- for (int i = 0; i< byteDn.length; i++,pos++)
- {
- resultByteArray[pos] = byteDn[i];
- }
- resultByteArray[pos++] = 0;
-
- /* put the ChangeNumber */
- byte[] changeNumberByte =
- this.getChangeNumber().toString().getBytes("UTF-8");
- for (int i=0; i<24; i++,pos++)
- {
- resultByteArray[pos] = changeNumberByte[i];
- }
-
- return resultByteArray;
+ return encodeHeader(MSG_TYPE_DELETE_REQUEST, 0);
} catch (UnsupportedEncodingException e)
{
// should never happen : TODO : log error properly
+ return null;
}
- return null;
}
/**
@@ -165,6 +108,6 @@
@Override
public String toString()
{
- return ("DEL " + dn + " " + getChangeNumber());
+ return ("DEL " + getDn() + " " + getChangeNumber());
}
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/Historical.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/Historical.java
index 3da4227..fa0d2a1 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/Historical.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/Historical.java
@@ -35,6 +35,7 @@
import java.util.Set;
import java.util.TreeMap;
+import org.opends.server.core.AddOperation;
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.ModifyOperation;
import org.opends.server.types.Attribute;
@@ -44,7 +45,6 @@
import org.opends.server.types.Modification;
import org.opends.server.types.ModificationType;
-import static org.opends.server.synchronization.SynchMessages.*;
/**
* This class is used to store historical information that is
@@ -68,6 +68,9 @@
static final String HISTORICALATTRIBUTENAME = "ds-sync-hist";
static final AttributeType historicalAttrType =
DirectoryServer.getSchema().getAttributeType(HISTORICALATTRIBUTENAME);
+ static final String ENTRYUIDNAME = "entryuuid";
+ static final AttributeType entryuuidAttrType =
+ DirectoryServer.getSchema().getAttributeType(ENTRYUIDNAME);
/*
* The last update seen on this entry, allows fast conflict detection.
@@ -109,7 +112,7 @@
{
List<Modification> mods = modifyOperation.getModifications();
ChangeNumber changeNumber =
- (ChangeNumber) modifyOperation.getAttachment(SYNCHRONIZATION);
+ OperationContext.getChangeNumber(modifyOperation);
for (Iterator modsIterator = mods.iterator(); modsIterator.hasNext();)
{
@@ -288,7 +291,8 @@
List<Modification> mods = modifyOperation.getModifications();
Entry modifiedEntry = modifyOperation.getModifiedEntry();
ChangeNumber changeNumber =
- (ChangeNumber) modifyOperation.getAttachment(SYNCHRONIZATION);
+ OperationContext.getChangeNumber(modifyOperation);
+
/*
* If this is a local operation we need first to update the historical
* information, then update the entry with the historical information
@@ -802,7 +806,7 @@
} catch (Exception e)
{
/*
- * TODO This Exception shows that there are some
+ * TODO : REPAIR : This Exception shows that there are some
* inconsistency in the historical information.
* This method can't fix the problem.
* This should be logged and somehow the repair
@@ -812,13 +816,70 @@
}
else
{
- modifyFakeOperation = new ModifyFakeOperation(entry.getDN(),cn);
- modifyFakeOperation.addModification(mod);
- operations.put(histVal.getCn(), modifyFakeOperation);
+ String uuidString = getEntryUuid(entry);
+ if (uuidString != null)
+ {
+ modifyFakeOperation = new ModifyFakeOperation(entry.getDN(),
+ cn, uuidString);
+
+ modifyFakeOperation.addModification(mod);
+ operations.put(histVal.getCn(), modifyFakeOperation);
+ }
}
}
}
}
return operations.values();
}
+
+ /**
+ * Get the entry unique Id in String form.
+ *
+ * @param entry The entry for which the unique id should be returned.
+ *
+ * @return The Unique Id of the entry if it has one. null, otherwise.
+ */
+ public static String getEntryUuid(Entry entry)
+ {
+ String uuidString = null;
+ List<Attribute> uuidAttrs =
+ entry.getOperationalAttribute(entryuuidAttrType);
+ if (uuidAttrs != null)
+ {
+ Attribute uuid = uuidAttrs.get(0);
+ if (uuid.hasValue())
+ {
+ AttributeValue uuidVal = uuid.getValues().iterator().next();
+ uuidString = uuidVal.getStringValue();
+ }
+ }
+ return uuidString;
+ }
+
+ /**
+ * Get the Entry Unique Id from an add operation.
+ * This must be called after the entry uuid preop plugin (i.e no
+ * sooner than the synchronization provider pre-op)
+ *
+ * @param op The operation
+ * @return The Entry Unique Id String form.
+ */
+ public static String getEntryUuid(AddOperation op)
+ {
+ String uuidString = null;
+ Map<AttributeType, List<Attribute>> attrs = op.getOperationalAttributes();
+ List<Attribute> uuidAttrs = attrs.get(entryuuidAttrType);
+
+ if (uuidAttrs != null)
+ {
+ Attribute uuid = uuidAttrs.get(0);
+ if (uuid.hasValue())
+ {
+ AttributeValue uuidVal = uuid.getValues().iterator().next();
+ uuidString = uuidVal.getStringValue();
+ }
+ }
+ return uuidString;
+ }
}
+
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/ListenerThread.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/ListenerThread.java
index 56276cd..1d9e66b 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/ListenerThread.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/ListenerThread.java
@@ -26,22 +26,15 @@
*/
package org.opends.server.synchronization;
-import java.util.zip.DataFormatException;
-
-import org.opends.server.api.DirectoryThread;
-import org.opends.server.core.Operation;
-import org.opends.server.protocols.asn1.ASN1Exception;
-import org.opends.server.protocols.internal.InternalClientConnection;
-import org.opends.server.protocols.ldap.LDAPException;
-import org.opends.server.types.ErrorLogCategory;
-import org.opends.server.types.ErrorLogSeverity;
-import org.opends.server.types.ResultCode;
-
import static org.opends.server.loggers.Error.logError;
import static org.opends.server.messages.MessageHandler.getMessage;
import static org.opends.server.synchronization.SynchMessages.*;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
+import org.opends.server.api.DirectoryThread;
+import org.opends.server.types.ErrorLogCategory;
+import org.opends.server.types.ErrorLogSeverity;
+
/**
* Thread that is used to get messages from the Changelog servers
* and replay them in the current server.
@@ -49,22 +42,17 @@
public class ListenerThread extends DirectoryThread
{
private SynchronizationDomain listener;
- private ChangeNumberGenerator CNgen;
private boolean shutdown = false;
/**
* Constructor for the ListenerThread.
*
* @param listener the Plugin that created this thread
- * @param gen the Generator to use to get new ChangeNumber
*/
- public ListenerThread(SynchronizationDomain listener,
- ChangeNumberGenerator gen)
+ public ListenerThread(SynchronizationDomain listener)
{
- super("Listener thread");
+ super("Synchronization Listener thread");
this.listener = listener;
- this.CNgen = gen;
- setName("Synchronization Listener");
}
/**
@@ -80,82 +68,24 @@
*/
public void run()
{
- InternalClientConnection conn = new InternalClientConnection();
UpdateMessage msg;
- while (((msg = listener.receive()) != null) && (shutdown == false))
+ try
{
- Operation op;
-
- try
+ while (((msg = listener.receive()) != null) && (shutdown == false))
{
- op = msg.createOperation(conn);
-
- op.setInternalOperation(true);
- op.setSynchronizationOperation(true);
- ChangeNumber changeNumber =
- (ChangeNumber) op.getAttachment(SYNCHRONIZATION);
- if (changeNumber != null)
- CNgen.adjust(changeNumber);
- try
- {
- op.run();
- if (op.getResultCode() != ResultCode.SUCCESS)
- {
- int msgID = MSGID_ERROR_REPLAYING_OPERATION;
- String message = getMessage(msgID,
- op.getResultCode().getResultCodeName(),
- changeNumber.toString(),
- op.toString(), op.getErrorMessage());
- logError(ErrorLogCategory.SYNCHRONIZATION,
- ErrorLogSeverity.SEVERE_ERROR,
- message, msgID);
- listener.updateError(changeNumber);
- }
- } catch (Exception e)
- {
- int msgID = MSGID_EXCEPTION_REPLAYING_OPERATION;
- String message = getMessage(msgID, stackTraceToSingleLineString(e),
- op.toString());
- logError(ErrorLogCategory.SYNCHRONIZATION,
- ErrorLogSeverity.SEVERE_ERROR,
- message, msgID);
- listener.updateError(changeNumber);
- }
+ listener.replay(msg);
}
- catch (ASN1Exception e)
- {
- int msgID = MSGID_EXCEPTION_DECODING_OPERATION;
- String message = getMessage(msgID, msg) +
- stackTraceToSingleLineString(e);
- logError(ErrorLogCategory.SYNCHRONIZATION,
- ErrorLogSeverity.SEVERE_ERROR,
- message, msgID);
- }
- catch (LDAPException e)
- {
- int msgID = MSGID_EXCEPTION_DECODING_OPERATION;
- String message = getMessage(msgID, msg) +
- stackTraceToSingleLineString(e);
- logError(ErrorLogCategory.SYNCHRONIZATION,
- ErrorLogSeverity.SEVERE_ERROR,
- message, msgID);
- }
- catch (DataFormatException e)
- {
- int msgID = MSGID_EXCEPTION_DECODING_OPERATION;
- String message = getMessage(msgID, msg) +
- stackTraceToSingleLineString(e);
- logError(ErrorLogCategory.SYNCHRONIZATION,
- ErrorLogSeverity.SEVERE_ERROR,
- message, msgID);
- }
- finally
- {
- if (msg.isAssured())
- listener.ack(msg.getChangeNumber());
- listener.incProcessedUpdates();
- }
+ } catch (Exception e)
+ {
+ /*
+ * catch all exceptions happening in listener.receive and listener.replay
+ * so that the thread never dies even in case of problems.
+ */
+ int msgID = MSGID_EXCEPTION_RECEIVING_SYNCHRONIZATION_MESSAGE;
+ String message = getMessage(msgID, stackTraceToSingleLineString(e));
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.SEVERE_ERROR, message, msgID);
}
}
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/ModifyContext.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/ModifyContext.java
new file mode 100644
index 0000000..b89edac
--- /dev/null
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/ModifyContext.java
@@ -0,0 +1,46 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Portions Copyright 2006 Sun Microsystems, Inc.
+ */
+package org.opends.server.synchronization;
+
+/**
+ * This class describe the synchronization context that is attached to
+ * Modify operation.
+ */
+public class ModifyContext extends OperationContext
+{
+
+ /**
+ * Creates a new Modify Context with the provided parameters.
+ *
+ * @param changeNumber The change number of the operation.
+ * @param uid the unique Id of the modified entry.
+ */
+ public ModifyContext(ChangeNumber changeNumber, String uid)
+ {
+ super(changeNumber, uid);
+ }
+}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/ModifyDNMsg.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/ModifyDNMsg.java
index c0e016a..eb4a162 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/ModifyDNMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/ModifyDNMsg.java
@@ -26,7 +26,7 @@
*/
package org.opends.server.synchronization;
-import static org.opends.server.synchronization.SynchMessages.SYNCHRONIZATION;
+import static org.opends.server.synchronization.OperationContext.*;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
@@ -41,10 +41,10 @@
*/
public class ModifyDNMsg extends UpdateMessage
{
- private String dn;
private String newRDN;
private String newSuperior;
private boolean deleteOldRdn;
+ private String newSuperiorId;
private static final long serialVersionUID = -4905520652801395185L;
/**
@@ -54,106 +54,83 @@
*/
public ModifyDNMsg(ModifyDNOperation op)
{
- dn = op.getRawEntryDN().stringValue();
+ super((OperationContext) op.getAttachment(SYNCHROCONTEXT),
+ op.getRawEntryDN().stringValue());
+
+ ModifyDnContext ctx = (ModifyDnContext) op.getAttachment(SYNCHROCONTEXT);
+ newSuperiorId = ctx.getNewParentId();
+
deleteOldRdn = op.deleteOldRDN();
if (op.getRawNewSuperior() != null)
newSuperior = op.getRawNewSuperior().stringValue();
else
newSuperior = null;
newRDN = op.getRawNewRDN().stringValue();
- changeNumber = (ChangeNumber) op.getAttachment(SYNCHRONIZATION);
}
/**
- * Creates a new Add message from a byte[].
+ * Creates a new ModifyDN message from a byte[].
*
* @param in The byte[] from which the operation must be read.
- * @throws DataFormatException The input byte[] is not a valid AddMsg
+ * @throws DataFormatException The input byte[] is not a valid AddMsg.
+ * @throws UnsupportedEncodingException If UTF8 is not supported.
*/
- public ModifyDNMsg(byte[] in) throws DataFormatException
+ public ModifyDNMsg(byte[] in) throws DataFormatException,
+ UnsupportedEncodingException
{
- /* first byte is the type */
- if (in[0] != MSG_TYPE_MODIFYDN_REQUEST)
- throw new DataFormatException("byte[] is not a valid add msg");
- int pos = 1;
+ super(in);
- /* read the dn
+ int pos = decodeHeader(MSG_TYPE_MODIFYDN_REQUEST, in);
+
+ /* read the newRDN
* first calculate the length then construct the string
*/
- int length = 0;
- int offset = pos;
- while (in[pos++] != 0)
- {
- if (pos > in.length)
- throw new DataFormatException("byte[] is not a valid add msg");
- length++;
- }
- try
- {
- dn = new String(in, offset, length, "UTF-8");
+ int length = getNextLength(in, pos);
+ newRDN = new String(in, pos, length, "UTF-8");
+ pos += length + 1;
- /* read the changeNumber
- * it is always 24 characters long
- */
- String changenumberStr = new String(in, pos, 24, "UTF-8");
- changeNumber = new ChangeNumber(changenumberStr);
- pos +=24;
+ /* read the newSuperior
+ * first calculate the length then construct the string
+ */
+ length = getNextLength(in, pos);
+ if (length != 0)
+ newSuperior = new String(in, pos, length, "UTF-8");
+ else
+ newSuperior = null;
+ pos += length + 1;
- /* read the newRDN
- * first calculate the length then construct the string
- */
- length = 0;
- offset = pos;
- while (in[pos++] != 0)
- {
- if (pos > in.length)
- throw new DataFormatException("byte[] is not a valid add msg");
- length++;
- }
- newRDN = new String(in, offset, length, "UTF-8");
+ /* read the new parent Id
+ */
+ length = getNextLength(in, pos);
+ if (length != 0)
+ newSuperiorId = new String(in, pos, length, "UTF-8");
+ else
+ newSuperiorId = null;
+ pos += length + 1;
- /* read the newSuperior
- * first calculate the length then construct the string
- */
- length = 0;
- offset = pos;
- while (in[pos++] != 0)
- {
- if (pos > in.length)
- throw new DataFormatException("byte[] is not a valid add msg");
- length++;
- }
- if (length != 0)
- newSuperior = new String(in, offset, length, "UTF-8");
- else
- newSuperior = null;
-
- /* get the deleteoldrdn flag */
- if (in[pos] == 0)
- deleteOldRdn = false;
- else
- deleteOldRdn = true;
- } catch (UnsupportedEncodingException e)
- {
- throw new DataFormatException("UTF-8 is not supported by this jvm.");
- }
+ /* get the deleteoldrdn flag */
+ if (in[pos] == 0)
+ deleteOldRdn = false;
+ else
+ deleteOldRdn = true;
}
/**
- * Create an operation from this ModifyDN message.
- * @param connection the connection to use when creating the operation
- * @return the created operation
+ * {@inheritDoc}
*/
@Override
- public Operation createOperation(InternalClientConnection connection)
+ public Operation createOperation(InternalClientConnection connection,
+ String newDn)
{
ModifyDNOperation moddn = new ModifyDNOperation(connection,
InternalClientConnection.nextOperationID(),
InternalClientConnection.nextMessageID(), null,
- new ASN1OctetString(dn), new ASN1OctetString(newRDN),
+ new ASN1OctetString(newDn), new ASN1OctetString(newRDN),
deleteOldRdn,
(newSuperior == null ? null : new ASN1OctetString(newSuperior)));
- moddn.setAttachment(SYNCHRONIZATION, getChangeNumber());
+ ModifyDnContext ctx = new ModifyDnContext(getChangeNumber(), getUniqueId(),
+ newSuperiorId);
+ moddn.setAttachment(SYNCHROCONTEXT, ctx);
return moddn;
}
@@ -167,16 +144,12 @@
{
try
{
- byte[] byteDn = dn.getBytes("UTF-8");
byte[] byteNewRdn = newRDN.getBytes("UTF-8");
byte[] byteNewSuperior = null;
+ byte[] byteNewSuperiorId = null;
- /* The Modify DN message is stored in the form :
- * <operation type><dn><changenumber><newrdn><newsuperior><deleteoldrdn>
- * the length of result byte array is therefore :
- * 1 + dn length+1 + 24 + newrdn length+1 + newsuperior length+1 +1
- */
- int length = 1 + byteDn.length + 1 + 24 + byteNewRdn.length + 1 + 1;
+ // calculate the length necessary to encode the parameters
+ int length = byteNewRdn.length + 1 + 1;
if (newSuperior != null)
{
byteNewSuperior = newSuperior.getBytes("UTF-8");
@@ -185,42 +158,32 @@
else
length += 1;
- byte[] resultByteArray = new byte[length];
- int pos = 1;
-
- /* put the type of the operation */
- resultByteArray[0] = MSG_TYPE_MODIFYDN_REQUEST;
-
- /* put the DN and a terminating 0 */
- for (int i = 0; i< byteDn.length; i++,pos++)
+ if (newSuperiorId != null)
{
- resultByteArray[pos] = byteDn[i];
+ byteNewSuperiorId = newSuperiorId.getBytes("UTF-8");
+ length += byteNewSuperiorId.length + 1;
}
- resultByteArray[pos++] = 0;
+ else
+ length += 1;
- /* put the ChangeNumber */
- byte[] changeNumberByte =
- this.getChangeNumber().toString().getBytes("UTF-8");
- for (int i=0; i<24; i++,pos++)
- {
- resultByteArray[pos] = changeNumberByte[i];
- }
+ byte[] resultByteArray = encodeHeader(MSG_TYPE_MODIFYDN_REQUEST, length);
+ int pos = resultByteArray.length - length;
/* put the new RDN and a terminating 0 */
- for (int i = 0; i< byteNewRdn.length; i++,pos++)
- {
- resultByteArray[pos] = byteNewRdn[i];
- }
- resultByteArray[pos++] = 0;
+ pos = addByteArray(byteNewRdn, resultByteArray, pos);
/* put the newsuperior and a terminating 0 */
if (newSuperior != null)
{
- for (int i = 0; i< byteNewSuperior.length; i++,pos++)
- {
- resultByteArray[pos] = byteNewSuperior[i];
- }
+ pos = addByteArray(byteNewSuperior, resultByteArray, pos);
+ }
+ else
resultByteArray[pos++] = 0;
+
+ /* put the newsuperiorId and a terminating 0 */
+ if (newSuperiorId != null)
+ {
+ pos = addByteArray(byteNewSuperiorId, resultByteArray, pos);
}
else
resultByteArray[pos++] = 0;
@@ -245,7 +208,16 @@
@Override
public String toString()
{
- return ("Modify DN " + dn + " " + newRDN + " " + newSuperior + " " +
+ return ("Modify DN " + getDn() + " " + newRDN + " " + newSuperior + " " +
getChangeNumber());
}
+
+ /**
+ * Set the new superior.
+ * @param string the new superior.
+ */
+ public void setNewSuperior(String string)
+ {
+ newSuperior = string;
+ }
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/ModifyDnContext.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/ModifyDnContext.java
new file mode 100644
index 0000000..e6cde3e
--- /dev/null
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/ModifyDnContext.java
@@ -0,0 +1,63 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Portions Copyright 2006 Sun Microsystems, Inc.
+ */
+package org.opends.server.synchronization;
+
+/**
+ * This class describe the synchronization context that is attached to
+ * ModifyDN operation.
+ */
+public class ModifyDnContext extends OperationContext
+{
+ private String newParentId;
+
+ /**
+ * Creates a new ModifyDN Context with the provided parameters.
+ *
+ * @param changeNumber The change number of the operation.
+ * @param uid the unique Id of the modified entry.
+ * @param newParentId The unique Identifier of the new parent,
+ * can be null if the entry is to stay below the same
+ * parent.
+ */
+ public ModifyDnContext(ChangeNumber changeNumber, String uid,
+ String newParentId)
+ {
+ super(changeNumber, uid);
+ this.newParentId = newParentId;
+ }
+
+ /**
+ * Get the unique Identifier of the new parent.
+ * Can be null if the entry is to stay below the same parent.
+ *
+ * @return Returns the unique Identifier of the new parent..
+ */
+ public String getNewParentId()
+ {
+ return newParentId;
+ }
+}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/ModifyFakeOperation.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/ModifyFakeOperation.java
index a302594..24338df 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/ModifyFakeOperation.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/ModifyFakeOperation.java
@@ -44,17 +44,20 @@
{
private ArrayList<Modification> mods = new ArrayList<Modification>();
private DN dn;
+ private String entryuuid;
/**
* Creates a new ModifyFakeOperation with the provided information.
*
* @param dn The dn on which the Operation was applied.
* @param changenumber The ChangeNumber of the operation.
+ * @param entryuuid The unique ID of the entry on which the Operation applies.
*/
- public ModifyFakeOperation(DN dn, ChangeNumber changenumber)
+ public ModifyFakeOperation(DN dn, ChangeNumber changenumber, String entryuuid)
{
super(changenumber);
this.dn = dn;
+ this.entryuuid = entryuuid;
}
/**
@@ -75,6 +78,6 @@
@Override
public SynchronizationMessage generateMessage()
{
- return new ModifyMsg(super.getChangeNumber(), dn, mods);
+ return new ModifyMsg(super.getChangeNumber(), dn, mods, entryuuid);
}
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/ModifyMsg.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/ModifyMsg.java
index dcbe783..0156287 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/ModifyMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/ModifyMsg.java
@@ -26,7 +26,7 @@
*/
package org.opends.server.synchronization;
-import static org.opends.server.synchronization.SynchMessages.SYNCHRONIZATION;
+import static org.opends.server.synchronization.OperationContext.*;
import org.opends.server.core.ModifyOperation;
import org.opends.server.core.Operation;
@@ -52,7 +52,6 @@
public class ModifyMsg extends UpdateMessage
{
private static final long serialVersionUID = -4905520652801395185L;
- private String dn = null;
private byte[] encodedMods = null;
private byte[] encodedMsg = null;
@@ -63,9 +62,9 @@
*/
public ModifyMsg(ModifyOperation op)
{
- dn = op.getRawEntryDN().stringValue();
+ super((OperationContext) op.getAttachment(OperationContext.SYNCHROCONTEXT),
+ op.getRawEntryDN().stringValue());
encodedMods = modsToByte(op.getModifications());
- changeNumber = (ChangeNumber) op.getAttachment(SYNCHRONIZATION);
}
/**
@@ -74,12 +73,15 @@
* @param changeNumber The ChangeNumber for the operation.
* @param dn The baseDN of the operation.
* @param mods The mod of the operation.
+ * @param entryuuid The unique id of the entry on which the modification
+ * needs to apply.
*/
- public ModifyMsg(ChangeNumber changeNumber, DN dn, List<Modification> mods)
+ public ModifyMsg(ChangeNumber changeNumber, DN dn, List<Modification> mods,
+ String entryuuid)
{
+ super(new ModifyContext(changeNumber, entryuuid),
+ dn.toNormalizedString());
this.encodedMods = modsToByte(mods);
- this.dn = dn.toNormalizedString();
- this.changeNumber = changeNumber;
}
/**
@@ -92,19 +94,8 @@
public ModifyMsg(byte[] in) throws DataFormatException,
UnsupportedEncodingException
{
+ super(in);
encodedMsg = in;
- decodeChangeNumber(in);
- }
-
- private void decodeChangeNumber(byte[] in) throws DataFormatException,
- UnsupportedEncodingException
- {
- /* read the changeNumber */
- int pos = 1;
- int length = getNextLength(encodedMsg, pos);
- String changenumberStr = new String(encodedMsg, pos, length, "UTF-8");
- pos += length +1;
- changeNumber = new ChangeNumber(changenumberStr);
}
/**
@@ -129,12 +120,12 @@
return encodedMsg;
}
-
/**
* {@inheritDoc}
*/
@Override
- public Operation createOperation(InternalClientConnection connection)
+ public Operation createOperation(InternalClientConnection connection,
+ String newDn)
throws LDAPException, ASN1Exception, DataFormatException
{
if (encodedMods == null)
@@ -142,6 +133,9 @@
decode();
}
+ if (newDn == null)
+ newDn = getDn();
+
ArrayList<LDAPModification> ldapmods;
ArrayList<ASN1Element> mods = null;
@@ -155,8 +149,9 @@
ModifyOperation mod = new ModifyOperation(connection,
InternalClientConnection.nextOperationID(),
InternalClientConnection.nextMessageID(), null,
- new ASN1OctetString(dn), ldapmods);
- mod.setAttachment(SYNCHRONIZATION, getChangeNumber());
+ new ASN1OctetString(newDn), ldapmods);
+ ModifyContext ctx = new ModifyContext(getChangeNumber(), getUniqueId());
+ mod.setAttachment(SYNCHROCONTEXT, ctx);
return mod;
}
@@ -167,30 +162,11 @@
*/
private void encode() throws UnsupportedEncodingException
{
- byte[] byteDn = dn.getBytes("UTF-8");
- byte[] changeNumberByte =
- this.getChangeNumber().toString().getBytes("UTF-8");
+ /* encode the header in a byte[] large enough to also contain the mods */
+ encodedMsg = encodeHeader(MSG_TYPE_MODIFY_REQUEST, encodedMods.length + 1);
+ int pos = encodedMsg.length - (encodedMods.length + 1);
- /* The Modify message is stored in the form :
- * <operation type>changenumber><dn><<mods>
- * the length of result byte array is therefore :
- * 1 + dn length + 1 + 24 + mods length
- */
- int length = 1 + changeNumberByte.length + 1 + byteDn.length + 1
- + encodedMods.length + 1;
- encodedMsg = new byte[length];
-
- /* put the type of the operation */
- encodedMsg[0] = MSG_TYPE_MODIFY_REQUEST;
- int pos = 1;
-
- /* put the ChangeNumber */
- pos = addByteArray(changeNumberByte, encodedMsg, pos);
-
- /* put the DN and a terminating 0 */
- pos = addByteArray(byteDn, encodedMsg, pos);
-
- /* put the mods */
+ /* add the mods */
pos = addByteArray(encodedMods, encodedMsg, pos);
}
@@ -201,34 +177,14 @@
*/
private void decode() throws DataFormatException
{
- /* first byte is the type */
- if (encodedMsg[0] != MSG_TYPE_MODIFY_REQUEST)
- throw new DataFormatException("byte[] is not a valid modify msg");
+ int pos = decodeHeader(MSG_TYPE_MODIFY_REQUEST, encodedMsg);
- try
+ /* Read the mods : all the remaining bytes but the terminating 0 */
+ encodedMods = new byte[encodedMsg.length-pos-1];
+ int i =0;
+ while (pos<encodedMsg.length-1)
{
- /* read the changeNumber */
- int pos = 1;
- int length = getNextLength(encodedMsg, pos);
- String changenumberStr = new String(encodedMsg, pos, length, "UTF-8");
- pos += length +1;
- changeNumber = new ChangeNumber(changenumberStr);
-
- /* read the dn */
- length = getNextLength(encodedMsg, pos);
- dn = new String(encodedMsg, pos, length, "UTF-8");
- pos += length +1;
-
- /* Read the mods : all the remaining bytes but the terminating 0 */
- encodedMods = new byte[encodedMsg.length-pos-1];
- int i =0;
- while (pos<encodedMsg.length-1)
- {
- encodedMods[i++] = encodedMsg[pos++];
- }
- } catch (UnsupportedEncodingException e)
- {
- throw new DataFormatException("UTF-8 is not supported by this jvm.");
+ encodedMods[i++] = encodedMsg[pos++];
}
}
@@ -263,6 +219,6 @@
@Override
public String toString()
{
- return("Mod " + dn + " " + getChangeNumber());
+ return("Modify " + getDn() + " " + getChangeNumber());
}
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/MultimasterSynchronization.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/MultimasterSynchronization.java
index d4067a3..f9f2b10 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/MultimasterSynchronization.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/MultimasterSynchronization.java
@@ -72,6 +72,21 @@
new HashMap<DN, SynchronizationDomain>() ;
/**
+ * Get the ServerState associated to the SynchronizationDomain
+ * with a given DN.
+ *
+ * @param baseDn The DN of the Synchronization Domain for which the
+ * ServerState must be returned.
+ * @return the ServerState associated to the SynchronizationDomain
+ * with the DN in parameter.
+ */
+ public static ServerState getServerState(DN baseDn)
+ {
+ SynchronizationDomain domain = findDomain(baseDn);
+ return domain.getServerState();
+ }
+
+ /**
* {@inheritDoc}
*/
public void initializeSynchronizationProvider(ConfigEntry configEntry)
@@ -245,6 +260,47 @@
return domain.handleConflictResolution(modifyOperation);
}
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public SynchronizationProviderResult handleConflictResolution(
+ AddOperation addOperation) throws DirectoryException
+ {
+ SynchronizationDomain domain = findDomain(addOperation.getEntryDN());
+ if (domain == null)
+ return new SynchronizationProviderResult(true);
+
+ return domain.handleConflictResolution(addOperation);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public SynchronizationProviderResult handleConflictResolution(
+ DeleteOperation deleteOperation) throws DirectoryException
+ {
+ SynchronizationDomain domain = findDomain(deleteOperation.getEntryDN());
+ if (domain == null)
+ return new SynchronizationProviderResult(true);
+
+ return domain.handleConflictResolution(deleteOperation);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public SynchronizationProviderResult handleConflictResolution(
+ ModifyDNOperation modifyDNOperation) throws DirectoryException
+ {
+ SynchronizationDomain domain = findDomain(modifyDNOperation.getEntryDN());
+ if (domain == null)
+ return new SynchronizationProviderResult(true);
+
+ return domain.handleConflictResolution(modifyDNOperation);
+ }
/**
* {@inheritDoc}
@@ -275,57 +331,35 @@
* {@inheritDoc}
*/
@Override
+ public SynchronizationProviderResult doPreOperation(
+ DeleteOperation deleteOperation) throws DirectoryException
+ {
+ return new SynchronizationProviderResult(true);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public SynchronizationProviderResult doPreOperation(
+ ModifyDNOperation modifyDNOperation) throws DirectoryException
+ {
+ return new SynchronizationProviderResult(true);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
public SynchronizationProviderResult doPreOperation(AddOperation addOperation)
{
SynchronizationDomain domain = findDomain(addOperation.getEntryDN());
if (domain == null)
return new SynchronizationProviderResult(true);
- domain.setChangeNumber(addOperation);
- return new SynchronizationProviderResult(true);
- }
+ if (!addOperation.isSynchronizationOperation())
+ domain.doPreOperation(addOperation);
- /**
- * Pre-operation processing.
- * Called after operation has been processed by the core server
- * but before being committed to the backend
- * Generate the Change number and update the historical information
- *
- * @param deleteOperation the current operation
- * @return code indicating if operation must be processed
- */
- @Override
- public SynchronizationProviderResult
- doPreOperation(DeleteOperation deleteOperation)
- {
- SynchronizationDomain domain = findDomain(deleteOperation.getEntryDN());
- if (domain == null)
- return new SynchronizationProviderResult(true);
-
- domain.setChangeNumber(deleteOperation);
-
- return new SynchronizationProviderResult(true);
- }
-
- /**
- * Pre-operation processing.
- * Called after operation has been processed by the core server
- * but before being committed to the backend
- * Generate the Change number and update the historical information
- *
- * @param modifyDNOperation the current operation
- * @return code indicating if operation must be processed
- */
- @Override
- public SynchronizationProviderResult
- doPreOperation(ModifyDNOperation modifyDNOperation)
- {
-
- SynchronizationDomain domain = findDomain(modifyDNOperation.getEntryDN());
- if (domain == null)
- return new SynchronizationProviderResult(true);
-
- domain.setChangeNumber(modifyDNOperation);
return new SynchronizationProviderResult(true);
}
@@ -390,21 +424,6 @@
return;
}
- /**
- * Get the ServerState associated to the SynchronizationDomain
- * with a given DN.
- *
- * @param baseDn The DN of the Synchronization Domain for which the
- * ServerState must be returned.
- * @return the ServerState associated to the SynchronizationDomain
- * with the DN in parameter.
- */
- public static ServerState getServerState(DN baseDn)
- {
- SynchronizationDomain domain = findDomain(baseDn);
- return domain.getServerState();
- }
-
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/OperationContext.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/OperationContext.java
new file mode 100644
index 0000000..949c016
--- /dev/null
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/OperationContext.java
@@ -0,0 +1,121 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Portions Copyright 2006 Sun Microsystems, Inc.
+ */
+package org.opends.server.synchronization;
+
+import org.opends.server.core.Operation;
+
+/**
+ * This class describe the Synchronization context that is attached
+ * to each Operation using the SYNCHROCONTEXT key.
+ */
+public abstract class OperationContext
+{
+ /**
+ * The identifier used to attach the context to operations.
+ */
+ public static final String SYNCHROCONTEXT = "synchronizationContext";
+
+ /**
+ * The change Number of the Operation.
+ */
+ private ChangeNumber changeNumber;
+
+ /**
+ * The unique Id of the entry that was modified in the original operation.
+ */
+ private String entryUid;
+
+ /**
+ * Create a new OperationContext.
+ * @param changeNumber The change number of the operation.
+ * @param uid The unique Identifier of the modified entry.
+ */
+ protected OperationContext(ChangeNumber changeNumber, String uid)
+ {
+ this.changeNumber = changeNumber;
+ this.entryUid = uid;
+ }
+
+ /**
+ * Gets The change number of the Operation.
+ *
+ * @return The change number of the Operation.
+ */
+ public ChangeNumber getChangeNumber()
+ {
+ return changeNumber;
+ }
+
+ /**
+ * Get the unique Identifier of the modiffied entry.
+ *
+ * @return the unique Identifier of the modiffied entry.
+ */
+ public String getEntryUid()
+ {
+ return entryUid;
+ }
+
+ /**
+ * Get the change number of an operation.
+ *
+ * @param op The operation.
+ * @return the hange number of the provided operation.
+ */
+ public static ChangeNumber getChangeNumber(Operation op)
+ {
+ OperationContext ctx = (OperationContext)op.getAttachment(SYNCHROCONTEXT);
+ return ctx.changeNumber;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (obj instanceof OperationContext)
+ {
+ OperationContext ctx = (OperationContext) obj;
+ return ((this.changeNumber.equals(ctx.getChangeNumber()) &&
+ (this.entryUid.equals(ctx.getEntryUid()))));
+ }
+ else
+ return false;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public int hashCode()
+ {
+ return changeNumber.hashCode() + entryUid.hashCode();
+ }
+
+
+}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/SynchMessages.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/SynchMessages.java
index 194b6d5..1c3a9aa 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/SynchMessages.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/SynchMessages.java
@@ -36,15 +36,10 @@
public class SynchMessages {
/**
- * name of Synchronization.
- */
- public static final String SYNCHRONIZATION = "synchronization";
-
- /**
* Name used to store attachment of historical information in the
* operation.
*/
- public static final String HISTORICAL = "historical";
+ public static final String HISTORICAL = "ds-synch-historical";
/**
* Invalid DN.
@@ -256,6 +251,12 @@
CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 33;
/**
+ * Exception while receiving a message.
+ */
+ public static final int MSGID_EXCEPTION_RECEIVING_SYNCHRONIZATION_MESSAGE =
+ CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 34;
+
+ /**
* Register the messages from this class in the core server.
*
*/
@@ -288,7 +289,7 @@
"Changelog failed to start " +
"because the database %s could not be read");
MessageHandler.registerMessage(MSGID_EXCEPTION_REPLAYING_OPERATION,
- "Caught Exception %s when replaying operation %s : %s");
+ "An Exception was caught while replaying operation %s : %s");
MessageHandler.registerMessage(MSGID_NEED_CHANGELOG_PORT,
"The Changelog server port must be defined");
MessageHandler.registerMessage(MSGID_ERROR_UPDATING_RUV,
@@ -341,5 +342,8 @@
MessageHandler.registerMessage(MSGID_CHANGELOG_ERROR_SENDING_ACK,
"An unexpected error happened sending an ack to %s." +
"This connection is going to be closed. ");
+ MessageHandler.registerMessage(
+ MSGID_EXCEPTION_RECEIVING_SYNCHRONIZATION_MESSAGE,
+ "An Exception was caught while receiving synchronization message : %s");
}
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/SynchronizationDomain.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/SynchronizationDomain.java
index a37fe14..428c528 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/SynchronizationDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/SynchronizationDomain.java
@@ -26,15 +26,21 @@
*/
package org.opends.server.synchronization;
+import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import static org.opends.server.util.TimeThread.getTime;
import static org.opends.server.synchronization.SynchMessages.*;
import static org.opends.server.loggers.Error.*;
import static org.opends.server.messages.MessageHandler.*;
+import static org.opends.server.synchronization.OperationContext.SYNCHROCONTEXT;
+import static org.opends.server.synchronization.Historical.*;
import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
+import java.util.zip.DataFormatException;
import org.opends.server.api.ConfigurableComponent;
import org.opends.server.api.DirectoryThread;
@@ -47,17 +53,27 @@
import org.opends.server.config.StringConfigAttribute;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DeleteOperation;
+import org.opends.server.core.DirectoryException;
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.ModifyDNOperation;
import org.opends.server.core.ModifyOperation;
import org.opends.server.core.Operation;
import org.opends.server.messages.MessageHandler;
+import org.opends.server.protocols.asn1.ASN1Exception;
+import org.opends.server.protocols.internal.InternalClientConnection;
+import org.opends.server.protocols.internal.InternalSearchOperation;
+import org.opends.server.protocols.ldap.LDAPException;
import org.opends.server.types.ConfigChangeResult;
import org.opends.server.types.DN;
+import org.opends.server.types.DereferencePolicy;
import org.opends.server.types.Entry;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
+import org.opends.server.types.RDN;
import org.opends.server.types.ResultCode;
+import org.opends.server.types.SearchFilter;
+import org.opends.server.types.SearchResultEntry;
+import org.opends.server.types.SearchScope;
import org.opends.server.types.SynchronizationProviderResult;
/**
@@ -88,9 +104,6 @@
private ServerState state;
private int numReplayedPostOpCalled = 0;
- private boolean assuredFlag = false;
-
-
private int maxReceiveQueue = 0;
private int maxSendQueue = 0;
private int maxReceiveDelay = 0;
@@ -114,6 +127,8 @@
private DN configDn;
+ private InternalClientConnection conn = new InternalClientConnection();
+
static String CHANGELOG_SERVER_ATTR = "ds-cfg-changelog-server";
static String BASE_DN_ATTR = "ds-cfg-synchronization-dn";
static String SERVER_ID_ATTR = "ds-cfg-directory-server-id";
@@ -379,8 +394,7 @@
broker.restartReceive();
for (int i=0; i<listenerThreadNumber; i++)
{
- ListenerThread myThread = new ListenerThread(this,
- changeNumberGenerator);
+ ListenerThread myThread = new ListenerThread(this);
myThread.start();
synchroThreads.add(myThread);
}
@@ -416,6 +430,155 @@
}
/**
+ * Implement the handleConflictResolution phase of the deleteOperation.
+ *
+ * @param deleteOperation The deleteOperation.
+ * @return A SynchronizationProviderResult indicating if the operation
+ * can continue.
+ */
+ public SynchronizationProviderResult handleConflictResolution(
+ DeleteOperation deleteOperation)
+ {
+ DeleteContext ctx =
+ (DeleteContext) deleteOperation.getAttachment(SYNCHROCONTEXT);
+ Entry deletedEntry = deleteOperation.getEntryToDelete();
+
+ if (ctx != null)
+ {
+ /*
+ * This is a synchronization operation
+ * Check that the modified entry has the same entryuuid
+ * has was in the original message.
+ */
+ String operationEntryUUID = ctx.getEntryUid();
+ String modifiedEntryUUID = Historical.getEntryUuid(deletedEntry);
+ if (!operationEntryUUID.equals(modifiedEntryUUID))
+ {
+ /*
+ * The changes entry is not the same entry as the one on
+ * the original change was performed.
+ * Probably the original entry was renamed and replaced with
+ * another entry.
+ * We must not let the change proceed, return a negative
+ * result and set the result code to NO_SUCH_OBJET.
+ * When the operation will return, the thread that started the
+ * operation will try to find the correct entry and restart a new
+ * operation.
+ */
+ deleteOperation.setResultCode(ResultCode.NO_SUCH_OBJECT);
+ return new SynchronizationProviderResult(false);
+ }
+ }
+ else
+ {
+ // There is no Synchronization context attached to the operation
+ // so this is not a synchronization operation.
+ ChangeNumber changeNumber = generateChangeNumber(deleteOperation);
+ String modifiedEntryUUID = Historical.getEntryUuid(deletedEntry);
+ ctx = new DeleteContext(changeNumber, modifiedEntryUUID);
+ deleteOperation.setAttachment(SYNCHROCONTEXT, ctx);
+ }
+ return new SynchronizationProviderResult(true);
+ }
+
+ /**
+ * Implement the handleConflictResolution phase of the addOperation.
+ *
+ * @param addOperation The AddOperation.
+ * @return A SynchronizationProviderResult indicating if the operation
+ * can continue.
+ */
+ public SynchronizationProviderResult handleConflictResolution(
+ AddOperation addOperation)
+ {
+ if (addOperation.isSynchronizationOperation())
+ {
+ AddContext ctx = (AddContext) addOperation.getAttachment(SYNCHROCONTEXT);
+ /*
+ * If an entry with the same entry uniqueID already exist then
+ * this operation has already been replayed in the past.
+ */
+ String uuid = ctx.getEntryUid();
+ if (findEntryDN(uuid) != null)
+ {
+ addOperation.setResultCode(ResultCode.SUCCESS);
+ return new SynchronizationProviderResult(false);
+ }
+ }
+ return new SynchronizationProviderResult(true);
+ }
+
+ /**
+ * Implement the handleConflictResolution phase of the ModifyDNOperation.
+ *
+ * @param modifyDNOperation The ModifyDNOperation.
+ * @return A SynchronizationProviderResult indicating if the operation
+ * can continue.
+ */
+ public SynchronizationProviderResult handleConflictResolution(
+ ModifyDNOperation modifyDNOperation)
+ {
+ ModifyDnContext ctx =
+ (ModifyDnContext) modifyDNOperation.getAttachment(SYNCHROCONTEXT);
+ if (ctx != null)
+ {
+ /*
+ * This is a synchronization operation
+ * Check that the modified entry has the same entryuuid
+ * as was in the original message.
+ */
+ String modifiedEntryUUID =
+ Historical.getEntryUuid(modifyDNOperation.getOriginalEntry());
+ if (!modifiedEntryUUID.equals(ctx.getEntryUid()))
+ {
+ /*
+ * The modified entry is not the same entry as the one on
+ * the original change was performed.
+ * Probably the original entry was renamed and replaced with
+ * another entry.
+ * We must not let the change proceed, return a negative
+ * result and set the result code to NO_SUCH_OBJET.
+ * When the operation will return, the thread that started the
+ * operation will try to find the correct entry and restart a new
+ * operation.
+ */
+ modifyDNOperation.setResultCode(ResultCode.NO_SUCH_OBJECT);
+ return new SynchronizationProviderResult(false);
+ }
+ if (modifyDNOperation.getNewSuperior() != null)
+ {
+ /*
+ * Also check that the current id of the
+ * parent is the same as when the operation was performed.
+ */
+ String newParentId = findEntryId(modifyDNOperation.getNewSuperior());
+ if (!newParentId.equals(ctx.getNewParentId()))
+ {
+ modifyDNOperation.setResultCode(ResultCode.NO_SUCH_OBJECT);
+ return new SynchronizationProviderResult(false);
+ }
+ }
+ }
+ else
+ {
+ // There is no Synchronization context attached to the operation
+ // so this is not a synchronization operation.
+ ChangeNumber changeNumber = generateChangeNumber(modifyDNOperation);
+ String newParentId = null;
+ if (modifyDNOperation.getNewSuperior() != null)
+ {
+ newParentId = findEntryId(modifyDNOperation.getNewSuperior());
+ }
+
+ Entry modifiedEntry = modifyDNOperation.getOriginalEntry();
+ String modifiedEntryUUID = Historical.getEntryUuid(modifiedEntry);
+ ctx = new ModifyDnContext(changeNumber, modifiedEntryUUID, newParentId);
+ modifyDNOperation.setAttachment(SYNCHROCONTEXT, ctx);
+ }
+ return new SynchronizationProviderResult(true);
+ }
+
+ /**
* Handle the conflict resolution.
* Called by the core server after locking the entry and before
* starting the actual modification.
@@ -425,25 +588,43 @@
public SynchronizationProviderResult handleConflictResolution(
ModifyOperation modifyOperation)
{
- // If operation do not yet have a change number, generate it
- ChangeNumber changeNumber =
- (ChangeNumber) modifyOperation.getAttachment(SYNCHRONIZATION);
- if (changeNumber == null)
- {
- synchronized(pendingChanges)
- {
- changeNumber = changeNumberGenerator.NewChangeNumber();
- pendingChanges.put(changeNumber, new PendingChange(changeNumber,
- modifyOperation,
- null));
- }
- modifyOperation.setAttachment(SYNCHRONIZATION, changeNumber);
- }
+ ModifyContext ctx =
+ (ModifyContext) modifyOperation.getAttachment(SYNCHROCONTEXT);
- // if Operation is a synchronization operation, solve conflicts
- if (modifyOperation.isSynchronizationOperation())
+ Entry modifiedEntry = modifyOperation.getModifiedEntry();
+ if (ctx == null)
{
- Entry modifiedEntry = modifyOperation.getModifiedEntry();
+ // There is no Synchronization context attached to the operation
+ // so this is not a synchronization operation.
+ ChangeNumber changeNumber = generateChangeNumber(modifyOperation);
+ String modifiedEntryUUID = Historical.getEntryUuid(modifiedEntry);
+ ctx = new ModifyContext(changeNumber, modifiedEntryUUID);
+ modifyOperation.setAttachment(SYNCHROCONTEXT, ctx);
+ }
+ else
+ {
+ String modifiedEntryUUID = ctx.getEntryUid();
+ String currentEntryUUID = Historical.getEntryUuid(modifiedEntry);
+ if (!currentEntryUUID.equals(modifiedEntryUUID))
+ {
+ /*
+ * The current modified entry is not the same entry as the one on
+ * the original modification was performed.
+ * Probably the original entry was renamed and replaced with
+ * another entry.
+ * We must not let the modification proceed, return a negative
+ * result and set the result code to NO_SUCH_OBJET.
+ * When the operation will return, the thread that started the
+ * operation will try to find the correct entry and restart a new
+ * operation.
+ */
+ modifyOperation.setResultCode(ResultCode.NO_SUCH_OBJECT);
+ return new SynchronizationProviderResult(false);
+ }
+
+ /*
+ * Solve the conflicts between modify operations
+ */
Historical historicalInformation = Historical.load(modifiedEntry);
modifyOperation.setAttachment(HISTORICAL, historicalInformation);
@@ -456,16 +637,29 @@
* stop the processing and send an OK result
*/
modifyOperation.setResultCode(ResultCode.SUCCESS);
- /*
- * TODO : check that post operation do get called and
- * that pendingChanges do get updated
- */
return new SynchronizationProviderResult(false);
}
}
return new SynchronizationProviderResult(true);
}
+ /**
+ * The preOperation phase for the add Operation.
+ * Its job is to generate the Synchronization context associated to the
+ * operation. It is necessary to do it in this phase because contrary to
+ * the other operations, the entry uid is not set when the handleConflict
+ * phase is called.
+ *
+ * @param addOperation The Add Operation.
+ */
+ public void doPreOperation(AddOperation addOperation)
+ {
+ AddContext ctx = new AddContext(generateChangeNumber(addOperation),
+ Historical.getEntryUuid(addOperation),
+ findEntryId(addOperation.getEntryDN().getParent()));
+
+ addOperation.setAttachment(SYNCHROCONTEXT, ctx);
+ }
/**
* Receive an update message from the changelog.
@@ -494,6 +688,7 @@
/**
* Do the necessary processing when an UpdateMessage was received.
+ *
* @param update The received UpdateMessage.
*/
public void receiveUpdate(UpdateMessage update)
@@ -548,49 +743,20 @@
{
numReplayedPostOpCalled++;
UpdateMessage msg = null;
- ChangeNumber curChangeNumber =
- (ChangeNumber) op.getAttachment(SYNCHRONIZATION);
+ ChangeNumber curChangeNumber = OperationContext.getChangeNumber(op);
- if (op.getResultCode() != ResultCode.SUCCESS)
- {
- if (curChangeNumber != null)
- {
- /*
- * This code can be executed by multiple threads
- * Since TreeMap is not synchronized, it is mandatory to synchronize
- * it now.
- */
- synchronized (pendingChanges)
- {
- pendingChanges.remove(curChangeNumber);
- }
- }
- return;
- }
+ ResultCode result = op.getResultCode();
+ boolean isAssured = isAssured(op);
- if (!op.isSynchronizationOperation())
+ if ((result == ResultCode.SUCCESS) && (!op.isSynchronizationOperation()))
{
- switch (op.getOperationType())
+ msg = UpdateMessage.generateMsg(op, isAssured);
+
+ if (msg == null)
{
- case MODIFY :
- msg = new ModifyMsg((ModifyOperation) op);
- break;
- case ADD:
- msg = new AddMsg((AddOperation) op);
- break;
- case DELETE :
- msg = new DeleteMsg((DeleteOperation) op);
- break;
- case MODIFY_DN :
- msg = new ModifyDNMsg((ModifyDNOperation) op);
- break;
- default :
/*
* This is an operation type that we do not know about
- * It should never happen
- * This code can be executed by multiple threads
- * Since TreeMap is not synchronized, it is mandatory to synchronize
- * it now.
+ * It should never happen.
*/
synchronized (pendingChanges)
{
@@ -603,68 +769,44 @@
return;
}
}
- if (isAssured(op))
- {
- msg.setAssured();
- }
}
synchronized(pendingChanges)
{
- PendingChange curChange = pendingChanges.get(curChangeNumber);
- if (curChange == null)
+ if (result == ResultCode.SUCCESS)
{
- // This should never happen
- int msgID = MSGID_OPERATION_NOT_FOUND_IN_PENDING;
- String message = getMessage(msgID, curChangeNumber.toString(),
- op.toString());
- logError(ErrorLogCategory.SYNCHRONIZATION,
- ErrorLogSeverity.SEVERE_ERROR,
- message, msgID);
- return;
- }
- curChange.setCommitted(true);
-
- if (op.isSynchronizationOperation())
- curChange.setOp(op);
- else
- curChange.setMsg(msg);
-
- ChangeNumber firstChangeNumber = pendingChanges.firstKey();
- PendingChange firstChange = pendingChanges.get(firstChangeNumber);
- ChangeNumber lastCommittedChangeNumber = null;
-
- if (!op.isSynchronizationOperation() && msg.isAssured())
- {
- waitingAckMsgs.put(curChangeNumber, msg);
- }
-
- while ((firstChange != null) && firstChange.isCommitted())
- {
- if (firstChange.getOp().isSynchronizationOperation() == false)
+ PendingChange curChange = pendingChanges.get(curChangeNumber);
+ if (curChange == null)
{
- numSentUpdates++;
- broker.publish(firstChange.getMsg());
+ // This should never happen
+ int msgID = MSGID_OPERATION_NOT_FOUND_IN_PENDING;
+ String message = getMessage(msgID, curChangeNumber.toString(),
+ op.toString());
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.SEVERE_ERROR,
+ message, msgID);
+ return;
}
+ curChange.setCommitted(true);
- lastCommittedChangeNumber = firstChange.getChangeNumber();
-
- pendingChanges.remove(lastCommittedChangeNumber);
- if (pendingChanges.isEmpty())
- {
- firstChange = null;
- }
+ if (op.isSynchronizationOperation())
+ curChange.setOp(op);
else
+ curChange.setMsg(msg);
+
+ if (!op.isSynchronizationOperation() && isAssured && (msg != null))
{
- firstChangeNumber = pendingChanges.firstKey();
- firstChange = pendingChanges.get(firstChangeNumber);
+ waitingAckMsgs.put(curChangeNumber, msg);
}
}
- if (lastCommittedChangeNumber != null)
- state.update(lastCommittedChangeNumber);
+ else if (!op.isSynchronizationOperation())
+ pendingChanges.remove(curChangeNumber);
+
+ pushCommittedChanges();
}
- if (!op.isSynchronizationOperation() && msg.isAssured())
+ if ((!op.isSynchronizationOperation()) && msg.isAssured() && (msg != null)
+ && (result == ResultCode.SUCCESS))
{
synchronized (msg)
{
@@ -683,19 +825,6 @@
}
/**
- * Check if an operation must be processed as an assured operation.
- *
- * @param op the operation to be checked.
- * @return true if the operations must be processed as an assured operation.
- */
- private boolean isAssured(Operation op)
- {
- // TODO : should have a filtering mechanism for checking
- // operation that are assured and operations that are not.
- return assuredFlag;
- }
-
- /**
* get the number of updates received by the synchronization plugin.
*
* @return the number of updates received
@@ -790,27 +919,6 @@
}
/**
- * Generate and set the ChangeNumber of a given Operation.
- *
- * @param operation The Operation for which the ChangeNumber must be set.
- */
- public void setChangeNumber(Operation operation)
- {
- ChangeNumber changeNumber =
- (ChangeNumber) operation.getAttachment(SYNCHRONIZATION);
- if (changeNumber == null)
- {
- synchronized(pendingChanges)
- {
- changeNumber = changeNumberGenerator.NewChangeNumber();
- pendingChanges.put(changeNumber, new PendingChange(changeNumber,
- operation, null));
- }
- operation.setAttachment(SYNCHRONIZATION, changeNumber);
- }
- }
-
- /**
* {@inheritDoc}
*/
@Override
@@ -826,7 +934,7 @@
synchroThreads = new ArrayList<ListenerThread>();
for (int i=0; i<10; i++)
{
- ListenerThread myThread = new ListenerThread(this, changeNumberGenerator);
+ ListenerThread myThread = new ListenerThread(this);
myThread.start();
synchroThreads.add(myThread);
}
@@ -913,6 +1021,126 @@
}
/**
+ * Create and replay a synchronized Operation from an UpdateMessage.
+ *
+ * @param msg The UpdateMessage to be replayed.
+ */
+ public void replay(UpdateMessage msg)
+ {
+ Operation op = null;
+ boolean done = false;
+ ChangeNumber changeNumber = null;
+
+ try
+ {
+ while (!done)
+ {
+ op = msg.createOperation(conn);
+
+ op.setInternalOperation(true);
+ op.setSynchronizationOperation(true);
+ changeNumber = OperationContext.getChangeNumber(op);
+ if (changeNumber != null)
+ changeNumberGenerator.adjust(changeNumber);
+
+ op.run();
+
+ ResultCode result = op.getResultCode();
+ if (result != ResultCode.SUCCESS)
+ {
+ if (op instanceof ModifyOperation)
+ {
+ ModifyOperation newOp = (ModifyOperation) op;
+ done = solveNamingConflict(newOp, msg);
+ }
+ else if (op instanceof DeleteOperation)
+ {
+ DeleteOperation newOp = (DeleteOperation) op;
+ done = solveNamingConflict(newOp, msg);
+ }
+ else if (op instanceof AddOperation)
+ {
+ AddOperation newOp = (AddOperation) op;
+ done = solveNamingConflict(newOp, msg);
+
+ } else if (op instanceof ModifyDNOperation)
+ {
+ ModifyDNOperation newOp = (ModifyDNOperation) op;
+ done = solveNamingConflict(newOp, msg);
+ }
+ else
+ {
+ done = true; // unknown type of operation ?!
+ }
+ }
+ else
+ {
+ done = true;
+ }
+ }
+ }
+ catch (ASN1Exception e)
+ {
+ int msgID = MSGID_EXCEPTION_DECODING_OPERATION;
+ String message = getMessage(msgID, msg) +
+ stackTraceToSingleLineString(e);
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.SEVERE_ERROR,
+ message, msgID);
+ }
+ catch (LDAPException e)
+ {
+ int msgID = MSGID_EXCEPTION_DECODING_OPERATION;
+ String message = getMessage(msgID, msg) +
+ stackTraceToSingleLineString(e);
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.SEVERE_ERROR,
+ message, msgID);
+ }
+ catch (DataFormatException e)
+ {
+ int msgID = MSGID_EXCEPTION_DECODING_OPERATION;
+ String message = getMessage(msgID, msg) +
+ stackTraceToSingleLineString(e);
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.SEVERE_ERROR,
+ message, msgID);
+ }
+ catch (Exception e)
+ {
+ if (changeNumber != null)
+ {
+ /*
+ * An Exception happened during the replay process.
+ * Continue with the next change but the servers will know start
+ * to be inconsistent.
+ * TODO : REPAIR : Should let the repair tool know about this
+ */
+ int msgID = MSGID_EXCEPTION_REPLAYING_OPERATION;
+ String message = getMessage(msgID, stackTraceToSingleLineString(e),
+ op.toString());
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.SEVERE_ERROR, message, msgID);
+ updateError(changeNumber);
+ }
+ else
+ {
+ int msgID = MSGID_EXCEPTION_DECODING_OPERATION;
+ String message = getMessage(msgID, stackTraceToSingleLineString(e),
+ msg.toString());
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.SEVERE_ERROR, message, msgID);
+ }
+ }
+ finally
+ {
+ if (msg.isAssured())
+ ack(msg.getChangeNumber());
+ incProcessedUpdates();
+ }
+ }
+
+ /**
* This methods is called when an error happends while replaying
* and operation.
* It is necessary because the postOPeration does not always get
@@ -925,6 +1153,434 @@
synchronized (pendingChanges)
{
pendingChanges.remove(changeNumber);
+ pushCommittedChanges();
}
}
+
+ /**
+ * Generate a new change number and insert it in the pending list.
+ *
+ * @param operation The operation for which the change number must be
+ * generated.
+ * @return The new change number.
+ */
+ private ChangeNumber generateChangeNumber(Operation operation)
+ {
+ ChangeNumber changeNumber;
+ synchronized(pendingChanges)
+ {
+ changeNumber = changeNumberGenerator.NewChangeNumber();
+ pendingChanges.put(changeNumber,
+ new PendingChange(changeNumber, operation, null));
+ }
+ return changeNumber;
+ }
+
+
+ /**
+ * Find the Unique Id of the entry with the provided DN by doing a
+ * search of the entry and extracting its uniqueID from its attributes.
+ *
+ * @param dn The dn of the entry for which the unique Id is searched.
+ *
+ * @return The unique Id of the entry whith the provided DN.
+ */
+ private String findEntryId(DN dn)
+ {
+ if (dn == null)
+ return null;
+ try
+ {
+ LinkedHashSet<String> attrs = new LinkedHashSet<String>(1);
+ attrs.add(ENTRYUIDNAME);
+ InternalSearchOperation search = conn.processSearch(dn,
+ SearchScope.BASE_OBJECT, DereferencePolicy.NEVER_DEREF_ALIASES,
+ 0, 0, false,
+ SearchFilter.createFilterFromString("objectclass=*"),
+ attrs);
+
+ if (search.getResultCode() == ResultCode.SUCCESS)
+ {
+ LinkedList<SearchResultEntry> result = search.getSearchEntries();
+ if (!result.isEmpty())
+ {
+ SearchResultEntry resultEntry = result.getFirst();
+ if (resultEntry != null)
+ {
+ return Historical.getEntryUuid(resultEntry);
+ }
+ }
+ }
+ } catch (DirectoryException e)
+ {
+ // never happens because the filter is always valid.
+ }
+ return null;
+ }
+
+ /**
+ * find the current dn of an entry from its entry uuid.
+ *
+ * @param uuid the Entry Unique ID.
+ * @return The curernt dn of the entry or null if there is no entry with
+ * the specified uuid.
+ */
+ private DN findEntryDN(String uuid)
+ {
+ try
+ {
+ InternalSearchOperation search = conn.processSearch(baseDN,
+ SearchScope.WHOLE_SUBTREE,
+ SearchFilter.createFilterFromString("entryuuid="+uuid));
+ if (search.getResultCode() == ResultCode.SUCCESS)
+ {
+ LinkedList<SearchResultEntry> result = search.getSearchEntries();
+ if (!result.isEmpty())
+ {
+ SearchResultEntry resultEntry = result.getFirst();
+ if (resultEntry != null)
+ {
+ return resultEntry.getDN();
+ }
+ }
+ }
+ } catch (DirectoryException e)
+ {
+ // never happens because the filter is always valid.
+ }
+ return null;
+ }
+
+ /**
+ * Solve a conflict detected when replaying a modify operation.
+ *
+ * @param op The operation that triggered the conflict detection.
+ * @param msg The operation that triggered the conflict detection.
+ * @return true if the process is completed, false if it must continue..
+ */
+ private boolean solveNamingConflict(ModifyOperation op,
+ UpdateMessage msg)
+ {
+ ResultCode result = op.getResultCode();
+ ModifyContext ctx = (ModifyContext) op.getAttachment(SYNCHROCONTEXT);
+ String entryUid = ctx.getEntryUid();
+
+ if (result == ResultCode.NO_SUCH_OBJECT)
+ {
+ /*
+ * This error may happen the operation is a modification but
+ * the entry had been renamed on a different master in the same time.
+ * search if the entry has been renamed, and return the new dn
+ * of the entry.
+ */
+ msg.setDn(findEntryDN(entryUid).toString());
+ return false;
+ }
+ return true;
+ }
+
+ /** Solve a conflict detected when replaying a delete operation.
+ *
+ * @param op The operation that triggered the conflict detection.
+ * @param msg The operation that triggered the conflict detection.
+ * @return true if the process is completed, false if it must continue..
+ */
+ private boolean solveNamingConflict(DeleteOperation op,
+ UpdateMessage msg)
+ {
+ ResultCode result = op.getResultCode();
+ DeleteContext ctx = (DeleteContext) op.getAttachment(SYNCHROCONTEXT);
+ String entryUid = ctx.getEntryUid();
+
+ if (result == ResultCode.NO_SUCH_OBJECT)
+ {
+ /*
+ * Find if the entry is still in the database.
+ */
+ DN currentDn = findEntryDN(entryUid);
+ if (currentDn == null)
+ {
+ /*
+ * The entry has already been deleted, either because this delete
+ * has already been replayed or because another concurrent delete
+ * has already done the job.
+ * In any case, there is is nothing more to do.
+ */
+ return true;
+ }
+ else
+ {
+ /*
+ * This entry has been renamed, replay the delete using its new DN.
+ */
+ msg.setDn(currentDn.toString());
+ return false;
+ }
+ }
+ else if (result == ResultCode.NOT_ALLOWED_ON_NONLEAF)
+ {
+ /*
+ * This may happen when we replay a DELETE done on a master
+ * but children of this entry have been added on another master.
+ */
+
+ /*
+ * TODO : either delete all the childs or rename the child below
+ * the top suffix by adding entryuuid in dn and delete this entry.
+ */
+ }
+ return true;
+ }
+
+ /**
+ * Solve a conflict detected when replaying a ADD operation.
+ *
+ * @param op The operation that triggered the conflict detection.
+ * @param msg The operation that triggered the conflict detection.
+ * @return true if the process is completed, false if it must continue.
+ * @throws Exception When the operation is not valid.
+ */
+ private boolean solveNamingConflict(AddOperation op,
+ UpdateMessage msg) throws Exception
+ {
+ ResultCode result = op.getResultCode();
+ AddContext ctx = (AddContext) op.getAttachment(SYNCHROCONTEXT);
+ String entryUid = ctx.getEntryUid();
+ String parentUniqueId = ctx.getParentUid();
+
+ if (result == ResultCode.NO_SUCH_OBJECT)
+ {
+ /*
+ * This can happen if the parent has been renamed or deleted
+ * find the parent dn and calculate a new dn for the entry
+ */
+ if (parentUniqueId == null)
+ {
+ /*
+ * This entry is the base dn of the backend.
+ * It is quite weird that the operation result be NO_SUCH_OBJECT.
+ * There is notthing more we can do except TODO log a
+ * message for the repair tool to look at this problem.
+ */
+ return true;
+ }
+ DN parentDn = findEntryDN(parentUniqueId);
+ if (parentDn == null)
+ {
+ /*
+ * The parent has been deleted, so this entry should not
+ * exist don't do the ADD.
+ */
+ return true;
+ }
+ else
+ {
+ RDN entryRdn = op.getEntryDN().getRDN();
+ msg.setDn(parentDn + "," + entryRdn);
+ return false;
+ }
+ }
+ else if (result == ResultCode.ENTRY_ALREADY_EXISTS)
+ {
+ /*
+ * This can happen if
+ * - two adds are done on different servers but with the
+ * same target DN.
+ * - the same ADD is being replayed for the second time on this server.
+ * if the nsunique ID already exist, assume this is a replay and
+ * don't do anything
+ * if the entry unique id do not exist, generate conflict.
+ */
+ if (findEntryDN(entryUid) != null)
+ {
+ // entry already exist : this is a replay
+ return true;
+ }
+ else
+ {
+ addConflict(op);
+ msg.setDn(generateConflictDn(entryUid, msg.getDn()));
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Solve a conflict detected when replaying a Modify DN operation.
+ *
+ * @param op The operation that triggered the conflict detection.
+ * @param msg The operation that triggered the conflict detection.
+ * @return true if the process is completed, false if it must continue.
+ * @throws Exception When the operation is not valid.
+ */
+ private boolean solveNamingConflict(ModifyDNOperation op,
+ UpdateMessage msg) throws Exception
+ {
+ ResultCode result = op.getResultCode();
+ ModifyDnContext ctx = (ModifyDnContext) op.getAttachment(SYNCHROCONTEXT);
+ String entryUid = ctx.getEntryUid();
+ String newSuperiorID = ctx.getNewParentId();
+
+ if (result == ResultCode.NO_SUCH_OBJECT)
+ {
+ ModifyDNMsg modifyDnMsg = (ModifyDNMsg) msg;
+
+ /*
+ * four possible cases :
+ * - the modified entry has been renamed
+ * - the new parent has been renamed
+ * - the operation is replayed for the second time.
+ * - the entry has been deleted
+ * action :
+ * - change the target dn and the new parent dn and
+ * restart the operation,
+ * - don't do anything if the operation is replayed.
+ */
+
+ // Construct the new DN to use for the entry.
+ DN entryDN = op.getEntryDN();
+ DN newSuperior = findEntryDN(newSuperiorID);
+ RDN newRDN = op.getNewRDN();
+ DN parentDN;
+
+ if (newSuperior == null)
+ {
+ parentDN = entryDN.getParent();
+ }
+ else
+ {
+ parentDN = newSuperior;
+ }
+
+ if ((parentDN == null) || parentDN.isNullDN())
+ {
+ /* this should never happen
+ * can't solve any conflict in this case.
+ */
+ throw new Exception("operation parameters are invalid");
+ }
+
+ RDN[] parentComponents = parentDN.getRDNComponents();
+ RDN[] newComponents = new RDN[parentComponents.length+1];
+ System.arraycopy(parentComponents, 0, newComponents, 1,
+ parentComponents.length);
+ newComponents[0] = newRDN;
+
+ DN newDN = new DN(newComponents);
+
+ // get the current DN of this entry in the database.
+ DN currentDN = findEntryDN(entryUid);
+
+ // if the newDN and the current DN match then the operation
+ // is a no-op (this was probably a second replay)
+ // don't do anything.
+ if (newDN.equals(currentDN))
+ {
+ return true;
+ }
+
+ msg.setDn(currentDN.toString());
+ modifyDnMsg.setNewSuperior(newSuperior.toString());
+ return false;
+ }
+ else if (result == ResultCode.ENTRY_ALREADY_EXISTS)
+ {
+ /*
+ * This may happen when two modifyDn operation
+ * are done on different servers but with the same target DN
+ * add the conflict object class to the entry
+ * and rename it using its entryuuid.
+ */
+ generateAddConflictOp(op);
+ msg.setDn(generateConflictDn(entryUid, msg.getDn()));
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Generate a modification to add the conflict ObjectClass to an entry
+ * whose Dn is now conflicting with another entry.
+ *
+ * @param op The operation causing the conflict.
+ */
+ private void generateAddConflictOp(ModifyDNOperation op)
+ {
+ // TODO
+ }
+
+ /**
+ * Add the conflict object class to an entry that could
+ * not be added because it is conflicting with another entry.
+ *
+ * @param addOp The conflicting Add Operation.
+ */
+ private void addConflict(AddOperation addOp)
+ {
+ /*
+ * TODO
+ */
+ }
+
+ /**
+ * Generate the Dn to use for a conflicting entry.
+ *
+ * @param op Operation that generated the conflict
+ * @param dn Original dn.
+ * @return The generated Dn for a conflicting entry.
+ */
+ private String generateConflictDn(String entryUid, String dn)
+ {
+ return dn + "entryuuid=" + entryUid;
+ }
+
+ /**
+ * Check if an operation must be processed as an assured operation.
+ *
+ * @param op the operation to be checked.
+ * @return true if the operations must be processed as an assured operation.
+ */
+ private boolean isAssured(Operation op)
+ {
+ // TODO : should have a filtering mechanism for checking
+ // operation that are assured and operations that are not.
+ return false;
+ }
+
+ /**
+ * Push all committed local changes to the changelog service.
+ * PRECONDITION : The pendingChanges lock must be held before calling
+ * this method.
+ */
+ private void pushCommittedChanges()
+ {
+ if (pendingChanges.isEmpty())
+ return;
+
+ ChangeNumber firstChangeNumber = pendingChanges.firstKey();
+ PendingChange firstChange = pendingChanges.get(firstChangeNumber);
+
+ while ((firstChange != null) && firstChange.isCommitted())
+ {
+ if (firstChange.getOp().isSynchronizationOperation() == false)
+ {
+ numSentUpdates++;
+ broker.publish(firstChange.getMsg());
+ }
+ state.update(firstChangeNumber);
+ pendingChanges.remove(firstChangeNumber);
+
+ if (pendingChanges.isEmpty())
+ {
+ firstChange = null;
+ }
+ else
+ {
+ firstChangeNumber = pendingChanges.firstKey();
+ firstChange = pendingChanges.get(firstChangeNumber);
+ }
+ }
+ }
+
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/UpdateMessage.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/UpdateMessage.java
index f7c4bc7..6dc44b6 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/UpdateMessage.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/UpdateMessage.java
@@ -27,8 +27,13 @@
package org.opends.server.synchronization;
import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
+import org.opends.server.core.AddOperation;
+import org.opends.server.core.DeleteOperation;
+import org.opends.server.core.ModifyDNOperation;
+import org.opends.server.core.ModifyOperation;
import org.opends.server.core.Operation;
import org.opends.server.protocols.asn1.ASN1Exception;
import org.opends.server.protocols.internal.InternalClientConnection;
@@ -45,7 +50,12 @@
/**
* The ChangeNumber of this update.
*/
- protected ChangeNumber changeNumber;
+ private ChangeNumber changeNumber;
+
+ /**
+ * The DN on which the update was originally done.
+ */
+ private String dn = null;
/**
* True when the update must use assured replication.
@@ -53,6 +63,83 @@
private boolean assuredFlag = false;
/**
+ * The UniqueId of the entry that was updated.
+ */
+ private String UniqueId;
+
+ /**
+ * Creates a new UpdateMessage with the given informations.
+ *
+ * @param ctx The Synchronization Context of the operation for which the
+ * update message must be created,.
+ * @param dn The dn of the entry on which the change
+ * that caused the creation of this object happened
+ */
+ public UpdateMessage(OperationContext ctx, String dn)
+ {
+ this.changeNumber = ctx.getChangeNumber();
+ this.UniqueId = ctx.getEntryUid();
+ this.dn = dn;
+ }
+
+ /**
+ * Creates a new UpdateMessage from an ecoded byte array.
+ *
+ * @param in The encoded byte array containind the UpdateMessage.
+ * @throws DataFormatException if the encoded byte array is not valid.
+ * @throws UnsupportedEncodingException if UTF-8 is not supprted.
+ */
+ public UpdateMessage(byte[] in) throws DataFormatException,
+ UnsupportedEncodingException
+ {
+ /* read the changeNumber */
+ int pos = 1;
+ int length = getNextLength(in, pos);
+ String changenumberStr = new String(in, pos, length, "UTF-8");
+ this.changeNumber = new ChangeNumber(changenumberStr);
+ }
+
+ /**
+ * Generates an Update Message which the provided information.
+ *
+ * @param op The operation fo which the message must be created.
+ * @param isAssured flag indicating if the operation is an assured operation.
+ * @return The generated message.
+ */
+ public static UpdateMessage generateMsg(Operation op, boolean isAssured)
+ {
+ UpdateMessage msg = null;
+ switch (op.getOperationType())
+ {
+ case MODIFY :
+ msg = new ModifyMsg((ModifyOperation) op);
+ if (isAssured)
+ msg.setAssured();
+ break;
+
+ case ADD:
+ msg = new AddMsg((AddOperation) op);
+ if (isAssured)
+ msg.setAssured();
+ break;
+
+ case DELETE :
+ msg = new DeleteMsg((DeleteOperation) op);
+ if (isAssured)
+ msg.setAssured();
+ break;
+
+ case MODIFY_DN :
+ msg = new ModifyDNMsg((ModifyDNOperation) op);
+ if (isAssured)
+ msg.setAssured();
+ break;
+ }
+
+ return msg;
+ }
+
+ /**
* Get the ChangeNumber from the message.
* @return the ChangeNumber
*/
@@ -62,6 +149,35 @@
}
/**
+ * Get the DN on which the operation happened.
+ *
+ * @return The DN on which the operations happened.
+ */
+ public String getDn()
+ {
+ return dn;
+ }
+
+ /**
+ * Set the DN.
+ * @param dn The dn that must now be used for this message.
+ */
+ public void setDn(String dn)
+ {
+ this.dn = dn;
+ }
+
+ /**
+ * Get the Unique Identifier of the entry on which the operation happened.
+ *
+ * @return The Unique Identifier of the entry on which the operation happened.
+ */
+ public String getUniqueId()
+ {
+ return UniqueId;
+ }
+
+ /**
* Get a boolean indicating if the Update must be processed as an
* Asynchronous or as an assured synchronization.
*
@@ -117,10 +233,115 @@
* @throws ASN1Exception In case of ASN1 decoding exception.
* @throws DataFormatException In case of bad msg format.
*/
- public abstract Operation createOperation(InternalClientConnection conn)
+ public Operation createOperation(InternalClientConnection conn)
+ throws LDAPException, ASN1Exception, DataFormatException
+ {
+ return createOperation(conn, dn);
+ }
+
+
+ /**
+ * Create and Operation from the message using the provided DN.
+ *
+ * @param conn connection to use when creating the message.
+ * @param newDn the DN to use when creating the operation.
+ * @return the created Operation.
+ * @throws LDAPException In case of LDAP decoding exception.
+ * @throws ASN1Exception In case of ASN1 decoding exception.
+ * @throws DataFormatException In case of bad msg format.
+ */
+ public abstract Operation createOperation(InternalClientConnection conn,
+ String newDn)
throws LDAPException, ASN1Exception, DataFormatException;
+ /**
+ * Encode the common header for all the UpdateMessage.
+ *
+ * @param type the type of UpdateMessage to encode.
+ * @param additionalLength additional length needed to encode the remaining
+ * part of the UpdateMessage.
+ * @return a byte array containing the common header and enough space to
+ * encode the reamining bytes of the UpdateMessage as was specified
+ * by the additionalLength.
+ * (byte array length = common header length + additionalLength)
+ * @throws UnsupportedEncodingException if UTF-8 is not supported.
+ */
+ public byte[] encodeHeader(byte type, int additionalLength)
+ throws UnsupportedEncodingException
+ {
+ byte[] byteDn = dn.getBytes("UTF-8");
+ byte[] changeNumberByte =
+ this.getChangeNumber().toString().getBytes("UTF-8");
+ byte[] byteEntryuuid = getUniqueId().getBytes("UTF-8");
+ /* The message header is stored in the form :
+ * <operation type>changenumber><dn><entryuuid><change>
+ * the length of result byte array is therefore :
+ * 1 + dn length + 1 + 24 + additional_length
+ */
+ int length = 1 + changeNumberByte.length + 1 + byteDn.length + 1
+ + byteEntryuuid.length + 1 + additionalLength;
+
+ byte[] encodedMsg = new byte[length];
+
+ /* put the type of the operation */
+ encodedMsg[0] = type;
+ int pos = 1;
+
+ /* put the ChangeNumber */
+ pos = addByteArray(changeNumberByte, encodedMsg, pos);
+
+ /* put the DN and a terminating 0 */
+ pos = addByteArray(byteDn, encodedMsg, pos);
+
+ /* put the entry uuid and a terminating 0 */
+ pos = addByteArray(byteEntryuuid, encodedMsg, pos);
+
+ return encodedMsg;
+ }
+
+ /**
+ * Decode the Header part of this Update Message, and check its type.
+ *
+ * @param type The type of this Update Message.
+ * @param encodedMsg the encoded form of the UpdateMessage.
+ * @return the position at which the remaining part of the message starts.
+ * @throws DataFormatException if the encodedMsg does not contain a valid
+ * common header.
+ */
+ public int decodeHeader(byte type, byte [] encodedMsg)
+ throws DataFormatException
+ {
+ /* first byte is the type */
+ if (encodedMsg[0] != type)
+ throw new DataFormatException("byte[] is not a valid msg");
+
+ try
+ {
+ /* read the changeNumber */
+ int pos = 1;
+ int length = getNextLength(encodedMsg, pos);
+ String changenumberStr = new String(encodedMsg, pos, length, "UTF-8");
+ pos += length + 1;
+ changeNumber = new ChangeNumber(changenumberStr);
+
+ /* read the dn */
+ length = getNextLength(encodedMsg, pos);
+ dn = new String(encodedMsg, pos, length, "UTF-8");
+ pos += length + 1;
+
+ /* read the entryuuid */
+ length = getNextLength(encodedMsg, pos);
+ UniqueId = new String(encodedMsg, pos, length, "UTF-8");
+ pos += length + 1;
+
+ return pos;
+ } catch (UnsupportedEncodingException e)
+ {
+ throw new DataFormatException("UTF-8 is not supported by this jvm.");
+ }
+
+ }
/**
* {@inheritDoc}
*/
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/ModifyConflictTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/ModifyConflictTest.java
index 4d05a7a..2ed2b54 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/ModifyConflictTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/ModifyConflictTest.java
@@ -36,6 +36,8 @@
import org.testng.annotations.Test;
import static org.testng.Assert.*;
+import static org.opends.server.synchronization.OperationContext.*;
+
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.ModifyOperation;
import org.opends.server.protocols.internal.InternalClientConnection;
@@ -47,13 +49,12 @@
import org.opends.server.types.Modification;
import org.opends.server.types.ModificationType;
import org.opends.server.types.ObjectClass;
-import static org.opends.server.synchronization.SynchMessages.SYNCHRONIZATION;
/*
* Test the conflict resolution for modify operations
* This is still a work in progress.
* currently implemented tests
- * - check that an replace with a smaller csn is ignored
+ * - check that an replace with a smaller csn is ignored
* should test :
* - conflict with multi-valued attributes
* - conflict with single-valued attributes
@@ -74,53 +75,53 @@
@Test()
public void replaceAndAdd()
throws Exception
- {
+ {
/*
* Objectclass and DN do not have any impact on the modifty conflict
* resolution for the description attribute.
* Always use the same values for all these tests.
*/
- DN dn = DN.decode("dc=com");
+ DN dn = DN.decode("dc=com");
Map<ObjectClass, String> objectClasses = new HashMap<ObjectClass, String>();
ObjectClass org = DirectoryServer.getObjectClass("organization");
objectClasses.put(org, "organization");
-
+
/*
* start with a new entry with an empty description
*/
Entry entry = new Entry(dn, objectClasses, null, null);
Historical hist = Historical.load(entry);
-
+
/*
* simulate a modify-replace done at time t10
*/
testModify(entry, hist, "description", ModificationType.REPLACE,
"init value", 10, true);
-
+
/*
* Now simulate an add at an earlier date that the previous replace
* conflict resolution should remove it.
- */
+ */
testModify(entry, hist, "description", ModificationType.ADD,
"older value", 1, false);
-
+
/*
* Now simulate an add at an earlier date that the previous replace
* conflict resolution should remove it.
* (a second time to make sure...)
- */
+ */
testModify(entry, hist, "description", ModificationType.ADD,
"older value", 2, false);
-
+
/*
* Now simulate an add at a later date that the previous replace.
* conflict resolution should keep it
- */
+ */
testModify(entry, hist, "description", ModificationType.ADD,
"new value", 11, true);
-
+
}
-
+
/**
* Test that conflict between a modify-delete-attribute and modify-add
* for multi-valued attributes are handled correctly.
@@ -128,53 +129,53 @@
@Test()
public void deleteAndAdd()
throws Exception
- {
+ {
/*
* Objectclass and DN do not have any impact on the modifty conflict
* resolution for the description attribute.
* Always use the same values for all these tests.
*/
- DN dn = DN.decode("dc=com");
+ DN dn = DN.decode("dc=com");
Map<ObjectClass, String> objectClasses = new HashMap<ObjectClass, String>();
ObjectClass org = DirectoryServer.getObjectClass("organization");
objectClasses.put(org, "organization");
-
+
/*
* start with a new entry with an empty description
*/
Entry entry = new Entry(dn, objectClasses, null, null);
Historical hist = Historical.load(entry);
-
+
/*
* simulate a delete of the whole description attribute done at time t10
*/
testModify(entry, hist, "description", ModificationType.DELETE,
null, 10, true);
-
+
/*
* Now simulate an add at an earlier date that the previous delete.
* The conflict resolution should detect that this add must be ignored.
- */
+ */
testModify(entry, hist, "description", ModificationType.ADD,
"older value", 1, false);
-
+
/*
* Now simulate an add at an earlier date that the previous delete.
* The conflict resolution should detect that this add must be ignored.
* (a second time to make sure that historical information is kept...)
- */
+ */
testModify(entry, hist, "description", ModificationType.ADD,
"older value", 2, false);
-
+
/*
* Now simulate an add at a later date that the previous delete.
* conflict resolution should keep it
- */
+ */
testModify(entry, hist, "description", ModificationType.ADD,
"new value", 11, true);
-
+
}
-
+
/**
* Test that conflict between a modify-add and modify-add
* for multi-valued attributes are handled correctly.
@@ -182,67 +183,67 @@
@Test()
public void addAndAdd()
throws Exception
- {
+ {
/*
* Objectclass and DN do not have any impact on the modifty conflict
* resolution for the description attribute.
* Always use the same values for all these tests.
*/
- DN dn = DN.decode("dc=com");
+ DN dn = DN.decode("dc=com");
Map<ObjectClass, String> objectClasses = new HashMap<ObjectClass, String>();
ObjectClass org = DirectoryServer.getObjectClass("organization");
objectClasses.put(org, "organization");
-
+
/*
* start with a new entry with an empty description
*/
Entry entry = new Entry(dn, objectClasses, null, null);
Historical hist = Historical.load(entry);
-
+
/*
* simulate a add of the description attribute done at time t10
*/
testModify(entry, hist, "description", ModificationType.ADD,
"init value", 10, true);
-
+
/*
* Now simulate an add at an earlier date that the previous add.
* The conflict resolution should detect that this add must be kept.
- */
- testModify(entry, hist, "description", ModificationType.ADD,
+ */
+ testModify(entry, hist, "description", ModificationType.ADD,
"older value", 1, true);
-
+
/*
* Now simulate an add at an earlier date that the previous add.
* The conflict resolution should detect that this add must be kept.
* (a second time to make sure that historical information is kept...)
- */
+ */
testModify(entry, hist, "description", ModificationType.ADD,
"older value", 2, false);
-
+
/*
* Now simulate an add at a later date that the previous add.
* conflict resolution should keep it
- */
+ */
testModify(entry, hist, "description", ModificationType.ADD,
- "new value", 11, true);
+ "new value", 11, true);
}
-
+
/*
* helper function.
*/
- private static void testModify(Entry entry,
- Historical hist, String attrName,
+ private static void testModify(Entry entry,
+ Historical hist, String attrName,
ModificationType modType, String value,
int date, boolean keepChangeResult)
{
InternalClientConnection connection = new InternalClientConnection();
ChangeNumber t = new ChangeNumber(date, (short) 0, (short) 0);
-
+
/* create AttributeType description that will be usedfor this test */
AttributeType attrType =
DirectoryServer.getAttributeType(attrName, true);
-
+
LinkedHashSet<AttributeValue> values = new LinkedHashSet<AttributeValue>();
if (value != null)
values.add(new AttributeValue(attrType, value));
@@ -250,17 +251,17 @@
List<Modification> mods = new ArrayList<Modification>();
Modification mod = new Modification(modType, attr);
mods.add(mod);
-
+
ModifyOperation modOp = new ModifyOperation(connection, 1, 1, null,
entry.getDN(), mods);
-
- modOp.setAttachment(SYNCHRONIZATION, t);
-
+ ModifyContext ctx = new ModifyContext(t, "uniqueId");
+ modOp.setAttachment(SYNCHROCONTEXT, ctx);
+
hist.replayOperation(modOp, entry);
-
+
/*
- * The last older change should have been detected as conflicting
- * and should be removed by the conflict resolution code.
+ * The last older change should have been detected as conflicting
+ * and should be removed by the conflict resolution code.
*/
if (keepChangeResult)
{
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationMsgTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationMsgTest.java
index 73c1be2..74fdec7 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationMsgTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationMsgTest.java
@@ -51,7 +51,7 @@
import org.opends.server.types.RDN;
import org.opends.server.util.TimeThread;
-import static org.opends.server.synchronization.SynchMessages.SYNCHRONIZATION;
+import static org.opends.server.synchronization.OperationContext.*;
/**
* Test the contructors, encoders and decoders of the synchronization
@@ -130,10 +130,10 @@
{
DN dn = DN.decode(rawdn);
InternalClientConnection connection = new InternalClientConnection();
- ModifyMsg msg = new ModifyMsg(changeNumber, dn, mods);
+ ModifyMsg msg = new ModifyMsg(changeNumber, dn, mods, "fakeuniqueid");
ModifyMsg generatedMsg = new ModifyMsg(msg.getBytes());
- assertEquals(msg.changeNumber, generatedMsg.changeNumber);
+ assertEquals(msg.getChangeNumber(), generatedMsg.getChangeNumber());
Operation op = msg.createOperation(connection);
Operation generatedOperation = generatedMsg.createOperation(connection);
@@ -145,6 +145,8 @@
ModifyOperation mod2 = (ModifyOperation) generatedOperation;
assertEquals(mod1.getRawEntryDN(), mod2.getRawEntryDN());
+ assertEquals( mod1.getAttachment(SYNCHROCONTEXT),
+ mod2.getAttachment(SYNCHROCONTEXT));
/*
* TODO : test that the generated mod equals the original mod.
@@ -178,11 +180,11 @@
DN.decode(rawDN));
ChangeNumber cn = new ChangeNumber(TimeThread.getTime(),
(short) 123, (short) 45);
- op.setAttachment(SYNCHRONIZATION, cn);
+ op.setAttachment(SYNCHROCONTEXT, new DeleteContext(cn, "uniqueid"));
DeleteMsg msg = new DeleteMsg(op);
DeleteMsg generatedMsg = new DeleteMsg(msg.getBytes());
- assertEquals(msg.changeNumber, generatedMsg.changeNumber);
+ assertEquals(msg.getChangeNumber(), generatedMsg.getChangeNumber());
Operation generatedOperation = generatedMsg.createOperation(connection);
@@ -219,13 +221,14 @@
ChangeNumber cn = new ChangeNumber(TimeThread.getTime(),
(short) 123, (short) 45);
- op.setAttachment(SYNCHRONIZATION, cn);
+ op.setAttachment(SYNCHROCONTEXT,
+ new ModifyDnContext(cn, "uniqueid", "newparentId"));
ModifyDNMsg msg = new ModifyDNMsg(op);
ModifyDNMsg generatedMsg = new ModifyDNMsg(msg.getBytes());
Operation generatedOperation = generatedMsg.createOperation(connection);
ModifyDNOperation mod2 = (ModifyDNOperation) generatedOperation;
- assertEquals(msg.changeNumber, generatedMsg.changeNumber);
+ assertEquals(msg.getChangeNumber(), generatedMsg.getChangeNumber());
assertEquals(op.getRawEntryDN(), mod2.getRawEntryDN());
assertEquals(op.getRawNewRDN(), mod2.getRawNewRDN());
assertEquals(op.deleteOldRDN(), mod2.deleteOldRDN());
@@ -238,7 +241,7 @@
{"dc=test,dc=com"},
};
}
-
+
@Test(dataProvider = "addEncodeDecode")
public void addEncodeDecode(String rawDN)
throws Exception
@@ -269,7 +272,8 @@
ChangeNumber cn = new ChangeNumber(TimeThread.getTime(),
(short) 123, (short) 45);
- AddMsg msg = new AddMsg(cn, rawDN, objectClass, userAttributes,
+ AddMsg msg = new AddMsg(cn, rawDN, "thisIsaUniqueID", "parentUniqueId",
+ objectClass, userAttributes,
operationalAttributes);
AddMsg generatedMsg = new AddMsg(msg.getBytes());
assertEquals(msg.getBytes(), generatedMsg.getBytes());
--
Gitblit v1.10.0