mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
15.30.2013 141b38df935b4bf865e1bcf2874aed1c70e3437c
Fixing UpdateOperationTest.csnGeneratorAdjust() test failure in Continuous Integration.

UpdateOperationTest.java:
Added consumeAllMessages(ReplicationBroker).
In csnGeneratorAdjust(), called consumeAllMessages().
Extracted local variables serverId.
Fixed formatting.
Increased vertical density.
Removed a useless call to Thread.sleep().
Removed useless comments.

ReplicationBroker.java:
Used local variables "localSession" to avoid concurrency issues when setSession(null) is called by another thread.

*.java:
Implemented toString().
4 files modified
192 ■■■■■ changed files
opends/src/server/org/opends/server/replication/protocol/ReplSessionSecurity.java 6 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java 80 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java 9 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/UpdateOperationTest.java 97 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ReplSessionSecurity.java
@@ -293,4 +293,10 @@
    return sslEncryption;
  }
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
    return getClass().getSimpleName() + " " + (sslEncryption ? "with SSL" : "");
  }
}
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -82,7 +82,7 @@
   */
  public final static String NO_CONNECTED_SERVER = "Not connected";
  private volatile String replicationServer = NO_CONNECTED_SERVER;
  private volatile Session session = null;
  private volatile Session session;
  private final ServerState state;
  private final DN baseDN;
  private final int serverId;
@@ -1284,7 +1284,8 @@
      // Send our Start Session
      StartECLSessionMsg startECLSessionMsg = new StartECLSessionMsg();
      startECLSessionMsg.setOperationId("-1");
      session.publish(startECLSessionMsg);
      final Session localSession = session;
      localSession.publish(startECLSessionMsg);
      // FIXME ECL In the handshake phase two, should RS send back a topo msg ?
      if (debugEnabled())
@@ -1294,7 +1295,7 @@
      }
      // Alright set the timeout to the desired value
      session.setSoTimeout(timeout);
      localSession.setSoTimeout(timeout);
      connected = true;
    } catch (Exception e)
    {
@@ -1319,8 +1320,6 @@
  private TopologyMsg performPhaseTwoHandshake(String server,
    ServerStatus initStatus)
  {
    TopologyMsg topologyMsg;
    try
    {
      /*
@@ -1347,12 +1346,13 @@
        startSessionMsg =
          new StartSessionMsg(initStatus, new ArrayList<String>());
      }
      session.publish(startSessionMsg);
      final Session localSession = session;
      localSession.publish(startSessionMsg);
      /*
       * Read the TopologyMsg that should come back.
       */
      topologyMsg = (TopologyMsg) session.receive();
      TopologyMsg topologyMsg = (TopologyMsg) localSession.receive();
      if (debugEnabled())
      {
@@ -1361,8 +1361,8 @@
      }
      // Alright set the timeout to the desired value
      session.setSoTimeout(timeout);
      localSession.setSoTimeout(timeout);
      return topologyMsg;
    } catch (Exception e)
    {
      Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(serverId,
@@ -1372,9 +1372,8 @@
      setSession(null);
      // Be sure to return null.
      topologyMsg = null;
      return null;
    }
    return topologyMsg;
  }
  /**
@@ -1423,6 +1422,7 @@
     *   local DS
     * - replication server in the same VM as local DS one
     */
    // TODO JNR log why an RS was evicted as best server
    Map<Integer, ReplicationServerInfo> bestServers = rsInfos;
    /*
    The list of best replication servers is filtered with each criteria. At
@@ -2225,10 +2225,10 @@
            Check the session. If it has changed, some disconnection or
            reconnection happened and we need to restart from scratch.
            */
            if (session != null && session == currentSession)
            final Session localSession = session;
            if (localSession != null && session == currentSession)
            {
              session.publish(msg);
              localSession.publish(msg);
              done = true;
            }
          }
@@ -2243,8 +2243,10 @@
            window update message was lost somehow...
            then loop to check again if connection was closed.
            */
            if (session != null) {
              session.publish(new WindowProbeMsg());
            Session localSession = session;
            if (localSession != null)
            {
              localSession.publish(new WindowProbeMsg());
            }
          }
        }
