mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
28.57.2014 667d7253a3873ed64dafbffe39d8a84a298c1fdc
Code cleanup:
- Increased MessageHandler encapsulation.
- Removed ServerHandler.closeSession() because it is adding unnecessary complexity in understanding calling sites.


MessageHandler.java:
Reduced class visibility to package private. Did the same for a few methods.
Made several fields private for better encapsulation.
Renamed getDomain() to setDomain().
In setBaseDNAndDomain(), changed the code to always set the ReplicationServerDomain, so it can never be null. Avoids any NPE + allows to remove null checks.

DataServerHandler.java:
In receiveNewStatus(), consequence of the change to MessageHandler: removed null check on replicationServerDomain.

ECLServerHandler.java, ReplicationServerHandler.java:
Consequence of the change to MessageHandler: removed null check on replicationServerDomain.

ServerHandler.java:
Consequence of the change to MessageHandler: removed null check on replicationServerDomain.
Removed closeSession() because it is adding unnecessary complexity in understanding calling sites.
In abortStart(), inlined the code of closeSession().
Removed the unneeded overriding methods for getInCount() and getOutCount().

ReplicationServer.java:
Consequence of removing ServerHandler.closeSession().
6 files modified
157 ■■■■■ changed files
opends/src/server/org/opends/server/replication/server/DataServerHandler.java 3 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java 1 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/MessageHandler.java 46 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServer.java 9 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java 1 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ServerHandler.java 97 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/DataServerHandler.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions copyright 2011-2013 ForgeRock AS
 *      Portions copyright 2011-2014 ForgeRock AS
 */
package org.opends.server.replication.server;
@@ -654,7 +654,6 @@
   */
  public void receiveNewStatus(ChangeStatusMsg csMsg)
  {
    if (replicationServerDomain!=null)
      replicationServerDomain.processNewStatus(this, csMsg);
  }
}
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -838,7 +838,6 @@
   */
  private void registerIntoDomain()
  {
    if (replicationServerDomain != null)
      replicationServerDomain.registerHandler(this);
  }
opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2009-2010 Sun Microsystems, Inc.
 *      Portions copyright 2011-2013 ForgeRock AS
 *      Portions copyright 2011-2014 ForgeRock AS
 */
package org.opends.server.replication.server;
@@ -60,7 +60,7 @@
 * Message are buffered into a queue.
 * Consumers are expected to come and consume the UpdateMsg from the queue.
 */
