From 74be925257cd0df68cfde1a77f77cbb930c7832f Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Wed, 24 Oct 2007 14:04:25 +0000
Subject: [PATCH] Fix 2425 - dsreplication initialize-all fails - fix ConcurrentModificationException in the Initialize task by using methods that lock the entry - fix unroutable message, by forwarding message only to the replication servers that have replica connected Miscellaneous improvements in error or debug traces
---
opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java | 5
opends/src/server/org/opends/server/replication/server/ReplicationCache.java | 21 +++-
opends/src/server/org/opends/server/replication/plugin/HeartbeatMonitor.java | 8 -
opends/src/server/org/opends/server/replication/plugin/ReplLDIFOutputStream.java | 14 ++
opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java | 43 ++++++++--
opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java | 3
opends/src/server/org/opends/server/tasks/InitializeTargetTask.java | 77 ++----------------
opends/src/messages/messages/replication.properties | 3
opends/src/server/org/opends/server/replication/protocol/RoutableMessage.java | 13 +++
9 files changed, 93 insertions(+), 94 deletions(-)
diff --git a/opends/src/messages/messages/replication.properties b/opends/src/messages/messages/replication.properties
index 47f7f6b..204b338 100644
--- a/opends/src/messages/messages/replication.properties
+++ b/opends/src/messages/messages/replication.properties
@@ -243,3 +243,6 @@
for domain %s with replication server %s - local data generation is %s \
- replication server data generation is %s - This may be only temporary \
or require a full resynchronization
+NOTICE_HEARTBEAT_FAILURE_97=%s is closing the session \
+ because it could not detect a heartbeat
+
\ No newline at end of file
diff --git a/opends/src/server/org/opends/server/replication/plugin/HeartbeatMonitor.java b/opends/src/server/org/opends/server/replication/plugin/HeartbeatMonitor.java
index e86e485..f18dcbe 100644
--- a/opends/src/server/org/opends/server/replication/plugin/HeartbeatMonitor.java
+++ b/opends/src/server/org/opends/server/replication/plugin/HeartbeatMonitor.java
@@ -27,6 +27,8 @@
package org.opends.server.replication.plugin;
+import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
@@ -114,11 +116,7 @@
if (now > lastReceiveTime + 2 * heartbeatInterval)
{
// Heartbeat is well overdue so the server is assumed to be dead.
- if (debugEnabled())
- {
- TRACER.debugInfo("Heartbeat monitor is closing the broker " +
- "session because it could not detect a heartbeat.");
- }
+ logError(NOTE_HEARTBEAT_FAILURE.get(this.currentThread().getName()));
session.close();
break;
}
diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplLDIFOutputStream.java b/opends/src/server/org/opends/server/replication/plugin/ReplLDIFOutputStream.java
index bed228e..af5f16e 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ReplLDIFOutputStream.java
+++ b/opends/src/server/org/opends/server/replication/plugin/ReplLDIFOutputStream.java
@@ -45,7 +45,7 @@
long numEntries;
// The current number of entries exported
- long numExportedEntries;
+ private long numExportedEntries;
String entryBuffer = "";
/**
@@ -92,11 +92,11 @@
entryBuffer = entryBuffer + ebytes.substring(0, endOfEntryIndex);
// Send the entry
- if ((numEntries>0) && (numExportedEntries > numEntries))
+ if ((numEntries>0) && (getNumExportedEntries() > numEntries))
{
// This outputstream has reached the total number
// of entries to export.
- return;
+ throw(new IOException());
}
domain.exportLDIFEntry(entryBuffer);
numExportedEntries++;
@@ -114,4 +114,12 @@
}
}
}
+
+ /**
+ * Return the number of exported entries.
+ * @return the numExportedEntries
+ */
+ public long getNumExportedEntries() {
+ return numExportedEntries;
+ }
}
diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
index 52e0ac4..871d567 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -524,8 +524,9 @@
if (heartbeatInterval > 0)
{
heartbeatMonitor =
- new HeartbeatMonitor("Replication Heartbeat Monitor", session,
- heartbeatInterval);
+ new HeartbeatMonitor("Replication Heartbeat Monitor on " +
+ baseDn + " with " + getReplicationServer(),
+ session, heartbeatInterval);
heartbeatMonitor.start();
}
}
diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
index 608b7d8..78a088f 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -283,6 +283,7 @@
* @param count The value with which to initialize the counters.
*/
public void initImportExportCounters(long count)
+ throws DirectoryException
{
entryCount = count;
entryLeftCount = count;
@@ -307,6 +308,7 @@
* an import or export.
*/
public void updateCounters()
+ throws DirectoryException
{
entryLeftCount--;
@@ -344,7 +346,7 @@
public ReplicationDomain(ReplicationDomainCfg configuration)
throws ConfigException
{
- super("replication flush");
+ super("replicationDomain_" + configuration.getBaseDN());
// Read the configuration parameters.
replicationServers = configuration.getReplicationServer();
@@ -2536,7 +2538,10 @@
msg = broker.receive();
if (debugEnabled())
- TRACER.debugInfo("Import: EntryBytes received " + msg);
+ TRACER.debugInfo(
+ " sid:" + this.serverId +
+ " base DN:" + this.baseDN +
+ " Import EntryBytes received " + msg);
if (msg == null)
{
// The server is in the shutdown process
@@ -2750,11 +2755,20 @@
}
catch (DirectoryException de)
{
- Message message =
+ if ((ieContext != null) && (ieContext.checksumOutput) &&
+ (ros.getNumExportedEntries() >= ieContext.entryCount))
+ {
+ // This is the normal end when computing the generationId
+ // We can interrupt the export only by an IOException
+ }
+ else
+ {
+ Message message =
ERR_LDIFEXPORT_ERROR_DURING_EXPORT.get(de.getMessageObject());
- logError(message);
- throw new DirectoryException(
- ResultCode.OTHER, message, null);
+ logError(message);
+ throw new DirectoryException(
+ ResultCode.OTHER, message, null);
+ }
}
catch (Exception e)
{
@@ -2843,7 +2857,14 @@
serverId, ieContext.exportTarget, lDIFEntry.getBytes());
broker.publish(entryMessage);
}
- ieContext.updateCounters();
+ try
+ {
+ ieContext.updateCounters();
+ }
+ catch (DirectoryException de)
+ {
+ throw new IOException(de);
+ }
}
/**
@@ -2857,7 +2878,8 @@
public void initializeFromRemote(short source, Task initTask)
throws DirectoryException
{
- // TRACER.debugInfo("Entering initializeFromRemote");
+ if (debugEnabled())
+ TRACER.debugInfo("Entering initializeFromRemote");
acquireIEContext();
ieContext.initializeTask = initTask;
@@ -2881,7 +2903,6 @@
public short decodeSource(String sourceString)
throws DirectoryException
{
- TRACER.debugInfo("Entering decodeSource");
short source = 0;
Throwable cause = null;
try
@@ -3140,7 +3161,9 @@
// Process import
backend.importLDIF(importConfig);
- TRACER.debugInfo("The import has ended successfully.");
+ if (debugEnabled())
+ TRACER.debugInfo("The import has ended successfully on " +
+ this.baseDN);
stateSavingDisabled = false;
}
diff --git a/opends/src/server/org/opends/server/replication/protocol/RoutableMessage.java b/opends/src/server/org/opends/server/replication/protocol/RoutableMessage.java
index d956f3d..3b2e17a 100644
--- a/opends/src/server/org/opends/server/replication/protocol/RoutableMessage.java
+++ b/opends/src/server/org/opends/server/replication/protocol/RoutableMessage.java
@@ -100,4 +100,17 @@
{
return this.senderID;
}
+
+ /**
+ * Returns a string representation of the message.
+ *
+ * @return the string representation of this message.
+ */
+ public String toString()
+ {
+ return "["+
+ this.getClass().getCanonicalName() +
+ " sender=" + this.senderID +
+ " destination=" + this.destination + "]";
+ }
}
diff --git a/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java b/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
index d3f544d..3fa43f4 100644
--- a/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
+++ b/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
@@ -98,7 +98,8 @@
{
if (debugEnabled())
{
- TRACER.debugVerbose("Closing SocketSession.");
+ TRACER.debugInfo("Closing SocketSession." +
+ Thread.currentThread().getStackTrace());
}
if (plainSocket != null && !plainSocket.isClosed())
{
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationCache.java b/opends/src/server/org/opends/server/replication/server/ReplicationCache.java
index 279871a..89140e9 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationCache.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationCache.java
@@ -617,10 +617,14 @@
{
if (!senderHandler.isReplicationServer())
{
- // Send to all replicationServers
- for (ServerHandler destinationHandler : replicationServers.values())
+ // Send to all replication servers with a least one remote
+ // server connected
+ for (ServerHandler rsh : replicationServers.values())
{
- servers.add(destinationHandler);
+ if (!rsh.getRemoteLDAPServers().isEmpty())
+ {
+ servers.add(rsh);
+ }
}
}
@@ -651,6 +655,8 @@
{
for (ServerHandler h : replicationServers.values())
{
+ // Send to all replication servers with a least one remote
+ // server connected
if (h.isRemoteLDAPServer(msg.getDestination()))
{
servers.add(h);
@@ -696,13 +702,16 @@
{
MessageBuilder mb = new MessageBuilder();
mb.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get());
- mb.append(" unreachable server ID=" + msg.getDestination());
- mb.append(" unroutable message =" + msg);
+ mb.append(" In Replication Server=" + this.replicationServer.
+ getMonitorInstanceName());
+ mb.append(" domain =" + this.baseDn);
+ mb.append(" unroutable message =" + msg.toString());
+ mb.append(" routing table is empty");
ErrorMessage errMsg = new ErrorMessage(
this.replicationServer.getServerId(),
msg.getsenderID(),
mb.toMessage());
-
+ logError(mb.toMessage());
try
{
senderHandler.send(errMsg);
diff --git a/opends/src/server/org/opends/server/tasks/InitializeTargetTask.java b/opends/src/server/org/opends/server/tasks/InitializeTargetTask.java
index 2b8b8d9..1648256 100644
--- a/opends/src/server/org/opends/server/tasks/InitializeTargetTask.java
+++ b/opends/src/server/org/opends/server/tasks/InitializeTargetTask.java
@@ -32,26 +32,20 @@
import static org.opends.server.loggers.debug.DebugLogger.*;
import org.opends.server.loggers.debug.DebugTracer;
-import java.util.ArrayList;
-import java.util.LinkedHashSet;
import java.util.List;
import org.opends.server.backends.task.Task;
import org.opends.server.backends.task.TaskState;
import org.opends.messages.TaskMessages;
import org.opends.messages.Message;
-import org.opends.server.protocols.asn1.ASN1OctetString;
import org.opends.server.replication.plugin.ReplicationDomain;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeType;
-import org.opends.server.types.AttributeValue;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
-import org.opends.server.types.Modification;
-import org.opends.server.types.ModificationType;
import org.opends.server.types.ResultCode;
/**
@@ -122,8 +116,7 @@
String targetString = TaskUtils.getSingleValueString(attrList);
target = domain.decodeTarget(targetString);
- createCounterAttribute(ATTR_TASK_INITIALIZE_LEFT, 0);
- createCounterAttribute(ATTR_TASK_INITIALIZE_DONE, 0);
+ setTotal(0);
}
/**
@@ -153,77 +146,27 @@
}
/**
- * Create attribute to store entry counters.
- * @param name The name of the attribute.
- * @param value The value to store for that attribute.
- */
- protected void createCounterAttribute(String name, long value)
- {
- AttributeType type;
- LinkedHashSet<AttributeValue> values =
- new LinkedHashSet<AttributeValue>();
-
- Entry taskEntry = getTaskEntry();
- try
- {
- type = getAttributeType(name, true);
- values.add(new AttributeValue(type,
- new ASN1OctetString(String.valueOf(value))));
- ArrayList<Attribute> attrList = new ArrayList<Attribute>(1);
- attrList.add(new Attribute(type, name,values));
- taskEntry.putAttribute(type, attrList);
- }
- finally
- {
- // taskScheduler.unlockEntry(taskEntryDN, lock);
- }
- }
-
- /**
* Set the total number of entries expected to be exported.
* @param total The total number of entries.
+ * @throws DirectoryException when a problem occurs
*/
- public void setTotal(long total)
+ public void setTotal(long total) throws DirectoryException
{
this.total = total;
- try
- {
- updateAttribute(ATTR_TASK_INITIALIZE_LEFT, total);
- updateAttribute(ATTR_TASK_INITIALIZE_DONE, 0);
- }
- catch(Exception e) {}
+ replaceAttributeValue(ATTR_TASK_INITIALIZE_LEFT,
+ String.valueOf(total));
+ replaceAttributeValue(ATTR_TASK_INITIALIZE_DONE, String.valueOf(0));
}
/**
* Set the total number of entries still to be exported.
* @param left The total number of entries to be exported.
+ * @throws DirectoryException when a problem occurs
*/
- public void setLeft(long left)
+ public void setLeft(long left) throws DirectoryException
{
this.left = left;
- try
- {
- updateAttribute(ATTR_TASK_INITIALIZE_LEFT, left);
- updateAttribute(ATTR_TASK_INITIALIZE_DONE, total-left);
- }
- catch(Exception e) {}
- }
-
- /**
- * Update an attribute for this task.
- * @param name The name of the attribute.
- * @param value The value.
- * @throws DirectoryException When an error occurs.
- */
- protected void updateAttribute(String name, long value)
- throws DirectoryException
- {
- Entry taskEntry = getTaskEntry();
-
- ArrayList<Modification> modifications = new ArrayList<Modification>();
- modifications.add(new Modification(ModificationType.REPLACE,
- new Attribute(name, String.valueOf(value))));
-
- taskEntry.applyModifications(modifications);
+ replaceAttributeValue(ATTR_TASK_INITIALIZE_LEFT, String.valueOf(left));
+ replaceAttributeValue(ATTR_TASK_INITIALIZE_DONE,String.valueOf(total-left));
}
}
--
Gitblit v1.10.0