mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
23.58.2008 9a3dcf4b902b7f38c070a97cf5dc217cfccc9651
Fix for 3488: Replication initialization or quicksetup sometimes fails

The problem was that, after a full update, the ReplicationDomain restarted
the broker before reloading the serverState and generationID.

Therefore, if the session establishment phase was faster than the initialize thread,
the generationID that was sent to the ReplicationServer was the old one.

The solution is to move the reload of the serverState and generationID
before the call to broker.reStart() in ReplicationDomain.initialize().

9 files modified
118 ■■■■■ changed files
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplLDIFOutputStream.java 3 ●●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java 91 ●●●● patch | view | raw | blame | history
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/DependencyTest.java 1 ●●●● patch | view | raw | blame | history
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java 1 ●●●● patch | view | raw | blame | history
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java 1 ●●●● patch | view | raw | blame | history
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/UpdateOperationTest.java 2 ●●●●● patch | view | raw | blame | history
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ComputeBestServerTest.java 6 ●●●●● patch | view | raw | blame | history
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ReplicationServerFailoverTest.java 7 ●●●● patch | view | raw | blame | history
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/MonitorTest.java 6 ●●●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplLDIFOutputStream.java
@@ -103,7 +103,8 @@
          // of entries to export.
          throw(new IOException());
        }
        domain.exportLDIFEntry(entryBuffer);
        if (numEntries<0)
          domain.exportLDIFEntry(entryBuffer);
        numExportedEntries++;
        entryBuffer = "";
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -290,11 +290,8 @@
    // The count for the entry not yet processed
    long entryLeftCount = 0;
    boolean checksumOutput = false;
    // The exception raised when any
    DirectoryException exception = null;
    long checksumOutputValue = (long)0;
    /**
     * Initializes the import/export counters with the provider value.
@@ -2320,20 +2317,11 @@
   */
  public long computeGenerationId() throws DirectoryException
  {
    Backend backend = retrievesBackend(baseDN);
    long bec = backend.numSubordinates(baseDN, true) + 1;
    this.acquireIEContext();
    ieContext.checksumOutput = true;
    ieContext.entryCount = (bec<1000?bec:1000);
    ieContext.entryLeftCount = ieContext.entryCount;
    exportBackend();
    long genId = ieContext.checksumOutputValue;
    long genId = exportBackend(true);
    if (debugEnabled())
      TRACER.debugInfo("Computed generationId: #entries=" + bec +
               " generationId=" + ieContext.checksumOutputValue);
    ieContext.checksumOutput = false;
    this.releaseIEContext();
      TRACER.debugInfo("Computed generationId: generationId=" + genId);
    return genId;
  }
