mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noël Rouvignac
05.24.2016 13c6df397224971fb5ca3542ecb023b656e1a3c0
OPENDJ-3206 dsreplication initializeAll fails with no helpful error message if no other DS exist
2 files modified
76 ■■■■■ changed files
opendj-server-legacy/src/main/java/org/opends/server/replication/service/ReplicationDomain.java 72 ●●●●● patch | view | raw | blame | history
opendj-server-legacy/src/messages/org/opends/messages/replication.properties 4 ●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/replication/service/ReplicationDomain.java
@@ -16,6 +16,7 @@
 */
package org.opends.server.replication.service;
import static org.forgerock.opendj.ldap.ResultCode.*;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.replication.common.AssuredMode.*;
import static org.opends.server.replication.common.StatusMachine.*;
@@ -27,6 +28,7 @@
import java.io.OutputStream;
import java.net.SocketTimeoutException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
@@ -34,6 +36,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
@@ -45,6 +48,7 @@
import org.forgerock.i18n.LocalizableMessage;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.forgerock.opendj.config.server.ConfigException;
import org.forgerock.opendj.ldap.DN;
import org.forgerock.opendj.ldap.ResultCode;
import org.forgerock.opendj.server.config.meta.ReplicationDomainCfgDefn.AssuredType;
import org.forgerock.opendj.server.config.server.ReplicationDomainCfg;
@@ -78,7 +82,6 @@
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.tasks.InitializeTargetTask;
import org.opends.server.tasks.InitializeTask;
import org.forgerock.opendj.ldap.DN;
import org.opends.server.types.DirectoryException;
/**
@@ -1099,10 +1102,7 @@
    }
  }
  /**
   * This class contains the context related to an import or export launched on
   * the domain.
   */
  /** This class contains the context related to an import or export launched on the domain. */
  protected class ImportExportContext
  {
    /** The private task that initiated the operation. */
@@ -1153,8 +1153,7 @@
    private final Set<Integer> failureList = new HashSet<>(0);
    /**
     * Flow control during initialization: for each remote server, counter of
     * messages received.
     * Flow control during initialization: Map of remote serverId to number of messages received.
     */
    private final Map<Integer, Integer> ackVals = new HashMap<>();
    /** ServerId of the slowest server (the one with the smallest non null counter). */
@@ -1339,15 +1338,18 @@
        logger.trace("[IE] setAckVal[" + serverId + "]=" + numAck);
      }
      this.ackVals.put(serverId, numAck);
      ackVals.put(serverId, numAck);
      // Recompute the server with the minAck returned,means the slowest server.
      slowestServerId = serverId;
      for (Integer sid : importExportContext.get().ackVals.keySet())
      int minMsgReceived = ackVals.get(serverId);
      for (Entry<Integer, Integer> mapEntry : ackVals.entrySet())
      {
        if (this.ackVals.get(sid) < this.ackVals.get(slowestServerId))
        int nbMsgReceived = mapEntry.getValue();
        if (nbMsgReceived < minMsgReceived)
        {
          slowestServerId = sid;
          slowestServerId = mapEntry.getKey();
          minMsgReceived = nbMsgReceived;
        }
      }
    }
@@ -1399,9 +1401,7 @@
    }
    catch (Exception e)
    {
      ResultCode resultCode = ResultCode.OTHER;
      LocalizableMessage message = ERR_INVALID_EXPORT_TARGET.get();
      throw new DirectoryException(resultCode, message, e);
      throw new DirectoryException(ResultCode.OTHER, ERR_INVALID_EXPORT_TARGET.get(), e);
    }
  }
@@ -1424,8 +1424,7 @@
   * @throws DirectoryException If it was not possible to publish the
   *                            Initialization message to the Topology.
   */
  public void initializeRemote(int target, Task initTask)
  throws DirectoryException
  public void initializeRemote(int target, Task initTask) throws DirectoryException
  {
    initializeRemote(target, getServerId(), initTask, getInitWindow());
  }
@@ -1460,15 +1459,21 @@
    - to update the task with the server(s) where this test failed
    */
    Map<Integer, DSInfo> replicaInfos = getReplicaInfos();
    if (serverToInitialize == RoutableMsg.ALL_SERVERS)
    {
      if (replicaInfos.isEmpty())
      {
        throw new DirectoryException(UNWILLING_TO_PERFORM,
            ERR_FULL_UPDATE_NO_REMOTES.get(getBaseDN(), getServerId()));
      }
      logger.info(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START_ALL,
          countEntries(), getBaseDN(), getServerId());
      ieCtx.startList.addAll(getReplicaInfos().keySet());
      ieCtx.startList.addAll(replicaInfos.keySet());
      // We manage the list of servers with which a flow control can be enabled
      for (DSInfo dsi : getReplicaInfos().values())
      for (DSInfo dsi : replicaInfos.values())
      {
        if (dsi.getProtocolVersion()>= ProtocolVersion.REPLICATION_PROTOCOL_V4)
        {
@@ -1478,21 +1483,19 @@
    }
    else
    {
      DSInfo dsi = getDsInfoOrNull(replicaInfos.values(), serverToInitialize);
      if (dsi == null)
      {
        throw new DirectoryException(UNWILLING_TO_PERFORM,
            ERR_FULL_UPDATE_MISSING_REMOTE.get(getBaseDN(), getServerId(), serverToInitialize));
      }
      logger.info(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START, countEntries(),
          getBaseDN(), getServerId(), serverToInitialize);
      ieCtx.startList.add(serverToInitialize);
      // We manage the list of servers with which a flow control can be enabled
      for (DSInfo dsi : getReplicaInfos().values())
      {
        if (dsi.getDsId() == serverToInitialize &&
            dsi.getProtocolVersion()>= ProtocolVersion.REPLICATION_PROTOCOL_V4)
        {
          ieCtx.setAckVal(dsi.getDsId(), 0);
        }
      }
    }
    DirectoryException exportRootException = null;
@@ -1653,6 +1656,19 @@
    }
  }
  private DSInfo getDsInfoOrNull(Collection<DSInfo> replicaInfos, int serverToInitialize)
  {
    for (DSInfo dsi : replicaInfos)
    {
      if (dsi.getDsId() == serverToInitialize
          && dsi.getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
      {
        return dsi;
      }
    }
    return null;
  }
  /**
   * For all remote servers in the start list:
   * - wait it has finished the import and present the expected generationID,
opendj-server-legacy/src/messages/org/opends/messages/replication.properties
@@ -592,3 +592,7 @@
INFO_CHANGELOG_FILTER_OUT_RECORD_BREAKING_ORDER_296=Filtering out from log file '%s' the record '%s' \
 because it would break ordering. Last key appended is '%s'.
ERR_UNRECOGNIZED_RECORD_VERSION_297=Cannot decode change-log record with version %x
ERR_FULL_UPDATE_NO_REMOTES_298=Cannot start total update \
 in domain "%s" from this directory server DS(%d): no remote directory servers exist
ERR_FULL_UPDATE_MISSING_REMOTE_299=Cannot start total update \
 in domain "%s" from this directory server DS(%d): the remote directory server DS(%d) is unknown