From 1a26464190204cdcb6d66d4125dd5f058cfb08f8 Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Fri, 02 Nov 2007 14:13:02 +0000
Subject: [PATCH] Fix 2321 - Fixes specially ReplLDIFInputStream and ReplLDIFOutput stream regarding different cases of entry sizes and buffer sizes
---
opends/src/server/org/opends/server/replication/server/ReplicationServer.java | 6
opends/src/server/org/opends/server/replication/plugin/ReplLDIFOutputStream.java | 41 +++++++--
opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java | 77 +++++++++++++-----
opends/src/server/org/opends/server/replication/plugin/ReplLDIFInputStream.java | 50 ++++++++++--
opends/src/server/org/opends/server/replication/protocol/ErrorMessage.java | 2
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java | 8 +
6 files changed, 139 insertions(+), 45 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplLDIFInputStream.java b/opends/src/server/org/opends/server/replication/plugin/ReplLDIFInputStream.java
index d886480..532b815 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ReplLDIFInputStream.java
+++ b/opends/src/server/org/opends/server/replication/plugin/ReplLDIFInputStream.java
@@ -45,6 +45,10 @@
// The domain associated to this import.
ReplicationDomain domain;
+ private byte[] bytes;
+
+ private int index;
+
/**
* Creates a new ReplLDIFInputStream that will import entries
* for a synchronzation domain.
@@ -86,20 +90,50 @@
if (closed)
return -1;
- byte[] bytes = domain.receiveEntryBytes();
+ int receivedLength;
+ int copiedLength;
- if (bytes==null)
+ if (bytes == null)
{
- closed = true;
- return -1;
+ // First time this method is called or the previous entry was
+ // finished. Read a new entry and return it.
+ bytes = domain.receiveEntryBytes();
+
+ if (bytes==null)
+ {
+ closed = true;
+ return -1;
+ }
+
+ receivedLength = bytes.length;
+ index = 0;
+ }
+ else
+ {
+ // Some data was left from the previous entry, feed the read with this
+ // data.
+ receivedLength = bytes.length - index;
}
- int l = bytes.length;
- for (int i =0; i<l; i++)
+ if (receivedLength <= len)
{
- b[off+i] = bytes[i];
+ copiedLength = receivedLength;
}
- return l;
+ else
+ {
+ copiedLength = len;
+ }
+
+ for (int i =0; i<copiedLength; i++)
+ {
+ b[off+i] = bytes[index+i];
+ }
+ index += copiedLength;
+
+ if (copiedLength <= len)
+ bytes = null;
+
+ return copiedLength;
}
/**
diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplLDIFOutputStream.java b/opends/src/server/org/opends/server/replication/plugin/ReplLDIFOutputStream.java
index af5f16e..5d48ea1 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ReplLDIFOutputStream.java
+++ b/opends/src/server/org/opends/server/replication/plugin/ReplLDIFOutputStream.java
@@ -26,9 +26,12 @@
*/
package org.opends.server.replication.plugin;
+import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+
import java.io.IOException;
import java.io.OutputStream;
+import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.util.ServerConstants;
/**
@@ -38,6 +41,11 @@
public class ReplLDIFOutputStream
extends OutputStream
{
+ /**
+ * The tracer object for the debug logger.
+ */
+ private static final DebugTracer TRACER = getTracer();
+
// The synchronization domain on which the export is done
ReplicationDomain domain;
@@ -75,21 +83,25 @@
public void write(byte b[], int off, int len) throws IOException
{
int endOfEntryIndex;
- int startOfEntryIndex = off;
- int bytesToRead = len;
+ int endIndex;
+
+ String ebytes = "";
+ ebytes = ebytes.concat(entryBuffer);
+ entryBuffer = "";
+
+ ebytes = ebytes.concat(new String(b, off, len));
+ endIndex = ebytes.length();
while (true)
{
// if we have the bytes for an entry, let's make an entry and send it
- String ebytes = new String(b,startOfEntryIndex,bytesToRead);
endOfEntryIndex = ebytes.indexOf(ServerConstants.EOL +
ServerConstants.EOL);
if ( endOfEntryIndex >= 0 )
{
-
endOfEntryIndex += 2;
- entryBuffer = entryBuffer + ebytes.substring(0, endOfEntryIndex);
+ entryBuffer = ebytes.substring(0, endOfEntryIndex);
// Send the entry
if ((numEntries>0) && (getNumExportedEntries() > numEntries))
@@ -100,16 +112,25 @@
}
domain.exportLDIFEntry(entryBuffer);
numExportedEntries++;
-
- startOfEntryIndex = startOfEntryIndex + endOfEntryIndex;
entryBuffer = "";
- bytesToRead -= endOfEntryIndex;
- if (bytesToRead==0)
+
+ if (endIndex == endOfEntryIndex)
+ {
+ // no more data to process
break;
+ }
+ else
+ {
+ // loop to the data of the next entry
+ ebytes = ebytes.substring(endOfEntryIndex,
+ endIndex);
+ endIndex = ebytes.length();
+ }
}
else
{
- entryBuffer = new String(b, startOfEntryIndex, bytesToRead);
+ // a next call to us will provide more bytes to make an entry
+ entryBuffer = entryBuffer.concat(ebytes);
break;
}
}
diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
index 012ea9b..01680b4 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -25,21 +25,19 @@
* Portions Copyright 2006-2007 Sun Microsystems, Inc.
*/
package org.opends.server.replication.plugin;
-import org.opends.messages.Message;
-import org.opends.messages.MessageBuilder;
-import static org.opends.server.loggers.ErrorLogger.logError;
-import static org.opends.server.loggers.debug.DebugLogger.*;
-import org.opends.server.loggers.debug.DebugTracer;
-import static org.opends.messages.ToolMessages.*;
import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.messages.ToolMessages.*;
+import static org.opends.server.loggers.ErrorLogger.logError;
+import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
+import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.replication.plugin.Historical.ENTRYUIDNAME;
import static org.opends.server.replication.protocol.OperationContext.*;
+import static org.opends.server.util.ServerConstants.*;
import static org.opends.server.util.StaticUtils.createEntry;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
-import org.opends.server.protocols.asn1.ASN1OctetString;
-import static org.opends.server.util.ServerConstants.*;
import java.io.IOException;
+import java.io.OutputStream;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Collection;
@@ -52,11 +50,12 @@
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.zip.Adler32;
import java.util.zip.CheckedOutputStream;
import java.util.zip.DataFormatException;
-import java.util.zip.Adler32;
-import java.io.OutputStream;
+import org.opends.messages.Message;
+import org.opends.messages.MessageBuilder;
import org.opends.server.admin.server.ConfigurationChangeListener;
import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.*;
import org.opends.server.admin.std.server.ReplicationDomainCfg;
@@ -75,7 +74,9 @@
import org.opends.server.core.ModifyDNOperationBasis;
import org.opends.server.core.ModifyOperation;
import org.opends.server.core.ModifyOperationBasis;
+import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.protocols.asn1.ASN1Exception;
+import org.opends.server.protocols.asn1.ASN1OctetString;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.protocols.ldap.LDAPAttribute;
@@ -84,7 +85,25 @@
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ChangeNumberGenerator;
import org.opends.server.replication.common.ServerState;
-import org.opends.server.replication.protocol.*;
+import org.opends.server.replication.protocol.AckMessage;
+import org.opends.server.replication.protocol.AddContext;
+import org.opends.server.replication.protocol.AddMsg;
+import org.opends.server.replication.protocol.DeleteContext;
+import org.opends.server.replication.protocol.DoneMessage;
+import org.opends.server.replication.protocol.EntryMessage;
+import org.opends.server.replication.protocol.ErrorMessage;
+import org.opends.server.replication.protocol.HeartbeatMessage;
+import org.opends.server.replication.protocol.InitializeRequestMessage;
+import org.opends.server.replication.protocol.InitializeTargetMessage;
+import org.opends.server.replication.protocol.ModifyContext;
+import org.opends.server.replication.protocol.ModifyDNMsg;
+import org.opends.server.replication.protocol.ModifyDnContext;
+import org.opends.server.replication.protocol.OperationContext;
+import org.opends.server.replication.protocol.ReplSessionSecurity;
+import org.opends.server.replication.protocol.ReplicationMessage;
+import org.opends.server.replication.protocol.ResetGenerationId;
+import org.opends.server.replication.protocol.RoutableMessage;
+import org.opends.server.replication.protocol.UpdateMessage;
import org.opends.server.tasks.InitializeTargetTask;
import org.opends.server.tasks.InitializeTask;
import org.opends.server.tasks.TaskUtils;
@@ -281,11 +300,11 @@
* Initializes the import/export counters with the provider value.
* @param count The value with which to initialize the counters.
*/
- public void initImportExportCounters(long count)
+ public void setCounters(long total, long left)
throws DirectoryException
{
- entryCount = count;
- entryLeftCount = count;
+ entryCount = total;
+ entryLeftCount = left;
if (initializeTask != null)
{
@@ -323,6 +342,15 @@
}
}
}
+
+ /**
+ * {@inheritDoc}
+ */
+ public String toString()
+ {
+ return new String("[ Entry count=" + this.entryCount +
+ ", Entry left count=" + this.entryLeftCount + "]");
+ }
}
/**
@@ -815,7 +843,7 @@
if (debugEnabled())
if (!(msg instanceof HeartbeatMessage))
- TRACER.debugInfo("Message received <" + msg + ">");
+ TRACER.debugVerbose("Message received <" + msg + ">");
if (msg instanceof AckMessage)
{
@@ -2526,7 +2554,7 @@
msg = broker.receive();
if (debugEnabled())
- TRACER.debugInfo(
+ TRACER.debugVerbose(
" sid:" + this.serverId +
" base DN:" + this.baseDN +
" Import EntryBytes received " + msg);
@@ -2586,6 +2614,12 @@
// FIXME TBD Treat the case where the error happens while entries
// are being exported
+ if (debugEnabled())
+ TRACER.debugVerbose(
+ " abandonImportExport:" + this.serverId +
+ " base DN:" + this.baseDN +
+ " Error Msg received " + errorMsg);
+
if (ieContext != null)
{
ieContext.exception = new DirectoryException(ResultCode.OTHER,
@@ -2841,7 +2875,6 @@
if (ieContext.checksumOutput == false)
{
- // Actually send the entry
EntryMessage entryMessage = new EntryMessage(
serverId, ieContext.exportTarget, lDIFEntry.getBytes());
broker.publish(entryMessage);
@@ -3037,8 +3070,8 @@
if (initTask != null)
{
ieContext.initializeTask = initTask;
- ieContext.initImportExportCounters(entryCount);
}
+ ieContext.setCounters(entryCount, entryCount);
// Send start message to the peer
InitializeTargetMessage initializeMessage = new InitializeTargetMessage(
@@ -3132,7 +3165,8 @@
ieContext.importSource = initializeMessage.getsenderID();
ieContext.entryLeftCount = initializeMessage.getEntryCount();
- ieContext.initImportExportCounters(initializeMessage.getEntryCount());
+ ieContext.setCounters(initializeMessage.getEntryCount(),
+ initializeMessage.getEntryCount());
preBackendImport(backend);
@@ -3151,9 +3185,6 @@
// Process import
backend.importLDIF(importConfig);
- if (debugEnabled())
- TRACER.debugInfo("The import has ended successfully on " +
- this.baseDN);
stateSavingDisabled = false;
}
}
@@ -3174,6 +3205,8 @@
// Re-enable backend
closeBackendImport(backend);
+
+ backend = retrievesBackend(baseDN);
}
// Update the task that initiated the import
diff --git a/opends/src/server/org/opends/server/replication/protocol/ErrorMessage.java b/opends/src/server/org/opends/server/replication/protocol/ErrorMessage.java
index 3b7d2f0..77b0d23 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ErrorMessage.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ErrorMessage.java
@@ -49,7 +49,7 @@
// The tracer object for the debug logger
private static final DebugTracer TRACER = getTracer();
- // Specifies the messageID built form the error that was detected
+ // Specifies the messageID built from the error that was detected
private int msgID;
// Specifies the complementary details about the error that was detected
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index 1b28d7c..ac342b1 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -42,7 +42,7 @@
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
@@ -108,8 +108,8 @@
/* This table is used to store the list of dn for which we are currently
* handling servers.
*/
- private HashMap<DN, ReplicationCache> baseDNs =
- new HashMap<DN, ReplicationCache>();
+ private ConcurrentHashMap<DN, ReplicationCache> baseDNs =
+ new ConcurrentHashMap<DN, ReplicationCache>();
private String localURL = "null";
private boolean shutdown = false;
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
index fdac923..42ec433 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
@@ -428,6 +428,12 @@
*/
private String[] newLDIFEntries()
{
+ // It is relevant to test ReplLDIFInputStream
+ // and ReplLDIFOutputStream with big entries
+ char bigAttributeValue[] = new char[30240];
+ for (int i=0; i<bigAttributeValue.length; i++)
+ bigAttributeValue[i] = Integer.toString(i).charAt(0);
+
String[] entries =
{
"dn: dc=example,dc=com\n"
@@ -461,7 +467,7 @@
+ "cn: Robert Langman\n"
+ "sn: Langman\n"
+ "uid: robert\n"
- + "telephonenumber: +1 408 555 1213\n"
+ + "telephonenumber: "+ new String(bigAttributeValue)+"\n"
+ "entryUUID: 21111111-1111-1111-1111-111111111114\n"
+ "\n"
};
--
Gitblit v1.10.0