mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

pgamba
02.58.2007 b48ce50fdf4d73e8be3799e3a7c6c2bf9d1b2965
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -25,13 +25,13 @@
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;
import org.opends.messages.Message;
import org.opends.messages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.messages.ReplicationMessages.*;
import org.opends.messages.MessageBuilder;
import static org.opends.server.util.StaticUtils.getFileForPath;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.io.File;
import java.io.IOException;
@@ -61,6 +61,8 @@
import org.opends.server.types.ConfigChangeResult;
import org.opends.server.types.DN;
import org.opends.server.types.ResultCode;
import static org.opends.server.loggers.debug.DebugLogger.*;
import org.opends.server.loggers.debug.DebugTracer;
import com.sleepycat.je.DatabaseException;
@@ -101,12 +103,17 @@
  private int queueSize;
  private String dbDirname = null;
  private long trimAge; // the time (in sec) after which the  changes must
                        // be deleted from the persistent storage.
  private int replicationPort;
                        // de deleted from the persistent storage.
  private boolean stopListen = false;
  private ReplSessionSecurity replSessionSecurity;
  /**
   * The tracer object for the debug logger.
   */
  private static final DebugTracer TRACER = getTracer();
  /**
   * Creates a new Replication server using the provided configuration entry.
   *
   * @param configuration The configuration of this replication server.
@@ -191,7 +198,7 @@
        // The socket has probably been closed as part of the
        // shutdown or changing the port number process.
        // just log debug information and loop.
        Message message = DEBUG_REPLICATION_PORT_IOEXCEPTION.get();
        Message message = ERR_EXCEPTION_LISTENING.get(e.getLocalizedMessage());
        logError(message);
      }
    }
@@ -272,6 +279,10 @@
    String hostname = serverURL.substring(0, separator);
    boolean sslEncryption = replSessionSecurity.isSslEncryption(serverURL);
    if (debugEnabled())
      TRACER.debugInfo("RS " + this.getMonitorInstanceName() +
               " connects to " + serverURL);
    try
    {
      InetSocketAddress ServerAddr = new InetSocketAddress(
@@ -329,22 +340,41 @@
      listenSocket.bind(new InetSocketAddress(changelogPort));
      /*
       * create working threads
       * creates working threads
       * We must first connect, then start to listen.
       */
      listenThread =
        new ReplicationServerListenThread("Replication Server Listener", this);
      listenThread.start();
      if (debugEnabled())
        TRACER.debugInfo("RS " +getMonitorInstanceName()+
            " creates connect threads");
      connectThread =
        new ReplicationServerConnectThread("Replication Server Connect", this);
      connectThread.start();
      // FIXME : Is it better to have the time to receive the ReplServerInfo
      // from all the other replication servers since this info is necessary
      // to route an early received total update request.
      if (debugEnabled())
        TRACER.debugInfo("RS " +getMonitorInstanceName()+
            " creates listen threads");
      listenThread =
        new ReplicationServerListenThread("Replication Server Listener", this);
      listenThread.start();
      if (debugEnabled())
        TRACER.debugInfo("RS " +getMonitorInstanceName()+
            " successfully initialized");
    } catch (DatabaseException e)
    {
      Message message = ERR_COULD_NOT_INITIALIZE_DB.get(dbDirname);
      Message message = ERR_COULD_NOT_INITIALIZE_DB.get(
        getFileForPath(dbDirname).getAbsolutePath());
      logError(message);
    } catch (ReplicationDBException e)
    {
      Message message = ERR_COULD_NOT_READ_DB.get(dbDirname);
      Message message = ERR_COULD_NOT_READ_DB.get(dbDirname,
          e.getLocalizedMessage());
      logError(message);
    } catch (UnknownHostException e)
    {
@@ -362,18 +392,22 @@
   * Get the ReplicationCache associated to the base DN given in parameter.
   *
   * @param baseDn The base Dn for which the ReplicationCache must be returned.
   * @param create Specifies whether to create the ReplicationCache if it does
   *        not already exist.
   * @return The ReplicationCache associated to the base DN given in parameter.
   */
  public ReplicationCache getReplicationCache(DN baseDn)
  public ReplicationCache getReplicationCache(DN baseDn, boolean create)
  {
    ReplicationCache replicationCache;
    synchronized (baseDNs)
    {
      replicationCache = baseDNs.get(baseDn);
      if (replicationCache == null)
      if ((replicationCache == null) && (create))
      {
        replicationCache = new ReplicationCache(baseDn, this);
      baseDNs.put(baseDn, replicationCache);
        baseDNs.put(baseDn, replicationCache);
      }
    }
    return replicationCache;
@@ -384,6 +418,9 @@
   */
  public void shutdown()
  {
    if (shutdown)
      return;
    shutdown = true;
    // shutdown the connect thread
@@ -404,6 +441,12 @@
      // replication Server service is closing anyway.
    }
    // shutdown the listen thread
    if (listenThread != null)
    {
      listenThread.interrupt();
    }
    // shutdown all the ChangelogCaches
    for (ReplicationCache replicationCache : baseDNs.values())
    {
@@ -424,13 +467,36 @@
   *
   * @param id The serverId for which the dbHandler must be created.
   * @param baseDn The DN for which the dbHandler muste be created.
   * @param generationId The generationId for this server and this domain.
   * @return The new DB handler for this ReplicationServer and the serverId and
   *         DN given in parameter.
   * @throws DatabaseException in case of underlying database problem.
   */
  DbHandler newDbHandler(short id, DN baseDn) throws DatabaseException
  public DbHandler newDbHandler(short id, DN baseDn, long generationId)
  throws DatabaseException
  {
    return new DbHandler(id, baseDn, this, dbEnv);
    return new DbHandler(id, baseDn, this, dbEnv, generationId);
  }
  /**
   * Clears the generationId for the domain related to the provided baseDn.
   * @param  baseDn The baseDn for which to delete the generationId.
   * @throws DatabaseException When it occurs.
   */
  public void clearGenerationId(DN baseDn)
  throws DatabaseException
  {
    try
    {
      dbEnv.clearGenerationId(baseDn);
    }
    catch(Exception e)
    {
      TRACER.debugInfo(
          "In RS <" + getMonitorInstanceName() +
          " Exception in clearGenerationId" +
          stackTraceToSingleLineString(e) + e.getLocalizedMessage());
    }
  }
  /**
@@ -618,7 +684,50 @@
    Attribute bases = new Attribute(baseType, "base-dn", baseValues);
    attributes.add(bases);
    // Publish to monitor the generation ID by domain
    AttributeType generationIdType=
      DirectoryServer.getAttributeType("base-dn-generation-id", true);
    LinkedHashSet<AttributeValue> generationIdValues =
      new LinkedHashSet<AttributeValue>();
    for (DN base : baseDNs.keySet())
    {
      long generationId=-1;
      ReplicationCache cache = getReplicationCache(base, false);
      if (cache != null)
        generationId = cache.getGenerationId();
      generationIdValues.add(new AttributeValue(generationIdType,
          base.toString() + " " + generationId));
    }
    Attribute generationIds = new Attribute(generationIdType, "generation-id",
        generationIdValues);
    attributes.add(generationIds);
    return attributes;
  }
  /**
   * Get the value of generationId for the replication domain
   * associated with the provided baseDN.
   *
   * @param baseDN The baseDN of the domain.
   * @return The value of the generationID.
   */
  public long getGenerationId(DN baseDN)
  {
    ReplicationCache rc = this.getReplicationCache(baseDN, false);
    if (rc!=null)
      return rc.getGenerationId();
    return -1;
  }
  /**
   * Get the serverId for this replication server.
   *
   * @return The value of the serverId.
   *
   */
  public short getServerId()
  {
    return serverId;
  }
}