mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noël Rouvignac
26.02.2016 3225377d57acd0db675bdc27723a1f9f536526ab
Use HostPort more in replication
8 files modified
169 ■■■■ changed files
opendj-server-legacy/src/main/java/org/opends/server/replication/protocol/Session.java 41 ●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationServerHandler.java 2 ●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/replication/service/ReplicationBroker.java 80 ●●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/replication/service/ReplicationDomain.java 7 ●●●●● patch | view | raw | blame | history
opendj-server-legacy/src/test/java/org/opends/server/replication/ReplicationTestCase.java 12 ●●●● patch | view | raw | blame | history
opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/FractionalReplicationTest.java 6 ●●●● patch | view | raw | blame | history
opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/ReplicationServerFailoverTest.java 15 ●●●●● patch | view | raw | blame | history
opendj-server-legacy/src/test/java/org/opends/server/replication/server/AssuredReplicationServerTest.java 6 ●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/replication/protocol/Session.java
@@ -12,13 +12,18 @@
 * information: "Portions Copyright [year] [name of copyright owner]".
 *
 * Copyright 2006-2009 Sun Microsystems, Inc.
 * Portions Copyright 2011-2015 ForgeRock AS.
 * Portions Copyright 2011-2016 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
import static org.opends.server.util.StaticUtils.*;
import java.io.*;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.SocketException;
import java.util.concurrent.CountDownLatch;
@@ -31,8 +36,9 @@
import javax.net.ssl.SSLSocket;
import org.opends.server.api.DirectoryThread;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.opends.server.api.DirectoryThread;
import org.opends.server.types.HostPort;
import org.opends.server.util.StaticUtils;
/**
@@ -48,17 +54,12 @@
  private final OutputStream plainOutput;
  private final byte[] rcvLengthBuf = new byte[8];
  private final String readableRemoteAddress;
  private final String remoteAddress;
  private final String localUrl;
  private final HostPort remoteAddress;
  private final HostPort localUrl;
  /**
   * The time the last message published to this session.
   */
  /** The time the last message published to this session. */
  private volatile long lastPublishTime;
  /**
   * The time the last message was received on this session.
   */
  /** The time the last message was received on this session. */
  private volatile long lastReceiveTime;
  /**
@@ -129,11 +130,9 @@
    this.plainOutput = plainSocket.getOutputStream();
    this.input = new BufferedInputStream(secureSocket.getInputStream());
    this.output = new BufferedOutputStream(secureSocket.getOutputStream());
    this.readableRemoteAddress = plainSocket.getRemoteSocketAddress()
        .toString();
    this.remoteAddress = plainSocket.getInetAddress().getHostAddress();
    this.localUrl = plainSocket.getLocalAddress().getHostName() + ":"
        + plainSocket.getLocalPort();
    this.readableRemoteAddress = plainSocket.getRemoteSocketAddress().toString();
    this.remoteAddress = new HostPort(plainSocket.getInetAddress().getHostAddress(), plainSocket.getPort());
    this.localUrl = new HostPort(plainSocket.getLocalAddress().getHostName(), plainSocket.getLocalPort());
  }
@@ -256,7 +255,7 @@
   *
   * @return The local URL.
   */
  public String getLocalUrl()
  public HostPort getLocalUrl()
  {
    return localUrl;
  }
@@ -276,11 +275,11 @@
  /**
   * Retrieve the IP address of the remote server.
   * Retrieve the IP address and port of the remote server.
   *
   * @return The IP address of the remote server.
   * @return The IP address and port of the remote server.
   */
  public String getRemoteAddress()
  public HostPort getRemoteAddress()
  {
    return remoteAddress;
  }
opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationServerHandler.java
@@ -102,7 +102,7 @@
  {
    final int port = HostPort.valueOf(serverURL).getPort();
    // Ensure correct formatting of IPv6 addresses by using a HostPort instance.
    return new HostPort(session.getRemoteAddress(), port).toString();
    return new HostPort(session.getRemoteAddress().getHost(), port).toString();
  }
  /**
opendj-server-legacy/src/main/java/org/opends/server/replication/service/ReplicationBroker.java
@@ -20,9 +20,21 @@
import java.math.BigDecimal;
import java.math.MathContext;
import java.math.RoundingMode;
import java.net.*;
import java.util.*;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@@ -34,13 +46,33 @@
import org.forgerock.i18n.LocalizableMessage;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.forgerock.util.Utils;
import org.forgerock.opendj.server.config.server.ReplicationDomainCfg;
import org.opends.server.core.DirectoryServer;
import org.opends.server.replication.common.*;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.protocol.*;
import org.forgerock.opendj.ldap.DN;
import org.forgerock.opendj.server.config.server.ReplicationDomainCfg;
import org.forgerock.util.Utils;
import org.opends.server.core.DirectoryServer;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.DSInfo;
import org.opends.server.replication.common.RSInfo;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.protocol.ChangeStatusMsg;
import org.opends.server.replication.protocol.MonitorMsg;
import org.opends.server.replication.protocol.MonitorRequestMsg;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.ReplServerStartDSMsg;
import org.opends.server.replication.protocol.ReplServerStartMsg;
import org.opends.server.replication.protocol.ReplSessionSecurity;
import org.opends.server.replication.protocol.ReplicationMsg;
import org.opends.server.replication.protocol.ServerStartMsg;
import org.opends.server.replication.protocol.Session;
import org.opends.server.replication.protocol.StartMsg;
import org.opends.server.replication.protocol.StartSessionMsg;
import org.opends.server.replication.protocol.StopMsg;
import org.opends.server.replication.protocol.TopologyMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.protocol.WindowMsg;
import org.opends.server.replication.protocol.WindowProbeMsg;
import org.opends.server.types.HostPort;
import static org.opends.messages.ReplicationMessages.*;
@@ -61,17 +93,15 @@
  @Immutable
  private static final class ConnectedRS
  {
    private static final ConnectedRS NO_CONNECTED_RS = new ConnectedRS(
        NO_CONNECTED_SERVER);
    private static final ConnectedRS NO_CONNECTED_RS = new ConnectedRS(NO_CONNECTED_SERVER);
    /** The info of the RS we are connected to. */
    private final ReplicationServerInfo rsInfo;
    /** Contains a connected session to the RS if any exist, null otherwise. */
    private final Session session;
    private final String replicationServer;
    private final HostPort replicationServer;
    private ConnectedRS(String replicationServer)
    private ConnectedRS(HostPort replicationServer)
    {
      this.rsInfo = null;
      this.session = null;
@@ -82,14 +112,14 @@
    {
      this.rsInfo = rsInfo;
      this.session = session;
      this.replicationServer = session != null ?
          session.getReadableRemoteAddress()
      this.replicationServer = session != null
          ? session.getRemoteAddress()
          : NO_CONNECTED_SERVER;
    }
    private static ConnectedRS stopped()
    {
      return new ConnectedRS("stopped");
      return NO_CONNECTED_RS;
    }
    private static ConnectedRS noConnectedRS()
@@ -142,10 +172,8 @@
  private volatile boolean shutdown;
  private final Object startStopLock = new Object();
  private volatile ReplicationDomainCfg config;
  /**
   * String reported under CSN=monitor when there is no connected RS.
   */
  static final String NO_CONNECTED_SERVER = "Not connected";
  /** Value reported under CSN=monitor when there is no connected RS. */
  static final HostPort NO_CONNECTED_SERVER = new HostPort(null, 0);
  private final ServerState state;
  private Semaphore sendWindow;
  private int maxSendWindow;
@@ -2700,13 +2728,11 @@
  }
  /**
   * Get the name of the replicationServer to which this broker is currently
   * connected.
   * Get the host and port of the replicationServer to which this broker is currently connected.
   *
   * @return the name of the replicationServer to which this domain
   *         is currently connected.
   * @return the host and port of the replicationServer to which this domain is currently connected.
   */
  public String getReplicationServer()
  public HostPort getReplicationServer()
  {
    return connectedRS.get().replicationServer;
  }
