mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
27.34.2009 a733cedcf54ab0b979f7f1b762d086e49bd59d72
Fix for issue 4316: Replication takes too much time to shut down

This diff includes several changes that allow replication shutdown to
happen more quickly:

- The Replication Server dbHandler thread was synchronized on the wrong object.
- During shutdown, the RS being shut down sent topology (Topo) messages to all
  the other RSs.
- A leftover sleep in the ReplicationServer creation, which is no longer
  necessary, has been removed.
1 files added
11 files modified
190 ■■■■ changed files
opends/src/server/org/opends/server/replication/common/MutableBoolean.java 67 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/DbHandler.java 4 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java 2 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ECLServerWriter.java 2 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ExternalChangeLogSessionImpl.java 2 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServer.java 6 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 62 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ServerHandler.java 4 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ServerReader.java 14 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ServerWriter.java 2 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java 23 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java 2 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/common/MutableBoolean.java
New file
@@ -0,0 +1,67 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2009 Sun Microsystems, Inc.
 */
package org.opends.server.replication.common;
/**
 * The MutableBoolean wraps a boolean in a mutable way.
 * This can be usable when one wishes to use a boolean object with condition
 * variables: for example, the instance itself can be used as the monitor
 * for {@code wait()}/{@code notify()}, while its value records whether the
 * awaited event has already happened. A waiter can then check the value
 * before waiting and will not miss a notification that was sent earlier.
 * <p>
 * The wrapped value is {@code volatile}. As a result, {@link #get()} and
 * {@link #set(boolean)} always observe the most recent value, even when they
 * are called without holding this object's monitor. Compound check-then-act
 * sequences (such as "if not set, then wait") still require the caller to
 * synchronize on this instance.
 */
public class MutableBoolean
{
  /**
   * The wrapped value.
   * <p>
   * Declared {@code volatile} so that an update made outside a
   * {@code synchronized} block is visible to other threads.
   */
  volatile boolean value;

  /**
   * A MutableBoolean with the given initial value.
   *
   * @param value  The initial value of the mutable Boolean
   */
  public MutableBoolean(boolean value)
  {
    this.value = value;
  }

  /**
   * Retrieves the current value of this MutableBoolean.
   *
   * @return The current value of this MutableBoolean.
   */
  public boolean get()
  {
    return value;
  }

  /**
   * Sets the current value of this MutableBoolean.
   * <p>
   * This method does not notify threads waiting on this object; callers
   * that use it as a condition variable must call {@code notify()} or
   * {@code notifyAll()} themselves while holding the monitor.
   *
   * @param value The new value of this MutableBoolean.
   */
  public void set(boolean value)
  {
    this.value = value;
  }
}
opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -321,9 +321,9 @@
    }
    shutdown  = true;
    synchronized (this)
    synchronized (msgQueue)
    {
      this.notifyAll();
      msgQueue.notifyAll();
    }
    synchronized (this)
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -988,7 +988,7 @@
      sendWindow = new Semaphore(sendWindowSize);
      // create reader
      reader = new ServerReader(session, serverId, this);
      reader = new ServerReader(session, this);
      reader.start();
      if (writer == null)
opends/src/server/org/opends/server/replication/server/ECLServerWriter.java
@@ -194,7 +194,7 @@
        }
      }
      if (replicationServerDomain!=null)
        replicationServerDomain.stopServer(handler);
        replicationServerDomain.stopServer(handler, false);
    }
  }
opends/src/server/org/opends/server/replication/server/ExternalChangeLogSessionImpl.java
@@ -93,6 +93,6 @@
  public void close()
  {
    if (handler.getDomain() != null)
      handler.getDomain().stopServer(handler);
      handler.getDomain().stopServer(handler, false);
  }
}
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -569,10 +569,6 @@
        serverId , this);
      connectThread.start();
      // FIXME : Is it better to have the time to receive the ReplServerInfo
      // from all the other replication servers since this info is necessary
      // to route an early received total update request.
      try { Thread.sleep(300);} catch(Exception e) {}
      if (debugEnabled())
        TRACER.debugInfo("RS " +getMonitorInstanceName()+
            " creates listen thread");
