mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
25.29.2007 0eab55fa49863935bbc81242b7e13fa550fedc6d
some replication hardening :
- fix some race conditions in namingConflict test
- add some cleanup at the end of ReplicationServerDynamicConfTest and
IsolationTest
- stop using 2 statics in the replication code that could cause
problems when in-core restarts are done.
- improve the shutdown by making sure that all threads are done
before returning
14 files modified
203 ■■■■ changed files
opends/src/server/org/opends/server/replication/plugin/Historical.java 50 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/ListenerThread.java 26 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java 2 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java 40 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ModifyMsg.java 3 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java 12 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java 1 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java 5 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/UpdateOperationTest.java 15 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalTest.java 2 ●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/IsolationTest.java 6 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ModifyConflictTest.java 25 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerDynamicConfTest.java 10 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java 6 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/Historical.java
@@ -82,15 +82,9 @@
  public static final String HISTORICAL = "ds-synch-historical";
  /**
   * The AttributeType associated to the attribute used to store
   * hitorical information.
   * The name of the entryuuid attribute.
   */
  public static final AttributeType historicalAttrType =
    DirectoryServer.getSchema().getAttributeType(HISTORICALATTRIBUTENAME);
  static final String ENTRYUIDNAME = "entryuuid";
  static final AttributeType entryuuidAttrType =
    DirectoryServer.getSchema().getAttributeType(ENTRYUIDNAME);
  /*
@@ -177,7 +171,7 @@
    Modification mod;
    mod = new Modification(ModificationType.REPLACE, attr);
    mods.add(mod);
    modifiedEntry.removeAttribute(historicalAttrType);
    modifiedEntry.removeAttribute(attr.getAttributeType());
    modifiedEntry.addAttribute(attr, null);
  }
@@ -194,7 +188,7 @@
  private AttributeInfo getAttrInfo(Modification mod)
  {
    Attribute modAttr = mod.getAttribute();
    if (modAttr.getAttributeType().equals(Historical.historicalAttrType))
    if (isHistoricalAttribute(modAttr))
    {
      // Don't keep historical information for the attribute that is
      // used to store the historical information.
@@ -229,6 +223,8 @@
   */
  public Attribute encode()
  {
    AttributeType historicalAttrType =
      DirectoryServer.getSchema().getAttributeType(HISTORICALATTRIBUTENAME);
    LinkedHashSet<AttributeValue> hist = new LinkedHashSet<AttributeValue>();
    for (Map.Entry<AttributeType, AttrInfoWithOptions> entryWithOptions :
@@ -344,7 +340,7 @@
   */
  public static Historical load(Entry entry)
  {
    List<Attribute> hist = entry.getAttribute(historicalAttrType);
    List<Attribute> hist = getHistoricalAttr(entry);
    Historical histObj = new Historical();
    AttributeType lastAttrType = null;
    Set<String> lastOptions = new HashSet<String>();
@@ -441,7 +437,7 @@
  {
    TreeMap<ChangeNumber, FakeOperation> operations =
            new TreeMap<ChangeNumber, FakeOperation>();
    List<Attribute> attrs = entry.getOperationalAttribute(historicalAttrType);
    List<Attribute> attrs = getHistoricalAttr(entry);
    if (attrs != null)
    {
      for (Attribute attr : attrs)
@@ -478,6 +474,19 @@
  }
  /**
   * Get the Attribute used to store the historical information from
   * the given Entry.
   *
   * @param   entry  The entry containing the historical information.
   *
   * @return  The Attribute used to store the historical information.
   */
  public static List<Attribute> getHistoricalAttr(Entry entry)
  {
    return entry.getAttribute(HISTORICALATTRIBUTENAME);
  }
  /**
   * Get the entry unique Id in String form.
   *
   * @param entry The entry for which the unique id should be returned.
@@ -487,6 +496,8 @@
  public static String getEntryUuid(Entry entry)
  {
    String uuidString = null;
    AttributeType entryuuidAttrType =
      DirectoryServer.getSchema().getAttributeType(ENTRYUIDNAME);
    List<Attribute> uuidAttrs =
             entry.getOperationalAttribute(entryuuidAttrType);
    if (uuidAttrs != null)
@@ -513,6 +524,8 @@
  {
    String uuidString = null;
    Map<AttributeType, List<Attribute>> attrs = op.getOperationalAttributes();
    AttributeType entryuuidAttrType =
      DirectoryServer.getSchema().getAttributeType(ENTRYUIDNAME);
    List<Attribute> uuidAttrs = attrs.get(entryuuidAttrType);
    if (uuidAttrs != null)
@@ -526,5 +539,20 @@
    }
    return uuidString;
  }
  /**
   * Check if a given attribute is an attribute used to store historical
   * information.
   *
   * @param   attr The attribute that needs to be checked.
   *
   * @return  a boolean indicating if the given attribute is
   *          used to store historical information.
   */
  public static boolean isHistoricalAttribute(Attribute attr)
  {
    AttributeType attrType = attr.getAttributeType();
    return attrType.getNameOrOID().equals(Historical.HISTORICALATTRIBUTENAME);
  }
}
opends/src/server/org/opends/server/replication/plugin/ListenerThread.java
@@ -50,6 +50,8 @@
  private ReplicationDomain listener;
  private boolean shutdown = false;
  private boolean done = false;
  /**
   * Constructor for the ListenerThread.
@@ -76,14 +78,13 @@
  public void run()
  {
    UpdateMessage msg;
    boolean done = false;
    if (debugEnabled())
    {
      TRACER.debugInfo("Replication Listener thread starting.");
    }
    while (!done)
    while (shutdown == false)
    {
      try
      {
@@ -91,7 +92,8 @@
        {
          listener.replay(msg);
        }
        done = true;
        if (msg == null)
          shutdown = true;
      } catch (Exception e)
      {
        /*
@@ -104,9 +106,27 @@
        logError(message);
      }
    }
    done = true;
    if (debugEnabled())
    {
      TRACER.debugInfo("Replication Listener thread stopping.");
    }
  }
  /**
   * Wait for the completion of this thread.
   */
  public void waitForShutdown()
  {
    try
    {
      while (done == false)
      {
        Thread.sleep(50);
      }
    } catch (InterruptedException e)
    {
      // exit the loop if this thread is interrupted.
    }
  }
}
opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
@@ -192,6 +192,7 @@
      MultimasterSynchronizationProviderCfg configuration)
  throws ConfigException
  {
    domains.clear();
    replicationServerListener = new ReplicationServerListener(configuration);
    // Register as an add and delete listener with the root configuration so we
@@ -435,6 +436,7 @@
    {
      domain.shutdown();
    }
    domains.clear();
    // shutdown the ReplicationServer Service if necessary
    if (replicationServerListener != null)
opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -245,6 +245,12 @@
  private DN configDn;
  /**
   * A boolean indicating if the thread used to save the persistentServerState
   * is terminated.
   */
  private boolean done = false;
  /**
   * This class contain the context related to an import or export
   * launched on the domain.
   */
@@ -1207,6 +1213,8 @@
      { }
    }
    state.save();
    done = true;
  }
  /**
@@ -1216,6 +1224,10 @@
   */
  private void createListeners()
  {
    synchronized (synchroThreads)
    {
      if (!shutdown)
      {
    synchroThreads.clear();
    for (int i=0; i<listenerThreadNumber; i++)
    {
@@ -1224,20 +1236,26 @@
      synchroThreads.add(myThread);
    }
  }
    }
  }
  /**
   * Shutdown this ReplicationDomain.
   */
  public void shutdown()
  {
    // stop the flush thread
    shutdown = true;
    synchronized (synchroThreads)
    {
    // stop the listener threads
    for (ListenerThread thread : synchroThreads)
    {
      thread.shutdown();
    }
    }
    // stop the flush thread
    shutdown = true;
    synchronized (this)
    {
      this.notify();
@@ -1253,7 +1271,19 @@
    //  wait for the listener thread to stop
    for (ListenerThread thread : synchroThreads)
    {
      thread.shutdown();
      thread.waitForShutdown();
    }
    // wait for completion of the persistentServerState thread.
    try
    {
      while (!done)
      {
        Thread.sleep(50);
      }
    } catch (InterruptedException e)
    {
      // stop waiting when interrupted.
    }
  }
@@ -2249,7 +2279,7 @@
   */
  public long computeGenerationId() throws DirectoryException
  {
    Backend backend = this.retrievesBackend(baseDN);
    Backend backend = retrievesBackend(baseDN);
    long bec = backend.getEntryCount();
    this.acquireIEContext();
    ieContext.checksumOutput = true;
@@ -3049,7 +3079,7 @@
    LDIFImportConfig importConfig = null;
    DirectoryException de = null;
    Backend backend = this.retrievesBackend(baseDN);
    Backend backend = retrievesBackend(baseDN);
    if (!backend.supportsLDIFImport())
    {
opends/src/server/org/opends/server/replication/protocol/ModifyMsg.java
@@ -215,7 +215,8 @@
          continue;
        }
      }
      if (!attr.getAttributeType().equals(Historical.historicalAttrType))
      if (!Historical.isHistoricalAttribute(attr))
      {
        LDAPModification ldapmod = new LDAPModification(
          mod.getModificationType(), new LDAPAttribute(mod.getAttribute()));
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java
@@ -26,8 +26,6 @@
 */
package org.opends.server.replication;
import static org.opends.server.config.ConfigConstants.ATTR_TASK_LOG_MESSAGES;
import static org.opends.server.config.ConfigConstants.ATTR_TASK_STATE;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
@@ -40,25 +38,22 @@
import java.io.File;
import java.net.ServerSocket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import java.net.SocketTimeoutException;
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.Severity;
import org.opends.server.TestCaseUtils;
import org.opends.server.backends.task.TaskState;
import org.opends.server.core.AddOperation;
import org.opends.server.core.AddOperationBasis;
import org.opends.server.core.DirectoryServer;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.replication.common.ChangeNumberGenerator;
import org.opends.server.replication.plugin.ReplicationBroker;
import org.opends.server.replication.plugin.ReplicationDomain;
@@ -71,7 +66,6 @@
import org.opends.server.replication.protocol.SocketSession;
import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.schema.DirectoryStringSyntax;
import org.opends.server.tasks.LdifFileWriter;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeType;
@@ -79,8 +73,6 @@
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
import org.opends.server.types.ResultCode;
import org.opends.server.types.SearchFilter;
import org.opends.server.types.SearchScope;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -1028,7 +1020,7 @@
      // At this moment, root entry of the domain has been removed so
      // genId is no more in the database ... but it has still the old
      // value in memory.
      int found = testEntriesInDb();
      testEntriesInDb();
      replDomain.loadGenerationId();
      debugInfo("Successfully ending " + testCase);
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
@@ -52,7 +52,6 @@
import org.opends.server.TestCaseUtils;
import org.opends.server.backends.task.TaskState;
import org.opends.server.core.AddOperation;
import org.opends.server.core.AddOperationBasis;
import org.opends.server.core.DirectoryServer;
import org.opends.server.loggers.debug.DebugTracer;
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java
@@ -36,10 +36,10 @@
import java.util.ArrayList;
import java.util.List;
import org.opends.server.TestCaseUtils;
import org.opends.messages.Message;
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.Severity;
import org.opends.server.TestCaseUtils;
import org.opends.server.core.AddOperationBasis;
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.ModifyOperation;
@@ -49,7 +49,6 @@
import org.opends.server.protocols.ldap.LDAPFilter;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.plugin.ReplicationBroker;
import org.opends.server.replication.plugin.ReplicationDomain;
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.ReplicationMessage;
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/UpdateOperationTest.java
@@ -793,6 +793,7 @@
          gen.newChangeNumber(), user1entrysecondUUID);
    broker.publish(delMsg);
    resultEntry = getEntry(personWithUUIDEntry.getDN(), 10000, false);
    resultEntry = getEntry(personWithSecondUniqueID.getDN(), 10000, false);
    // check that the delete operation has been applied
    assertNull(resultEntry,
@@ -1171,9 +1172,19 @@
    broker.publish(modDnMsg);
    // unfortunately it is difficult to check that the operation
    // did not do anything.
    // The only thing we can check is that resolved naminf conflict counter
    // The only thing we can check is that resolved naming conflict counter
    // has correctly been incremented.
    assertEquals(getMonitorDelta(), 1);
    int count = 0;
    while ((count<2000) && getMonitorDelta() == 0)
    {
      // it is possible that the update has not yet been applied
      // wait a short time and try again.
      Thread.sleep(100);
      count++;
    }
    // if the monitor counter was not incremented after 200sec
    // then something went wrong.
    assertTrue(count < 200);
    
    // Check that there was no administrative alert generated
    // because the conflict has been automatically resolved.
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalTest.java
@@ -171,7 +171,7 @@
    DN dn = DN.decode("uid=user.1,o=test");
    Entry entry = DirectoryServer.getEntry(dn);
    List<Attribute> attrs = entry.getAttribute(Historical.historicalAttrType);
    List<Attribute> attrs = Historical.getHistoricalAttr(entry);
    Attribute before = attrs.get(0);
    // Check that encoding and decoding preserves the history information.
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/IsolationTest.java
@@ -108,8 +108,7 @@
      // try a new modify operation on the base entry.
      op = conn.processModify(baseDn, generatemods("description", "test"));
      // chek that the operation was successful.
      // check that the update failed.
      // check that the operation was successful.
      assertEquals(op.getResultCode(), ResultCode.SUCCESS, 
          op.getAdditionalLogMessage().toString());
    }
@@ -119,9 +118,12 @@
        MultimasterReplication.deleteDomain(baseDn);
      if (replicationPlugin != null)
      {
        replicationPlugin.finalizeSynchronizationProvider();
        DirectoryServer.deregisterSynchronizationProvider(replicationPlugin);
    }
  }
  }
  /**
   * Clean the database and replace with a single entry.
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ModifyConflictTest.java
@@ -601,6 +601,10 @@
   */
  private Entry initializeEntry() throws DirectoryException
  {
    AttributeType entryuuidAttrType =
      DirectoryServer.getSchema().getAttributeType(
          Historical.ENTRYUIDNAME);
    /*
     * Objectclass and DN do not have any impact on the modifty conflict
     * resolution for the description attribute. Always use the same values
@@ -622,10 +626,10 @@
    // Create the att values list
    LinkedHashSet<AttributeValue> values = new LinkedHashSet<AttributeValue>(
        1);
    values.add(new AttributeValue(Historical.entryuuidAttrType,
    values.add(new AttributeValue(entryuuidAttrType,
        new ASN1OctetString(uuid.toString())));
    ArrayList<Attribute> uuidList = new ArrayList<Attribute>(1);
    Attribute uuidAttr = new Attribute(Historical.entryuuidAttrType,
    Attribute uuidAttr = new Attribute(entryuuidAttrType,
        "entryUUID", values);
    uuidList.add(uuidAttr);
@@ -635,7 +639,7 @@
    Map<AttributeType, List<Attribute>> operationalAttributes = entry
        .getOperationalAttributes();
    operationalAttributes.put(Historical.entryuuidAttrType, uuidList);
    operationalAttributes.put(entryuuidAttrType, uuidList);
    return entry;
  }
@@ -645,6 +649,8 @@
  private void testHistoricalAndFake(
      Historical hist, Entry entry)
  {
    AttributeType entryuuidAttrType =
      DirectoryServer.getSchema().getAttributeType(Historical.ENTRYUIDNAME);
    // Get the historical uuid associated to the entry
    // (the one that needs to be tested)
@@ -652,7 +658,7 @@
    // Get the Entry uuid in String format
    List<Attribute> uuidAttrs = entry
        .getOperationalAttribute(Historical.entryuuidAttrType);
        .getOperationalAttribute(entryuuidAttrType);
    uuidAttrs.get(0).getValues().iterator().next().toString();
    if (uuidAttrs != null)
@@ -730,6 +736,10 @@
  private List<Modification> replayModify(
      Entry entry, Historical hist, Modification mod, int date)
  {
    AttributeType historicalAttrType =
      DirectoryServer.getSchema().getAttributeType(
          Historical.HISTORICALATTRIBUTENAME);
    InternalClientConnection connection =
      InternalClientConnection.getRootConnection();
    ChangeNumber t = new ChangeNumber(date, (short) 0, (short) 0);
@@ -763,7 +773,7 @@
     * works  by encoding decoding and checking that the result is the same
     * as the initial value.
     */
    entry.removeAttribute(Historical.historicalAttrType);
    entry.removeAttribute(historicalAttrType);
    entry.addAttribute(hist.encode(), null);
    Historical hist2 = Historical.load(entry);
    assertEquals(hist2.encode().toString(), hist.encode().toString());
@@ -793,6 +803,9 @@
  private void testHistorical(
      Historical hist, LocalBackendAddOperation addOp)
  {
    AttributeType entryuuidAttrType =
      DirectoryServer.getSchema().getAttributeType(
          Historical.ENTRYUIDNAME);
    // Get the historical uuid associated to the entry
    // (the one that needs to be tested)
@@ -800,7 +813,7 @@
    // Get the op uuid in String format
    List<Attribute> uuidAttrs = addOp.getOperationalAttributes().get(
        Historical.entryuuidAttrType);
        entryuuidAttrType);
    uuidAttrs.get(0).getValues().iterator().next().toString();
    if (uuidAttrs != null)
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerDynamicConfTest.java
@@ -49,8 +49,11 @@
  @Test()
  public void replServerApplyChangeTest() throws Exception
  {
    ReplicationServer replicationServer = null;
    TestCaseUtils.startServer();
    try {
    // find two free ports for the replication Server port
    ServerSocket socket1 = TestCaseUtils.bindFreePort();
    int replicationServerPort = socket1.getLocalPort();
@@ -63,7 +66,7 @@
    ReplServerFakeConfiguration conf =
      new ReplServerFakeConfiguration(
          replicationServerPort, null, 0, 1, 0, 0, null);
    ReplicationServer replicationServer = new ReplicationServer(conf);
      replicationServer = new ReplicationServer(conf);
   
    // Most of the configuration change are trivial to apply.
    // The interesting change is the change of the replication server port.
@@ -84,4 +87,9 @@
    // broker did connect successfully.
    assertTrue(broker.getCurrentSendWindow() != 0);
  }
    finally
    {
      replicationServer.shutdown();
    }
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -26,7 +26,6 @@
 */
package org.opends.server.replication.server;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.replication.protocol.OperationContext.SYNCHROCONTEXT;
@@ -41,7 +40,6 @@
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
@@ -49,9 +47,6 @@
import java.util.TreeSet;
import java.util.UUID;
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.Severity;
import org.opends.server.TestCaseUtils;
import org.opends.server.backends.task.TaskState;
import org.opends.server.core.DirectoryServer;
@@ -75,7 +70,6 @@
import org.opends.server.replication.protocol.ReplServerStartMessage;
import org.opends.server.replication.protocol.ReplSessionSecurity;
import org.opends.server.replication.protocol.ReplicationMessage;
import org.opends.server.replication.protocol.ResetGenerationId;
import org.opends.server.replication.protocol.ServerStartMessage;
import org.opends.server.replication.protocol.UpdateMessage;
import org.opends.server.replication.protocol.WindowMessage;