mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

pgamba
02.13.2007 1a26464190204cdcb6d66d4125dd5f058cfb08f8
Fix 2321 - Fixes specially  ReplLDIFInputStream and ReplLDIFOutput stream regarding different cases of entry sizes and buffer sizes
6 files modified
184 ■■■■ changed files
opends/src/server/org/opends/server/replication/plugin/ReplLDIFInputStream.java 50 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/ReplLDIFOutputStream.java 41 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java 77 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ErrorMessage.java 2 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServer.java 6 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java 8 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/ReplLDIFInputStream.java
@@ -45,6 +45,10 @@
  // The domain associated to this import.
  ReplicationDomain domain;
  private byte[] bytes;
  private int index;
  /**
   * Creates a new ReplLDIFInputStream that will import entries
   * for a synchronzation domain.
@@ -86,20 +90,50 @@
    if (closed)
      return -1;
    byte[] bytes = domain.receiveEntryBytes();
    int receivedLength;
    int copiedLength;
    if (bytes==null)
    if (bytes == null)
    {
      closed = true;
      return -1;
      // First time this method is called or the previous entry was
      // finished. Read a new entry and return it.
      bytes = domain.receiveEntryBytes();
      if (bytes==null)
      {
        closed = true;
        return -1;
      }
      receivedLength = bytes.length;
      index = 0;
    }
    else
    {
      // Some data was left from the previous entry, feed the read with this
      // data.
      receivedLength = bytes.length - index;
    }
    int l = bytes.length;
    for (int i =0; i<l; i++)
    if (receivedLength <= len)
    {
      b[off+i] = bytes[i];
      copiedLength = receivedLength;
    }
    return l;
    else
    {
      copiedLength = len;
    }
    for (int i =0; i<copiedLength; i++)
    {
      b[off+i] = bytes[index+i];
    }
    index += copiedLength;
    if (copiedLength <= len)
      bytes = null;
    return copiedLength;
  }
  /**
opends/src/server/org/opends/server/replication/plugin/ReplLDIFOutputStream.java
@@ -26,9 +26,12 @@
 */
package org.opends.server.replication.plugin;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import java.io.IOException;
import java.io.OutputStream;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.util.ServerConstants;
/**
@@ -38,6 +41,11 @@
public class ReplLDIFOutputStream
       extends OutputStream
{
  /**
   * The tracer object for the debug logger.
   */
  private static final DebugTracer TRACER = getTracer();
  // The synchronization domain on which the export is done
  ReplicationDomain domain;
@@ -75,21 +83,25 @@
  public void write(byte b[], int off, int len) throws IOException
  {
    int endOfEntryIndex;
    int startOfEntryIndex = off;
    int bytesToRead = len;
    int endIndex;
    String ebytes = "";
    ebytes = ebytes.concat(entryBuffer);
    entryBuffer = "";
    ebytes = ebytes.concat(new String(b, off, len));
    endIndex = ebytes.length();
    while (true)
    {
      // if we have the bytes for an entry, let's make an entry and send it
      String ebytes = new String(b,startOfEntryIndex,bytesToRead);
      endOfEntryIndex = ebytes.indexOf(ServerConstants.EOL +
          ServerConstants.EOL);
      if ( endOfEntryIndex >= 0 )
      {
        endOfEntryIndex += 2;
        entryBuffer = entryBuffer + ebytes.substring(0, endOfEntryIndex);
        entryBuffer = ebytes.substring(0, endOfEntryIndex);
        // Send the entry
        if ((numEntries>0) && (getNumExportedEntries() > numEntries))
@@ -100,16 +112,25 @@
        }
        domain.exportLDIFEntry(entryBuffer);
        numExportedEntries++;
        startOfEntryIndex = startOfEntryIndex + endOfEntryIndex;
        entryBuffer = "";
        bytesToRead -= endOfEntryIndex;
        if (bytesToRead==0)
        if (endIndex == endOfEntryIndex)
        {
          // no more data to process
          break;
        }
        else
        {
          // loop to the data of the next entry
          ebytes = ebytes.substring(endOfEntryIndex,
                                    endIndex);
          endIndex = ebytes.length();
        }
      }
      else
      {
        entryBuffer = new String(b, startOfEntryIndex, bytesToRead);
        // a next call to us will provide more bytes to make an entry
        entryBuffer = entryBuffer.concat(ebytes);
        break;
      }
    }
opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -25,21 +25,19 @@
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 */
package org.opends.server.replication.plugin;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.*;
import org.opends.server.loggers.debug.DebugTracer;
import static org.opends.messages.ToolMessages.*;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.messages.ToolMessages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.replication.plugin.Historical.ENTRYUIDNAME;
import static org.opends.server.replication.protocol.OperationContext.*;
import static org.opends.server.util.ServerConstants.*;
import static org.opends.server.util.StaticUtils.createEntry;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import org.opends.server.protocols.asn1.ASN1OctetString;
import static org.opends.server.util.ServerConstants.*;
import java.io.IOException;
import java.io.OutputStream;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Collection;
@@ -52,11 +50,12 @@
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.Adler32;
import java.util.zip.CheckedOutputStream;
import java.util.zip.DataFormatException;
import java.util.zip.Adler32;
import java.io.OutputStream;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.server.admin.server.ConfigurationChangeListener;
import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.*;
import org.opends.server.admin.std.server.ReplicationDomainCfg;
@@ -75,7 +74,9 @@
import org.opends.server.core.ModifyDNOperationBasis;
import org.opends.server.core.ModifyOperation;
import org.opends.server.core.ModifyOperationBasis;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.protocols.asn1.ASN1Exception;
import org.opends.server.protocols.asn1.ASN1OctetString;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.protocols.ldap.LDAPAttribute;
@@ -84,7 +85,25 @@
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ChangeNumberGenerator;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.*;
import org.opends.server.replication.protocol.AckMessage;
import org.opends.server.replication.protocol.AddContext;
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.DeleteContext;
import org.opends.server.replication.protocol.DoneMessage;
import org.opends.server.replication.protocol.EntryMessage;
import org.opends.server.replication.protocol.ErrorMessage;
import org.opends.server.replication.protocol.HeartbeatMessage;
import org.opends.server.replication.protocol.InitializeRequestMessage;
import org.opends.server.replication.protocol.InitializeTargetMessage;
import org.opends.server.replication.protocol.ModifyContext;
import org.opends.server.replication.protocol.ModifyDNMsg;
import org.opends.server.replication.protocol.ModifyDnContext;
import org.opends.server.replication.protocol.OperationContext;
import org.opends.server.replication.protocol.ReplSessionSecurity;
import org.opends.server.replication.protocol.ReplicationMessage;
import org.opends.server.replication.protocol.ResetGenerationId;
import org.opends.server.replication.protocol.RoutableMessage;
import org.opends.server.replication.protocol.UpdateMessage;
import org.opends.server.tasks.InitializeTargetTask;
import org.opends.server.tasks.InitializeTask;
import org.opends.server.tasks.TaskUtils;
@@ -281,11 +300,11 @@
     * Initializes the import/export counters with the provider value.
     * @param count The value with which to initialize the counters.
     */
    public void initImportExportCounters(long count)
    public void setCounters(long total, long left)
      throws DirectoryException
    {
      entryCount = count;
      entryLeftCount = count;
      entryCount = total;
      entryLeftCount = left;
      if (initializeTask != null)
      {
@@ -323,6 +342,15 @@
        }
      }
    }
    /**
     * {@inheritDoc}
     */
    public String toString()
    {
      return new String("[ Entry count=" + this.entryCount +
                        ", Entry left count=" + this.entryLeftCount + "]");
    }
  }
  /**
@@ -815,7 +843,7 @@
            if (debugEnabled())
              if (!(msg instanceof HeartbeatMessage))
                TRACER.debugInfo("Message received <" + msg + ">");
                TRACER.debugVerbose("Message received <" + msg + ">");
            if (msg instanceof AckMessage)
            {
@@ -2526,7 +2554,7 @@
        msg = broker.receive();
        if (debugEnabled())
          TRACER.debugInfo(
          TRACER.debugVerbose(
              " sid:" + this.serverId +
              " base DN:" + this.baseDN +
              " Import EntryBytes received " + msg);
@@ -2586,6 +2614,12 @@
    // FIXME TBD Treat the case where the error happens while entries
    // are being exported
    if (debugEnabled())
      TRACER.debugVerbose(
          " abandonImportExport:" + this.serverId +
          " base DN:" + this.baseDN +
          " Error Msg received " + errorMsg);
    if (ieContext != null)
    {
      ieContext.exception = new DirectoryException(ResultCode.OTHER,
@@ -2841,7 +2875,6 @@
    if (ieContext.checksumOutput == false)
    {
      // Actually send the entry
      EntryMessage entryMessage = new EntryMessage(
        serverId, ieContext.exportTarget, lDIFEntry.getBytes());
      broker.publish(entryMessage);
@@ -3037,8 +3070,8 @@
      if (initTask != null)
      {
        ieContext.initializeTask = initTask;
        ieContext.initImportExportCounters(entryCount);
      }
      ieContext.setCounters(entryCount, entryCount);
      // Send start message to the peer
      InitializeTargetMessage initializeMessage = new InitializeTargetMessage(
@@ -3132,7 +3165,8 @@
        ieContext.importSource = initializeMessage.getsenderID();
        ieContext.entryLeftCount = initializeMessage.getEntryCount();
        ieContext.initImportExportCounters(initializeMessage.getEntryCount());
        ieContext.setCounters(initializeMessage.getEntryCount(),
            initializeMessage.getEntryCount());
        preBackendImport(backend);
@@ -3151,9 +3185,6 @@
        // Process import
        backend.importLDIF(importConfig);
        if (debugEnabled())
          TRACER.debugInfo("The import has ended successfully on " +
              this.baseDN);
        stateSavingDisabled = false;
      }
    }
@@ -3174,6 +3205,8 @@
        // Re-enable backend
        closeBackendImport(backend);
        backend = retrievesBackend(baseDN);
      }
      // Update the task that initiated the import
opends/src/server/org/opends/server/replication/protocol/ErrorMessage.java
@@ -49,7 +49,7 @@
  // The tracer object for the debug logger
  private static final DebugTracer TRACER = getTracer();
  // Specifies the messageID built form the error that was detected
  // Specifies the messageID built from the error that was detected
  private int msgID;
  // Specifies the complementary details about the error that was detected
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -42,7 +42,7 @@
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
@@ -108,8 +108,8 @@
  /* This table is used to store the list of dn for which we are currently
   * handling servers.
   */
  private HashMap<DN, ReplicationCache> baseDNs =
          new HashMap<DN, ReplicationCache>();
  private ConcurrentHashMap<DN, ReplicationCache> baseDNs =
          new ConcurrentHashMap<DN, ReplicationCache>();
  private String localURL = "null";
  private boolean shutdown = false;
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
@@ -428,6 +428,12 @@
   */
  private String[] newLDIFEntries()
  {
    // It is relevant to test ReplLDIFInputStream
    // and ReplLDIFOutputStream with big entries
    char bigAttributeValue[] = new char[30240];
    for (int i=0; i<bigAttributeValue.length; i++)
      bigAttributeValue[i] = Integer.toString(i).charAt(0);
    String[] entries =
    {
        "dn: dc=example,dc=com\n"
@@ -461,7 +467,7 @@
        + "cn: Robert Langman\n"
        + "sn: Langman\n"
        + "uid: robert\n"
        + "telephonenumber: +1 408 555 1213\n"
        + "telephonenumber: "+ new String(bigAttributeValue)+"\n"
        + "entryUUID: 21111111-1111-1111-1111-111111111114\n"
        + "\n"
        };