From 13819a2e81db0422a7c8c186f838c7b243173170 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 03 Sep 2014 06:30:37 +0000
Subject: [PATCH] OPENDJ-1205 (CR-4428) Remove network layer from External ChangeLog implementation

---
 opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java |  154 ++++++++++----------------------------------------
 1 files changed, 32 insertions(+), 122 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 6dd0fa4..67195ef 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -27,10 +27,16 @@
 package org.opends.server.replication.server;
 
 import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantLock;
@@ -43,12 +49,33 @@
 import org.opends.server.api.MonitorProvider;
 import org.opends.server.core.DirectoryServer;
 import org.opends.server.loggers.debug.DebugTracer;
-import org.opends.server.replication.common.*;
-import org.opends.server.replication.protocol.*;
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.common.DSInfo;
+import org.opends.server.replication.common.RSInfo;
+import org.opends.server.replication.common.ServerState;
+import org.opends.server.replication.common.ServerStatus;
+import org.opends.server.replication.common.StatusMachineEvent;
+import org.opends.server.replication.protocol.AckMsg;
+import org.opends.server.replication.protocol.ChangeStatusMsg;
+import org.opends.server.replication.protocol.ChangeTimeHeartbeatMsg;
+import org.opends.server.replication.protocol.ErrorMsg;
+import org.opends.server.replication.protocol.MonitorMsg;
+import org.opends.server.replication.protocol.MonitorRequestMsg;
+import org.opends.server.replication.protocol.ReplicaOfflineMsg;
+import org.opends.server.replication.protocol.ResetGenerationIdMsg;
+import org.opends.server.replication.protocol.RoutableMsg;
+import org.opends.server.replication.protocol.TopologyMsg;
+import org.opends.server.replication.protocol.UpdateMsg;
 import org.opends.server.replication.server.changelog.api.ChangelogException;
 import org.opends.server.replication.server.changelog.api.DBCursor;
 import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
-import org.opends.server.types.*;
+import org.opends.server.types.Attribute;
+import org.opends.server.types.Attributes;
+import org.opends.server.types.DN;
+import org.opends.server.types.DebugLogLevel;
+import org.opends.server.types.DirectoryException;
+import org.opends.server.types.HostPort;
+import org.opends.server.types.ResultCode;
 
 import static org.opends.messages.ReplicationMessages.*;
 import static org.opends.server.loggers.ErrorLogger.*;
@@ -114,9 +141,6 @@
   private final Map<Integer, ReplicationServerHandler> connectedRSs =
     new ConcurrentHashMap<Integer, ReplicationServerHandler>();
 
-  private final Queue<MessageHandler> otherHandlers =
-    new ConcurrentLinkedQueue<MessageHandler>();
-
   private final ReplicationDomainDB domainDB;
   /** The ReplicationServer that created the current instance. */
   private final ReplicationServer localReplicationServer;
@@ -368,11 +392,6 @@
         addUpdate(dsHandler, updateMsg, notAssuredUpdateMsg, assuredServers);
       }
     }
-
-    // Push the message to the other subscribing handlers
-    for (MessageHandler mHandler : otherHandlers) {
-      mHandler.add(updateMsg);
-    }
   }
 
   private boolean isDifferentGenerationId(ReplicationServerHandler rsHandler,
@@ -1097,10 +1116,6 @@
         {
           unregisterServerHandler(sHandler, shutdown, true);
         }
-        else if (otherHandlers.contains(sHandler))
-        {
-          unregisterOtherHandler(sHandler);
-        }
       }
       catch(Exception e)
       {
@@ -1117,12 +1132,6 @@
     }
   }
 
-  private void unregisterOtherHandler(MessageHandler mHandler)
-  {
-    unRegisterHandler(mHandler);
-    mHandler.shutdown();
-  }
-
   private void unregisterServerHandler(ServerHandler sHandler, boolean shutdown,
       boolean isDirectoryServer)
   {
@@ -1149,60 +1158,6 @@
   }
 
   /**
-   * Stop the handler.
-   * @param mHandler The handler to stop.
-   */
-  public void stopServer(MessageHandler mHandler)
-  {
-    // TODO JNR merge with stopServer(ServerHandler, boolean)
-    if (debugEnabled())
-    {
-      debug("stopServer() on the message handler " + mHandler);
-    }
-    /*
-     * We must prevent deadlock on replication server domain lock, when for
-     * instance this code is called from dying ServerReader but also dying
-     * ServerWriter at the same time, or from a thread that wants to shut down
-     * the handler. So use a thread safe flag to know if the job must be done
-     * or not (is already being processed or not).
-     */
-    if (!mHandler.engageShutdown())
-      // Only do this once (prevent other thread to enter here again)
-    {
-      try
-      {
-        // Acquire lock on domain (see more details in comment of start() method
-        // of ServerHandler)
-        lock();
-      }
-      catch (InterruptedException ex)
-      {
-        // We can't deal with this here, so re-interrupt thread so that it is
-        // caught during subsequent IO.
-        Thread.currentThread().interrupt();
-        return;
-      }
-
-      try
-      {
-        if (otherHandlers.contains(mHandler))
-        {
-          unregisterOtherHandler(mHandler);
-        }
-      }
-      catch(Exception e)
-      {
-        logError(Message.raw(Category.SYNC, Severity.NOTICE,
-            stackTraceToSingleLineString(e)));
-      }
-      finally
-      {
-        release();
-      }
-    }
-  }
-
-  /**
    * Unregister this handler from the list of handlers registered to this
    * domain.
    * @param sHandler the provided handler to unregister.
@@ -2427,39 +2382,6 @@
     return attributes;
   }
 
-  /**
-   * Register in the domain an handler that subscribes to changes.
-   * @param mHandler the provided subscribing handler.
-   */
-  public void registerHandler(MessageHandler mHandler)
-  {
-    this.otherHandlers.add(mHandler);
-  }
-
-  /**
-   * Unregister from the domain an handler.
-   * @param mHandler the provided unsubscribing handler.
-   * @return Whether this handler has been unregistered with success.
-   */
-  public boolean unRegisterHandler(MessageHandler mHandler)
-  {
-    return this.otherHandlers.remove(mHandler);
-  }
-
-  /**
-   * Returns the oldest known state for the domain, made of the oldest CSN
-   * stored for each serverId.
-   * <p>
-   * Note: Because the replication changelogDB trimming always keep one change
-   * whatever its date, the CSN contained in the returned state can be very old.
-   *
-   * @return the start state of the domain.
-   */
-  public ServerState getOldestState()
-  {
-    return domainDB.getDomainOldestCSNs(baseDN);
-  }
-
   private void sendTopologyMsg(String type, ServerHandler handler,
       TopologyMsg msg)
   {
@@ -2524,18 +2446,6 @@
     }
   }
 
-
-
-  /**
-   * Get the latest (more recent) trim date of the changelog dbs associated
-   * to this domain.
-   * @return The latest trim date.
-   */
-  public long getLatestDomainTrimDate()
-  {
-    return domainDB.getDomainLatestTrimDate(baseDN);
-  }
-
   /**
    * Return the monitor instance name of the ReplicationServer that created the
    * current instance.

--
Gitblit v1.10.0