mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
19.24.2013 cefb0cd556858b9818a25be27ced11cd4458c805
First step towards simplifying the ReplicationBroker class.

ReplicationBroker.java:
Changed monitorResponse from MutableBoolean to AtomicBoolean.
Replaced fields rsGroupId, rsServerId, rsServerUrl, replicationServer and connected by a single connectedRS + added immutable inner class ConnectedRS.
Renamed updateRSInfoLocallyConfiguredStatus() to setLocallyConfiguredFlag().
In receiveTopo(), used Collections.retainAll().

MutableBoolean.java: REMOVED
1 files deleted
1 files modified
436 ■■■■ changed files
opends/src/server/org/opends/server/replication/common/MutableBoolean.java 66 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java 370 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/common/MutableBoolean.java
File was deleted
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -36,9 +36,11 @@
import java.net.SocketTimeoutException;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
@@ -67,18 +69,95 @@
{
  /**
   * Immutable class containing information about whether the broker is
   * connected to an RS and data associated to this connected RS.
   * <p>
   * Mutable methods return a new version of this object copying the data that
   * did not change.
   */
  // @Immutable
  private static final class ConnectedRS
  {
    private final String replicationServer;
    /** The info of the RS we are connected to. */
    private final ReplicationServerInfo rsInfo;
    private final boolean connected;
    private ConnectedRS(boolean connected, ReplicationServerInfo rsInfo,
        String replicationServer)
    {
      this.connected = connected;
      this.rsInfo = rsInfo;
      this.replicationServer = replicationServer;
    }
    private static ConnectedRS stopped()
    {
      return new ConnectedRS(false, null, "stopped");
    }
    private static ConnectedRS noConnectedRS()
    {
      return new ConnectedRS(false, null, NO_CONNECTED_SERVER);
    }
    /**
     * Returns a new version of the current object with the connected status set
     * to true.
     */
    private ConnectedRS setConnected()
    {
      return new ConnectedRS(true, rsInfo, replicationServer);
    }
    public int getServerId()
    {
      return rsInfo != null ? rsInfo.getServerId() : -1;
    }
    private byte getGroupId()
    {
      return rsInfo != null ? rsInfo.getGroupId() : -1;
    }
    /** {@inheritDoc} */
    @Override
    public String toString()
    {
      final StringBuilder sb = new StringBuilder();
      toString(sb);
      return sb.toString();
    }
    public void toString(StringBuilder sb)
    {
      sb.append("connected=").append(connected).append(", ");
      if (rsInfo == null) // this is a null object
      {
        sb.append("no connected RS");
      }
      else
      {
        sb.append("connected RS(serverId=").append(rsInfo.getServerId())
          .append(", serverUrl=").append(rsInfo.getServerURL())
          .append(", groupId=").append(rsInfo.getGroupId())
          .append(")");
      }
    }
  }
  /**
   * The tracer object for the debug logger.
   */
  private static final DebugTracer TRACER = getTracer();
  private volatile boolean shutdown = false;
  private final Object startStopLock = new Object();
  private volatile ReplicationDomainCfg config;
  private volatile boolean connected = false;
  /**
   * String reported under CSN=monitor when there is no connected RS.
   */
  public final static String NO_CONNECTED_SERVER = "Not connected";
  private volatile String replicationServer = NO_CONNECTED_SERVER;
  private volatile Session session;
  private final ServerState state;
  private Semaphore sendWindow;
@@ -88,19 +167,15 @@
  private int timeout = 0;
  private short protocolVersion;
  private ReplSessionSecurity replSessionSecurity;
  /** The group id of the RS we are connected to. */
  private byte rsGroupId = -1;
  /** The server id of the RS we are connected to. */
  private int rsServerId = -1;
  /** The server URL of the RS we are connected to. */
  private String rsServerUrl;
  private final AtomicReference<ConnectedRS> connectedRS =
      new AtomicReference<ConnectedRS>(ConnectedRS.noConnectedRS());
  /** Our replication domain. */
  private ReplicationDomain domain;
  /**
   * This object is used as a conditional event to be notified about
   * the reception of monitor information from the Replication Server.
   */
  private final MutableBoolean monitorResponse = new MutableBoolean(false);
  private final AtomicBoolean monitorResponse = new AtomicBoolean(false);
  /**
   * A Map containing the ServerStates of all the replicas in the topology
   * as seen by the ReplicationServer the last time it was polled or the last
@@ -217,7 +292,7 @@
    {
      shutdown = false;
      this.rcvWindow = getMaxRcvWindow();
      connect();
      connect(connectedRS.get());
    }
  }
@@ -227,7 +302,7 @@
   */
  public byte getRsGroupId()
  {
    return rsGroupId;
    return connectedRS.get().getGroupId();
  }
  /**
@@ -236,7 +311,7 @@
   */
  public int getRsServerId()
  {
    return rsServerId;
    return connectedRS.get().getServerId();
  }
  /**
@@ -287,20 +362,11 @@
  }
  /**
   * Gets the server url of the RS we are connected to.
   * @return The server url of the RS we are connected to
   */
  public String getRsServerUrl()
  {
    return rsServerUrl;
  }
  /**
   * Sets the locally configured flag for the passed ReplicationServerInfo
   * object, analyzing the local configuration.
   * @param rsInfo the Replication server to check and update
   */
  private void updateRSInfoLocallyConfiguredStatus(ReplicationServerInfo rsInfo)
  private void setLocallyConfiguredFlag(ReplicationServerInfo rsInfo)
  {
    // Determine if the passed ReplicationServerInfo has a URL that is present
    // in the locally configured replication servers
@@ -678,13 +744,14 @@
    }
  }
  private void connect()
  private void connect(ConnectedRS rs)
  {
    if (getBaseDN().toNormalizedString().equalsIgnoreCase(
        ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT))
    {
      connectAsECL();
    } else
      connectAsECL(rs);
    }
    else
    {
      connectAsDataServer();
    }
