mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

pgamba
02.13.2007 a8a50d12008ef3b1471e4967125d36888c5cbdee
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -25,21 +25,19 @@
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 */
package org.opends.server.replication.plugin;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.*;
import org.opends.server.loggers.debug.DebugTracer;
import static org.opends.messages.ToolMessages.*;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.messages.ToolMessages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.replication.plugin.Historical.ENTRYUIDNAME;
import static org.opends.server.replication.protocol.OperationContext.*;
import static org.opends.server.util.ServerConstants.*;
import static org.opends.server.util.StaticUtils.createEntry;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import org.opends.server.protocols.asn1.ASN1OctetString;
import static org.opends.server.util.ServerConstants.*;
import java.io.IOException;
import java.io.OutputStream;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Collection;
@@ -52,11 +50,12 @@
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.Adler32;
import java.util.zip.CheckedOutputStream;
import java.util.zip.DataFormatException;
import java.util.zip.Adler32;
import java.io.OutputStream;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.server.admin.server.ConfigurationChangeListener;
import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.*;
import org.opends.server.admin.std.server.ReplicationDomainCfg;
@@ -75,7 +74,9 @@
import org.opends.server.core.ModifyDNOperationBasis;
import org.opends.server.core.ModifyOperation;
import org.opends.server.core.ModifyOperationBasis;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.protocols.asn1.ASN1Exception;
import org.opends.server.protocols.asn1.ASN1OctetString;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.protocols.ldap.LDAPAttribute;
@@ -84,7 +85,25 @@
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ChangeNumberGenerator;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.*;
import org.opends.server.replication.protocol.AckMessage;
import org.opends.server.replication.protocol.AddContext;
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.DeleteContext;
import org.opends.server.replication.protocol.DoneMessage;
import org.opends.server.replication.protocol.EntryMessage;
import org.opends.server.replication.protocol.ErrorMessage;
import org.opends.server.replication.protocol.HeartbeatMessage;
import org.opends.server.replication.protocol.InitializeRequestMessage;
import org.opends.server.replication.protocol.InitializeTargetMessage;
import org.opends.server.replication.protocol.ModifyContext;
import org.opends.server.replication.protocol.ModifyDNMsg;
import org.opends.server.replication.protocol.ModifyDnContext;
import org.opends.server.replication.protocol.OperationContext;
import org.opends.server.replication.protocol.ReplSessionSecurity;
import org.opends.server.replication.protocol.ReplicationMessage;
import org.opends.server.replication.protocol.ResetGenerationId;
import org.opends.server.replication.protocol.RoutableMessage;
import org.opends.server.replication.protocol.UpdateMessage;
import org.opends.server.tasks.InitializeTargetTask;
import org.opends.server.tasks.InitializeTask;
import org.opends.server.tasks.TaskUtils;
@@ -281,11 +300,11 @@
     * Initializes the import/export counters with the provided value.
     * @param count The value with which to initialize the counters.
     */
    public void initImportExportCounters(long count)
    public void setCounters(long total, long left)
      throws DirectoryException
    {
      entryCount = count;
      entryLeftCount = count;
      entryCount = total;
      entryLeftCount = left;
      if (initializeTask != null)
      {
@@ -323,6 +342,15 @@
        }
      }
    }
    /**
     * {@inheritDoc}
     */
    public String toString()
    {
      return new String("[ Entry count=" + this.entryCount +
                        ", Entry left count=" + this.entryLeftCount + "]");
    }
  }
  /**
@@ -815,7 +843,7 @@
            if (debugEnabled())
              if (!(msg instanceof HeartbeatMessage))
                TRACER.debugInfo("Message received <" + msg + ">");
                TRACER.debugVerbose("Message received <" + msg + ">");
            if (msg instanceof AckMessage)
            {
@@ -2526,7 +2554,7 @@
        msg = broker.receive();
        if (debugEnabled())
          TRACER.debugInfo(
          TRACER.debugVerbose(
              " sid:" + this.serverId +
              " base DN:" + this.baseDN +
              " Import EntryBytes received " + msg);
@@ -2586,6 +2614,12 @@
    // FIXME TBD Treat the case where the error happens while entries
    // are being exported
    if (debugEnabled())
      TRACER.debugVerbose(
          " abandonImportExport:" + this.serverId +
          " base DN:" + this.baseDN +
          " Error Msg received " + errorMsg);
    if (ieContext != null)
    {
      ieContext.exception = new DirectoryException(ResultCode.OTHER,
@@ -2841,7 +2875,6 @@
    if (ieContext.checksumOutput == false)
    {
      // Actually send the entry
      EntryMessage entryMessage = new EntryMessage(
        serverId, ieContext.exportTarget, lDIFEntry.getBytes());
      broker.publish(entryMessage);
@@ -3037,8 +3070,8 @@
      if (initTask != null)
      {
        ieContext.initializeTask = initTask;
        ieContext.initImportExportCounters(entryCount);
      }
      ieContext.setCounters(entryCount, entryCount);
      // Send start message to the peer
      InitializeTargetMessage initializeMessage = new InitializeTargetMessage(
@@ -3132,7 +3165,8 @@
        ieContext.importSource = initializeMessage.getsenderID();
        ieContext.entryLeftCount = initializeMessage.getEntryCount();
        ieContext.initImportExportCounters(initializeMessage.getEntryCount());
        ieContext.setCounters(initializeMessage.getEntryCount(),
            initializeMessage.getEntryCount());
        preBackendImport(backend);
@@ -3151,9 +3185,6 @@
        // Process import
        backend.importLDIF(importConfig);
        if (debugEnabled())
          TRACER.debugInfo("The import has ended successfully on " +
              this.baseDN);
        stateSavingDisabled = false;
      }
    }
@@ -3174,6 +3205,8 @@
        // Re-enable backend
        closeBackendImport(backend);
        backend = retrievesBackend(baseDN);
      }
      // Update the task that initiated the import