mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
01.04.2006 b5acb25ee2ad9bf8b166b9de1a34e6aab6ea23b7
issue 604 : solve the naming conflict that might happen when several masters are used
there are 3 main parts in this commit :
- attach the replication context in an OperationContext
- if operation replay fails then fix the problem
- in the pre-op checks for conflict and cause failure if necessary
most of the time there should be no conflict and the operation should be processed normally
5 files added
17 files modified
2561 ■■■■ changed files
opends/resource/config/config.ldif 15 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/changelog/ChangelogCache.java 5 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/changelog/ChangelogDB.java 5 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/core/ModifyDNOperation.java 61 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/AddContext.java 62 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/AddMsg.java 121 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/DeleteContext.java 44 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/DeleteMsg.java 91 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/Historical.java 75 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/ListenerThread.java 108 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/ModifyContext.java 46 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/ModifyDNMsg.java 184 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/ModifyDnContext.java 63 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/ModifyFakeOperation.java 7 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/ModifyMsg.java 102 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/MultimasterSynchronization.java 137 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/OperationContext.java 121 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/SynchMessages.java 18 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/SynchronizationDomain.java 948 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/synchronization/UpdateMessage.java 225 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/ModifyConflictTest.java 101 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationMsgTest.java 22 ●●●●● patch | view | raw | blame | history
opends/resource/config/config.ldif
@@ -126,7 +126,6 @@
ds-cfg-index-type: presence
ds-cfg-index-type: equality
ds-cfg-index-type: substring
ds-cfg-index-entry-limit: 4000
dn: ds-cfg-index-attribute=givenName,cn=Index,ds-cfg-backend-id=userRoot,cn=Backends,cn=config
objectClass: top
@@ -135,7 +134,6 @@
ds-cfg-index-type: presence
ds-cfg-index-type: equality
ds-cfg-index-type: substring
ds-cfg-index-entry-limit: 4000
dn: ds-cfg-index-attribute=mail,cn=Index,ds-cfg-backend-id=userRoot,cn=Backends,cn=config
objectClass: top
@@ -144,14 +142,12 @@
ds-cfg-index-type: presence
ds-cfg-index-type: equality
ds-cfg-index-type: substring
ds-cfg-index-entry-limit: 4000
dn: ds-cfg-index-attribute=member,cn=Index,ds-cfg-backend-id=userRoot,cn=Backends,cn=config
objectClass: top
objectClass: ds-cfg-je-index
ds-cfg-index-attribute: member
ds-cfg-index-type: equality
ds-cfg-index-entry-limit: 4000
dn: ds-cfg-index-attribute=sn,cn=Index,ds-cfg-backend-id=userRoot,cn=Backends,cn=config
objectClass: top
@@ -160,7 +156,6 @@
ds-cfg-index-type: presence
ds-cfg-index-type: equality
ds-cfg-index-type: substring
ds-cfg-index-entry-limit: 4000
dn: ds-cfg-index-attribute=telephoneNumber,cn=Index,ds-cfg-backend-id=userRoot,cn=Backends,cn=config
objectClass: top
@@ -169,22 +164,24 @@
ds-cfg-index-type: presence
ds-cfg-index-type: equality
ds-cfg-index-type: substring
ds-cfg-index-entry-limit: 4000
dn: ds-cfg-index-attribute=uid,cn=Index,ds-cfg-backend-id=userRoot,cn=Backends,cn=config
objectClass: top
objectClass: ds-cfg-je-index
ds-cfg-index-attribute: uid
ds-cfg-index-type: equality
ds-cfg-index-entry-limit: 4000
dn: ds-cfg-index-attribute=ds-sync-hist,cn=Index,ds-cfg-backend-id=userRoot,cn=Backends,cn=config
objectClass: top
objectClass: ds-cfg-je-index
objectClass: extensibleObject
ds-cfg-index-attribute: ds-sync-hist
ds-cfg-index-type: ordering
ds-cfg-index-entry-limit: 4000
dn: ds-cfg-index-attribute=entryuuid,cn=Index,ds-cfg-backend-id=userRoot,cn=Backends,cn=config
objectClass: top
objectClass: ds-cfg-je-index
ds-cfg-index-attribute: entryuuid
ds-cfg-index-type: equality
dn: cn=JE Database,ds-cfg-backend-id=userRoot,cn=Backends,cn=config
objectClass: top
opends/src/server/org/opends/server/changelog/ChangelogCache.java
@@ -265,8 +265,6 @@
  {
    /*
     * create the balanced tree that will be used to forward changes
     * TODO initialize it will all the previous changes that this replicaID
     * has not seen
     */
    synchronized (connectedServers)
    {
@@ -305,8 +303,6 @@
  {
    /*
     * create the balanced tree that will be used to forward changes
     * TODO initialize it will all the previous changes that this replicaID
     * has not seen
     * TODO throw proper exception
     */
    synchronized (changelogServers)
@@ -482,7 +478,6 @@
  public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver,
                      short serverId)
  {
    // TODO Auto-generated method stub
    ServerHandler handler;
    if (isLDAPserver)
      handler = connectedServers.get(serverId);
opends/src/server/org/opends/server/changelog/ChangelogDB.java
@@ -349,7 +349,7 @@
      catch (DatabaseException dbe)
      {
      }
      /* database is faulty : TODO : log better message */
      /* database is faulty */
      int    msgID   = MSGID_CHANGELOG_SHUTDOWN_DATABASE_ERROR;
      String message = getMessage(msgID) + stackTraceToSingleLineString(e);
      logError(ErrorLogCategory.SYNCHRONIZATION,
@@ -553,7 +553,8 @@
           * to continue with the next record.
           * In such case, it is therefore possible that we miss some changes.
           * TODO. log an error message.
           * TODO. Such problem should be handled by the repair functionality.
           * TODO : REPAIR : Such problem should be handled by the
           *        repair functionality.
           */
        }
      }
opends/src/server/org/opends/server/core/ModifyDNOperation.java
@@ -1137,36 +1137,6 @@
        }
        // Invoke any conflict resolution processing that might be needed by the
        // synchronization provider.
        for (SynchronizationProvider provider :
             DirectoryServer.getSynchronizationProviders())
        {
          try
          {
            SynchronizationProviderResult result =
                 provider.handleConflictResolution(this);
            if (! result.continueOperationProcessing())
            {
              break modifyDNProcessing;
            }
          }
          catch (DirectoryException de)
          {
            assert debugException(CLASS_NAME, "run", de);
            logError(ErrorLogCategory.SYNCHRONIZATION,
                     ErrorLogSeverity.SEVERE_ERROR,
                     MSGID_MODDN_SYNCH_CONFLICT_RESOLUTION_FAILED,
                     getConnectionID(), getOperationID(),
                     stackTraceToSingleLineString(de));
            setResponseData(de);
            break modifyDNProcessing;
          }
        }
        // Get the current entry from the appropriate backend.  If it doesn't
        // exist, then fail.
        try
@@ -1215,6 +1185,37 @@
          break modifyDNProcessing;
        }
        // Invoke any conflict resolution processing that might be needed by the
        // synchronization provider.
        for (SynchronizationProvider provider :
             DirectoryServer.getSynchronizationProviders())
        {
          try
          {
            SynchronizationProviderResult result =
                 provider.handleConflictResolution(this);
            if (! result.continueOperationProcessing())
            {
              break modifyDNProcessing;
            }
          }
          catch (DirectoryException de)
          {
            assert debugException(CLASS_NAME, "run", de);
            logError(ErrorLogCategory.SYNCHRONIZATION,
                     ErrorLogSeverity.SEVERE_ERROR,
                     MSGID_MODDN_SYNCH_CONFLICT_RESOLUTION_FAILED,
                     getConnectionID(), getOperationID(),
                     stackTraceToSingleLineString(de));
            setResponseData(de);
            break modifyDNProcessing;
          }
        }
        // Check to see if the client has permission to perform the
        // modify DN.
opends/src/server/org/opends/server/synchronization/AddContext.java
New file
@@ -0,0 +1,62 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006 Sun Microsystems, Inc.
 */
package org.opends.server.synchronization;
/**
 * This class describe the Synchronization contexte that is attached to
 * Add Operation.
 */
public class AddContext extends OperationContext
{
  /**
   * The Unique Id of the parent entry od the added entry.
   */
  private String parentUid;
  /**
   * Creates a new AddContext with the provided information.
   *
   * @param changeNumber The change number of the add operation.
   * @param uid the Unique Id of the added entry.
   * @param parentUid The unique Id of the parent of the added entry.
   */
  public AddContext(ChangeNumber changeNumber, String uid, String parentUid)
  {
    super(changeNumber, uid);
    this.parentUid = parentUid;
  }
  /**
   * Get the Unique Id of the parent of the added entry.
   *
   * @return Returns the Unique Id of the parent of the added entry.
   */
  public String getParentUid()
  {
    return parentUid;
  }
}
opends/src/server/org/opends/server/synchronization/AddMsg.java
@@ -45,7 +45,7 @@
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeValue;
import static org.opends.server.synchronization.SynchMessages.*;
import static org.opends.server.synchronization.OperationContext.*;
import static org.opends.server.util.StaticUtils.toLowerCase;
/**
@@ -55,8 +55,8 @@
public class AddMsg extends UpdateMessage
{
  private static final long serialVersionUID = -4905520652801395185L;
  private String dn;
  private byte[] encodedAttributes ;
  private String parentUniqueId;
  /**
   * Creates a new AddMessage.
@@ -64,6 +64,12 @@
   */
  public AddMsg(AddOperation op)
  {
    super((AddContext) op.getAttachment(SYNCHROCONTEXT),
          op.getRawEntryDN().stringValue());
    AddContext ctx = (AddContext) op.getAttachment(SYNCHROCONTEXT);
    this.parentUniqueId = ctx.getParentUid();
    // Encode the object classes (SET OF LDAPString).
    LinkedHashSet<AttributeValue> ocValues =
      new LinkedHashSet<AttributeValue>(op.getObjectClasses().size());
@@ -99,12 +105,8 @@
      }
    }
    dn = op.getRawEntryDN().stringValue();
    // Encode the sequence.
    encodedAttributes = ASN1Element.encodeValue(elems);
    changeNumber = (ChangeNumber) op.getAttachment(SYNCHRONIZATION);
  }
  /**
@@ -112,18 +114,22 @@
   *
   * @param cn ChangeNumber of the add.
   * @param dn DN of the added entry.
   * @param uniqueId The Unique identifier of the added entry.
   * @param parentId The unique Id of the parent of the added entry.
   * @param objectClass objectclass of the added entry.
   * @param userAttributes user attributes of the added entry.
   * @param operationalAttributes operational attributes of the added entry.
   */
  public AddMsg(ChangeNumber cn,
                String dn,
                String uniqueId,
                String parentId,
                Attribute objectClass,
                Collection<Attribute> userAttributes,
                Collection<Attribute> operationalAttributes)
  {
    this.dn = dn;
    this.changeNumber = cn;
    super (new AddContext(cn, uniqueId, parentId), dn);
    this.parentUniqueId = parentId;
    ArrayList<ASN1Element> elems = new ArrayList<ASN1Element>();
    elems.add(new LDAPAttribute(objectClass).encode());
@@ -142,40 +148,29 @@
   *
   * @param in The byte[] from which the operation must be read.
   * @throws DataFormatException The input byte[] is not a valid AddMsg
   * @throws UnsupportedEncodingException If UTF8 is not supported by the jvm
   */
  public AddMsg(byte[] in) throws DataFormatException
  public AddMsg(byte[] in) throws DataFormatException,
                                  UnsupportedEncodingException
  {
    /* first byte is the type */
    if (in[0] != MSG_TYPE_ADD_REQUEST)
      throw new DataFormatException("byte[] is not a valid add msg");
    int pos = 1;
    super(in);
    /* read the dn
     * first calculate the length then construct the string
     */
    int length = 0;
    int offset = pos;
    while (in[pos++] != 0)
    int  pos = decodeHeader(MSG_TYPE_ADD_REQUEST, in);
    // read the parent unique Id
    int length = getNextLength(in, pos);
    if (length != 0)
    {
      if (pos > in.length)
        throw new DataFormatException("byte[] is not a valid add msg");
      length++;
      parentUniqueId = new String(in, pos, length, "UTF-8");
      pos += length + 1;
    }
    try
    else
    {
      dn = new String(in, offset, length, "UTF-8");
      /* read the changeNumber
       * it is always 24 characters long
       */
      String changenumberStr = new  String(in, pos, 24, "UTF-8");
      changeNumber = new ChangeNumber(changenumberStr);
      pos +=24;
    } catch (UnsupportedEncodingException e ) {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
      parentUniqueId = null;
      pos += 1;
    }
    /* Read the attributes : all the remaining bytes */
    // Read the attributes : all the remaining bytes
    encodedAttributes = new byte[in.length-pos];
    int i =0;
    while (pos<in.length)
@@ -185,14 +180,11 @@
  }
  /**
   * Create and return an AddOperation from a received ADD message.
   * @param connection The connection where we received the message.
   * @return The created operation.
   * @throws LDAPException In case msg encoding was not valid.
   * @throws ASN1Exception In case msg encoding was not valid.
   * {@inheritDoc}
   */
  @Override
  public AddOperation createOperation(InternalClientConnection connection)
  public AddOperation createOperation(InternalClientConnection connection,
                                      String newDn)
                      throws LDAPException, ASN1Exception
  {
    ArrayList<LDAPAttribute> attr = new ArrayList<LDAPAttribute>();
@@ -207,9 +199,10 @@
    AddOperation add =  new AddOperation(connection,
                            InternalClientConnection.nextOperationID(),
                            InternalClientConnection.nextMessageID(), null,
                            new ASN1OctetString(dn), attr);
    add.setAttachment(SYNCHRONIZATION, getChangeNumber());
                            new ASN1OctetString(newDn), attr);
    AddContext ctx = new AddContext(getChangeNumber(), getUniqueId(),
                                    parentUniqueId);
    add.setAttachment(SYNCHROCONTEXT, ctx);
    return add;
  }
