mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
02.57.2013 157717b205d4c1f957cf810e04e06f11530c619c
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -27,32 +27,12 @@
 */
package org.opends.server.replication.service;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.replication.protocol.ProtocolVersion.*;
import static org.opends.server.replication.server.ReplicationServer.*;
import static org.opends.server.util.StaticUtils.*;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.MathContext;
import java.math.RoundingMode;
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.net.*;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
@@ -62,17 +42,19 @@
import org.opends.messages.MessageBuilder;
import org.opends.server.core.DirectoryServer;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.DSInfo;
import org.opends.server.replication.common.MutableBoolean;
import org.opends.server.replication.common.RSInfo;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.common.*;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.protocol.*;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.util.ServerConstants;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.replication.protocol.ProtocolVersion.*;
import static org.opends.server.replication.server.ReplicationServer.*;
import static org.opends.server.util.StaticUtils.*;
/**
 * The broker for Multi-master Replication.
 */
@@ -91,7 +73,7 @@
  private volatile Collection<String> replicationServerUrls;
  private volatile boolean connected = false;
  /**
   * String reported under cn=monitor when there is no connected RS.
   * String reported under CSN=monitor when there is no connected RS.
   */
  public final static String NO_CONNECTED_SERVER = "Not connected";
  private volatile String replicationServer = NO_CONNECTED_SERVER;
@@ -223,7 +205,7 @@
   * @param groupId The group id of our domain.
   * @param changeTimeHeartbeatInterval The interval (in ms) between Change
   *        time  heartbeats are sent to the RS,
   *        or zero if no CN heartbeat should be sent.
   *        or zero if no CSN heartbeat should be sent.
   */
  public ReplicationBroker(ReplicationDomain replicationDomain,
    ServerState state, String baseDn, int serverID2, int window,
@@ -1583,8 +1565,11 @@
    /**
     * Now apply the choice base on the weight to the best servers list
     */
    if (bestServers.size() > 1)
    if (bestServers.size() == 1)
    {
      return bestServers.values().iterator().next();
    }
      if (firstConnection)
      {
        // We are not connected to a server yet
@@ -1599,10 +1584,6 @@
        return computeBestServerForWeight(bestServers, rsServerId,
          localServerId);
      }
    } else
    {
      return bestServers.values().iterator().next();
    }
  }
  /**
@@ -1738,12 +1719,11 @@
    Map<Integer, ReplicationServerInfo> moreUpToDateServers =
      new HashMap<Integer, ReplicationServerInfo>();
    // Extract the change number of the latest change generated by the local
    // server
    ChangeNumber myChangeNumber = localState.getChangeNumber(localServerId);
    if (myChangeNumber == null)
    // Extract the CSN of the latest change generated by the local server
    CSN myCSN = localState.getCSN(localServerId);
    if (myCSN == null)
    {
      myChangeNumber = new ChangeNumber(0, 0, localServerId);
      myCSN = new CSN(0, 0, localServerId);
    }
    /**
@@ -1751,23 +1731,23 @@
     * if for instance we failed and restarted, having sent some changes to the
     * RS but without having time to store our own state) regarding our own
     * server id. If some servers more up to date, prefer this list but take
     * only the latest change number.
     * only the latest CSN.
     */
    ChangeNumber latestRsChangeNumber = null;
    CSN latestRsCSN = null;
    for (Integer rsId : bestServers.keySet())
    {
      ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
      ServerState rsState = replicationServerInfo.getServerState();
      ChangeNumber rsChangeNumber = rsState.getChangeNumber(localServerId);
      if (rsChangeNumber == null)
      CSN rsCSN = rsState.getCSN(localServerId);
      if (rsCSN == null)
      {
        rsChangeNumber = new ChangeNumber(0, 0, localServerId);
        rsCSN = new CSN(0, 0, localServerId);
      }
      // Has this replication server the latest local change ?
      if (myChangeNumber.olderOrEqual(rsChangeNumber))
      if (myCSN.olderOrEqual(rsCSN))
      {
        if (myChangeNumber.equals(rsChangeNumber))
        if (myCSN.equals(rsCSN))
        {
          // This replication server has exactly the latest change from the
          // local server
@@ -1776,14 +1756,14 @@
        {
          // This replication server is even more up to date than the local
          // server
          if (latestRsChangeNumber == null)
          if (latestRsCSN == null)
          {
            // Initialize the latest change number
            latestRsChangeNumber = rsChangeNumber;
            // Initialize the latest CSN
            latestRsCSN = rsCSN;
          }
          if (rsChangeNumber.newerOrEquals(latestRsChangeNumber))
          if (rsCSN.newerOrEquals(latestRsCSN))
          {
            if (rsChangeNumber.equals(latestRsChangeNumber))
            if (rsCSN.equals(latestRsCSN))
            {
              moreUpToDateServers.put(rsId, replicationServerInfo);
            } else
@@ -1792,7 +1772,7 @@
              // new RS
              moreUpToDateServers.clear();
              moreUpToDateServers.put(rsId, replicationServerInfo);
              latestRsChangeNumber = rsChangeNumber;
              latestRsCSN = rsCSN;
            }
          }
        }
@@ -1802,10 +1782,8 @@
    {
      // Prefer servers more up to date than local server
      return moreUpToDateServers;
    } else
    {
      return upToDateServers;
    }
    return upToDateServers;
  }
@@ -3041,7 +3019,7 @@
   */
  public void startChangeTimeHeartBeatPublishing()
  {
    // Start a CN heartbeat thread.
    // Start a CSN heartbeat thread.
    if (changeTimeHeartbeatSendInterval > 0)
    {
      String threadName = "Replica DS("
@@ -3058,7 +3036,7 @@
    {
      if (debugEnabled())
        TRACER.debugInfo(this
          + " is not configured to send CN heartbeat interval");
          + " is not configured to send CSN heartbeat interval");
    }
  }