From a8a50d12008ef3b1471e4967125d36888c5cbdee Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Fri, 02 Nov 2007 14:13:02 +0000
Subject: [PATCH] Fix 2321 - Fixes specially ReplLDIFInputStream and ReplLDIFOutput stream regarding different cases of entry sizes and buffer sizes
---
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java | 77 +++++++++++++++++++++++++++-----------
1 files changed, 55 insertions(+), 22 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
index 012ea9b..01680b4 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -25,21 +25,19 @@
* Portions Copyright 2006-2007 Sun Microsystems, Inc.
*/
package org.opends.server.replication.plugin;
-import org.opends.messages.Message;
-import org.opends.messages.MessageBuilder;
-import static org.opends.server.loggers.ErrorLogger.logError;
-import static org.opends.server.loggers.debug.DebugLogger.*;
-import org.opends.server.loggers.debug.DebugTracer;
-import static org.opends.messages.ToolMessages.*;
import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.messages.ToolMessages.*;
+import static org.opends.server.loggers.ErrorLogger.logError;
+import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
+import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.replication.plugin.Historical.ENTRYUIDNAME;
import static org.opends.server.replication.protocol.OperationContext.*;
+import static org.opends.server.util.ServerConstants.*;
import static org.opends.server.util.StaticUtils.createEntry;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
-import org.opends.server.protocols.asn1.ASN1OctetString;
-import static org.opends.server.util.ServerConstants.*;
import java.io.IOException;
+import java.io.OutputStream;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Collection;
@@ -52,11 +50,12 @@
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.zip.Adler32;
import java.util.zip.CheckedOutputStream;
import java.util.zip.DataFormatException;
-import java.util.zip.Adler32;
-import java.io.OutputStream;
+import org.opends.messages.Message;
+import org.opends.messages.MessageBuilder;
import org.opends.server.admin.server.ConfigurationChangeListener;
import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.*;
import org.opends.server.admin.std.server.ReplicationDomainCfg;
@@ -75,7 +74,9 @@
import org.opends.server.core.ModifyDNOperationBasis;
import org.opends.server.core.ModifyOperation;
import org.opends.server.core.ModifyOperationBasis;
+import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.protocols.asn1.ASN1Exception;
+import org.opends.server.protocols.asn1.ASN1OctetString;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.protocols.ldap.LDAPAttribute;
@@ -84,7 +85,25 @@
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ChangeNumberGenerator;
import org.opends.server.replication.common.ServerState;
-import org.opends.server.replication.protocol.*;
+import org.opends.server.replication.protocol.AckMessage;
+import org.opends.server.replication.protocol.AddContext;
+import org.opends.server.replication.protocol.AddMsg;
+import org.opends.server.replication.protocol.DeleteContext;
+import org.opends.server.replication.protocol.DoneMessage;
+import org.opends.server.replication.protocol.EntryMessage;
+import org.opends.server.replication.protocol.ErrorMessage;
+import org.opends.server.replication.protocol.HeartbeatMessage;
+import org.opends.server.replication.protocol.InitializeRequestMessage;
+import org.opends.server.replication.protocol.InitializeTargetMessage;
+import org.opends.server.replication.protocol.ModifyContext;
+import org.opends.server.replication.protocol.ModifyDNMsg;
+import org.opends.server.replication.protocol.ModifyDnContext;
+import org.opends.server.replication.protocol.OperationContext;
+import org.opends.server.replication.protocol.ReplSessionSecurity;
+import org.opends.server.replication.protocol.ReplicationMessage;
+import org.opends.server.replication.protocol.ResetGenerationId;
+import org.opends.server.replication.protocol.RoutableMessage;
+import org.opends.server.replication.protocol.UpdateMessage;
import org.opends.server.tasks.InitializeTargetTask;
import org.opends.server.tasks.InitializeTask;
import org.opends.server.tasks.TaskUtils;
@@ -281,11 +300,11 @@
* Initializes the import/export counters with the provider value.
* @param count The value with which to initialize the counters.
*/
- public void initImportExportCounters(long count)
+ public void setCounters(long total, long left)
throws DirectoryException
{
- entryCount = count;
- entryLeftCount = count;
+ entryCount = total;
+ entryLeftCount = left;
if (initializeTask != null)
{
@@ -323,6 +342,15 @@
}
}
}
+
+ /**
+ * {@inheritDoc}
+ */
+ public String toString()
+ {
+ return new String("[ Entry count=" + this.entryCount +
+ ", Entry left count=" + this.entryLeftCount + "]");
+ }
}
/**
@@ -815,7 +843,7 @@
if (debugEnabled())
if (!(msg instanceof HeartbeatMessage))
- TRACER.debugInfo("Message received <" + msg + ">");
+ TRACER.debugVerbose("Message received <" + msg + ">");
if (msg instanceof AckMessage)
{
@@ -2526,7 +2554,7 @@
msg = broker.receive();
if (debugEnabled())
- TRACER.debugInfo(
+ TRACER.debugVerbose(
" sid:" + this.serverId +
" base DN:" + this.baseDN +
" Import EntryBytes received " + msg);
@@ -2586,6 +2614,12 @@
// FIXME TBD Treat the case where the error happens while entries
// are being exported
+ if (debugEnabled())
+ TRACER.debugVerbose(
+ " abandonImportExport:" + this.serverId +
+ " base DN:" + this.baseDN +
+ " Error Msg received " + errorMsg);
+
if (ieContext != null)
{
ieContext.exception = new DirectoryException(ResultCode.OTHER,
@@ -2841,7 +2875,6 @@
if (ieContext.checksumOutput == false)
{
- // Actually send the entry
EntryMessage entryMessage = new EntryMessage(
serverId, ieContext.exportTarget, lDIFEntry.getBytes());
broker.publish(entryMessage);
@@ -3037,8 +3070,8 @@
if (initTask != null)
{
ieContext.initializeTask = initTask;
- ieContext.initImportExportCounters(entryCount);
}
+ ieContext.setCounters(entryCount, entryCount);
// Send start message to the peer
InitializeTargetMessage initializeMessage = new InitializeTargetMessage(
@@ -3132,7 +3165,8 @@
ieContext.importSource = initializeMessage.getsenderID();
ieContext.entryLeftCount = initializeMessage.getEntryCount();
- ieContext.initImportExportCounters(initializeMessage.getEntryCount());
+ ieContext.setCounters(initializeMessage.getEntryCount(),
+ initializeMessage.getEntryCount());
preBackendImport(backend);
@@ -3151,9 +3185,6 @@
// Process import
backend.importLDIF(importConfig);
- if (debugEnabled())
- TRACER.debugInfo("The import has ended successfully on " +
- this.baseDN);
stateSavingDisabled = false;
}
}
@@ -3174,6 +3205,8 @@
// Re-enable backend
closeBackendImport(backend);
+
+ backend = retrievesBackend(baseDN);
}
// Update the task that initiated the import
--
Gitblit v1.10.0