From 1a26464190204cdcb6d66d4125dd5f058cfb08f8 Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Fri, 02 Nov 2007 14:13:02 +0000
Subject: [PATCH] Fix 2321 - Fixes specially  ReplLDIFInputStream and ReplLDIFOutput stream regarding different cases of entry sizes and buffer sizes

---
 opends/src/server/org/opends/server/replication/server/ReplicationServer.java               |    6 
 opends/src/server/org/opends/server/replication/plugin/ReplLDIFOutputStream.java            |   41 +++++++--
 opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java               |   77 +++++++++++++-----
 opends/src/server/org/opends/server/replication/plugin/ReplLDIFInputStream.java             |   50 ++++++++++--
 opends/src/server/org/opends/server/replication/protocol/ErrorMessage.java                  |    2 
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java |    8 +
 6 files changed, 139 insertions(+), 45 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplLDIFInputStream.java b/opends/src/server/org/opends/server/replication/plugin/ReplLDIFInputStream.java
index d886480..532b815 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ReplLDIFInputStream.java
+++ b/opends/src/server/org/opends/server/replication/plugin/ReplLDIFInputStream.java
@@ -45,6 +45,10 @@
   // The domain associated to this import.
   ReplicationDomain domain;
 
+  private byte[] bytes;
+
+  private int index;
+
   /**
    * Creates a new ReplLDIFInputStream that will import entries
    * for a synchronzation domain.
@@ -86,20 +90,50 @@
     if (closed)
       return -1;
 
-    byte[] bytes = domain.receiveEntryBytes();
+    int receivedLength;
+    int copiedLength;
 
-    if (bytes==null)
+    if (bytes == null)
     {
-      closed = true;
-      return -1;
+      // First time this method is called or the previous entry was
+      // finished. Read a new entry and return it.
+      bytes = domain.receiveEntryBytes();
+
+      if (bytes==null)
+      {
+        closed = true;
+        return -1;
+      }
+
+      receivedLength = bytes.length;
+      index = 0;
+    }
+    else
+    {
+      // Some data was left from the previous entry, feed the read with this
+      // data.
+      receivedLength = bytes.length - index;
     }
 
-    int l = bytes.length;
-    for (int i =0; i<l; i++)
+    if (receivedLength <= len)
     {
-      b[off+i] = bytes[i];
+      copiedLength = receivedLength;
     }
-    return l;
+    else
+    {
+      copiedLength = len;
+    }
+
+    for (int i =0; i<copiedLength; i++)
+    {
+      b[off+i] = bytes[index+i];
+    }
+    index += copiedLength;
+
+    if (copiedLength <= len)
+      bytes = null;
+
+    return copiedLength;
   }
 
   /**
diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplLDIFOutputStream.java b/opends/src/server/org/opends/server/replication/plugin/ReplLDIFOutputStream.java
index af5f16e..5d48ea1 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ReplLDIFOutputStream.java
+++ b/opends/src/server/org/opends/server/replication/plugin/ReplLDIFOutputStream.java
@@ -26,9 +26,12 @@
  */
 package org.opends.server.replication.plugin;
 
+import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+
 import java.io.IOException;
 import java.io.OutputStream;
 
+import org.opends.server.loggers.debug.DebugTracer;
 import org.opends.server.util.ServerConstants;
 
 /**
@@ -38,6 +41,11 @@
 public class ReplLDIFOutputStream
        extends OutputStream
 {
+  /**
+   * The tracer object for the debug logger.
+   */
+  private static final DebugTracer TRACER = getTracer();
+
   // The synchronization domain on which the export is done
   ReplicationDomain domain;
 
@@ -75,21 +83,25 @@
   public void write(byte b[], int off, int len) throws IOException
   {
     int endOfEntryIndex;
-    int startOfEntryIndex = off;
-    int bytesToRead = len;
+    int endIndex;
+
+    String ebytes = "";
+    ebytes = ebytes.concat(entryBuffer);
+    entryBuffer = "";
+
+    ebytes = ebytes.concat(new String(b, off, len));
+    endIndex = ebytes.length();
 
     while (true)
     {
       // if we have the bytes for an entry, let's make an entry and send it
-      String ebytes = new String(b,startOfEntryIndex,bytesToRead);
       endOfEntryIndex = ebytes.indexOf(ServerConstants.EOL +
           ServerConstants.EOL);
 
       if ( endOfEntryIndex >= 0 )
       {
-
         endOfEntryIndex += 2;
-        entryBuffer = entryBuffer + ebytes.substring(0, endOfEntryIndex);
+        entryBuffer = ebytes.substring(0, endOfEntryIndex);
 
         // Send the entry
         if ((numEntries>0) && (getNumExportedEntries() > numEntries))
@@ -100,16 +112,25 @@
         }
         domain.exportLDIFEntry(entryBuffer);
         numExportedEntries++;
-
-        startOfEntryIndex = startOfEntryIndex + endOfEntryIndex;
         entryBuffer = "";
-        bytesToRead -= endOfEntryIndex;
-        if (bytesToRead==0)
+
+        if (endIndex == endOfEntryIndex)
+        {
+          // no more data to process
           break;
+        }
+        else
+        {
+          // loop to the data of the next entry
+          ebytes = ebytes.substring(endOfEntryIndex,
+                                    endIndex);
+          endIndex = ebytes.length();
+        }
       }
       else
       {
-        entryBuffer = new String(b, startOfEntryIndex, bytesToRead);
+        // a next call to us will provide more bytes to make an entry
+        entryBuffer = entryBuffer.concat(ebytes);
         break;
       }
     }
diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
index 012ea9b..01680b4 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -25,21 +25,19 @@
  *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.plugin;
-import org.opends.messages.Message;
-import org.opends.messages.MessageBuilder;
-import static org.opends.server.loggers.ErrorLogger.logError;
-import static org.opends.server.loggers.debug.DebugLogger.*;
-import org.opends.server.loggers.debug.DebugTracer;
-import static org.opends.messages.ToolMessages.*;
 import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.messages.ToolMessages.*;
+import static org.opends.server.loggers.ErrorLogger.logError;
+import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
+import static org.opends.server.loggers.debug.DebugLogger.getTracer;
 import static org.opends.server.replication.plugin.Historical.ENTRYUIDNAME;
 import static org.opends.server.replication.protocol.OperationContext.*;
+import static org.opends.server.util.ServerConstants.*;
 import static org.opends.server.util.StaticUtils.createEntry;
 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
-import org.opends.server.protocols.asn1.ASN1OctetString;
-import static org.opends.server.util.ServerConstants.*;
 
 import java.io.IOException;
+import java.io.OutputStream;
 import java.net.SocketTimeoutException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -52,11 +50,12 @@
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.zip.Adler32;
 import java.util.zip.CheckedOutputStream;
 import java.util.zip.DataFormatException;
-import java.util.zip.Adler32;
-import java.io.OutputStream;
 
+import org.opends.messages.Message;
+import org.opends.messages.MessageBuilder;
 import org.opends.server.admin.server.ConfigurationChangeListener;
 import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.*;
 import org.opends.server.admin.std.server.ReplicationDomainCfg;
@@ -75,7 +74,9 @@
 import org.opends.server.core.ModifyDNOperationBasis;
 import org.opends.server.core.ModifyOperation;
 import org.opends.server.core.ModifyOperationBasis;
+import org.opends.server.loggers.debug.DebugTracer;
 import org.opends.server.protocols.asn1.ASN1Exception;
+import org.opends.server.protocols.asn1.ASN1OctetString;
 import org.opends.server.protocols.internal.InternalClientConnection;
 import org.opends.server.protocols.internal.InternalSearchOperation;
 import org.opends.server.protocols.ldap.LDAPAttribute;
@@ -84,7 +85,25 @@
 import org.opends.server.replication.common.ChangeNumber;
 import org.opends.server.replication.common.ChangeNumberGenerator;
 import org.opends.server.replication.common.ServerState;
-import org.opends.server.replication.protocol.*;
+import org.opends.server.replication.protocol.AckMessage;
+import org.opends.server.replication.protocol.AddContext;
+import org.opends.server.replication.protocol.AddMsg;
+import org.opends.server.replication.protocol.DeleteContext;
+import org.opends.server.replication.protocol.DoneMessage;
+import org.opends.server.replication.protocol.EntryMessage;
+import org.opends.server.replication.protocol.ErrorMessage;
+import org.opends.server.replication.protocol.HeartbeatMessage;
+import org.opends.server.replication.protocol.InitializeRequestMessage;
+import org.opends.server.replication.protocol.InitializeTargetMessage;
+import org.opends.server.replication.protocol.ModifyContext;
+import org.opends.server.replication.protocol.ModifyDNMsg;
+import org.opends.server.replication.protocol.ModifyDnContext;
+import org.opends.server.replication.protocol.OperationContext;
+import org.opends.server.replication.protocol.ReplSessionSecurity;
+import org.opends.server.replication.protocol.ReplicationMessage;
+import org.opends.server.replication.protocol.ResetGenerationId;
+import org.opends.server.replication.protocol.RoutableMessage;
+import org.opends.server.replication.protocol.UpdateMessage;
 import org.opends.server.tasks.InitializeTargetTask;
 import org.opends.server.tasks.InitializeTask;
 import org.opends.server.tasks.TaskUtils;
@@ -281,11 +300,11 @@
      * Initializes the import/export counters with the provider value.
      * @param count The value with which to initialize the counters.
      */
-    public void initImportExportCounters(long count)
+    public void setCounters(long total, long left)
       throws DirectoryException
     {
-      entryCount = count;
-      entryLeftCount = count;
+      entryCount = total;
+      entryLeftCount = left;
 
       if (initializeTask != null)
       {
@@ -323,6 +342,15 @@
         }
       }
     }
