mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
26.51.2013 403f2977dffbdb72660538effbfdd6ea9473af3a
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -565,7 +565,7 @@
  private String findCookie(final int startDraftCN) throws ChangelogException,
      DirectoryException
  {
    ChangelogDB changelogDB = replicationServer.getChangelogDB();
    final ChangelogDB changelogDB = replicationServer.getChangelogDB();
    if (startDraftCN <= 1)
    {
@@ -580,9 +580,10 @@
        return null;
      }
      final int firstKey = changelogDB.getFirstKey();
      String crossDomainStartState = changelogDB.getPreviousCookie(firstKey);
      changelogDBIter = changelogDB.generateIterator(firstKey);
      final int firstDraftCN = changelogDB.getFirstDraftCN();
      final String crossDomainStartState =
          changelogDB.getPreviousCookie(firstDraftCN);
      changelogDBIter = changelogDB.generateIterator(firstDraftCN);
      return crossDomainStartState;
    }
@@ -633,7 +634,7 @@
        return null;
      }
      final int lastKey = changelogDB.getLastKey();
      final int lastKey = changelogDB.getLastDraftCN();
      crossDomainStartState = changelogDB.getPreviousCookie(lastKey);
      changelogDBIter = changelogDB.generateIterator(lastKey);
      return crossDomainStartState;
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -89,8 +89,8 @@
  private Thread listenThread;
  private Thread connectThread;
  /** The list of replication servers configured by the administrator. */
  private Collection<String> replicationServers;
  /** The list of replication server URLs configured by the administrator. */
  private Collection<String> replicationServerUrls;
  /**
   * This table is used to store the list of dn for which we are currently
@@ -219,9 +219,9 @@
  {
    replicationPort = configuration.getReplicationPort();
    serverId = configuration.getReplicationServerId();
    replicationServers = configuration.getReplicationServer();
    if (replicationServers == null)
      replicationServers = new ArrayList<String>();
    replicationServerUrls = configuration.getReplicationServer();
    if (replicationServerUrls == null)
      replicationServerUrls = new ArrayList<String>();
    queueSize = configuration.getQueueSize();
    purgeDelay = configuration.getReplicationPurgeDelay();
    dbDirname = configuration.getReplicationDBDirectory();
@@ -259,8 +259,8 @@
    configuration.addChangeListener(this);
    try
    {
      backendConfigEntryDN = DN.decode(
      "ds-cfg-backend-id=" + backendId + ",cn=Backends,cn=config");
      backendConfigEntryDN =
         DN.decode("ds-cfg-backend-id=" + backendId + ",cn=Backends,cn=config");
    } catch (Exception e) { /* do nothing */ }
    // Creates the backend associated to this ReplicationServer
@@ -404,14 +404,14 @@
          /*
           * check that all replication server in the config are in the
           * connected Set. If not create the connection
           * connected Set. If not, create the connection
           */
          for (String aServerURL : replicationServers)
          for (String rsURL : replicationServerUrls)
          {
            final int separator = aServerURL.lastIndexOf(':');
            final String portString = aServerURL.substring(separator + 1);
            final int port = Integer.parseInt(portString);
            final String hostname = aServerURL.substring(0, separator);
            final int separator = rsURL.lastIndexOf(':');
            final String hostname = rsURL.substring(0, separator);
            final int port = Integer.parseInt(rsURL.substring(separator + 1));
            final InetAddress inetAddress;
            try
            {
@@ -436,13 +436,13 @@
            }
            // Don't connect to a server if it is already connected.
            final String normalizedServerURL = normalizeServerURL(aServerURL);
            final String normalizedServerURL = normalizeServerURL(rsURL);
            if (connectedRSUrls.contains(normalizedServerURL))
            {
              continue;
            }
            connect(aServerURL, domain.getBaseDn());
            connect(rsURL, domain.getBaseDn());
          }
        }
@@ -538,10 +538,7 @@
      listenSocket = new ServerSocket();
      listenSocket.bind(new InetSocketAddress(replicationPort));
      /*
       * creates working threads
       * We must first connect, then start to listen.
       */
      // creates working threads: we must first connect, then start to listen.
      if (debugEnabled())
        TRACER.debugInfo("RS " +getMonitorInstanceName()+
            " creates connect thread");
