From 3225377d57acd0db675bdc27723a1f9f536526ab Mon Sep 17 00:00:00 2001
From: Jean-Noël Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 07 Jul 2016 10:38:32 +0000
Subject: [PATCH] Use HostPort more in replication

---
 opendj-server-legacy/src/main/java/org/opends/server/replication/service/ReplicationBroker.java |   80 ++++++++++++++++++++++++++-------------
 1 files changed, 53 insertions(+), 27 deletions(-)

diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/service/ReplicationBroker.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/service/ReplicationBroker.java
index 9553f8e..0e4f4de 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/service/ReplicationBroker.java
@@ -20,9 +20,21 @@
 import java.math.BigDecimal;
 import java.math.MathContext;
 import java.math.RoundingMode;
-import java.net.*;
-import java.util.*;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
@@ -34,13 +46,33 @@
 
 import org.forgerock.i18n.LocalizableMessage;
 import org.forgerock.i18n.slf4j.LocalizedLogger;
-import org.forgerock.util.Utils;
-import org.forgerock.opendj.server.config.server.ReplicationDomainCfg;
-import org.opends.server.core.DirectoryServer;
-import org.opends.server.replication.common.*;
-import org.opends.server.replication.plugin.MultimasterReplication;
-import org.opends.server.replication.protocol.*;
 import org.forgerock.opendj.ldap.DN;
+import org.forgerock.opendj.server.config.server.ReplicationDomainCfg;
+import org.forgerock.util.Utils;
+import org.opends.server.core.DirectoryServer;
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.common.DSInfo;
+import org.opends.server.replication.common.RSInfo;
+import org.opends.server.replication.common.ServerState;
+import org.opends.server.replication.common.ServerStatus;
+import org.opends.server.replication.plugin.MultimasterReplication;
+import org.opends.server.replication.protocol.ChangeStatusMsg;
+import org.opends.server.replication.protocol.MonitorMsg;
+import org.opends.server.replication.protocol.MonitorRequestMsg;
+import org.opends.server.replication.protocol.ProtocolVersion;
+import org.opends.server.replication.protocol.ReplServerStartDSMsg;
+import org.opends.server.replication.protocol.ReplServerStartMsg;
+import org.opends.server.replication.protocol.ReplSessionSecurity;
+import org.opends.server.replication.protocol.ReplicationMsg;
+import org.opends.server.replication.protocol.ServerStartMsg;
+import org.opends.server.replication.protocol.Session;
+import org.opends.server.replication.protocol.StartMsg;
+import org.opends.server.replication.protocol.StartSessionMsg;
+import org.opends.server.replication.protocol.StopMsg;
+import org.opends.server.replication.protocol.TopologyMsg;
+import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.protocol.WindowMsg;
+import org.opends.server.replication.protocol.WindowProbeMsg;
 import org.opends.server.types.HostPort;
 
 import static org.opends.messages.ReplicationMessages.*;
@@ -61,17 +93,15 @@
   @Immutable
   private static final class ConnectedRS
   {
-
-    private static final ConnectedRS NO_CONNECTED_RS = new ConnectedRS(
-        NO_CONNECTED_SERVER);
+    private static final ConnectedRS NO_CONNECTED_RS = new ConnectedRS(NO_CONNECTED_SERVER);
 
     /** The info of the RS we are connected to. */
     private final ReplicationServerInfo rsInfo;
     /** Contains a connected session to the RS if any exist, null otherwise. */
     private final Session session;
-    private final String replicationServer;
+    private final HostPort replicationServer;
 
-    private ConnectedRS(String replicationServer)
+    private ConnectedRS(HostPort replicationServer)
     {
       this.rsInfo = null;
       this.session = null;
@@ -82,14 +112,14 @@
     {
       this.rsInfo = rsInfo;
       this.session = session;
-      this.replicationServer = session != null ?
-          session.getReadableRemoteAddress()
+      this.replicationServer = session != null
+          ? session.getRemoteAddress()
           : NO_CONNECTED_SERVER;
     }
 
     private static ConnectedRS stopped()
     {
-      return new ConnectedRS("stopped");
+      return NO_CONNECTED_RS;
     }
 
     private static ConnectedRS noConnectedRS()
@@ -142,10 +172,8 @@
   private volatile boolean shutdown;
   private final Object startStopLock = new Object();
   private volatile ReplicationDomainCfg config;
-  /**
-   * String reported under CSN=monitor when there is no connected RS.
-   */
-  static final String NO_CONNECTED_SERVER = "Not connected";
+  /** String reported under CSN=monitor when there is no connected RS. */
+  static final HostPort NO_CONNECTED_SERVER = new HostPort(null, 0);
   private final ServerState state;
   private Semaphore sendWindow;
   private int maxSendWindow;
@@ -2700,13 +2728,11 @@
   }
 
   /**
-   * Get the name of the replicationServer to which this broker is currently
-   * connected.
+   * Get the host and port of the replicationServer to which this broker is currently connected.
    *
-   * @return the name of the replicationServer to which this domain
-   *         is currently connected.
+   * @return the host and port of the replicationServer to which this domain is currently connected.
    */
-  public String getReplicationServer()
+  public HostPort getReplicationServer()
   {
     return connectedRS.get().replicationServer;
   }
@@ -3254,10 +3280,10 @@
    *
    * @return The local address.
    */
-  String getLocalUrl()
+  HostPort getLocalUrl()
   {
     final Session session = connectedRS.get().session;
-    return session != null ? session.getLocalUrl() : "";
+    return session != null ? session.getLocalUrl() : new HostPort(null, 0);
   }
 
   /**

--
Gitblit v1.10.0