@@ -220,36 +213,30 @@
  @Override
  public byte[] getBytes()
  {
    byte[] byteDn;
    try
    {
      byteDn = dn.getBytes("UTF-8");
      /* The ad message is stored in the form :
       * <operation type><dn><changenumber><attributes>
       * the length of result byte array is therefore :
       *   1 + dn length + 1 + 24 + attribute length
       */
      int length = 1 + byteDn.length + 1  + 24 + encodedAttributes.length;
      byte[] resultByteArray = new byte[length];
      int pos = 1;
      /* put the type of the operation */
      resultByteArray[0] = MSG_TYPE_ADD_REQUEST;
      /* put the DN and a terminating 0 */
      for (int i = 0; i< byteDn.length; i++,pos++)
      int length = encodedAttributes.length;
      byte[] byteParentId = null;
      if (parentUniqueId != null)
      {
        resultByteArray[pos] = byteDn[i];
        byteParentId = parentUniqueId.getBytes("UTF-8");
        length += byteParentId.length + 1;
      }
      resultByteArray[pos++] = 0;
      /* put the ChangeNumber */
      byte[] changeNumberByte =
                      this.getChangeNumber().toString().getBytes("UTF-8");
      for (int i=0; i<24; i++,pos++)
      else
      {
        resultByteArray[pos] = changeNumberByte[i];
        length += 1;
      }
      /* encode the header in a byte[] large enough to also contain the mods */
      byte [] resultByteArray = encodeHeader(MSG_TYPE_ADD_REQUEST, length);
      int pos = resultByteArray.length - length;
      if (byteParentId != null)
        pos = addByteArray(byteParentId, resultByteArray, pos);
      else
        resultByteArray[pos++] = 0;
      /* put the attributes */
      for (int i=0; i<encodedAttributes.length; i++,pos++)
      {
@@ -271,6 +258,6 @@
  @Override
  public String toString()
  {
    return ("ADD " + dn + " " + getChangeNumber());
    return ("ADD " + getDn() + " " + getChangeNumber());
  }
}
opends/src/server/org/opends/server/synchronization/DeleteContext.java
New file
@@ -0,0 +1,44 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006 Sun Microsystems, Inc.
 */
package org.opends.server.synchronization;
/**
 * This class is used to describe the context attached to a Delete Operation.
 */
public class DeleteContext extends OperationContext
{
  /**
   * Creates a new DeleteContext with the provided information.
   *
   * @param changeNumber The change number of the Delete Operation.
   * @param uid The unique Id of the deleted entry.
   */
  public DeleteContext(ChangeNumber changeNumber, String uid)
  {
    super(changeNumber, uid);
  }
}
opends/src/server/org/opends/server/synchronization/DeleteMsg.java
@@ -26,7 +26,7 @@
 */
package org.opends.server.synchronization;
import static org.opends.server.synchronization.SynchMessages.SYNCHRONIZATION;
import static org.opends.server.synchronization.OperationContext.*;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
@@ -41,7 +41,6 @@
 */
public class DeleteMsg extends UpdateMessage
{
  private String dn;
  private static final long serialVersionUID = -4905520652801395185L;
  /**
@@ -50,8 +49,8 @@
   */
  public DeleteMsg(DeleteOperation op)
  {
    dn = op.getRawEntryDN().stringValue();
    changeNumber = (ChangeNumber) op.getAttachment(SYNCHRONIZATION);
    super((OperationContext) op.getAttachment(SYNCHROCONTEXT),
           op.getRawEntryDN().stringValue());
  }
  /**
@@ -59,55 +58,29 @@
   *
   * @param in The byte[] from which the operation must be read.
   * @throws DataFormatException The input byte[] is not a valid AddMsg
   * @throws UnsupportedEncodingException  If UTF8 is not supported by the jvm
   */
  public DeleteMsg(byte[] in) throws DataFormatException
  public DeleteMsg(byte[] in) throws DataFormatException,
                                     UnsupportedEncodingException
  {
    /* first byte is the type */
    if (in[0] != MSG_TYPE_DELETE_REQUEST)
      throw new DataFormatException("byte[] is not a valid delete msg");
    int pos = 1;
    /* read the dn
     * first calculate the length then construct the string
     */
    int length = 0;
    int offset = pos;
    while (in[pos++] != 0)
    {
      if (pos > in.length)
        throw new DataFormatException("byte[] is not a valid delete msg");
      length++;
    }
    try
    {
      dn = new String(in, offset, length, "UTF-8");
      /* read the changeNumber
       * it is always 24 characters long
       */
      String changenumberStr = new String(in, pos, 24, "UTF-8");
      changeNumber = new ChangeNumber(changenumberStr);
    } catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    }
    super(in);
    decodeHeader(MSG_TYPE_DELETE_REQUEST, in);
  }
  /**
   * Create an Operation from a delete Message.
   *
   * @param connection the connection
   * @return the Operation from which the message was received
   * {@inheritDoc}
   */
  @Override
  public Operation createOperation(InternalClientConnection connection)
  public Operation createOperation(InternalClientConnection connection,
      String newDn)
  {
    DeleteOperation del =  new DeleteOperation(connection,
                               InternalClientConnection.nextOperationID(),
                               InternalClientConnection.nextMessageID(), null,
                               new ASN1OctetString(dn));
    del.setAttachment(SYNCHRONIZATION, getChangeNumber());
                               new ASN1OctetString(newDn));
    DeleteContext ctx = new DeleteContext(getChangeNumber(), getUniqueId());
    del.setAttachment(SYNCHROCONTEXT, ctx);
    return del;
  }
@@ -119,44 +92,14 @@
  @Override
  public byte[] getBytes()
  {
    byte[] byteDn;
    try
    {
      byteDn = dn.getBytes("UTF-8");
      /* The Delete message is stored in the form :
       * <operation type><dn><changenumber>
       * the length of result byte array is therefore :
       *   1 + dn length + 1 + 24
       */
      int length = 1 + byteDn.length + 1  + 24;
      byte[] resultByteArray = new byte[length];
      int pos = 1;
      /* put the type of the operation */
      resultByteArray[0] = MSG_TYPE_DELETE_REQUEST;
      /* put the DN and a terminating 0 */
      for (int i = 0; i< byteDn.length; i++,pos++)
      {
        resultByteArray[pos] = byteDn[i];
      }
      resultByteArray[pos++] = 0;
      /* put the ChangeNumber */
      byte[] changeNumberByte =
                      this.getChangeNumber().toString().getBytes("UTF-8");
      for (int i=0; i<24; i++,pos++)
      {
        resultByteArray[pos] = changeNumberByte[i];
      }
      return resultByteArray;
      return encodeHeader(MSG_TYPE_DELETE_REQUEST, 0);
    } catch (UnsupportedEncodingException e)
    {
      // should never happen : TODO : log error properly
      return null;
    }
    return null;
  }
  /**
@@ -165,6 +108,6 @@
  @Override
  public String toString()
  {
    return ("DEL " + dn + " " + getChangeNumber());
    return ("DEL " + getDn() + " " + getChangeNumber());
  }
}
opends/src/server/org/opends/server/synchronization/Historical.java
@@ -35,6 +35,7 @@
import java.util.Set;
import java.util.TreeMap;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.ModifyOperation;
import org.opends.server.types.Attribute;
@@ -44,7 +45,6 @@
import org.opends.server.types.Modification;
import org.opends.server.types.ModificationType;
import static org.opends.server.synchronization.SynchMessages.*;
/**
 * This class is used to store historical information that is
@@ -68,6 +68,9 @@
  static final String HISTORICALATTRIBUTENAME = "ds-sync-hist";
  static final AttributeType historicalAttrType =
    DirectoryServer.getSchema().getAttributeType(HISTORICALATTRIBUTENAME);
  static final String ENTRYUIDNAME = "entryuuid";
  static final AttributeType entryuuidAttrType =
    DirectoryServer.getSchema().getAttributeType(ENTRYUIDNAME);
  /*
   * The last update seen on this entry, allows fast conflict detection.
@@ -109,7 +112,7 @@
  {
    List<Modification> mods = modifyOperation.getModifications();
    ChangeNumber changeNumber =
      (ChangeNumber) modifyOperation.getAttachment(SYNCHRONIZATION);
      OperationContext.getChangeNumber(modifyOperation);
    for (Iterator modsIterator = mods.iterator(); modsIterator.hasNext();)
    {
@@ -288,7 +291,8 @@
    List<Modification> mods = modifyOperation.getModifications();
    Entry modifiedEntry = modifyOperation.getModifiedEntry();
    ChangeNumber changeNumber =
              (ChangeNumber) modifyOperation.getAttachment(SYNCHRONIZATION);
      OperationContext.getChangeNumber(modifyOperation);
    /*
     * If this is a local operation we need first to update the historical
     * information, then update the entry with the historical information
@@ -802,7 +806,7 @@
            } catch (Exception e)
            {
              /*
               *  TODO This Exception shows that there are some
               *  TODO : REPAIR : This Exception shows that there are some
               *  inconsistency in the historical information.
               *  This method can't fix the problem.
               *  This should be logged and somehow the repair
@@ -812,13 +816,70 @@
          }
          else
          {
            modifyFakeOperation = new ModifyFakeOperation(entry.getDN(),cn);
            modifyFakeOperation.addModification(mod);
            operations.put(histVal.getCn(), modifyFakeOperation);
            String uuidString = getEntryUuid(entry);
            if (uuidString != null)
            {
                modifyFakeOperation = new ModifyFakeOperation(entry.getDN(),
                      cn, uuidString);
                modifyFakeOperation.addModification(mod);
                operations.put(histVal.getCn(), modifyFakeOperation);
            }
          }
        }
      }
    }
    return operations.values();
  }
  /**
   * Get the entry unique Id in String form.
   *
   * @param entry The entry for which the unique id should be returned.
   *
   * @return The Unique Id of the entry if it has one. null, otherwise.
   */
  public static String getEntryUuid(Entry entry)
  {
    String uuidString = null;
    List<Attribute> uuidAttrs =
             entry.getOperationalAttribute(entryuuidAttrType);
    if (uuidAttrs != null)
    {
      Attribute uuid = uuidAttrs.get(0);
      if (uuid.hasValue())
      {
        AttributeValue uuidVal = uuid.getValues().iterator().next();
        uuidString =  uuidVal.getStringValue();
      }
    }
    return uuidString;
  }
  /**
   * Get the Entry Unique Id from an add operation.
   * This must be called after the entry uuid preop plugin (i.e no
   * sooner than the synchronization provider pre-op)
   *
   * @param op The operation
   * @return The Entry Unique Id String form.
   */
  public static String getEntryUuid(AddOperation op)
  {
    String uuidString = null;
    Map<AttributeType, List<Attribute>> attrs = op.getOperationalAttributes();
    List<Attribute> uuidAttrs = attrs.get(entryuuidAttrType);
    if (uuidAttrs != null)
    {
      Attribute uuid = uuidAttrs.get(0);
      if (uuid.hasValue())
      {
        AttributeValue uuidVal = uuid.getValues().iterator().next();
        uuidString =  uuidVal.getStringValue();
      }
    }
    return uuidString;
  }
}
opends/src/server/org/opends/server/synchronization/ListenerThread.java
@@ -26,22 +26,15 @@
 */