@@ -697,8 +764,8 @@
   */
  private Map<Integer, ReplicationServerInfo> collectReplicationServersInfo()
  {
    Map<Integer, ReplicationServerInfo> rsInfos =
      new ConcurrentHashMap<Integer, ReplicationServerInfo>();
    final Map<Integer, ReplicationServerInfo> rsInfos =
        new ConcurrentSkipListMap<Integer, ReplicationServerInfo>();
    for (String serverUrl : getReplicationServerUrls())
    {
@@ -732,14 +799,13 @@
   * </li>
   * </ul>
   */
  private void connectAsECL()
  private void connectAsECL(ConnectedRS rs)
  {
    // FIXME:ECL List of RS to connect is for now limited to one RS only
    String bestServer = getReplicationServerUrls().iterator().next();
    final String bestServer = getReplicationServerUrls().iterator().next();
    if (performPhaseOneHandshake(bestServer, true, true) != null)
    {
      performECLPhaseTwoHandshake(bestServer);
      performECLPhaseTwoHandshake(bestServer, rs);
    }
  }
@@ -808,14 +874,16 @@
      // Get info from every available replication servers
      replicationServerInfos = collectReplicationServersInfo();
      ReplicationServerInfo electedRsInfo = null;
      if (replicationServerInfos.size() > 0)
      if (replicationServerInfos.isEmpty())
      {
        connectedRS.set(ConnectedRS.noConnectedRS());
      }
      else
      {
        // At least one server answered, find the best one.
        RSEvaluations evals = computeBestReplicationServer(true, -1, state,
            replicationServerInfos, serverId, getGroupId(), getGenerationID());
        electedRsInfo = evals.getBestRS();
        ReplicationServerInfo electedRsInfo = evals.getBestRS();
        // Best found, now initialize connection to this one (handshake phase 1)
        if (debugEnabled())
@@ -850,43 +918,33 @@
          {
            connectToReplicationServer(electedRsInfo, initStatus, topologyMsg);
          } // Could perform handshake phase 2 with best
        } // Could perform handshake phase 1 with best
      }
      } // Reached some servers
      // connected is set by connectToReplicationServer()
      // and electedRsInfo isn't null then. Check anyway
      if (electedRsInfo != null && connected)
      final ConnectedRS rs = connectedRS.get();
      if (rs.connected)
      {
        connectPhaseLock.notify();
        if ((electedRsInfo.getGenerationId() == getGenerationID())
            || (electedRsInfo.getGenerationId() == -1))
        final long rsGenId = rs.rsInfo.getGenerationId();
        final int rsServerId = rs.rsInfo.getServerId();
        if (rsGenId == getGenerationID() || rsGenId == -1)
        {
          Message message = NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG
              .get(serverId, rsServerId, baseDN.toNormalizedString(),
                  session.getReadableRemoteAddress(),
                  getGenerationID());
          logError(message);
        } else
        {
          Message message = WARN_NOW_FOUND_BAD_GENERATION_CHANGELOG
              .get(serverId, rsServerId, baseDN.toNormalizedString(),
                  session.getReadableRemoteAddress(),
                  getGenerationID(),
                  electedRsInfo.getGenerationId());
          logError(message);
          logError(NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG.get(
              serverId, rsServerId, baseDN.toNormalizedString(),
              session.getReadableRemoteAddress(), getGenerationID()));
        }
      } else
        else
        {
          logError(WARN_NOW_FOUND_BAD_GENERATION_CHANGELOG.get(
              serverId, rsServerId, baseDN.toNormalizedString(),
              session.getReadableRemoteAddress(), getGenerationID(), rsGenId));
        }
      }
      else
      {
        /*
         * This server could not find any replicationServer. It's going to start
         * in degraded mode. Log a message.
         */
        connected = false;
        replicationServer = NO_CONNECTED_SERVER;
         // This server could not find any replicationServer.
         // It's going to start in degraded mode. Log a message.
        if (!connectionError)
        {
          connectionError = true;
@@ -894,16 +952,14 @@
          if (replicationServerInfos.size() > 0)
          {
            Message message = WARN_COULD_NOT_FIND_CHANGELOG.get(
            logError(WARN_COULD_NOT_FIND_CHANGELOG.get(
                serverId, baseDN.toNormalizedString(),
                collectionToString(replicationServerInfos.keySet(), ", "));
            logError(message);
                collectionToString(replicationServerInfos.keySet(), ", ")));
          }
          else
          {
            Message message = WARN_NO_AVAILABLE_CHANGELOGS.get(
                serverId, baseDN.toNormalizedString());
            logError(message);
            logError(WARN_NO_AVAILABLE_CHANGELOGS.get(
                serverId, baseDN.toNormalizedString()));
          }
        }
      }
