mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

coulbeck
28.54.2007 f7036e50348484f4daf39f9e8457de602ab83939
opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -54,15 +54,7 @@
import org.opends.server.protocols.ldap.LDAPFilter;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.ReplServerStartMessage;
import org.opends.server.replication.protocol.ReplicationMessage;
import org.opends.server.replication.protocol.ServerStartMessage;
import org.opends.server.replication.protocol.SocketSession;
import org.opends.server.replication.protocol.UpdateMessage;
import org.opends.server.replication.protocol.WindowMessage;
import org.opends.server.replication.protocol.WindowProbe;
import org.opends.server.replication.protocol.*;
import org.opends.server.types.DN;
import org.opends.server.types.DereferencePolicy;
import org.opends.server.types.ResultCode;
@@ -101,6 +93,7 @@
  // NOTE(review): presumably the maximum size of the receive window;
  // its use is not visible here -- confirm.
  private int maxRcvWindow;
  // Timeout value handed to session.setSoTimeout() by connect(); starts at 0.
  private int timeout = 0;
  // Replication protocol version; the constructor sets it to
  // ProtocolVersion.currentVersion().
  private short protocolVersion;
  // Session security configuration; connect() uses it to create the client
  // session and to ask whether SSL encryption applies to a given server.
  private ReplSessionSecurity replSessionSecurity;
  /**
   * The time in milliseconds between heartbeats from the replication
@@ -150,10 +143,12 @@
   * @param window The size of the send and receive window to use.
   * @param heartbeatInterval The interval between heartbeats requested of the
   * replicationServer, or zero if no heartbeats are requested.
   * @param replSessionSecurity The session security configuration.
   */
  public ReplicationBroker(ServerState state, DN baseDn, short serverID,
      int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue,
      int maxSendDelay, int window, long heartbeatInterval)
      int maxSendDelay, int window, long heartbeatInterval,
      ReplSessionSecurity replSessionSecurity)
  {
    this.baseDn = baseDn;
    this.serverID = serverID;
@@ -169,6 +164,7 @@
    this.halfRcvWindow = window/2;
    this.heartbeatInterval = heartbeatInterval;
    this.protocolVersion = ProtocolVersion.currentVersion();
    this.replSessionSecurity = replSessionSecurity;
  }
  /**
@@ -199,7 +195,6 @@
   * Connect to a ReplicationServer.
   *
   * @throws NumberFormatException address was invalid
   * @throws IOException error during connection phase
   */
  private void connect()
  {
@@ -236,15 +231,16 @@
            socket.setReceiveBufferSize(1000000);
            socket.setTcpNoDelay(true);
            socket.connect(ServerAddr, 500);
            session = new SocketSession(socket);
            session = replSessionSecurity.createClientSession(server, socket);
            boolean isSslEncryption =
                 replSessionSecurity.isSslEncryption(server);
            /*
             * Send our ServerStartMessage.
             */
            ServerStartMessage msg = new ServerStartMessage(serverID, baseDn,
                maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
                halfRcvWindow*2, heartbeatInterval, state,
                protocolVersion);
                protocolVersion, isSslEncryption);
            session.publish(msg);
@@ -264,6 +260,11 @@
                startMsg.getVersion());
            session.setSoTimeout(timeout);
            if (!isSslEncryption)
            {
              session.stopEncryption();
            }
            /*
             * We must not publish changes to a replicationServer that has not
             * seen all our previous changes because this could cause some
@@ -854,4 +855,18 @@
  {
    return !connectionError;
  }
  /**
   * Determine whether the connection to the replication server is encrypted.
   * @return true if the connection is encrypted, false otherwise.
   */
  public boolean isSessionEncrypted()
  {
    boolean isEncrypted = false;
    if (session != null)
    {
      return session.isEncrypted();
    }
    return isEncrypted;
  }
}