+
+    /**
+     * {@inheritDoc}
+     */
+    public String toString()
+    {
+      return new String("[ Entry count=" + this.entryCount +
+                        ", Entry left count=" + this.entryLeftCount + "]");
+    }
   }
 
   /**
@@ -815,7 +843,7 @@
 
             if (debugEnabled())
               if (!(msg instanceof HeartbeatMessage))
-                TRACER.debugInfo("Message received <" + msg + ">");
+                TRACER.debugVerbose("Message received <" + msg + ">");
 
             if (msg instanceof AckMessage)
             {
@@ -2526,7 +2554,7 @@
         msg = broker.receive();
 
         if (debugEnabled())
-          TRACER.debugInfo(
+          TRACER.debugVerbose(
               " sid:" + this.serverId +
               " base DN:" + this.baseDN +
               " Import EntryBytes received " + msg);
@@ -2586,6 +2614,12 @@
     // FIXME TBD Treat the case where the error happens while entries
     // are being exported
 
+    if (debugEnabled())
+      TRACER.debugVerbose(
+          " abandonImportExport:" + this.serverId +
+          " base DN:" + this.baseDN +
+          " Error Msg received " + errorMsg);
+
     if (ieContext != null)
     {
       ieContext.exception = new DirectoryException(ResultCode.OTHER,
@@ -2841,7 +2875,6 @@
 
     if (ieContext.checksumOutput == false)
     {
-      // Actually send the entry
       EntryMessage entryMessage = new EntryMessage(
         serverId, ieContext.exportTarget, lDIFEntry.getBytes());
       broker.publish(entryMessage);
@@ -3037,8 +3070,8 @@
       if (initTask != null)
       {
         ieContext.initializeTask = initTask;
-        ieContext.initImportExportCounters(entryCount);
       }
+      ieContext.setCounters(entryCount, entryCount);
 
       // Send start message to the peer
       InitializeTargetMessage initializeMessage = new InitializeTargetMessage(
@@ -3132,7 +3165,8 @@
 
         ieContext.importSource = initializeMessage.getsenderID();
         ieContext.entryLeftCount = initializeMessage.getEntryCount();
-        ieContext.initImportExportCounters(initializeMessage.getEntryCount());
+        ieContext.setCounters(initializeMessage.getEntryCount(),
+            initializeMessage.getEntryCount());
 
         preBackendImport(backend);
 
@@ -3151,9 +3185,6 @@
         // Process import
         backend.importLDIF(importConfig);
 
-        if (debugEnabled())
-          TRACER.debugInfo("The import has ended successfully on " +
-              this.baseDN);
         stateSavingDisabled = false;
       }
     }
@@ -3174,6 +3205,8 @@
 
         // Re-enable backend
         closeBackendImport(backend);
+
+        backend = retrievesBackend(baseDN);
       }
 
       // Update the task that initiated the import
diff --git a/opends/src/server/org/opends/server/replication/protocol/ErrorMessage.java b/opends/src/server/org/opends/server/replication/protocol/ErrorMessage.java
index 3b7d2f0..77b0d23 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ErrorMessage.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ErrorMessage.java
@@ -49,7 +49,7 @@
   // The tracer object for the debug logger
   private static final DebugTracer TRACER = getTracer();
 
-  // Specifies the messageID built form the error that was detected
+  // Specifies the messageID built from the error that was detected
   private int msgID;
 
   // Specifies the complementary details about the error that was detected
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index 1b28d7c..ac342b1 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -42,7 +42,7 @@
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -108,8 +108,8 @@
   /* This table is used to store the list of dn for which we are currently
    * handling servers.
    */
-  private HashMap<DN, ReplicationCache> baseDNs =
-          new HashMap<DN, ReplicationCache>();
+  private ConcurrentHashMap<DN, ReplicationCache> baseDNs =
+          new ConcurrentHashMap<DN, ReplicationCache>();
 
   private String localURL = "null";
   private boolean shutdown = false;
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
index fdac923..42ec433 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
@@ -428,6 +428,12 @@
    */
   private String[] newLDIFEntries()
   {
+    // It is relevant to test ReplLDIFInputStream
+    // and ReplLDIFOutputStream with big entries
+    char bigAttributeValue[] = new char[30240];
+    for (int i=0; i<bigAttributeValue.length; i++)
+      bigAttributeValue[i] = Integer.toString(i).charAt(0);
+    
     String[] entries =
     {
         "dn: dc=example,dc=com\n"
@@ -461,7 +467,7 @@
         + "cn: Robert Langman\n"
         + "sn: Langman\n"
         + "uid: robert\n"
-        + "telephonenumber: +1 408 555 1213\n"
+        + "telephonenumber: "+ new String(bigAttributeValue)+"\n"
         + "entryUUID: 21111111-1111-1111-1111-111111111114\n"
         + "\n"
         };

--
Gitblit v1.10.0