mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
14.29.2009 bcf686add35bda4a6ac5c3d085abe151ea018e8e
opends/src/server/org/opends/server/replication/common/ChangeNumber.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2006-2008 Sun Microsystems, Inc.
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 */
package org.opends.server.replication.common;
@@ -35,9 +35,13 @@
                                     java.lang.Comparable<ChangeNumber>
{
  private static final long serialVersionUID = -8802722277749190740L;
  private long timeStamp;
  private int seqnum;
  private short serverId;
  private final long timeStamp;
  private final int seqnum;
  private final short serverId;
  // A String representation of the ChangeNumber suitable for network
  // transmission.
  private String formatedString = null;;
  /**
   * Create a new ChangeNumber from a String.
@@ -54,6 +58,8 @@
    temp = str.substring(20, 28);
    seqnum = Integer.parseInt(temp, 16);
    formatedString = str;
  }
  /**
@@ -141,11 +147,27 @@
   */
  public String toString()
  {
    return format();
  }
  /**
   * Convert the ChangeNumber to a String that is suitable for network
   * transmission.
   *
   * @return the string
   */
  public String format()
  {
    if (formatedString != null)
      return formatedString;
    return String.format("%016x%04x%08x", timeStamp, serverId, seqnum);
  }
  /**
   * Convert the ChangeNumber to a printable String that is .
   * Convert the ChangeNumber to a printable String with a user friendly
   * format.
   *
   * @return the string
   */
  public String toStringUI()
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -49,6 +49,7 @@
import java.io.File;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
@@ -869,7 +870,18 @@
        }
        else
        {
          pendingChanges.commit(curChangeNumber, msg);
          // If assured replication is configured, this will prepare blocking
          // mechanism. If assured replication is disabled, this returns
          // immediately
          prepareWaitForAckIfAssuredEnabled(msg);
          try
          {
            msg.encode();
          } catch (UnsupportedEncodingException e)
          {
            // will be caught at publish time.
          }
          pendingChanges.commitAndPushCommittedChanges(curChangeNumber, msg);
        }
      }
      catch  (NoSuchElementException e)