@@ -1048,7 +1044,7 @@
      // Have a new group id: Disconnect every servers.
      for (ReplicationServerDomain replicationServerDomain : baseDNs.values())
      {
        replicationServerDomain.stopAllServers();
        replicationServerDomain.stopAllServers(true);
      }
    }
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -808,7 +808,7 @@
              cn.toString(), baseDn));
            mb.append(stackTraceToSingleLineString(e));
            logError(mb.toMessage());
            stopServer(origServer);
            stopServer(origServer, false);
          }
          // Mark the ack info object as completed to prevent potential timeout
          // code parallel run
@@ -887,7 +887,7 @@
                cn.toString(), baseDn));
            mb.append(stackTraceToSingleLineString(e));
            logError(mb.toMessage());
            stopServer(origServer);
            stopServer(origServer, false);
          }
          // Increment assured counters
          boolean safeRead =
@@ -979,25 +979,28 @@
    for (ReplicationServerHandler handler : replicationServers.values())
    {
      if (replServers.contains(handler.getServerAddressURL()))
        stopServer(handler);
        stopServer(handler, false);
    }
  }
  /**
   * Stop operations with all servers this domain is connected with (RS and DS).
   *
   * @param shutdown A boolean indicating if the stop is due to a
   *                 shutdown condition.
   */
  public void stopAllServers()
  public void stopAllServers(boolean shutdown)
  {
    // Close session with other replication servers
    for (ReplicationServerHandler serverHandler : replicationServers.values())
    {
      stopServer(serverHandler);
      stopServer(serverHandler, shutdown);
    }
    // Close session with other LDAP servers
    for (DataServerHandler serverHandler : directoryServers.values())
    {
      stopServer(serverHandler);
      stopServer(serverHandler, shutdown);
    }
  }
@@ -1026,9 +1029,11 @@
  /**
   * Stop operations with a given server.
   *
   * @param handler the server for which we want to stop operations
   * @param handler the server for which we want to stop operations.
   * @param shutdown A boolean indicating if the stop is due to a
   *                 shutdown condition.
   */
  public void stopServer(ServerHandler handler)
  public void stopServer(ServerHandler handler, boolean shutdown)
  {
      if (debugEnabled())
        TRACER.debugInfo(
@@ -1049,9 +1054,13 @@
      {
        try
        {
          // Acquire lock on domain (see more details in comment of start()
          // method of ServerHandler)
          lock();
          if (!shutdown)
          {
            lock();
          }
        } catch (InterruptedException ex)
        {
          // Try doing job anyway...
@@ -1066,9 +1075,12 @@
            // Check if generation id has to be reset
            mayResetGenerationId();
            // Warn our DSs that a RS or DS has quit (does not use this
            // handler as already removed from list)
            buildAndSendTopoInfoToDSs(null);
            if (!shutdown)
            {
              // Warn our DSs that a RS or DS has quit (does not use this
              // handler as already removed from list)
              buildAndSendTopoInfoToDSs(null);
            }
          }
        } else
        {
@@ -1093,10 +1105,13 @@
            mayResetGenerationId();
            // Update the remote replication servers with our list
            // of connected LDAP servers
            buildAndSendTopoInfoToRSs();
            // Warn our DSs that a RS or DS has quit (does not use this
            // handler as already removed from list)
            buildAndSendTopoInfoToDSs(null);
            if (!shutdown)
            {
              buildAndSendTopoInfoToRSs();
              // Warn our DSs that a RS or DS has quit (does not use this
              // handler as already removed from list)
              buildAndSendTopoInfoToDSs(null);
            }
          }
          else if (otherHandlers.contains(handler))
          {
@@ -1113,7 +1128,10 @@
      }
      finally
      {
        release();
        if (!shutdown)
        {
          release();
        }
      }
    }
  }
@@ -1710,7 +1728,7 @@
        mb2.append(ERR_CHANGELOG_ERROR_SENDING_ERROR.get(this.toString()));
        mb2.append(stackTraceToSingleLineString(ioe));
        logError(mb2.toMessage());
        stopServer(senderHandler);
        stopServer(senderHandler, false);
      }
    } else
    {
@@ -1746,8 +1764,8 @@
            // an error happened on the sender session trying to recover
            // from an error on the receiver session.
            // We don't have much solution left beside closing the sessions.
            stopServer(senderHandler);
            stopServer(targetHandler);
            stopServer(senderHandler, false);
            stopServer(targetHandler, false);
          }
        // TODO Handle error properly (sender timeout in addition)
        }
@@ -1766,7 +1784,7 @@
    // Terminate the assured timer
    assuredTimeoutTimer.cancel();
    stopAllServers();
    stopAllServers(true);
    stopDbHandlers();
  }
