mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
27.20.2013 fda4ada5109628583ee67ccf984ebfcf662486ef
Session.java, ReplicationServer.java:
More javadoc/code cleanup.
Removed useless null checks.
2 files modified
309 ■■■■ changed files
opends/src/server/org/opends/server/replication/protocol/Session.java 29 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServer.java 280 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/Session.java
@@ -29,22 +29,17 @@
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.StaticUtils.*;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.*;
import java.net.Socket;
import java.net.SocketException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.zip.DataFormatException;
import javax.net.ssl.SSLSocket;
@@ -58,7 +53,7 @@
/**
 * This class defines a replication session using TLS.
 */
public final class Session extends DirectoryThread
public final class Session extends DirectoryThread implements Closeable
{
  /**
   * The tracer object for the debug logger.
@@ -84,7 +79,7 @@
   */
  private volatile long lastReceiveTime = 0;
  /*
  /**
   * Close and error guarded by stateLock: use a different lock to publish since
   * publishing can block, and we don't want to block while closing failed
   * connections.
@@ -93,25 +88,25 @@
  private volatile boolean closeInitiated = false;
  private Throwable sessionError = null;
  /*
  /**
   * Publish guarded by publishLock: use a full lock here so that we can
   * optionally publish StopMsg during close.
   */
  private final Lock publishLock = new ReentrantLock();
  /*
  /**
   * These do not need synchronization because they are only modified during the
   * initial single threaded handshake.
   */
  private short protocolVersion = ProtocolVersion.getCurrentVersion();
  private boolean isEncrypted = true; // Initially encrypted.
  /*
  /**
   * Use a buffered input stream to avoid too many system calls.
   */
  private BufferedInputStream input;
  /*
  /**
   * Use a buffered output stream in order to combine message length and content
   * into a single TCP packet if possible.
   */
@@ -165,6 +160,7 @@
   * This method is called when the session with the remote must be closed.
   * This object won't be used anymore after this method is called.
   */
  @Override
  public void close()
  {
    Throwable localSessionError;
@@ -564,6 +560,7 @@
   * Run method for the Session.
   * Loops waiting for buffers from the queue and sends them when available.
   */
  @Override
  public void run()
  {
    isRunning.set(true);
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -27,16 +27,11 @@
 */
package org.opends.server.replication.server;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.util.ServerConstants.EOL;
import static org.opends.server.util.StaticUtils.getFileForPath;
import static org.opends.server.util.StaticUtils.isLocalAddress;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.ServerConstants.*;
import static org.opends.server.util.StaticUtils.*;
import java.io.File;
import java.io.IOException;
@@ -70,9 +65,6 @@
import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
import com.sleepycat.je.DatabaseException;
import org.opends.server.types.SearchScope;
/**
 * ReplicationServer Listener. This singleton is the main object of the
@@ -93,10 +85,11 @@
  private Thread listenThread;
  private Thread connectThread;
  /* The list of replication servers configured by the administrator */
  /** The list of replication servers configured by the administrator. */
  private Collection<String> replicationServers;
  /* This table is used to store the list of dn for which we are currently
  /**
   * This table is used to store the list of dn for which we are currently
   * handling servers.
   */
  private final Map<String, ReplicationServerDomain> baseDNs =
@@ -108,52 +101,62 @@
  private int queueSize;
  private String dbDirname = null;
  // The delay (in sec) after which the  changes must
  // be deleted from the persistent storage.
  /**
   * The delay (in sec) after which the changes must be deleted from the
   * persistent storage.
   */
  private long purgeDelay;
  private int replicationPort;
  private boolean stopListen = false;
  private ReplSessionSecurity replSessionSecurity;
  // For the backend associated to this replication server,
  // DN of the config entry of the backend
  /**
   * For the backend associated to this replication server, DN of the config
   * entry of the backend.
   */
  private DN backendConfigEntryDN;
  // ID of the backend
  /** ID of the backend. */
  private static final String backendId = "replicationChanges";
  /*
   * Assured mode properties
   */
  // Timeout (in milliseconds) when waiting for acknowledgments
  /** Timeout (in milliseconds) when waiting for acknowledgments. */
  private long assuredTimeout = 1000;
  // Group id
  private byte groupId = (byte)1;
  /** Group id. */
  private byte groupId = 1;
  // Number of pending changes for a DS, considered as threshold value to put
  // the DS in DEGRADED_STATUS. If value is 0, status analyzer is disabled
  /**
   * Number of pending changes for a DS, considered as threshold value to put
   * the DS in DEGRADED_STATUS. If value is 0, status analyzer is disabled.
   */
  private int degradedStatusThreshold = 5000;
  // Number of milliseconds to wait before sending new monitoring messages.
  // If value is 0, monitoring publisher is disabled
  /**
   * Number of milliseconds to wait before sending new monitoring messages. If
   * value is 0, monitoring publisher is disabled.
   */
  private long monitoringPublisherPeriod = 3000;
  // The handler of the draft change numbers database, the database used to
  // store the relation between a draft change number ('seqnum') and the
  // associated cookie.
  //
  // Guarded by draftCNLock
  //
  /**
   * The handler of the draft change numbers database, the database used to
   * store the relation between a draft change number ('seqnum') and the
   * associated cookie.
   * <p>
   * Guarded by draftCNLock
   */
  private DraftCNDbHandler draftCNDbHandler;
  // The last value generated of the draft change number.
  //
  // Guarded by draftCNLock
  //
  /**
   * The last value generated of the draft change number.
   * <p>
   * Guarded by draftCNLock
   **/
  private int lastGeneratedDraftCN = 0;
  // Used for protecting draft CN related state.
  /** Used for protecting draft CN related state. */
  private final Object draftCNLock = new Object();
  /**
@@ -166,8 +169,10 @@
  private ECLWorkflowElement eclwe;
  private WorkflowImpl externalChangeLogWorkflowImpl = null;
  // This is required for unit testing, so that we can keep track of all the
  // replication servers which are running in the VM.
  /**
   * This is required for unit testing, so that we can keep track of all the
   * replication servers which are running in the VM.
   */
  private static Set<Integer> localPorts = new CopyOnWriteArraySet<Integer>();
  // Monitors for synchronizing domain creation with the connect thread.
@@ -175,8 +180,8 @@
  private final Object connectThreadLock = new Object();
  private long domainTicket = 0L;
  // ServiceIDs excluded for ECL
  private  ArrayList<String> excludedServiceIDs = new ArrayList<String>();
  /** ServiceIDs excluded for ECL. */
  private Collection<String> excludedServiceIDs = new ArrayList<String>();
  /**
   * The weight affected to the replication server.
@@ -233,7 +238,6 @@
    }
    catch (Exception e)
    {
      MessageBuilder mb = new MessageBuilder();
      mb.append(e.getLocalizedMessage());
      mb.append(" ");
@@ -459,7 +463,7 @@
        }
        catch (InterruptedException e)
        {
          // Signalled to shutdown.
          // Signaled to shutdown.
          return;
        }
      }
@@ -502,21 +506,9 @@
    }
    catch (Exception e)
    {
      if (session != null)
      {
        session.close();
      }
      try
      {
        socket.close();
      }
      catch (IOException ignored)
      {
        // Ignore.
      }
      close(session);
      close(socket);
    }
  }
  /**
@@ -990,6 +982,7 @@
  /**
   * {@inheritDoc}
   */
  @Override
  public ConfigChangeResult applyConfigurationChange(
      ReplicationServerCfg configuration)
  {
@@ -1194,6 +1187,7 @@
  /**
   * {@inheritDoc}
   */
  @Override
  public boolean isConfigurationChangeAcceptable(
      ReplicationServerCfg configuration, List<Message> unacceptableReasons)
  {
@@ -1335,6 +1329,7 @@
  /**
   * {@inheritDoc}
   */
  @Override
  public void processBackupBegin(Backend backend, BackupConfig config)
  {
    // Nothing is needed at the moment
@@ -1343,6 +1338,7 @@
  /**
   * {@inheritDoc}
   */
  @Override
  public void processBackupEnd(Backend backend, BackupConfig config,
                               boolean successful)
  {
@@ -1352,6 +1348,7 @@
  /**
   * {@inheritDoc}
   */
  @Override
  public void processRestoreBegin(Backend backend, RestoreConfig config)
  {
    if (backend.getBackendID().equals(backendId))
@@ -1361,6 +1358,7 @@
  /**
   * {@inheritDoc}
   */
  @Override
  public void processRestoreEnd(Backend backend, RestoreConfig config,
                                boolean successful)
  {
@@ -1371,6 +1369,7 @@
  /**
   * {@inheritDoc}
   */
  @Override
  public void processImportBegin(Backend backend, LDIFImportConfig config)
  {
    // Nothing is needed at the moment
@@ -1379,6 +1378,7 @@
  /**
   * {@inheritDoc}
   */
  @Override
  public void processImportEnd(Backend backend, LDIFImportConfig config,
                               boolean successful)
  {
@@ -1388,6 +1388,7 @@
  /**
   * {@inheritDoc}
   */
  @Override
  public void processExportBegin(Backend backend, LDIFExportConfig config)
  {
    if (debugEnabled())
@@ -1406,6 +1407,7 @@
  /**
   * {@inheritDoc}
   */
  @Override
  public void processExportEnd(Backend backend, LDIFExportConfig config,
                               boolean successful)
  {
@@ -1435,14 +1437,9 @@
   */
  public void clearDb()
  {
    Iterator<ReplicationServerDomain> rcachei = getDomainIterator();
    if (rcachei != null)
    for (ReplicationServerDomain rsd : getReplicationServerDomains())
    {
      while (rcachei.hasNext())
      {
        ReplicationServerDomain rsd = rcachei.next();
        rsd.clearDbs();
      }
      rsd.clearDbs();
    }
    synchronized (draftCNLock)
@@ -1646,7 +1643,7 @@
   * @param excludedServiceIDs the provided list of serviceIDs excluded from
   *                          the computation of eligibleCN.
   */
  public void disableEligibility(ArrayList<String> excludedServiceIDs)
  public void disableEligibility(List<String> excludedServiceIDs)
  {
    this.excludedServiceIDs = excludedServiceIDs;
  }
@@ -1663,29 +1660,24 @@
    // traverse the domains and get the eligible CN from each domain
    // store the oldest one as the cross domain eligible CN
    ChangeNumber eligibleCN = null;
    Iterator<ReplicationServerDomain> rsdi = this.getDomainIterator();
    if (rsdi != null)
    for (ReplicationServerDomain domain : getReplicationServerDomains())
    {
      while (rsdi.hasNext())
      {
        ReplicationServerDomain domain = rsdi.next();
        if ((excludedServiceIDs != null) &&
            excludedServiceIDs.contains(domain.getBaseDn()))
          continue;
      if ((excludedServiceIDs != null) &&
          excludedServiceIDs.contains(domain.getBaseDn()))
        continue;
        ChangeNumber domainEligibleCN = domain.getEligibleCN();
        String dates = "";
        if (domainEligibleCN != null)
      ChangeNumber domainEligibleCN = domain.getEligibleCN();
      String dates = "";
      if (domainEligibleCN != null)
      {
        if ((eligibleCN == null) || (domainEligibleCN.older(eligibleCN)))
        {
          if ((eligibleCN == null) || (domainEligibleCN.older(eligibleCN)))
          {
            eligibleCN = domainEligibleCN;
          }
          dates = new Date(domainEligibleCN.getTime()).toString();
          eligibleCN = domainEligibleCN;
        }
        debugLog += "[dn=" + domain.getBaseDn()
             + "] [eligibleCN=" + domainEligibleCN + ", " + dates + "]";
        dates = new Date(domainEligibleCN.getTime()).toString();
      }
      debugLog += "[dn=" + domain.getBaseDn()
           + "] [eligibleCN=" + domainEligibleCN + ", " + dates + "]";
    }
    if (eligibleCN==null)
@@ -1821,14 +1813,13 @@
     *     (this diff is done domain by domain)
     */
    int firstDraftCN;
    int lastDraftCN;
    Boolean dbEmpty = false;
    Long newestDate = 0L;
    DraftCNDbHandler draftCNDbH = this.getDraftCNDbHandler();
    // Get the first DraftCN from the DraftCNdb
    firstDraftCN = draftCNDbH.getFirstKey();
    int firstDraftCN = draftCNDbH.getFirstKey();
    Map<String,ServerState> domainsServerStateForLastSeqnum = null;
    ChangeNumber changeNumberForLastSeqnum = null;
    String domainForLastSeqnum = null;
@@ -1860,60 +1851,53 @@
    }
    // Domain by domain
    Iterator<ReplicationServerDomain> rsdi = this.getDomainIterator();
    if (rsdi != null)
    for (ReplicationServerDomain rsd : getReplicationServerDomains())
    {
      while (rsdi.hasNext())
      if (excludedServiceIDs.contains(rsd.getBaseDn()))
        continue;
      // for this domain, have the state in the replchangelog
      // where the last DraftCN update is
      long ec;
      if (domainsServerStateForLastSeqnum == null)
      {
        // process a domain
        ReplicationServerDomain rsd = rsdi.next();
        if (excludedServiceIDs.contains(rsd.getBaseDn()))
          continue;
        // for this domain, have the state in the replchangelog
        // where the last DraftCN update is
        long ec;
        if (domainsServerStateForLastSeqnum == null)
        {
          // Count changes of this domain from the beginning of the changelog
          ChangeNumber trimCN =
              new ChangeNumber(rsd.getLatestDomainTrimDate(), 0,0);
          ec = rsd.getEligibleCount(
                    rsd.getStartState().duplicateOnlyOlderThan(trimCN),
                    crossDomainEligibleCN);
        }
        else
        {
          // There are records in the draftDB (so already returned to clients)
          // BUT
          //  There is nothing related to this domain in the last draft record
          //  (may be this domain was disabled when this record was returned).
          // In that case, are counted the changes from
          //  the date of the most recent change from this last draft record
          if (newestDate == 0L)
          {
            newestDate = changeNumberForLastSeqnum.getTime();
          }
          // And count changes of this domain from the date of the
          // lastseqnum record (that does not refer to this domain)
          ChangeNumber cnx = new ChangeNumber(newestDate,
              changeNumberForLastSeqnum.getSeqnum(), 0);
          ec = rsd.getEligibleCount(cnx, crossDomainEligibleCN);
          if (domainForLastSeqnum.equalsIgnoreCase(rsd.getBaseDn()))
            ec--;
        }
        // cumulates on domains
        lastDraftCN += ec;
        // DraftCN Db is empty and there are eligible updates in the replication
        // changelog then init first DraftCN
        if ((ec>0) && (firstDraftCN==0))
          firstDraftCN = 1;
        // Count changes of this domain from the beginning of the changelog
        ChangeNumber trimCN =
            new ChangeNumber(rsd.getLatestDomainTrimDate(), 0,0);
        ec = rsd.getEligibleCount(
                  rsd.getStartState().duplicateOnlyOlderThan(trimCN),
                  crossDomainEligibleCN);
      }
      else
      {
        // There are records in the draftDB (so already returned to clients)
        // BUT
        //  There is nothing related to this domain in the last draft record
        //  (may be this domain was disabled when this record was returned).
        // In that case, are counted the changes from
        //  the date of the most recent change from this last draft record
        if (newestDate == 0L)
        {
          newestDate = changeNumberForLastSeqnum.getTime();
        }
        // And count changes of this domain from the date of the
        // lastseqnum record (that does not refer to this domain)
        ChangeNumber cnx = new ChangeNumber(newestDate,
            changeNumberForLastSeqnum.getSeqnum(), 0);
        ec = rsd.getEligibleCount(cnx, crossDomainEligibleCN);
        if (domainForLastSeqnum.equalsIgnoreCase(rsd.getBaseDn()))
          ec--;
      }
      // cumulates on domains
      lastDraftCN += ec;
      // DraftCN Db is empty and there are eligible updates in the replication
      // changelog then init first DraftCN
      if ((ec>0) && (firstDraftCN==0))
        firstDraftCN = 1;
    }
    if (dbEmpty)
    {
@@ -1931,30 +1915,22 @@
   * @return the last cookie value.
   */
  public MultiDomainServerState getLastECLCookie(
    ArrayList<String> excludedServiceIDs)
    List<String> excludedServiceIDs)
  {
    disableEligibility(excludedServiceIDs);
    MultiDomainServerState result = new MultiDomainServerState();
    // Initialize start state for  all running domains with empty state
    Iterator<ReplicationServerDomain> rsdk = this.getDomainIterator();
    if (rsdk != null)
    for (ReplicationServerDomain rsd : getReplicationServerDomains())
    {
      while (rsdk.hasNext())
      {
        // process a domain
        ReplicationServerDomain rsd = rsdk.next();
      if ((excludedServiceIDs != null)
          && (excludedServiceIDs.contains(rsd.getBaseDn())))
        continue;
        if ((excludedServiceIDs!=null)
            && (excludedServiceIDs.contains(rsd.getBaseDn())))
          continue;
      if (rsd.getDbServerState().isEmpty())
        continue;
        if (rsd.getDbServerState().isEmpty())
          continue;
        result.update(rsd.getBaseDn(), rsd.getEligibleState(
            getEligibleCN()));
      }
      result.update(rsd.getBaseDn(), rsd.getEligibleState(getEligibleCN()));
    }
    return result;
  }