@@ -925,13 +981,11 @@
  {
    final int serverId = getServerId();
    final DN baseDN = getBaseDN();
    ConnectedRS rs = null;
    try
    {
      replicationServer = session.getReadableRemoteAddress();
      maxSendWindow = rsInfo.getWindowSize();
      rsGroupId = rsInfo.getGroupId();
      rsServerId = rsInfo.getServerId();
      rsServerUrl = rsInfo.getServerURL();
      receiveTopo(topologyMsg);
@@ -964,7 +1018,8 @@
      }
      sendWindow = new Semaphore(maxSendWindow);
      rcvWindow = getMaxRcvWindow();
      connected = true;
      rs = new ConnectedRS(true, rsInfo, session.getReadableRemoteAddress());
      connectedRS.set(rs);
      /*
      May have created a broker with null replication domain for
@@ -977,18 +1032,17 @@
      }
      final byte groupId = getGroupId();
      if (getRsGroupId() != groupId)
      if (rs.getGroupId() != groupId)
      {
        /*
        Connected to replication server with wrong group id:
        warn user and start heartbeat monitor to recover when a server
        with the right group id shows up.
        */
        Message message = WARN_CONNECTED_TO_SERVER_WITH_WRONG_GROUP_ID.get(
                Byte.toString(groupId), Integer.toString(rsServerId),
                rsInfo.getServerURL(), Byte.toString(getRsGroupId()),
                baseDN.toNormalizedString(), Integer.toString(serverId));
        logError(message);
        logError(WARN_CONNECTED_TO_SERVER_WITH_WRONG_GROUP_ID.get(
            Byte.toString(groupId), Integer.toString(rs.getServerId()),
            rsInfo.getServerURL(), Byte.toString(rs.getGroupId()),
            baseDN.toNormalizedString(), Integer.toString(serverId)));
      }
      startRSHeartBeatMonitoring();
      if (rsInfo.getProtocolVersion() >=
@@ -1006,8 +1060,9 @@
    }
    finally
    {
      if (!connected)
      if (rs == null)
      {
        connectedRS.set(ConnectedRS.noConnectedRS());
        setSession(null);
      }
    }
