From 334014cfe2047b73bb9770035d2ea4e5e089cfdb Mon Sep 17 00:00:00 2001
From: Nicolas Capponi <nicolas.capponi@forgerock.com>
Date: Wed, 04 Jun 2014 10:19:55 +0000
Subject: [PATCH] OPENDJ-1483 : On Windows, problem with changelog's file rotation when running    replication topology with file-based changelog OPENDJ-1487 : File based changelog : cursors opened when clearing the log

---
 opends/src/server/org/opends/server/replication/server/changelog/file/Log.java |  126 +++++++++++++++++++++++++++++++----------
 1 files changed, 94 insertions(+), 32 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/Log.java b/opends/src/server/org/opends/server/replication/server/changelog/file/Log.java
index 350b70e..20522bd 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/file/Log.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/Log.java
@@ -663,13 +663,9 @@
       }
       if (!openCursors.isEmpty())
       {
-        // TODO: throwing an exception makes the replication/totalupdate.txt robot functional test fail because
-        // there is one open cursor when clearing.
-        // Switch to logging until this issue is solved
-        // throw new ChangelogException(Message.raw("Can't clean log '%s' because there are %d cursor(s) opened on it",
-        //    logPath.getPath(), openCursors.size()));
-        ErrorLogger.logError(
-            ERR_CHANGELOG_CURSOR_OPENED_WHILE_CLEANING_LOG.get(logPath.getPath(), openCursors.size()));
+        // Allow opened cursors at this point, but turn them into empty cursors.
+        // This behavior is needed by the change number indexer thread.
+        switchCursorsOpenedIntoEmptyCursors();
       }
 
       // delete all log files
@@ -747,9 +743,18 @@
     return logFiles.firstEntry().getValue();
   }
 
-  /** Rotate the head log file to a read-only log file, and open a new empty head log file to write in. */
+  /**
+   * Rotate the head log file to a read-only log file, and open a new empty head
+   * log file to write in.
+   * <p>
+   * All cursors opened on this log are temporarily disabled (closing underlying resources)
+   * and then re-open with their previous state.
+   */
   private void rotateHeadLogFile() throws ChangelogException
   {
+    // Temporarily disable cursors opened on head, saving their state
+    final List<Pair<LogCursor<K, V>, CursorState<K, V>>> cursorsOnHead = disableOpenedCursorsOnHead();
+
     final LogFile<K, V> headLogFile = getHeadLogFile();
     final File readOnlyLogFile = new File(logPath, generateReadOnlyFileName(headLogFile));
     headLogFile.close();
@@ -757,7 +762,9 @@
 
     openHeadLogFile();
     openReadOnlyLogFile(readOnlyLogFile);
-    updateCursorsOpenedOnHead();
+
+    // Re-enable cursors previously opened on head, with the saved state
+    updateOpenedCursorsOnHeadAfterRotation(cursorsOnHead);
   }
 
   private void renameHeadLogFileTo(final File rotatedLogFile) throws ChangelogException
@@ -816,31 +823,55 @@
        + recordParser.encodeKeyToString(highestKey) + LOG_FILE_SUFFIX;
   }
 
-  /** Update the open cursors after a rotation of the head log file. */
-  private void updateCursorsOpenedOnHead() throws ChangelogException
+  /** Update the cursors that were pointing to head after a rotation of the head log file. */
+  private void updateOpenedCursorsOnHeadAfterRotation(List<Pair<LogCursor<K, V>, CursorState<K, V>>> cursors)
+      throws ChangelogException
   {
-    for (LogCursor<K, V> cursor : openCursors)
+    for (Pair<LogCursor<K, V>, CursorState<K, V>> pair : cursors)
     {
-      final CursorState<K, V> state = cursor.getState();
+      final CursorState<K, V> cursorState = pair.getSecond();
+
       // Need to update the cursor only if it is pointing to the head log file
-      if (isHeadLogFile(state.logFile))
+      if (isHeadLogFile(cursorState.logFile))
       {
-        updateOpenCursor(cursor, state);
+        final K previousKey = logFiles.lowerKey(recordParser.getMaxKey());
+        final LogFile<K, V> logFile = findLogFileFor(previousKey);
+        final LogCursor<K, V> cursor = pair.getFirst();
+        cursor.reinitializeTo(new CursorState<K, V>(logFile, cursorState.filePosition, cursorState.record));
       }
     }
   }
 