@@ -3163,7 +3181,7 @@
          {
            TRACER.debugCaught(DebugLogLevel.ERROR, e);
            logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get(rsHandler.getName()));
            stopServer(rsHandler);
            stopServer(rsHandler, false);
          }
        }
      }
opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -375,7 +375,7 @@
      writer = new ServerWriter(session, serverId,
          this, replicationServerDomain);
      reader = new ServerReader(session, serverId, this);
      reader = new ServerReader(session, this);
      reader.start();
      writer.start();
@@ -1421,6 +1421,6 @@
  public void doStop()
  {
    if (replicationServerDomain!=null)
      replicationServerDomain.stopServer(this);
      replicationServerDomain.stopServer(this, false);
  }
}
opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -58,7 +58,6 @@
   * The tracer object for the debug logger.
   */
  private static final DebugTracer TRACER = getTracer();
  private int serverId;
  private ProtocolSession session;
  private ServerHandler handler;
@@ -66,16 +65,14 @@
   * Constructor for the LDAP server reader part of the replicationServer.
   *
   * @param session The ProtocolSession from which to read the data.
   * @param serverId The server ID of the server from which we read messages.
   * @param handler The server handler for this server reader.
   */
  public ServerReader(ProtocolSession session, int serverId,
  public ServerReader(ProtocolSession session,
      ServerHandler handler)
  {
    super("Replication Reader Thread for RS handler " +
        handler.getMonitorInstanceName());
    this.session = session;
    this.serverId = serverId;
    this.handler = handler;
  }
@@ -302,9 +299,12 @@
      if (debugEnabled())
        TRACER.debugInfo(
            "In " + this.getName() + " " + stackTraceToSingleLineString(e));
      errMessage = ERR_SERVER_BADLY_DISCONNECTED.get(handler.toString(),
        Integer.toString(handler.getReplicationServerId()));
      logError(errMessage);
      if (!handler.shuttingDown())
      {
        errMessage = ERR_SERVER_BADLY_DISCONNECTED.get(handler.toString(),
            Integer.toString(handler.getReplicationServerId()));
        logError(errMessage);
      }
    }
    catch (ClassNotFoundException e)
    {
opends/src/server/org/opends/server/replication/server/ServerWriter.java
@@ -246,7 +246,7 @@
      {
       // Can't do much more : ignore
      }
      replicationServerDomain.stopServer(handler);
      replicationServerDomain.stopServer(handler, false);
      if (debugEnabled())
      {
        TRACER.debugInfo(this.getName() + " stopped " + errMessage);
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -79,6 +79,7 @@
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.AssuredMode;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.MutableBoolean;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.common.StatusMachine;
@@ -309,7 +310,7 @@
   * This object is used as a conditional event to be notified about
   * the reception of monitor information from the Replication Server.
   */
  private Object monitorResponse = new Object();
  private final MutableBoolean monitorResponse = new MutableBoolean(false);
  /**
@@ -585,6 +586,8 @@
   */
  public Map<Integer, ServerState> getReplicaStates()
  {
    monitorResponse.set(false);
    // publish Monitor Request Message to the Replication Server
    broker.publish(new MonitorRequestMsg(serverID, broker.getRsServerId()));
@@ -593,7 +596,10 @@
    {
      synchronized (monitorResponse)
      {
        monitorResponse.wait(10000);
        if (monitorResponse.get() == false)
        {
          monitorResponse.wait(10000);
        }
      }
    } catch (InterruptedException e)
    {}
@@ -844,6 +850,7 @@
          // Notify the sender that the response was received.
          synchronized (monitorResponse)
          {
            monitorResponse.set(true);
            monitorResponse.notify();
          }
        }
@@ -1901,6 +1908,18 @@
    disableService();
    enableService();
    // wait for the domain to reconnect.
    int count = 0;
    while (!isConnected() && (count < 10))
    {
      try
      {
        Thread.sleep(100);
      } catch (InterruptedException e)
      {
      }
    }
    resetGenerationId(getGenerationID());
    // check that at least one ReplicationServer did change its generation-id
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -419,7 +419,7 @@
        openReplicationSession(DN.decode(TEST_ROOT_DN_STRING),  3,
                             100, replicationServerPort, 5000, state);
      assertTrue(broker.isConnected());
      assertTrue(broker.isConnected(), "Broker could not connect to RS");
      ReplicationMsg msg2 = broker.receive();
      broker.updateWindowAfterReplay();