From 3225377d57acd0db675bdc27723a1f9f536526ab Mon Sep 17 00:00:00 2001
From: Jean-Noël Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 07 Jul 2016 10:38:32 +0000
Subject: [PATCH] Use HostPort more in replication

---
 opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/ReplicationServerFailoverTest.java |   15 ----
 opendj-server-legacy/src/main/java/org/opends/server/replication/protocol/Session.java                     |   41 ++++++-------
 opendj-server-legacy/src/test/java/org/opends/server/replication/server/AssuredReplicationServerTest.java  |    6 -
 opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationServerHandler.java      |    2 
 opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/FractionalReplicationTest.java     |    6 -
 opendj-server-legacy/src/main/java/org/opends/server/replication/service/ReplicationBroker.java            |   80 +++++++++++++++++---------
 opendj-server-legacy/src/main/java/org/opends/server/replication/service/ReplicationDomain.java            |    7 +-
 opendj-server-legacy/src/test/java/org/opends/server/replication/ReplicationTestCase.java                  |   12 +---
 8 files changed, 84 insertions(+), 85 deletions(-)

diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/protocol/Session.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/protocol/Session.java
index d92ac16..5965219 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/protocol/Session.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/protocol/Session.java
@@ -12,13 +12,18 @@
  * information: "Portions Copyright [year] [name of copyright owner]".
  *
  * Copyright 2006-2009 Sun Microsystems, Inc.
- * Portions Copyright 2011-2015 ForgeRock AS.
+ * Portions Copyright 2011-2016 ForgeRock AS.
  */
 package org.opends.server.replication.protocol;
 
 import static org.opends.server.util.StaticUtils.*;
 
-import java.io.*;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.net.Socket;
 import java.net.SocketException;
 import java.util.concurrent.CountDownLatch;
@@ -31,8 +36,9 @@
 
 import javax.net.ssl.SSLSocket;
 
-import org.opends.server.api.DirectoryThread;
 import org.forgerock.i18n.slf4j.LocalizedLogger;
+import org.opends.server.api.DirectoryThread;
+import org.opends.server.types.HostPort;
 import org.opends.server.util.StaticUtils;
 
 /**
@@ -48,17 +54,12 @@
   private final OutputStream plainOutput;
   private final byte[] rcvLengthBuf = new byte[8];
   private final String readableRemoteAddress;
-  private final String remoteAddress;
-  private final String localUrl;
+  private final HostPort remoteAddress;
+  private final HostPort localUrl;
 
-  /**
-   * The time the last message published to this session.
-   */
+  /** The time the last message published to this session. */
   private volatile long lastPublishTime;
-
-  /**
-   * The time the last message was received on this session.
-   */
+  /** The time the last message was received on this session. */
   private volatile long lastReceiveTime;
 
   /**
@@ -129,11 +130,9 @@
     this.plainOutput = plainSocket.getOutputStream();
     this.input = new BufferedInputStream(secureSocket.getInputStream());
     this.output = new BufferedOutputStream(secureSocket.getOutputStream());
-    this.readableRemoteAddress = plainSocket.getRemoteSocketAddress()
-        .toString();
-    this.remoteAddress = plainSocket.getInetAddress().getHostAddress();
-    this.localUrl = plainSocket.getLocalAddress().getHostName() + ":"
-        + plainSocket.getLocalPort();
+    this.readableRemoteAddress = plainSocket.getRemoteSocketAddress().toString();
+    this.remoteAddress = new HostPort(plainSocket.getInetAddress().getHostAddress(), plainSocket.getPort());
+    this.localUrl = new HostPort(plainSocket.getLocalAddress().getHostName(), plainSocket.getLocalPort());
   }
 
 
@@ -256,7 +255,7 @@
    *
    * @return The local URL.
    */
-  public String getLocalUrl()
+  public HostPort getLocalUrl()
   {
     return localUrl;
   }
@@ -276,11 +275,11 @@
 
 
   /**
-   * Retrieve the IP address of the remote server.
+   * Retrieve the IP address and port of the remote server.
    *
-   * @return The IP address of the remote server.
+   * @return The IP address and port of the remote server.
    */
