From a8a50d12008ef3b1471e4967125d36888c5cbdee Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Fri, 02 Nov 2007 14:13:02 +0000
Subject: [PATCH] Fix 2321 - Fixes specially  ReplLDIFInputStream and ReplLDIFOutput stream regarding different cases of entry sizes and buffer sizes

---
 opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java |   77 +++++++++++++++++++++++++++-----------
 1 files changed, 55 insertions(+), 22 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
index 012ea9b..01680b4 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -25,21 +25,19 @@
  *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.plugin;
-import org.opends.messages.Message;
-import org.opends.messages.MessageBuilder;
-import static org.opends.server.loggers.ErrorLogger.logError;
-import static org.opends.server.loggers.debug.DebugLogger.*;
-import org.opends.server.loggers.debug.DebugTracer;
-import static org.opends.messages.ToolMessages.*;
 import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.messages.ToolMessages.*;
+import static org.opends.server.loggers.ErrorLogger.logError;
+import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
+import static org.opends.server.loggers.debug.DebugLogger.getTracer;
 import static org.opends.server.replication.plugin.Historical.ENTRYUIDNAME;
 import static org.opends.server.replication.protocol.OperationContext.*;
+import static org.opends.server.util.ServerConstants.*;
 import static org.opends.server.util.StaticUtils.createEntry;
 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
-import org.opends.server.protocols.asn1.ASN1OctetString;
-import static org.opends.server.util.ServerConstants.*;
 
 import java.io.IOException;
+import java.io.OutputStream;
 import java.net.SocketTimeoutException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -52,11 +50,12 @@
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.zip.Adler32;
 import java.util.zip.CheckedOutputStream;
 import java.util.zip.DataFormatException;
-import java.util.zip.Adler32;
-import java.io.OutputStream;
 
+import org.opends.messages.Message;
+import org.opends.messages.MessageBuilder;
 import org.opends.server.admin.server.ConfigurationChangeListener;
 import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.*;
 import org.opends.server.admin.std.server.ReplicationDomainCfg;
@@ -75,7 +74,9 @@
 import org.opends.server.core.ModifyDNOperationBasis;
 import org.opends.server.core.ModifyOperation;
 import org.opends.server.core.ModifyOperationBasis;
+import org.opends.server.loggers.debug.DebugTracer;
 import org.opends.server.protocols.asn1.ASN1Exception;
+import org.opends.server.protocols.asn1.ASN1OctetString;
 import org.opends.server.protocols.internal.InternalClientConnection;
 import org.opends.server.protocols.internal.InternalSearchOperation;
 import org.opends.server.protocols.ldap.LDAPAttribute;
@@ -84,7 +85,25 @@
 import org.opends.server.replication.common.ChangeNumber;
 import org.opends.server.replication.common.ChangeNumberGenerator;
 import org.opends.server.replication.common.ServerState;
-import org.opends.server.replication.protocol.*;
+import org.opends.server.replication.protocol.AckMessage;
+import org.opends.server.replication.protocol.AddContext;
+import org.opends.server.replication.protocol.AddMsg;
+import org.opends.server.replication.protocol.DeleteContext;
+import org.opends.server.replication.protocol.DoneMessage;
+import org.opends.server.replication.protocol.EntryMessage;
+import org.opends.server.replication.protocol.ErrorMessage;
+import org.opends.server.replication.protocol.HeartbeatMessage;
+import org.opends.server.replication.protocol.InitializeRequestMessage;
+import org.opends.server.replication.protocol.InitializeTargetMessage;
+import org.opends.server.replication.protocol.ModifyContext;
+import org.opends.server.replication.protocol.ModifyDNMsg;
+import org.opends.server.replication.protocol.ModifyDnContext;
+import org.opends.server.replication.protocol.OperationContext;
+import org.opends.server.replication.protocol.ReplSessionSecurity;
+import org.opends.server.replication.protocol.ReplicationMessage;
+import org.opends.server.replication.protocol.ResetGenerationId;
+import org.opends.server.replication.protocol.RoutableMessage;
+import org.opends.server.replication.protocol.UpdateMessage;
 import org.opends.server.tasks.InitializeTargetTask;
 import org.opends.server.tasks.InitializeTask;
 import org.opends.server.tasks.TaskUtils;
@@ -281,11 +300,11 @@
      * Initializes the import/export counters with the provider value.
      * @param count The value with which to initialize the counters.
      */
-    public void initImportExportCounters(long count)
+    public void setCounters(long total, long left)
       throws DirectoryException
     {
-      entryCount = count;
-      entryLeftCount = count;
+      entryCount = total;
+      entryLeftCount = left;
 
       if (initializeTask != null)
       {
@@ -323,6 +342,15 @@
         }
       }
     }
+
+    /**
+     * {@inheritDoc}
+     */
+    public String toString()
+    {
+      return new String("[ Entry count=" + this.entryCount +
+                        ", Entry left count=" + this.entryLeftCount + "]");
+    }
   }
 
   /**
@@ -815,7 +843,7 @@
 
             if (debugEnabled())
               if (!(msg instanceof HeartbeatMessage))
-                TRACER.debugInfo("Message received <" + msg + ">");
+                TRACER.debugVerbose("Message received <" + msg + ">");
 
             if (msg instanceof AckMessage)
             {
@@ -2526,7 +2554,7 @@
         msg = broker.receive();
 
         if (debugEnabled())
-          TRACER.debugInfo(
+          TRACER.debugVerbose(
               " sid:" + this.serverId +
               " base DN:" + this.baseDN +
               " Import EntryBytes received " + msg);
@@ -2586,6 +2614,12 @@
     // FIXME TBD Treat the case where the error happens while entries
     // are being exported
 
+    if (debugEnabled())
+      TRACER.debugVerbose(
+          " abandonImportExport:" + this.serverId +
+          " base DN:" + this.baseDN +
+          " Error Msg received " + errorMsg);
+
     if (ieContext != null)
     {
       ieContext.exception = new DirectoryException(ResultCode.OTHER,
@@ -2841,7 +2875,6 @@
 
     if (ieContext.checksumOutput == false)
     {
-      // Actually send the entry
       EntryMessage entryMessage = new EntryMessage(
         serverId, ieContext.exportTarget, lDIFEntry.getBytes());
       broker.publish(entryMessage);
@@ -3037,8 +3070,8 @@
       if (initTask != null)
       {
         ieContext.initializeTask = initTask;
-        ieContext.initImportExportCounters(entryCount);
       }
+      ieContext.setCounters(entryCount, entryCount);
 
       // Send start message to the peer
       InitializeTargetMessage initializeMessage = new InitializeTargetMessage(
@@ -3132,7 +3165,8 @@
 
         ieContext.importSource = initializeMessage.getsenderID();
         ieContext.entryLeftCount = initializeMessage.getEntryCount();
-        ieContext.initImportExportCounters(initializeMessage.getEntryCount());
+        ieContext.setCounters(initializeMessage.getEntryCount(),
+            initializeMessage.getEntryCount());
 
         preBackendImport(backend);
 
@@ -3151,9 +3185,6 @@
         // Process import
         backend.importLDIF(importConfig);
 
-        if (debugEnabled())
-          TRACER.debugInfo("The import has ended successfully on " +
-              this.baseDN);
         stateSavingDisabled = false;
       }
     }
@@ -3174,6 +3205,8 @@
 
         // Re-enable backend
         closeBackendImport(backend);
+
+        backend = retrievesBackend(baseDN);
       }
 
       // Update the task that initiated the import

--
Gitblit v1.10.0