From 74be925257cd0df68cfde1a77f77cbb930c7832f Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Wed, 24 Oct 2007 14:04:25 +0000
Subject: [PATCH] Fix 2425 - dsreplication initialize-all fails - fix ConcurrentModificationException in the Initialize task by using methods that lock the entry - fix unroutable message, by forwarding message only to the replication servers that have   replica connected Miscellaneous improvements in error or debug traces

---
 opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java    |    5 
 opends/src/server/org/opends/server/replication/server/ReplicationCache.java     |   21 +++-
 opends/src/server/org/opends/server/replication/plugin/HeartbeatMonitor.java     |    8 -
 opends/src/server/org/opends/server/replication/plugin/ReplLDIFOutputStream.java |   14 ++
 opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java    |   43 ++++++++--
 opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java   |    3 
 opends/src/server/org/opends/server/tasks/InitializeTargetTask.java              |   77 ++----------------
 opends/src/messages/messages/replication.properties                              |    3 
 opends/src/server/org/opends/server/replication/protocol/RoutableMessage.java    |   13 +++
 9 files changed, 93 insertions(+), 94 deletions(-)

diff --git a/opends/src/messages/messages/replication.properties b/opends/src/messages/messages/replication.properties
index 47f7f6b..204b338 100644
--- a/opends/src/messages/messages/replication.properties
+++ b/opends/src/messages/messages/replication.properties
@@ -243,3 +243,6 @@
  for domain %s with replication server %s - local data generation is %s \
  - replication server data generation is %s - This may be only temporary \
   or require a full resynchronization
+NOTICE_HEARTBEAT_FAILURE_97=%s is closing the session \
+ because it could not detect a heartbeat
+  
\ No newline at end of file
diff --git a/opends/src/server/org/opends/server/replication/plugin/HeartbeatMonitor.java b/opends/src/server/org/opends/server/replication/plugin/HeartbeatMonitor.java
index e86e485..f18dcbe 100644
--- a/opends/src/server/org/opends/server/replication/plugin/HeartbeatMonitor.java
+++ b/opends/src/server/org/opends/server/replication/plugin/HeartbeatMonitor.java
@@ -27,6 +27,8 @@
 
 package org.opends.server.replication.plugin;
 
+import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.loggers.ErrorLogger.logError;
 import static org.opends.server.loggers.debug.DebugLogger.*;
 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
 
@@ -114,11 +116,7 @@
         if (now > lastReceiveTime + 2 * heartbeatInterval)
         {
           // Heartbeat is well overdue so the server is assumed to be dead.
-          if (debugEnabled())
-          {
-            TRACER.debugInfo("Heartbeat monitor is closing the broker " +
-                "session because it could not detect a heartbeat.");
-          }
+          logError(NOTE_HEARTBEAT_FAILURE.get(this.currentThread().getName()));
           session.close();
           break;
         }
diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplLDIFOutputStream.java b/opends/src/server/org/opends/server/replication/plugin/ReplLDIFOutputStream.java
index bed228e..af5f16e 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ReplLDIFOutputStream.java
+++ b/opends/src/server/org/opends/server/replication/plugin/ReplLDIFOutputStream.java
@@ -45,7 +45,7 @@
   long numEntries;
 
   // The current number of entries exported
-  long numExportedEntries;
+  private long numExportedEntries;
   String entryBuffer = "";
 
   /**
@@ -92,11 +92,11 @@
         entryBuffer = entryBuffer + ebytes.substring(0, endOfEntryIndex);
 
         // Send the entry
-        if ((numEntries>0) && (numExportedEntries > numEntries))
+        if ((numEntries>0) && (getNumExportedEntries() > numEntries))
         {
           // This outputstream has reached the total number
           // of entries to export.
-          return;
+          throw(new IOException());
         }
         domain.exportLDIFEntry(entryBuffer);
         numExportedEntries++;
@@ -114,4 +114,12 @@
       }
     }
   }
+
+  /**
+   * Return the number of exported entries.
+   * @return the numExportedEntries
+   */
+  public long getNumExportedEntries() {
+    return numExportedEntries;
+  }
 }
diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
index 52e0ac4..871d567 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -524,8 +524,9 @@
     if (heartbeatInterval > 0)
     {
       heartbeatMonitor =
-           new HeartbeatMonitor("Replication Heartbeat Monitor", session,
-                                heartbeatInterval);
+           new HeartbeatMonitor("Replication Heartbeat Monitor on " +
+               baseDn + " with " + getReplicationServer(),
+               session, heartbeatInterval);
       heartbeatMonitor.start();
     }
   }
diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
index 608b7d8..78a088f 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -283,6 +283,7 @@
      * @param count The value with which to initialize the counters.
      */
     public void initImportExportCounters(long count)