@@ -2330,8 +2332,8 @@
      // Save session information for later in case we need it for log messages
      // after the session has been closed and/or failed.
      final Session savedSession = session;
      if (savedSession == null)
      final Session localSession = session;
      if (localSession == null)
      {
        // Must be shutting down.
        break;
@@ -2340,7 +2342,7 @@
      final int previousRsServerID = rsServerId;
      try
      {
        ReplicationMsg msg = savedSession.receive();
        ReplicationMsg msg = localSession.receive();
        if (msg instanceof UpdateMsg)
        {
          synchronized (this)
@@ -2372,12 +2374,12 @@
        {
          // RS performs a proper disconnection
          Message message = WARN_REPLICATION_SERVER_PROPERLY_DISCONNECTED.get(
              previousRsServerID, savedSession.getReadableRemoteAddress(),
              previousRsServerID, localSession.getReadableRemoteAddress(),
              serverId, baseDN.toNormalizedString());
          logError(message);
          // Try to find a suitable RS
          reStart(savedSession, true);
          reStart(localSession, true);
        }
        else if (msg instanceof MonitorMsg)
        {
@@ -2436,14 +2438,15 @@
                {
                  message = NOTE_LOAD_BALANCE_REPLICATION_SERVER.get(
                      serverId, previousRsServerID,
                      savedSession.getReadableRemoteAddress(),
                      localSession.getReadableRemoteAddress(),
                      baseDN.toNormalizedString());
                }
                else
                {
                  // TODO JNR log why an RS was evicted as best server
                  message = NOTE_NEW_BEST_REPLICATION_SERVER.get(
                      serverId, previousRsServerID,
                      savedSession.getReadableRemoteAddress(),
                      localSession.getReadableRemoteAddress(),
                      bestServerInfo.getServerId(),
                      baseDN.toNormalizedString());
                }
@@ -2480,13 +2483,13 @@
            // We did not initiate the close on our side, log an error message.
            Message message = WARN_REPLICATION_SERVER_BADLY_DISCONNECTED.get(
                serverId, baseDN.toNormalizedString(), previousRsServerID,
                savedSession.getReadableRemoteAddress());
                localSession.getReadableRemoteAddress());
            logError(message);
          }
          if (reconnectOnFailure)
          {
            reStart(savedSession, true);
            reStart(localSession, true);
          }
          else
          {
@@ -2546,9 +2549,10 @@
    try
    {
      updateDoneCount++;
      if ((updateDoneCount >= halfRcvWindow) && (session != null))
      final Session localSession = session;
      if (updateDoneCount >= halfRcvWindow && localSession != null)
      {
        session.publish(new WindowMsg(updateDoneCount));
        localSession.publish(new WindowMsg(updateDoneCount));
        rcvWindow += updateDoneCount;
        updateDoneCount = 0;
      }
@@ -2598,9 +2602,10 @@
  public void setSoTimeout(int timeout) throws SocketException
  {
    this.timeout = timeout;
    if (session != null)
    final Session localSession = session;
    if (localSession != null)
    {
      session.setSoTimeout(timeout);
      localSession.setSoTimeout(timeout);
    }
  }
@@ -2905,14 +2910,14 @@
    // Start a CSN heartbeat thread.
    if (changeTimeHeartbeatSendInterval > 0)
    {
      String threadName = "Replica DS(" + getServerId()
      final Session localSession = session;
      final String threadName = "Replica DS(" + getServerId()
          + ") change time heartbeat publisher for domain \""
          + this.baseDN + "\" to RS(" + getRsServerId()
          + ") at " + session.getReadableRemoteAddress();
          + baseDN + "\" to RS(" + getRsServerId()
          + ") at " + localSession.getReadableRemoteAddress();
      ctHeartbeatPublisherThread = new CTHeartbeatPublisherThread(
          threadName, session, changeTimeHeartbeatSendInterval,
          serverId);
          threadName, localSession, changeTimeHeartbeatSendInterval, serverId);
      ctHeartbeatPublisherThread.start();
    } else
    {
@@ -3030,4 +3035,11 @@
      DirectoryServer.deregisterMonitorProvider(monitor);
    }
  }
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
    return getClass().getSimpleName() + " " + baseDN + " " + serverId;
  }
}
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -69,7 +69,7 @@
 *   should read the list of replication servers from the configuration,
 *   instantiate a {@link ServerState} then start the publish service
 *   by calling
 *   {@link #startPublishService(Collection, int, long, long)}.
 *   {@link #startPublishService(Set, int, long, long)}.
 *   At this point it can start calling the {@link #publish(UpdateMsg)}
 *   method if needed.
 * <p>
@@ -3675,4 +3675,11 @@
  {
    return state.getCSN(serverID);
  }
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
    return getClass().getSimpleName() + " " + this.baseDN + " " + this.serverID;
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/UpdateOperationTest.java
@@ -27,6 +27,7 @@
 */
package org.opends.server.replication;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -375,14 +376,12 @@
      // Check that the modify has been replayed.
      found = checkEntryHasAttribute(personWithUUIDEntry.getDN(),
          "description", "Description was changed",
          10000, true);
          "description", "Description was changed", 10000, true);
      assertTrue(found, "The second modification was not replayed.");
      // Delete the entries to clean the database.
      DeleteMsg delMsg = new DeleteMsg(personWithUUIDEntry.getDN(), gen.newCSN(), user1entryUUID);
      broker.publish(delMsg);
      broker.publish(
          new DeleteMsg(personWithUUIDEntry.getDN(), gen.newCSN(), user1entryUUID));
      assertNull(getEntry(personWithUUIDEntry.getDN(), 10000, false),
          "The DELETE replication message was not replayed");
    }
@@ -552,16 +551,12 @@
     * Open a session to the replicationServer using the ReplicationServer broker API.
     * This must use a serverId different from the LDAP server ID
     */
    final int serverId = 2;
    ReplicationBroker broker =
      openReplicationSession(baseDN, 2, 100, replServerPort, 1000, true);
        openReplicationSession(baseDN, serverId, 100, replServerPort, 1000, true);
    try
    {
      /*
       * Create a CSN generator to generate new CSNs when we need to send
       * operations messages to the replicationServer.
       */
      CSNGenerator gen = new CSNGenerator(2, 0);
      CSNGenerator gen = new CSNGenerator(serverId, 0);
    /*
     * Test that the conflict resolution code is able to find entries
@@ -658,11 +653,10 @@
     */
    // send a delete operation with a wrong dn but the unique ID of the entry
    // used above
      DN delDN = DN.decode("cn=anotherdn," + baseDN);
    DeleteMsg delMsg = new DeleteMsg(delDN, gen.newCSN(), user1entryUUID);
    updateMonitorCount(baseDN, resolvedMonitorAttr);
      alertCount = DummyAlertHandler.getAlertCount();
    broker.publish(delMsg);
      DN delDN = DN.decode("cn=anotherdn," + baseDN);
      broker.publish(new DeleteMsg(delDN, gen.newCSN(), user1entryUUID));
    // check that the delete operation has been applied
    assertNull(getEntry(personWithUUIDEntry.getDN(), 10000, false),
@@ -739,10 +733,10 @@
     * To achieve this send a delete operation with a correct DN
     * but a wrong unique ID.
     */
    delMsg = new DeleteMsg(newPersonDN, gen.newCSN(), "11111111-9abc-def0-1234-1234567890ab");
    updateMonitorCount(baseDN, resolvedMonitorAttr);
      alertCount = DummyAlertHandler.getAlertCount();
    broker.publish(delMsg);
      broker.publish(
          new DeleteMsg(newPersonDN, gen.newCSN(), "11111111-9abc-def0-1234-1234567890ab"));
    // check that the delete operation has not been applied
      assertNotNull(getEntry(newPersonDN, 10000, true),
@@ -824,19 +818,13 @@
    // delete the entries to clean the database
    DN delDN2 = DN.decode("entryUUID = " + user1entrysecondUUID + "+"
        + user1dn.getRDN() + "," + baseDN);
    delMsg = new DeleteMsg(delDN2, gen.newCSN(), user1entrysecondUUID);
    broker.publish(delMsg);
    // check that the delete operation has been applied
      DN delDN2 = DN.decode(
          "entryUUID = " + user1entrysecondUUID + "+" + user1dn.getRDN() + "," + baseDN);
      broker.publish(new DeleteMsg(delDN2, gen.newCSN(), user1entrysecondUUID));
      assertNull(getEntry(delDN2, 10000, false),
        "The DELETE replication message was not replayed");
    delMsg = new DeleteMsg(reallyNewDN, gen.newCSN(), user1entryUUID);
    broker.publish(delMsg);
      // check that the delete operation has been applied
      broker.publish(new DeleteMsg(reallyNewDN, gen.newCSN(), user1entryUUID));
      assertNull(getEntry(reallyNewDN, 10000, false),
          "The DELETE replication message was not replayed");
@@ -930,8 +918,7 @@
      alertCount = DummyAlertHandler.getAlertCount();
    // delete domain1
    delMsg = new DeleteMsg(domain1dn, olderCSN, domain1uid);
    broker.publish(delMsg);
      broker.publish(new DeleteMsg(domain1dn, olderCSN, domain1uid));
    // check that the domain1 has correctly been deleted
    assertNull(getEntry(domain1dn, 10000, false),
@@ -975,8 +962,7 @@
      alertCount = DummyAlertHandler.getAlertCount();
    // delete domain1
    delMsg = new DeleteMsg(domain1dn, gen.newCSN(), domain1uid);
    broker.publish(delMsg);
      broker.publish(new DeleteMsg(domain1dn, gen.newCSN(), domain1uid));
    // check that the domain1 has correctly been deleted
    assertNull(getEntry(domain1dn, 10000, false),
@@ -1131,11 +1117,11 @@
    // Cleanup from previous run
    cleanupTest();
    final int serverId = 27;
    ReplicationBroker broker =
      openReplicationSession(baseDN,  27, 100, replServerPort, 2000, true);
        openReplicationSession(baseDN, serverId, 100, replServerPort, 2000, true);
    try {
      CSNGenerator gen = new CSNGenerator( 27, 0);
      CSNGenerator gen = new CSNGenerator(serverId, 0);
      /*
       * Test that operations done on this server are sent to the
@@ -1303,12 +1289,12 @@
    logError(Message.raw(Category.SYNC, Severity.INFORMATION,
        "Starting replication test : infiniteReplayLoop"));
    Thread.sleep(2000);
    int serverId = 11;
    ReplicationBroker broker =
      openReplicationSession(baseDN,  11, 100, replServerPort, 1000, true);
        openReplicationSession(baseDN, serverId, 100, replServerPort, 1000, true);
    try
    {
      CSNGenerator gen = new CSNGenerator( 11, 0);
      CSNGenerator gen = new CSNGenerator(serverId, 0);
      // Create a test entry.
      Entry tmp = TestCaseUtils.entryFromLdifString(
@@ -1400,7 +1386,6 @@
  public void csnGeneratorAdjust() throws Exception
  {
    testSetUp("csnGeneratorAdjust");
    int serverId = 88;
    logError(Message.raw(Category.SYNC, Severity.INFORMATION,
        "Starting synchronization test : CSNGeneratorAdjust"));
@@ -1408,16 +1393,13 @@
     * Open a session to the replicationServer using the broker API.
     * This must use a different serverId to that of the directory server.
     */
    final int serverId = 88;
    ReplicationBroker broker =
      openReplicationSession(baseDN, serverId, 100, replServerPort, 1000, true);
    consumeAllMessages(broker); // clean leftover messages from lostHeartbeatFailover()
    try
    {
      /*
       * Create a CSN generator to generate new CSNs
       * when we need to send operation messages to the replicationServer.
       */
      long inTheFuture = System.currentTimeMillis() + (3600 * 1000);
      final long inTheFuture = System.currentTimeMillis() + (3600 * 1000);
      CSNGenerator gen = new CSNGenerator(serverId, inTheFuture);
      // Create and publish an update message to add an entry.
@@ -1453,4 +1435,31 @@
      broker.stop();
    }
  }
  /**
   * Consumes all the messages sent to this broker. This is useful at the start
   * of a test to avoid leftover messages from previous test runs.
   */
  private void consumeAllMessages(ReplicationBroker broker)
  {
    final List<ReplicationMsg> msgs = new ArrayList<ReplicationMsg>();
    try
    {
      while (true)
      {
        msgs.add(broker.receive());
      }
    }
    catch (SocketTimeoutException expectedAtSomeStage)
    {
      // this is expected to happen when there will not be any more messages to
      // consume from the socket
    }
    if (!msgs.isEmpty())
    {
      logError(Message.raw(Category.SYNC, Severity.SEVERE_ERROR,
          "Leftover messages from previous test runs " + msgs));
    }
  }
}