mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
26.51.2013 403f2977dffbdb72660538effbfdd6ea9473af3a
Found problems in the replication ECL code.
Also made the code more explicit.


DraftCNBDHandler.java, ChangelogDB.java:
Renamed firstkey and lastkey to firstDraftCN and lastDraftCN + renamed getters too.
In clear() added very a important comment about unreliable method call + renamed currentkey local variable to currentDraftCN + used final keyword.

ECLServerHandler.java:
Renamed all the *Key local variables to *DraftCN where possible.

ReplicationServer.java:
Renamed field replicationServers to replicationServerUrls.
Changed excludedBaseDNs from Collection to Set.
Did some renaming here and there.
Extracted method contains().
In getECLDraftCNLimits(), used primitive variables. There was some code that worked by chance more than by design: see in the old code "if (newestDate == 0L)"

DraftCNDbHandlerTest.java:
Renamed firstkey to firstDraftCN.
5 files modified
193 ■■■■ changed files
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java 11 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServer.java 106 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java 4 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandler.java 52 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandlerTest.java 20 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -565,7 +565,7 @@
  private String findCookie(final int startDraftCN) throws ChangelogException,
      DirectoryException
  {
    ChangelogDB changelogDB = replicationServer.getChangelogDB();
    final ChangelogDB changelogDB = replicationServer.getChangelogDB();
    if (startDraftCN <= 1)
    {
@@ -580,9 +580,10 @@
        return null;
      }
      final int firstKey = changelogDB.getFirstKey();
      String crossDomainStartState = changelogDB.getPreviousCookie(firstKey);
      changelogDBIter = changelogDB.generateIterator(firstKey);
      final int firstDraftCN = changelogDB.getFirstDraftCN();
      final String crossDomainStartState =
          changelogDB.getPreviousCookie(firstDraftCN);
      changelogDBIter = changelogDB.generateIterator(firstDraftCN);
      return crossDomainStartState;
    }
@@ -633,7 +634,7 @@
        return null;
      }
      final int lastKey = changelogDB.getLastKey();
      final int lastKey = changelogDB.getLastDraftCN();
      crossDomainStartState = changelogDB.getPreviousCookie(lastKey);
      changelogDBIter = changelogDB.generateIterator(lastKey);
      return crossDomainStartState;
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -89,8 +89,8 @@
  private Thread listenThread;
  private Thread connectThread;
  /** The list of replication servers configured by the administrator. */
  private Collection<String> replicationServers;
  /** The list of replication server URLs configured by the administrator. */
  private Collection<String> replicationServerUrls;
  /**
   * This table is used to store the list of dn for which we are currently
@@ -219,9 +219,9 @@
  {
    replicationPort = configuration.getReplicationPort();
    serverId = configuration.getReplicationServerId();
    replicationServers = configuration.getReplicationServer();
    if (replicationServers == null)
      replicationServers = new ArrayList<String>();
    replicationServerUrls = configuration.getReplicationServer();
    if (replicationServerUrls == null)
      replicationServerUrls = new ArrayList<String>();
    queueSize = configuration.getQueueSize();
    purgeDelay = configuration.getReplicationPurgeDelay();
    dbDirname = configuration.getReplicationDBDirectory();
@@ -259,8 +259,8 @@
    configuration.addChangeListener(this);
    try
    {
      backendConfigEntryDN = DN.decode(
      "ds-cfg-backend-id=" + backendId + ",cn=Backends,cn=config");
      backendConfigEntryDN =
         DN.decode("ds-cfg-backend-id=" + backendId + ",cn=Backends,cn=config");
    } catch (Exception e) { /* do nothing */ }
    // Creates the backend associated to this ReplicationServer
@@ -404,14 +404,14 @@
          /*
           * check that all replication server in the config are in the
           * connected Set. If not create the connection
           * connected Set. If not, create the connection
           */
          for (String aServerURL : replicationServers)
          for (String rsURL : replicationServerUrls)
          {
            final int separator = aServerURL.lastIndexOf(':');
            final String portString = aServerURL.substring(separator + 1);
            final int port = Integer.parseInt(portString);
            final String hostname = aServerURL.substring(0, separator);
            final int separator = rsURL.lastIndexOf(':');
            final String hostname = rsURL.substring(0, separator);
            final int port = Integer.parseInt(rsURL.substring(separator + 1));
            final InetAddress inetAddress;
            try
            {
@@ -436,13 +436,13 @@
            }
            // Don't connect to a server if it is already connected.
            final String normalizedServerURL = normalizeServerURL(aServerURL);
            final String normalizedServerURL = normalizeServerURL(rsURL);
            if (connectedRSUrls.contains(normalizedServerURL))
            {
              continue;
            }
            connect(aServerURL, domain.getBaseDn());
            connect(rsURL, domain.getBaseDn());
          }
        }
