mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
30.14.2014 4a5b26741a1d4e4c71b56817fd66cb0e16579c6e
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/DataServerHandler.java
@@ -634,7 +634,6 @@
   */
  public void receiveNewStatus(ChangeStatusMsg csMsg)
  {
    if (replicationServerDomain!=null)
      replicationServerDomain.processNewStatus(this, csMsg);
    replicationServerDomain.processNewStatus(this, csMsg);
  }
}
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -842,8 +842,7 @@
   */
  private void registerIntoDomain()
  {
    if (replicationServerDomain != null)
      replicationServerDomain.registerHandler(this);
    replicationServerDomain.registerHandler(this);
  }
  /**
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -59,7 +59,7 @@
 * LocalizableMessage are buffered into a queue.
 * Consumers are expected to come and consume the UpdateMsg from the queue.
 */
public class MessageHandler extends MonitorProvider<MonitorProviderCfg>
class MessageHandler extends MonitorProvider<MonitorProviderCfg>
{
  /** The logger of this class. */
  protected static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
@@ -84,11 +84,11 @@
  /**
   * Number of update sent to the server.
   */
  protected int outCount = 0;
  private int outCount = 0;
  /**
   * Number of updates received from the server.
   */
  protected int inCount = 0;
  private int inCount = 0;
  /**
   * Specifies the max queue size for this handler.
   */
@@ -96,7 +96,7 @@
  /**
   * Specifies the max queue size in bytes for this handler.
   */
  protected int maxQueueBytesSize = maxQueueSize * 100;
  private int maxQueueBytesSize = maxQueueSize * 100;
  /**
   * Specifies whether the consumer is following the producer (is not late).
   */
@@ -126,7 +126,7 @@
   *                  in memory by this ServerHandler.
   * @param replicationServer The hosting replication server.
   */
  public MessageHandler(int queueSize, ReplicationServer replicationServer)
  MessageHandler(int queueSize, ReplicationServer replicationServer)
  {
    this.maxQueueSize = queueSize;
    this.maxQueueBytesSize = queueSize * 100;
@@ -140,7 +140,7 @@
   * @param update The update that must be added to the list of updates of
   * this handler.
   */
  public void add(UpdateMsg update)
  void add(UpdateMsg update)
  {
    synchronized (msgQueue)
    {
@@ -149,7 +149,9 @@
       * waiting for some changes, wake it up
       */
      if (msgQueue.isEmpty())
      {
        msgQueue.notify();
      }
      msgQueue.add(update);
@@ -179,7 +181,7 @@
   * Set the shut down flag to true and returns the previous value of the flag.
   * @return The previous value of the shut down flag
   */
  public boolean engageShutdown()
  boolean engageShutdown()
  {
    return shuttingDown.getAndSet(true);
  }
@@ -188,7 +190,7 @@
   * Returns the shutdown flag.
   * @return The shutdown flag value.
   */
  public boolean shuttingDown()
  boolean shuttingDown()
  {
    return shuttingDown.get();
  }
@@ -198,9 +200,8 @@
   *
   * @param waitConnections     Waits for the Connections with other RS to
   *                            be established before returning.
   * @return The replication server domain.
   */
  public ReplicationServerDomain getDomain(boolean waitConnections)
  private void setDomain(boolean waitConnections)
  {
    if (replicationServerDomain == null)
    {
@@ -211,14 +212,13 @@
        replicationServer.waitConnections();
      }
    }
    return replicationServerDomain;
  }
  /**
   * Get the count of updates received from the server.
   * @return the count of update received from the server.
   */
  public int getInCount()
  int getInCount()
  {
    return inCount;
  }
@@ -372,10 +372,14 @@
            while (msgQueue.isEmpty() && following)
            {
              if (!synchronous)
              {
                return null;
              }
              msgQueue.wait(500);
              if (!activeConsumer)
              {
                return null;
              }
            }
          } catch (InterruptedException e)
          {
@@ -475,7 +479,7 @@
   * Get the count of updates sent to this server.
   * @return  The count of update sent to this server.
   */
  public int getOutCount()
  int getOutCount()
  {
    return outCount;
  }
@@ -542,7 +546,7 @@
  /**
   * Increase the counter of updates received from the server.
   */
  public void incrementInCount()
  void incrementInCount()
  {
    inCount++;
  }
@@ -550,14 +554,12 @@
  /**
   * Increase the counter of updates sent to the server.
   */
  public void incrementOutCount()
  void incrementOutCount()
  {
    outCount++;
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public void initializeMonitorProvider(MonitorProviderCfg configuration)
  throws ConfigException, InitializationException
@@ -616,10 +618,8 @@
    else
    {
      this.baseDN = baseDN;
      if (!baseDN.toNormalizedString().equals("cn=changelog"))
      {
        getDomain(isDataServer);
      }
      setDomain(!"cn=changelog".equals(baseDN.toNormalizedString())
          && isDataServer);
    }
  }
@@ -644,7 +644,7 @@
   * @param msg the last update sent.
   * @return boolean indicating if the update was meaningful.
   */
  public boolean updateServerState(UpdateMsg msg)
  boolean updateServerState(UpdateMsg msg)
  {
    return serverState.update(msg.getCSN());
  }
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -256,7 +256,7 @@
          // We did not recognize the message, close session as what
          // can happen after is undetermined and we do not want the server to
          // be disturbed
          ServerHandler.closeSession(session, null, null);
          session.close();
          return;
        }
      }
@@ -267,7 +267,8 @@
        // Just log debug information and loop.
        // Do not log the message during shutdown.
        logger.traceException(e);
        if (!shutdown) {
        if (!shutdown)
        {
          logger.error(ERR_EXCEPTION_LISTENING, e.getLocalizedMessage());
        }
      }
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
@@ -732,8 +732,7 @@
  public void receiveTopoInfoFromRS(TopologyMsg topoMsg)
  throws DirectoryException, IOException
  {
    if (replicationServerDomain != null)
      replicationServerDomain.receiveTopoInfoFromRS(topoMsg, this, true);
    replicationServerDomain.receiveTopoInfoFromRS(topoMsg, this, true);
  }
}
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -27,7 +27,6 @@
package org.opends.server.replication.server;
import java.io.IOException;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import java.util.List;
@@ -67,34 +66,6 @@
  private static final int SHUTDOWN_JOIN_TIMEOUT = 30000;
  /**
   * Close the session and log the provided error message
   * Log nothing if message is null.
   * @param providedSession The provided closing session.
   * @param providedMsg     The provided error message.
   * @param handler         The handler that manages that session.
   */
  static protected void closeSession(Session providedSession,
      LocalizableMessage providedMsg, ServerHandler handler)
  {
    if (providedMsg != null)
    {
      if (logger.isTraceEnabled())
        logger.trace("In %s closing session with err=%s",
            ((handler != null) ? handler : "Replication Server"), providedMsg);
      logger.error(providedMsg);
    }
    if (providedSession != null)
    {
      // This method is only called when aborting a failing handshake and
      // not StopMsg should be sent in such situation. StopMsg are only
      // expected when full handshake has been performed, or at end of
      // handshake phase 1, when DS was just gathering available RS info
      providedSession.close();
    }
  }
  /**
   * The serverId of the remote server.
   */
  protected int serverId;