package org.opends.server.synchronization;
import java.util.zip.DataFormatException;
import org.opends.server.api.DirectoryThread;
import org.opends.server.core.Operation;
import org.opends.server.protocols.asn1.ASN1Exception;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.ldap.LDAPException;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
import org.opends.server.types.ResultCode;
import static org.opends.server.loggers.Error.logError;
import static org.opends.server.messages.MessageHandler.getMessage;
import static org.opends.server.synchronization.SynchMessages.*;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import org.opends.server.api.DirectoryThread;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
/**
 * Thread that is used to get messages from the Changelog servers
 * and replay them in the current server.
@@ -49,22 +42,17 @@
public class ListenerThread extends DirectoryThread
{
  private SynchronizationDomain listener;
  private ChangeNumberGenerator CNgen;
  private boolean shutdown = false;
  /**
   * Constructor for the ListenerThread.
   *
   * @param listener the Plugin that created this thread
   * @param gen the Generator to use to get new ChangeNumber
   */
  public ListenerThread(SynchronizationDomain listener,
                        ChangeNumberGenerator gen)
  public ListenerThread(SynchronizationDomain listener)
  {
     super("Listener thread");
     super("Synchronization Listener thread");
     this.listener = listener;
     this.CNgen = gen;
     setName("Synchronization Listener");
  }
  /**
@@ -80,82 +68,24 @@
   */
  public void run()
  {
    InternalClientConnection conn = new InternalClientConnection();
    UpdateMessage msg;
    while (((msg = listener.receive()) != null) && (shutdown == false))
    try
    {
      Operation op;
      try
      while (((msg = listener.receive()) != null) && (shutdown == false))
      {
        op = msg.createOperation(conn);
        op.setInternalOperation(true);
        op.setSynchronizationOperation(true);
        ChangeNumber changeNumber =
          (ChangeNumber) op.getAttachment(SYNCHRONIZATION);
        if (changeNumber != null)
          CNgen.adjust(changeNumber);
        try
        {
          op.run();
          if (op.getResultCode() != ResultCode.SUCCESS)
          {
            int msgID = MSGID_ERROR_REPLAYING_OPERATION;
            String message = getMessage(msgID,
                op.getResultCode().getResultCodeName(),
                changeNumber.toString(),
                op.toString(), op.getErrorMessage());
            logError(ErrorLogCategory.SYNCHRONIZATION,
                ErrorLogSeverity.SEVERE_ERROR,
                message, msgID);
            listener.updateError(changeNumber);
          }
        } catch (Exception e)
        {
          int msgID = MSGID_EXCEPTION_REPLAYING_OPERATION;
          String message = getMessage(msgID, stackTraceToSingleLineString(e),
              op.toString());
          logError(ErrorLogCategory.SYNCHRONIZATION,
              ErrorLogSeverity.SEVERE_ERROR,
              message, msgID);
          listener.updateError(changeNumber);
        }
        listener.replay(msg);
      }
      catch (ASN1Exception e)
      {
        int msgID = MSGID_EXCEPTION_DECODING_OPERATION;
        String message = getMessage(msgID, msg) +
                         stackTraceToSingleLineString(e);
        logError(ErrorLogCategory.SYNCHRONIZATION,
            ErrorLogSeverity.SEVERE_ERROR,
            message, msgID);
      }
      catch (LDAPException e)
      {
        int msgID = MSGID_EXCEPTION_DECODING_OPERATION;
        String message = getMessage(msgID, msg) +
                         stackTraceToSingleLineString(e);
        logError(ErrorLogCategory.SYNCHRONIZATION,
            ErrorLogSeverity.SEVERE_ERROR,
            message, msgID);
      }
      catch (DataFormatException e)
      {
        int msgID = MSGID_EXCEPTION_DECODING_OPERATION;
        String message = getMessage(msgID, msg) +
                         stackTraceToSingleLineString(e);
        logError(ErrorLogCategory.SYNCHRONIZATION,
            ErrorLogSeverity.SEVERE_ERROR,
            message, msgID);
      }
      finally
      {
        if (msg.isAssured())
          listener.ack(msg.getChangeNumber());
        listener.incProcessedUpdates();
      }
    } catch (Exception e)
    {
      /*
       * catch all exceptions happening in listener.receive and listener.replay
       * so that the thread never dies even in case of problems.
       */
      int msgID = MSGID_EXCEPTION_RECEIVING_SYNCHRONIZATION_MESSAGE;
      String message = getMessage(msgID, stackTraceToSingleLineString(e));
      logError(ErrorLogCategory.SYNCHRONIZATION,
          ErrorLogSeverity.SEVERE_ERROR, message, msgID);
    }
  }
}
opends/src/server/org/opends/server/synchronization/ModifyContext.java
New file
@@ -0,0 +1,46 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006 Sun Microsystems, Inc.
 */
package org.opends.server.synchronization;
/**
 * This class describe the synchronization context that is attached to
 * Modify operation.
 */
public class ModifyContext extends OperationContext
{
  /**
   * Creates a new Modify Context with the provided parameters.
   *
   * @param changeNumber The change number of the operation.
   * @param uid the unique Id of the modified entry.
   */
  public ModifyContext(ChangeNumber changeNumber, String uid)
  {
    super(changeNumber, uid);
  }
}
opends/src/server/org/opends/server/synchronization/ModifyDNMsg.java
@@ -26,7 +26,7 @@
 */
package org.opends.server.synchronization;
import static org.opends.server.synchronization.SynchMessages.SYNCHRONIZATION;
import static org.opends.server.synchronization.OperationContext.*;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
@@ -41,10 +41,10 @@
 */
public class ModifyDNMsg extends UpdateMessage
{
  private String dn;
  private String newRDN;
  private String newSuperior;
  private boolean deleteOldRdn;
  private String newSuperiorId;
  private static final long serialVersionUID = -4905520652801395185L;
  /**
@@ -54,106 +54,83 @@
   */
  public ModifyDNMsg(ModifyDNOperation op)
  {
    dn = op.getRawEntryDN().stringValue();
    super((OperationContext) op.getAttachment(SYNCHROCONTEXT),
        op.getRawEntryDN().stringValue());
    ModifyDnContext ctx = (ModifyDnContext) op.getAttachment(SYNCHROCONTEXT);
    newSuperiorId = ctx.getNewParentId();
    deleteOldRdn = op.deleteOldRDN();
    if (op.getRawNewSuperior() != null)
      newSuperior = op.getRawNewSuperior().stringValue();
    else
      newSuperior = null;
    newRDN = op.getRawNewRDN().stringValue();
    changeNumber = (ChangeNumber) op.getAttachment(SYNCHRONIZATION);
  }
  /**
   * Creates a new Add message from a byte[].
   * Creates a new ModifyDN message from a byte[].
   *
   * @param in The byte[] from which the operation must be read.
   * @throws DataFormatException The input byte[] is not a valid AddMsg
   * @throws DataFormatException The input byte[] is not a valid AddMsg.
   * @throws UnsupportedEncodingException If UTF8 is not supported.
   */
  public ModifyDNMsg(byte[] in) throws DataFormatException
  public ModifyDNMsg(byte[] in) throws DataFormatException,
                                       UnsupportedEncodingException
  {
    /* first byte is the type */
    if (in[0] != MSG_TYPE_MODIFYDN_REQUEST)
      throw new DataFormatException("byte[] is not a valid add msg");
    int pos = 1;
    super(in);
    /* read the dn
    int pos = decodeHeader(MSG_TYPE_MODIFYDN_REQUEST, in);
    /* read the newRDN
     * first calculate the length then construct the string
     */
    int length = 0;
    int offset = pos;
    while (in[pos++] != 0)
    {
      if (pos > in.length)
        throw new DataFormatException("byte[] is not a valid add msg");
      length++;
    }
    try
    {
      dn = new String(in, offset, length, "UTF-8");
    int length = getNextLength(in, pos);
    newRDN = new String(in, pos, length, "UTF-8");
    pos += length + 1;
      /* read the changeNumber
       * it is always 24 characters long
       */
      String changenumberStr = new  String(in, pos, 24, "UTF-8");
      changeNumber = new ChangeNumber(changenumberStr);
      pos +=24;
    /* read the newSuperior
     * first calculate the length then construct the string
     */
    length = getNextLength(in, pos);
    if (length != 0)
      newSuperior = new String(in, pos, length, "UTF-8");
    else
      newSuperior = null;
    pos += length + 1;
      /* read the newRDN
       * first calculate the length then construct the string
       */
      length = 0;
      offset = pos;
      while (in[pos++] != 0)
      {
        if (pos > in.length)
          throw new DataFormatException("byte[] is not a valid add msg");
        length++;
      }
      newRDN = new String(in, offset, length, "UTF-8");
    /* read the new parent Id
     */
    length = getNextLength(in, pos);
    if (length != 0)
      newSuperiorId = new String(in, pos, length, "UTF-8");
    else
      newSuperiorId = null;
    pos += length + 1;
      /* read the newSuperior
       * first calculate the length then construct the string
       */
      length = 0;
      offset = pos;
      while (in[pos++] != 0)
      {
        if (pos > in.length)
          throw new DataFormatException("byte[] is not a valid add msg");
        length++;
      }
      if (length != 0)
        newSuperior = new String(in, offset, length, "UTF-8");
      else
        newSuperior = null;
      /* get the deleteoldrdn flag */
      if (in[pos] == 0)
        deleteOldRdn = false;
      else
        deleteOldRdn = true;
    } catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    }
    /* get the deleteoldrdn flag */
    if (in[pos] == 0)
      deleteOldRdn = false;
    else
      deleteOldRdn = true;
  }
  /**
   * Create an operation from this ModifyDN message.
   * @param connection the connection to use when creating the operation
   * @return the created operation
   * {@inheritDoc}
   */
  @Override
  public Operation createOperation(InternalClientConnection connection)
  public Operation createOperation(InternalClientConnection connection,
      String newDn)
  {
    ModifyDNOperation moddn =  new ModifyDNOperation(connection,
               InternalClientConnection.nextOperationID(),
               InternalClientConnection.nextMessageID(), null,
               new ASN1OctetString(dn), new ASN1OctetString(newRDN),
               new ASN1OctetString(newDn), new ASN1OctetString(newRDN),
               deleteOldRdn,
               (newSuperior == null ? null : new ASN1OctetString(newSuperior)));
    moddn.setAttachment(SYNCHRONIZATION, getChangeNumber());
    ModifyDnContext ctx = new ModifyDnContext(getChangeNumber(), getUniqueId(),
                                              newSuperiorId);
    moddn.setAttachment(SYNCHROCONTEXT, ctx);
    return moddn;
  }