@@ -1108,8 +1163,9 @@
      boolean isSslEncryption = replSessionSecurity.isSslEncryption();
      // Send our ServerStartMsg.
      String url = socket.getLocalAddress().getHostName() + ":"
          + socket.getLocalPort();
      final HostPort hp = new HostPort(
          socket.getLocalAddress().getHostName(), socket.getLocalPort());
      String url = hp.toString();
      StartMsg serverStartMsg;
      if (!isECL)
      {
@@ -1135,11 +1191,11 @@
      }
      // Wrap received message in a server info object
      ReplicationServerInfo replServerInfo = ReplicationServerInfo
          .newInstance(msg, server);
      final ReplicationServerInfo replServerInfo =
          ReplicationServerInfo.newInstance(msg, server);
      // Sanity check
      DN repDN = replServerInfo.getBaseDN();
      final DN repDN = replServerInfo.getBaseDN();
      if (!getBaseDN().equals(repDN))
      {
        errorMessage = ERR_DS_DN_DOES_NOT_MATCH.get(
@@ -1167,7 +1223,7 @@
      hasConnected = true;
      // If this connection as the one to use for sending and receiving
      // If this connection is the one to use for sending and receiving
      // updates, store it.
      if (keepConnection)
      {
@@ -1180,20 +1236,17 @@
    {
      errorMessage = WARN_NO_CHANGELOG_SERVER_LISTENING.get(getServerId(),
          server, getBaseDN().toNormalizedString());
      return null;
    }
    catch (SocketTimeoutException e)
    {
      errorMessage = WARN_TIMEOUT_CONNECTING_TO_RS.get(getServerId(),
          server, getBaseDN().toNormalizedString());
      return null;
    }
    catch (Exception e)
    {
      errorMessage = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(getServerId(),
          server, getBaseDN().toNormalizedString(),
          stackTraceToSingleLineString(e));
      return null;
    }
    finally
    {
@@ -1203,26 +1256,28 @@
        close(socket);
      }
      if (!hasConnected && errorMessage != null)
      if (keepConnection && !hasConnected)
      {
        // There was no server waiting on this host:port Log a notice and try
        // the next replicationServer in the list
        if (!connectionError)
        {
          if (keepConnection) // Log error message only for final connection
          {
            // the error message is only logged once to avoid overflowing
            // the error log
            logError(errorMessage);
          }
        connectedRS.set(ConnectedRS.noConnectedRS());
      }
          if (debugEnabled())
          {
            TRACER.debugInfo(errorMessage.toString());
          }
      if (!hasConnected && errorMessage != null && !connectionError)
      {
        // There was no server waiting on this host:port
        // Log a notice and will try the next replicationServer in the list
        if (keepConnection) // Log error message only for final connection
        {
          // log the error message only once to avoid overflowing the error log
          logError(errorMessage);
        }
        if (debugEnabled())
        {
          TRACER.debugInfo(errorMessage.toString());
        }
      }
    }
    return null;
  }
@@ -1234,7 +1289,7 @@
   *
   * @param server Server we are connecting with.
   */
  private void performECLPhaseTwoHandshake(String server)
  private void performECLPhaseTwoHandshake(String server, ConnectedRS rs)
  {
    try
    {
@@ -1252,14 +1307,16 @@
      // Alright set the timeout to the desired value
      localSession.setSoTimeout(timeout);
      connected = true;
    } catch (Exception e)
      connectedRS.set(rs.setConnected());
    }
    catch (Exception e)
    {
      Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(
          getServerId(), server, getBaseDN().toNormalizedString(),
          stackTraceToSingleLineString(e));
      logError(message);
      connectedRS.set(ConnectedRS.noConnectedRS());
      setSession(null);
    }
  }
@@ -1287,8 +1344,7 @@
      // unit test purpose.
      if (domain != null)
      {
        startSessionMsg =
          new StartSessionMsg(
        startSessionMsg = new StartSessionMsg(
          initStatus,
          domain.getRefUrls(),
          domain.isAssured(),
@@ -1306,9 +1362,7 @@
      final Session localSession = session;
      localSession.publish(startSessionMsg);
      /*
       * Read the TopologyMsg that should come back.
       */
      // Read the TopologyMsg that should come back.
      final TopologyMsg topologyMsg = (TopologyMsg) localSession.receive();
      if (debugEnabled())
@@ -1320,13 +1374,15 @@
      // Alright set the timeout to the desired value
      localSession.setSoTimeout(timeout);
      return topologyMsg;
    } catch (Exception e)
    }
    catch (Exception e)
    {
      Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(
          getServerId(), server, getBaseDN().toNormalizedString(),
          stackTraceToSingleLineString(e));
      logError(message);
      connectedRS.set(ConnectedRS.noConnectedRS());
      setSession(null);
      // Be sure to return null.
@@ -2263,13 +2319,14 @@
      numLostConnections++;
    }
    ConnectedRS rs;
    if (failingSession == session)
    {
      connected = false;
      rsGroupId = -1;
      rsServerId = -1;
      rsServerUrl = null;
      rs = ConnectedRS.noConnectedRS();
      connectedRS.set(rs);
      setSession(null);
    } else {
      rs = connectedRS.get();
    }
    while (true)
@@ -2277,14 +2334,15 @@
      // Synchronize inside the loop in order to allow shutdown.
      synchronized (startStopLock)
      {
        if (connected || shutdown)
        if (rs.connected || shutdown)
        {
          break;
        }
        try
        {
          connect();
          connect(rs);
          rs = connectedRS.get();
        }
        catch (Exception e)
        {
@@ -2295,11 +2353,10 @@
          logError(mb.toMessage());
        }
        if (connected || !infiniteTry)
        if (rs.connected || !infiniteTry)
        {
          break;
        }
      }
      try
      {
@@ -2313,8 +2370,8 @@
    if (debugEnabled())
    {
      debugInfo("end restart : connected=" + connected + " with RS("
          + getRsServerId() + ") genId=" + this.generationID);
      debugInfo("end restart : connected=" + rs.connected + " with RS("
          + rs.getServerId() + ") genId=" + generationID);
    }
  }
@@ -2533,7 +2590,8 @@
  {
    while (!shutdown)
    {
      if (reconnectOnFailure && !connected)
      final ConnectedRS rs = connectedRS.get();
      if (reconnectOnFailure && !rs.connected)
      {
        // infinite try to reconnect
        reStart(null, true);
@@ -2550,7 +2608,7 @@
      final int serverId = getServerId();
      final DN baseDN = getBaseDN();
      final int previousRsServerID = rsServerId;
      final int previousRsServerID = rs.getServerId();
      try
      {
        ReplicationMsg msg = localSession.receive();
@@ -2786,19 +2844,15 @@
  public void stop()
  {
    if (debugEnabled())
      debugInfo("is stopping and will close the connection to"
          + " replication server " + rsServerId);
      debugInfo("is stopping and will close the connection to RS("
          + getRsServerId() + ")");
    synchronized (startStopLock)
    {
      shutdown = true;
      connected = false;
      stopRSHeartBeatMonitoring();
      stopChangeTimeHeartBeatPublishing();
      replicationServer = "stopped";
      rsGroupId = -1;
      rsServerId = -1;
      rsServerUrl = null;
      connectedRS.set(ConnectedRS.stopped());
      setSession(null);
      deregisterReplicationMonitor();
    }
@@ -2834,7 +2888,7 @@
   */
  public String getReplicationServer()
  {
    return replicationServer;
    return connectedRS.get().replicationServer;
  }
  /**
@@ -2874,7 +2928,7 @@
   */
  public int getCurrentSendWindow()
  {
    if (connected)
    if (isConnected())
    {
      return sendWindow.availablePermits();
    }
@@ -2934,7 +2988,7 @@
   */
  public boolean isConnected()
  {
    return connected;
    return connectedRS.get().connected;
  }
  /**
@@ -3003,7 +3057,7 @@
  {
    List<Integer> connectedDSs = new ArrayList<Integer>();
    if (rsServerId == rsId)
    if (getRsServerId() == rsId)
    {
      /*
      If we are computing connected DSs for the RS we are connected
@@ -3044,15 +3098,13 @@
    {
      int rsId = rsInfo.getId();
      rssToKeep.add(rsId); // Mark this server as still existing
      List<Integer> connectedDSs = computeConnectedDSs(rsId, dsList);
      final List<Integer> connectedDSs = computeConnectedDSs(rsId, dsList);
      ReplicationServerInfo rsInfo2 = replicationServerInfos.get(rsId);
      if (rsInfo2 == null)
      {
        // New replication server, create info for it add it to the list
        rsInfo2 = new ReplicationServerInfo(rsInfo, connectedDSs);
        // Set the locally configured flag for this new RS only if it is
        // configured
        updateRSInfoLocallyConfiguredStatus(rsInfo2);
        setLocallyConfiguredFlag(rsInfo2);
        replicationServerInfos.put(rsId, rsInfo2);
      } else
      {
@@ -3061,21 +3113,9 @@
      }
    }
    /**
     * Now remove any replication server that may have disappeared from the
     * topology.
     */
    Iterator<Integer> rsInfoIt = replicationServerInfos.keySet().iterator();
    while (rsInfoIt.hasNext())
    {
      final Integer rsId = rsInfoIt.next();
      if (!rssToKeep.contains(rsId))
      {
        // This replication server has quit the topology, remove it from the
        // list
        rsInfoIt.remove();
      }
    }
    // Remove any replication server that may have disappeared from the topology
    replicationServerInfos.keySet().retainAll(rssToKeep);
    if (domain != null)
    {
      for (DSInfo info : dsList)
@@ -3231,18 +3271,8 @@
      .append(getServerId()).append("\",")
      .append(" groupId=").append(getGroupId())
      .append(", genId=").append(generationID)
      .append(", connected=").append(connected).append(", ");
    if (rsServerId == -1)
    {
      sb.append("no RS");
    }
    else
    {
      sb.append("bestRS(serverId=").append(rsServerId)
        .append(", serverUrl=").append(rsServerUrl)
        .append(", groupId=").append(rsGroupId)
        .append(")");
    }
      .append(", ");
    connectedRS.get().toString(sb);
    return sb.toString();
  }