@@ -246,7 +217,20 @@
    Session localSession = session;
    if (localSession != null)
    {
      closeSession(localSession, reason, this);
      if (reason != null)
      {
        if (logger.isTraceEnabled())
        {
         logger.trace("In " + this + " closing session with err=" + reason);
        }
        logger.error(reason);
      }
      // This method is only called when aborting a failing handshake and
      // not StopMsg should be sent in such situation. StopMsg are only
      // expected when full handshake has been performed, or at end of
      // handshake phase 1, when DS was just gathering available RS info
      localSession.close();
    }
    releaseDomainLock();
@@ -255,7 +239,7 @@
    // We may have changed it as it was -1 and we received a value >0 from peer
    // server and the last topo message sent may have failed being sent: in that
    // case retrieve old value of generation id for replication server domain
    if (oldGenerationId != -100 && replicationServerDomain != null)
    if (oldGenerationId != -100)
    {
      replicationServerDomain.changeGenerationId(oldGenerationId);
    }
@@ -266,7 +250,7 @@
   */
  protected void releaseDomainLock()
  {
    if (replicationServerDomain != null && replicationServerDomain.hasLock())
    if (replicationServerDomain.hasLock())
    {
      replicationServerDomain.release();
    }
@@ -336,8 +320,7 @@
      {
        final LocalizableMessage message =
            ERR_SESSION_STARTUP_INTERRUPTED.get(session.getName());
        throw new DirectoryException(ResultCode.OTHER,
            message, e);
        throw new DirectoryException(ResultCode.OTHER, message, e);
      }
      reader.start();
      writer.start();
@@ -369,7 +352,7 @@
  public void send(ReplicationMsg msg) throws IOException
  {
    // avoid logging anything for unit tests that include a null domain.
    if (logger.isTraceEnabled() && replicationServerDomain != null)
    if (logger.isTraceEnabled())
    {
      logger.trace("In "
          + replicationServerDomain.getLocalRSMonitorInstanceName() + " "
@@ -518,16 +501,6 @@
    return heartbeatInterval;
  }
  /**
   * Get the count of updates received from the server.
   * @return the count of update received from the server.
   */
  @Override
  public int getInCount()
  {
    return inCount;
  }
  /** {@inheritDoc} */
  @Override
  public List<Attribute> getMonitorData()
@@ -600,16 +573,6 @@
  public abstract String getMonitorInstanceName();
  /**
   * Get the count of updates sent to this server.
   * @return  The count of update sent to this server.
   */
  @Override
  public int getOutCount()
  {
    return outCount;
  }
  /**
   * Gets the protocol version used with this remote server.
   * @return The protocol version used with this remote server.
   */
@@ -717,9 +680,7 @@
    assuredSrSentUpdatesTimeout.incrementAndGet();
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public void initializeMonitorProvider(MonitorProviderCfg configuration)
  throws ConfigException, InitializationException
@@ -825,16 +786,11 @@
  public void lockDomainWithTimeout() throws DirectoryException,
      InterruptedException
  {
    if (replicationServerDomain == null)
    {
      return;
    }
    Random random = new Random();
    int randomTime = random.nextInt(6); // Random from 0 to 5
    final Random random = new Random();
    final int randomTime = random.nextInt(6); // Random from 0 to 5
    // Wait at least 3 seconds + (0 to 5 seconds)
    long timeout = 3000 + (randomTime * 1000);
    boolean lockAcquired = replicationServerDomain.tryLock(timeout);
    final long timeout = 3000 + (randomTime * 1000);
    final boolean lockAcquired = replicationServerDomain.tryLock(timeout);
    if (!lockAcquired)
    {
      LocalizableMessage message = WARN_TIMEOUT_WHEN_CROSS_CONNECTION.get(
@@ -1200,8 +1156,7 @@
   */
  void processAck(AckMsg ack)
  {
    if (replicationServerDomain!=null)
      replicationServerDomain.processAck(ack, this);
    replicationServerDomain.processAck(ack, this);
  }
  /**
@@ -1210,9 +1165,7 @@
   */
  public long getReferenceGenId()
  {
    if (replicationServerDomain != null)
      return replicationServerDomain.getGenerationId();
    return -1;
    return replicationServerDomain.getGenerationId();
  }
  /**
@@ -1221,8 +1174,7 @@
   */
  void processResetGenId(ResetGenerationIdMsg msg)
  {
    if (replicationServerDomain!=null)
      replicationServerDomain.resetGenerationId(this, msg);
    replicationServerDomain.resetGenerationId(this, msg);
  }
  /**
@@ -1233,8 +1185,7 @@
  public void put(UpdateMsg update) throws IOException
  {
    decAndCheckWindow();
    if (replicationServerDomain!=null)
      replicationServerDomain.put(update, this);
    replicationServerDomain.put(update, this);
  }
  /**
@@ -1242,8 +1193,7 @@
   */
  public void doStop()
  {
    if (replicationServerDomain!=null)
      replicationServerDomain.stopServer(this, false);
    replicationServerDomain.stopServer(this, false);
  }
  /**