@@ -892,18 +904,12 @@
      if (curChangeNumber != null)
      {
        pendingChanges.remove(curChangeNumber);
        pendingChanges.pushCommittedChanges();
      }
    }
    if (!op.isSynchronizationOperation())
    {
      // If assured replication is configured, this will prepare blocking
      // mechanism. If assured replication is disabled, this returns
      // immediately
      prepareWaitForAckIfAssuredEnabled(msg);
      pendingChanges.pushCommittedChanges();
      // If assured replication is enabled, this will wait for the matching
      // ack or time out. If assured replication is disabled, this returns
      // immediately
opends/src/server/org/opends/server/replication/plugin/PendingChanges.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2008 Sun Microsystems, Inc.
 *      Copyright 2009 Sun Microsystems, Inc.
 */
package org.opends.server.replication.plugin;
@@ -113,6 +113,18 @@
  public synchronized void commit(ChangeNumber changeNumber,
      LDAPUpdateMsg msg)
  {
    _commit(changeNumber, msg);
  }
  /**
   * Mark an update message as committed.
   *
   * @param changeNumber The ChangeNumber of the update message that must be
   *                     set as committed.
   * @param msg          The message associated to the update.
   */
  public void _commit(ChangeNumber changeNumber,
      LDAPUpdateMsg msg)
  {
    PendingChange curChange = pendingChanges.get(changeNumber);
    if (curChange == null)
    {
@@ -149,6 +161,18 @@
   */
  public synchronized ChangeNumber putLocalOperation(PluginOperation operation)
  {
    return _putLocalOperation(operation);
  }
  /**
   * Add a new UpdateMsg to the pending list from the provided local
   * operation.
   *
   * @param operation The local operation for which an UpdateMsg must
   *                  be added in the pending list.
   * @return The ChangeNumber now associated to the operation.
   */
  public  ChangeNumber _putLocalOperation(PluginOperation operation)
  {
    ChangeNumber changeNumber;
    changeNumber = changeNumberGenerator.newChangeNumber();
@@ -165,6 +189,15 @@
   */
  public synchronized int pushCommittedChanges()
  {
    return _pushCommittedChanges();
  }
  /**
   * Push all committed local changes to the replicationServer service.
   *
   * @return The number of pushed updates.
   */
  public int _pushCommittedChanges()
  {
    int numSentUpdates = 0;
    if (pendingChanges.isEmpty())
      return numSentUpdates;
@@ -195,4 +228,24 @@
    }
    return numSentUpdates;
  }
  /**
   * Mark an update message as committed, then
   * push all committed local changes to the replicationServer service
   * in a single atomic operation.
   *
   *
   * @param changeNumber The ChangeNumber of the update message that must be
   *                     set as committed.
   * @param msg          The message associated to the update.
   *
   * @return The number of pushed updates.
   */
  public synchronized int commitAndPushCommittedChanges(
      ChangeNumber changeNumber,
      LDAPUpdateMsg msg)
  {
    _commit(changeNumber, msg);
    return _pushCommittedChanges();
  }
}
opends/src/server/org/opends/server/replication/protocol/AckMsg.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2006-2008 Sun Microsystems, Inc.
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 */
package org.opends.server.replication.protocol;
@@ -241,13 +241,13 @@
       * error><failed server ids>
       */
      ByteArrayOutputStream oStream = new ByteArrayOutputStream();
      ByteArrayOutputStream oStream = new ByteArrayOutputStream(200);
      /* Put the type of the operation */
      oStream.write(MSG_TYPE_ACK);
      /* Put the ChangeNumber */
      byte[] changeNumberByte = changeNumber.toString().getBytes("UTF-8");
      byte[] changeNumberByte = changeNumber.format().getBytes("UTF-8");
      oStream.write(changeNumberByte);
      oStream.write(0);
opends/src/server/org/opends/server/replication/protocol/AddMsg.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2006-2008 Sun Microsystems, Inc.
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 */
package org.opends.server.replication.protocol;
@@ -258,34 +258,41 @@
  @Override
  public byte[] getBytes() throws UnsupportedEncodingException
  {
    int length = encodedAttributes.length;
    byte[] byteParentId = null;
    if (parentUniqueId != null)
    if (bytes == null)
    {
      byteParentId = parentUniqueId.getBytes("UTF-8");
      length += byteParentId.length + 1;
      int length = encodedAttributes.length;
      byte[] byteParentId = null;
      if (parentUniqueId != null)
      {
        byteParentId = parentUniqueId.getBytes("UTF-8");
        length += byteParentId.length + 1;
      }
      else
      {
        length += 1;
      }
      /* encode the header in a byte[] large enough to also contain the mods */
      byte [] resultByteArray = encodeHeader(MSG_TYPE_ADD, length);
      int pos = resultByteArray.length - length;
      if (byteParentId != null)
        pos = addByteArray(byteParentId, resultByteArray, pos);
      else
        resultByteArray[pos++] = 0;
      /* put the attributes */
      for (int i=0; i<encodedAttributes.length; i++,pos++)
      {
        resultByteArray[pos] = encodedAttributes[i];
      }
      return resultByteArray;
    }
    else
    {
      length += 1;
      return bytes;
    }
    /* encode the header in a byte[] large enough to also contain the mods */
    byte [] resultByteArray = encodeHeader(MSG_TYPE_ADD, length);
    int pos = resultByteArray.length - length;
    if (byteParentId != null)
      pos = addByteArray(byteParentId, resultByteArray, pos);
    else
      resultByteArray[pos++] = 0;
    /* put the attributes */
    for (int i=0; i<encodedAttributes.length; i++,pos++)
    {
      resultByteArray[pos] = encodedAttributes[i];
    }
    return resultByteArray;
  }
  /**
opends/src/server/org/opends/server/replication/protocol/DeleteMsg.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2006-2008 Sun Microsystems, Inc.
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 */
package org.opends.server.replication.protocol;
@@ -106,7 +106,14 @@
  @Override
  public byte[] getBytes() throws UnsupportedEncodingException
  {
    return encodeHeader(MSG_TYPE_DELETE, 0);
    if (bytes == null)
    {
     return encodeHeader(MSG_TYPE_DELETE, 0);
    }
    else
    {
      return bytes;
    }
  }
  /**
opends/src/server/org/opends/server/replication/protocol/LDAPUpdateMsg.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2006-2008 Sun Microsystems, Inc.
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 */
package org.opends.server.replication.protocol;
@@ -58,6 +58,11 @@
  protected String uniqueId;
  /**
   * Encoded form of the LDAPUpdateMsg.
   */
  protected byte[] bytes = null;
  /**
   * Creates a new UpdateMsg.
   */
  public LDAPUpdateMsg()
@@ -158,6 +163,20 @@
    return uniqueId;
  }
  /**
   * Do all the work necessary for the encoding.
   *
   * This is useful in case when one wants to perform this outside
   * of a synchronized portion of code.
   *
   * This method is not synchronized and therefore not MT safe.
   *
   * @throws UnsupportedEncodingException when encoding fails.
   */
  public void encode() throws UnsupportedEncodingException
  {
    bytes = getBytes();
  }
  /**
   * Create and Operation from the message.
opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2006-2008 Sun Microsystems, Inc.
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 */
package org.opends.server.replication.protocol;
@@ -524,56 +524,63 @@
   */
  public byte[] getBytes_V1() throws UnsupportedEncodingException
  {
    byte[] byteNewRdn = newRDN.getBytes("UTF-8");
    byte[] byteNewSuperior = null;
    byte[] byteNewSuperiorId = null;
    // calculate the length necessary to encode the parameters
    int length = byteNewRdn.length + 1 + 1;
    if (newSuperior != null)
    if (bytes == null)
    {
      byteNewSuperior = newSuperior.getBytes("UTF-8");
      length += byteNewSuperior.length + 1;
      byte[] byteNewRdn = newRDN.getBytes("UTF-8");
      byte[] byteNewSuperior = null;
      byte[] byteNewSuperiorId = null;
      // calculate the length necessary to encode the parameters
      int length = byteNewRdn.length + 1 + 1;
      if (newSuperior != null)
      {
        byteNewSuperior = newSuperior.getBytes("UTF-8");
        length += byteNewSuperior.length + 1;
      }
      else
        length += 1;
      if (newSuperiorId != null)
      {
        byteNewSuperiorId = newSuperiorId.getBytes("UTF-8");
        length += byteNewSuperiorId.length + 1;
      }
      else
        length += 1;
      byte[] resultByteArray = encodeHeader_V1(MSG_TYPE_MODIFYDN_V1, length);
      int pos = resultByteArray.length - length;
      /* put the new RDN and a terminating 0 */
      pos = addByteArray(byteNewRdn, resultByteArray, pos);
      /* put the newsuperior and a terminating 0 */
      if (newSuperior != null)
      {
        pos = addByteArray(byteNewSuperior, resultByteArray, pos);
      }
      else
        resultByteArray[pos++] = 0;
      /* put the newsuperiorId and a terminating 0 */
      if (newSuperiorId != null)
      {
        pos = addByteArray(byteNewSuperiorId, resultByteArray, pos);
      }
      else
        resultByteArray[pos++] = 0;
      /* put the deleteoldrdn flag */
      if (deleteOldRdn)
        resultByteArray[pos++] = 1;
      else
        resultByteArray[pos++] = 0;
      return resultByteArray;
    }
    else
      length += 1;
    if (newSuperiorId != null)
    {
      byteNewSuperiorId = newSuperiorId.getBytes("UTF-8");
      length += byteNewSuperiorId.length + 1;
      return bytes;
    }
    else
      length += 1;
    byte[] resultByteArray = encodeHeader_V1(MSG_TYPE_MODIFYDN_V1, length);
    int pos = resultByteArray.length - length;
    /* put the new RDN and a terminating 0 */
    pos = addByteArray(byteNewRdn, resultByteArray, pos);
    /* put the newsuperior and a terminating 0 */
    if (newSuperior != null)
    {
      pos = addByteArray(byteNewSuperior, resultByteArray, pos);
    }
    else
      resultByteArray[pos++] = 0;
    /* put the newsuperiorId and a terminating 0 */
    if (newSuperiorId != null)
    {
      pos = addByteArray(byteNewSuperiorId, resultByteArray, pos);
    }
    else
      resultByteArray[pos++] = 0;
    /* put the deleteoldrdn flag */
    if (deleteOldRdn)
      resultByteArray[pos++] = 1;
    else
      resultByteArray[pos++] = 0;
    return resultByteArray;
  }
}
opends/src/server/org/opends/server/replication/protocol/ModifyMsg.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2006-2008 Sun Microsystems, Inc.
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 */
package org.opends.server.replication.protocol;
@@ -92,6 +92,8 @@
  public ModifyMsg(byte[] in) throws DataFormatException,
                                     UnsupportedEncodingException
  {
    bytes = in;
    // Decode header
    byte[] allowedPduTypes = new byte[2];
    allowedPduTypes[0] = MSG_TYPE_MODIFY;
@@ -117,19 +119,44 @@
  }
  /**
   * Creates a new Modify message from a V1 byte[].
   *
   * @param in The byte[] from which the operation must be read.
   * @throws DataFormatException If the input byte[] is not a valid ModifyMsg
   * @throws UnsupportedEncodingException If UTF8 is not supported by the JVM.
   *
   * @return The created ModifyMsg.
   */
  public static ModifyMsg createV1(byte[] in) throws DataFormatException,
                                     UnsupportedEncodingException
  {
    ModifyMsg msg = new ModifyMsg(in);
    msg.bytes = null;
    return msg;
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public byte[] getBytes() throws UnsupportedEncodingException
  {
    /* encode the header in a byte[] large enough to also contain the mods */
    byte[] encodedMsg = encodeHeader(MSG_TYPE_MODIFY, encodedMods.length + 1);
    if (bytes == null)
    {
      /* encode the header in a byte[] large enough to also contain the mods */
      byte[] mybytes = encodeHeader(MSG_TYPE_MODIFY, encodedMods.length + 1);
    /* add the mods */
    int pos = encodedMsg.length - (encodedMods.length + 1);
    addByteArray(encodedMods, encodedMsg, pos);
      /* add the mods */
      int pos = mybytes.length - (encodedMods.length + 1);
      addByteArray(encodedMods, mybytes, pos);
    return encodedMsg;
      return mybytes;
    }
    else
    {
      return bytes;
    }
  }
  /**
opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2006-2008 Sun Microsystems, Inc.
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 */
package org.opends.server.replication.protocol;
@@ -143,8 +143,10 @@
        throw new NotSupportedOldVersionPDUException("Replication Server Info",
          ProtocolVersion.REPLICATION_PROTOCOL_V1, buffer[0]);
      case MSG_TYPE_MODIFY:
        msg = new ModifyMsg(buffer);
      break;
      case MSG_TYPE_MODIFY_V1:
          msg = new ModifyMsg(buffer);
          msg = ModifyMsg.createV1(buffer);
      break;
      case MSG_TYPE_ADD:
      case MSG_TYPE_ADD_V1:
opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2006-2008 Sun Microsystems, Inc.
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;
import org.opends.messages.MessageBuilder;
@@ -86,13 +86,15 @@
  // the threads calling add() method will be blocked if the size of
  // msgQueue becomes larger than the  queueHimark and will resume
  // only when the size of the msgQueue goes below queueLowmark.
  int queueHimark = 5000;
  int queueLowmark = 4000;
  int queueMaxSize = 5000;
  int queueLowmark = 1000;
  int queueHimark = 4000;
  // The queue himark and lowmark in bytes, this is set to 100 times the
  // himark and lowmark in number of updates.
  int queueHimarkBytes = 100 * queueHimark;
  int queueMaxBytes = 100 * queueMaxSize;
  int queueLowmarkBytes = 100 * queueLowmark;
  int queueHimarkBytes = 100 * queueHimark;
  // The number of bytes currently in the queue
  int queueByteSize = 0;
@@ -140,10 +142,12 @@
    serverId = id;
    this.baseDn = baseDn;
    trimage = replicationServer.getTrimage();
    queueHimark = queueSize;
    queueLowmark = queueSize * 4 / 5;
    queueHimarkBytes = 100 * queueHimark;
    queueLowmarkBytes = 100 * queueLowmark;
    queueMaxSize = queueSize;
    queueLowmark = queueSize * 1 / 5;
    queueHimark = queueSize * 4 / 5;
    queueMaxBytes = 200 * queueMaxSize;
    queueLowmarkBytes = 200 * queueLowmark;
    queueHimarkBytes = 200 * queueLowmark;
    db = new ReplicationDB(id, baseDn, replicationServer, dbenv);
    firstChange = db.readFirstChange();
    lastChange = db.readLastChange();
@@ -171,11 +175,14 @@
    synchronized (msgQueue)
    {
      int size = msgQueue.size();
      while ((size > queueHimark) || (queueByteSize > queueHimarkBytes))
      if ((size > queueHimark) || (queueByteSize > queueHimarkBytes))
        msgQueue.notify();
      while ((size > queueMaxSize) || (queueByteSize > queueMaxBytes))
      {
        try
        {
          msgQueue.wait(500);
          msgQueue.wait(5000);
        } catch (InterruptedException e)
        {
          // simply loop to try again.
@@ -379,17 +386,22 @@
  {
    while (shutdown == false)
    {
      try {
      try
      {
        flush();
        trim();
        synchronized (this)
        synchronized (msgQueue)
        {
          try
          if ((msgQueue.size() < queueLowmark) &&
              (queueByteSize < queueLowmarkBytes))
          {
            this.wait(1000);
          } catch (InterruptedException e)
          { }
            try
            {
              msgQueue.wait(10000);
            } catch (InterruptedException e)
            { }
          }
        }
      } catch (Exception end)
      {
@@ -434,56 +446,59 @@
    int tries = 0;
    while ((tries++ < DEADLOCK_RETRIES) && (!done))
    {
      /* the trim is done by group in order to save some CPU and IO bandwidth
       * start the transaction then do a bunch of remove then commit
       */
      ReplServerDBCursor cursor;
      cursor = db.openDeleteCursor();
      try
      synchronized (flushLock)
      {
        while ((size < 5000 ) &&  (!finished))
        /* the trim is done by group in order to save some CPU and IO bandwidth
         * start the transaction then do a bunch of remove then commit
         */
        ReplServerDBCursor cursor;
        cursor = db.openDeleteCursor();
        try
        {
          ChangeNumber changeNumber = cursor.nextChangeNumber();
          if (changeNumber != null)
          while ((size < 5000 ) &&  (!finished))
          {
            if ((!changeNumber.equals(lastChange))
                && (changeNumber.older(trimDate)))
            ChangeNumber changeNumber = cursor.nextChangeNumber();
            if (changeNumber != null)
            {
              size++;
              cursor.delete();
              if ((!changeNumber.equals(lastChange))
                  && (changeNumber.older(trimDate)))
              {
                size++;
                cursor.delete();
              }
              else
              {
                firstChange = changeNumber;
                finished = true;
              }
            }
            else
            {
              firstChange = changeNumber;
              finished = true;
            }
          }
          else
            finished = true;
          cursor.close();
          done = true;
        }
        cursor.close();
        done = true;
      }
      catch (DeadlockException e)
      {
        cursor.abort();
        if (tries == DEADLOCK_RETRIES)
        catch (DeadlockException e)
        {
          // could not handle the Deadlock after DEADLOCK_RETRIES tries.
          // shutdown the ReplicationServer.
          cursor.abort();
          if (tries == DEADLOCK_RETRIES)
          {
            // could not handle the Deadlock after DEADLOCK_RETRIES tries.
            // shutdown the ReplicationServer.
            shutdown = true;
            throw (e);
          }
        }
        catch (DatabaseException e)
        {
          // mark shutdown for this db so that we don't try again to
          // stop it from cursor.close() or methods called by cursor.close()
          shutdown = true;
          cursor.abort();
          throw (e);
        }
      }
      catch (DatabaseException e)
      {
        // mark shutdown for this db so that we don't try again to
        // stop it from cursor.close() or methods called by cursor.close()
        shutdown = true;
        cursor.abort();
        throw (e);
      }
    }
  }
@@ -493,7 +508,7 @@
  private void flush()
  {
    int size;
    int chunksize = (500 < queueHimark ? 500 : queueHimark);
    int chunksize = (500 < queueMaxSize ? 500 : queueMaxSize);
    do
    {
@@ -630,9 +645,10 @@
    {
      msgQueue.clear();
      queueByteSize = 0;
      db.clear();
      firstChange = db.readFirstChange();
      lastChange = db.readLastChange();
    }
    db.clear();
    firstChange = db.readFirstChange();
    lastChange = db.readLastChange();
  }
}
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -626,12 +626,12 @@
      logError(errorMsg);
    } else if (sourceGroupId != groupId)
    {
      // Assured feature does not cross different group ids
      // Assured feature does not cross different group IDS
    } else
    {
      if ((generationId > 0) &&
        (generationId == sourceHandler.getGenerationId()))
        // Ignore assured updates from wrong generationid servers
        // Ignore assured updates from wrong generationId servers
      {
        if (sourceHandler.isLDAPserver())
        {
@@ -662,7 +662,7 @@
            }
          }
        } else
        { // A RS sent us the safe data message, for sure no futher acks to wait
        { // A RS sent us the safe data message, for sure no further ack to wait
          if (safeDataLevel == (byte) 1)
          {
            /**
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2006-2008 Sun Microsystems, Inc.
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 */
package org.opends.server.replication;
@@ -978,7 +978,7 @@
      Message expectedMessage)
  {
    TaskState taskState = null;
    int cpt=10;
    int cpt=40;
    try
    {
      SearchFilter filter =
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2006-2008 Sun Microsystems, Inc.
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;