+      throws DirectoryException
     {
       entryCount = count;
       entryLeftCount = count;
@@ -307,6 +308,7 @@
      * an import or export.
      */
     public void updateCounters()
+      throws DirectoryException
     {
       entryLeftCount--;
 
@@ -344,7 +346,7 @@
   public ReplicationDomain(ReplicationDomainCfg configuration)
     throws ConfigException
   {
-    super("replication flush");
+    super("replicationDomain_" + configuration.getBaseDN());
 
     // Read the configuration parameters.
     replicationServers = configuration.getReplicationServer();
@@ -2536,7 +2538,10 @@
         msg = broker.receive();
 
         if (debugEnabled())
-          TRACER.debugInfo("Import: EntryBytes received " + msg);
+          TRACER.debugInfo(
+              " sid:" + this.serverId +
+              " base DN:" + this.baseDN +
+              " Import EntryBytes received " + msg);
         if (msg == null)
         {
           // The server is in the shutdown process
@@ -2750,11 +2755,20 @@
     }
     catch (DirectoryException de)
     {
-      Message message =
+      if ((ieContext != null) && (ieContext.checksumOutput) &&
+          (ros.getNumExportedEntries() >= ieContext.entryCount))
+      {
+        // This is the normal end when computing the generationId
+        // We can interrupt the export only by an IOException
+      }
+      else
+      {
+        Message message =
           ERR_LDIFEXPORT_ERROR_DURING_EXPORT.get(de.getMessageObject());
-      logError(message);
-      throw new DirectoryException(
-          ResultCode.OTHER, message, null);
+        logError(message);
+        throw new DirectoryException(
+            ResultCode.OTHER, message, null);
+      }
     }
     catch (Exception e)
     {
@@ -2843,7 +2857,14 @@
         serverId, ieContext.exportTarget, lDIFEntry.getBytes());
       broker.publish(entryMessage);
     }
-    ieContext.updateCounters();
+    try
+    {
+      ieContext.updateCounters();
+    }
+    catch (DirectoryException de)
+    {
+      throw new IOException(de);
+    }
   }
 
   /**
@@ -2857,7 +2878,8 @@
   public void initializeFromRemote(short source, Task initTask)
   throws DirectoryException
   {
-    // TRACER.debugInfo("Entering initializeFromRemote");
+    if (debugEnabled())
+      TRACER.debugInfo("Entering initializeFromRemote");
 
     acquireIEContext();
     ieContext.initializeTask = initTask;
@@ -2881,7 +2903,6 @@
   public short decodeSource(String sourceString)
   throws DirectoryException
   {
-    TRACER.debugInfo("Entering decodeSource");
     short  source = 0;
     Throwable cause = null;
     try
@@ -3140,7 +3161,9 @@
       // Process import
       backend.importLDIF(importConfig);
 
-      TRACER.debugInfo("The import has ended successfully.");
+      if (debugEnabled())
+        TRACER.debugInfo("The import has ended successfully on " +
+          this.baseDN);
       stateSavingDisabled = false;
 
     }
diff --git a/opends/src/server/org/opends/server/replication/protocol/RoutableMessage.java b/opends/src/server/org/opends/server/replication/protocol/RoutableMessage.java
index d956f3d..3b2e17a 100644
--- a/opends/src/server/org/opends/server/replication/protocol/RoutableMessage.java
+++ b/opends/src/server/org/opends/server/replication/protocol/RoutableMessage.java
@@ -100,4 +100,17 @@
   {
     return this.senderID;
   }
+
+  /**
+   * Returns a string representation of the message.
+   *
+   * @return the string representation of this message.
+   */
+  public String toString()
+  {
+    return "["+
+      this.getClass().getCanonicalName() +
+      " sender=" + this.senderID +
+      " destination=" + this.destination + "]";
+  }
 }
