mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
23.13.2009 df0e36ba23d4992009b1b694bb5cb37ba9587836
Remove dead code in the replication without any functionality change.
10 files modified
334 ■■■■■ changed files
opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java 9 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ServerStartMsg.java 16 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/DataServerHandler.java 29 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java 15 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/MessageHandler.java 134 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 52 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ServerHandler.java 34 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java 3 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java 32 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java 10 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java
@@ -26,13 +26,9 @@
 */
package org.opends.server.replication.common;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import java.util.Iterator;
import java.util.TreeMap;
import org.opends.server.loggers.debug.DebugTracer;
/**
 * This object is used to store a list of ServerState object, one by
@@ -42,11 +38,6 @@
public class MultiDomainServerState implements Iterable<String>
{
  /**
   * The tracer object for the debug logger.
   */
  private static final DebugTracer TRACER = getTracer();
  /**
   * The list of (domain service id, ServerState).
   */
  private TreeMap<String, ServerState> list;
opends/src/server/org/opends/server/replication/protocol/ServerStartMsg.java
@@ -70,10 +70,6 @@
   * @param serverId The serverId of the server for which the ServerStartMsg
   *                 is created.
   * @param baseDn   The base DN.
   * @param maxReceiveDelay The max receive delay for this server.
   * @param maxReceiveQueue The max receive Queue for this server.
   * @param maxSendDelay The max Send Delay from this server.
   * @param maxSendQueue The max send Queue from this server.
   * @param windowSize   The window size used by this server.
   * @param heartbeatInterval The requested heartbeat interval.
   * @param serverState  The state of this server.
