mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
20.36.2014 48312eb62361cc16c74cd7c68346c23db63a2161
OPENDJ-1271 (CR-3008) dsreplication pre-external-initialization task fails with STOPPED_BY_ERROR

ReplicationDomain.java:
Renamed inner class IEContext to ImportExportContext.
Renamed fields ieContext to importExportContext.
Renamed isRemoteDSConnected() to getConnectedRemoteDS().
4 files modified
83 ■■■■ changed files
opendj3-server-dev/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java 15 ●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java 62 ●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationMonitor.java 2 ●●● patch | view | raw | blame | history
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java 4 ●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -1488,7 +1488,7 @@
    // process:
    // This is an error termination during the import
    // The error is stored and the import is ended by returning null
    final IEContext ieCtx = getImportExportContext();
    final ImportExportContext ieCtx = getImportExportContext();
    LocalizableMessage msg = null;
    switch (importErrorMessageId)
    {
@@ -3689,16 +3689,16 @@
    Backend backend = getBackend();
    IEContext ieCtx = getImportExportContext();
    ImportExportContext ieCtx = getImportExportContext();
    try
    {
      if (!backend.supportsLDIFImport())
      {
        ieCtx.setExceptionIfNoneSet(new DirectoryException(OTHER,
            ERR_INIT_IMPORT_NOT_SUPPORTED.get(backend.getBackendID())));
        return;
      }
      else
      {
        importConfig = new LDIFImportConfig(input);
        List<DN> includeBranches = new ArrayList<DN>();
        includeBranches.add(getBaseDN());
@@ -3713,9 +3713,9 @@
        importErrorMessageId = -1;
        // TODO How to deal with rejected entries during the import
        importConfig.writeRejectedEntries(
            getFileForPath("logs" + File.separator +
            "replInitRejectedEntries").getAbsolutePath(),
      File rejectsFile =
          getFileForPath("logs" + File.separator + "replInitRejectedEntries");
      importConfig.writeRejectedEntries(rejectsFile.getAbsolutePath(),
            ExistingFileBehavior.OVERWRITE);
        // Process import
@@ -3724,7 +3724,6 @@
        stateSavingDisabled = false;
      }
    }
    catch(Exception e)
    {
      ieCtx.setExceptionIfNoneSet(new DirectoryException(ResultCode.OTHER,
opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -242,8 +242,8 @@
   * The context related to an import or export being processed
   * Null when none is being processed.
   */
  private final AtomicReference<IEContext> ieContext =
      new AtomicReference<IEContext>();
  private final AtomicReference<ImportExportContext> importExportContext =
      new AtomicReference<ImportExportContext>();
  /**
   * The Thread waiting for incoming update messages for this domain and pushing
@@ -663,7 +663,7 @@
   * @return the info related to this remote server if it is connected,
   *                  null is the server is NOT connected.
   */
  private DSInfo isRemoteDSConnected(int dsId)
  private DSInfo getConnectedRemoteDS(int dsId)
  {
    return getReplicaInfos().get(dsId);
  }
@@ -814,7 +814,7 @@
        else if (msg instanceof ErrorMsg)
        {
          ErrorMsg errorMsg = (ErrorMsg)msg;
          IEContext ieCtx = ieContext.get();
          ImportExportContext ieCtx = importExportContext.get();
          if (ieCtx != null)
          {
            /*
@@ -867,7 +867,7 @@
        }
        else if (msg instanceof InitializeRcvAckMsg)
        {
          IEContext ieCtx = ieContext.get();
          ImportExportContext ieCtx = importExportContext.get();
          if (ieCtx != null)
          {
            InitializeRcvAckMsg ackMsg = (InitializeRcvAckMsg) msg;
@@ -1108,10 +1108,10 @@
  }
  /**
   * This class contain the context related to an import or export
   * launched on the domain.
   * This class contains the context related to an import or export launched on
   * the domain.
   */
  protected class IEContext
  protected class ImportExportContext
  {
    /** The private task that initiated the operation. */
    private Task initializeTask;
@@ -1190,7 +1190,7 @@
     *                         for and import, false if the IEContext
     *                         will be used for and export.
     */
    private IEContext(boolean importInProgress)
    private ImportExportContext(boolean importInProgress)
    {
      this.importInProgress = importInProgress;
      this.startTime = System.currentTimeMillis();
@@ -1356,7 +1356,7 @@
      // Recompute the server with the minAck returned,means the slowest server.
      slowestServerId = serverId;
      for (Integer sid : ieContext.get().ackVals.keySet())
      for (Integer sid : importExportContext.get().ackVals.keySet())
      {
        if (this.ackVals.get(sid) < this.ackVals.get(slowestServerId))
        {
@@ -1462,7 +1462,7 @@
      int serverRunningTheTask, Task initTask, int initWindow)
  throws DirectoryException
  {
    final IEContext ieCtx = acquireIEContext(false);
    final ImportExportContext ieCtx = acquireIEContext(false);
    /*
    We manage the list of servers to initialize in order :
@@ -1583,7 +1583,8 @@
              logger.trace(
                "[IE] Exporter wait for reconnection by the listener thread");
            int att=0;
            while (!broker.shuttingDown() && !broker.isConnected()
            while (!broker.shuttingDown()
                && !broker.isConnected()
                && ++att < 100)
            {
              try { Thread.sleep(100); }
@@ -1591,7 +1592,8 @@
            }
          }
          if (initTask != null && broker.isConnected()
          if (initTask != null
              && broker.isConnected()
              && serverToInitialize != RoutableMsg.ALL_SERVERS)
          {
            /*
@@ -1665,7 +1667,7 @@
   * - wait it has finished the import and present the expected generationID,
   * - build the failureList.
   */
  private void waitForRemoteStartOfInit(IEContext ieCtx)
  private void waitForRemoteStartOfInit(ImportExportContext ieCtx)
  {
    final Set<Integer> replicasWeAreWaitingFor =
        new HashSet<Integer>(ieCtx.startList);
@@ -1723,7 +1725,7 @@
   * - wait it has finished the import and present the expected generationID,
   * - build the failureList.
   */
  private void waitForRemoteEndOfInit(IEContext ieCtx)
  private void waitForRemoteEndOfInit(ImportExportContext ieCtx)
  {
    final Set<Integer> replicasWeAreWaitingFor =
        new HashSet<Integer>(ieCtx.startList);
@@ -1758,7 +1760,7 @@
          continue;
        }
        DSInfo dsInfo = isRemoteDSConnected(serverId);
        DSInfo dsInfo = getConnectedRemoteDS(serverId);
        if (dsInfo == null)
        {
          /*
@@ -1823,11 +1825,11 @@
   * Acquire and initialize the import/export context, verifying no other
   * import/export is in progress.
   */
  private IEContext acquireIEContext(boolean importInProgress)
  private ImportExportContext acquireIEContext(boolean importInProgress)
      throws DirectoryException
  {
    final IEContext ieCtx = new IEContext(importInProgress);
    if (!ieContext.compareAndSet(null, ieCtx))
    final ImportExportContext ieCtx = new ImportExportContext(importInProgress);
    if (!importExportContext.compareAndSet(null, ieCtx))
    {
      // Rejects 2 simultaneous exports
      LocalizableMessage message = ERR_SIMULTANEOUS_IMPORT_EXPORT_REJECTED.get();
@@ -1838,7 +1840,7 @@
  private void releaseIEContext()
  {
    ieContext.set(null);
    importExportContext.set(null);
  }
  /**
@@ -1847,7 +1849,7 @@
   *
   * @param errorMsg The error message received.
   */
  private void processErrorMsg(ErrorMsg errorMsg, IEContext ieCtx)
  private void processErrorMsg(ErrorMsg errorMsg, ImportExportContext ieCtx)
  {
    //Exporting must not be stopped on the first error, if we run initialize-all
    if (ieCtx != null && ieCtx.exportTarget != RoutableMsg.ALL_SERVERS)
@@ -1885,7 +1887,7 @@
    ReplicationMsg msg;
    while (true)
    {
      IEContext ieCtx = ieContext.get();
      ImportExportContext ieCtx = importExportContext.get();
      try
      {
        // In the context of the total update, we don't want any automatic
@@ -1983,7 +1985,7 @@
          // Other messages received during an import are trashed except
          // the topologyMsg.
          if (msg instanceof TopologyMsg
              && isRemoteDSConnected(ieCtx.importSource) == null)
              && getConnectedRemoteDS(ieCtx.importSource) == null)
          {
            LocalizableMessage errMsg = ERR_INIT_EXPORTER_DISCONNECTION.get(
                getBaseDNString(), getServerId(), ieCtx.importSource);
@@ -2056,7 +2058,7 @@
          Arrays.toString(lDIFEntry));
    // build the message
    IEContext ieCtx = ieContext.get();
    ImportExportContext ieCtx = importExportContext.get();
    EntryMsg entryMessage = new EntryMsg(
        getServerId(), ieCtx.getExportTarget(), lDIFEntry, pos, length,
        ++ieCtx.msgCnt);
@@ -2075,7 +2077,7 @@
      }
      int slowestServerId = ieCtx.getSlowestServer();
      if (isRemoteDSConnected(slowestServerId)==null)
      if (getConnectedRemoteDS(slowestServerId) == null)
      {
        ieCtx.setException(new DirectoryException(ResultCode.OTHER,
            ERR_INIT_HEARTBEAT_LOST_DURING_EXPORT.get(ieCtx.getSlowestServer())));
@@ -2203,7 +2205,7 @@
      update the task.
      */
      final IEContext ieCtx = acquireIEContext(true);
      final ImportExportContext ieCtx = acquireIEContext(true);
      ieCtx.initializeTask = initTask;
      ieCtx.attemptCnt = 0;
      ieCtx.initReqMsgSent = new InitializeRequestMsg(
@@ -2263,7 +2265,7 @@
    int source = initTargetMsgReceived.getSenderID();
    IEContext ieCtx = ieContext.get();
    ImportExportContext ieCtx = importExportContext.get();
    try
    {
      // Log starting
@@ -2470,7 +2472,7 @@
   */
  public boolean ieRunning()
  {
    return ieContext.get() != null;
    return importExportContext.get() != null;
  }
  /**
@@ -3484,9 +3486,9 @@
   *
   * @return the Import/Export context associated to this ReplicationDomain
   */
  protected IEContext getImportExportContext()
  protected ImportExportContext getImportExportContext()
  {
    return ieContext.get();
    return importExportContext.get();
  }
  /**
opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationMonitor.java
@@ -131,7 +131,7 @@
        String.valueOf(domain.getGenerationID())));
    // Add import/export monitoring attributes
    final IEContext ieContext = domain.getImportExportContext();
    final ImportExportContext ieContext = domain.getImportExportContext();
    if (ieContext != null)
    {
      addMonitorData(attributes, "total-update",
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
@@ -44,7 +44,7 @@
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.service.ReplicationDomain.IEContext;
import org.opends.server.replication.service.ReplicationDomain.*;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
import org.testng.annotations.DataProvider;
@@ -444,7 +444,7 @@
  private long getLeftEntryCount(ReplicationDomain domain)
  {
    final IEContext ieContext = domain.getImportExportContext();
    final ImportExportContext ieContext = domain.getImportExportContext();
    if (ieContext != null)
    {
      return ieContext.getLeftEntryCount();