diff --git a/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java b/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
index d3f544d..3fa43f4 100644
--- a/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
+++ b/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
@@ -98,7 +98,8 @@
   {
     if (debugEnabled())
     {
-      TRACER.debugVerbose("Closing SocketSession.");
+      TRACER.debugInfo("Closing SocketSession." +
+          Thread.currentThread().getStackTrace());
     }
     if (plainSocket != null && !plainSocket.isClosed())
     {
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationCache.java b/opends/src/server/org/opends/server/replication/server/ReplicationCache.java
index 279871a..89140e9 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationCache.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationCache.java
@@ -617,10 +617,14 @@
     {
       if (!senderHandler.isReplicationServer())
       {
-        // Send to all replicationServers
-        for (ServerHandler destinationHandler : replicationServers.values())
+        // Send to all replication servers with a least one remote
+        // server connected
+        for (ServerHandler rsh : replicationServers.values())
         {
-          servers.add(destinationHandler);
+          if (!rsh.getRemoteLDAPServers().isEmpty())
+          {
+            servers.add(rsh);
+          }
         }
       }
 
@@ -651,6 +655,8 @@
         {
           for (ServerHandler h : replicationServers.values())
           {
+            // Send to all replication servers with a least one remote
+            // server connected
             if (h.isRemoteLDAPServer(msg.getDestination()))
             {
               servers.add(h);
@@ -696,13 +702,16 @@
     {
       MessageBuilder mb = new MessageBuilder();
       mb.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get());
-      mb.append(" unreachable server ID=" + msg.getDestination());
-      mb.append(" unroutable message =" + msg);
+      mb.append(" In Replication Server=" + this.replicationServer.
+          getMonitorInstanceName());
+      mb.append(" domain =" + this.baseDn);
+      mb.append(" unroutable message =" + msg.toString());
+      mb.append(" routing table is empty");
       ErrorMessage errMsg = new ErrorMessage(
           this.replicationServer.getServerId(),
           msg.getsenderID(),
           mb.toMessage());
-
+      logError(mb.toMessage());
       try
       {
         senderHandler.send(errMsg);
diff --git a/opends/src/server/org/opends/server/tasks/InitializeTargetTask.java b/opends/src/server/org/opends/server/tasks/InitializeTargetTask.java
index 2b8b8d9..1648256 100644
--- a/opends/src/server/org/opends/server/tasks/InitializeTargetTask.java
+++ b/opends/src/server/org/opends/server/tasks/InitializeTargetTask.java
@@ -32,26 +32,20 @@
 import static org.opends.server.loggers.debug.DebugLogger.*;
 import org.opends.server.loggers.debug.DebugTracer;
 
-import java.util.ArrayList;
-import java.util.LinkedHashSet;
 import java.util.List;
 
 import org.opends.server.backends.task.Task;
 import org.opends.server.backends.task.TaskState;
 import org.opends.messages.TaskMessages;
 import org.opends.messages.Message;
-import org.opends.server.protocols.asn1.ASN1OctetString;
 import org.opends.server.replication.plugin.ReplicationDomain;
 import org.opends.server.types.Attribute;
 import org.opends.server.types.AttributeType;
-import org.opends.server.types.AttributeValue;
 import org.opends.server.types.DN;
 import org.opends.server.types.DirectoryException;
 import org.opends.server.types.Entry;
 
 
-import org.opends.server.types.Modification;
-import org.opends.server.types.ModificationType;
 import org.opends.server.types.ResultCode;
 
 /**
@@ -122,8 +116,7 @@
     String targetString = TaskUtils.getSingleValueString(attrList);
     target = domain.decodeTarget(targetString);
 
-    createCounterAttribute(ATTR_TASK_INITIALIZE_LEFT, 0);
-    createCounterAttribute(ATTR_TASK_INITIALIZE_DONE, 0);
+    setTotal(0);
   }
 
   /**
@@ -153,77 +146,27 @@
   }
 
   /**
-   * Create attribute to store entry counters.
-   * @param name The name of the attribute.
-   * @param value The value to store for that attribute.
-   */
-  protected void createCounterAttribute(String name, long value)
-  {
-    AttributeType type;
-    LinkedHashSet<AttributeValue> values =
-      new LinkedHashSet<AttributeValue>();
-
-    Entry taskEntry = getTaskEntry();
-    try
-    {
-      type = getAttributeType(name, true);
-      values.add(new AttributeValue(type,
-          new ASN1OctetString(String.valueOf(value))));
-      ArrayList<Attribute> attrList = new ArrayList<Attribute>(1);
-      attrList.add(new Attribute(type, name,values));
-      taskEntry.putAttribute(type, attrList);
-    }
-    finally
-    {
-      // taskScheduler.unlockEntry(taskEntryDN, lock);
-    }
-  }
-
-  /**
    * Set the total number of entries expected to be exported.
    * @param total The total number of entries.
+   * @throws DirectoryException when a problem occurs
    */
-  public void setTotal(long total)
+  public void setTotal(long total) throws DirectoryException
   {
     this.total = total;
-    try
-    {
-      updateAttribute(ATTR_TASK_INITIALIZE_LEFT, total);
-      updateAttribute(ATTR_TASK_INITIALIZE_DONE, 0);
-    }
-    catch(Exception e) {}
+    replaceAttributeValue(ATTR_TASK_INITIALIZE_LEFT,
+        String.valueOf(total));
+    replaceAttributeValue(ATTR_TASK_INITIALIZE_DONE, String.valueOf(0));
   }
 
   /**
    * Set the total number of entries still to be exported.
    * @param left The total number of entries to be exported.
+   * @throws DirectoryException when a problem occurs
    */
-  public void setLeft(long left)
+  public void setLeft(long left)  throws DirectoryException
   {
     this.left = left;
-    try
-    {
-      updateAttribute(ATTR_TASK_INITIALIZE_LEFT, left);
-      updateAttribute(ATTR_TASK_INITIALIZE_DONE, total-left);
-    }
-    catch(Exception e) {}
-  }
-
-  /**
-   * Update an attribute for this task.
-   * @param name The name of the attribute.
-   * @param value The value.
-   * @throws DirectoryException When an error occurs.
-   */
-  protected void updateAttribute(String name, long value)
-  throws DirectoryException
-  {
-    Entry taskEntry = getTaskEntry();
-
-    ArrayList<Modification> modifications = new ArrayList<Modification>();
-    modifications.add(new Modification(ModificationType.REPLACE,
-        new Attribute(name, String.valueOf(value))));
-
-    taskEntry.applyModifications(modifications);
+    replaceAttributeValue(ATTR_TASK_INITIALIZE_LEFT, String.valueOf(left));
+    replaceAttributeValue(ATTR_TASK_INITIALIZE_DONE,String.valueOf(total-left));
   }
 }

--
Gitblit v1.10.0