mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

pgamba
10.43.2009 ccc4127f23f63214f4dc2f94d26a021a3ec2eec6
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -42,16 +42,18 @@
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.messages.Severity;
import org.opends.server.admin.server.ConfigurationChangeListener;
import org.opends.server.admin.std.server.ReplicationServerCfg;
import org.opends.server.api.Backend;
@@ -61,13 +63,22 @@
import org.opends.server.api.RestoreTaskListener;
import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.WorkflowImpl;
import org.opends.server.core.networkgroups.NetworkGroup;
import org.opends.server.loggers.LogLevel;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.ExternalChangeLogSession;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ReplServerStartMsg;
import org.opends.server.replication.protocol.ReplSessionSecurity;
import org.opends.server.replication.protocol.ReplicationMsg;
import org.opends.server.replication.protocol.ServerStartECLMsg;
import org.opends.server.replication.protocol.ServerStartMsg;
import org.opends.server.replication.protocol.StartECLSessionMsg;
import org.opends.server.types.BackupConfig;
import org.opends.server.types.ConfigChangeResult;
import org.opends.server.types.DN;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
import org.opends.server.types.LDIFExportConfig;
@@ -75,7 +86,9 @@
import org.opends.server.types.RestoreConfig;
import org.opends.server.types.ResultCode;
import org.opends.server.util.LDIFReader;
import org.opends.server.util.ServerConstants;
import org.opends.server.util.TimeThread;
import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
import com.sleepycat.je.DatabaseException;
@@ -155,6 +168,8 @@
   */
  private static final DebugTracer TRACER = getTracer();
  /**
   * Identifier of the External Changelog (ECL) workflow. It is the ID given
   * to the workflow created in initializeECL() and the key used to look the
   * workflow up and deregister it in finalizeECL().
   */
  private String externalChangeLogWorkflowID = "External Changelog Workflow ID";
  /**
   * The ECL workflow element. In the code visible here it is assigned in
   * finalizeECL() from a lookup in DirectoryServer.
   * NOTE(review): initializeECL() declares a local variable with the same
   * name, so this field does not appear to be set there - confirm intended.
   */
  ECLWorkflowElement eclwe;
  private static HashSet<Integer> localPorts = new HashSet<Integer>();
  /**
@@ -267,9 +282,35 @@
             ReplSessionSecurity.HANDSHAKE_TIMEOUT);
        if (session == null) // Error, go back to accept
          continue;
        ServerHandler handler = new ServerHandler(session, queueSize);
        handler.start(null, serverId, serverURL, rcvWindow,
                      false, this);
        ReplicationMsg msg = session.receive();
        if (msg instanceof ServerStartMsg)
        {
          DataServerHandler handler = new DataServerHandler(session,
              queueSize,serverURL,serverId,this,rcvWindow);
          handler.startFromRemoteDS((ServerStartMsg)msg);
        }
        else if (msg instanceof ReplServerStartMsg)
        {
          ReplicationServerHandler handler = new ReplicationServerHandler(
              session,queueSize,serverURL,serverId,this,rcvWindow);
          handler.startFromRemoteRS((ReplServerStartMsg)msg);
        }
        else if (msg instanceof ServerStartECLMsg)
        {
          ECLServerHandler handler = new ECLServerHandler(
              session,queueSize,serverURL,serverId,this,rcvWindow);
          handler.startFromRemoteServer((ServerStartECLMsg)msg);
        }
        else
        {
          // We did not recognize the message, close session as what
          // can happen after is undetermined and we do not want the server to
          // be disturbed
          ServerHandler.closeSession(session, null, null);
          return;
        }
      }
      catch (Exception e)
      {
@@ -277,6 +318,10 @@
        // shutdown or changing the port number process.
        // Just log debug information and loop.
        // Do not log the message during shutdown.
        if (debugEnabled())
        {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
        if (shutdown == false) {
          Message message =
            ERR_EXCEPTION_LISTENING.get(e.getLocalizedMessage());
@@ -379,20 +424,20 @@
  /**
   * Establish a connection to the server with the address and port.
   *
   * @param serverURL  The address and port for the server, separated by a
   * @param remoteServerURL  The address and port for the server, separated by a
   *                    colon.
   * @param baseDn     The baseDn of the connection
   */
  private void connect(String serverURL, String baseDn)
  private void connect(String remoteServerURL, String baseDn)
  {
    int separator = serverURL.lastIndexOf(':');
    String port = serverURL.substring(separator + 1);
    String hostname = serverURL.substring(0, separator);
    boolean sslEncryption = replSessionSecurity.isSslEncryption(serverURL);
    int separator = remoteServerURL.lastIndexOf(':');
    String port = remoteServerURL.substring(separator + 1);
    String hostname = remoteServerURL.substring(0, separator);
    boolean sslEncryption =replSessionSecurity.isSslEncryption(remoteServerURL);
    if (debugEnabled())
      TRACER.debugInfo("RS " + this.getMonitorInstanceName() +
               " connects to " + serverURL);
               " connects to " + remoteServerURL);
    try
    {
@@ -402,12 +447,25 @@
      socket.setTcpNoDelay(true);
      socket.connect(ServerAddr, 500);
      /*
      ServerHandler handler = new ServerHandler(
           replSessionSecurity.createClientSession(serverURL, socket,
           ReplSessionSecurity.HANDSHAKE_TIMEOUT),
           queueSize);
      handler.start(baseDn, serverId, this.serverURL, rcvWindow,
                    sslEncryption, this);
      */
      ReplicationServerHandler handler = new ReplicationServerHandler(
          replSessionSecurity.createClientSession(remoteServerURL,
              socket,
              ReplSessionSecurity.HANDSHAKE_TIMEOUT),
              queueSize,
              this.serverURL,
              serverId,
              this,
              rcvWindow);
      handler.connect(baseDn, sslEncryption);
    }
    catch (Exception e)
    {
@@ -470,6 +528,10 @@
        serverId , this);
      listenThread.start();
      // Initialize the External Changelog
      // FIXME: how is WF creation enabled/disabled in the RS ?
      initializeECL();
      if (debugEnabled())
        TRACER.debugInfo("RS " +getMonitorInstanceName()+
            " successfully initialized");
