mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noël Rouvignac
26.02.2016 3225377d57acd0db675bdc27723a1f9f536526ab
opendj-server-legacy/src/main/java/org/opends/server/replication/service/ReplicationBroker.java
@@ -20,9 +20,21 @@
import java.math.BigDecimal;
import java.math.MathContext;
import java.math.RoundingMode;
import java.net.*;
import java.util.*;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@@ -34,13 +46,33 @@
import org.forgerock.i18n.LocalizableMessage;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.forgerock.util.Utils;
import org.forgerock.opendj.server.config.server.ReplicationDomainCfg;
import org.opends.server.core.DirectoryServer;
import org.opends.server.replication.common.*;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.protocol.*;
import org.forgerock.opendj.ldap.DN;
import org.forgerock.opendj.server.config.server.ReplicationDomainCfg;
import org.forgerock.util.Utils;
import org.opends.server.core.DirectoryServer;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.DSInfo;
import org.opends.server.replication.common.RSInfo;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.protocol.ChangeStatusMsg;
import org.opends.server.replication.protocol.MonitorMsg;
import org.opends.server.replication.protocol.MonitorRequestMsg;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.ReplServerStartDSMsg;
import org.opends.server.replication.protocol.ReplServerStartMsg;
import org.opends.server.replication.protocol.ReplSessionSecurity;
import org.opends.server.replication.protocol.ReplicationMsg;
import org.opends.server.replication.protocol.ServerStartMsg;
import org.opends.server.replication.protocol.Session;
import org.opends.server.replication.protocol.StartMsg;
import org.opends.server.replication.protocol.StartSessionMsg;
import org.opends.server.replication.protocol.StopMsg;
import org.opends.server.replication.protocol.TopologyMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.protocol.WindowMsg;
import org.opends.server.replication.protocol.WindowProbeMsg;
import org.opends.server.types.HostPort;
import static org.opends.messages.ReplicationMessages.*;
@@ -61,17 +93,15 @@
  @Immutable
  private static final class ConnectedRS
  {
    private static final ConnectedRS NO_CONNECTED_RS = new ConnectedRS(
        NO_CONNECTED_SERVER);
    private static final ConnectedRS NO_CONNECTED_RS = new ConnectedRS(NO_CONNECTED_SERVER);
    /** The info of the RS we are connected to. */
    private final ReplicationServerInfo rsInfo;
    /** Contains a connected session to the RS if any exist, null otherwise. */
    private final Session session;
    private final String replicationServer;
    private final HostPort replicationServer;
    private ConnectedRS(String replicationServer)
    private ConnectedRS(HostPort replicationServer)
    {
      this.rsInfo = null;
      this.session = null;
@@ -82,14 +112,14 @@
    {
      this.rsInfo = rsInfo;
      this.session = session;
      this.replicationServer = session != null ?
          session.getReadableRemoteAddress()
      this.replicationServer = session != null
          ? session.getRemoteAddress()
          : NO_CONNECTED_SERVER;
    }
    private static ConnectedRS stopped()
    {
      return new ConnectedRS("stopped");
      return NO_CONNECTED_RS;
    }
    private static ConnectedRS noConnectedRS()
@@ -142,10 +172,8 @@
  private volatile boolean shutdown;
  private final Object startStopLock = new Object();
  private volatile ReplicationDomainCfg config;
  /**
   * String reported under CSN=monitor when there is no connected RS.
   */
  static final String NO_CONNECTED_SERVER = "Not connected";
  /** String reported under CSN=monitor when there is no connected RS. */
  static final HostPort NO_CONNECTED_SERVER = new HostPort(null, 0);
  private final ServerState state;
  private Semaphore sendWindow;
  private int maxSendWindow;
@@ -2700,13 +2728,11 @@
  }
  /**
   * Get the name of the replicationServer to which this broker is currently
   * connected.
   * Get the host and port of the replicationServer to which this broker is currently connected.
   *
   * @return the name of the replicationServer to which this domain
   *         is currently connected.
   * @return the host and port of the replicationServer to which this domain is currently connected.
   */
  public String getReplicationServer()
  public HostPort getReplicationServer()
  {
    return connectedRS.get().replicationServer;
  }
@@ -3254,10 +3280,10 @@
   *
   * @return The local address.
   */
  String getLocalUrl()
  HostPort getLocalUrl()
  {
    final Session session = connectedRS.get().session;
    return session != null ? session.getLocalUrl() : "";
    return session != null ? session.getLocalUrl() : new HostPort(null, 0);
  }
  /**