public class MessageHandler extends MonitorProvider<MonitorProviderCfg>
class MessageHandler extends MonitorProvider<MonitorProviderCfg>
{
  /**
@@ -88,11 +88,11 @@
  /**
   * Number of update sent to the server.
   */
  protected int outCount = 0;
  private int outCount = 0;
  /**
   * Number of updates received from the server.
   */
  protected int inCount = 0;
  private int inCount = 0;
  /**
   * Specifies the max queue size for this handler.
   */
@@ -100,7 +100,7 @@
  /**
   * Specifies the max queue size in bytes for this handler.
   */
  protected int maxQueueBytesSize = maxQueueSize * 100;
  private int maxQueueBytesSize = maxQueueSize * 100;
  /**
   * Specifies whether the consumer is following the producer (is not late).
   */
@@ -130,7 +130,7 @@
   *                  in memory by this ServerHandler.
   * @param replicationServer The hosting replication server.
   */
  public MessageHandler(int queueSize, ReplicationServer replicationServer)
  MessageHandler(int queueSize, ReplicationServer replicationServer)
  {
    this.maxQueueSize = queueSize;
    this.maxQueueBytesSize = queueSize * 100;
@@ -144,7 +144,7 @@
   * @param update The update that must be added to the list of updates of
   * this handler.
   */
  public void add(UpdateMsg update)
  void add(UpdateMsg update)
  {
    synchronized (msgQueue)
    {
@@ -153,7 +153,9 @@
       * waiting for some changes, wake it up
       */
      if (msgQueue.isEmpty())
      {
        msgQueue.notify();
      }
      msgQueue.add(update);
@@ -183,7 +185,7 @@
   * Set the shut down flag to true and returns the previous value of the flag.
   * @return The previous value of the shut down flag
   */
  public boolean engageShutdown()
  boolean engageShutdown()
  {
    return shuttingDown.getAndSet(true);
  }
@@ -192,7 +194,7 @@
   * Returns the shutdown flag.
   * @return The shutdown flag value.
   */
  public boolean shuttingDown()
  boolean shuttingDown()
  {
    return shuttingDown.get();
  }
@@ -202,9 +204,8 @@
   *
   * @param waitConnections     Waits for the Connections with other RS to
   *                            be established before returning.
   * @return The replication server domain.
   */
  public ReplicationServerDomain getDomain(boolean waitConnections)
  private void setDomain(boolean waitConnections)
  {
    if (replicationServerDomain == null)
    {
@@ -214,14 +215,13 @@
        replicationServer.waitConnections();
      }
    }
    return replicationServerDomain;
  }
  /**
   * Get the count of updates received from the server.
   * @return the count of update received from the server.
   */
  public int getInCount()
  int getInCount()
  {
    return inCount;
  }
@@ -375,11 +375,15 @@
            while (msgQueue.isEmpty() && following)
            {
              if (!synchronous)
              {
                return null;
              }
              msgQueue.wait(500);
              if (!activeConsumer)
              {
                return null;
            }
            }
          } catch (InterruptedException e)
          {
            return null;
@@ -478,7 +482,7 @@
   * Get the count of updates sent to this server.
   * @return  The count of update sent to this server.
   */
  public int getOutCount()
  int getOutCount()
  {
    return outCount;
  }
@@ -545,7 +549,7 @@
  /**
   * Increase the counter of updates received from the server.
   */
  public void incrementInCount()
  void incrementInCount()
  {
    inCount++;
  }
@@ -553,14 +557,12 @@
  /**
   * Increase the counter of updates sent to the server.
   */
  public void incrementOutCount()
  void incrementOutCount()
  {
    outCount++;
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public void initializeMonitorProvider(MonitorProviderCfg configuration)
  throws ConfigException, InitializationException
@@ -619,8 +621,8 @@
    else
    {
      this.baseDN = baseDN;
      if (!baseDN.toNormalizedString().equals("cn=changelog"))
        this.replicationServerDomain = getDomain(isDataServer);
      setDomain(!"cn=changelog".equals(baseDN.toNormalizedString())
            && isDataServer);
    }
  }
@@ -645,7 +647,7 @@
   * @param msg the last update sent.
   * @return boolean indicating if the update was meaningful.
   */
  public boolean updateServerState(UpdateMsg msg)
  boolean updateServerState(UpdateMsg msg)
  {
    return serverState.update(msg.getCSN());
  }
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -261,7 +261,7 @@
          // We did not recognize the message, close session as what
          // can happen after is undetermined and we do not want the server to
          // be disturbed
          ServerHandler.closeSession(session, null, null);
          session.close();
          return;
        }
      }
@@ -275,10 +275,9 @@
        {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
        if (!shutdown) {
          Message message =
            ERR_EXCEPTION_LISTENING.get(e.getLocalizedMessage());
          logError(message);
        if (!shutdown)
        {
          logError(ERR_EXCEPTION_LISTENING.get(e.getLocalizedMessage()));
        }
      }
    }
opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
@@ -756,7 +756,6 @@
  public void receiveTopoInfoFromRS(TopologyMsg topoMsg)
  throws DirectoryException, IOException
  {
    if (replicationServerDomain != null)
      replicationServerDomain.receiveTopoInfoFromRS(topoMsg, this, true);
  }
opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -27,7 +27,6 @@
package org.opends.server.replication.server;
import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Semaphore;
@@ -63,35 +62,6 @@
  private static final int SHUTDOWN_JOIN_TIMEOUT = 30000;
  /**
   * Close the session and log the provided error message
   * Log nothing if message is null.
   * @param providedSession The provided closing session.
   * @param providedMsg     The provided error message.
   * @param handler         The handler that manages that session.
   */
  static protected void closeSession(Session providedSession,
      Message providedMsg, ServerHandler handler)
  {
    if (providedMsg != null)
    {
      if (debugEnabled())
        TRACER.debugInfo("In " +
          ((handler != null) ? handler.toString() : "Replication Server") +
          " closing session with err=" + providedMsg);
      logError(providedMsg);
    }
    if (providedSession != null)
    {
      // This method is only called when aborting a failing handshake and
      // not StopMsg should be sent in such situation. StopMsg are only
      // expected when full handshake has been performed, or at end of
      // handshake phase 1, when DS was just gathering available RS info
      providedSession.close();
    }
  }
  /**
   * The serverId of the remote server.
   */
  protected int serverId;
@@ -243,7 +213,20 @@
    Session localSession = session;
    if (localSession != null)
    {
      closeSession(localSession, reason, this);
      if (reason != null)
      {
        if (debugEnabled())
        {
         TRACER.debugInfo("In " + this + " closing session with err=" + reason);
        }
        logError(reason);
      }
      // This method is only called when aborting a failing handshake and
      // not StopMsg should be sent in such situation. StopMsg are only
      // expected when full handshake has been performed, or at end of
      // handshake phase 1, when DS was just gathering available RS info
      localSession.close();
    }
    releaseDomainLock();
@@ -252,7 +235,7 @@
    // We may have changed it as it was -1 and we received a value >0 from peer
    // server and the last topo message sent may have failed being sent: in that
    // case retrieve old value of generation id for replication server domain
    if (oldGenerationId != -100 && replicationServerDomain != null)
    if (oldGenerationId != -100)
    {
      replicationServerDomain.changeGenerationId(oldGenerationId);
    }
@@ -263,7 +246,7 @@
   */
  protected void releaseDomainLock()
  {
    if (replicationServerDomain != null && replicationServerDomain.hasLock())
    if (replicationServerDomain.hasLock())
    {
      replicationServerDomain.release();
    }
@@ -333,8 +316,7 @@
      {
        final Message message =
            ERR_SESSION_STARTUP_INTERRUPTED.get(session.getName());
        throw new DirectoryException(ResultCode.OTHER,
            message, e);
        throw new DirectoryException(ResultCode.OTHER, message, e);
      }
      reader.start();
      writer.start();
@@ -366,7 +348,7 @@
  public void send(ReplicationMsg msg) throws IOException
  {
    // avoid logging anything for unit tests that include a null domain.
    if (debugEnabled() && replicationServerDomain != null)
    if (debugEnabled())
    {
      TRACER.debugInfo("In "
          + replicationServerDomain.getLocalRSMonitorInstanceName() + " "
@@ -515,16 +497,6 @@
    return heartbeatInterval;
  }
  /**
   * Get the count of updates received from the server.
   * @return the count of update received from the server.
   */
  @Override
  public int getInCount()
  {
    return inCount;
  }
  /** {@inheritDoc} */
  @Override
  public List<Attribute> getMonitorData()
@@ -597,16 +569,6 @@
  public abstract String getMonitorInstanceName();
  /**
   * Get the count of updates sent to this server.
   * @return  The count of update sent to this server.
   */
  @Override
  public int getOutCount()
  {
    return outCount;
  }
  /**
   * Gets the protocol version used with this remote server.
   * @return The protocol version used with this remote server.
   */
@@ -714,9 +676,7 @@
    assuredSrSentUpdatesTimeout.incrementAndGet();
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public void initializeMonitorProvider(MonitorProviderCfg configuration)
  throws ConfigException, InitializationException
@@ -822,16 +782,11 @@
  public void lockDomainWithTimeout() throws DirectoryException,
      InterruptedException
  {
    if (replicationServerDomain == null)
    {
      return;
    }
    Random random = new Random();
    int randomTime = random.nextInt(6); // Random from 0 to 5
    final Random random = new Random();
    final int randomTime = random.nextInt(6); // Random from 0 to 5
    // Wait at least 3 seconds + (0 to 5 seconds)
    long timeout = 3000 + (randomTime * 1000);
    boolean lockAcquired = replicationServerDomain.tryLock(timeout);
    final long timeout = 3000 + (randomTime * 1000);
    final boolean lockAcquired = replicationServerDomain.tryLock(timeout);
    if (!lockAcquired)
    {
      Message message = WARN_TIMEOUT_WHEN_CROSS_CONNECTION.get(
@@ -1197,7 +1152,6 @@
   */
  void processAck(AckMsg ack)
  {
    if (replicationServerDomain!=null)
      replicationServerDomain.processAck(ack, this);
  }
@@ -1207,9 +1161,7 @@
   */
  public long getReferenceGenId()
  {
    if (replicationServerDomain != null)
      return replicationServerDomain.getGenerationId();
    return -1;
  }
  /**
@@ -1218,7 +1170,6 @@
   */
  void processResetGenId(ResetGenerationIdMsg msg)
  {
    if (replicationServerDomain!=null)
      replicationServerDomain.resetGenerationId(this, msg);
  }
@@ -1230,7 +1181,6 @@
  public void put(UpdateMsg update) throws IOException
  {
    decAndCheckWindow();
    if (replicationServerDomain!=null)
      replicationServerDomain.put(update, this);
  }
@@ -1239,7 +1189,6 @@
   */
  public void doStop()
  {
    if (replicationServerDomain!=null)
      replicationServerDomain.stopServer(this, false);
  }