-  public String getRemoteAddress()
+  public HostPort getRemoteAddress()
   {
     return remoteAddress;
   }
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationServerHandler.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationServerHandler.java
index a91f18f..dce9118 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationServerHandler.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationServerHandler.java
@@ -102,7 +102,7 @@
   {
     final int port = HostPort.valueOf(serverURL).getPort();
     // Ensure correct formatting of IPv6 addresses by using a HostPort instance.
-    return new HostPort(session.getRemoteAddress(), port).toString();
+    return new HostPort(session.getRemoteAddress().getHost(), port).toString();
   }
 
   /**
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/service/ReplicationBroker.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/service/ReplicationBroker.java
index 9553f8e..0e4f4de 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/service/ReplicationBroker.java
@@ -20,9 +20,21 @@
 import java.math.BigDecimal;
 import java.math.MathContext;
 import java.math.RoundingMode;
-import java.net.*;
-import java.util.*;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
@@ -34,13 +46,33 @@
 
 import org.forgerock.i18n.LocalizableMessage;
 import org.forgerock.i18n.slf4j.LocalizedLogger;
-import org.forgerock.util.Utils;
-import org.forgerock.opendj.server.config.server.ReplicationDomainCfg;
-import org.opends.server.core.DirectoryServer;
-import org.opends.server.replication.common.*;
-import org.opends.server.replication.plugin.MultimasterReplication;
-import org.opends.server.replication.protocol.*;
 import org.forgerock.opendj.ldap.DN;
+import org.forgerock.opendj.server.config.server.ReplicationDomainCfg;
+import org.forgerock.util.Utils;
+import org.opends.server.core.DirectoryServer;
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.common.DSInfo;
+import org.opends.server.replication.common.RSInfo;
+import org.opends.server.replication.common.ServerState;
+import org.opends.server.replication.common.ServerStatus;
+import org.opends.server.replication.plugin.MultimasterReplication;
+import org.opends.server.replication.protocol.ChangeStatusMsg;
+import org.opends.server.replication.protocol.MonitorMsg;
+import org.opends.server.replication.protocol.MonitorRequestMsg;
+import org.opends.server.replication.protocol.ProtocolVersion;
+import org.opends.server.replication.protocol.ReplServerStartDSMsg;
+import org.opends.server.replication.protocol.ReplServerStartMsg;
+import org.opends.server.replication.protocol.ReplSessionSecurity;
+import org.opends.server.replication.protocol.ReplicationMsg;
+import org.opends.server.replication.protocol.ServerStartMsg;
+import org.opends.server.replication.protocol.Session;
+import org.opends.server.replication.protocol.StartMsg;
+import org.opends.server.replication.protocol.StartSessionMsg;
+import org.opends.server.replication.protocol.StopMsg;
+import org.opends.server.replication.protocol.TopologyMsg;
+import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.protocol.WindowMsg;
+import org.opends.server.replication.protocol.WindowProbeMsg;
 import org.opends.server.types.HostPort;
 
 import static org.opends.messages.ReplicationMessages.*;
@@ -61,17 +93,15 @@
   @Immutable
   private static final class ConnectedRS
   {
-
-    private static final ConnectedRS NO_CONNECTED_RS = new ConnectedRS(
-        NO_CONNECTED_SERVER);
+    private static final ConnectedRS NO_CONNECTED_RS = new ConnectedRS(NO_CONNECTED_SERVER);
 
     /** The info of the RS we are connected to. */
     private final ReplicationServerInfo rsInfo;
     /** Contains a connected session to the RS if any exist, null otherwise. */
     private final Session session;
-    private final String replicationServer;
+    private final HostPort replicationServer;
 