@@ -493,10 +555,88 @@
      Message message =
          ERR_COULD_NOT_BIND_CHANGELOG.get(changelogPort, e.getMessage());
      logError(message);
    } catch (DirectoryException e)
    {
      //FIXME:DirectoryException is raised by initializeECL => fix err msg
      Message message = Message.raw(Category.SYNC, Severity.SEVERE_ERROR,
        "Directory Exception raised by ECL initialization: " + e.getMessage());
      logError(message);
    }
  }
  /**
   * Initializes the ECL access by creating a dedicated workflow element.
   * @throws DirectoryException
   */
  private void initializeECL()
  throws DirectoryException
  {
    WorkflowImpl externalChangeLogWorkflow;
    if (WorkflowImpl.getWorkflow(externalChangeLogWorkflowID)
        !=null)
      return;
    ECLWorkflowElement eclwe = new ECLWorkflowElement(this);
    // Create the workflow for the base DN and register the workflow with
    // the server.
    externalChangeLogWorkflow = new WorkflowImpl(
        externalChangeLogWorkflowID,
        DN.decode(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT),
        eclwe.getWorkflowElementID(),
        eclwe);
    externalChangeLogWorkflow.register();
    NetworkGroup defaultNetworkGroup = NetworkGroup.getDefaultNetworkGroup();
    defaultNetworkGroup.registerWorkflow(externalChangeLogWorkflow);
    // FIXME:ECL should the ECL Workflow be registered in adminNetworkGroup?
    NetworkGroup adminNetworkGroup = NetworkGroup.getAdminNetworkGroup();
    adminNetworkGroup.registerWorkflow(externalChangeLogWorkflow);
    // FIXME:ECL should the ECL Workflow be registered in internalNetworkGroup?
    NetworkGroup internalNetworkGroup = NetworkGroup.getInternalNetworkGroup();
    internalNetworkGroup.registerWorkflow(externalChangeLogWorkflow);
}
  private void finalizeECL()
  {
    WorkflowImpl eclwf =
      (WorkflowImpl)WorkflowImpl.getWorkflow(externalChangeLogWorkflowID);
    // do it only if not already done by another RS (unit test case)
    // if (DirectoryServer.getWorkflowElement(externalChangeLogWorkflowID)
    if (eclwf!=null)
    {
    // FIXME:ECL should the ECL Workflow be registered in internalNetworkGroup?
    NetworkGroup internalNetworkGroup = NetworkGroup.getInternalNetworkGroup();
    internalNetworkGroup.deregisterWorkflow(externalChangeLogWorkflowID);
    // FIXME:ECL should the ECL Workflow be registered in adminNetworkGroup?
    NetworkGroup adminNetworkGroup = NetworkGroup.getAdminNetworkGroup();
    adminNetworkGroup.deregisterWorkflow(externalChangeLogWorkflowID);
    NetworkGroup defaultNetworkGroup = NetworkGroup.getDefaultNetworkGroup();
    defaultNetworkGroup.deregisterWorkflow(externalChangeLogWorkflowID);
    eclwf.deregister();
    eclwf.finalizeWorkflow();
    }
    eclwe = (ECLWorkflowElement)
    DirectoryServer.getWorkflowElement("EXTERNAL CHANGE LOG");
    if (eclwe!=null)
    {
      DirectoryServer.deregisterWorkflowElement(eclwe);
      eclwe.finalizeWorkflowElement();
    }
}
  /**
   * Get the ReplicationServerDomain associated to the base DN given in
   * parameter.
   *
@@ -571,6 +711,8 @@
    {
      dbEnv.shutdown();
    }
    finalizeECL();
}
@@ -1220,6 +1362,38 @@
  }
  /**
   * Returns the number of domains managed by this replication server.
   * @return the number of domains managed.
   */
  public int getCacheSize()
  {
    return baseDNs.size();
  }
  /**
   * Create a new session to get the ECL.
   * @param msg The message that specifies the ECL request.
   * @return Returns the created session.
   * @throws DirectoryException When an error occurs.
   */
  public ExternalChangeLogSession createECLSession(StartECLSessionMsg msg)
  throws DirectoryException
  {
    ExternalChangeLogSessionImpl session =
      new ExternalChangeLogSessionImpl(this, msg);
    return session;
  }
  /**
   * Getter on the server URL.
   * @return the server URL.
   */
  public String getServerURL()
  {
    return this.serverURL;
  }
  /**
   * This method allows to check if the Replication Server given
   * as the parameter is running in the local JVM.
   *