opendj-server-legacy/src/main/java/org/opends/server/replication/protocol/Session.java
@@ -12,13 +12,18 @@ * information: "Portions Copyright [year] [name of copyright owner]". * * Copyright 2006-2009 Sun Microsystems, Inc. * Portions Copyright 2011-2015 ForgeRock AS. * Portions Copyright 2011-2016 ForgeRock AS. */ package org.opends.server.replication.protocol; import static org.opends.server.util.StaticUtils.*; import java.io.*; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.Closeable; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; import java.net.SocketException; import java.util.concurrent.CountDownLatch; @@ -31,8 +36,9 @@ import javax.net.ssl.SSLSocket; import org.opends.server.api.DirectoryThread; import org.forgerock.i18n.slf4j.LocalizedLogger; import org.opends.server.api.DirectoryThread; import org.opends.server.types.HostPort; import org.opends.server.util.StaticUtils; /** @@ -48,17 +54,12 @@ private final OutputStream plainOutput; private final byte[] rcvLengthBuf = new byte[8]; private final String readableRemoteAddress; private final String remoteAddress; private final String localUrl; private final HostPort remoteAddress; private final HostPort localUrl; /** * The time the last message published to this session. */ /** The time the last message published to this session. */ private volatile long lastPublishTime; /** * The time the last message was received on this session. */ /** The time the last message was received on this session. */ private volatile long lastReceiveTime; /** @@ -129,11 +130,9 @@ this.plainOutput = plainSocket.getOutputStream(); this.input = new BufferedInputStream(secureSocket.getInputStream()); this.output = new BufferedOutputStream(secureSocket.getOutputStream()); this.readableRemoteAddress = plainSocket.getRemoteSocketAddress() .toString(); this.remoteAddress = plainSocket.getInetAddress().getHostAddress(); this.localUrl = plainSocket.getLocalAddress().getHostName() + ":" + plainSocket.getLocalPort(); this.readableRemoteAddress = plainSocket.getRemoteSocketAddress().toString(); this.remoteAddress = new HostPort(plainSocket.getInetAddress().getHostAddress(), plainSocket.getPort()); this.localUrl = new HostPort(plainSocket.getLocalAddress().getHostName(), plainSocket.getLocalPort()); } @@ -256,7 +255,7 @@ * * @return The local URL. */ public String getLocalUrl() public HostPort getLocalUrl() { return localUrl; } @@ -276,11 +275,11 @@ /** * Retrieve the IP address of the remote server. * Retrieve the IP address and port of the remote server. * * @return The IP address of the remote server. * @return The IP address and port of the remote server. */ public String getRemoteAddress() public HostPort getRemoteAddress() { return remoteAddress; } opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationServerHandler.java
@@ -102,7 +102,7 @@ { final int port = HostPort.valueOf(serverURL).getPort(); // Ensure correct formatting of IPv6 addresses by using a HostPort instance. return new HostPort(session.getRemoteAddress(), port).toString(); return new HostPort(session.getRemoteAddress().getHost(), port).toString(); } /** opendj-server-legacy/src/main/java/org/opends/server/replication/service/ReplicationBroker.java
@@ -20,9 +20,21 @@ import java.math.BigDecimal; import java.math.MathContext; import java.math.RoundingMode; import java.net.*; import java.util.*; import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketException; import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @@ -34,13 +46,33 @@ import org.forgerock.i18n.LocalizableMessage; import org.forgerock.i18n.slf4j.LocalizedLogger; import org.forgerock.util.Utils; import org.forgerock.opendj.server.config.server.ReplicationDomainCfg; import org.opends.server.core.DirectoryServer; import org.opends.server.replication.common.*; import org.opends.server.replication.plugin.MultimasterReplication; import org.opends.server.replication.protocol.*; import org.forgerock.opendj.ldap.DN; import org.forgerock.opendj.server.config.server.ReplicationDomainCfg; import org.forgerock.util.Utils; import org.opends.server.core.DirectoryServer; import org.opends.server.replication.common.CSN; import org.opends.server.replication.common.DSInfo; import org.opends.server.replication.common.RSInfo; import org.opends.server.replication.common.ServerState; import org.opends.server.replication.common.ServerStatus; import org.opends.server.replication.plugin.MultimasterReplication; import org.opends.server.replication.protocol.ChangeStatusMsg; import org.opends.server.replication.protocol.MonitorMsg; import org.opends.server.replication.protocol.MonitorRequestMsg; import org.opends.server.replication.protocol.ProtocolVersion; import org.opends.server.replication.protocol.ReplServerStartDSMsg; import org.opends.server.replication.protocol.ReplServerStartMsg; import org.opends.server.replication.protocol.ReplSessionSecurity; import org.opends.server.replication.protocol.ReplicationMsg; import org.opends.server.replication.protocol.ServerStartMsg; import org.opends.server.replication.protocol.Session; import org.opends.server.replication.protocol.StartMsg; import org.opends.server.replication.protocol.StartSessionMsg; import org.opends.server.replication.protocol.StopMsg; import org.opends.server.replication.protocol.TopologyMsg; import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.replication.protocol.WindowMsg; import org.opends.server.replication.protocol.WindowProbeMsg; import org.opends.server.types.HostPort; import static org.opends.messages.ReplicationMessages.*; @@ -61,17 +93,15 @@ @Immutable private static final class ConnectedRS { private static final ConnectedRS NO_CONNECTED_RS = new ConnectedRS( NO_CONNECTED_SERVER); private static final ConnectedRS NO_CONNECTED_RS = new ConnectedRS(NO_CONNECTED_SERVER); /** The info of the RS we are connected to. */ private final ReplicationServerInfo rsInfo; /** Contains a connected session to the RS if any exist, null otherwise. */ private final Session session; private final String replicationServer; private final HostPort replicationServer; private ConnectedRS(String replicationServer) private ConnectedRS(HostPort replicationServer) { this.rsInfo = null; this.session = null; @@ -82,14 +112,14 @@ { this.rsInfo = rsInfo; this.session = session; this.replicationServer = session != null ? session.getReadableRemoteAddress() this.replicationServer = session != null ? session.getRemoteAddress() : NO_CONNECTED_SERVER; } private static ConnectedRS stopped() { return new ConnectedRS("stopped"); return NO_CONNECTED_RS; } private static ConnectedRS noConnectedRS() @@ -142,10 +172,8 @@ private volatile boolean shutdown; private final Object startStopLock = new Object(); private volatile ReplicationDomainCfg config; /** * String reported under CSN=monitor when there is no connected RS. */ static final String NO_CONNECTED_SERVER = "Not connected"; /** String reported under CSN=monitor when there is no connected RS. */ static final HostPort NO_CONNECTED_SERVER = new HostPort(null, 0); private final ServerState state; private Semaphore sendWindow; private int maxSendWindow; @@ -2700,13 +2728,11 @@ } /** * Get the name of the replicationServer to which this broker is currently * connected. * Get the host and port of the replicationServer to which this broker is currently connected. * * @return the name of the replicationServer to which this domain * is currently connected. * @return the host and port of the replicationServer to which this domain is currently connected. */ public String getReplicationServer() public HostPort getReplicationServer() { return connectedRS.get().replicationServer; } @@ -3254,10 +3280,10 @@ * * @return The local address. */ String getLocalUrl() HostPort getLocalUrl() { final Session session = connectedRS.get().session; return session != null ? session.getLocalUrl() : ""; return session != null ? session.getLocalUrl() : new HostPort(null, 0); } /** opendj-server-legacy/src/main/java/org/opends/server/replication/service/ReplicationDomain.java
@@ -83,6 +83,7 @@ import org.opends.server.tasks.InitializeTargetTask; import org.opends.server.tasks.InitializeTask; import org.opends.server.types.DirectoryException; import org.opends.server.types.HostPort; /** * This class should be used as a base for Replication implementations. @@ -2698,7 +2699,7 @@ * @return the name of the replicationServer to which this domain * is currently connected. */ public String getReplicationServer() public HostPort getReplicationServer() { if (broker != null) { @@ -3447,10 +3448,10 @@ * * @return The local address. */ String getLocalUrl() HostPort getLocalUrl() { final ReplicationBroker tmp = broker; return tmp != null ? tmp.getLocalUrl() : ""; return tmp != null ? tmp.getLocalUrl() : ReplicationBroker.NO_CONNECTED_SERVER; } /** opendj-server-legacy/src/test/java/org/opends/server/replication/ReplicationTestCase.java
@@ -61,7 +61,6 @@ import org.opends.server.types.Attribute; import org.opends.server.types.Attributes; import org.opends.server.types.Entry; import org.opends.server.types.HostPort; import org.opends.server.types.Modification; import org.opends.server.types.SearchResultEntry; import org.opends.server.util.TestTimer; @@ -550,17 +549,13 @@ @Override public Void call() throws Exception { assertEquals(DirectoryServer.entryExists(dn), exist); assertEquals(DirectoryServer.entryExists(dn), exist, "Expected entry with dn \"" + dn + "\" would exist"); return null; } }); Entry entry = DirectoryServer.getEntry(dn); if (entry != null) { return entry.duplicate(true); } return null; return entry != null ? entry.duplicate(true) : null; } /** Update the monitor count for the specified monitor attribute. */ @@ -923,9 +918,8 @@ boolean rightPort = false; if (connected) { String rsUrl = rd.getReplicationServer(); try { rdPort = HostPort.valueOf(rsUrl).getPort(); rdPort = rd.getReplicationServer().getPort(); rightPort = rdPort == rsPort; } catch (IllegalArgumentException notConnectedYet) opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/FractionalReplicationTest.java
@@ -52,7 +52,6 @@ import org.opends.server.types.Attribute; import org.opends.server.types.Attributes; import org.opends.server.types.Entry; import org.opends.server.types.HostPort; import org.opends.server.types.Modification; import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; @@ -372,11 +371,8 @@ DN baseDN = DN.valueOf(firstBackend ? TEST_ROOT_DN_STRING : TEST2_ROOT_DN_STRING); replicationDomain = new FakeReplicationDomain(baseDN, DS2_ID, replicationServers, 1000, generationId); // Test connection assertTrue(replicationDomain.isConnected()); // Check connected server port String serverStr = replicationDomain.getReplicationServer(); assertEquals(HostPort.valueOf(serverStr).getPort(), replServerPort); assertEquals(replicationDomain.getReplicationServer().getPort(), replServerPort); } private void initTest() throws Exception opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/ReplicationServerFailoverTest.java
@@ -31,7 +31,6 @@ import org.opends.server.replication.ReplicationTestCase; import org.opends.server.replication.server.ReplServerFakeConfiguration; import org.opends.server.replication.server.ReplicationServer; import org.opends.server.types.HostPort; import org.testng.annotations.Test; /** @@ -128,7 +127,7 @@ // DS1 connected to RS1 ? // Check which replication server is connected to this LDAP server rsPort = findReplServerConnected(rd1); rsPort = rd1.getReplicationServer().getPort(); if (rsPort == rs1Port) { @@ -332,16 +331,4 @@ return replicationDomain; } private int findReplServerConnected(LDAPReplicationDomain rd) { // First check that the Replication domain is connected if (!rd.isConnected()) { return -1; } String serverStr = rd.getReplicationServer(); return HostPort.valueOf(serverStr).getPort(); } } opendj-server-legacy/src/test/java/org/opends/server/replication/server/AssuredReplicationServerTest.java
@@ -68,7 +68,6 @@ import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.replication.service.ReplicationDomain; import org.opends.server.types.DirectoryException; import org.opends.server.types.HostPort; import org.testng.Assert; import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; @@ -318,11 +317,8 @@ fakeReplicationDomain.startListenService(); } // Test connection assertTrue(fakeReplicationDomain.isConnected()); // Check connected server port HostPort rd = HostPort.valueOf(fakeReplicationDomain.getReplicationServer()); assertEquals(rd.getPort(), getRsPort(rsId)); assertEquals(fakeReplicationDomain.getReplicationServer().getPort(), getRsPort(rsId)); return fakeReplicationDomain; }