@@ -167,16 +144,12 @@
  {
    try
    {
      byte[] byteDn = dn.getBytes("UTF-8");
      byte[] byteNewRdn = newRDN.getBytes("UTF-8");
      byte[] byteNewSuperior = null;
      byte[] byteNewSuperiorId = null;
      /* The Modify DN message is stored in the form :
       * <operation type><dn><changenumber><newrdn><newsuperior><deleteoldrdn>
       * the length of result byte array is therefore :
       *   1 + dn length+1 + 24 + newrdn length+1 + newsuperior length+1 +1
       */
      int length = 1 + byteDn.length + 1 + 24 + byteNewRdn.length + 1 + 1;
      // calculate the length necessary to encode the parameters
      int length = byteNewRdn.length + 1 + 1;
      if (newSuperior != null)
      {
        byteNewSuperior = newSuperior.getBytes("UTF-8");
@@ -185,42 +158,32 @@
      else
        length += 1;
      byte[] resultByteArray = new byte[length];
      int pos = 1;
      /* put the type of the operation */
      resultByteArray[0] = MSG_TYPE_MODIFYDN_REQUEST;
      /* put the DN and a terminating 0 */
      for (int i = 0; i< byteDn.length; i++,pos++)
      if (newSuperiorId != null)
      {
        resultByteArray[pos] = byteDn[i];
        byteNewSuperiorId = newSuperiorId.getBytes("UTF-8");
        length += byteNewSuperiorId.length + 1;
      }
      resultByteArray[pos++] = 0;
      else
        length += 1;
      /* put the ChangeNumber */
      byte[] changeNumberByte =
                      this.getChangeNumber().toString().getBytes("UTF-8");
      for (int i=0; i<24; i++,pos++)
      {
        resultByteArray[pos] = changeNumberByte[i];
      }
      byte[] resultByteArray = encodeHeader(MSG_TYPE_MODIFYDN_REQUEST, length);
      int pos = resultByteArray.length - length;
      /* put the new RDN and a terminating 0 */
      for (int i = 0; i< byteNewRdn.length; i++,pos++)
      {
        resultByteArray[pos] = byteNewRdn[i];
      }
      resultByteArray[pos++] = 0;
      pos = addByteArray(byteNewRdn, resultByteArray, pos);
      /* put the newsuperior and a terminating 0 */
      if (newSuperior != null)
      {
        for (int i = 0; i< byteNewSuperior.length; i++,pos++)
        {
          resultByteArray[pos] = byteNewSuperior[i];
        }
        pos = addByteArray(byteNewSuperior, resultByteArray, pos);
      }
      else
        resultByteArray[pos++] = 0;
      /* put the newsuperiorId and a terminating 0 */
      if (newSuperiorId != null)
      {
        pos = addByteArray(byteNewSuperiorId, resultByteArray, pos);
      }
      else
        resultByteArray[pos++] = 0;
@@ -245,7 +208,16 @@
  @Override
  public String toString()
  {
    return ("Modify DN " + dn + " " + newRDN + " " + newSuperior + " " +
    return ("Modify DN " + getDn() + " " + newRDN + " " + newSuperior + " " +
            getChangeNumber());
  }
  /**
   * Set the new superior.
   * @param string the new superior.
   */
  public void setNewSuperior(String string)
  {
    newSuperior = string;
  }
}
opends/src/server/org/opends/server/synchronization/ModifyDnContext.java
New file
@@ -0,0 +1,63 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006 Sun Microsystems, Inc.
 */
package org.opends.server.synchronization;
/**
 * This class describe the synchronization context that is attached to
 * ModifyDN operation.
 */
public class ModifyDnContext extends OperationContext
{
  private String newParentId;
  /**
   * Creates a new ModifyDN Context with the provided parameters.
   *
   * @param changeNumber The change number of the operation.
   * @param uid the unique Id of the modified entry.
   * @param newParentId The unique Identifier of the new parent,
   *                    can be null if the entry is to stay below the same
   *                    parent.
   */
  public ModifyDnContext(ChangeNumber changeNumber, String uid,
                         String newParentId)
  {
    super(changeNumber, uid);
    this.newParentId = newParentId;
  }
  /**
   * Get the unique Identifier of the new parent.
   * Can be null if the entry is to stay below the same parent.
   *
   * @return Returns the unique Identifier of the new parent..
   */
  public String getNewParentId()
  {
    return newParentId;
  }
}
opends/src/server/org/opends/server/synchronization/ModifyFakeOperation.java
@@ -44,17 +44,20 @@
{
  private ArrayList<Modification> mods = new ArrayList<Modification>();
  private DN dn;
  private String entryuuid;
  /**
   * Creates a new ModifyFakeOperation with the provided information.
   *
   * @param dn The dn on which the Operation was applied.
   * @param changenumber The ChangeNumber of the operation.
   * @param entryuuid The unique ID of the entry on which the Operation applies.
   */
  public ModifyFakeOperation(DN dn, ChangeNumber changenumber)
  public ModifyFakeOperation(DN dn, ChangeNumber changenumber, String entryuuid)
  {
    super(changenumber);
    this.dn = dn;
    this.entryuuid = entryuuid;
  }
  /**
@@ -75,6 +78,6 @@
  @Override
  public SynchronizationMessage generateMessage()
  {
    return new ModifyMsg(super.getChangeNumber(), dn, mods);
    return new ModifyMsg(super.getChangeNumber(), dn, mods, entryuuid);
  }
}
opends/src/server/org/opends/server/synchronization/ModifyMsg.java
@@ -26,7 +26,7 @@
 */
package org.opends.server.synchronization;
import static org.opends.server.synchronization.SynchMessages.SYNCHRONIZATION;
import static org.opends.server.synchronization.OperationContext.*;
import org.opends.server.core.ModifyOperation;
import org.opends.server.core.Operation;
@@ -52,7 +52,6 @@
public class ModifyMsg extends UpdateMessage
{
  private static final long serialVersionUID = -4905520652801395185L;
  private String dn = null;
  private byte[] encodedMods = null;
  private byte[] encodedMsg = null;
@@ -63,9 +62,9 @@
   */
  public ModifyMsg(ModifyOperation op)
  {
    dn = op.getRawEntryDN().stringValue();
    super((OperationContext) op.getAttachment(OperationContext.SYNCHROCONTEXT),
          op.getRawEntryDN().stringValue());
    encodedMods = modsToByte(op.getModifications());
    changeNumber = (ChangeNumber) op.getAttachment(SYNCHRONIZATION);
  }
  /**
@@ -74,12 +73,15 @@
   * @param changeNumber The ChangeNumber for the operation.
   * @param dn           The baseDN of the operation.
   * @param mods         The mod of the operation.
   * @param entryuuid    The unique id of the entry on which the modification
   *                     needs to apply.
   */
  public ModifyMsg(ChangeNumber changeNumber, DN dn, List<Modification> mods)
  public ModifyMsg(ChangeNumber changeNumber, DN dn, List<Modification> mods,
                   String entryuuid)
  {
    super(new ModifyContext(changeNumber, entryuuid),
          dn.toNormalizedString());
    this.encodedMods = modsToByte(mods);
    this.dn = dn.toNormalizedString();
    this.changeNumber = changeNumber;
  }
  /**
@@ -92,19 +94,8 @@
  public ModifyMsg(byte[] in) throws DataFormatException,
                                     UnsupportedEncodingException
  {
    super(in);
    encodedMsg = in;
    decodeChangeNumber(in);
  }
  private void decodeChangeNumber(byte[] in) throws DataFormatException,
                                             UnsupportedEncodingException
  {
    /* read the changeNumber */
    int pos = 1;
    int length = getNextLength(encodedMsg, pos);
    String changenumberStr = new String(encodedMsg, pos, length, "UTF-8");
    pos += length +1;
    changeNumber = new ChangeNumber(changenumberStr);
  }
  /**
@@ -129,12 +120,12 @@
    return encodedMsg;
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public Operation createOperation(InternalClientConnection connection)
  public Operation createOperation(InternalClientConnection connection,
                   String newDn)
                   throws LDAPException, ASN1Exception, DataFormatException
  {
    if (encodedMods == null)
@@ -142,6 +133,9 @@
      decode();
    }
    if (newDn == null)
      newDn = getDn();
    ArrayList<LDAPModification> ldapmods;
    ArrayList<ASN1Element> mods = null;
@@ -155,8 +149,9 @@
    ModifyOperation mod = new ModifyOperation(connection,
                               InternalClientConnection.nextOperationID(),
                               InternalClientConnection.nextMessageID(), null,
                               new ASN1OctetString(dn), ldapmods);
    mod.setAttachment(SYNCHRONIZATION, getChangeNumber());
                               new ASN1OctetString(newDn), ldapmods);
    ModifyContext ctx = new ModifyContext(getChangeNumber(), getUniqueId());
    mod.setAttachment(SYNCHROCONTEXT, ctx);
    return mod;
  }
@@ -167,30 +162,11 @@
   */
  private void encode() throws UnsupportedEncodingException
  {
    byte[] byteDn = dn.getBytes("UTF-8");
    byte[] changeNumberByte =
      this.getChangeNumber().toString().getBytes("UTF-8");
    /* encode the header in a byte[] large enough to also contain the mods */
    encodedMsg = encodeHeader(MSG_TYPE_MODIFY_REQUEST, encodedMods.length + 1);
    int pos = encodedMsg.length - (encodedMods.length + 1);
    /* The Modify message is stored in the form :
     * <operation type>changenumber><dn><<mods>
     * the length of result byte array is therefore :
     *   1 + dn length + 1 + 24 + mods length
     */
    int length = 1 + changeNumberByte.length + 1 + byteDn.length + 1
                 + encodedMods.length + 1;
    encodedMsg = new byte[length];
    /* put the type of the operation */
    encodedMsg[0] = MSG_TYPE_MODIFY_REQUEST;
    int pos = 1;
    /* put the ChangeNumber */
    pos = addByteArray(changeNumberByte, encodedMsg, pos);
    /* put the DN and a terminating 0 */
    pos = addByteArray(byteDn, encodedMsg, pos);
    /* put the mods */
    /* add the mods */
    pos = addByteArray(encodedMods, encodedMsg, pos);
  }
@@ -201,34 +177,14 @@
   */
  private void decode() throws DataFormatException
  {
    /* first byte is the type */
    if (encodedMsg[0] != MSG_TYPE_MODIFY_REQUEST)
      throw new DataFormatException("byte[] is not a valid modify msg");
    int pos = decodeHeader(MSG_TYPE_MODIFY_REQUEST, encodedMsg);
    try
    /* Read the mods : all the remaining bytes but the terminating 0 */
    encodedMods = new byte[encodedMsg.length-pos-1];
    int i =0;
    while (pos<encodedMsg.length-1)
    {
      /* read the changeNumber */
      int pos = 1;
      int length = getNextLength(encodedMsg, pos);
      String changenumberStr = new String(encodedMsg, pos, length, "UTF-8");
      pos += length +1;
      changeNumber = new ChangeNumber(changenumberStr);
      /* read the dn */
      length = getNextLength(encodedMsg, pos);
      dn = new String(encodedMsg, pos, length, "UTF-8");
      pos += length +1;
      /* Read the mods : all the remaining bytes but the terminating 0 */
      encodedMods = new byte[encodedMsg.length-pos-1];
      int i =0;
      while (pos<encodedMsg.length-1)
      {
        encodedMods[i++] = encodedMsg[pos++];
      }
    } catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
      encodedMods[i++] = encodedMsg[pos++];
    }
  }
@@ -263,6 +219,6 @@
  @Override
  public String toString()
  {
    return("Mod " + dn + " " + getChangeNumber());
    return("Modify " + getDn() + " " + getChangeNumber());
  }
}
opends/src/server/org/opends/server/synchronization/MultimasterSynchronization.java
@@ -72,6 +72,21 @@
    new HashMap<DN, SynchronizationDomain>() ;
  /**
   * Get the ServerState associated to the SynchronizationDomain
   * with a given DN.
   *
   * @param baseDn The DN of the Synchronization Domain for which the
   *               ServerState must be returned.
   * @return the ServerState associated to the SynchronizationDomain
   *         with the DN in parameter.
   */
  public static ServerState getServerState(DN baseDn)
  {
    SynchronizationDomain domain = findDomain(baseDn);
    return domain.getServerState();
  }
  /**
   * {@inheritDoc}
   */
  public void initializeSynchronizationProvider(ConfigEntry configEntry)
@@ -245,6 +260,47 @@
    return domain.handleConflictResolution(modifyOperation);
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public SynchronizationProviderResult handleConflictResolution(
      AddOperation addOperation) throws DirectoryException
  {
    SynchronizationDomain domain = findDomain(addOperation.getEntryDN());
    if (domain == null)
      return new SynchronizationProviderResult(true);
    return domain.handleConflictResolution(addOperation);
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public SynchronizationProviderResult handleConflictResolution(
      DeleteOperation deleteOperation) throws DirectoryException
  {
    SynchronizationDomain domain = findDomain(deleteOperation.getEntryDN());
    if (domain == null)
      return new SynchronizationProviderResult(true);
    return domain.handleConflictResolution(deleteOperation);
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public SynchronizationProviderResult handleConflictResolution(
      ModifyDNOperation modifyDNOperation) throws DirectoryException
  {
    SynchronizationDomain domain = findDomain(modifyDNOperation.getEntryDN());
    if (domain == null)
      return new SynchronizationProviderResult(true);
    return domain.handleConflictResolution(modifyDNOperation);
  }
  /**
   * {@inheritDoc}
@@ -275,57 +331,35 @@
   * {@inheritDoc}
   */
  @Override
  public SynchronizationProviderResult doPreOperation(
      DeleteOperation deleteOperation) throws DirectoryException
  {
    return new SynchronizationProviderResult(true);
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public SynchronizationProviderResult doPreOperation(
      ModifyDNOperation modifyDNOperation) throws DirectoryException
  {
    return new SynchronizationProviderResult(true);
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public SynchronizationProviderResult doPreOperation(AddOperation addOperation)
  {
    SynchronizationDomain domain = findDomain(addOperation.getEntryDN());
    if (domain == null)
      return new SynchronizationProviderResult(true);
    domain.setChangeNumber(addOperation);
    return new SynchronizationProviderResult(true);
  }
    if (!addOperation.isSynchronizationOperation())
      domain.doPreOperation(addOperation);
  /**
   * Pre-operation processing.
   * Called after operation has been processed by the core server
   * but before being committed to the backend
   * Generate the Change number and update the historical information
   *
   * @param deleteOperation the current operation
   * @return code indicating if operation must be processed
   */
  @Override
  public SynchronizationProviderResult
                  doPreOperation(DeleteOperation deleteOperation)
  {
    SynchronizationDomain domain = findDomain(deleteOperation.getEntryDN());
    if (domain == null)
      return new SynchronizationProviderResult(true);
    domain.setChangeNumber(deleteOperation);
    return new SynchronizationProviderResult(true);
  }
  /**
   * Pre-operation processing.
   * Called after operation has been processed by the core server
   * but before being committed to the backend
   * Generate the Change number and update the historical information
   *
   * @param modifyDNOperation the current operation
   * @return code indicating if operation must be processed
   */
  @Override
  public SynchronizationProviderResult
                  doPreOperation(ModifyDNOperation modifyDNOperation)
  {
    SynchronizationDomain domain = findDomain(modifyDNOperation.getEntryDN());
    if (domain == null)
      return new SynchronizationProviderResult(true);
    domain.setChangeNumber(modifyDNOperation);
    return new SynchronizationProviderResult(true);
  }
@@ -390,21 +424,6 @@
    return;
  }
  /**
   * Get the ServerState associated to the SynchronizationDomain
   * with a given DN.
   *
   * @param baseDn The DN of the Synchronization Domain for which the
   *               ServerState must be returned.
   * @return the ServerState associated to the SynchronizationDomain
   *         with the DN in parameter.
   */
  public static ServerState getServerState(DN baseDn)
  {
    SynchronizationDomain domain = findDomain(baseDn);
    return domain.getServerState();
  }
}
opends/src/server/org/opends/server/synchronization/OperationContext.java
New file
@@ -0,0 +1,121 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006 Sun Microsystems, Inc.
 */
package org.opends.server.synchronization;
import org.opends.server.core.Operation;
/**
 * This class describe the Synchronization context that is attached
 * to each Operation using the SYNCHROCONTEXT key.
 */
public abstract class OperationContext
{
  /**
   * The identifier used to attach the context to operations.
   */
  public static final String SYNCHROCONTEXT = "synchronizationContext";
  /**
   * The change Number of the Operation.
   */
  private ChangeNumber changeNumber;
  /**
   * The unique Id of the entry that was modified in the original operation.
   */
  private String entryUid;
  /**
   * Create a new OperationContext.
   * @param changeNumber The change number of the operation.
   * @param uid The unique Identifier of the modified entry.
   */
  protected OperationContext(ChangeNumber changeNumber, String uid)
  {
    this.changeNumber = changeNumber;
    this.entryUid = uid;
  }
  /**
   * Gets The change number of the Operation.
   *
   * @return The change number of the Operation.
   */
  public ChangeNumber getChangeNumber()
  {
    return changeNumber;
  }
  /**
   * Get the unique Identifier of the modiffied entry.
   *
   * @return the unique Identifier of the modiffied entry.
   */
  public String getEntryUid()
  {
    return entryUid;
  }
  /**
   * Get the change number of an operation.
   *
   * @param op The operation.
   * @return the hange number of the provided operation.
   */
  public static ChangeNumber getChangeNumber(Operation op)
  {
    OperationContext ctx = (OperationContext)op.getAttachment(SYNCHROCONTEXT);
    return ctx.changeNumber;
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public boolean equals(Object obj)
  {
    if (obj instanceof OperationContext)
    {
      OperationContext ctx = (OperationContext) obj;
      return ((this.changeNumber.equals(ctx.getChangeNumber()) &&
          (this.entryUid.equals(ctx.getEntryUid()))));
    }
    else
      return false;
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public int hashCode()
  {
    return changeNumber.hashCode() + entryUid.hashCode();
  }
}
opends/src/server/org/opends/server/synchronization/SynchMessages.java
@@ -36,15 +36,10 @@
public class SynchMessages {
  /**
   * name of Synchronization.
   */
  public static final String SYNCHRONIZATION = "synchronization";
  /**
   * Name used to store attachment of historical information in the
   * operation.
   */
  public static final String HISTORICAL = "historical";
  public static final String HISTORICAL = "ds-synch-historical";
  /**
   * Invalid DN.
@@ -256,6 +251,12 @@
    CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 33;
  /**
   * Exception while receiving a message.
   */
  public static final int MSGID_EXCEPTION_RECEIVING_SYNCHRONIZATION_MESSAGE =
    CATEGORY_MASK_SYNC | SEVERITY_MASK_SEVERE_ERROR | 34;
  /**
   * Register the messages from this class in the core server.
   *
   */
@@ -288,7 +289,7 @@
        "Changelog failed to start " +
        "because the database %s could not be read");
    MessageHandler.registerMessage(MSGID_EXCEPTION_REPLAYING_OPERATION,
         "Caught Exception %s when replaying operation %s : %s");
         "An Exception was caught while replaying operation %s : %s");
    MessageHandler.registerMessage(MSGID_NEED_CHANGELOG_PORT,
         "The Changelog server port must be defined");
    MessageHandler.registerMessage(MSGID_ERROR_UPDATING_RUV,
@@ -341,5 +342,8 @@
    MessageHandler.registerMessage(MSGID_CHANGELOG_ERROR_SENDING_ACK,
        "An unexpected error happened sending an ack to %s." +
        "This connection is going to be closed. ");
    MessageHandler.registerMessage(
        MSGID_EXCEPTION_RECEIVING_SYNCHRONIZATION_MESSAGE,
        "An Exception was caught while receiving synchronization message : %s");
  }
}
opends/src/server/org/opends/server/synchronization/SynchronizationDomain.java
@@ -26,15 +26,21 @@
 */
package org.opends.server.synchronization;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import static org.opends.server.util.TimeThread.getTime;
import static org.opends.server.synchronization.SynchMessages.*;
import static org.opends.server.loggers.Error.*;
import static org.opends.server.messages.MessageHandler.*;
import static org.opends.server.synchronization.OperationContext.SYNCHROCONTEXT;
import static org.opends.server.synchronization.Historical.*;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.zip.DataFormatException;
import org.opends.server.api.ConfigurableComponent;
import org.opends.server.api.DirectoryThread;
@@ -47,17 +53,27 @@
import org.opends.server.config.StringConfigAttribute;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DeleteOperation;
import org.opends.server.core.DirectoryException;
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.ModifyDNOperation;
import org.opends.server.core.ModifyOperation;
import org.opends.server.core.Operation;
import org.opends.server.messages.MessageHandler;
import org.opends.server.protocols.asn1.ASN1Exception;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.protocols.ldap.LDAPException;
import org.opends.server.types.ConfigChangeResult;
import org.opends.server.types.DN;
import org.opends.server.types.DereferencePolicy;
import org.opends.server.types.Entry;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
import org.opends.server.types.RDN;
import org.opends.server.types.ResultCode;
import org.opends.server.types.SearchFilter;
import org.opends.server.types.SearchResultEntry;
import org.opends.server.types.SearchScope;
import org.opends.server.types.SynchronizationProviderResult;
/**
@@ -88,9 +104,6 @@
  private ServerState state;
  private int numReplayedPostOpCalled = 0;
  private boolean assuredFlag = false;
  private int maxReceiveQueue = 0;
  private int maxSendQueue = 0;
  private int maxReceiveDelay = 0;
@@ -114,6 +127,8 @@
  private DN configDn;
  private InternalClientConnection conn = new InternalClientConnection();
  static String CHANGELOG_SERVER_ATTR = "ds-cfg-changelog-server";
  static String BASE_DN_ATTR = "ds-cfg-synchronization-dn";
  static String SERVER_ID_ATTR = "ds-cfg-directory-server-id";
@@ -379,8 +394,7 @@
          broker.restartReceive();
          for (int i=0; i<listenerThreadNumber; i++)
          {
            ListenerThread myThread = new ListenerThread(this,
                                                         changeNumberGenerator);
            ListenerThread myThread = new ListenerThread(this);
            myThread.start();
            synchroThreads.add(myThread);
          }
@@ -416,6 +430,155 @@
  }
  /**
   * Implement the  handleConflictResolution phase of the deleteOperation.
   *
   * @param deleteOperation The deleteOperation.
   * @return A SynchronizationProviderResult indicating if the operation
   *         can continue.
   */
  public SynchronizationProviderResult handleConflictResolution(
      DeleteOperation deleteOperation)
  {
    DeleteContext ctx =
      (DeleteContext) deleteOperation.getAttachment(SYNCHROCONTEXT);
    Entry deletedEntry = deleteOperation.getEntryToDelete();
    if (ctx != null)
    {
      /*
       * This is a synchronization operation
       * Check that the modified entry has the same entryuuid
       * has was in the original message.
       */
      String operationEntryUUID = ctx.getEntryUid();
      String modifiedEntryUUID = Historical.getEntryUuid(deletedEntry);
      if (!operationEntryUUID.equals(modifiedEntryUUID))
      {
        /*
         * The changes entry is not the same entry as the one on
         * the original change was performed.
         * Probably the original entry was renamed and replaced with
         * another entry.
         * We must not let the change proceed, return a negative
         * result and set the result code to NO_SUCH_OBJET.
         * When the operation will return, the thread that started the
         * operation will try to find the correct entry and restart a new
         * operation.
         */
        deleteOperation.setResultCode(ResultCode.NO_SUCH_OBJECT);
        return new SynchronizationProviderResult(false);
      }
    }
    else
    {
      // There is no Synchronization context attached to the operation
      // so this is not a synchronization operation.
      ChangeNumber changeNumber = generateChangeNumber(deleteOperation);
      String modifiedEntryUUID = Historical.getEntryUuid(deletedEntry);
      ctx = new DeleteContext(changeNumber, modifiedEntryUUID);
      deleteOperation.setAttachment(SYNCHROCONTEXT, ctx);
    }
    return new SynchronizationProviderResult(true);
  }
  /**
   * Implement the  handleConflictResolution phase of the addOperation.
   *
   * @param addOperation The AddOperation.
   * @return A SynchronizationProviderResult indicating if the operation
   *         can continue.
   */
  public SynchronizationProviderResult handleConflictResolution(
      AddOperation addOperation)
  {
    if (addOperation.isSynchronizationOperation())
    {
      AddContext ctx = (AddContext) addOperation.getAttachment(SYNCHROCONTEXT);
      /*
       * If an entry with the same entry uniqueID already exist then
       * this operation has already been replayed in the past.
       */
      String uuid = ctx.getEntryUid();
      if (findEntryDN(uuid) != null)
      {
        addOperation.setResultCode(ResultCode.SUCCESS);
        return new SynchronizationProviderResult(false);
      }
    }
    return new SynchronizationProviderResult(true);
  }
  /**
   * Implement the  handleConflictResolution phase of the ModifyDNOperation.
   *
   * @param modifyDNOperation The ModifyDNOperation.
   * @return A SynchronizationProviderResult indicating if the operation
   *         can continue.
   */
  public SynchronizationProviderResult handleConflictResolution(
      ModifyDNOperation modifyDNOperation)
  {
    ModifyDnContext ctx =
      (ModifyDnContext) modifyDNOperation.getAttachment(SYNCHROCONTEXT);
    if (ctx != null)
    {
      /*
       * This is a synchronization operation
       * Check that the modified entry has the same entryuuid
       * as was in the original message.
       */
      String modifiedEntryUUID =
        Historical.getEntryUuid(modifyDNOperation.getOriginalEntry());
      if (!modifiedEntryUUID.equals(ctx.getEntryUid()))
      {
        /*
         * The modified entry is not the same entry as the one on
         * the original change was performed.
         * Probably the original entry was renamed and replaced with
         * another entry.
         * We must not let the change proceed, return a negative
         * result and set the result code to NO_SUCH_OBJET.
         * When the operation will return, the thread that started the
         * operation will try to find the correct entry and restart a new
         * operation.
         */
        modifyDNOperation.setResultCode(ResultCode.NO_SUCH_OBJECT);
        return new SynchronizationProviderResult(false);
      }
      if (modifyDNOperation.getNewSuperior() != null)
      {
        /*
         * Also check that the current id of the
         * parent is the same as when the operation was performed.
         */
        String newParentId = findEntryId(modifyDNOperation.getNewSuperior());
        if (!newParentId.equals(ctx.getNewParentId()))
        {
          modifyDNOperation.setResultCode(ResultCode.NO_SUCH_OBJECT);
          return new SynchronizationProviderResult(false);
        }
      }
    }
    else
    {
      // There is no Synchronization context attached to the operation
      // so this is not a synchronization operation.
      ChangeNumber changeNumber = generateChangeNumber(modifyDNOperation);
      String newParentId = null;
      if (modifyDNOperation.getNewSuperior() != null)
      {
        newParentId = findEntryId(modifyDNOperation.getNewSuperior());
      }
      Entry modifiedEntry = modifyDNOperation.getOriginalEntry();
      String modifiedEntryUUID = Historical.getEntryUuid(modifiedEntry);
      ctx = new ModifyDnContext(changeNumber, modifiedEntryUUID, newParentId);
      modifyDNOperation.setAttachment(SYNCHROCONTEXT, ctx);
    }
    return new SynchronizationProviderResult(true);
  }
  /**
   * Handle the conflict resolution.
   * Called by the core server after locking the entry and before
   * starting the actual modification.
@@ -425,25 +588,43 @@
  public SynchronizationProviderResult handleConflictResolution(
                                                ModifyOperation modifyOperation)
  {
    //  If operation do not yet have a change number, generate it
    ChangeNumber changeNumber =
      (ChangeNumber) modifyOperation.getAttachment(SYNCHRONIZATION);
    if (changeNumber == null)
    {
      synchronized(pendingChanges)
      {
        changeNumber = changeNumberGenerator.NewChangeNumber();
        pendingChanges.put(changeNumber, new PendingChange(changeNumber,
                                                           modifyOperation,
                                                           null));
      }
      modifyOperation.setAttachment(SYNCHRONIZATION, changeNumber);
    }
    ModifyContext ctx =
      (ModifyContext) modifyOperation.getAttachment(SYNCHROCONTEXT);
    // if Operation is a synchronization operation, solve conflicts
    if (modifyOperation.isSynchronizationOperation())
    Entry modifiedEntry = modifyOperation.getModifiedEntry();
    if (ctx == null)
    {
      Entry modifiedEntry = modifyOperation.getModifiedEntry();
      // There is no Synchronization context attached to the operation
      // so this is not a synchronization operation.
      ChangeNumber changeNumber = generateChangeNumber(modifyOperation);
      String modifiedEntryUUID = Historical.getEntryUuid(modifiedEntry);
      ctx = new ModifyContext(changeNumber, modifiedEntryUUID);
      modifyOperation.setAttachment(SYNCHROCONTEXT, ctx);
    }
    else
    {
      String modifiedEntryUUID = ctx.getEntryUid();
      String currentEntryUUID = Historical.getEntryUuid(modifiedEntry);
      if (!currentEntryUUID.equals(modifiedEntryUUID))
      {
        /*
         * The current modified entry is not the same entry as the one on
         * the original modification was performed.
         * Probably the original entry was renamed and replaced with
         * another entry.
         * We must not let the modification proceed, return a negative
         * result and set the result code to NO_SUCH_OBJET.
         * When the operation will return, the thread that started the
         * operation will try to find the correct entry and restart a new
         * operation.
         */
        modifyOperation.setResultCode(ResultCode.NO_SUCH_OBJECT);
        return new SynchronizationProviderResult(false);
      }
      /*
       * Solve the conflicts between modify operations
       */
      Historical historicalInformation = Historical.load(modifiedEntry);
      modifyOperation.setAttachment(HISTORICAL, historicalInformation);
@@ -456,16 +637,29 @@
         * stop the processing and send an OK result
         */
        modifyOperation.setResultCode(ResultCode.SUCCESS);
        /*
         * TODO : check that post operation do get called and
         * that pendingChanges do get updated
         */
        return new SynchronizationProviderResult(false);
      }
    }
    return new SynchronizationProviderResult(true);
  }
  /**
   * The preOperation phase for the add Operation.
   * Its job is to generate the Synchronization context associated to the
   * operation. It is necessary to do it in this phase because contrary to
   * the other operations, the entry uid is not set when the handleConflict
   * phase is called.
   *
   * @param addOperation The Add Operation.
   */
  public void doPreOperation(AddOperation addOperation)
  {
    AddContext ctx = new AddContext(generateChangeNumber(addOperation),
        Historical.getEntryUuid(addOperation),
        findEntryId(addOperation.getEntryDN().getParent()));
    addOperation.setAttachment(SYNCHROCONTEXT, ctx);
  }
  /**
   * Receive an update message from the changelog.
@@ -494,6 +688,7 @@
  /**
   * Do the necessary processing when an UpdateMessage was received.
   *
   * @param update The received UpdateMessage.
   */
  public void receiveUpdate(UpdateMessage update)