-    private ConnectedRS(String replicationServer)
+    private ConnectedRS(HostPort replicationServer)
     {
       this.rsInfo = null;
       this.session = null;
@@ -82,14 +112,14 @@
     {
       this.rsInfo = rsInfo;
       this.session = session;
-      this.replicationServer = session != null ?
-          session.getReadableRemoteAddress()
+      this.replicationServer = session != null
+          ? session.getRemoteAddress()
           : NO_CONNECTED_SERVER;
     }
 
     private static ConnectedRS stopped()
     {
-      return new ConnectedRS("stopped");
+      return NO_CONNECTED_RS;
     }
 
     private static ConnectedRS noConnectedRS()
@@ -142,10 +172,8 @@
   private volatile boolean shutdown;
   private final Object startStopLock = new Object();
   private volatile ReplicationDomainCfg config;
-  /**
-   * String reported under CSN=monitor when there is no connected RS.
-   */
-  static final String NO_CONNECTED_SERVER = "Not connected";
+  /** String reported under CSN=monitor when there is no connected RS. */
+  static final HostPort NO_CONNECTED_SERVER = new HostPort(null, 0);
   private final ServerState state;
   private Semaphore sendWindow;
   private int maxSendWindow;
@@ -2700,13 +2728,11 @@
   }
 
   /**
-   * Get the name of the replicationServer to which this broker is currently
-   * connected.
+   * Get the host and port of the replicationServer to which this broker is currently connected.
    *
-   * @return the name of the replicationServer to which this domain
-   *         is currently connected.
+   * @return the host and port of the replicationServer to which this domain is currently connected.
    */
-  public String getReplicationServer()
+  public HostPort getReplicationServer()
   {
     return connectedRS.get().replicationServer;
   }
@@ -3254,10 +3280,10 @@
    *
    * @return The local address.
    */
-  String getLocalUrl()
+  HostPort getLocalUrl()
   {
     final Session session = connectedRS.get().session;
-    return session != null ? session.getLocalUrl() : "";
+    return session != null ? session.getLocalUrl() : new HostPort(null, 0);
   }
 
   /**
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/service/ReplicationDomain.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/service/ReplicationDomain.java
index 18bdbe5..688cc98 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/service/ReplicationDomain.java
@@ -83,6 +83,7 @@
 import org.opends.server.tasks.InitializeTargetTask;
 import org.opends.server.tasks.InitializeTask;
 import org.opends.server.types.DirectoryException;
+import org.opends.server.types.HostPort;
 
 /**
  * This class should be used as a base for Replication implementations.
@@ -2698,7 +2699,7 @@
    * @return the name of the replicationServer to which this domain
    *         is currently connected.
    */