@@ -538,10 +538,7 @@
      listenSocket = new ServerSocket();
      listenSocket.bind(new InetSocketAddress(replicationPort));
      /*
       * creates working threads
       * We must first connect, then start to listen.
       */
      // creates working threads: we must first connect, then start to listen.
      if (debugEnabled())
        TRACER.debugInfo("RS " +getMonitorInstanceName()+
            " creates connect thread");
@@ -559,7 +556,7 @@
      // can know me and really enableECL.
      if (WorkflowImpl.getWorkflow(externalChangeLogWorkflowID) != null)
      {
        // Already done . Nothing to do
        // Already done. Nothing to do
        return;
      }
      eclwe = new ECLWorkflowElement(this);
@@ -567,7 +564,6 @@
      if (debugEnabled())
        TRACER.debugInfo("RS " +getMonitorInstanceName()+
            " successfully initialized");
    } catch (ChangelogException e)
    {
      Message message = ERR_COULD_NOT_READ_DB.get(
@@ -927,7 +923,7 @@
        try
        {
          lastGeneratedDraftCN = changelogDB.getLastKey();
          lastGeneratedDraftCN = changelogDB.getLastDraftCN();
        }
        catch (Exception ignored)
        {
@@ -995,9 +991,9 @@
    disconnectRemovedReplicationServers(configuration.getReplicationServer());
    replicationServers = configuration.getReplicationServer();
    if (replicationServers == null)
      replicationServers = new ArrayList<String>();
    replicationServerUrls = configuration.getReplicationServer();
    if (replicationServerUrls == null)
      replicationServerUrls = new ArrayList<String>();
    queueSize = configuration.getQueueSize();
    long newPurgeDelay = configuration.getReplicationPurgeDelay();
@@ -1088,8 +1084,8 @@
      broadcastConfigChange();
    }
    if ((configuration.getReplicationDBDirectory() != null) &&
        (!dbDirname.equals(configuration.getReplicationDBDirectory())))
    final String newDir = configuration.getReplicationDBDirectory();
    if (newDir != null && !dbDirname.equals(newDir))
    {
      return new ConfigChangeResult(ResultCode.SUCCESS, true);
    }
@@ -1109,25 +1105,24 @@
     * First try the set of configured replication servers to see if one of them
     * is this replication server (this should always be the case).
     */
    for (String rs : replicationServers)
    for (String rsUrl : replicationServerUrls)
    {
      /*
       * No need validate the string format because the admin framework has
       * already done it.
       */
      final int index = rs.lastIndexOf(":");
      final String hostname = rs.substring(0, index);
      final int port = Integer.parseInt(rs.substring(index + 1));
      final int index = rsUrl.lastIndexOf(":");
      final String hostname = rsUrl.substring(0, index);
      final int port = Integer.parseInt(rsUrl.substring(index + 1));
      if (port == replicationPort && isLocalAddress(hostname))
      {
        serverURL = rs;
        serverURL = rsUrl;
        return;
      }
    }
    /*
     * Fall-back to the machine hostname.
     */
    // Fall-back to the machine hostname.
    serverURL = InetAddress.getLocalHost().getHostName() + ":"
        + replicationPort;
  }
@@ -1249,8 +1244,7 @@
  public void remove()
  {
    if (debugEnabled())
      TRACER.debugInfo("RS " +getMonitorInstanceName()+
          " starts removing");
      TRACER.debugInfo("RS " + getMonitorInstanceName() + " starts removing");
    shutdown();
    removeBackend();
@@ -1471,23 +1465,20 @@
  {
    Collection<String> serversToDisconnect = new ArrayList<String>();
    for (String server: replicationServers)
    for (String rsUrl : replicationServerUrls)
    {
      if (!newReplServers.contains(server))
      if (!newReplServers.contains(rsUrl))
      {
        try
        {
          // translate the server name into IP address
          // and keep the port number
          String[] host = server.split(":");
          // translate the server name into IP address and keep the port number
          String[] host = rsUrl.split(":");
          serversToDisconnect.add(
              (InetAddress.getByName(host[0])).getHostAddress()
              + ":" + host[1]);
              InetAddress.getByName(host[0]).getHostAddress() + ":" + host[1]);
        }
        catch (IOException e)
        {
          Message message = ERR_COULD_NOT_SOLVE_HOSTNAME.get(server);
          logError(message);
          logError(ERR_COULD_NOT_SOLVE_HOSTNAME.get(rsUrl));
        }
      }
    }
@@ -1686,7 +1677,7 @@
    {
      if (changelogDB != null)
      {
        return changelogDB.getFirstKey();
        return changelogDB.getFirstDraftCN();
      }
      return 0;
    }
@@ -1702,7 +1693,7 @@
    {
      if (changelogDB != null)
      {
        return changelogDB.getLastKey();
        return changelogDB.getLastDraftCN();
      }
      return 0;
    }
@@ -1754,10 +1745,10 @@
    int lastDraftCN;
    boolean dbEmpty = false;
    long newestDate = 0L;
    ChangelogDB changelogDB = getChangelogDB();
    long newestDate = 0;
    final ChangelogDB changelogDB = getChangelogDB();
    int firstDraftCN = changelogDB.getFirstKey();
    int firstDraftCN = changelogDB.getFirstDraftCN();
    Map<String,ServerState> domainsServerStateForLastSeqnum = null;
    ChangeNumber changeNumberForLastSeqnum = null;
    String domainForLastSeqnum = null;
@@ -1769,7 +1760,7 @@
    }
    else
    {
      lastDraftCN = changelogDB.getLastKey();
      lastDraftCN = changelogDB.getLastDraftCN();
      // Get the generalized state associated with the current last DraftCN
      // and initializes from it the startStates table
@@ -1813,7 +1804,7 @@
        //  (may be this domain was disabled when this record was returned).
        // In that case, are counted the changes from
        //  the date of the most recent change from this last draft record
        if (newestDate == 0L)
        if (newestDate == 0)
        {
          newestDate = changeNumberForLastSeqnum.getTime();
        }
@@ -1900,7 +1891,6 @@
  }
  private String normalizeServerURL(final String url)
  {
    final int separator = url.lastIndexOf(':');
@@ -1909,16 +1899,12 @@
    try
    {
      final InetAddress inetAddress = InetAddress.getByName(hostname);
      if (isLocalAddress(inetAddress))
      {
        // It doesn't matter whether we use an IP or hostname here.
        // It does not matter whether we use an IP or hostname here.
        return InetAddress.getLocalHost().getHostAddress() + ":" + portString;
      }
      else
      {
        return inetAddress.getHostAddress() + ":" + portString;
      }
      return inetAddress.getHostAddress() + ":" + portString;
    }
    catch (UnknownHostException e)
    {
opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java
@@ -70,14 +70,14 @@
   *
   * @return Returns the first draftCN in the DB.
   */
  int getFirstKey();
  int getFirstDraftCN();
  /**
   * Get the lastChange.
   *
   * @return Returns the last draftCN in the DB
   */
  int getLastKey();
  int getLastDraftCN();
  /**
   * Add an update to the list of messages that must be saved to the db managed
opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandler.java
@@ -77,8 +77,16 @@
  private static int NO_KEY = 0;
  private DraftCNDB db;
  private int firstkey = NO_KEY;
  private int lastkey = NO_KEY;
  /**
   * FIXME Is this field that useful? {@link #getFirstDraftCN()} does not even
   * use it!
   */
  private int firstDraftCN = NO_KEY;
  /**
   * FIXME Is this field that useful? {@link #getLastDraftCN()} does not even
   * use it! It is not even updated.
   */
  private int lastDraftCN = NO_KEY;
  private DbMonitorProvider dbMonitor = new DbMonitorProvider();
  private boolean shutdown = false;
  private boolean trimDone = false;
@@ -117,8 +125,8 @@
    // DB initialization
    db = new DraftCNDB(dbenv);
    firstkey = db.readFirstDraftCN();
    lastkey = db.readLastDraftCN();
    firstDraftCN = db.readFirstDraftCN();
    lastDraftCN = db.readLastDraftCN();
    // Trimming thread
    thread = new DirectoryThread(this, "Replication DraftCN db");
@@ -147,14 +155,14 @@
  /** {@inheritDoc} */
  @Override
  public int getFirstKey()
  public int getFirstDraftCN()
  {
    return db.readFirstDraftCN();
  }
  /** {@inheritDoc} */
  @Override
  public int getLastKey()
  public int getLastDraftCN()
  {
    return db.readLastDraftCN();
  }
@@ -309,11 +317,18 @@
      return;
    }
    // FIXME is this correct?
    // This code is not setting the excludedBaseDNs of the RS which means it
    // could take any value set by one of the other methods!
    // In addition, this code is not thread safe, but I suspect it is used in a
    // multi-threaded way.
    // The call to RS.getEligibleCN() is not reliable in any way and could
    // return very different values even if the DB content did not change!!
    ChangeNumber crossDomainEligibleCN = replicationServer.getEligibleCN();
    for (int i = 0; i < 100; i++)
    {
      DraftCNDBCursor cursor = db.openDeleteCursor();
      final DraftCNDBCursor cursor = db.openDeleteCursor();
      try
      {
        for (int j = 0; j < 50; j++)
@@ -326,16 +341,15 @@
          }
          // From the draftCNDb change record, get the domain and changeNumber
          ChangeNumber cn = cursor.currentChangeNumber();
          String baseDN = cursor.currentBaseDN();
          if ((baseDNToClear != null)
              && (baseDNToClear.equalsIgnoreCase(baseDN)))
          final ChangeNumber cn = cursor.currentChangeNumber();
          final String baseDN = cursor.currentBaseDN();
          if (baseDNToClear != null && baseDNToClear.equalsIgnoreCase(baseDN))
          {
            cursor.delete();
            continue;
          }
          ReplicationServerDomain domain = replicationServer
          final ReplicationServerDomain domain = replicationServer
              .getReplicationServerDomain(baseDN, false);
          if (domain == null)
          {
@@ -346,14 +360,14 @@
            continue;
          }
          ServerState startState = domain.getStartState();
          final ServerState startState = domain.getStartState();
          // We don't use the returned endState but it's updating CN as reading
          domain.getEligibleState(crossDomainEligibleCN);
          ChangeNumber fcn = startState.getChangeNumber(cn.getServerId());
          final ChangeNumber fcn = startState.getChangeNumber(cn.getServerId());
          int currentKey = cursor.currentKey();
          final int currentDraftCN = cursor.currentKey();
          if (cn.older(fcn))
          {
@@ -391,7 +405,7 @@
            continue;
          }
          firstkey = currentKey;
          firstDraftCN = currentDraftCN;
          cursor.close();
          return;
        }
@@ -465,7 +479,7 @@
  @Override
  public String toString()
  {
    return "draftCNdb:" + " " + firstkey + " " + lastkey;
    return "draftCNdb:" + " " + firstDraftCN + " " + lastDraftCN;
  }
  /**
@@ -482,8 +496,8 @@
  public void clear() throws ChangelogException
  {
    db.clear();
    firstkey = db.readFirstDraftCN();
    lastkey = db.readLastDraftCN();
    firstDraftCN = db.readFirstDraftCN();
    lastDraftCN = db.readLastDraftCN();
  }
  private ReentrantLock lock = new ReentrantLock();
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandlerTest.java
@@ -111,11 +111,11 @@
      handler.add(sn3, value3, baseDN3, changeNumber3);
      // The ChangeNumber should not get purged
      int firstkey = handler.getFirstKey();
      assertEquals(firstkey, sn1);
      assertEquals(handler.getLastKey(), sn3);
      final int firstDraftCN = handler.getFirstDraftCN();
      assertEquals(firstDraftCN, sn1);
      assertEquals(handler.getLastDraftCN(), sn3);
      DraftCNDBCursor dbc = handler.getReadCursor(firstkey);
      DraftCNDBCursor dbc = handler.getReadCursor(firstDraftCN);
      try
      {
        assertEquals(dbc.currentChangeNumber(), changeNumber1);
@@ -149,8 +149,8 @@
      {
        Thread.sleep(200);
      }
      assertEquals(handler.getFirstKey(), 0);
      assertEquals(handler.getLastKey(), 0);
      assertEquals(handler.getFirstDraftCN(), 0);
      assertEquals(handler.getLastDraftCN(), 0);
    } finally
@@ -239,8 +239,8 @@
      Thread.sleep(500);
      // Checks
      assertEquals(handler.getFirstKey(), sn1);
      assertEquals(handler.getLastKey(), sn3);
      assertEquals(handler.getFirstDraftCN(), sn1);
      assertEquals(handler.getLastDraftCN(), sn3);
      assertEquals(handler.count(), 3, "Db count");
@@ -260,8 +260,8 @@
      handler.clear();
      // Check the db is cleared.
      assertEquals(handler.getFirstKey(), 0);
      assertEquals(handler.getLastKey(), 0);
      assertEquals(handler.getFirstDraftCN(), 0);
      assertEquals(handler.getLastDraftCN(), 0);
      assertEquals(handler.count(), 0);
    } finally
    {