mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

pgamba
24.04.2007 74be925257cd0df68cfde1a77f77cbb930c7832f
Fix 2425 - dsreplication initialize-all fails
- fix ConcurrentModificationException in the Initialize task by using methods that lock the entry
- fix unroutable message by forwarding the message only to the replication servers that have
replicas connected
Miscellaneous improvements in error or debug traces

9 files modified
187 ■■■■ changed files
opends/src/messages/messages/replication.properties 3 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/HeartbeatMonitor.java 8 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/ReplLDIFOutputStream.java 14 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java 5 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java 43 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/RoutableMessage.java 13 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java 3 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationCache.java 21 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/tasks/InitializeTargetTask.java 77 ●●●● patch | view | raw | blame | history
opends/src/messages/messages/replication.properties
@@ -243,3 +243,6 @@
 for domain %s with replication server %s - local data generation is %s \
 - replication server data generation is %s - This may be only temporary \
  or require a full resynchronization
NOTICE_HEARTBEAT_FAILURE_97=%s is closing the session \
 because it could not detect a heartbeat
opends/src/server/org/opends/server/replication/plugin/HeartbeatMonitor.java
@@ -27,6 +27,8 @@
package org.opends.server.replication.plugin;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
@@ -114,11 +116,7 @@
        if (now > lastReceiveTime + 2 * heartbeatInterval)
        {
          // Heartbeat is well overdue so the server is assumed to be dead.
          if (debugEnabled())
          {
            TRACER.debugInfo("Heartbeat monitor is closing the broker " +
                "session because it could not detect a heartbeat.");
          }
          logError(NOTE_HEARTBEAT_FAILURE.get(this.currentThread().getName()));
          session.close();
          break;
        }
opends/src/server/org/opends/server/replication/plugin/ReplLDIFOutputStream.java
@@ -45,7 +45,7 @@
  long numEntries;
  // The current number of entries exported
  long numExportedEntries;
  private long numExportedEntries;
  String entryBuffer = "";
  /**
@@ -92,11 +92,11 @@
        entryBuffer = entryBuffer + ebytes.substring(0, endOfEntryIndex);
        // Send the entry
        if ((numEntries>0) && (numExportedEntries > numEntries))
        if ((numEntries>0) && (getNumExportedEntries() > numEntries))
        {
          // This outputstream has reached the total number
          // of entries to export.
          return;
          throw(new IOException());
        }
        domain.exportLDIFEntry(entryBuffer);
        numExportedEntries++;
@@ -114,4 +114,12 @@
      }
    }
  }
  /**
   * Return the number of exported entries.
   * @return the numExportedEntries
   */
  public long getNumExportedEntries() {
    return numExportedEntries;
  }
}
opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -524,8 +524,9 @@
    if (heartbeatInterval > 0)
    {
      heartbeatMonitor =
           new HeartbeatMonitor("Replication Heartbeat Monitor", session,
                                heartbeatInterval);
           new HeartbeatMonitor("Replication Heartbeat Monitor on " +
               baseDn + " with " + getReplicationServer(),
               session, heartbeatInterval);
      heartbeatMonitor.start();
    }
  }
opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -283,6 +283,7 @@
     * @param count The value with which to initialize the counters.
     */
    public void initImportExportCounters(long count)
      throws DirectoryException
    {
      entryCount = count;
      entryLeftCount = count;
@@ -307,6 +308,7 @@
     * an import or export.
     */
    public void updateCounters()
      throws DirectoryException
    {
      entryLeftCount--;
@@ -344,7 +346,7 @@
  public ReplicationDomain(ReplicationDomainCfg configuration)
    throws ConfigException
  {
    super("replication flush");
    super("replicationDomain_" + configuration.getBaseDN());
    // Read the configuration parameters.
    replicationServers = configuration.getReplicationServer();
@@ -2536,7 +2538,10 @@
        msg = broker.receive();
        if (debugEnabled())
          TRACER.debugInfo("Import: EntryBytes received " + msg);
          TRACER.debugInfo(
              " sid:" + this.serverId +
              " base DN:" + this.baseDN +
              " Import EntryBytes received " + msg);
        if (msg == null)
        {
          // The server is in the shutdown process
@@ -2750,11 +2755,20 @@
    }
    catch (DirectoryException de)
    {
      Message message =
      if ((ieContext != null) && (ieContext.checksumOutput) &&
          (ros.getNumExportedEntries() >= ieContext.entryCount))
      {
        // This is the normal end when computing the generationId
        // We can interrupt the export only by an IOException
      }
      else
      {
        Message message =
          ERR_LDIFEXPORT_ERROR_DURING_EXPORT.get(de.getMessageObject());
      logError(message);
      throw new DirectoryException(
          ResultCode.OTHER, message, null);
        logError(message);
        throw new DirectoryException(
            ResultCode.OTHER, message, null);
      }
    }
    catch (Exception e)
    {
@@ -2843,7 +2857,14 @@
        serverId, ieContext.exportTarget, lDIFEntry.getBytes());
      broker.publish(entryMessage);
    }
    ieContext.updateCounters();
    try
    {
      ieContext.updateCounters();
    }
    catch (DirectoryException de)
    {
      throw new IOException(de);
    }
  }
  /**
@@ -2857,7 +2878,8 @@
  public void initializeFromRemote(short source, Task initTask)
  throws DirectoryException
  {
    // TRACER.debugInfo("Entering initializeFromRemote");
    if (debugEnabled())
      TRACER.debugInfo("Entering initializeFromRemote");
    acquireIEContext();
    ieContext.initializeTask = initTask;
@@ -2881,7 +2903,6 @@
  public short decodeSource(String sourceString)
  throws DirectoryException
  {
    TRACER.debugInfo("Entering decodeSource");
    short  source = 0;
    Throwable cause = null;
    try
@@ -3140,7 +3161,9 @@
      // Process import
      backend.importLDIF(importConfig);
      TRACER.debugInfo("The import has ended successfully.");
      if (debugEnabled())
        TRACER.debugInfo("The import has ended successfully on " +
          this.baseDN);
      stateSavingDisabled = false;
    }
opends/src/server/org/opends/server/replication/protocol/RoutableMessage.java
@@ -100,4 +100,17 @@
  {
    return this.senderID;
  }
  /**
   * Returns a string representation of the message.
   *
   * @return the string representation of this message.
   */
  public String toString()
  {
    return "["+
      this.getClass().getCanonicalName() +
      " sender=" + this.senderID +
      " destination=" + this.destination + "]";
  }
}
opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
@@ -98,7 +98,8 @@
  {
    if (debugEnabled())
    {
      TRACER.debugVerbose("Closing SocketSession.");
      TRACER.debugInfo("Closing SocketSession." +
          Thread.currentThread().getStackTrace());
    }
    if (plainSocket != null && !plainSocket.isClosed())
    {
opends/src/server/org/opends/server/replication/server/ReplicationCache.java
@@ -617,10 +617,14 @@
    {
      if (!senderHandler.isReplicationServer())
      {
        // Send to all replicationServers
        for (ServerHandler destinationHandler : replicationServers.values())
        // Send to all replication servers with at least one remote
        // server connected
        for (ServerHandler rsh : replicationServers.values())
        {
          servers.add(destinationHandler);
          if (!rsh.getRemoteLDAPServers().isEmpty())
          {
            servers.add(rsh);
          }
        }
      }
@@ -651,6 +655,8 @@
        {
          for (ServerHandler h : replicationServers.values())
          {
            // Send to all replication servers with at least one remote
            // server connected
            if (h.isRemoteLDAPServer(msg.getDestination()))
            {
              servers.add(h);
@@ -696,13 +702,16 @@
    {
      MessageBuilder mb = new MessageBuilder();
      mb.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get());
      mb.append(" unreachable server ID=" + msg.getDestination());
      mb.append(" unroutable message =" + msg);
      mb.append(" In Replication Server=" + this.replicationServer.
          getMonitorInstanceName());
      mb.append(" domain =" + this.baseDn);
      mb.append(" unroutable message =" + msg.toString());
      mb.append(" routing table is empty");
      ErrorMessage errMsg = new ErrorMessage(
          this.replicationServer.getServerId(),
          msg.getsenderID(),
          mb.toMessage());
      logError(mb.toMessage());
      try
      {
        senderHandler.send(errMsg);
opends/src/server/org/opends/server/tasks/InitializeTargetTask.java
@@ -32,26 +32,20 @@
import static org.opends.server.loggers.debug.DebugLogger.*;
import org.opends.server.loggers.debug.DebugTracer;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import org.opends.server.backends.task.Task;
import org.opends.server.backends.task.TaskState;
import org.opends.messages.TaskMessages;
import org.opends.messages.Message;
import org.opends.server.protocols.asn1.ASN1OctetString;
import org.opends.server.replication.plugin.ReplicationDomain;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeType;
import org.opends.server.types.AttributeValue;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
import org.opends.server.types.Modification;
import org.opends.server.types.ModificationType;
import org.opends.server.types.ResultCode;
/**
@@ -122,8 +116,7 @@
    String targetString = TaskUtils.getSingleValueString(attrList);
    target = domain.decodeTarget(targetString);
    createCounterAttribute(ATTR_TASK_INITIALIZE_LEFT, 0);
    createCounterAttribute(ATTR_TASK_INITIALIZE_DONE, 0);
    setTotal(0);
  }
  /**
@@ -153,77 +146,27 @@
  }
  /**
   * Create attribute to store entry counters.
   * @param name The name of the attribute.
   * @param value The value to store for that attribute.
   */
  protected void createCounterAttribute(String name, long value)
  {
    AttributeType type;
    LinkedHashSet<AttributeValue> values =
      new LinkedHashSet<AttributeValue>();
    Entry taskEntry = getTaskEntry();
    try
    {
      type = getAttributeType(name, true);
      values.add(new AttributeValue(type,
          new ASN1OctetString(String.valueOf(value))));
      ArrayList<Attribute> attrList = new ArrayList<Attribute>(1);
      attrList.add(new Attribute(type, name,values));
      taskEntry.putAttribute(type, attrList);
    }
    finally
    {
      // taskScheduler.unlockEntry(taskEntryDN, lock);
    }
  }
  /**
   * Set the total number of entries expected to be exported.
   * @param total The total number of entries.
   * @throws DirectoryException when a problem occurs
   */
  public void setTotal(long total)
  public void setTotal(long total) throws DirectoryException
  {
    this.total = total;
    try
    {
      updateAttribute(ATTR_TASK_INITIALIZE_LEFT, total);
      updateAttribute(ATTR_TASK_INITIALIZE_DONE, 0);
    }
    catch(Exception e) {}
    replaceAttributeValue(ATTR_TASK_INITIALIZE_LEFT,
        String.valueOf(total));
    replaceAttributeValue(ATTR_TASK_INITIALIZE_DONE, String.valueOf(0));
  }
  /**
   * Set the total number of entries still to be exported.
   * @param left The total number of entries to be exported.
   * @throws DirectoryException when a problem occurs
   */
  public void setLeft(long left)
  public void setLeft(long left)  throws DirectoryException
  {
    this.left = left;
    try
    {
      updateAttribute(ATTR_TASK_INITIALIZE_LEFT, left);
      updateAttribute(ATTR_TASK_INITIALIZE_DONE, total-left);
    }
    catch(Exception e) {}
  }
  /**
   * Update an attribute for this task.
   * @param name The name of the attribute.
   * @param value The value.
   * @throws DirectoryException When an error occurs.
   */
  protected void updateAttribute(String name, long value)
  throws DirectoryException
  {
    Entry taskEntry = getTaskEntry();
    ArrayList<Modification> modifications = new ArrayList<Modification>();
    modifications.add(new Modification(ModificationType.REPLACE,
        new Attribute(name, String.valueOf(value))));
    taskEntry.applyModifications(modifications);
    replaceAttributeValue(ATTR_TASK_INITIALIZE_LEFT, String.valueOf(left));
    replaceAttributeValue(ATTR_TASK_INITIALIZE_DONE,String.valueOf(total-left));
  }
}