mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
03.00.2008 3556ad0275d7271b6533569ac72cb741abdb0b81
 fix for 2787 : Replication Server sessions fail when disconnecting and re-connecting

When a ReplicationBroker disconnects from and quickly reconnects to a
Replication Server, the reconnection sometimes fails.

These changes:
- add a unit test for this condition
- fix race conditions in the Replication Server code to make this test
succeed
9 files modified
275 ■■■■■ changed files
opends/src/server/org/opends/server/replication/plugin/ReplLDIFOutputStream.java 9 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/DbHandler.java 8 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java 4 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServer.java 8 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 99 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ServerHandler.java 23 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ServerReader.java 49 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java 69 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java 6 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/ReplLDIFOutputStream.java
@@ -22,16 +22,14 @@
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication.plugin;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import java.io.IOException;
import java.io.OutputStream;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.util.ServerConstants;
/**
@@ -41,11 +39,6 @@
public class ReplLDIFOutputStream
       extends OutputStream
{
  /**
   * The tracer object for the debug logger.
   */
  private static final DebugTracer TRACER = getTracer();
  // The synchronization domain on which the export is done
  ReplicationDomain domain;
opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;
import org.opends.messages.MessageBuilder;
@@ -114,12 +114,11 @@
   * @param baseDn the baseDn for which this DB was created.
   * @param replicationServer The ReplicationServer that creates this dbHandler.
   * @param dbenv the Database Env to use to create the ReplicationServer DB.
   * @param generationId The generationId of the data contained in the LDAP
   * server for this domain.
   * @throws DatabaseException If a database problem happened
   */
  public DbHandler(short id, DN baseDn, ReplicationServer replicationServer,
      ReplicationDbEnv dbenv, long generationId)
      ReplicationDbEnv dbenv)
         throws DatabaseException
  {
    this.serverId = id;
@@ -369,7 +368,8 @@
        mb.append(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH.get());
        mb.append(stackTraceToSingleLineString(end));
        logError(mb.toMessage());
        replicationServer.shutdown();
        if (replicationServer != null)
          replicationServer.shutdown();
        break;
      }
    }
opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;
import org.opends.messages.*;
@@ -257,7 +257,7 @@
              + " serverId=" + serverId);
          DbHandler dbHandler =
            new DbHandler(serverId, baseDn, replicationServer, this, 1);
            new DbHandler(serverId, baseDn, replicationServer, this);
          replicationServer.getReplicationServerDomain(baseDn, true).
          setDbHandler(serverId, dbHandler);
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;
import static org.opends.messages.ReplicationMessages.*;
@@ -544,16 +544,14 @@
   *
   * @param id The serverId for which the dbHandler must be created.
   * @param baseDn The DN for which the dbHandler must be created.
   * @param generationId The generationId for this server and this
   *        replicationServerDomain.
   * @return The new DB handler for this ReplicationServer and the serverId and
   *         DN given in parameter.
   * @throws DatabaseException in case of underlying database problem.
   */
  public DbHandler newDbHandler(short id, DN baseDn, long generationId)
  public DbHandler newDbHandler(short id, DN baseDn)
  throws DatabaseException
  {
    return new DbHandler(id, baseDn, this, dbEnv, generationId);
    return new DbHandler(id, baseDn, this, dbEnv);
  }
  /**
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -215,6 +215,11 @@
      }
    }
    if (generationId < 0)
    {
      generationId = sourceHandler.getGenerationId();
    }
    // look for the dbHandler that is responsible for the LDAP server which
    // generated the change.
    DbHandler dbHandler = null;
@@ -225,8 +230,7 @@
      {
        try
        {
          dbHandler = replicationServer.newDbHandler(id,
              baseDn, generationId);
          dbHandler = replicationServer.newDbHandler(id, baseDn);
          generationIdSavedStatus = true;
        }
        catch (DatabaseException e)
@@ -277,8 +281,25 @@
      handler.add(update, sourceHandler);
    }
  }
  /**
   * Wait a short while for ServerId disconnection.
   *
   * @param serverId the serverId to be checked.
   */
  public void waitDisconnection(short serverId)
  {
    if (connectedServers.containsKey(serverId))
    {
      // try again
      try
      {
        Thread.sleep(100);
      } catch (InterruptedException e)
      {
      }
    }
  }
  /**
@@ -339,22 +360,27 @@
        " for " + baseDn + " " +
        " stopServer " + handler.getMonitorInstanceName());
    handler.stopHandler();
    if (handler.isReplicationServer())
    {
      replicationServers.remove(handler.getServerId());
    }
    else
    {
      connectedServers.remove(handler.getServerId());
    }
      if (handler.isReplicationServer())
      {
        if (replicationServers.containsValue(handler))
        {
          replicationServers.remove(handler.getServerId());
          handler.stopHandler();
        }
      }
      else
      {
        if (connectedServers.containsValue(handler))
        {
          connectedServers.remove(handler.getServerId());
          handler.stopHandler();
        }
      }
    mayResetGenerationId();
    // Update the remote replication servers with our list
    // of connected LDAP servers
    sendReplServerInfo();
      // Update the remote replication servers with our list
      // of connected LDAP servers
      sendReplServerInfo();
  }
  /**
@@ -1238,6 +1264,45 @@
      return replicationServer;
    }
    /**
     * Process reception of a ReplServerInfoMessage.
     *
     * @param infoMsg The received message.
     * @param handler The handler that received the message.
     * @throws IOException when raised by the underlying session.
     */
    public void receiveReplServerInfo(
        ReplServerInfoMessage infoMsg, ServerHandler handler) throws IOException
    {
      if (debugEnabled())
      {
        if (handler.isReplicationServer())
          TRACER.debugInfo(
           "In RS " + getReplicationServer().getServerId() +
           " Receiving replServerInfo from " + handler.getServerId() +
           " baseDn=" + baseDn +
           " genId=" + infoMsg.getGenerationId());
      }
      mayResetGenerationId();
      if (generationId < 0)
        generationId = handler.getGenerationId();
      if (generationId > 0 && (generationId != infoMsg.getGenerationId()))
      {
        Message message = NOTE_BAD_GENERATION_ID.get(
            baseDn.toNormalizedString(),
            Short.toString(handler.getServerId()),
            Long.toString(infoMsg.getGenerationId()),
            Long.toString(generationId));
        ErrorMessage errorMsg = new ErrorMessage(
            getReplicationServer().getServerId(),
            handler.getServerId(),
            message);
        handler.sendError(errorMsg);
      }
    }
    /*
     * Monitor Data generation
     */
opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -283,6 +283,10 @@
        // Get or Create the ReplicationServerDomain
        replicationServerDomain =
                replicationServer.getReplicationServerDomain(this.baseDn, true);
        replicationServerDomain.waitDisconnection(receivedMsg.getServerId());
        replicationServerDomain.mayResetGenerationId();
        localGenerationId = replicationServerDomain.getGenerationId();
        ServerState localServerState =
@@ -1010,16 +1014,18 @@
      {
      }
    }
    do {
    boolean acquired = false;
    do
    {
      try
      {
        sendWindow.tryAcquire((long)500, TimeUnit.MILLISECONDS);
        acquired = sendWindow.tryAcquire((long)500, TimeUnit.MILLISECONDS);
        interrupted = false;
      } catch (InterruptedException e)
      {
        // loop until not interrupted
      }
    } while ((interrupted) && (!shutdown));
    } while (((interrupted) || (!acquired )) && (!shutdown));
    this.incrementOutCount();
    return msg;
  }
@@ -1800,6 +1806,17 @@
  }
  /**
   * Send an ErrorMessage to the peer.
   *
   * @param errorMsg The message to be sent
   * @throws IOException when raised by the underlying session
   */
  public void sendError(ErrorMessage errorMsg) throws IOException
  {
    session.publish(errorMsg);
  }
  /**
   * Process the reception of a WindowProbe message.
   *
   * @param  windowProbeMsg The message to process.
opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -199,54 +199,7 @@
        {
          ReplServerInfoMessage infoMsg = (ReplServerInfoMessage)msg;
          handler.receiveReplServerInfo(infoMsg);
          if (debugEnabled())
          {
            if (handler.isReplicationServer())
              TRACER.debugInfo(
               "In RS " + replicationServerDomain.getReplicationServer().
               getServerId() +
               " Receiving replServerInfo from " + handler.getServerId() +
               " baseDn=" + replicationServerDomain.getBaseDn() +
               " genId=" + infoMsg.getGenerationId());
          }
          if (replicationServerDomain.getGenerationId()<0)
          {
            // Here is the case where a ReplicationServer receives from
            // another ReplicationServer the generationId for a domain
            // for which the generation ID has never been set.
            replicationServerDomain.
                    setGenerationId(infoMsg.getGenerationId(),false);
          }
          else
          {
            if (infoMsg.getGenerationId()<0)
            {
              // Here is the case where another ReplicationServer
              // signals that it has no generationId set for the domain.
              // If we have generationId set locally and no server currently
              // connected for that domain in the topology then we may also
              // reset the generationId localy.
              replicationServerDomain.mayResetGenerationId();
            }
            if (replicationServerDomain.getGenerationId() !=
                    infoMsg.getGenerationId())
            {
              Message message = NOTE_BAD_GENERATION_ID.get(
                  replicationServerDomain.getBaseDn().toNormalizedString(),
                  Short.toString(handler.getServerId()),
                  Long.toString(infoMsg.getGenerationId()),
                  Long.toString(replicationServerDomain.getGenerationId()));
              ErrorMessage errorMsg = new ErrorMessage(
                  replicationServerDomain.getReplicationServer().getServerId(),
                  handler.getServerId(),
                  message);
              session.publish(errorMsg);
            }
          }
          replicationServerDomain.receiveReplServerInfo(infoMsg, handler);
        }
        else if (msg instanceof MonitorRequestMessage)
        {
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication;
@@ -721,7 +721,6 @@
      // Read generationId - should be not retrievable since no entry
      debugInfo(testCase + " Connecting DS1 to replServer1(" + changelog1ID + ")");
      connectServer1ToChangelog(changelog1ID);
      Thread.sleep(1000);
      debugInfo(testCase + " Expect genId attribute to be not retrievable");
      genId = readGenId();
@@ -848,13 +847,15 @@
      debugInfo("Create again replServer1");
      replServer1 = createReplicationServer(changelog1ID, false, testCase);
      debugInfo("Delay to allow DS to reconnect to replServer1");
      Thread.sleep(200);
      Thread.sleep(1000);
      long genIdAfterRestart = replServer1.getGenerationId(baseDn);
      debugInfo("Aft restart / replServer.genId=" + genIdAfterRestart);
      assertTrue(replServer1!=null, "Replication server creation failed.");
      assertTrue(genIdBeforeShut == genIdAfterRestart,
      "generationId is expected to have the same value after replServer1 restart");
        "generationId is expected to have the same value" +
        " after replServer1 restart. Before : " + genIdBeforeShut +
        " after : " + genIdAfterRestart);
      try
      {
@@ -1072,12 +1073,6 @@
    disconnectFromReplServer(changelog1ID);
    Thread.sleep(1000);
    debugInfo("Expect genId to be unset(-1) in all servers since no server is " +
        " connected and no change ever occurred");
    assertEquals(replServer1.getGenerationId(baseDn), -1, " in replServer1");
    assertEquals(replServer2.getGenerationId(baseDn), -1, " in replServer2");
    assertEquals(replServer3.getGenerationId(baseDn), -1, " in replServer3");
    debugInfo("Add entries to DS");
    this.addTestEntriesToDB(updatedEntries);
@@ -1278,6 +1273,8 @@
      debugInfo(testCase + " Expect genId attribute to be retrievable");
      genId = readGenId();
      assertEquals(genId, 3211313L);
      disconnectFromReplServer(changelog1ID);
    }
    finally
    {
@@ -1285,11 +1282,63 @@
      debugInfo("Successfully ending " + testCase);
    }
  }
  /**
   * Loop opening sessions to the Replication Server
   * to check that it handle correctly deconnection and reconnection.
   */
  @Test(enabled=false, groups="slow")
  public void testLoop() throws Exception
  {
    String testCase = "testLoop";
    debugInfo("Starting "+ testCase + " debugEnabled:" + debugEnabled());
    long rgenId;
    ReplicationDomain.clearJEBackend(false,
        "userRoot",
        baseDn.toNormalizedString());
    replServer1 = createReplicationServer(changelog1ID, false, testCase);
    replServer1.clearDb();
    ReplicationBroker broker = null;
    try
    {
      for (int i=0; i< 100; i++)
      {
        long generationId = 1000+i;
        broker = openReplicationSession(baseDn,
            server2ID, 100, getChangelogPort(changelog1ID),
            1000, !emptyOldChanges, generationId);
        debugInfo(testCase + " Expect genId to be set in memory on the replication " +
        " server side even if not wrote on disk/db since no change occurred.");
        rgenId = replServer1.getGenerationId(baseDn);
        if (rgenId != generationId)
        {
          fail("replication server failed to set generation ID");
          replServer1.getGenerationId(baseDn);
        }
        broker.stop();
        broker = null;
      }
    } finally
    {
      if (broker != null)
        broker.stop();
    }
  }
  /**
   * This is used to make sure that the tests are run in the
   * specified order since this is necessary: testSingleRS, testMultiRS,
   * testServerStop and finally testLoop.
   *
   * @throws Exception when raised by any of the test methods called.
   */
  @Test(enabled=true, groups="slow")
  public void generationIdTest() throws Exception
  {
    testSingleRS();
    testMultiRS();
    testServerStop();
    testLoop();
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;
@@ -73,7 +73,7 @@
    ReplicationDbEnv dbEnv = new ReplicationDbEnv(path, replicationServer);
    DbHandler handler =
      new DbHandler((short) 1, DN.decode("o=test"), replicationServer, dbEnv, 1);
      new DbHandler((short) 1, DN.decode("o=test"), replicationServer, dbEnv);
    ChangeNumberGenerator gen = new ChangeNumberGenerator((short)1, 0);
    ChangeNumber changeNumber1 = gen.newChangeNumber();
@@ -153,7 +153,7 @@
    ReplicationDbEnv dbEnv = new ReplicationDbEnv(path, replicationServer);
    DbHandler handler =
      new DbHandler((short) 1, DN.decode("o=test"), replicationServer, dbEnv, 1);
      new DbHandler((short) 1, DN.decode("o=test"), replicationServer, dbEnv);
    // Creates changes added to the dbHandler
    ChangeNumberGenerator gen = new ChangeNumberGenerator((short)1, 0);