-  public String getReplicationServer()
+  public HostPort getReplicationServer()
   {
     if (broker != null)
     {
@@ -3447,10 +3448,10 @@
    *
    * @return The local address.
    */
-  String getLocalUrl()
+  HostPort getLocalUrl()
   {
     final ReplicationBroker tmp = broker;
-    return tmp != null ? tmp.getLocalUrl() : "";
+    return tmp != null ? tmp.getLocalUrl() : ReplicationBroker.NO_CONNECTED_SERVER;
   }
 
   /**
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/replication/ReplicationTestCase.java b/opendj-server-legacy/src/test/java/org/opends/server/replication/ReplicationTestCase.java
index 1a781a3..e1f6f10 100644
--- a/opendj-server-legacy/src/test/java/org/opends/server/replication/ReplicationTestCase.java
+++ b/opendj-server-legacy/src/test/java/org/opends/server/replication/ReplicationTestCase.java
@@ -61,7 +61,6 @@
 import org.opends.server.types.Attribute;
 import org.opends.server.types.Attributes;
 import org.opends.server.types.Entry;
-import org.opends.server.types.HostPort;
 import org.opends.server.types.Modification;
 import org.opends.server.types.SearchResultEntry;
 import org.opends.server.util.TestTimer;
@@ -550,17 +549,13 @@
       @Override
       public Void call() throws Exception
       {
-        assertEquals(DirectoryServer.entryExists(dn), exist);
+        assertEquals(DirectoryServer.entryExists(dn), exist, "Expected entry with dn \"" + dn + "\" would exist");
         return null;
       }
     });
 
     Entry entry = DirectoryServer.getEntry(dn);
-    if (entry != null)
-    {
-      return entry.duplicate(true);
-    }
-    return null;
+    return entry != null ? entry.duplicate(true) : null;
   }
 
   /** Update the monitor count for the specified monitor attribute. */
@@ -923,9 +918,8 @@
       boolean rightPort = false;
       if (connected)
       {
-        String rsUrl = rd.getReplicationServer();
         try {
-          rdPort = HostPort.valueOf(rsUrl).getPort();
+          rdPort = rd.getReplicationServer().getPort();
           rightPort = rdPort == rsPort;
         }
         catch (IllegalArgumentException notConnectedYet)
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/FractionalReplicationTest.java b/opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/FractionalReplicationTest.java
index 1e612e5..58edf32 100644
--- a/opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/FractionalReplicationTest.java
+++ b/opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/FractionalReplicationTest.java
@@ -52,7 +52,6 @@
 import org.opends.server.types.Attribute;
 import org.opends.server.types.Attributes;
 import org.opends.server.types.Entry;
-import org.opends.server.types.HostPort;
 import org.opends.server.types.Modification;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
@@ -372,11 +371,8 @@
     DN baseDN = DN.valueOf(firstBackend ? TEST_ROOT_DN_STRING : TEST2_ROOT_DN_STRING);
     replicationDomain = new FakeReplicationDomain(baseDN, DS2_ID, replicationServers, 1000, generationId);
 
-    // Test connection
     assertTrue(replicationDomain.isConnected());
-    // Check connected server port
-    String serverStr = replicationDomain.getReplicationServer();
-    assertEquals(HostPort.valueOf(serverStr).getPort(), replServerPort);
+    assertEquals(replicationDomain.getReplicationServer().getPort(), replServerPort);
   }
 
   private void initTest() throws Exception
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/ReplicationServerFailoverTest.java b/opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/ReplicationServerFailoverTest.java
index 4fbad98..29ac77e 100644
--- a/opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/ReplicationServerFailoverTest.java
+++ b/opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/ReplicationServerFailoverTest.java
@@ -31,7 +31,6 @@
 import org.opends.server.replication.ReplicationTestCase;
 import org.opends.server.replication.server.ReplServerFakeConfiguration;
 import org.opends.server.replication.server.ReplicationServer;
-import org.opends.server.types.HostPort;
 import org.testng.annotations.Test;
 
 /**
@@ -128,7 +127,7 @@
 
       // DS1 connected to RS1 ?
       // Check which replication server is connected to this LDAP server
-      rsPort = findReplServerConnected(rd1);
+      rsPort = rd1.getReplicationServer().getPort();
 
       if (rsPort == rs1Port)
       {
@@ -332,16 +331,4 @@
 
     return replicationDomain;
   }
-
-  private int findReplServerConnected(LDAPReplicationDomain rd)
-  {
-    // First check that the Replication domain is connected
-    if (!rd.isConnected())
-    {
-      return -1;
-    }
-
-    String serverStr = rd.getReplicationServer();
-    return HostPort.valueOf(serverStr).getPort();
-  }
 }
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/replication/server/AssuredReplicationServerTest.java b/opendj-server-legacy/src/test/java/org/opends/server/replication/server/AssuredReplicationServerTest.java
index 2d07319..8dd483f 100644
--- a/opendj-server-legacy/src/test/java/org/opends/server/replication/server/AssuredReplicationServerTest.java
+++ b/opendj-server-legacy/src/test/java/org/opends/server/replication/server/AssuredReplicationServerTest.java
@@ -68,7 +68,6 @@
 import org.opends.server.replication.protocol.UpdateMsg;
 import org.opends.server.replication.service.ReplicationDomain;
 import org.opends.server.types.DirectoryException;
-import org.opends.server.types.HostPort;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
@@ -318,11 +317,8 @@
       fakeReplicationDomain.startListenService();
     }
 
-    // Test connection
     assertTrue(fakeReplicationDomain.isConnected());
-    // Check connected server port
-    HostPort rd = HostPort.valueOf(fakeReplicationDomain.getReplicationServer());
-    assertEquals(rd.getPort(), getRsPort(rsId));
+    assertEquals(fakeReplicationDomain.getReplicationServer().getPort(), getRsPort(rsId));
 
     return fakeReplicationDomain;
   }

--
Gitblit v1.10.0