@@ -548,49 +743,20 @@
  {
    numReplayedPostOpCalled++;
    UpdateMessage msg = null;
    ChangeNumber curChangeNumber =
      (ChangeNumber) op.getAttachment(SYNCHRONIZATION);
    ChangeNumber curChangeNumber = OperationContext.getChangeNumber(op);
    if (op.getResultCode() != ResultCode.SUCCESS)
    {
      if (curChangeNumber != null)
      {
        /*
         * This code can be executed by multiple threads
         * Since TreeMap is not synchronized, it is mandatory to synchronize
         * it now.
         */
        synchronized (pendingChanges)
        {
          pendingChanges.remove(curChangeNumber);
        }
      }
      return;
    }
    ResultCode result = op.getResultCode();
    boolean isAssured = isAssured(op);
    if (!op.isSynchronizationOperation())
    if ((result == ResultCode.SUCCESS) && (!op.isSynchronizationOperation()))
    {
      switch (op.getOperationType())
      msg = UpdateMessage.generateMsg(op, isAssured);
      if (msg == null)
      {
      case MODIFY :
        msg = new ModifyMsg((ModifyOperation) op);
        break;
      case ADD:
        msg = new AddMsg((AddOperation) op);
        break;
      case DELETE :
        msg = new DeleteMsg((DeleteOperation) op);
        break;
      case MODIFY_DN :
        msg = new ModifyDNMsg((ModifyDNOperation) op);
        break;
      default :
        /*
         * This is an operation type that we do not know about
         * It should never happen
         * This code can be executed by multiple threads
         * Since TreeMap is not synchronized, it is mandatory to synchronize
         * it now.
         * It should never happen.
         */
        synchronized (pendingChanges)
        {
@@ -603,68 +769,44 @@
          return;
        }
      }
      if (isAssured(op))
      {
        msg.setAssured();
      }
    }
    synchronized(pendingChanges)
    {
      PendingChange curChange = pendingChanges.get(curChangeNumber);
      if (curChange == null)
      if (result == ResultCode.SUCCESS)
      {
        // This should never happen
        int msgID = MSGID_OPERATION_NOT_FOUND_IN_PENDING;
        String message = getMessage(msgID, curChangeNumber.toString(),
                                    op.toString());
        logError(ErrorLogCategory.SYNCHRONIZATION,
                 ErrorLogSeverity.SEVERE_ERROR,
                 message, msgID);
        return;
      }
      curChange.setCommitted(true);
      if (op.isSynchronizationOperation())
        curChange.setOp(op);
      else
        curChange.setMsg(msg);
      ChangeNumber firstChangeNumber = pendingChanges.firstKey();
      PendingChange firstChange = pendingChanges.get(firstChangeNumber);
      ChangeNumber lastCommittedChangeNumber = null;
      if (!op.isSynchronizationOperation() && msg.isAssured())
      {
        waitingAckMsgs.put(curChangeNumber, msg);
      }
      while ((firstChange != null) && firstChange.isCommitted())
      {
        if (firstChange.getOp().isSynchronizationOperation() == false)
        PendingChange curChange = pendingChanges.get(curChangeNumber);
        if (curChange == null)
        {
          numSentUpdates++;
          broker.publish(firstChange.getMsg());
          // This should never happen
          int msgID = MSGID_OPERATION_NOT_FOUND_IN_PENDING;
          String message = getMessage(msgID, curChangeNumber.toString(),
              op.toString());
          logError(ErrorLogCategory.SYNCHRONIZATION,
              ErrorLogSeverity.SEVERE_ERROR,
              message, msgID);
          return;
        }
        curChange.setCommitted(true);
        lastCommittedChangeNumber = firstChange.getChangeNumber();
        pendingChanges.remove(lastCommittedChangeNumber);
        if (pendingChanges.isEmpty())
        {
          firstChange = null;
        }
        if (op.isSynchronizationOperation())
          curChange.setOp(op);
        else
          curChange.setMsg(msg);
        if (!op.isSynchronizationOperation() && isAssured && (msg != null))
        {
          firstChangeNumber = pendingChanges.firstKey();
          firstChange = pendingChanges.get(firstChangeNumber);
          waitingAckMsgs.put(curChangeNumber, msg);
        }
      }
      if (lastCommittedChangeNumber != null)
        state.update(lastCommittedChangeNumber);
      else if (!op.isSynchronizationOperation())
        pendingChanges.remove(curChangeNumber);
      pushCommittedChanges();
    }
    if (!op.isSynchronizationOperation() && msg.isAssured())
    if ((!op.isSynchronizationOperation()) && msg.isAssured() && (msg != null)
        && (result == ResultCode.SUCCESS))
    {
      synchronized (msg)
      {
@@ -683,19 +825,6 @@
  }
  /**
   * Check if an operation must be processed as an assured operation.
   *
   * @param op the operation to be checked.
   * @return true if the operations must be processed as an assured operation.
   */
  private boolean isAssured(Operation op)
  {
    // TODO : should have a filtering mechanism for checking
    // operation that are assured and operations that are not.
    return assuredFlag;
  }
  /**
   * get the number of updates received by the synchronization plugin.
   *
   * @return the number of updates received
@@ -790,27 +919,6 @@
  }
  /**
   * Generate and set the ChangeNumber of a given Operation.
   *
   * @param operation The Operation for which the ChangeNumber must be set.
   */
  public void setChangeNumber(Operation operation)
  {
    ChangeNumber changeNumber =
      (ChangeNumber) operation.getAttachment(SYNCHRONIZATION);
    if (changeNumber == null)
    {
      synchronized(pendingChanges)
      {
        changeNumber = changeNumberGenerator.NewChangeNumber();
        pendingChanges.put(changeNumber, new PendingChange(changeNumber,
            operation, null));
      }
      operation.setAttachment(SYNCHRONIZATION, changeNumber);
    }
  }
  /**
   * {@inheritDoc}
   */
  @Override
@@ -826,7 +934,7 @@
    synchroThreads = new ArrayList<ListenerThread>();
    for (int i=0; i<10; i++)
    {
      ListenerThread myThread = new ListenerThread(this, changeNumberGenerator);
      ListenerThread myThread = new ListenerThread(this);
      myThread.start();
      synchroThreads.add(myThread);
    }
@@ -913,6 +1021,126 @@
  }
  /**
   * Create and replay a synchronized Operation from an UpdateMessage.
   *
   * @param msg The UpdateMessage to be replayed.
   */
  public void replay(UpdateMessage msg)
  {
    Operation op = null;
    boolean done = false;
    ChangeNumber changeNumber = null;
    try
    {
      while (!done)
      {
        op = msg.createOperation(conn);
        op.setInternalOperation(true);
        op.setSynchronizationOperation(true);
        changeNumber = OperationContext.getChangeNumber(op);
        if (changeNumber != null)
          changeNumberGenerator.adjust(changeNumber);
        op.run();
        ResultCode result = op.getResultCode();
        if (result != ResultCode.SUCCESS)
        {
          if (op instanceof ModifyOperation)
          {
            ModifyOperation newOp = (ModifyOperation) op;
            done = solveNamingConflict(newOp, msg);
          }
          else if (op instanceof DeleteOperation)
          {
            DeleteOperation newOp = (DeleteOperation) op;
            done = solveNamingConflict(newOp, msg);
          }
          else if (op instanceof AddOperation)
          {
            AddOperation newOp = (AddOperation) op;
            done = solveNamingConflict(newOp, msg);
          } else if (op instanceof ModifyDNOperation)
          {
            ModifyDNOperation newOp = (ModifyDNOperation) op;
            done = solveNamingConflict(newOp, msg);
          }
          else
          {
            done = true;  // unknown type of operation ?!
          }
        }
        else
        {
          done = true;
        }
      }
    }
    catch (ASN1Exception e)
    {
      int msgID = MSGID_EXCEPTION_DECODING_OPERATION;
      String message = getMessage(msgID, msg) +
      stackTraceToSingleLineString(e);
      logError(ErrorLogCategory.SYNCHRONIZATION,
          ErrorLogSeverity.SEVERE_ERROR,
          message, msgID);
    }
    catch (LDAPException e)
    {
      int msgID = MSGID_EXCEPTION_DECODING_OPERATION;
      String message = getMessage(msgID, msg) +
      stackTraceToSingleLineString(e);
      logError(ErrorLogCategory.SYNCHRONIZATION,
          ErrorLogSeverity.SEVERE_ERROR,
          message, msgID);
    }
    catch (DataFormatException e)
    {
      int msgID = MSGID_EXCEPTION_DECODING_OPERATION;
      String message = getMessage(msgID, msg) +
      stackTraceToSingleLineString(e);
      logError(ErrorLogCategory.SYNCHRONIZATION,
          ErrorLogSeverity.SEVERE_ERROR,
          message, msgID);
    }
    catch (Exception e)
    {
      if (changeNumber != null)
      {
        /*
         * An Exception happened during the replay process.
         * Continue with the next change but the servers will know start
         * to be inconsistent.
         * TODO : REPAIR : Should let the repair tool know about this
         */
        int msgID = MSGID_EXCEPTION_REPLAYING_OPERATION;
        String message = getMessage(msgID, stackTraceToSingleLineString(e),
            op.toString());
        logError(ErrorLogCategory.SYNCHRONIZATION,
            ErrorLogSeverity.SEVERE_ERROR, message, msgID);
        updateError(changeNumber);
      }
      else
      {
        int msgID = MSGID_EXCEPTION_DECODING_OPERATION;
        String message = getMessage(msgID, stackTraceToSingleLineString(e),
            msg.toString());
        logError(ErrorLogCategory.SYNCHRONIZATION,
            ErrorLogSeverity.SEVERE_ERROR, message, msgID);
      }
    }
    finally
    {
      if (msg.isAssured())
        ack(msg.getChangeNumber());
      incProcessedUpdates();
    }
  }
  /**
   * This methods is called when an error happends while replaying
   * and operation.
   * It is necessary because the postOPeration does not always get
@@ -925,6 +1153,434 @@
    synchronized (pendingChanges)
    {
      pendingChanges.remove(changeNumber);
      pushCommittedChanges();
    }
  }
  /**
   * Generate a new change number and insert it in the pending list.
   *
   * @param operation The operation for which the change number must be
   *                  generated.
   * @return The new change number.
   */
  private ChangeNumber generateChangeNumber(Operation operation)
  {
    ChangeNumber changeNumber;
    synchronized(pendingChanges)
    {
      changeNumber = changeNumberGenerator.NewChangeNumber();
      pendingChanges.put(changeNumber,
          new PendingChange(changeNumber, operation, null));
    }
    return changeNumber;
  }
  /**
   * Find the Unique Id of the entry with the provided DN by doing a
   * search of the entry and extracting its uniqueID from its attributes.
   *
   * @param dn The dn of the entry for which the unique Id is searched.
   *
   * @return The unique Id of the entry whith the provided DN.
   */
  private String findEntryId(DN dn)
  {
    if (dn == null)
      return null;
    try
    {
      LinkedHashSet<String> attrs = new LinkedHashSet<String>(1);
      attrs.add(ENTRYUIDNAME);
      InternalSearchOperation search = conn.processSearch(dn,
            SearchScope.BASE_OBJECT, DereferencePolicy.NEVER_DEREF_ALIASES,
            0, 0, false,
            SearchFilter.createFilterFromString("objectclass=*"),
            attrs);
      if (search.getResultCode() == ResultCode.SUCCESS)
      {
        LinkedList<SearchResultEntry> result = search.getSearchEntries();
        if (!result.isEmpty())
        {
          SearchResultEntry resultEntry = result.getFirst();
          if (resultEntry != null)
          {
            return Historical.getEntryUuid(resultEntry);
          }
        }
      }
    } catch (DirectoryException e)
    {
      // never happens because the filter is always valid.
    }
    return null;
  }
  /**
   * find the current dn of an entry from its entry uuid.
   *
   * @param uuid the Entry Unique ID.
   * @return The curernt dn of the entry or null if there is no entry with
   *         the specified uuid.
   */
  private DN findEntryDN(String uuid)
  {
    try
    {
      InternalSearchOperation search = conn.processSearch(baseDN,
            SearchScope.WHOLE_SUBTREE,
            SearchFilter.createFilterFromString("entryuuid="+uuid));
      if (search.getResultCode() == ResultCode.SUCCESS)
      {
        LinkedList<SearchResultEntry> result = search.getSearchEntries();
        if (!result.isEmpty())
        {
          SearchResultEntry resultEntry = result.getFirst();
          if (resultEntry != null)
          {
            return resultEntry.getDN();
          }
        }
      }
    } catch (DirectoryException e)
    {
      // never happens because the filter is always valid.
    }
    return null;
  }
  /**
   * Solve a conflict detected when replaying a modify operation.
   *
   * @param op The operation that triggered the conflict detection.
   * @param msg The operation that triggered the conflict detection.
   * @return true if the process is completed, false if it must continue..
   */
  private boolean solveNamingConflict(ModifyOperation op,
      UpdateMessage msg)
  {
    ResultCode result = op.getResultCode();
    ModifyContext ctx = (ModifyContext) op.getAttachment(SYNCHROCONTEXT);
    String entryUid = ctx.getEntryUid();
    if (result == ResultCode.NO_SUCH_OBJECT)
    {
      /*
       * This error may happen the operation is a modification but
       * the entry had been renamed on a different master in the same time.
       * search if the entry has been renamed, and return the new dn
       * of the entry.
       */
      msg.setDn(findEntryDN(entryUid).toString());
      return false;
    }
    return true;
  }
 /** Solve a conflict detected when replaying a delete operation.
  *
  * @param op The operation that triggered the conflict detection.
  * @param msg The operation that triggered the conflict detection.
  * @return true if the process is completed, false if it must continue..
  */
 private boolean solveNamingConflict(DeleteOperation op,
     UpdateMessage msg)
 {
   ResultCode result = op.getResultCode();
   DeleteContext ctx = (DeleteContext) op.getAttachment(SYNCHROCONTEXT);
   String entryUid = ctx.getEntryUid();
   if (result == ResultCode.NO_SUCH_OBJECT)
   {
     /*
      * Find if the entry is still in the database.
      */
     DN currentDn = findEntryDN(entryUid);
     if (currentDn == null)
     {
       /*
        * The entry has already been deleted, either because this delete
        * has already been replayed or because another concurrent delete
        * has already done the job.
        * In any case, there is is nothing more to do.
        */
       return true;
     }
     else
     {
       /*
        * This entry has been renamed, replay the delete using its new DN.
        */
       msg.setDn(currentDn.toString());
       return false;
     }
   }
   else if (result == ResultCode.NOT_ALLOWED_ON_NONLEAF)
   {
     /*
      * This may happen when we replay a DELETE done on a master
      * but children of this entry have been added on another master.
      */
     /*
      * TODO : either delete all the childs or rename the child below
      * the top suffix by adding entryuuid in dn and delete this entry.
      */
   }
   return true;
 }
  /**
   * Solve a conflict detected when replaying a ADD operation.
   *
   * @param op The operation that triggered the conflict detection.
   * @param msg The operation that triggered the conflict detection.
   * @return true if the process is completed, false if it must continue.
   * @throws Exception When the operation is not valid.
   */
  private boolean solveNamingConflict(AddOperation op,
      UpdateMessage msg) throws Exception
  {
    ResultCode result = op.getResultCode();
    AddContext ctx = (AddContext) op.getAttachment(SYNCHROCONTEXT);
    String entryUid = ctx.getEntryUid();
    String parentUniqueId = ctx.getParentUid();
    if (result == ResultCode.NO_SUCH_OBJECT)
    {
      /*
       * This can happen if the parent has been renamed or deleted
       * find the parent dn and calculate a new dn for the entry
       */
      if (parentUniqueId == null)
      {
        /*
         * This entry is the base dn of the backend.
         * It is quite weird that the operation result be NO_SUCH_OBJECT.
         * There is notthing more we can do except TODO log a
         * message for the repair tool to look at this problem.
         */
        return true;
      }
      DN parentDn = findEntryDN(parentUniqueId);
      if (parentDn == null)
      {
        /*
         * The parent has been deleted, so this entry should not
         * exist don't do the ADD.
         */
        return true;
      }
      else
      {
        RDN entryRdn = op.getEntryDN().getRDN();
        msg.setDn(parentDn + "," + entryRdn);
        return false;
      }
    }
    else if (result == ResultCode.ENTRY_ALREADY_EXISTS)
    {
      /*
       * This can happen if
       *  - two adds are done on different servers but with the
       *    same target DN.
       *  - the same ADD is being replayed for the second time on this server.
       * if the nsunique ID already exist, assume this is a replay and
       *        don't do anything
       * if the entry unique id do not exist, generate conflict.
       */
      if (findEntryDN(entryUid) != null)
      {
        // entry already exist : this is a replay
        return true;
      }
      else
      {
        addConflict(op);
        msg.setDn(generateConflictDn(entryUid, msg.getDn()));
        return false;
      }
    }
    return true;
  }
  /**
   * Solve a conflict detected when replaying a Modify DN operation.
   *
   * @param op The operation that triggered the conflict detection.
   * @param msg The operation that triggered the conflict detection.
   * @return true if the process is completed, false if it must continue.
   * @throws Exception When the operation is not valid.
   */
  private boolean solveNamingConflict(ModifyDNOperation op,
      UpdateMessage msg) throws Exception
  {
    ResultCode result = op.getResultCode();
    ModifyDnContext ctx = (ModifyDnContext) op.getAttachment(SYNCHROCONTEXT);
    String entryUid = ctx.getEntryUid();
    String newSuperiorID = ctx.getNewParentId();
    if (result == ResultCode.NO_SUCH_OBJECT)
    {
      ModifyDNMsg modifyDnMsg = (ModifyDNMsg) msg;
      /*
       * four possible cases :
       * - the modified entry has been renamed
       * - the new parent has been renamed
       * - the operation is replayed for the second time.
       * - the entry has been deleted
       * action :
       *  - change the target dn and the new parent dn and
       *        restart the operation,
       *  - don't do anything if the operation is replayed.
       */
      // Construct the new DN to use for the entry.
      DN entryDN = op.getEntryDN();
      DN newSuperior = findEntryDN(newSuperiorID);
      RDN newRDN = op.getNewRDN();
      DN parentDN;
      if (newSuperior == null)
      {
        parentDN = entryDN.getParent();
      }
      else
      {
        parentDN = newSuperior;
      }
      if ((parentDN == null) || parentDN.isNullDN())
      {
        /* this should never happen
         * can't solve any conflict in this case.
         */
        throw new Exception("operation parameters are invalid");
      }
      RDN[] parentComponents = parentDN.getRDNComponents();
      RDN[] newComponents    = new RDN[parentComponents.length+1];
      System.arraycopy(parentComponents, 0, newComponents, 1,
          parentComponents.length);
      newComponents[0] = newRDN;
      DN newDN = new DN(newComponents);
      // get the current DN of this entry in the database.
      DN currentDN = findEntryDN(entryUid);
      // if the newDN and the current DN match then the operation
      // is a no-op (this was probably a second replay)
      // don't do anything.
      if (newDN.equals(currentDN))
      {
        return true;
      }
      msg.setDn(currentDN.toString());
      modifyDnMsg.setNewSuperior(newSuperior.toString());
      return false;
    }
    else if (result == ResultCode.ENTRY_ALREADY_EXISTS)
    {
      /*
       * This may happen when two modifyDn operation
       * are done on different servers but with the same target DN
       * add the conflict object class to the entry
       * and rename it using its entryuuid.
       */
      generateAddConflictOp(op);
      msg.setDn(generateConflictDn(entryUid, msg.getDn()));
      return false;
    }
    return true;
  }
  /**
   * Generate a modification to add the conflict ObjectClass to an entry
   * whose Dn is now conflicting with another entry.
   *
   * @param op The operation causing the conflict.
   */
  private void generateAddConflictOp(ModifyDNOperation op)
  {
    // TODO
  }
  /**
   * Add the conflict object class to an entry that could
   * not be added because it is conflicting with another entry.
   *
   * @param addOp The conflicting Add Operation.
   */
  private void addConflict(AddOperation addOp)
  {
    /*
     * TODO
     */
  }
  /**
   * Generate the Dn to use for a conflicting entry.
   *
   * @param op Operation that generated the conflict
   * @param dn Original dn.
   * @return The generated Dn for a conflicting entry.
   */
  private String generateConflictDn(String entryUid, String dn)
  {
    return dn + "entryuuid=" + entryUid;
  }
  /**
   * Check if an operation must be processed as an assured operation.
   *
   * @param op the operation to be checked.
   * @return true if the operations must be processed as an assured operation.
   */
  private boolean isAssured(Operation op)
  {
    // TODO : should have a filtering mechanism for checking
    // operation that are assured and operations that are not.
    return false;
  }
  /**
   * Push all committed local changes to the changelog service.
   * PRECONDITION : The pendingChanges lock must be held before calling
   * this method.
   */
  private void pushCommittedChanges()
  {
    if (pendingChanges.isEmpty())
      return;
    ChangeNumber firstChangeNumber = pendingChanges.firstKey();
    PendingChange firstChange = pendingChanges.get(firstChangeNumber);
    while ((firstChange != null) && firstChange.isCommitted())
    {
      if (firstChange.getOp().isSynchronizationOperation() == false)
      {
        numSentUpdates++;
        broker.publish(firstChange.getMsg());
      }
      state.update(firstChangeNumber);
      pendingChanges.remove(firstChangeNumber);
      if (pendingChanges.isEmpty())
      {
        firstChange = null;
      }
      else
      {
        firstChangeNumber = pendingChanges.firstKey();
        firstChange = pendingChanges.get(firstChangeNumber);
      }
    }
  }
}
opends/src/server/org/opends/server/synchronization/UpdateMessage.java
@@ -27,8 +27,13 @@
package org.opends.server.synchronization;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DeleteOperation;
import org.opends.server.core.ModifyDNOperation;
import org.opends.server.core.ModifyOperation;
import org.opends.server.core.Operation;
import org.opends.server.protocols.asn1.ASN1Exception;
import org.opends.server.protocols.internal.InternalClientConnection;
@@ -45,7 +50,12 @@
  /**
   * The ChangeNumber of this update.
   */
  protected ChangeNumber changeNumber;
  private ChangeNumber changeNumber;
  /**
   * The DN on which the update was originally done.
   */
  private String dn = null;
  /**
   * True when the update must use assured replication.
@@ -53,6 +63,83 @@
  private boolean assuredFlag = false;
  /**
   * The UniqueId of the entry that was updated.
   */
  private String UniqueId;
  /**
   * Creates a new UpdateMessage with the given informations.
   *
   * @param ctx The Synchronization Context of the operation for which the
   *            update message must be created,.
   * @param dn The dn of the entry on which the change
   *           that caused the creation of this object happened
   */
  public UpdateMessage(OperationContext ctx, String dn)
  {
    this.changeNumber = ctx.getChangeNumber();
    this.UniqueId = ctx.getEntryUid();
    this.dn = dn;
  }
  /**
   * Creates a new UpdateMessage from an ecoded byte array.
   *
   * @param in The encoded byte array containind the UpdateMessage.
   * @throws DataFormatException if the encoded byte array is not valid.
   * @throws UnsupportedEncodingException if UTF-8 is not supprted.
   */
  public UpdateMessage(byte[] in) throws DataFormatException,
                                         UnsupportedEncodingException
  {
    /* read the changeNumber */
    int pos = 1;
    int length = getNextLength(in, pos);
    String changenumberStr = new String(in, pos, length, "UTF-8");
    this.changeNumber = new ChangeNumber(changenumberStr);
  }
  /**
   * Generates an Update Message which the provided information.
   *
   * @param op The operation fo which the message must be created.
   * @param isAssured flag indicating if the operation is an assured operation.
   * @return The generated message.
   */
  public static UpdateMessage generateMsg(Operation op, boolean isAssured)
  {
    UpdateMessage msg = null;
    switch (op.getOperationType())
    {
    case MODIFY :
      msg = new ModifyMsg((ModifyOperation) op);
      if (isAssured)
        msg.setAssured();
      break;
    case ADD:
      msg = new AddMsg((AddOperation) op);
      if (isAssured)
        msg.setAssured();
      break;
    case DELETE :
      msg = new DeleteMsg((DeleteOperation) op);
      if (isAssured)
        msg.setAssured();
      break;
    case MODIFY_DN :
      msg = new ModifyDNMsg((ModifyDNOperation) op);
      if (isAssured)
        msg.setAssured();
      break;
    }
    return msg;
  }
  /**
   * Get the ChangeNumber from the message.
   * @return the ChangeNumber
   */