@@ -83,9 +79,7 @@
   *                      after the start messages have been exchanged.
   * @param groupId The group id of the DS for this DN
   */
  public ServerStartMsg(short serverId, String baseDn, int maxReceiveDelay,
                            int maxReceiveQueue, int maxSendDelay,
                            int maxSendQueue, int windowSize,
  public ServerStartMsg(short serverId, String baseDn, int windowSize,
                            long heartbeatInterval,
                            ServerState serverState,
                            short protocolVersion,
@@ -97,10 +91,10 @@
    this.serverId = serverId;
    this.baseDn = baseDn;
    this.maxReceiveDelay = maxReceiveDelay;
    this.maxReceiveQueue = maxReceiveQueue;
    this.maxSendDelay = maxSendDelay;
    this.maxSendQueue = maxSendQueue;
    this.maxReceiveDelay = 0;
    this.maxReceiveQueue = 0;
    this.maxSendDelay = 0;
    this.maxSendQueue = 0;
    this.windowSize = windowSize;
    this.heartbeatInterval = heartbeatInterval;
    this.sslEncryption = sslEncryption;
opends/src/server/org/opends/server/replication/server/DataServerHandler.java
@@ -415,10 +415,6 @@
    serverId = serverStartMsg.getServerId();
    serverURL = serverStartMsg.getServerURL();
    groupId = serverStartMsg.getGroupId();
    maxReceiveDelay = serverStartMsg.getMaxReceiveDelay();
    maxReceiveQueue = serverStartMsg.getMaxReceiveQueue();
    maxSendDelay = serverStartMsg.getMaxSendDelay();
    maxSendQueue = serverStartMsg.getMaxSendQueue();
    heartbeatInterval = serverStartMsg.getHeartbeatInterval();
    // generic stuff
@@ -426,31 +422,6 @@
    setInitialServerState(serverStartMsg.getServerState());
    setSendWindowSize(serverStartMsg.getWindowSize());
    if (maxReceiveQueue > 0)
      restartReceiveQueue = (maxReceiveQueue > 1000 ? maxReceiveQueue -
          200 : maxReceiveQueue * 8 / 10);
    else
      restartReceiveQueue = 0;
    if (maxSendQueue > 0)
      restartSendQueue =
        (maxSendQueue > 1000 ? maxSendQueue - 200 : maxSendQueue * 8 /
            10);
    else
      restartSendQueue = 0;
    if (maxReceiveDelay > 0)
      restartReceiveDelay = (maxReceiveDelay > 10 ? maxReceiveDelay - 1
          : maxReceiveDelay);
    else
      restartReceiveDelay = 0;
    if (maxSendDelay > 0)
      restartSendDelay =
        (maxSendDelay > 10 ? maxSendDelay - 1 : maxSendDelay);
    else
      restartSendDelay = 0;
    if (heartbeatInterval < 0)
    {
      heartbeatInterval = 0;
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -27,7 +27,6 @@
package org.opends.server.replication.server;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import java.io.IOException;
@@ -629,19 +628,7 @@
    attributes.add(Attributes.create("External-Changelog-Server",
        serverURL));
    try
    {
      MonitorData md;
      md = replicationServerDomain.computeMonitorData();
      // FIXME:ECL No monitoring exist for ECL.
    }
    catch (Exception e)
    {
      Message message =
        ERR_ERROR_RETRIEVING_MONITOR_DATA.get(stackTraceToSingleLineString(e));
      // We failed retrieving the monitor data.
      attributes.add(Attributes.create("error", message.toString()));
    }
    // FIXME:ECL No monitoring exist for ECL.
    return attributes;
  }
  /**
opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -100,22 +100,6 @@
   */
  protected int inCount = 0;
  /**
   * Specifies the max receive queue for this handler.
   */
  protected int maxReceiveQueue = 0;
  /**
   * Specifies the max send queue for this handler.
   */
  protected int maxSendQueue = 0;
  /**
   * Specifies the max receive delay for this handler.
   */
  protected int maxReceiveDelay = 0;
  /**
   * Specifies the max send delay for this handler.
   */
  protected int maxSendDelay = 0;
  /**
   * Specifies the max queue size for this handler.
   */
  protected int maxQueueSize = 5000;
@@ -124,22 +108,6 @@
   */
  protected int maxQueueBytesSize = maxQueueSize * 100;
  /**
   * Specifies the max restart receive queue for this handler.
   */
  protected int restartReceiveQueue;
  /**
   * Specifies the max restart send queue for this handler.
   */
  protected int restartSendQueue;
  /**
   * Specifies the max restart receive delay for this handler.
   */
  protected int restartReceiveDelay;
  /**
   * Specifies the max restart send delay for this handler.
   */
  protected int restartSendDelay;
  /**
   * Specifies whether the consumer is following the producer (is not late).
   */
  protected boolean following = false;
@@ -152,11 +120,6 @@
   */
  private String serviceId = null;
  /**
   * Specifies whether the server is flow controlled and should be stopped from
   * sending messages.
   */
  protected boolean flowControl = false;
  /**
   * Specifies whether the consumer is still active or not.
   * If not active, the handler will not return any message.
   * Called at the beginning of shutdown process.
@@ -220,12 +183,6 @@
        msgQueue.removeFirst();
      }
    }
    if (isSaturated(update.getChangeNumber(), sourceHandler))
    {
      sourceHandler.setSaturated(true);
    }
  }
  /**
   * Set the shut down flag to true and returns the previous value of the flag.
@@ -708,92 +665,6 @@
  }
  /**
   * Check is this server is saturated (this server has already been
   * sent a bunch of updates and has not processed them so they are staying
   * in the message queue for this server an the size of the queue
   * for this server is above the configured limit.
   *
   * The limit can be defined in number of updates or with a maximum delay
   *
   * @param changeNumber The changenumber to use to make the delay calculations.
   * @param sourceHandler The ServerHandler which is sending the update.
   * @return true is saturated false if not saturated.
   */
  public boolean isSaturated(ChangeNumber changeNumber,
      MessageHandler sourceHandler)
  {
    synchronized (msgQueue)
    {
      int size = msgQueue.count();
      if ((maxReceiveQueue > 0) && (size >= maxReceiveQueue))
        return true;
      if ((sourceHandler.maxSendQueue > 0) &&
          (size >= sourceHandler.maxSendQueue))
        return true;
      if (!msgQueue.isEmpty())
      {
        UpdateMsg firstUpdate = msgQueue.first();
        if (firstUpdate != null)
        {
          long timeDiff = changeNumber.getTimeSec() -
          firstUpdate.getChangeNumber().getTimeSec();
          if ((maxReceiveDelay > 0) && (timeDiff >= maxReceiveDelay))
            return true;
          if ((sourceHandler.maxSendDelay > 0) &&
              (timeDiff >= sourceHandler.maxSendDelay))
            return true;
        }
      }
      return false;
    }
  }
  /**
   * Check that the size of the Server Handler messages Queue has lowered
   * below the limit and therefore allowing the reception of messages
   * from other servers to restart.
   * @param source The ServerHandler which was sending the update.
   *        can be null.
   * @return true if the processing can restart
   */
  public boolean restartAfterSaturation(MessageHandler source)
  {
    synchronized (msgQueue)
    {
      int queueSize = msgQueue.count();
      if ((maxReceiveQueue > 0) && (queueSize >= restartReceiveQueue))
        return false;
      if ((source != null) && (source.maxSendQueue > 0) &&
          (queueSize >= source.restartSendQueue))
        return false;
      if (!msgQueue.isEmpty())
      {
        UpdateMsg firstUpdate = msgQueue.first();
        UpdateMsg lastUpdate = msgQueue.last();
        if ((firstUpdate != null) && (lastUpdate != null))
        {
          long timeDiff = lastUpdate.getChangeNumber().getTimeSec() -
          firstUpdate.getChangeNumber().getTimeSec();
          if ((maxReceiveDelay > 0) && (timeDiff >= restartReceiveDelay))
            return false;
          if ((source != null) && (source.maxSendDelay > 0) && (timeDiff >=
            source.restartSendDelay))
            return false;
        }
      }
    }
    return true;
  }
  /**
   * Set that the consumer is now becoming inactive and thus getNextMessage
   * should not return any UpdateMsg any more.
   * @param active the provided state of the consumer.
@@ -812,11 +683,6 @@
    this.following = following;
  }
  private void setSaturated(boolean value)
  {
    flowControl = value;
  }
  /**
   * Set the initial value of the serverState for this handler.
   * Expected to be done once, then the state will be updated using
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -98,7 +98,6 @@
 */
public class ReplicationServerDomain extends MonitorProvider<MonitorProviderCfg>
{
  private final Object flowControlLock = new Object();
  private final String baseDn;
  // The Status analyzer that periodically verifis if the connected DSs are
  // late or not
@@ -1293,16 +1292,9 @@
     * So this methods simply need to check that dependencies are OK
     * and update this replicaId RUV
     *
     *  TODO : dependency  :
     *  before forwarding change, we should check that the dependency
     *  that is indicated in this change is OK (change already in the RUV)
     */
    msg = handler.take();
    synchronized (flowControlLock)
    {
      if (handler.restartAfterSaturation(null))
        flowControlLock.notifyAll();
    }
    return msg;
  }
@@ -1664,7 +1656,7 @@
        } catch (IOException ioe)
        {
          /*
           * An error happened trying the send a routabled message
           * An error happened trying the send a routable message
           * to its destination server.
           * Send back an error to the originator of the message.
           */
@@ -1754,46 +1746,6 @@
  }
  /**
   * Check if some server Handler should be removed from flow control state.
   * @throws IOException If an error happened.
   */
  public void checkAllSaturation() throws IOException
  {
    for (ReplicationServerHandler handler : replicationServers.values())
    {
      handler.checkWindow();
    }
    for (DataServerHandler handler : directoryServers.values())
    {
      handler.checkWindow();
    }
  }
  /**
   * Check if a server that was in flow control can now restart
   * sending updates.
   * @param sourceHandler The server that must be checked.
   * @return true if the server can restart sending changes.
   *         false if the server can't restart sending changes.
   */
  public boolean restartAfterSaturation(MessageHandler sourceHandler)
  {
    for (MessageHandler handler : replicationServers.values())
    {
      if (!handler.restartAfterSaturation(sourceHandler))
        return false;
    }
    for (MessageHandler handler : directoryServers.values())
    {
      if (!handler.restartAfterSaturation(sourceHandler))
        return false;
    }
    return true;
  }
  /**
   * Send a TopologyMsg to all the connected directory servers in order to
   * let.
   * them know the topology (every known DSs and RSs)
opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -184,8 +184,6 @@
   */
  int sendWindowSize;
  private int saturationCount = 0;
  /**
   * The protocol version established with the remote server.
   */
@@ -315,19 +313,9 @@
  {
    if (rcvWindow < rcvWindowSizeHalf)
    {
      if (flowControl)
      {
        if (replicationServerDomain.restartAfterSaturation(this))
        {
          flowControl = false;
        }
      }
      if (!flowControl)
      {
        WindowMsg msg = new WindowMsg(rcvWindowSizeHalf);
        session.publish(msg);
        rcvWindow += rcvWindowSizeHalf;
      }
      WindowMsg msg = new WindowMsg(rcvWindowSizeHalf);
      session.publish(msg);
      rcvWindow += rcvWindowSizeHalf;
    }
  }
@@ -1183,22 +1171,6 @@
    boolean interrupted = true;
    UpdateMsg msg = getnextMessage(true); // synchronous:block until msg
    /*
     * When we remove a message from the queue we need to check if another
     * server is waiting in flow control because this queue was too long.
     * This check might cause a performance penalty an therefore it
     * is not done for every message removed but only every few messages.
     */
    if (++saturationCount > 10)
    {
      saturationCount = 0;
      try
      {
        replicationServerDomain.checkAllSaturation();
      } catch (IOException e)
      {
      }
    }
    boolean acquired = false;
    do
    {
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -731,8 +731,7 @@
      /*
       * Send our ServerStartMsg.
       */
      ServerStartMsg serverStartMsg = new ServerStartMsg(serverId,
          baseDn, 0, 0, 0, 0,
      ServerStartMsg serverStartMsg = new ServerStartMsg(serverId, baseDn,
        maxRcvWindow, heartbeatInterval, state,
        ProtocolVersion.getCurrentVersion(), this.getGenerationID(),
        isSslEncryption,
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -568,18 +568,18 @@
         throws Exception
  {
    AckMsg msg1, msg2 ;
    // Consctructor test (with ChangeNumber)
    // Chech that retrieved CN is OK
    msg1 = new  AckMsg(cn);
    assertEquals(msg1.getChangeNumber().compareTo(cn), 0);
    // Check default values for error info
    assertFalse(msg1.hasTimeout());
    assertFalse(msg1.hasWrongStatus());
    assertFalse(msg1.hasReplayError());
    assertTrue(msg1.getFailedServers().size() == 0);
    // Check constructor with error info
    msg1 = new  AckMsg(cn, hasTimeout, hasWrongStatus, hasReplayError, failedServers);
    assertEquals(msg1.getChangeNumber().compareTo(cn), 0);
@@ -587,7 +587,7 @@
    assertTrue(msg1.hasWrongStatus() == hasWrongStatus);
    assertTrue(msg1.hasReplayError() == hasReplayError);
    assertEquals(msg1.getFailedServers(), failedServers);
    // Consctructor test (with byte[])
    msg2 = new  AckMsg(msg1.getBytes());
    assertEquals(msg2.getChangeNumber().compareTo(cn), 0);
@@ -595,7 +595,7 @@
    assertTrue(msg1.hasWrongStatus() == msg2.hasWrongStatus());
    assertTrue(msg1.hasReplayError() == msg2.hasReplayError());
    assertEquals(msg1.getFailedServers(), msg2.getFailedServers());
    // Check invalid bytes for constructor
    byte[] b = msg1.getBytes();
    b[0] = ReplicationMsg.MSG_TYPE_ADD;
@@ -609,7 +609,7 @@
    {
      assertTrue(true);
    }
    // Check that retrieved CN is OK
    msg2 = (AckMsg) ReplicationMsg.generateMsg(msg1.getBytes());
  }
@@ -632,11 +632,11 @@
    String serviceId = "serviceid";
    // create a cookie
    MultiDomainServerState cookie =
    MultiDomainServerState cookie =
      new MultiDomainServerState(
          "o=test:000001210b6f21e904b100000001 000001210b6f21e904b200000001;" +
          "o=test2:000001210b6f21e904b100000002 000001210b6f21e904b200000002;");
    // Constructor test
    ECLUpdateMsg msg1 = new ECLUpdateMsg(delmsg, cookie, serviceId);
    assertTrue(msg1.getCookie().equalsTo(cookie));
@@ -679,16 +679,12 @@
  public void serverStartMsgTest(short serverId, String baseDN, int window,
         ServerState state, long genId, boolean sslEncryption, byte groupId) throws Exception
  {
    ServerStartMsg msg = new ServerStartMsg(serverId, baseDN,
        window, window, window, window, window, window, state,
    ServerStartMsg msg = new ServerStartMsg(
        serverId, baseDN, window, window, state,
        ProtocolVersion.getCurrentVersion(), genId, sslEncryption, groupId);
    ServerStartMsg newMsg = new ServerStartMsg(msg.getBytes());
    assertEquals(msg.getServerId(), newMsg.getServerId());
    assertEquals(msg.getBaseDn(), newMsg.getBaseDn());
    assertEquals(msg.getMaxReceiveDelay(), newMsg.getMaxReceiveDelay());
    assertEquals(msg.getMaxReceiveQueue(), newMsg.getMaxReceiveQueue());
    assertEquals(msg.getMaxSendDelay(), newMsg.getMaxSendDelay());
    assertEquals(msg.getMaxSendQueue(), newMsg.getMaxSendQueue());
    assertEquals(msg.getWindowSize(), newMsg.getWindowSize());
    assertEquals(msg.getHeartbeatInterval(), newMsg.getHeartbeatInterval());
    assertEquals(msg.getSSLEncryption(), newMsg.getSSLEncryption());
@@ -990,7 +986,7 @@
    // LS2 state
    ServerState s2 = new ServerState();
    short sid2 = 222;
    Long now = TimeThread.getTime();
    Long now = ((Integer)10).longValue();
    ChangeNumber cn2 = new ChangeNumber(now,
                                       (short) 123, sid2);
    s2.update(cn2);
@@ -1167,7 +1163,7 @@
    UpdateMsg newMsg = new UpdateMsg(msg.getBytes());
    assertEquals(test.getBytes(), newMsg.getPayload());
  }
  /**
   * Test that ServerStartMsg encoding and decoding works
   * by checking that : msg == new ServerStartMsg(msg.getBytes()).
@@ -1207,8 +1203,8 @@
    ServerState state = new ServerState();
    assertTrue(state.update(new ChangeNumber((long)75, 5,(short)263)));
    short mode = 3;
    int firstDraftChangeNumber = 13;
    int lastDraftChangeNumber  = 14;
    int firstDraftChangeNumber = 13;
    int lastDraftChangeNumber  = 14;
    String myopid = new String("fakeopid");
    // create original
    StartECLSessionMsg msg = new StartECLSessionMsg();
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -48,15 +48,11 @@
import java.util.TreeSet;
import java.util.UUID;
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.Severity;
import org.opends.server.TestCaseUtils;
import org.opends.server.api.SynchronizationProvider;
import org.opends.server.backends.task.TaskState;
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.ModifyDNOperationBasis;
import org.opends.server.loggers.ErrorLogger;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.protocols.internal.InternalClientConnection;
@@ -1004,7 +1000,7 @@
      // send a ServerStartMsg with an empty ServerState.
      ServerStartMsg msg =
        new ServerStartMsg((short) 1723, TEST_ROOT_DN_STRING,
            0, 0, 0, 0, WINDOW, (long) 5000, new ServerState(),
            WINDOW, (long) 5000, new ServerState(),
            ProtocolVersion.getCurrentVersion(), 0, sslEncryption, (byte)-1);
      session.publish(msg);
@@ -1050,7 +1046,7 @@
      DN baseDn = DN.decode(TEST_ROOT_DN_STRING);
      msg = new ServerStartMsg(
          (short) 1724, TEST_ROOT_DN_STRING,
          0, 0, 0, 0, WINDOW, (long) 5000, replServerState,
          WINDOW, (long) 5000, replServerState,
          ProtocolVersion.getCurrentVersion(),
          ReplicationTestCase.getGenerationId(baseDn),
          sslEncryption, (byte)10);
@@ -1221,7 +1217,7 @@
          new DeleteMsg("o=example," + TEST_ROOT_DN_STRING, gen.newChangeNumber(),
              "uid");
        broker.publish(msg);
        if ((count % 10) == 0)
        debugInfo("BrokerWriter " + broker.getServerId() + "  sent="+count);
      }