@@ -559,7 +556,7 @@
      // can know me and really enableECL.
      if (WorkflowImpl.getWorkflow(externalChangeLogWorkflowID) != null)
      {
        // Already done . Nothing to do
        // Already done. Nothing to do
        return;
      }
      eclwe = new ECLWorkflowElement(this);
@@ -567,7 +564,6 @@
      if (debugEnabled())
        TRACER.debugInfo("RS " +getMonitorInstanceName()+
            " successfully initialized");
    } catch (ChangelogException e)
    {
      Message message = ERR_COULD_NOT_READ_DB.get(
@@ -927,7 +923,7 @@
        try
        {
          lastGeneratedDraftCN = changelogDB.getLastKey();
          lastGeneratedDraftCN = changelogDB.getLastDraftCN();
        }
        catch (Exception ignored)
        {
@@ -995,9 +991,9 @@
    disconnectRemovedReplicationServers(configuration.getReplicationServer());
    replicationServers = configuration.getReplicationServer();
    if (replicationServers == null)
      replicationServers = new ArrayList<String>();
    replicationServerUrls = configuration.getReplicationServer();
    if (replicationServerUrls == null)
      replicationServerUrls = new ArrayList<String>();
    queueSize = configuration.getQueueSize();
    long newPurgeDelay = configuration.getReplicationPurgeDelay();
@@ -1088,8 +1084,8 @@
      broadcastConfigChange();
    }
    if ((configuration.getReplicationDBDirectory() != null) &&
        (!dbDirname.equals(configuration.getReplicationDBDirectory())))
    final String newDir = configuration.getReplicationDBDirectory();
    if (newDir != null && !dbDirname.equals(newDir))
    {
      return new ConfigChangeResult(ResultCode.SUCCESS, true);
    }
@@ -1109,25 +1105,24 @@
     * First try the set of configured replication servers to see if one of them
     * is this replication server (this should always be the case).
     */
    for (String rs : replicationServers)
    for (String rsUrl : replicationServerUrls)
    {
      /*
       * No need validate the string format because the admin framework has
       * already done it.
       */
      final int index = rs.lastIndexOf(":");
      final String hostname = rs.substring(0, index);
      final int port = Integer.parseInt(rs.substring(index + 1));
      final int index = rsUrl.lastIndexOf(":");
      final String hostname = rsUrl.substring(0, index);
      final int port = Integer.parseInt(rsUrl.substring(index + 1));
      if (port == replicationPort && isLocalAddress(hostname))
      {
        serverURL = rs;
        serverURL = rsUrl;
        return;
      }
    }
    /*
     * Fall-back to the machine hostname.
     */
    // Fall-back to the machine hostname.
    serverURL = InetAddress.getLocalHost().getHostName() + ":"
        + replicationPort;
  }
@@ -1249,8 +1244,7 @@
  public void remove()
  {
    if (debugEnabled())
      TRACER.debugInfo("RS " +getMonitorInstanceName()+
          " starts removing");
      TRACER.debugInfo("RS " + getMonitorInstanceName() + " starts removing");
    shutdown();
    removeBackend();
@@ -1471,23 +1465,20 @@
  {
    Collection<String> serversToDisconnect = new ArrayList<String>();
    for (String server: replicationServers)
    for (String rsUrl : replicationServerUrls)
    {
      if (!newReplServers.contains(server))
      if (!newReplServers.contains(rsUrl))
      {
        try
        {
          // translate the server name into IP address
          // and keep the port number
          String[] host = server.split(":");
          // translate the server name into IP address and keep the port number
          String[] host = rsUrl.split(":");
          serversToDisconnect.add(
              (InetAddress.getByName(host[0])).getHostAddress()
              + ":" + host[1]);
              InetAddress.getByName(host[0]).getHostAddress() + ":" + host[1]);
        }
        catch (IOException e)
        {
          Message message = ERR_COULD_NOT_SOLVE_HOSTNAME.get(server);
          logError(message);
          logError(ERR_COULD_NOT_SOLVE_HOSTNAME.get(rsUrl));
        }
      }
    }
@@ -1686,7 +1677,7 @@
    {
      if (changelogDB != null)
      {
        return changelogDB.getFirstKey();
        return changelogDB.getFirstDraftCN();
      }
      return 0;
    }
@@ -1702,7 +1693,7 @@
    {
      if (changelogDB != null)
      {
        return changelogDB.getLastKey();
        return changelogDB.getLastDraftCN();
      }
      return 0;
    }
@@ -1754,10 +1745,10 @@
    int lastDraftCN;
    boolean dbEmpty = false;
    long newestDate = 0L;
    ChangelogDB changelogDB = getChangelogDB();
    long newestDate = 0;
    final ChangelogDB changelogDB = getChangelogDB();
    int firstDraftCN = changelogDB.getFirstKey();
    int firstDraftCN = changelogDB.getFirstDraftCN();
    Map<String,ServerState> domainsServerStateForLastSeqnum = null;
    ChangeNumber changeNumberForLastSeqnum = null;
    String domainForLastSeqnum = null;
@@ -1769,7 +1760,7 @@
    }
    else
    {
      lastDraftCN = changelogDB.getLastKey();
      lastDraftCN = changelogDB.getLastDraftCN();
      // Get the generalized state associated with the current last DraftCN
      // and initializes from it the startStates table
@@ -1813,7 +1804,7 @@
        //  (may be this domain was disabled when this record was returned).
        // In that case, are counted the changes from
        //  the date of the most recent change from this last draft record
        if (newestDate == 0L)
        if (newestDate == 0)
        {
          newestDate = changeNumberForLastSeqnum.getTime();
        }
@@ -1900,7 +1891,6 @@
  }
  private String normalizeServerURL(final String url)
  {
    final int separator = url.lastIndexOf(':');
@@ -1909,16 +1899,12 @@
    try
    {
      final InetAddress inetAddress = InetAddress.getByName(hostname);
      if (isLocalAddress(inetAddress))
      {
        // It doesn't matter whether we use an IP or hostname here.
        // It does not matter whether we use an IP or hostname here.
        return InetAddress.getLocalHost().getHostAddress() + ":" + portString;
      }
      else
      {
        return inetAddress.getHostAddress() + ":" + portString;
      }
      return inetAddress.getHostAddress() + ":" + portString;
    }
    catch (UnknownHostException e)
    {
opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java
@@ -70,14 +70,14 @@
   *
   * @return Returns the first draftCN in the DB.
   */
  int getFirstKey();
  int getFirstDraftCN();
  /**
   * Get the lastChange.
   *
   * @return Returns the last draftCN in the DB
   */
  int getLastKey();
  int getLastDraftCN();
  /**
   * Add an update to the list of messages that must be saved to the db managed
opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandler.java
@@ -77,8 +77,16 @@
  private static int NO_KEY = 0;
  private DraftCNDB db;
  private int firstkey = NO_KEY;
  private int lastkey = NO_KEY;
  /**
   * FIXME Is this field that useful? {@link #getFirstDraftCN()} does not even
   * use it!
   */
  private int firstDraftCN = NO_KEY;
  /**
   * FIXME Is this field that useful? {@link #getLastDraftCN()} does not even
   * use it! It is not even updated.
   */
  private int lastDraftCN = NO_KEY;
  private DbMonitorProvider dbMonitor = new DbMonitorProvider();
  private boolean shutdown = false;
  private boolean trimDone = false;
@@ -117,8 +125,8 @@
    // DB initialization
    db = new DraftCNDB(dbenv);
    firstkey = db.readFirstDraftCN();
    lastkey = db.readLastDraftCN();
    firstDraftCN = db.readFirstDraftCN();
    lastDraftCN = db.readLastDraftCN();
    // Trimming thread
    thread = new DirectoryThread(this, "Replication DraftCN db");
@@ -147,14 +155,14 @@
  /** {@inheritDoc} */
  @Override
  public int getFirstKey()
  public int getFirstDraftCN()
  {
    return db.readFirstDraftCN();
  }
  /** {@inheritDoc} */
  @Override
  public int getLastKey()
  public int getLastDraftCN()
  {
    return db.readLastDraftCN();
  }
@@ -309,11 +317,18 @@
      return;
    }
    // FIXME is this correct?
    // This code is not setting the excludedBaseDNs of the RS which means it
    // could take any value set by one of the other methods!
    // In addition, this code is not thread safe, but I suspect it is used in a
    // multi-threaded way.
    // The call to RS.getEligibleCN() is not reliable in any way and could
    // return very different values even if the DB content did not change!!
    ChangeNumber crossDomainEligibleCN = replicationServer.getEligibleCN();
    for (int i = 0; i < 100; i++)
    {
      DraftCNDBCursor cursor = db.openDeleteCursor();
      final DraftCNDBCursor cursor = db.openDeleteCursor();
      try
      {
        for (int j = 0; j < 50; j++)
@@ -326,16 +341,15 @@
          }
          // From the draftCNDb change record, get the domain and changeNumber
          ChangeNumber cn = cursor.currentChangeNumber();
          String baseDN = cursor.currentBaseDN();
          if ((baseDNToClear != null)
              && (baseDNToClear.equalsIgnoreCase(baseDN)))
          final ChangeNumber cn = cursor.currentChangeNumber();
          final String baseDN = cursor.currentBaseDN();
          if (baseDNToClear != null && baseDNToClear.equalsIgnoreCase(baseDN))
          {
            cursor.delete();
            continue;
          }
          ReplicationServerDomain domain = replicationServer
          final ReplicationServerDomain domain = replicationServer
              .getReplicationServerDomain(baseDN, false);
          if (domain == null)
          {
@@ -346,14 +360,14 @@
            continue;
          }
          ServerState startState = domain.getStartState();
          final ServerState startState = domain.getStartState();
          // We don't use the returned endState but it's updating CN as reading
          domain.getEligibleState(crossDomainEligibleCN);
          ChangeNumber fcn = startState.getChangeNumber(cn.getServerId());
          final ChangeNumber fcn = startState.getChangeNumber(cn.getServerId());
          int currentKey = cursor.currentKey();
          final int currentDraftCN = cursor.currentKey();
          if (cn.older(fcn))
          {
@@ -391,7 +405,7 @@
            continue;
          }
          firstkey = currentKey;
          firstDraftCN = currentDraftCN;
          cursor.close();
          return;
        }
@@ -465,7 +479,7 @@
  @Override
  public String toString()
  {
    return "draftCNdb:" + " " + firstkey + " " + lastkey;
    return "draftCNdb:" + " " + firstDraftCN + " " + lastDraftCN;
  }
  /**
@@ -482,8 +496,8 @@
  public void clear() throws ChangelogException
  {
    db.clear();
    firstkey = db.readFirstDraftCN();
    lastkey = db.readLastDraftCN();
    firstDraftCN = db.readFirstDraftCN();
    lastDraftCN = db.readLastDraftCN();
  }
  private ReentrantLock lock = new ReentrantLock();
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandlerTest.java
@@ -111,11 +111,11 @@
      handler.add(sn3, value3, baseDN3, changeNumber3);
      // The ChangeNumber should not get purged
      int firstkey = handler.getFirstKey();
      assertEquals(firstkey, sn1);
      assertEquals(handler.getLastKey(), sn3);
      final int firstDraftCN = handler.getFirstDraftCN();
      assertEquals(firstDraftCN, sn1);
      assertEquals(handler.getLastDraftCN(), sn3);
      DraftCNDBCursor dbc = handler.getReadCursor(firstkey);
      DraftCNDBCursor dbc = handler.getReadCursor(firstDraftCN);
      try
      {
        assertEquals(dbc.currentChangeNumber(), changeNumber1);
@@ -149,8 +149,8 @@
      {
        Thread.sleep(200);
      }
      assertEquals(handler.getFirstKey(), 0);
      assertEquals(handler.getLastKey(), 0);
      assertEquals(handler.getFirstDraftCN(), 0);
      assertEquals(handler.getLastDraftCN(), 0);
    } finally
@@ -239,8 +239,8 @@
      Thread.sleep(500);
      // Checks
      assertEquals(handler.getFirstKey(), sn1);
      assertEquals(handler.getLastKey(), sn3);
      assertEquals(handler.getFirstDraftCN(), sn1);
      assertEquals(handler.getLastDraftCN(), sn3);
      assertEquals(handler.count(), 3, "Db count");
@@ -260,8 +260,8 @@
      handler.clear();
      // Check the db is cleared.
      assertEquals(handler.getFirstKey(), 0);
      assertEquals(handler.getLastKey(), 0);
      assertEquals(handler.getFirstDraftCN(), 0);
      assertEquals(handler.getLastDraftCN(), 0);
      assertEquals(handler.count(), 0);
    } finally
    {