@@ -62,6 +149,35 @@
  }
  /**
   * Get the DN on which the operation happened.
   *
   * @return The DN on which the operations happened.
   */
  public String getDn()
  {
    return dn;
  }
  /**
   * Set the DN.
   * @param dn The dn that must now be used for this message.
   */
  public void setDn(String dn)
  {
    this.dn = dn;
  }
  /**
   * Get the Unique Identifier of the entry on which the operation happened.
   *
   * @return The Unique Identifier of the entry on which the operation happened.
   */
  public String getUniqueId()
  {
    return UniqueId;
  }
  /**
   * Get a boolean indicating if the Update must be processed as an
   * Asynchronous or as an assured synchronization.
   *
@@ -117,10 +233,115 @@
   * @throws  ASN1Exception In case of ASN1 decoding exception.
   * @throws DataFormatException In case of bad msg format.
   */
  public abstract Operation createOperation(InternalClientConnection conn)
  public Operation createOperation(InternalClientConnection conn)
         throws LDAPException, ASN1Exception, DataFormatException
  {
    return createOperation(conn, dn);
  }
  /**
   * Create and Operation from the message using the provided DN.
   *
   * @param   conn connection to use when creating the message.
   * @param   newDn the DN to use when creating the operation.
   * @return  the created Operation.
   * @throws  LDAPException In case of LDAP decoding exception.
   * @throws  ASN1Exception In case of ASN1 decoding exception.
   * @throws DataFormatException In case of bad msg format.
   */
  public abstract Operation createOperation(InternalClientConnection conn,
                                            String newDn)
         throws LDAPException, ASN1Exception, DataFormatException;
  /**
   * Encode the common header for all the UpdateMessage.
   *
   * @param type the type of UpdateMessage to encode.
   * @param additionalLength additional length needed to encode the remaining
   *                         part of the UpdateMessage.
   * @return a byte array containing the common header and enough space to
   *         encode the reamining bytes of the UpdateMessage as was specified
   *         by the additionalLength.
   *         (byte array length = common header length + additionalLength)
   * @throws UnsupportedEncodingException if UTF-8 is not supported.
   */
  public byte[] encodeHeader(byte type, int additionalLength)
    throws UnsupportedEncodingException
  {
    byte[] byteDn = dn.getBytes("UTF-8");
    byte[] changeNumberByte =
      this.getChangeNumber().toString().getBytes("UTF-8");
    byte[] byteEntryuuid = getUniqueId().getBytes("UTF-8");
    /* The message header is stored in the form :
     * <operation type>changenumber><dn><entryuuid><change>
     * the length of result byte array is therefore :
     *   1 + dn length + 1 + 24 + additional_length
     */
    int length = 1 + changeNumberByte.length + 1 + byteDn.length + 1
                 + byteEntryuuid.length + 1 + additionalLength;
    byte[] encodedMsg = new byte[length];
    /* put the type of the operation */
    encodedMsg[0] = type;
    int pos = 1;
    /* put the ChangeNumber */
    pos = addByteArray(changeNumberByte, encodedMsg, pos);
    /* put the DN and a terminating 0 */
    pos = addByteArray(byteDn, encodedMsg, pos);
    /* put the entry uuid and a terminating 0 */
    pos = addByteArray(byteEntryuuid, encodedMsg, pos);
    return encodedMsg;
  }
  /**
   * Decode the Header part of this Update Message, and check its type.
   *
   * @param type The type of this Update Message.
   * @param encodedMsg the encoded form of the UpdateMessage.
   * @return the position at which the remaining part of the message starts.
   * @throws DataFormatException if the encodedMsg does not contain a valid
   *         common header.
   */
  public int decodeHeader(byte type, byte [] encodedMsg)
                          throws DataFormatException
  {
    /* first byte is the type */
    if (encodedMsg[0] != type)
      throw new DataFormatException("byte[] is not a valid msg");
    try
    {
      /* read the changeNumber */
      int pos = 1;
      int length = getNextLength(encodedMsg, pos);
      String changenumberStr = new String(encodedMsg, pos, length, "UTF-8");
      pos += length + 1;
      changeNumber = new ChangeNumber(changenumberStr);
      /* read the dn */
      length = getNextLength(encodedMsg, pos);
      dn = new String(encodedMsg, pos, length, "UTF-8");
      pos += length + 1;
      /* read the entryuuid */
      length = getNextLength(encodedMsg, pos);
      UniqueId = new String(encodedMsg, pos, length, "UTF-8");
      pos += length + 1;
      return pos;
    } catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    }
  }
  /**
   * {@inheritDoc}
   */
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/ModifyConflictTest.java
@@ -36,6 +36,8 @@
import org.testng.annotations.Test;
import static org.testng.Assert.*;
import static org.opends.server.synchronization.OperationContext.*;
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.ModifyOperation;
import org.opends.server.protocols.internal.InternalClientConnection;
@@ -47,13 +49,12 @@
import org.opends.server.types.Modification;
import org.opends.server.types.ModificationType;
import org.opends.server.types.ObjectClass;
import static org.opends.server.synchronization.SynchMessages.SYNCHRONIZATION;
/*
 * Test the conflict resolution for modify operations
 * This is still a work in progress.
 * currently implemented tests
 *  - check that an replace with a smaller csn is ignored
 *  - check that an replace with a smaller csn is ignored
 * should test :
 *  - conflict with multi-valued attributes
 *  - conflict with single-valued attributes
@@ -74,53 +75,53 @@
  @Test()
  public void replaceAndAdd()
         throws Exception
  {
  {
    /*
     * Objectclass and DN do not have any impact on the modifty conflict
     * resolution for the description attribute.
     * Always use the same values for all these tests.
     */
    DN dn = DN.decode("dc=com");
    DN dn = DN.decode("dc=com");
    Map<ObjectClass, String> objectClasses = new HashMap<ObjectClass, String>();
    ObjectClass org = DirectoryServer.getObjectClass("organization");
    objectClasses.put(org, "organization");
    /*
     * start with a new entry with an empty description
     */
    Entry entry = new Entry(dn, objectClasses, null, null);
    Historical hist = Historical.load(entry);
    /*
     * simulate a modify-replace done at time t10
     */
    testModify(entry, hist, "description", ModificationType.REPLACE,
               "init value", 10, true);
    /*
     * Now simulate an add at an earlier date that the previous replace
     * conflict resolution should remove it.
     */
     */
    testModify(entry, hist, "description", ModificationType.ADD,
               "older value", 1, false);
    /*
     * Now simulate an add at an earlier date that the previous replace
     * conflict resolution should remove it.
     * (a second time to make sure...)
     */
     */
    testModify(entry, hist, "description", ModificationType.ADD,
               "older value", 2, false);
    /*
     * Now simulate an add at a later date that the previous replace.
     * conflict resolution should keep it
     */
     */
    testModify(entry, hist, "description", ModificationType.ADD,
               "new value", 11, true);
  }
  /**
   * Test that conflict between a modify-delete-attribute and modify-add
   * for multi-valued attributes are handled correctly.
@@ -128,53 +129,53 @@
  @Test()
  public void deleteAndAdd()
         throws Exception
  {
  {
    /*
     * Objectclass and DN do not have any impact on the modifty conflict
     * resolution for the description attribute.
     * Always use the same values for all these tests.
     */
    DN dn = DN.decode("dc=com");
    DN dn = DN.decode("dc=com");
    Map<ObjectClass, String> objectClasses = new HashMap<ObjectClass, String>();
    ObjectClass org = DirectoryServer.getObjectClass("organization");
    objectClasses.put(org, "organization");
    /*
     * start with a new entry with an empty description
     */
    Entry entry = new Entry(dn, objectClasses, null, null);
    Historical hist = Historical.load(entry);
    /*
     * simulate a delete of the whole description attribute done at time t10
     */
    testModify(entry, hist, "description", ModificationType.DELETE,
               null, 10, true);
    /*
     * Now simulate an add at an earlier date that the previous delete.
     * The conflict resolution should detect that this add must be ignored.
     */
     */
    testModify(entry, hist, "description",  ModificationType.ADD,
               "older value", 1, false);
    /*
     * Now simulate an add at an earlier date that the previous delete.
     * The conflict resolution should detect that this add must be ignored.
     * (a second time to make sure that historical information is kept...)
     */
     */
    testModify(entry, hist, "description", ModificationType.ADD,
               "older value", 2, false);
    /*
     * Now simulate an add at a later date that the previous delete.
     * conflict resolution should keep it
     */
     */
    testModify(entry, hist, "description", ModificationType.ADD,
               "new value", 11, true);
  }
  /**
  * Test that conflict between a modify-add and modify-add
  * for multi-valued attributes are handled correctly.
@@ -182,67 +183,67 @@
 @Test()
 public void addAndAdd()
        throws Exception
 {
 {
   /*
    * Objectclass and DN do not have any impact on the modifty conflict
    * resolution for the description attribute.
    * Always use the same values for all these tests.
    */
   DN dn = DN.decode("dc=com");
   DN dn = DN.decode("dc=com");
   Map<ObjectClass, String> objectClasses = new HashMap<ObjectClass, String>();
   ObjectClass org = DirectoryServer.getObjectClass("organization");
   objectClasses.put(org, "organization");
   /*
    * start with a new entry with an empty description
    */
   Entry entry = new Entry(dn, objectClasses, null, null);
   Historical hist = Historical.load(entry);
   /*
    * simulate a add of the description attribute done at time t10
    */
   testModify(entry, hist, "description", ModificationType.ADD,
              "init value", 10, true);
   /*
    * Now simulate an add at an earlier date that the previous add.
    * The conflict resolution should detect that this add must be kept.
    */
   testModify(entry, hist, "description", ModificationType.ADD,
    */
   testModify(entry, hist, "description", ModificationType.ADD,
              "older value", 1, true);
   /*
    * Now simulate an add at an earlier date that the previous add.
    * The conflict resolution should detect that this add must be kept.
    * (a second time to make sure that historical information is kept...)
    */
    */
   testModify(entry, hist, "description", ModificationType.ADD,
              "older value", 2, false);
   /*
    * Now simulate an add at a later date that the previous add.
    * conflict resolution should keep it
    */
    */
   testModify(entry, hist, "description", ModificationType.ADD,
              "new value", 11, true);
              "new value", 11, true);
 }
  /*
   * helper function.
   */
  private static void testModify(Entry entry,
      Historical hist, String attrName,
  private static void testModify(Entry entry,
      Historical hist, String attrName,
      ModificationType modType, String value,
      int date, boolean keepChangeResult)
  {
    InternalClientConnection connection = new InternalClientConnection();
    ChangeNumber t = new ChangeNumber(date, (short) 0, (short) 0);
    /* create AttributeType description that will be usedfor this test */
    AttributeType attrType =
      DirectoryServer.getAttributeType(attrName, true);
    LinkedHashSet<AttributeValue> values = new LinkedHashSet<AttributeValue>();
    if (value != null)
      values.add(new AttributeValue(attrType, value));
@@ -250,17 +251,17 @@
    List<Modification> mods = new ArrayList<Modification>();
    Modification mod = new Modification(modType, attr);
    mods.add(mod);
    ModifyOperation modOp = new ModifyOperation(connection, 1, 1, null,
                                              entry.getDN(), mods);
    modOp.setAttachment(SYNCHRONIZATION, t);
    ModifyContext ctx = new ModifyContext(t, "uniqueId");
    modOp.setAttachment(SYNCHROCONTEXT, ctx);
    hist.replayOperation(modOp, entry);
    /*
     * The last older change should have been detected as conflicting
     * and should be removed by the conflict resolution code.
     * The last older change should have been detected as conflicting
     * and should be removed by the conflict resolution code.
     */
    if (keepChangeResult)
    {
opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/SynchronizationMsgTest.java
@@ -51,7 +51,7 @@
import org.opends.server.types.RDN;
import org.opends.server.util.TimeThread;
import static org.opends.server.synchronization.SynchMessages.SYNCHRONIZATION;
import static org.opends.server.synchronization.OperationContext.*;
/**
 * Test the contructors, encoders and decoders of the synchronization
@@ -130,10 +130,10 @@
  {
    DN dn = DN.decode(rawdn);
    InternalClientConnection connection = new InternalClientConnection();
    ModifyMsg msg = new ModifyMsg(changeNumber, dn, mods);
    ModifyMsg msg = new ModifyMsg(changeNumber, dn, mods, "fakeuniqueid");
    ModifyMsg generatedMsg = new ModifyMsg(msg.getBytes());
    assertEquals(msg.changeNumber, generatedMsg.changeNumber);
    assertEquals(msg.getChangeNumber(), generatedMsg.getChangeNumber());
    Operation op = msg.createOperation(connection);
    Operation generatedOperation = generatedMsg.createOperation(connection);
@@ -145,6 +145,8 @@
    ModifyOperation mod2 = (ModifyOperation) generatedOperation;
    assertEquals(mod1.getRawEntryDN(), mod2.getRawEntryDN());
    assertEquals( mod1.getAttachment(SYNCHROCONTEXT),
                  mod2.getAttachment(SYNCHROCONTEXT));
    /*
     * TODO : test that the generated mod equals the original mod.
@@ -178,11 +180,11 @@
                                             DN.decode(rawDN));
    ChangeNumber cn = new ChangeNumber(TimeThread.getTime(),
      (short) 123, (short) 45);
    op.setAttachment(SYNCHRONIZATION, cn);
    op.setAttachment(SYNCHROCONTEXT, new DeleteContext(cn, "uniqueid"));
    DeleteMsg msg = new DeleteMsg(op);
    DeleteMsg generatedMsg = new DeleteMsg(msg.getBytes());
    assertEquals(msg.changeNumber, generatedMsg.changeNumber);
    assertEquals(msg.getChangeNumber(), generatedMsg.getChangeNumber());
    Operation generatedOperation = generatedMsg.createOperation(connection);
@@ -219,13 +221,14 @@
    ChangeNumber cn = new ChangeNumber(TimeThread.getTime(),
                                      (short) 123, (short) 45);
    op.setAttachment(SYNCHRONIZATION, cn);
    op.setAttachment(SYNCHROCONTEXT,
        new ModifyDnContext(cn, "uniqueid", "newparentId"));
    ModifyDNMsg msg = new ModifyDNMsg(op);
    ModifyDNMsg generatedMsg = new ModifyDNMsg(msg.getBytes());
    Operation generatedOperation = generatedMsg.createOperation(connection);
    ModifyDNOperation mod2 = (ModifyDNOperation) generatedOperation;
    assertEquals(msg.changeNumber, generatedMsg.changeNumber);
    assertEquals(msg.getChangeNumber(), generatedMsg.getChangeNumber());
    assertEquals(op.getRawEntryDN(), mod2.getRawEntryDN());
    assertEquals(op.getRawNewRDN(), mod2.getRawNewRDN());
    assertEquals(op.deleteOldRDN(), mod2.deleteOldRDN());
@@ -238,7 +241,7 @@
        {"dc=test,dc=com"},
        };
  }
  @Test(dataProvider = "addEncodeDecode")
  public void addEncodeDecode(String rawDN)
         throws Exception
@@ -269,7 +272,8 @@
    ChangeNumber cn = new ChangeNumber(TimeThread.getTime(),
                                      (short) 123, (short) 45);
    AddMsg msg = new AddMsg(cn, rawDN, objectClass, userAttributes,
    AddMsg msg = new AddMsg(cn, rawDN, "thisIsaUniqueID", "parentUniqueId",
                            objectClass, userAttributes,
                            operationalAttributes);
    AddMsg generatedMsg = new AddMsg(msg.getBytes());
    assertEquals(msg.getBytes(), generatedMsg.getBytes());