@@ -2729,15 +2717,22 @@
  }
  /**
   * Export the entries from the backend.
   * Export the entries from the backend and/or compute the generation ID.
   * The ieContext must have been set before calling.
   * @param checksumOutput true if exportBackend is called to compute
   *                       the generationID
   *
   * @return The computed  generationID.
   *
   * @throws DirectoryException when an error occurred
   */
  protected void exportBackend()
  protected long exportBackend(boolean checksumOutput)
  throws DirectoryException
  {
    long genID = 0;
    Backend backend = retrievesBackend(this.baseDN);
    long bec = backend.numSubordinates(baseDN, true) + 1;
    long entryCount = (bec<1000?bec:1000);
    //  Acquire a shared lock for the backend.
    try
@@ -2766,9 +2761,9 @@
    OutputStream os;
    ReplLDIFOutputStream ros;
    if (ieContext.checksumOutput)
    if (checksumOutput)
    {
      ros = new ReplLDIFOutputStream(this, ieContext.entryCount);
      ros = new ReplLDIFOutputStream(this, entryCount);
      os = new CheckedOutputStream(ros, new Adler32());
      try
      {
@@ -2793,7 +2788,7 @@
    exportConfig.setIncludeBranches(includeBranches);
    // For the checksum computing mode, only consider the 'stable' attributes
    if (ieContext.checksumOutput)
    if (checksumOutput)
    {
      String includeAttributeStrings[] =
        {"objectclass", "sn", "cn", "entryuuid"};
@@ -2818,8 +2813,8 @@
    }
    catch (DirectoryException de)
    {
      if ((ieContext != null) && (ieContext.checksumOutput) &&
          (ros.getNumExportedEntries() >= ieContext.entryCount))
      if ((checksumOutput) &&
          (ros.getNumExportedEntries() >= entryCount))
      {
        // This is the normal end when computing the generationId
        // We can interrupt the export only by an IOException
@@ -2844,9 +2839,9 @@
    finally
    {
      if ((ieContext != null) && (ieContext.checksumOutput))
      if (checksumOutput)
      {
        ieContext.checksumOutputValue =
        genID =
         ((CheckedOutputStream)os).getChecksum().getValue();
      }
      else
@@ -2880,6 +2875,7 @@
            ResultCode.OTHER, message, null);
      }
    }
    return genID;
  }
  /**
@@ -2922,12 +2918,10 @@
      throw ioe;
    }
    if (ieContext.checksumOutput == false)
    {
      EntryMessage entryMessage = new EntryMessage(
    EntryMessage entryMessage = new EntryMessage(
        serverId, ieContext.exportTarget, lDIFEntry.getBytes());
      broker.publish(entryMessage);
    }
    broker.publish(entryMessage);
    try
    {
      ieContext.updateCounters();
@@ -3130,7 +3124,7 @@
      broker.publish(initializeMessage);
      exportBackend();
      exportBackend(false);
      // Notify the peer of the success
      DoneMessage doneMsg = new DoneMessage(serverId,
@@ -3263,6 +3257,30 @@
        backend = retrievesBackend(baseDN);
      }
      try
      {
        loadDataState();
        if (debugEnabled())
          TRACER.debugInfo(
              "After import, the replication plugin restarts connections" +
              " to all RSs to provide new generation ID=" + generationId);
        broker.setGenerationId(generationId);
      }
      catch (DirectoryException fe)
      {
        // If we already catch an Exception it's quite possible
        // that the loadDataState() and setGenerationId() fail
        // so we don't bother about the new Exception.
        // However if there was no Exception before we want
        // to return this Exception to the task creator.
        if (de == null)
          de = fe;
      }
      // Re-exchange generationID and state with RS
      broker.reStart();
      // Update the task that initiated the import
      if ((ieContext != null ) && (ieContext.initializeTask != null))
      {
@@ -3276,19 +3294,6 @@
    {
      throw de;
    }
    else
    {
      loadDataState();
      if (debugEnabled())
        TRACER.debugInfo(
            "After import, the replication plugin restarts connections" +
            " to all RSs to provide new generation ID=" + generationId);
      broker.setGenerationId(generationId);
      // Re-exchange generationID and state with RS
      broker.reStart();
    }
  }
  /**
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/DependencyTest.java
@@ -58,7 +58,6 @@
import org.opends.server.types.AttributeValue;
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
import org.opends.server.util.TimeThread;
import org.testng.annotations.*;
/**
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java
@@ -68,7 +68,6 @@
import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.replication.server.ReplicationBackend;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.schema.DirectoryStringSyntax;
import org.opends.server.tasks.LdifFileWriter;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeType;
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -81,7 +81,6 @@
import org.opends.server.types.SearchFilter;
import org.opends.server.types.SearchResultEntry;
import org.opends.server.types.SearchScope;
import org.opends.server.util.TimeThread;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/UpdateOperationTest.java
@@ -81,7 +81,6 @@
import org.opends.server.types.RDN;
import org.opends.server.types.RawModification;
import org.opends.server.types.ResultCode;
import org.opends.server.util.TimeThread;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -100,7 +99,6 @@
  private Entry  user3Entry;
  private String user3dn;
  private String user3lLDIFEntry;
  private String user3UUID;
  private String baseUUID;
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ComputeBestServerTest.java
@@ -31,7 +31,6 @@
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import static org.testng.Assert.*;
import org.opends.messages.Category;
@@ -65,11 +64,6 @@
    }
  }
  private void debugInfo(String message, Exception e)
  {
    debugInfo(message + stackTraceToSingleLineString(e));
  }
  /**
   * Set up the environment.
   *
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ReplicationServerFailoverTest.java
@@ -51,7 +51,7 @@
import org.testng.annotations.Test;
/**
 * Test if the replication domain is able to switch of replication rerver
 * Test if the replication domain is able to switch of replication server
 * if there is some replication server failure.
 */
@Test(sequential = true)
@@ -82,11 +82,6 @@
    }
  }
  private void debugInfo(String message, Exception e)
  {
    debugInfo(message + stackTraceToSingleLineString(e));
  }
  private void initTest()
  {
    rs1Port = -1;
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/MonitorTest.java
@@ -552,12 +552,6 @@
    catch (Exception e) {}
  }
  
  @Test(enabled=true)
  public void MonitorTest() throws Exception
  {
    testMultiRS();
  }
  private static final ByteArrayOutputStream oStream =
    new ByteArrayOutputStream();
  private static final ByteArrayOutputStream eStream =