@@ -3254,10 +3280,10 @@
   *
   * @return The local address.
   */
  String getLocalUrl()
  HostPort getLocalUrl()
  {
    final Session session = connectedRS.get().session;
    return session != null ? session.getLocalUrl() : "";
    return session != null ? session.getLocalUrl() : new HostPort(null, 0);
  }
  /**
opendj-server-legacy/src/main/java/org/opends/server/replication/service/ReplicationDomain.java
@@ -83,6 +83,7 @@
import org.opends.server.tasks.InitializeTargetTask;
import org.opends.server.tasks.InitializeTask;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.HostPort;
/**
 * This class should be used as a base for Replication implementations.
@@ -2698,7 +2699,7 @@
   * @return the host and port of the replicationServer to which this domain
   *         is currently connected.
   */
  public String getReplicationServer()
  public HostPort getReplicationServer()
  {
    if (broker != null)
    {
@@ -3447,10 +3448,10 @@
   *
   * @return The local address.
   */
  String getLocalUrl()
  HostPort getLocalUrl()
  {
    final ReplicationBroker tmp = broker;
    return tmp != null ? tmp.getLocalUrl() : "";
    return tmp != null ? tmp.getLocalUrl() : ReplicationBroker.NO_CONNECTED_SERVER;
  }
  /**
opendj-server-legacy/src/test/java/org/opends/server/replication/ReplicationTestCase.java
@@ -61,7 +61,6 @@
import org.opends.server.types.Attribute;
import org.opends.server.types.Attributes;
import org.opends.server.types.Entry;
import org.opends.server.types.HostPort;
import org.opends.server.types.Modification;
import org.opends.server.types.SearchResultEntry;
import org.opends.server.util.TestTimer;
@@ -550,17 +549,13 @@
      @Override
      public Void call() throws Exception
      {
        assertEquals(DirectoryServer.entryExists(dn), exist);
        assertEquals(DirectoryServer.entryExists(dn), exist, "Expected entry with dn \"" + dn + "\" would exist");
        return null;
      }
    });
    Entry entry = DirectoryServer.getEntry(dn);
    if (entry != null)
    {
      return entry.duplicate(true);
    }
    return null;
    return entry != null ? entry.duplicate(true) : null;
  }
  /** Update the monitor count for the specified monitor attribute. */
@@ -923,9 +918,8 @@
      boolean rightPort = false;
      if (connected)
      {
        String rsUrl = rd.getReplicationServer();
        try {
          rdPort = HostPort.valueOf(rsUrl).getPort();
          rdPort = rd.getReplicationServer().getPort();
          rightPort = rdPort == rsPort;
        }
        catch (IllegalArgumentException notConnectedYet)
opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/FractionalReplicationTest.java
@@ -52,7 +52,6 @@
import org.opends.server.types.Attribute;
import org.opends.server.types.Attributes;
import org.opends.server.types.Entry;
import org.opends.server.types.HostPort;
import org.opends.server.types.Modification;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
@@ -372,11 +371,8 @@
    DN baseDN = DN.valueOf(firstBackend ? TEST_ROOT_DN_STRING : TEST2_ROOT_DN_STRING);
    replicationDomain = new FakeReplicationDomain(baseDN, DS2_ID, replicationServers, 1000, generationId);
    // Test connection
    assertTrue(replicationDomain.isConnected());
    // Check connected server port
    String serverStr = replicationDomain.getReplicationServer();
    assertEquals(HostPort.valueOf(serverStr).getPort(), replServerPort);
    assertEquals(replicationDomain.getReplicationServer().getPort(), replServerPort);
  }
  private void initTest() throws Exception
opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/ReplicationServerFailoverTest.java
@@ -31,7 +31,6 @@
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.types.HostPort;
import org.testng.annotations.Test;
/**
@@ -128,7 +127,7 @@
      // DS1 connected to RS1 ?
      // Check which replication server is connected to this LDAP server
      rsPort = findReplServerConnected(rd1);
      rsPort = rd1.getReplicationServer().getPort();
      if (rsPort == rs1Port)
      {
@@ -332,16 +331,4 @@
    return replicationDomain;
  }
  private int findReplServerConnected(LDAPReplicationDomain rd)
  {
    // First check that the Replication domain is connected
    if (!rd.isConnected())
    {
      return -1;
    }
    String serverStr = rd.getReplicationServer();
    return HostPort.valueOf(serverStr).getPort();
  }
}
opendj-server-legacy/src/test/java/org/opends/server/replication/server/AssuredReplicationServerTest.java
@@ -68,7 +68,6 @@
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.service.ReplicationDomain;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.HostPort;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
@@ -318,11 +317,8 @@
      fakeReplicationDomain.startListenService();
    }
    // Test connection
    assertTrue(fakeReplicationDomain.isConnected());
    // Check connected server port
    HostPort rd = HostPort.valueOf(fakeReplicationDomain.getReplicationServer());
    assertEquals(rd.getPort(), getRsPort(rsId));
    assertEquals(fakeReplicationDomain.getReplicationServer().getPort(), getRsPort(rsId));
    return fakeReplicationDomain;
  }