mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

jcduff
23.04.2008 f73b655466092169abac34833fb628fce1fcdebe
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -44,7 +44,6 @@
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
@@ -66,8 +65,8 @@
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ReplSessionSecurity;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeType;
import org.opends.server.types.AttributeValue;
import org.opends.server.types.AttributeBuilder;
import org.opends.server.types.Attributes;
import org.opends.server.types.BackupConfig;
import org.opends.server.types.ConfigChangeResult;
import org.opends.server.types.DN;
@@ -114,7 +113,6 @@
  private String localURL = "null";
  private boolean shutdown = false;
  private short replicationServerId;
  private ReplicationDbEnv dbEnv;
  private int rcvWindow;
  private int queueSize;
@@ -139,6 +137,19 @@
  private boolean connectedInTopology = false;
  private final Object connectedInTopologyLock = new Object();
  /*
   * Assured mode properties
   */
  // Timeout (in milliseconds) when waiting for acknowledgments
  private long assuredTimeout = 1000;
  // Group id
  private byte groupId = (byte)1;
  // Number of pending changes for a DS, considered as threshold value to put
  // the DS in DEGRADED_STATUS. If value is 0, status analyzer is disabled
  private int degradedStatusThreshold = 5000;
  /**
   * The tracer object for the debug logger.
   */
@@ -156,7 +167,7 @@
    super("Replication Server" + configuration.getReplicationPort());
    replicationPort = configuration.getReplicationPort();
    replicationServerId = (short) configuration.getReplicationServerId();
    serverId = (short) configuration.getReplicationServerId();
    replicationServers = configuration.getReplicationServer();
    if (replicationServers == null)
      replicationServers = new ArrayList<String>();
@@ -187,9 +198,12 @@
      Message msg = ERR_FILE_CHECK_CREATE_FAILED.get(mb.toString());
      throw new ConfigException(msg, e);
    }
    groupId = (byte)configuration.getGroupId();
    assuredTimeout = configuration.getAssuredTimeout();
    degradedStatusThreshold = configuration.getDegradedStatusThreshold();
    replSessionSecurity = new ReplSessionSecurity(configuration);
    initialize(replicationServerId, replicationPort);
    initialize(replicationPort);
    configuration.addChangeListener(this);
    DirectoryServer.registerMonitorProvider(this);
@@ -246,11 +260,11 @@
      try
      {
        newSocket =  listenSocket.accept();
        newSocket.setReceiveBufferSize(1000000);
        newSocket.setTcpNoDelay(true);
        newSocket.setKeepAlive(true);
        ProtocolSession session =
             replSessionSecurity.createServerSession(newSocket);
             replSessionSecurity.createServerSession(newSocket,
             ReplSessionSecurity.HANDSHAKE_TIMEOUT);
        if (session == null) // Error, go back to accept
          continue;
        ServerHandler handler = new ServerHandler(session, queueSize);
@@ -362,12 +376,12 @@
      InetSocketAddress ServerAddr = new InetSocketAddress(
                     InetAddress.getByName(hostname), Integer.parseInt(port));
      Socket socket = new Socket();
      socket.setReceiveBufferSize(1000000);
      socket.setTcpNoDelay(true);
      socket.connect(ServerAddr, 500);
      ServerHandler handler = new ServerHandler(
           replSessionSecurity.createClientSession(serverURL, socket),
           replSessionSecurity.createClientSession(serverURL, socket,
           ReplSessionSecurity.HANDSHAKE_TIMEOUT),
           queueSize);
      handler.start(baseDn, serverId, this.serverURL, rcvWindow,
                    sslEncryption, this);
@@ -382,12 +396,11 @@
  /**
   * initialization function for the replicationServer.
   *
   * @param  changelogId       The unique identifier for this replicationServer.
   * @param  changelogPort     The port on which the replicationServer should
   *                           listen.
   *
   */
  private void initialize(short changelogId, int changelogPort)
  private void initialize(int changelogPort)
  {
    shutdown = false;
@@ -400,11 +413,6 @@
          this);
      /*
       * create replicationServer replicationServerDomain
       */
      serverId = changelogId;
      /*
       * Open replicationServer socket
       */
      String localhostname = InetAddress.getLocalHost().getHostName();
@@ -412,7 +420,6 @@
      serverURL = localhostname + ":" + String.valueOf(changelogPort);
      localURL = localAdddress + ":" + String.valueOf(changelogPort);
      listenSocket = new ServerSocket();
      listenSocket.setReceiveBufferSize(1000000);
      listenSocket.bind(new InetSocketAddress(changelogPort));
      /*
@@ -421,9 +428,10 @@
       */
      if (debugEnabled())
        TRACER.debugInfo("RS " +getMonitorInstanceName()+
            " creates connect threads");
            " creates connect thread");
      connectThread =
        new ReplicationServerConnectThread("Replication Server Connect", this);
        new ReplicationServerConnectThread("Replication Server Connect " +
        serverId , this);
      connectThread.start();
      // FIXME : Is it better to have the time to receive the ReplServerInfo
@@ -432,10 +440,11 @@
      try { Thread.sleep(300);} catch(Exception e) {}
      if (debugEnabled())
        TRACER.debugInfo("RS " +getMonitorInstanceName()+
            " creates listen threads");
            " creates listen thread");
      listenThread =
        new ReplicationServerListenThread("Replication Server Listener", this);
        new ReplicationServerListenThread("Replication Server Listener " +
        serverId , this);
      listenThread.start();
      if (debugEnabled())