-  /**
-   * Update the provided open cursor with the provided state.
-   * <p>
-   * The cursor must report the previous state on the head log file to the same
-   * state (position in file, current record) in the read-only log file just created.
-   */
-  private void updateOpenCursor(final LogCursor<K,V> cursor, final CursorState<K, V> state) throws ChangelogException
+  private void switchCursorsOpenedIntoEmptyCursors() throws ChangelogException
   {
-    final K previousKey = logFiles.lowerKey(recordParser.getMaxKey());
-    final LogFile<K, V> logFile = findLogFileFor(previousKey);
-    cursor.reinitializeTo(new CursorState<K, V>(logFile, state.filePosition, state.record));
+    for (LogCursor<K, V> cursor : openCursors)
+    {
+      cursor.actAsEmptyCursor();
+    }
+    openCursors.clear();
+  }
+
+  /**
+   * Disable the cursors opened on the head log file log, by closing their underlying cursor.
+   * Returns the state of each cursor just before the close operation.
+   *
+   * @return the pairs (cursor, cursor state) for each cursor pointing to head log file.
+   * @throws ChangelogException
+   *           If an error occurs.
+   */
+  private List<Pair<LogCursor<K, V>, CursorState<K, V>>> disableOpenedCursorsOnHead() throws ChangelogException
+  {
+    final List<Pair<LogCursor<K, V>, CursorState<K, V>>> openCursorsStates =
+        new ArrayList<Pair<LogCursor<K, V>, CursorState<K, V>>>();
+    for (LogCursor<K, V> cursor : openCursors)
+    {
+      if (isHeadLogFile(cursor.currentLogFile))
+      {
+        openCursorsStates.add(Pair.of(cursor, cursor.getState()));
+        cursor.closeUnderlyingCursor();
+      }
+    }
+    return openCursorsStates;
   }
 
   private void openHeadLogFile() throws ChangelogException
@@ -930,6 +961,9 @@
    * {@code cursor.next()} method.
    * <p>
    * The cursor uses the log shared lock to ensure reads are not done during a rotation.
+   * <p>
+   * The cursor can be switched into an empty cursor by calling the {@code actAsEmptyCursor()}
+   * method.
    */
   private static class LogCursor<K extends Comparable<K>, V> implements RepositionableCursor<K, V>
   {
@@ -937,6 +971,7 @@
 
     private LogFile<K, V> currentLogFile;
     private LogFileCursor<K, V> currentCursor;
+    private boolean actAsEmptyCursor;
 
     /**
      * Creates a cursor on the provided log.
@@ -949,19 +984,24 @@
     private LogCursor(final Log<K, V> log) throws ChangelogException
     {
       this.log = log;
+      this.actAsEmptyCursor = false;
     }
 
     /** {@inheritDoc} */
     @Override
     public Record<K, V> getRecord()
     {
-      return currentCursor.getRecord();
+      return currentCursor != null ? currentCursor.getRecord() : null;
     }
 
     /** {@inheritDoc} */
     @Override
     public boolean next() throws ChangelogException
     {
+      if (actAsEmptyCursor)
+      {
+        return false;
+      }
       log.sharedLock.lock();
       try
       {
@@ -1004,6 +1044,10 @@
     @Override
     public boolean positionTo(final K key, final boolean findNearest) throws ChangelogException
     {
+      if (actAsEmptyCursor)
+      {
+        return false;
+      }
       log.sharedLock.lock();
       try
       {
@@ -1033,15 +1077,31 @@
     /** Returns the state of this cursor. */
     private CursorState<K, V> getState() throws ChangelogException
     {
-      return new CursorState<K, V>(currentLogFile, currentCursor.getFilePosition(), currentCursor.getRecord());
+      return !actAsEmptyCursor ?
+          new CursorState<K, V>(currentLogFile, currentCursor.getFilePosition(), currentCursor.getRecord()) : null;
+    }
+
+    private void closeUnderlyingCursor()
+    {
+      StaticUtils.close(currentCursor);
     }
 
     /** Reinitialize this cursor to the provided state. */
     private void reinitializeTo(final CursorState<K, V> cursorState) throws ChangelogException
     {
-      StaticUtils.close(currentCursor);
-      currentLogFile = cursorState.logFile;
-      currentCursor = currentLogFile.getCursorInitialisedTo(cursorState.record, cursorState.filePosition);
+      if (!actAsEmptyCursor)
+      {
+        currentLogFile = cursorState.logFile;
+        currentCursor = currentLogFile.getCursorInitialisedTo(cursorState.record, cursorState.filePosition);
+      }
+    }
+
+    /** Turn this cursor into an empty cursor, with no actual resource used. */
+    private void actAsEmptyCursor()
+    {
+      currentLogFile = null;
+      currentCursor = null;
+      actAsEmptyCursor = true;
     }
 
     /** Switch the cursor to the provided log file. */
@@ -1055,8 +1115,10 @@
     /** {@inheritDoc} */
     public String toString()
     {
-      return String.format("Cursor on log : %s, current log file: %s, current cursor: %s",
-          log.logPath, currentLogFile.getFile().getName(), currentCursor);
+      return actAsEmptyCursor ?
+          String.format("Cursor on log : %s, acting as empty cursor", log.logPath) :
+          String.format("Cursor on log : %s, current log file: %s, current cursor: %s",
+              log.logPath, currentLogFile.getFile().getName(), currentCursor);
     }
   }
 

--
Gitblit v1.10.0