| | |
| | | * Portions Copyright 2006-2007 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.replication.plugin; |
| | | import org.opends.messages.Message; |
| | | import org.opends.messages.MessageBuilder; |
| | | import static org.opends.server.loggers.ErrorLogger.logError; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import static org.opends.messages.ToolMessages.*; |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.messages.ToolMessages.*; |
| | | import static org.opends.server.loggers.ErrorLogger.logError; |
| | | import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; |
| | | import static org.opends.server.loggers.debug.DebugLogger.getTracer; |
| | | import static org.opends.server.replication.plugin.Historical.ENTRYUIDNAME; |
| | | import static org.opends.server.replication.protocol.OperationContext.*; |
| | | import static org.opends.server.util.ServerConstants.*; |
| | | import static org.opends.server.util.StaticUtils.createEntry; |
| | | import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; |
| | | import org.opends.server.protocols.asn1.ASN1OctetString; |
| | | import static org.opends.server.util.ServerConstants.*; |
| | | |
| | | import java.io.IOException; |
| | | import java.io.OutputStream; |
| | | import java.net.SocketTimeoutException; |
| | | import java.util.ArrayList; |
| | | import java.util.Collection; |
| | |
| | | import java.util.SortedMap; |
| | | import java.util.TreeMap; |
| | | import java.util.concurrent.atomic.AtomicInteger; |
| | | import java.util.zip.Adler32; |
| | | import java.util.zip.CheckedOutputStream; |
| | | import java.util.zip.DataFormatException; |
| | | import java.util.zip.Adler32; |
| | | import java.io.OutputStream; |
| | | |
| | | import org.opends.messages.Message; |
| | | import org.opends.messages.MessageBuilder; |
| | | import org.opends.server.admin.server.ConfigurationChangeListener; |
| | | import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.*; |
| | | import org.opends.server.admin.std.server.ReplicationDomainCfg; |
| | |
| | | import org.opends.server.core.ModifyDNOperationBasis; |
| | | import org.opends.server.core.ModifyOperation; |
| | | import org.opends.server.core.ModifyOperationBasis; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import org.opends.server.protocols.asn1.ASN1Exception; |
| | | import org.opends.server.protocols.asn1.ASN1OctetString; |
| | | import org.opends.server.protocols.internal.InternalClientConnection; |
| | | import org.opends.server.protocols.internal.InternalSearchOperation; |
| | | import org.opends.server.protocols.ldap.LDAPAttribute; |
| | |
| | | import org.opends.server.replication.common.ChangeNumber; |
| | | import org.opends.server.replication.common.ChangeNumberGenerator; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.protocol.*; |
| | | import org.opends.server.replication.protocol.AckMessage; |
| | | import org.opends.server.replication.protocol.AddContext; |
| | | import org.opends.server.replication.protocol.AddMsg; |
| | | import org.opends.server.replication.protocol.DeleteContext; |
| | | import org.opends.server.replication.protocol.DoneMessage; |
| | | import org.opends.server.replication.protocol.EntryMessage; |
| | | import org.opends.server.replication.protocol.ErrorMessage; |
| | | import org.opends.server.replication.protocol.HeartbeatMessage; |
| | | import org.opends.server.replication.protocol.InitializeRequestMessage; |
| | | import org.opends.server.replication.protocol.InitializeTargetMessage; |
| | | import org.opends.server.replication.protocol.ModifyContext; |
| | | import org.opends.server.replication.protocol.ModifyDNMsg; |
| | | import org.opends.server.replication.protocol.ModifyDnContext; |
| | | import org.opends.server.replication.protocol.OperationContext; |
| | | import org.opends.server.replication.protocol.ReplSessionSecurity; |
| | | import org.opends.server.replication.protocol.ReplicationMessage; |
| | | import org.opends.server.replication.protocol.ResetGenerationId; |
| | | import org.opends.server.replication.protocol.RoutableMessage; |
| | | import org.opends.server.replication.protocol.UpdateMessage; |
| | | import org.opends.server.tasks.InitializeTargetTask; |
| | | import org.opends.server.tasks.InitializeTask; |
| | | import org.opends.server.tasks.TaskUtils; |
| | |
| | | * Initializes the import/export counters with the provider value. |
| | | * @param count The value with which to initialize the counters. |
| | | */ |
| | | public void initImportExportCounters(long count) |
| | | public void setCounters(long total, long left) |
| | | throws DirectoryException |
| | | { |
| | | entryCount = count; |
| | | entryLeftCount = count; |
| | | entryCount = total; |
| | | entryLeftCount = left; |
| | | |
| | | if (initializeTask != null) |
| | | { |
| | |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public String toString() |
| | | { |
| | | return new String("[ Entry count=" + this.entryCount + |
| | | ", Entry left count=" + this.entryLeftCount + "]"); |
| | | } |
| | | } |
| | | |
| | | /** |
| | |
| | | |
| | | if (debugEnabled()) |
| | | if (!(msg instanceof HeartbeatMessage)) |
| | | TRACER.debugInfo("Message received <" + msg + ">"); |
| | | TRACER.debugVerbose("Message received <" + msg + ">"); |
| | | |
| | | if (msg instanceof AckMessage) |
| | | { |
| | |
| | | msg = broker.receive(); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | TRACER.debugVerbose( |
| | | " sid:" + this.serverId + |
| | | " base DN:" + this.baseDN + |
| | | " Import EntryBytes received " + msg); |
| | |
| | | // FIXME TBD Treat the case where the error happens while entries |
| | | // are being exported |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugVerbose( |
| | | " abandonImportExport:" + this.serverId + |
| | | " base DN:" + this.baseDN + |
| | | " Error Msg received " + errorMsg); |
| | | |
| | | if (ieContext != null) |
| | | { |
| | | ieContext.exception = new DirectoryException(ResultCode.OTHER, |
| | |
| | | |
| | | if (ieContext.checksumOutput == false) |
| | | { |
| | | // Actually send the entry |
| | | EntryMessage entryMessage = new EntryMessage( |
| | | serverId, ieContext.exportTarget, lDIFEntry.getBytes()); |
| | | broker.publish(entryMessage); |
| | |
| | | if (initTask != null) |
| | | { |
| | | ieContext.initializeTask = initTask; |
| | | ieContext.initImportExportCounters(entryCount); |
| | | } |
| | | ieContext.setCounters(entryCount, entryCount); |
| | | |
| | | // Send start message to the peer |
| | | InitializeTargetMessage initializeMessage = new InitializeTargetMessage( |
| | |
| | | |
| | | ieContext.importSource = initializeMessage.getsenderID(); |
| | | ieContext.entryLeftCount = initializeMessage.getEntryCount(); |
| | | ieContext.initImportExportCounters(initializeMessage.getEntryCount()); |
| | | ieContext.setCounters(initializeMessage.getEntryCount(), |
| | | initializeMessage.getEntryCount()); |
| | | |
| | | preBackendImport(backend); |
| | | |
| | |
| | | // Process import |
| | | backend.importLDIF(importConfig); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("The import has ended successfully on " + |
| | | this.baseDN); |
| | | stateSavingDisabled = false; |
| | | } |
| | | } |
| | |
| | | |
| | | // Re-enable backend |
| | | closeBackendImport(backend); |
| | | |
| | | backend = retrievesBackend(baseDN); |
| | | } |
| | | |
| | | // Update the task that initiated the import |