From 3225377d57acd0db675bdc27723a1f9f536526ab Mon Sep 17 00:00:00 2001
From: Jean-Noël Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 07 Jul 2016 10:38:32 +0000
Subject: [PATCH] Use HostPort more in replication
---
opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/ReplicationServerFailoverTest.java | 15 ----
opendj-server-legacy/src/main/java/org/opends/server/replication/protocol/Session.java | 41 ++++++-------
opendj-server-legacy/src/test/java/org/opends/server/replication/server/AssuredReplicationServerTest.java | 6 -
opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationServerHandler.java | 2
opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/FractionalReplicationTest.java | 6 -
opendj-server-legacy/src/main/java/org/opends/server/replication/service/ReplicationBroker.java | 80 +++++++++++++++++---------
opendj-server-legacy/src/main/java/org/opends/server/replication/service/ReplicationDomain.java | 7 +-
opendj-server-legacy/src/test/java/org/opends/server/replication/ReplicationTestCase.java | 12 +---
8 files changed, 84 insertions(+), 85 deletions(-)
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/protocol/Session.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/protocol/Session.java
index d92ac16..5965219 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/protocol/Session.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/protocol/Session.java
@@ -12,13 +12,18 @@
* information: "Portions Copyright [year] [name of copyright owner]".
*
* Copyright 2006-2009 Sun Microsystems, Inc.
- * Portions Copyright 2011-2015 ForgeRock AS.
+ * Portions Copyright 2011-2016 ForgeRock AS.
*/
package org.opends.server.replication.protocol;
import static org.opends.server.util.StaticUtils.*;
-import java.io.*;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.net.Socket;
import java.net.SocketException;
import java.util.concurrent.CountDownLatch;
@@ -31,8 +36,9 @@
import javax.net.ssl.SSLSocket;
-import org.opends.server.api.DirectoryThread;
import org.forgerock.i18n.slf4j.LocalizedLogger;
+import org.opends.server.api.DirectoryThread;
+import org.opends.server.types.HostPort;
import org.opends.server.util.StaticUtils;
/**
@@ -48,17 +54,12 @@
private final OutputStream plainOutput;
private final byte[] rcvLengthBuf = new byte[8];
private final String readableRemoteAddress;
- private final String remoteAddress;
- private final String localUrl;
+ private final HostPort remoteAddress;
+ private final HostPort localUrl;
- /**
- * The time the last message published to this session.
- */
+ /** The time the last message published to this session. */
private volatile long lastPublishTime;
-
- /**
- * The time the last message was received on this session.
- */
+ /** The time the last message was received on this session. */
private volatile long lastReceiveTime;
/**
@@ -129,11 +130,9 @@
this.plainOutput = plainSocket.getOutputStream();
this.input = new BufferedInputStream(secureSocket.getInputStream());
this.output = new BufferedOutputStream(secureSocket.getOutputStream());
- this.readableRemoteAddress = plainSocket.getRemoteSocketAddress()
- .toString();
- this.remoteAddress = plainSocket.getInetAddress().getHostAddress();
- this.localUrl = plainSocket.getLocalAddress().getHostName() + ":"
- + plainSocket.getLocalPort();
+ this.readableRemoteAddress = plainSocket.getRemoteSocketAddress().toString();
+ this.remoteAddress = new HostPort(plainSocket.getInetAddress().getHostAddress(), plainSocket.getPort());
+ this.localUrl = new HostPort(plainSocket.getLocalAddress().getHostName(), plainSocket.getLocalPort());
}
@@ -256,7 +255,7 @@
*
* @return The local URL.
*/
- public String getLocalUrl()
+ public HostPort getLocalUrl()
{
return localUrl;
}
@@ -276,11 +275,11 @@
/**
- * Retrieve the IP address of the remote server.
+ * Retrieve the IP address and port of the remote server.
*
- * @return The IP address of the remote server.
+ * @return The IP address and port of the remote server.
*/
- public String getRemoteAddress()
+ public HostPort getRemoteAddress()
{
return remoteAddress;
}
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationServerHandler.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationServerHandler.java
index a91f18f..dce9118 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationServerHandler.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationServerHandler.java
@@ -102,7 +102,7 @@
{
final int port = HostPort.valueOf(serverURL).getPort();
// Ensure correct formatting of IPv6 addresses by using a HostPort instance.
- return new HostPort(session.getRemoteAddress(), port).toString();
+ return new HostPort(session.getRemoteAddress().getHost(), port).toString();
}
/**
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/service/ReplicationBroker.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/service/ReplicationBroker.java
index 9553f8e..0e4f4de 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/service/ReplicationBroker.java
@@ -20,9 +20,21 @@
import java.math.BigDecimal;
import java.math.MathContext;
import java.math.RoundingMode;
-import java.net.*;
-import java.util.*;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@@ -34,13 +46,33 @@
import org.forgerock.i18n.LocalizableMessage;
import org.forgerock.i18n.slf4j.LocalizedLogger;
-import org.forgerock.util.Utils;
-import org.forgerock.opendj.server.config.server.ReplicationDomainCfg;
-import org.opends.server.core.DirectoryServer;
-import org.opends.server.replication.common.*;
-import org.opends.server.replication.plugin.MultimasterReplication;
-import org.opends.server.replication.protocol.*;
import org.forgerock.opendj.ldap.DN;
+import org.forgerock.opendj.server.config.server.ReplicationDomainCfg;
+import org.forgerock.util.Utils;
+import org.opends.server.core.DirectoryServer;
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.common.DSInfo;
+import org.opends.server.replication.common.RSInfo;
+import org.opends.server.replication.common.ServerState;
+import org.opends.server.replication.common.ServerStatus;
+import org.opends.server.replication.plugin.MultimasterReplication;
+import org.opends.server.replication.protocol.ChangeStatusMsg;
+import org.opends.server.replication.protocol.MonitorMsg;
+import org.opends.server.replication.protocol.MonitorRequestMsg;
+import org.opends.server.replication.protocol.ProtocolVersion;
+import org.opends.server.replication.protocol.ReplServerStartDSMsg;
+import org.opends.server.replication.protocol.ReplServerStartMsg;
+import org.opends.server.replication.protocol.ReplSessionSecurity;
+import org.opends.server.replication.protocol.ReplicationMsg;
+import org.opends.server.replication.protocol.ServerStartMsg;
+import org.opends.server.replication.protocol.Session;
+import org.opends.server.replication.protocol.StartMsg;
+import org.opends.server.replication.protocol.StartSessionMsg;
+import org.opends.server.replication.protocol.StopMsg;
+import org.opends.server.replication.protocol.TopologyMsg;
+import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.protocol.WindowMsg;
+import org.opends.server.replication.protocol.WindowProbeMsg;
import org.opends.server.types.HostPort;
import static org.opends.messages.ReplicationMessages.*;
@@ -61,17 +93,15 @@
@Immutable
private static final class ConnectedRS
{
-
- private static final ConnectedRS NO_CONNECTED_RS = new ConnectedRS(
- NO_CONNECTED_SERVER);
+ private static final ConnectedRS NO_CONNECTED_RS = new ConnectedRS(NO_CONNECTED_SERVER);
/** The info of the RS we are connected to. */
private final ReplicationServerInfo rsInfo;
/** Contains a connected session to the RS if any exist, null otherwise. */
private final Session session;
- private final String replicationServer;
+ private final HostPort replicationServer;
- private ConnectedRS(String replicationServer)
+ private ConnectedRS(HostPort replicationServer)
{
this.rsInfo = null;
this.session = null;
@@ -82,14 +112,14 @@
{
this.rsInfo = rsInfo;
this.session = session;
- this.replicationServer = session != null ?
- session.getReadableRemoteAddress()
+ this.replicationServer = session != null
+ ? session.getRemoteAddress()
: NO_CONNECTED_SERVER;
}
private static ConnectedRS stopped()
{
- return new ConnectedRS("stopped");
+ return NO_CONNECTED_RS;
}
private static ConnectedRS noConnectedRS()
@@ -142,10 +172,8 @@
private volatile boolean shutdown;
private final Object startStopLock = new Object();
private volatile ReplicationDomainCfg config;
- /**
- * String reported under CSN=monitor when there is no connected RS.
- */
- static final String NO_CONNECTED_SERVER = "Not connected";
+ /** String reported under CSN=monitor when there is no connected RS. */
+ static final HostPort NO_CONNECTED_SERVER = new HostPort(null, 0);
private final ServerState state;
private Semaphore sendWindow;
private int maxSendWindow;
@@ -2700,13 +2728,11 @@
}
/**
- * Get the name of the replicationServer to which this broker is currently
- * connected.
+ * Get the host and port of the replicationServer to which this broker is currently connected.
*
- * @return the name of the replicationServer to which this domain
- * is currently connected.
+ * @return the host and port of the replicationServer to which this domain is currently connected.
*/
- public String getReplicationServer()
+ public HostPort getReplicationServer()
{
return connectedRS.get().replicationServer;
}
@@ -3254,10 +3280,10 @@
*
* @return The local address.
*/
- String getLocalUrl()
+ HostPort getLocalUrl()
{
final Session session = connectedRS.get().session;
- return session != null ? session.getLocalUrl() : "";
+ return session != null ? session.getLocalUrl() : new HostPort(null, 0);
}
/**
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/service/ReplicationDomain.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/service/ReplicationDomain.java
index 18bdbe5..688cc98 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/service/ReplicationDomain.java
@@ -83,6 +83,7 @@
import org.opends.server.tasks.InitializeTargetTask;
import org.opends.server.tasks.InitializeTask;
import org.opends.server.types.DirectoryException;
+import org.opends.server.types.HostPort;
/**
* This class should be used as a base for Replication implementations.
@@ -2698,7 +2699,7 @@
* @return the name of the replicationServer to which this domain
* is currently connected.
*/
- public String getReplicationServer()
+ public HostPort getReplicationServer()
{
if (broker != null)
{
@@ -3447,10 +3448,10 @@
*
* @return The local address.
*/
- String getLocalUrl()
+ HostPort getLocalUrl()
{
final ReplicationBroker tmp = broker;
- return tmp != null ? tmp.getLocalUrl() : "";
+ return tmp != null ? tmp.getLocalUrl() : ReplicationBroker.NO_CONNECTED_SERVER;
}
/**
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/replication/ReplicationTestCase.java b/opendj-server-legacy/src/test/java/org/opends/server/replication/ReplicationTestCase.java
index 1a781a3..e1f6f10 100644
--- a/opendj-server-legacy/src/test/java/org/opends/server/replication/ReplicationTestCase.java
+++ b/opendj-server-legacy/src/test/java/org/opends/server/replication/ReplicationTestCase.java
@@ -61,7 +61,6 @@
import org.opends.server.types.Attribute;
import org.opends.server.types.Attributes;
import org.opends.server.types.Entry;
-import org.opends.server.types.HostPort;
import org.opends.server.types.Modification;
import org.opends.server.types.SearchResultEntry;
import org.opends.server.util.TestTimer;
@@ -550,17 +549,13 @@
@Override
public Void call() throws Exception
{
- assertEquals(DirectoryServer.entryExists(dn), exist);
+ assertEquals(DirectoryServer.entryExists(dn), exist, "Expected entry with dn \"" + dn + "\" would exist");
return null;
}
});
Entry entry = DirectoryServer.getEntry(dn);
- if (entry != null)
- {
- return entry.duplicate(true);
- }
- return null;
+ return entry != null ? entry.duplicate(true) : null;
}
/** Update the monitor count for the specified monitor attribute. */
@@ -923,9 +918,8 @@
boolean rightPort = false;
if (connected)
{
- String rsUrl = rd.getReplicationServer();
try {
- rdPort = HostPort.valueOf(rsUrl).getPort();
+ rdPort = rd.getReplicationServer().getPort();
rightPort = rdPort == rsPort;
}
catch (IllegalArgumentException notConnectedYet)
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/FractionalReplicationTest.java b/opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/FractionalReplicationTest.java
index 1e612e5..58edf32 100644
--- a/opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/FractionalReplicationTest.java
+++ b/opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/FractionalReplicationTest.java
@@ -52,7 +52,6 @@
import org.opends.server.types.Attribute;
import org.opends.server.types.Attributes;
import org.opends.server.types.Entry;
-import org.opends.server.types.HostPort;
import org.opends.server.types.Modification;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
@@ -372,11 +371,8 @@
DN baseDN = DN.valueOf(firstBackend ? TEST_ROOT_DN_STRING : TEST2_ROOT_DN_STRING);
replicationDomain = new FakeReplicationDomain(baseDN, DS2_ID, replicationServers, 1000, generationId);
- // Test connection
assertTrue(replicationDomain.isConnected());
- // Check connected server port
- String serverStr = replicationDomain.getReplicationServer();
- assertEquals(HostPort.valueOf(serverStr).getPort(), replServerPort);
+ assertEquals(replicationDomain.getReplicationServer().getPort(), replServerPort);
}
private void initTest() throws Exception
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/ReplicationServerFailoverTest.java b/opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/ReplicationServerFailoverTest.java
index 4fbad98..29ac77e 100644
--- a/opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/ReplicationServerFailoverTest.java
+++ b/opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/ReplicationServerFailoverTest.java
@@ -31,7 +31,6 @@
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.replication.server.ReplicationServer;
-import org.opends.server.types.HostPort;
import org.testng.annotations.Test;
/**
@@ -128,7 +127,7 @@
// DS1 connected to RS1 ?
// Check which replication server is connected to this LDAP server
- rsPort = findReplServerConnected(rd1);
+ rsPort = rd1.getReplicationServer().getPort();
if (rsPort == rs1Port)
{
@@ -332,16 +331,4 @@
return replicationDomain;
}
-
- private int findReplServerConnected(LDAPReplicationDomain rd)
- {
- // First check that the Replication domain is connected
- if (!rd.isConnected())
- {
- return -1;
- }
-
- String serverStr = rd.getReplicationServer();
- return HostPort.valueOf(serverStr).getPort();
- }
}
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/replication/server/AssuredReplicationServerTest.java b/opendj-server-legacy/src/test/java/org/opends/server/replication/server/AssuredReplicationServerTest.java
index 2d07319..8dd483f 100644
--- a/opendj-server-legacy/src/test/java/org/opends/server/replication/server/AssuredReplicationServerTest.java
+++ b/opendj-server-legacy/src/test/java/org/opends/server/replication/server/AssuredReplicationServerTest.java
@@ -68,7 +68,6 @@
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.service.ReplicationDomain;
import org.opends.server.types.DirectoryException;
-import org.opends.server.types.HostPort;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
@@ -318,11 +317,8 @@
fakeReplicationDomain.startListenService();
}
- // Test connection
assertTrue(fakeReplicationDomain.isConnected());
- // Check connected server port
- HostPort rd = HostPort.valueOf(fakeReplicationDomain.getReplicationServer());
- assertEquals(rd.getPort(), getRsPort(rsId));
+ assertEquals(fakeReplicationDomain.getReplicationServer().getPort(), getRsPort(rsId));
return fakeReplicationDomain;
}
--
Gitblit v1.10.0