@@ -546,7 +555,7 @@
   * DN given in parameter.
   *
   * @param id The serverId for which the dbHandler must be created.
   * @param baseDn The DN for which the dbHandler muste be created.
   * @param baseDn The DN for which the dbHandler must be created.
   * @return The new DB handler for this ReplicationServer and the serverId and
   *         DN given in parameter.
   * @throws DatabaseException in case of underlying database problem.
@@ -594,7 +603,7 @@
   * @param configuration The configuration to check.
   * @param unacceptableReasons When the configuration is not acceptable, this
   *                            table is use to return the reasons why this
   *                            configuration is not acceptbale.
   *                            configuration is not acceptable.
   *
   * @return true if the configuration is acceptable, false other wise.
   */
@@ -666,7 +675,6 @@
        serverURL = localhostname + ":" + String.valueOf(replicationPort);
        localURL = localAdddress + ":" + String.valueOf(replicationPort);
        listenSocket = new ServerSocket();
        listenSocket.setReceiveBufferSize(1000000);
        listenSocket.bind(new InetSocketAddress(replicationPort));
        listenThread =
@@ -690,6 +698,31 @@
      }
    }
    // Update threshold value for status analyzers (stop them if requested
    // value is 0)
    if (degradedStatusThreshold != configuration.getDegradedStatusThreshold())
    {
      int oldThresholdValue = degradedStatusThreshold;
      degradedStatusThreshold = configuration.getDegradedStatusThreshold();
      for(ReplicationServerDomain rsd : baseDNs.values())
      {
        if (degradedStatusThreshold == 0)
        {
          // Requested to stop analyzers
          rsd.stopStatusAnalyzer();
        } else if (rsd.isRunningStatusAnalyzer())
        {
          // Update the threshold value for this running analyzer
          rsd.updateStatusAnalyzer(degradedStatusThreshold);
        } else if (oldThresholdValue == 0)
        {
          // Requested to start analyzers with provided threshold value
          if (rsd.getConnectedDSs().size() > 0)
            rsd.startStatusAnalyzer();
        }
      }
    }
    if ((configuration.getReplicationDBDirectory() != null) &&
        (!dbDirname.equals(configuration.getReplicationDBDirectory())))
    {
@@ -724,7 +757,7 @@
  public String getMonitorInstanceName()
  {
    return "Replication Server " + this.replicationPort + " "
           + replicationServerId;
           + serverId;
  }
  /**
@@ -757,31 +790,23 @@
     * publish the server id and the port number.
     */
    ArrayList<Attribute> attributes = new ArrayList<Attribute>();
    attributes.add(new Attribute("replication server id",
    attributes.add(Attributes.create("replication server id",
        String.valueOf(serverId)));
    attributes.add(new Attribute("replication server port",
    attributes.add(Attributes.create("replication server port",
        String.valueOf(replicationPort)));
    /*
     * Add all the base DNs that are known by this replication server.
     */
    AttributeType baseType=
      DirectoryServer.getAttributeType("base-dn", true);
    LinkedHashSet<AttributeValue> baseValues =
      new LinkedHashSet<AttributeValue>();
    AttributeBuilder builder = new AttributeBuilder("base-dn");
    for (DN base : baseDNs.keySet())
    {
      baseValues.add(new AttributeValue(baseType, base. toString()));
      builder.add(base.toString());
    }
    Attribute bases = new Attribute(baseType, "base-dn", baseValues);
    attributes.add(bases);
    attributes.add(builder.toAttribute());
    // Publish to monitor the generation ID by replicationServerDomain
    AttributeType generationIdType=
      DirectoryServer.getAttributeType("base-dn-generation-id", true);
    LinkedHashSet<AttributeValue> generationIdValues =
      new LinkedHashSet<AttributeValue>();
    builder = new AttributeBuilder("base-dn-generation-id");
    for (DN base : baseDNs.keySet())
    {
      long generationId=-1;
@@ -789,12 +814,9 @@
              getReplicationServerDomain(base, false);
      if (replicationServerDomain != null)
        generationId = replicationServerDomain.getGenerationId();
      generationIdValues.add(new AttributeValue(generationIdType,
          base.toString() + " " + generationId));
      builder.add(base.toString() + " " + generationId);
    }
    Attribute generationIds = new Attribute(generationIdType, "generation-id",
        generationIdValues);
    attributes.add(generationIds);
    attributes.add(builder.toAttribute());
    return attributes;
  }
@@ -916,7 +938,7 @@
  {
    try
    {
      if (!DirectoryServer.getConfigHandler().entryExists(backendConfigEntryDN))
      if (DirectoryServer.getConfigHandler().entryExists(backendConfigEntryDN))
      {
        // Delete the replication backend
        DirectoryServer.getConfigHandler().deleteEntry(backendConfigEntryDN,
@@ -964,7 +986,7 @@
                                boolean successful)
  {
    if (backend.getBackendID().equals(backendId))
      initialize(this.replicationServerId, this.replicationPort);
      initialize(this.replicationPort);
  }
  /**
@@ -1041,6 +1063,33 @@
  }
  /**
   * Get the assured mode timeout.
   * @return The assured mode timeout.
   */
  public long getAssuredTimeout()
  {
    return assuredTimeout;
  }
  /**
   * Get The replication server group id.
   * @return The replication server group id.
   */
  public byte getGroupId()
  {
    return groupId;
  }
  /**
   * Get the threshold value for status analyzer.
   * @return The threshold value for status analyzer.
   */
  public int getDegradedStatusThreshold()
  {
    return degradedStatusThreshold;
  }
  /**
   * Compute the list of replication servers that are not any
   * more connected to this Replication Server and stop the
   * corresponding handlers.