| File was renamed from opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java |
| | |
| | | * Copyright 2006-2008 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.replication.plugin; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.messages.ToolMessages.*; |
| | | import static org.opends.server.loggers.ErrorLogger.logError; |
| | |
| | | import static org.opends.server.util.StaticUtils.createEntry; |
| | | import static org.opends.server.util.StaticUtils.getFileForPath; |
| | | import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; |
| | | import static org.opends.server.replication.common.StatusMachine.*; |
| | | |
| | | import org.opends.server.replication.protocol.LDAPUpdateMsg; |
| | | |
| | | import org.opends.server.replication.service.ReplicationMonitor; |
| | | |
| | | import java.util.Collection; |
| | | |
| | | import org.opends.server.types.Attributes; |
| | | |
| | | import java.io.File; |
| | | import java.io.IOException; |
| | | import java.io.InputStream; |
| | | import java.io.OutputStream; |
| | | import java.net.SocketTimeoutException; |
| | | import java.util.ArrayList; |
| | | import java.util.Collection; |
| | | import java.util.HashSet; |
| | | import java.util.LinkedHashMap; |
| | | import java.util.LinkedHashSet; |
| | | import java.util.LinkedList; |
| | | import java.util.List; |
| | | import java.util.NoSuchElementException; |
| | | import java.util.SortedMap; |
| | | import java.util.SortedSet; |
| | | import java.util.TreeMap; |
| | | import java.util.Set; |
| | | import java.util.TreeSet; |
| | | import java.util.concurrent.LinkedBlockingQueue; |
| | | import java.util.concurrent.atomic.AtomicInteger; |
| | | import java.util.zip.CheckedOutputStream; |
| | | import java.util.zip.DataFormatException; |
| | | |
| | | import org.opends.messages.Message; |
| | | import org.opends.messages.MessageBuilder; |
| | | import org.opends.server.admin.server.ConfigurationChangeListener; |
| | |
| | | import org.opends.server.api.DirectoryThread; |
| | | import org.opends.server.api.SynchronizationProvider; |
| | | import org.opends.server.backends.jeb.BackendImpl; |
| | | import org.opends.server.backends.task.Task; |
| | | import org.opends.server.config.ConfigException; |
| | | import org.opends.server.core.AddOperation; |
| | | import org.opends.server.core.DeleteOperation; |
| | |
| | | import org.opends.server.protocols.asn1.ASN1Exception; |
| | | import org.opends.server.protocols.asn1.ASN1OctetString; |
| | | import org.opends.server.protocols.internal.InternalClientConnection; |
| | | import org.opends.server.protocols.internal.InternalSearchListener; |
| | | import org.opends.server.protocols.internal.InternalSearchOperation; |
| | | import org.opends.server.protocols.ldap.LDAPAttribute; |
| | | import org.opends.server.protocols.ldap.LDAPFilter; |
| | | import org.opends.server.protocols.ldap.LDAPModification; |
| | | import org.opends.server.replication.service.ReplicationDomain; |
| | | import org.opends.server.replication.common.AssuredMode; |
| | | import org.opends.server.replication.common.ChangeNumber; |
| | | import org.opends.server.replication.common.ChangeNumberGenerator; |
| | | import org.opends.server.replication.common.DSInfo; |
| | | import org.opends.server.replication.common.RSInfo; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.common.ServerStatus; |
| | | import org.opends.server.replication.common.StatusMachine; |
| | | import org.opends.server.replication.common.StatusMachineEvent; |
| | | import org.opends.server.replication.protocol.AckMsg; |
| | | import org.opends.server.replication.protocol.AddContext; |
| | | import org.opends.server.replication.protocol.AddMsg; |
| | | import org.opends.server.replication.protocol.ChangeStatusMsg; |
| | | import org.opends.server.replication.protocol.DeleteContext; |
| | | import org.opends.server.replication.protocol.DoneMsg; |
| | | import org.opends.server.replication.protocol.EntryMsg; |
| | | import org.opends.server.replication.protocol.ErrorMsg; |
| | | import org.opends.server.replication.protocol.HeartbeatMsg; |
| | | import org.opends.server.replication.protocol.InitializeRequestMsg; |
| | | import org.opends.server.replication.protocol.InitializeTargetMsg; |
| | | import org.opends.server.replication.protocol.ModifyContext; |
| | | import org.opends.server.replication.protocol.ModifyDNMsg; |
| | | import org.opends.server.replication.protocol.ModifyDnContext; |
| | | import org.opends.server.replication.protocol.ModifyMsg; |
| | | import org.opends.server.replication.protocol.OperationContext; |
| | | import org.opends.server.replication.protocol.ReplSessionSecurity; |
| | | import org.opends.server.replication.protocol.ReplicationMsg; |
| | | import org.opends.server.replication.protocol.ResetGenerationIdMsg; |
| | | import org.opends.server.replication.protocol.RoutableMsg; |
| | | import org.opends.server.replication.protocol.TopologyMsg; |
| | | import org.opends.server.replication.protocol.ProtocolSession; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.tasks.InitializeTargetTask; |
| | | import org.opends.server.tasks.InitializeTask; |
| | | import org.opends.server.tasks.TaskUtils; |
| | | import org.opends.server.types.AttributeBuilder; |
| | | import org.opends.server.types.Attributes; |
| | | import org.opends.server.types.ExistingFileBehavior; |
| | | import org.opends.server.types.AbstractOperation; |
| | | import org.opends.server.types.Attribute; |
| | | import org.opends.server.types.AttributeBuilder; |
| | | import org.opends.server.types.AttributeType; |
| | | import org.opends.server.types.AttributeValue; |
| | | import org.opends.server.types.ConfigChangeResult; |
| | |
| | | import org.opends.server.types.DereferencePolicy; |
| | | import org.opends.server.types.DirectoryException; |
| | | import org.opends.server.types.Entry; |
| | | import org.opends.server.types.ExistingFileBehavior; |
| | | import org.opends.server.types.LDAPException; |
| | | import org.opends.server.types.LDIFExportConfig; |
| | | import org.opends.server.types.LDIFImportConfig; |
| | |
| | | import org.opends.server.types.ResultCode; |
| | | import org.opends.server.types.SearchFilter; |
| | | import org.opends.server.types.SearchResultEntry; |
| | | import org.opends.server.types.SearchResultReference; |
| | | import org.opends.server.types.SearchScope; |
| | | import org.opends.server.types.SynchronizationProviderResult; |
| | | import org.opends.server.types.operation.PluginOperation; |
| | |
| | | * handle conflict resolution, |
| | | * handle protocol messages from the replicationServer. |
| | | */ |
| | | public class ReplicationDomain extends DirectoryThread |
| | | public class LDAPReplicationDomain extends ReplicationDomain |
| | | implements ConfigurationChangeListener<ReplicationDomainCfg>, |
| | | AlertGenerator |
| | | AlertGenerator, InternalSearchListener |
| | | { |
| | | /** |
| | | * The fully-qualified name of this class. |
| | |
| | | */ |
| | | private static final DebugTracer TRACER = getTracer(); |
| | | |
| | | private ReplicationMonitor monitor; |
| | | |
| | | private ReplicationBroker broker; |
| | | // Thread waiting for incoming update messages for this domain and pushing |
| | | // them to the global incoming update message queue for later processing by |
| | | // replay threads. |
| | | private ListenerThread listenerThread; |
| | | // The update to replay message queue where the listener thread is going to |
| | | // push incoming update messages. |
| | | private LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue; |
| | | private SortedMap<ChangeNumber, UpdateMsg> waitingAckMsgs = |
| | | new TreeMap<ChangeNumber, UpdateMsg>(); |
| | | private AtomicInteger numRcvdUpdates = new AtomicInteger(0); |
| | | private AtomicInteger numSentUpdates = new AtomicInteger(0); |
| | | private AtomicInteger numProcessedUpdates = new AtomicInteger(); |
| | | private AtomicInteger numResolvedNamingConflicts = new AtomicInteger(); |
| | | private AtomicInteger numResolvedModifyConflicts = new AtomicInteger(); |
| | | private AtomicInteger numUnresolvedNamingConflicts = new AtomicInteger(); |
| | | private int debugCount = 0; |
| | | private PersistentServerState state; |
| | | private final PersistentServerState state; |
| | | private int numReplayedPostOpCalled = 0; |
| | | |
| | | private int maxReceiveQueue = 0; |
| | | private int maxSendQueue = 0; |
| | | private int maxReceiveDelay = 0; |
| | | private int maxSendDelay = 0; |
| | | |
| | | private long generationId = -1; |
| | | private boolean generationIdSavedStatus = false; |
| | | |
| | | ChangeNumberGenerator generator; |
| | | private ChangeNumberGenerator generator; |
| | | |
| | | /** |
| | | * This object is used to store the list of update currently being |
| | |
| | | */ |
| | | private RemotePendingChanges remotePendingChanges; |
| | | |
| | | /** |
| | | * The time in milliseconds between heartbeats from the replication |
| | | * server. Zero means heartbeats are off. |
| | | */ |
| | | private long heartbeatInterval = 0; |
| | | private short serverId; |
| | | |
| | | // The context related to an import or export being processed |
| | | // Null when none is being processed. |
| | | private IEContext ieContext = null; |
| | | |
| | | private Collection<String> replicationServers; |
| | | |
| | | private DN baseDn; |
| | | |
| | | private boolean shutdown = false; |
| | |
| | | private boolean disabled = false; |
| | | private boolean stateSavingDisabled = false; |
| | | |
| | | private int window = 100; |
| | | |
| | | /* |
| | | * Assured mode properties |
| | | */ |
| | | // Is assured mode enabled or not for this domain ? |
| | | private boolean assured = false; |
| | | // Assured sub mode (used when assured is true) |
| | | private AssuredMode assuredMode = AssuredMode.SAFE_DATA_MODE; |
| | | // Safe Data level (used when assuredMode is SAFE_DATA) |
| | | private byte assuredSdLevel = (byte)1; |
| | | // Timeout (in milliseconds) when waiting for acknowledgments |
| | | private long assuredTimeout = 1000; |
| | | |
| | | // Group id |
| | | private byte groupId = (byte)1; |
| | | // Referrals urls to be published to other servers of the topology |
| | | // TODO: fill that with all currently opened urls if no urls configured |
| | | private List<String> refUrls = new ArrayList<String>(); |
| | | |
| | | // Current status for this replicated domain |
| | | private ServerStatus status = ServerStatus.NOT_CONNECTED_STATUS; |
| | | |
| | | /* |
| | | * Properties for the last topology info received from the network. |
| | | */ |
| | | // Info for other DSs. |
| | | // Warning: does not contain info for us (for our server id) |
| | | private List<DSInfo> dsList = new ArrayList<DSInfo>(); |
| | | // Info for other RSs. |
| | | private List<RSInfo> rsList = new ArrayList<RSInfo>(); |
| | | // This list is used to temporary store operations that needs |
| | | // to be replayed at session establishment time. |
| | | private TreeSet<FakeOperation> replayOperations = |
| | | new TreeSet<FakeOperation>(new FakeOperationComparator());; |
| | | |
| | | /** |
| | | * The isolation policy that this domain is going to use. |
| | |
| | | */ |
| | | private boolean done = true; |
| | | |
| | | private ServerStateFlush flushThread; |
| | | |
| | | /** |
| | | * This class contain the context related to an import or export |
| | | * launched on the domain. |
| | | * The thread that periodically saves the ServerState of this |
| | | * LDAPReplicationDomain in the database. |
| | | */ |
| | | private class IEContext |
| | | private class ServerStateFlush extends DirectoryThread |
| | | { |
| | | // The task that initiated the operation. |
| | | Task initializeTask; |
| | | // The input stream for the import |
| | | ReplLDIFInputStream ldifImportInputStream = null; |
| | | // The target in the case of an export |
| | | short exportTarget = RoutableMsg.UNKNOWN_SERVER; |
| | | // The source in the case of an import |
| | | short importSource = RoutableMsg.UNKNOWN_SERVER; |
| | | |
| | | // The total entry count expected to be processed |
| | | long entryCount = 0; |
| | | // The count for the entry not yet processed |
| | | long entryLeftCount = 0; |
| | | |
| | | // The exception raised when any |
| | | DirectoryException exception = null; |
| | | |
| | | /** |
| | | * Initializes the import/export counters with the provider value. |
| | | * @param total |
| | | * @param left |
| | | * @throws DirectoryException |
| | | */ |
| | | public void setCounters(long total, long left) |
| | | throws DirectoryException |
| | | protected ServerStateFlush() |
| | | { |
| | | entryCount = total; |
| | | entryLeftCount = left; |
| | | |
| | | if (initializeTask != null) |
| | | { |
| | | if (initializeTask instanceof InitializeTask) |
| | | { |
| | | ((InitializeTask)initializeTask).setTotal(entryCount); |
| | | ((InitializeTask)initializeTask).setLeft(entryCount); |
| | | } |
| | | else if (initializeTask instanceof InitializeTargetTask) |
| | | { |
| | | ((InitializeTargetTask)initializeTask).setTotal(entryCount); |
| | | ((InitializeTargetTask)initializeTask).setLeft(entryCount); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Update the counters of the task for each entry processed during |
| | | * an import or export. |
| | | * @throws DirectoryException |
| | | */ |
| | | public void updateCounters() |
| | | throws DirectoryException |
| | | { |
| | | entryLeftCount--; |
| | | |
| | | if (initializeTask != null) |
| | | { |
| | | if (initializeTask instanceof InitializeTask) |
| | | { |
| | | ((InitializeTask)initializeTask).setLeft(entryLeftCount); |
| | | } |
| | | else if (initializeTask instanceof InitializeTargetTask) |
| | | { |
| | | ((InitializeTargetTask)initializeTask).setLeft(entryLeftCount); |
| | | } |
| | | } |
| | | super("Replication State Saver for server id " + |
| | | serverId + " and domain " + baseDn.toString()); |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public String toString() |
| | | { |
| | | return new String("[ Entry count=" + this.entryCount + |
| | | ", Entry left count=" + this.entryLeftCount + "]"); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * This thread is launched when we want to export data to another server that |
| | | * has requested to be initialized with the data of our backend. |
| | | */ |
| | | private class ExportThread extends DirectoryThread |
| | | { |
| | | // Id of server that will receive updates |
| | | private short target; |
| | | |
| | | /** |
| | | * Constructor for the ExportThread. |
| | | * |
| | | * @param target Id of server that will receive updates |
| | | */ |
| | | public ExportThread(short target) |
| | | { |
| | | super("Export thread " + serverId); |
| | | this.target = target; |
| | | } |
| | | |
| | | /** |
| | | * Run method for this class. |
| | | */ |
| | | @Override |
| | | public void run() |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("Export thread starting."); |
| | | } |
| | | done = false; |
| | | |
| | | try |
| | | while (shutdown == false) |
| | | { |
| | | initializeRemote(target, target, null); |
| | | } catch (DirectoryException de) |
| | | { |
| | | // An error message has been sent to the peer |
| | | // Nothing more to do locally |
| | | try |
| | | { |
| | | synchronized (this) |
| | | { |
| | | this.wait(1000); |
| | | if (!disabled && !stateSavingDisabled ) |
| | | { |
| | | // save the ServerState |
| | | state.save(); |
| | | } |
| | | } |
| | | } catch (InterruptedException e) |
| | | { } |
| | | } |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("Export thread stopping."); |
| | | } |
| | | state.save(); |
| | | |
| | | done = true; |
| | | } |
| | | } |
| | | |
| | |
| | | * @param updateToReplayQueue The queue for update messages to replay. |
| | | * @throws ConfigException In case of invalid configuration. |
| | | */ |
| | | public ReplicationDomain(ReplicationDomainCfg configuration, |
| | | public LDAPReplicationDomain(ReplicationDomainCfg configuration, |
| | | LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue) |
| | | throws ConfigException |
| | | { |
| | | super("Replication State Saver for server id " + configuration.getServerId() |
| | | + " and domain " + configuration.getBaseDN()); |
| | | super(configuration.getBaseDN().toNormalizedString(), |
| | | (short) configuration.getServerId()); |
| | | |
| | | /** |
| | | * The time in milliseconds between heartbeats from the replication |
| | | * server. Zero means heartbeats are off. |
| | | */ |
| | | long heartbeatInterval = 0; |
| | | |
| | | // Read the configuration parameters. |
| | | replicationServers = configuration.getReplicationServer(); |
| | | Set<String> replicationServers = configuration.getReplicationServer(); |
| | | serverId = (short) configuration.getServerId(); |
| | | baseDn = configuration.getBaseDN(); |
| | | window = configuration.getWindowSize(); |
| | | int window = configuration.getWindowSize(); |
| | | heartbeatInterval = configuration.getHeartbeatInterval(); |
| | | isolationpolicy = configuration.getIsolationPolicy(); |
| | | configDn = configuration.dn(); |
| | |
| | | switch (assuredType) |
| | | { |
| | | case NOT_ASSURED: |
| | | assured = false; |
| | | setAssured(false); |
| | | break; |
| | | case SAFE_DATA: |
| | | assured = true; |
| | | this.assuredMode = AssuredMode.SAFE_DATA_MODE; |
| | | setAssured(true); |
| | | setAssuredMode(AssuredMode.SAFE_DATA_MODE); |
| | | break; |
| | | case SAFE_READ: |
| | | assured = true; |
| | | this.assuredMode = AssuredMode.SAFE_READ_MODE; |
| | | setAssured(true); |
| | | setAssuredMode(AssuredMode.SAFE_READ_MODE); |
| | | break; |
| | | } |
| | | this.assuredSdLevel = (byte)configuration.getAssuredSdLevel(); |
| | | this.groupId = (byte)configuration.getGroupId(); |
| | | this.assuredTimeout = configuration.getAssuredTimeout(); |
| | | SortedSet<String> urls = configuration.getReferralsUrl(); |
| | | if (urls != null) |
| | | { |
| | | for (String url : urls) |
| | | { |
| | | this.refUrls.add(url); |
| | | } |
| | | } |
| | | setAssuredSdLevel((byte)configuration.getAssuredSdLevel()); |
| | | setAssuredTimeout(configuration.getAssuredTimeout()); |
| | | |
| | | setGroupId((byte)configuration.getGroupId()); |
| | | setURLs(configuration.getReferralsUrl()); |
| | | |
| | | /* |
| | | * Modify conflicts are solved for all suffixes but the schema suffix |
| | |
| | | solveConflictFlag = true; |
| | | } |
| | | |
| | | /* |
| | | * Create a new Persistent Server State that will be used to store |
| | | * the last ChangeNmber seen from all LDAP servers in the topology. |
| | | */ |
| | | state = new PersistentServerState(baseDn, serverId); |
| | | |
| | | /* |
| | | * Create a replication monitor object responsible for publishing |
| | | * monitoring information below cn=monitor. |
| | | */ |
| | | monitor = new ReplicationMonitor(this); |
| | | DirectoryServer.registerMonitorProvider(monitor); |
| | | |
| | | Backend backend = retrievesBackend(baseDn); |
| | | if (backend == null) |
| | | { |
| | |
| | | } |
| | | |
| | | /* |
| | | * create the broker object used to publish and receive changes |
| | | * Create a new Persistent Server State that will be used to store |
| | | * the last ChangeNmber seen from all LDAP servers in the topology. |
| | | */ |
| | | broker = new ReplicationBroker(this, state, baseDn, serverId, |
| | | maxReceiveQueue, maxReceiveDelay, maxSendQueue, maxSendDelay, window, |
| | | heartbeatInterval, generationId, |
| | | new ReplSessionSecurity(configuration),getGroupId()); |
| | | state = new PersistentServerState(baseDn, serverId, getServerState()); |
| | | |
| | | broker.start(replicationServers); |
| | | startPublishService(replicationServers, window, heartbeatInterval); |
| | | |
| | | /* |
| | | * ChangeNumberGenerator is used to create new unique ChangeNumbers |
| | |
| | | * The generator time is adjusted to the time of the last CN received from |
| | | * remote other servers. |
| | | */ |
| | | generator = |
| | | new ChangeNumberGenerator(serverId, state); |
| | | generator = getGenerator(); |
| | | |
| | | pendingChanges = |
| | | new PendingChanges(generator, |
| | | broker, state); |
| | | new PendingChanges(generator, this); |
| | | |
| | | remotePendingChanges = new RemotePendingChanges(generator, state); |
| | | remotePendingChanges = new RemotePendingChanges(getServerState()); |
| | | |
| | | // listen for changes on the configuration |
| | | configuration.addChangeListener(this); |
| | |
| | | DirectoryServer.registerAlertGenerator(this); |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Returns the base DN of this ReplicationDomain. |
| | | * |
| | |
| | | { |
| | | // this isolation policy specifies that the updates are denied |
| | | // when the broker is not connected. |
| | | return broker.isConnected(); |
| | | return isConnected(); |
| | | } |
| | | // we should never get there as the only possible policies are |
| | | // ACCEPT_ALL_UPDATES and REJECT_ALL_UPDATES |
| | |
| | | } |
| | | |
| | | /** |
| | | * Receives an update message from the replicationServer. |
| | | * also responsible for updating the list of pending changes |
| | | * @return the received message - null if none |
| | | */ |
| | | public UpdateMsg receive() |
| | | { |
| | | UpdateMsg update = null; |
| | | |
| | | while ( (update == null) && (!shutdown) ) |
| | | { |
| | | InitializeRequestMsg initMsg = null; |
| | | ReplicationMsg msg; |
| | | try |
| | | { |
| | | msg = broker.receive(); |
| | | if (msg == null) |
| | | { |
| | | // The server is in the shutdown process |
| | | return null; |
| | | } |
| | | |
| | | if (debugEnabled()) |
| | | if (!(msg instanceof HeartbeatMsg)) |
| | | TRACER.debugVerbose("Message received <" + msg + ">"); |
| | | |
| | | if (msg instanceof AckMsg) |
| | | { |
| | | AckMsg ack = (AckMsg) msg; |
| | | receiveAck(ack); |
| | | } |
| | | else if (msg instanceof InitializeRequestMsg) |
| | | { |
| | | // Another server requests us to provide entries |
| | | // for a total update |
| | | initMsg = (InitializeRequestMsg)msg; |
| | | } |
| | | else if (msg instanceof InitializeTargetMsg) |
| | | { |
| | | // Another server is exporting its entries to us |
| | | InitializeTargetMsg importMsg = (InitializeTargetMsg) msg; |
| | | |
| | | try |
| | | { |
| | | // This must be done while we are still holding the |
| | | // broker lock because we are now going to receive a |
| | | // bunch of entries from the remote server and we |
| | | // want the import thread to catch them and |
| | | // not the ListenerThread. |
| | | initialize(importMsg); |
| | | } |
| | | catch(DirectoryException de) |
| | | { |
| | | // Returns an error message to notify the sender |
| | | ErrorMsg errorMsg = |
| | | new ErrorMsg(importMsg.getsenderID(), |
| | | de.getMessageObject()); |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(de.getMessageObject()); |
| | | TRACER.debugInfo(Message.toString(mb.toMessage())); |
| | | broker.publish(errorMsg); |
| | | } |
| | | } |
| | | else if (msg instanceof ErrorMsg) |
| | | { |
| | | if (ieContext != null) |
| | | { |
| | | // This is an error termination for the 2 following cases : |
| | | // - either during an export |
| | | // - or before an import really started |
| | | // For example, when we publish a request and the |
| | | // replicationServer did not find any import source. |
| | | abandonImportExport((ErrorMsg)msg); |
| | | } |
| | | else |
| | | { |
| | | /* |
| | | * Log error message |
| | | */ |
| | | ErrorMsg errorMsg = (ErrorMsg)msg; |
| | | logError(ERR_ERROR_MSG_RECEIVED.get( |
| | | errorMsg.getDetails())); |
| | | } |
| | | } |
| | | if (msg instanceof TopologyMsg) |
| | | { |
| | | TopologyMsg topoMsg = (TopologyMsg)msg; |
| | | receiveTopo(topoMsg); |
| | | } |
| | | if (msg instanceof ChangeStatusMsg) |
| | | { |
| | | ChangeStatusMsg csMsg = (ChangeStatusMsg)msg; |
| | | receiveChangeStatus(csMsg); |
| | | } |
| | | else if (msg instanceof UpdateMsg) |
| | | { |
| | | update = (UpdateMsg) msg; |
| | | receiveUpdate(update); |
| | | } |
| | | } |
| | | catch (SocketTimeoutException e) |
| | | { |
| | | // just retry |
| | | } |
| | | // Test if we have received and export request message and |
| | | // if that's the case handle it now. |
| | | // This must be done outside of the portion of code protected |
| | | // by the broker lock so that we keep receiveing update |
| | | // when we are doing and export and so that a possible |
| | | // closure of the socket happening when we are publishing the |
| | | // entries to the remote can be handled by the other |
| | | // replay thread when they call this method and therefore the |
| | | // broker.receive() method. |
| | | if (initMsg != null) |
| | | { |
| | | // Do this work in a thread to allow replay thread continue working |
| | | ExportThread exportThread = new ExportThread(initMsg.getsenderID()); |
| | | exportThread.start(); |
| | | } |
| | | } |
| | | return update; |
| | | } |
| | | |
| | | /** |
| | | * Processes an incoming TopologyMsg. |
| | | * Updates the structures for the local view of the topology. |
| | | * |
| | | * @param topoMsg The topology information received from RS. |
| | | */ |
| | | public void receiveTopo(TopologyMsg topoMsg) |
| | | { |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("Replication domain " + baseDn |
| | | + " received topology info update:\n" + topoMsg); |
| | | |
| | | // Store new lists |
| | | synchronized(getDsList()) |
| | | { |
| | | synchronized(getRsList()) |
| | | { |
| | | dsList = topoMsg.getDsList(); |
| | | rsList = topoMsg.getRsList(); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Set the initial status of the domain, once he is connected to the topology. |
| | | * @param initStatus The status to enter the state machine with |
| | | */ |
| | | public void setInitialStatus(ServerStatus initStatus) |
| | | { |
| | | // Sanity check: is it a valid initial status? |
| | | if (!isValidInitialStatus(initStatus)) |
| | | { |
| | | Message msg = ERR_DS_INVALID_INIT_STATUS.get(initStatus.toString(), |
| | | baseDn.toString(), Short.toString(serverId)); |
| | | logError(msg); |
| | | } else |
| | | { |
| | | status = initStatus; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Processes an incoming ChangeStatusMsg. Compute new status according to |
| | | * given order. Then update domain for being compliant with new status |
| | | * definition. |
| | | * @param csMsg The received status message |
| | | */ |
| | | private void receiveChangeStatus(ChangeStatusMsg csMsg) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("Replication domain " + baseDn + |
| | | " received change status message:\n" + csMsg); |
| | | |
| | | ServerStatus reqStatus = csMsg.getRequestedStatus(); |
| | | |
| | | // Translate requested status to a state machine event |
| | | StatusMachineEvent event = StatusMachineEvent.statusToEvent(reqStatus); |
| | | if (event == StatusMachineEvent.INVALID_EVENT) |
| | | { |
| | | Message msg = ERR_DS_INVALID_REQUESTED_STATUS.get(reqStatus.toString(), |
| | | baseDn.toString(), Short.toString(serverId)); |
| | | logError(msg); |
| | | return; |
| | | } |
| | | |
| | | // Compute new status and do matching tasks |
| | | // Use synchronized as admin task (thread) could order to go in admin status |
| | | // for instance (concurrent with receive thread). |
| | | synchronized (status) |
| | | { |
| | | ServerStatus newStatus = |
| | | StatusMachine.computeNewStatus(status, event); |
| | | |
| | | if (newStatus == ServerStatus.INVALID_STATUS) |
| | | { |
| | | Message msg = ERR_DS_CANNOT_CHANGE_STATUS.get(baseDn.toString(), |
| | | Short.toString(serverId), status.toString(), event.toString()); |
| | | logError(msg); |
| | | return; |
| | | } |
| | | |
| | | // Store new status |
| | | status = newStatus; |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("Replication domain " + baseDn + |
| | | " new status is: " + status); |
| | | |
| | | // Perform whatever actions are needed to apply properties for being |
| | | // compliant with new status |
| | | updateDomainForNewStatus(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Called when first connection or disconnection detected. |
| | | */ |
| | | public void toNotConnectedStatus() |
| | | { |
| | | // Go into not connected status |
| | | // Use synchronized as somebody could ask another status change at the same |
| | | // time |
| | | synchronized (status) |
| | | { |
| | | StatusMachineEvent event = |
| | | StatusMachineEvent.TO_NOT_CONNECTED_STATUS_EVENT; |
| | | ServerStatus newStatus = |
| | | StatusMachine.computeNewStatus(status, event); |
| | | |
| | | if (newStatus == ServerStatus.INVALID_STATUS) |
| | | { |
| | | Message msg = ERR_DS_CANNOT_CHANGE_STATUS.get(baseDn.toString(), |
| | | Short.toString(serverId), status.toString(), event.toString()); |
| | | logError(msg); |
| | | return; |
| | | } |
| | | |
| | | // Store new status |
| | | status = newStatus; |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("Replication domain " + baseDn + |
| | | " new status is: " + status); |
| | | |
| | | // Perform whatever actions are needed to apply properties for being |
| | | // compliant with new status |
| | | updateDomainForNewStatus(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Perform whatever actions are needed to apply properties for being |
| | | * compliant with new status. Must be called in synchronized section for |
| | | * status. The new status is already set in status variable. |
| | | */ |
| | | private void updateDomainForNewStatus() |
| | | { |
| | | switch (status) |
| | | { |
| | | case NOT_CONNECTED_STATUS: |
| | | break; |
| | | case NORMAL_STATUS: |
| | | break; |
| | | case DEGRADED_STATUS: |
| | | break; |
| | | case FULL_UPDATE_STATUS: |
| | | // Signal RS we just entered the full update status |
| | | broker.signalStatusChange(status); |
| | | break; |
| | | case BAD_GEN_ID_STATUS: |
| | | break; |
| | | default: |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("updateDomainForNewStatus: unexpected status: " + |
| | | status); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Do the necessary processing when an UpdateMsg was received. |
| | | * |
| | | * @param update The received UpdateMsg. |
| | | */ |
| | | public void receiveUpdate(UpdateMsg update) |
| | | { |
| | | remotePendingChanges.putRemoteUpdate(update); |
| | | numRcvdUpdates.incrementAndGet(); |
| | | } |
| | | |
| | | /** |
| | | * Do the necessary processing when an AckMsg is received. |
| | | * |
| | | * @param ack The AckMsg that was received. |
| | | */ |
| | | public void receiveAck(AckMsg ack) |
| | | { |
| | | UpdateMsg update; |
| | | ChangeNumber changeNumber = ack.getChangeNumber(); |
| | | |
| | | synchronized (waitingAckMsgs) |
| | | { |
| | | update = waitingAckMsgs.remove(changeNumber); |
| | | } |
| | | if (update != null) |
| | | { |
| | | synchronized (update) |
| | | { |
| | | update.notify(); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Check if an operation must be synchronized. |
| | | * Also update the list of pending changes and the server RUV |
| | | * @param op the operation |
| | |
| | | { |
| | | numReplayedPostOpCalled++; |
| | | } |
| | | UpdateMsg msg = null; |
| | | LDAPUpdateMsg msg = null; |
| | | |
| | | // Note that a failed non-replication operation might not have a change |
| | | // number. |
| | | ChangeNumber curChangeNumber = OperationContext.getChangeNumber(op); |
| | | |
| | | boolean isAssured = isAssured(op); |
| | | |
| | | if ((result == ResultCode.SUCCESS) && (!op.isSynchronizationOperation())) |
| | | { |
| | | // Generate a replication message for a successful non-replication |
| | | // operation. |
| | | msg = UpdateMsg.generateMsg(op); |
| | | msg = LDAPUpdateMsg.generateMsg(op); |
| | | |
| | | if (msg == null) |
| | | { |
| | |
| | | return; |
| | | } |
| | | |
| | | if (msg != null && isAssured) |
| | | { |
| | | synchronized (waitingAckMsgs) |
| | | { |
| | | // Add the assured message to the list of update that are |
| | | // waiting acknowledgements |
| | | waitingAckMsgs.put(curChangeNumber, msg); |
| | | } |
| | | } |
| | | |
| | | if (generationIdSavedStatus != true) |
| | | { |
| | | this.saveGenerationId(generationId); |
| | |
| | | |
| | | if (!op.isSynchronizationOperation()) |
| | | { |
| | | int pushedChanges = pendingChanges.pushCommittedChanges(); |
| | | numSentUpdates.addAndGet(pushedChanges); |
| | | pendingChanges.pushCommittedChanges(); |
| | | } |
| | | |
| | | // Wait for acknowledgement of an assured message. |
| | | if (msg != null && isAssured) |
| | | { |
| | | synchronized (msg) |
| | | { |
| | | while (waitingAckMsgs.containsKey(msg.getChangeNumber())) |
| | | { |
| | | // TODO : should have a configurable timeout to get |
| | | // out of this loop |
| | | try |
| | | { |
| | | msg.wait(1000); |
| | | } catch (InterruptedException e) |
| | | { } |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * get the number of updates received by the replication plugin. |
| | | * |
| | | * @return the number of updates received |
| | | */ |
| | | public int getNumRcvdUpdates() |
| | | { |
| | | if (numRcvdUpdates != null) |
| | | return numRcvdUpdates.get(); |
| | | else |
| | | return 0; |
| | | } |
| | | |
| | | /** |
| | | * Get the number of updates sent by the replication plugin. |
| | | * |
| | | * @return the number of updates sent |
| | | */ |
| | | public int getNumSentUpdates() |
| | | { |
| | | if (numSentUpdates != null) |
| | | return numSentUpdates.get(); |
| | | else |
| | | return 0; |
| | | } |
| | | |
| | | /** |
| | |
| | | } |
| | | |
| | | /** |
| | | * Increment the number of processed updates. |
| | | */ |
| | | public void incProcessedUpdates() |
| | | { |
| | | numProcessedUpdates.incrementAndGet(); |
| | | } |
| | | |
| | | /** |
| | | * get the number of updates replayed by the replication. |
| | | * |
| | | * @return The number of updates replayed by the replication |
| | | */ |
| | | public int getNumProcessedUpdates() |
| | | { |
| | | if (numProcessedUpdates != null) |
| | | return numProcessedUpdates.get(); |
| | | else |
| | | return 0; |
| | | } |
| | | |
| | | /** |
| | | * get the number of updates replayed successfully by the replication. |
| | | * |
| | | * @return The number of updates replayed successfully |
| | |
| | | } |
| | | |
| | | /** |
| | | * get the ServerState. |
| | | * |
| | | * @return the ServerState |
| | | */ |
| | | public ServerState getServerState() |
| | | { |
| | | return state; |
| | | } |
| | | |
| | | /** |
| | | * Get the debugCount. |
| | | * |
| | | * @return Returns the debugCount. |
| | |
| | | } |
| | | |
| | | /** |
| | | * Send an Ack message. |
| | | * |
| | | * @param changeNumber The ChangeNumber for which the ack must be sent. |
| | | */ |
| | | public void ack(ChangeNumber changeNumber) |
| | | { |
| | | broker.publish(new AckMsg(changeNumber)); |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public void run() |
| | | { |
| | | done = false; |
| | | |
| | | // Create the listener thread |
| | | listenerThread = new ListenerThread(this, updateToReplayQueue); |
| | | listenerThread.start(); |
| | | |
| | | while (shutdown == false) |
| | | { |
| | | try |
| | | { |
| | | synchronized (this) |
| | | { |
| | | this.wait(1000); |
| | | if (!disabled && !stateSavingDisabled ) |
| | | { |
| | | // save the RUV |
| | | state.save(); |
| | | } |
| | | } |
| | | } catch (InterruptedException e) |
| | | { } |
| | | } |
| | | state.save(); |
| | | |
| | | done = true; |
| | | } |
| | | |
| | | /** |
| | | * Shutdown this ReplicationDomain. |
| | | */ |
| | | public void shutdown() |
| | |
| | | // stop the flush thread |
| | | shutdown = true; |
| | | |
| | | // Stop the listener thread |
| | | if (listenerThread != null) |
| | | // stop the thread in charge of flushing the ServerState. |
| | | if (flushThread != null) |
| | | { |
| | | listenerThread.shutdown(); |
| | | synchronized (flushThread) |
| | | { |
| | | flushThread.notify(); |
| | | } |
| | | } |
| | | |
| | | synchronized (this) |
| | | { |
| | | this.notify(); |
| | | } |
| | | |
| | | DirectoryServer.deregisterMonitorProvider(monitor.getMonitorInstanceName()); |
| | | |
| | | DirectoryServer.deregisterAlertGenerator(this); |
| | | |
| | | // stop the ReplicationBroker |
| | | broker.stop(); |
| | | |
| | | // Wait for the listener thread to stop |
| | | if (listenerThread != null) |
| | | listenerThread.waitForShutdown(); |
| | | // stop the ReplicationDomain |
| | | stopDomain(); |
| | | |
| | | // wait for completion of the persistentServerState thread. |
| | | try |
| | |
| | | } |
| | | |
| | | /** |
| | | * Get the name of the replicationServer to which this domain is currently |
| | | * connected. |
| | | * |
| | | * @return the name of the replicationServer to which this domain |
| | | * is currently connected. |
| | | */ |
| | | public String getReplicationServer() |
| | | { |
| | | if (broker != null) |
| | | return broker.getReplicationServer(); |
| | | else |
| | | return "Not connected"; |
| | | } |
| | | |
| | | /** |
| | | * Create and replay a synchronized Operation from an UpdateMsg. |
| | | * |
| | | * @param msg The UpdateMsg to be replayed. |
| | | */ |
| | | public void replay(UpdateMsg msg) |
| | | public void replay(LDAPUpdateMsg msg) |
| | | { |
| | | Operation op = null; |
| | | boolean done = false; |
| | |
| | | // whose dependency has been replayed until no more left. |
| | | do |
| | | { |
| | | String replayErrorMsg = null; |
| | | try |
| | | { |
| | | op = msg.createOperation(conn); |
| | |
| | | |
| | | while ((!dependency) && (!done) && (retryCount-- > 0)) |
| | | { |
| | | // Try replay the operation |
| | | op.setInternalOperation(true); |
| | | op.setSynchronizationOperation(true); |
| | | changeNumber = OperationContext.getChangeNumber(op); |
| | | ((AbstractOperation) op).run(); |
| | | |
| | | // Try replay the operation |
| | | ResultCode result = op.getResultCode(); |
| | | |
| | | if (result != ResultCode.SUCCESS) |
| | |
| | | op.getErrorMessage().toString()); |
| | | logError(message); |
| | | numUnresolvedNamingConflicts.incrementAndGet(); |
| | | |
| | | replayErrorMsg = message.toString(); |
| | | updateError(changeNumber); |
| | | } |
| | | } catch (ASN1Exception e) |
| | |
| | | Message message = ERR_EXCEPTION_DECODING_OPERATION.get( |
| | | String.valueOf(msg) + stackTraceToSingleLineString(e)); |
| | | logError(message); |
| | | replayErrorMsg = message.toString(); |
| | | } catch (LDAPException e) |
| | | { |
| | | Message message = ERR_EXCEPTION_DECODING_OPERATION.get( |
| | | String.valueOf(msg) + stackTraceToSingleLineString(e)); |
| | | logError(message); |
| | | replayErrorMsg = message.toString(); |
| | | } catch (DataFormatException e) |
| | | { |
| | | Message message = ERR_EXCEPTION_DECODING_OPERATION.get( |
| | | String.valueOf(msg) + stackTraceToSingleLineString(e)); |
| | | logError(message); |
| | | replayErrorMsg = message.toString(); |
| | | } catch (Exception e) |
| | | { |
| | | if (changeNumber != null) |
| | |
| | | Message message = ERR_EXCEPTION_REPLAYING_OPERATION.get( |
| | | stackTraceToSingleLineString(e), op.toString()); |
| | | logError(message); |
| | | replayErrorMsg = message.toString(); |
| | | updateError(changeNumber); |
| | | } else |
| | | { |
| | | Message message = ERR_EXCEPTION_DECODING_OPERATION.get( |
| | | String.valueOf(msg) + stackTraceToSingleLineString(e)); |
| | | logError(message); |
| | | replayErrorMsg = message.toString(); |
| | | } |
| | | } finally |
| | | { |
| | | if (!dependency) |
| | | { |
| | | broker.updateWindowAfterReplay(); |
| | | if (msg.isAssured()) |
| | | ack(msg.getChangeNumber()); |
| | | incProcessedUpdates(); |
| | | processUpdateDone(msg, replayErrorMsg); |
| | | } |
| | | } |
| | | |
| | |
| | | * @return true if the process is completed, false if it must continue.. |
| | | */ |
| | | private boolean solveNamingConflict(DeleteOperation op, |
| | | UpdateMsg msg) |
| | | LDAPUpdateMsg msg) |
| | | { |
| | | ResultCode result = op.getResultCode(); |
| | | DeleteContext ctx = (DeleteContext) op.getAttachment(SYNCHROCONTEXT); |
| | |
| | | * @throws Exception When the operation is not valid. |
| | | */ |
| | | private boolean solveNamingConflict(ModifyDNOperation op, |
| | | UpdateMsg msg) throws Exception |
| | | LDAPUpdateMsg msg) throws Exception |
| | | { |
| | | ResultCode result = op.getResultCode(); |
| | | ModifyDnContext ctx = (ModifyDnContext) op.getAttachment(SYNCHROCONTEXT); |
| | |
| | | } |
| | | |
| | | /** |
| | | * Check if an operation must be processed as an assured operation. |
| | | * |
| | | * @param op the operation to be checked. |
| | | * @return true if the operations must be processed as an assured operation. |
| | | */ |
| | | private boolean isAssured(PostOperationOperation op) |
| | | { |
| | | // TODO : should have a filtering mechanism for checking |
| | | // operation that are assured and operations that are not. |
| | | return false; |
| | | } |
| | | |
| | | /** |
| | | * Get the maximum receive window size. |
| | | * |
| | | * @return The maximum receive window size. |
| | | */ |
| | | public int getMaxRcvWindow() |
| | | { |
| | | if (broker != null) |
| | | return broker.getMaxRcvWindow(); |
| | | else |
| | | return 0; |
| | | } |
| | | |
| | | /** |
| | | * Get the current receive window size. |
| | | * |
| | | * @return The current receive window size. |
| | | */ |
| | | public int getCurrentRcvWindow() |
| | | { |
| | | if (broker != null) |
| | | return broker.getCurrentRcvWindow(); |
| | | else |
| | | return 0; |
| | | } |
| | | |
| | | /** |
| | | * Get the maximum send window size. |
| | | * |
| | | * @return The maximum send window size. |
| | | */ |
| | | public int getMaxSendWindow() |
| | | { |
| | | if (broker != null) |
| | | return broker.getMaxSendWindow(); |
| | | else |
| | | return 0; |
| | | } |
| | | |
| | | /** |
| | | * Get the current send window size. |
| | | * |
| | | * @return The current send window size. |
| | | */ |
| | | public int getCurrentSendWindow() |
| | | { |
| | | if (broker != null) |
| | | return broker.getCurrentSendWindow(); |
| | | else |
| | | return 0; |
| | | } |
| | | |
| | | /** |
| | | * Get the number of times the replication connection was lost. |
| | | * @return The number of times the replication connection was lost. |
| | | */ |
| | | public int getNumLostConnections() |
| | | { |
| | | if (broker != null) |
| | | return broker.getNumLostConnections(); |
| | | else |
| | | return 0; |
| | | } |
| | | |
| | | /** |
| | | * Get the number of modify conflicts successfully resolved. |
| | | * @return The number of modify conflicts successfully resolved. |
| | | */ |
| | |
| | | } |
| | | |
| | | /** |
| | | * Get the number of namign conflicts successfully resolved. |
| | | * Get the number of naming conflicts successfully resolved. |
| | | * @return The number of naming conflicts successfully resolved. |
| | | */ |
| | | public int getNumResolvedNamingConflicts() |
| | |
| | | } |
| | | |
| | | /** |
| | | * Get the server ID. |
| | | * @return The server ID. |
| | | */ |
| | | public short getServerId() |
| | | { |
| | | return serverId; |
| | | } |
| | | |
| | | /** |
| | | * Check if the domain solve conflicts. |
| | | * |
| | | * @return a boolean indicating if the domain should sove conflicts. |
| | | * @return a boolean indicating if the domain should solve conflicts. |
| | | */ |
| | | public boolean solveConflict() |
| | | { |
| | |
| | | state.save(); |
| | | state.clearInMemory(); |
| | | disabled = true; |
| | | |
| | | // Stop the listener thread |
| | | if (listenerThread != null) |
| | | listenerThread.shutdown(); |
| | | |
| | | broker.stop(); // This will cut the session and wake up the listener |
| | | |
| | | // Wait for the listener thread to stop |
| | | if (listenerThread != null) |
| | | listenerThread.waitForShutdown(); |
| | | disableService(); // This will cut the session and wake up the listener |
| | | } |
| | | |
| | | /** |
| | |
| | | return; |
| | | } |
| | | |
| | | // After an on-line import, the value of the generationId is new |
| | | // and it is necessary for the broker to send this new value as part |
| | | // of the serverStart message. |
| | | broker.setGenerationId(generationId); |
| | | |
| | | broker.start(replicationServers); |
| | | |
| | | // Create the listener thread |
| | | listenerThread = new ListenerThread(this, updateToReplayQueue); |
| | | listenerThread.start(); |
| | | enableService(); |
| | | |
| | | disabled = false; |
| | | } |
| | |
| | | */ |
| | | public long computeGenerationId() throws DirectoryException |
| | | { |
| | | long genId = exportBackend(true); |
| | | long genId = exportBackend(null, true); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("Computed generationId: generationId=" + genId); |
| | |
| | | } |
| | | |
| | | /** |
| | | * Returns the generationId set for this domain. |
| | | * |
| | | * @return The generationId. |
| | | * {@inheritDoc} |
| | | */ |
| | | public long getGenerationId() |
| | | public long getGenerationID() |
| | | { |
| | | return generationId; |
| | | } |
| | |
| | | /** |
| | | * Stores the value of the generationId. |
| | | * @param generationId The value of the generationId. |
| | | * @return a ResultCode indicating if the method was successfull. |
| | | * @return a ResultCode indicating if the method was successful. |
| | | */ |
| | | public ResultCode saveGenerationId(long generationId) |
| | | { |
| | |
| | | } |
| | | |
| | | /** |
| | | * Reset the generationId of this domain in the whole topology. |
| | | * A message is sent to the Replication Servers for them to reset |
| | | * their change dbs. |
| | | * |
| | | * @param generationIdNewValue The new value of the generation Id. |
| | | * @throws DirectoryException when an error occurs |
| | | */ |
| | | public void resetGenerationId(Long generationIdNewValue) |
| | | throws DirectoryException |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | this.getName() + "resetGenerationId" + generationIdNewValue); |
| | | |
| | | if (!isConnected()) |
| | | { |
| | | ResultCode resultCode = ResultCode.OTHER; |
| | | Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get( |
| | | baseDn.toNormalizedString()); |
| | | throw new DirectoryException( |
| | | resultCode, message); |
| | | } |
| | | |
| | | ResetGenerationIdMsg genIdMessage = null; |
| | | |
| | | if (generationIdNewValue == null) |
| | | { |
| | | genIdMessage = new ResetGenerationIdMsg(this.generationId); |
| | | } |
| | | else |
| | | { |
| | | genIdMessage = new ResetGenerationIdMsg(generationIdNewValue); |
| | | } |
| | | broker.publish(genIdMessage); |
| | | } |
| | | |
| | | /** |
| | | * Do whatever is needed when a backup is started. |
| | | * We need to make sure that the serverState is correclty save. |
| | | * We need to make sure that the serverState is correctly save. |
| | | */ |
| | | public void backupStart() |
| | | { |
| | |
| | | * Total Update >> |
| | | */ |
| | | |
| | | /** |
| | | * Receives bytes related to an entry in the context of an import to |
| | | * initialize the domain (called by ReplLDIFInputStream). |
| | | * |
| | | * @return The bytes. Null when the Done or Err message has been received |
| | | */ |
| | | public byte[] receiveEntryBytes() |
| | | { |
| | | ReplicationMsg msg; |
| | | while (true) |
| | | { |
| | | try |
| | | { |
| | | msg = broker.receive(); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugVerbose( |
| | | " sid:" + this.serverId + |
| | | " base DN:" + this.baseDn + |
| | | " Import EntryBytes received " + msg); |
| | | if (msg == null) |
| | | { |
| | | // The server is in the shutdown process |
| | | return null; |
| | | } |
| | | |
| | | if (msg instanceof EntryMsg) |
| | | { |
| | | EntryMsg entryMsg = (EntryMsg)msg; |
| | | byte[] entryBytes = entryMsg.getEntryBytes(); |
| | | ieContext.updateCounters(); |
| | | return entryBytes; |
| | | } |
| | | else if (msg instanceof DoneMsg) |
| | | { |
| | | // This is the normal termination of the import |
| | | // No error is stored and the import is ended |
| | | // by returning null |
| | | return null; |
| | | } |
| | | else if (msg instanceof ErrorMsg) |
| | | { |
| | | // This is an error termination during the import |
| | | // The error is stored and the import is ended |
| | | // by returning null |
| | | ErrorMsg errorMsg = (ErrorMsg)msg; |
| | | ieContext.exception = new DirectoryException( |
| | | ResultCode.OTHER, |
| | | errorMsg.getDetails()); |
| | | return null; |
| | | } |
| | | else |
| | | { |
| | | // Other messages received during an import are trashed |
| | | } |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | // TODO: i18n |
| | | ieContext.exception = new DirectoryException(ResultCode.OTHER, |
| | | Message.raw("received an unexpected message type" + |
| | | e.getLocalizedMessage())); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Processes an error message received while an import/export is |
| | | * on going. |
| | | * @param errorMsg The error message received. |
| | | */ |
| | | protected void abandonImportExport(ErrorMsg errorMsg) |
| | | { |
| | | // FIXME TBD Treat the case where the error happens while entries |
| | | // are being exported |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugVerbose( |
| | | " abandonImportExport:" + this.serverId + |
| | | " base DN:" + this.baseDn + |
| | | " Error Msg received " + errorMsg); |
| | | |
| | | if (ieContext != null) |
| | | { |
| | | ieContext.exception = new DirectoryException(ResultCode.OTHER, |
| | | errorMsg.getDetails()); |
| | | |
| | | if (ieContext.initializeTask instanceof InitializeTask) |
| | | { |
| | | // Update the task that initiated the import |
| | | ((InitializeTask)ieContext.initializeTask). |
| | | updateTaskCompletionState(ieContext.exception); |
| | | |
| | | releaseIEContext(); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Clears all the entries from the JE backend determined by the |
| | |
| | | } |
| | | |
| | | /** |
| | | * This method trigger an export of the replicated data. |
| | | * |
| | | * @param output The OutputStream where the export should |
| | | * be produced. |
| | | * @throws DirectoryException When needed. |
| | | */ |
| | | protected void exportBackend(OutputStream output) throws DirectoryException |
| | | { |
| | | exportBackend(output, false); |
| | | } |
| | | |
| | | /** |
| | | * Export the entries from the backend and/or compute the generation ID. |
| | | * The ieContext must have been set before calling. |
| | | * @param checksumOutput true is the exportBackend is called to compute |
| | | * the generationID |
| | | * |
| | | * @return The computed generationID. |
| | | * @param output The OutputStream where the export should |
| | | * be produced. |
| | | * @param checksumOutput A boolean indicating if this export is |
| | | * invoked to perform a checksum only |
| | | * |
| | | * @return The computed GenerationID. |
| | | * |
| | | * @throws DirectoryException when an error occurred |
| | | */ |
| | | protected long exportBackend(boolean checksumOutput) |
| | | protected long exportBackend(OutputStream output, boolean checksumOutput) |
| | | throws DirectoryException |
| | | { |
| | | long genID = 0; |
| | |
| | | } |
| | | |
| | | OutputStream os; |
| | | ReplLDIFOutputStream ros; |
| | | ReplLDIFOutputStream ros = null; |
| | | |
| | | if (checksumOutput) |
| | | { |
| | |
| | | } |
| | | else |
| | | { |
| | | ros = new ReplLDIFOutputStream(this, (short)-1); |
| | | os = ros; |
| | | os = output; |
| | | } |
| | | LDIFExportConfig exportConfig = new LDIFExportConfig(os); |
| | | |
| | |
| | | } |
| | | catch (DirectoryException de) |
| | | { |
| | | if ((checksumOutput) && |
| | | if ((ros != null) && |
| | | (ros.getNumExportedEntries() >= entryCount)) |
| | | { |
| | | // This is the normal end when computing the generationId |
| | |
| | | } |
| | | |
| | | /** |
| | | * Get the internal broker to perform some operations on it. |
| | | * |
| | | * @return The broker for this domain. |
| | | */ |
| | | ReplicationBroker getBroker() |
| | | { |
| | | return broker; |
| | | } |
| | | |
| | | /** |
| | | * Exports an entry in LDIF format. |
| | | * |
| | | * @param lDIFEntry The entry to be exported.. |
| | | * |
| | | * @throws IOException when an error occurred. |
| | | */ |
| | | public void exportLDIFEntry(String lDIFEntry) throws IOException |
| | | { |
| | | // If an error was raised - like receiving an ErrorMsg |
| | | // we just let down the export. |
| | | if (ieContext.exception != null) |
| | | { |
| | | IOException ioe = new IOException(ieContext.exception.getMessage()); |
| | | ieContext = null; |
| | | throw ioe; |
| | | } |
| | | |
| | | EntryMsg entryMessage = new EntryMsg( |
| | | serverId, ieContext.exportTarget, lDIFEntry.getBytes()); |
| | | broker.publish(entryMessage); |
| | | |
| | | try |
| | | { |
| | | ieContext.updateCounters(); |
| | | } |
| | | catch (DirectoryException de) |
| | | { |
| | | throw new IOException(de.getMessage()); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Initializes this domain from another source server. |
| | | * |
| | | * @param source The source from which to initialize |
| | | * @param initTask The task that launched the initialization |
| | | * and should be updated of its progress. |
| | | * @throws DirectoryException when an error occurs |
| | | */ |
| | | public void initializeFromRemote(short source, Task initTask) |
| | | throws DirectoryException |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("Entering initializeFromRemote"); |
| | | |
| | | acquireIEContext(); |
| | | ieContext.initializeTask = initTask; |
| | | |
| | | InitializeRequestMsg initializeMsg = new InitializeRequestMsg( |
| | | baseDn, serverId, source); |
| | | |
| | | // Publish Init request msg |
| | | broker.publish(initializeMsg); |
| | | |
| | | // .. we expect to receive entries or err after that |
| | | } |
| | | |
| | | /** |
| | | * Verifies that the given string represents a valid source |
| | | * from which this server can be initialized. |
| | | * @param sourceString The string representing the source |
| | | * @return The source as a short value |
| | | * @throws DirectoryException if the string is not valid |
| | | */ |
| | | public short decodeSource(String sourceString) |
| | | throws DirectoryException |
| | | { |
| | | short source = 0; |
| | | Throwable cause = null; |
| | | try |
| | | { |
| | | source = Integer.decode(sourceString).shortValue(); |
| | | if ((source >= -1) && (source != serverId)) |
| | | { |
| | | // TODO Verifies serverID is in the domain |
| | | // We shold check here that this is a server implied |
| | | // in the current domain. |
| | | return source; |
| | | } |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | cause = e; |
| | | } |
| | | |
| | | ResultCode resultCode = ResultCode.OTHER; |
| | | Message message = ERR_INVALID_IMPORT_SOURCE.get(); |
| | | if (cause != null) |
| | | { |
| | | throw new DirectoryException( |
| | | resultCode, message, cause); |
| | | } |
| | | else |
| | | { |
| | | throw new DirectoryException( |
| | | resultCode, message); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Verifies that the given string represents a valid source |
| | | * from which this server can be initialized. |
| | | * @param targetString The string representing the source |
| | | * @return The source as a short value |
| | | * @throws DirectoryException if the string is not valid |
| | | */ |
| | | public short decodeTarget(String targetString) |
| | | throws DirectoryException |
| | | { |
| | | short target = 0; |
| | | Throwable cause; |
| | | if (targetString.equalsIgnoreCase("all")) |
| | | { |
| | | return RoutableMsg.ALL_SERVERS; |
| | | } |
| | | |
| | | // So should be a serverID |
| | | try |
| | | { |
| | | target = Integer.decode(targetString).shortValue(); |
| | | if (target >= 0) |
| | | { |
| | | // FIXME Could we check now that it is a know server in the domain ? |
| | | } |
| | | return target; |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | cause = e; |
| | | } |
| | | ResultCode resultCode = ResultCode.OTHER; |
| | | Message message = ERR_INVALID_EXPORT_TARGET.get(); |
| | | |
| | | if (cause != null) |
| | | throw new DirectoryException( |
| | | resultCode, message, cause); |
| | | else |
| | | throw new DirectoryException( |
| | | resultCode, message); |
| | | |
| | | } |
| | | |
| | | private synchronized void acquireIEContext() |
| | | throws DirectoryException |
| | | { |
| | | if (ieContext != null) |
| | | { |
| | | // Rejects 2 simultaneous exports |
| | | Message message = ERR_SIMULTANEOUS_IMPORT_EXPORT_REJECTED.get(); |
| | | throw new DirectoryException(ResultCode.OTHER, |
| | | message); |
| | | } |
| | | |
| | | ieContext = new IEContext(); |
| | | } |
| | | |
| | | private synchronized void releaseIEContext() |
| | | { |
| | | ieContext = null; |
| | | } |
| | | |
| | | /** |
| | | * Process the initialization of some other server or servers in the topology |
| | | * specified by the target argument. |
| | | * @param target The target that should be initialized |
| | | * @param initTask The task that triggers this initialization and that should |
| | | * be updated with its progress. |
| | | * |
| | | * @exception DirectoryException When an error occurs. |
| | | */ |
| | | public void initializeRemote(short target, Task initTask) |
| | | throws DirectoryException |
| | | { |
| | | initializeRemote(target, serverId, initTask); |
| | | } |
| | | |
| | | /** |
| | | * Process the initialization of some other server or servers in the topology |
| | | * specified by the target argument when this initialization specifying the |
| | | * server that requests the initialization. |
| | | * |
| | | * @param target The target that should be initialized. |
| | | * @param requestorID The server that initiated the export. |
| | | * @param initTask The task that triggers this initialization and that should |
| | | * be updated with its progress. |
| | | * |
| | | * @exception DirectoryException When an error occurs. |
| | | */ |
| | | public void initializeRemote(short target, short requestorID, Task initTask) |
| | | throws DirectoryException |
| | | { |
| | | Message msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START.get( |
| | | Short.toString(serverId), |
| | | baseDn.toString(), |
| | | Short.toString(requestorID)); |
| | | logError(msg); |
| | | |
| | | boolean contextAcquired=false; |
| | | |
| | | try |
| | | { |
| | | Backend backend = retrievesBackend(this.baseDn); |
| | | |
| | | if (!backend.supportsLDIFExport()) |
| | | { |
| | | Message message = ERR_INIT_EXPORT_NOT_SUPPORTED.get( |
| | | backend.getBackendID().toString()); |
| | | logError(message); |
| | | throw new DirectoryException(ResultCode.OTHER, message); |
| | | } |
| | | |
| | | acquireIEContext(); |
| | | contextAcquired = true; |
| | | |
| | | // The number of entries to be exported is the number of entries under |
| | | // the base DN entry and the base entry itself. |
| | | long entryCount = backend.numSubordinates(baseDn, true) + 1; |
| | | ieContext.exportTarget = target; |
| | | if (initTask != null) |
| | | { |
| | | ieContext.initializeTask = initTask; |
| | | } |
| | | ieContext.setCounters(entryCount, entryCount); |
| | | |
| | | // Send start message to the peer |
| | | InitializeTargetMsg initializeMessage = new InitializeTargetMsg( |
| | | baseDn, serverId, ieContext.exportTarget, requestorID, entryCount); |
| | | |
| | | broker.publish(initializeMessage); |
| | | |
| | | exportBackend(false); |
| | | |
| | | // Notify the peer of the success |
| | | DoneMsg doneMsg = new DoneMsg(serverId, |
| | | initializeMessage.getDestination()); |
| | | broker.publish(doneMsg); |
| | | |
| | | releaseIEContext(); |
| | | } |
| | | catch(DirectoryException de) |
| | | { |
| | | // Notify the peer of the failure |
| | | ErrorMsg errorMsg = |
| | | new ErrorMsg(target, |
| | | de.getMessageObject()); |
| | | broker.publish(errorMsg); |
| | | |
| | | if (contextAcquired) |
| | | releaseIEContext(); |
| | | |
| | | throw(de); |
| | | } |
| | | |
| | | msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_END.get( |
| | | Short.toString(serverId), |
| | | baseDn.toString(), |
| | | Short.toString(requestorID)); |
| | | logError(msg); |
| | | } |
| | | |
| | | /** |
| | | * Process backend before import. |
| | | * @param backend The backend. |
| | | * @throws Exception |
| | |
| | | } |
| | | |
| | | /** |
| | | * Initializes the domain's backend with received entries. |
| | | * @param initializeMessage The message that initiated the import. |
| | | * @exception DirectoryException Thrown when an error occurs. |
| | | * This method should trigger an import of the replicated data. |
| | | * |
| | | * @param input The InputStream from which |
| | | * @throws DirectoryException When needed. |
| | | */ |
| | | protected void initialize(InitializeTargetMsg initializeMessage) |
| | | throws DirectoryException |
| | | public void importBackend(InputStream input) throws DirectoryException |
| | | { |
| | | LDIFImportConfig importConfig = null; |
| | | DirectoryException de = null; |
| | | |
| | | Message msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_START.get( |
| | | Short.toString(serverId), |
| | | baseDn.toString(), |
| | | Long.toString(initializeMessage.getRequestorID())); |
| | | logError(msg); |
| | | |
| | | // Go into full update status |
| | | // Use synchronized as somebody could ask another status change at the same |
| | | // time |
| | | synchronized (status) |
| | | { |
| | | StatusMachineEvent event = StatusMachineEvent.TO_FULL_UPDATE_STATUS_EVENT; |
| | | ServerStatus newStatus = |
| | | StatusMachine.computeNewStatus(status, event); |
| | | |
| | | if (newStatus == ServerStatus.INVALID_STATUS) |
| | | { |
| | | msg = ERR_DS_CANNOT_CHANGE_STATUS.get(baseDn.toString(), |
| | | Short.toString(serverId), status.toString(), event.toString()); |
| | | logError(msg); |
| | | return; |
| | | } |
| | | |
| | | // Store new status |
| | | status = newStatus; |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("Replication domain " + baseDn + |
| | | " new status is: " + status); |
| | | |
| | | // Perform whatever actions are needed to apply properties for being |
| | | // compliant with new status |
| | | updateDomainForNewStatus(); |
| | | } |
| | | |
| | | Backend backend = retrievesBackend(baseDn); |
| | | |
| | | try |
| | |
| | | } |
| | | else |
| | | { |
| | | if (initializeMessage.getRequestorID() == serverId) |
| | | { |
| | | // The import responds to a request we did so the IEContext |
| | | // is already acquired |
| | | } |
| | | else |
| | | { |
| | | acquireIEContext(); |
| | | } |
| | | |
| | | ieContext.importSource = initializeMessage.getsenderID(); |
| | | ieContext.entryLeftCount = initializeMessage.getEntryCount(); |
| | | ieContext.setCounters(initializeMessage.getEntryCount(), |
| | | initializeMessage.getEntryCount()); |
| | | |
| | | preBackendImport(backend); |
| | | |
| | | ieContext.ldifImportInputStream = new ReplLDIFInputStream(this); |
| | | importConfig = |
| | | new LDIFImportConfig(ieContext.ldifImportInputStream); |
| | | new LDIFImportConfig(input); |
| | | List<DN> includeBranches = new ArrayList<DN>(); |
| | | includeBranches.add(this.baseDn); |
| | | importConfig.setIncludeBranches(includeBranches); |
| | |
| | | |
| | | // TODO How to deal with rejected entries during the import |
| | | importConfig.writeRejectedEntries( |
| | | getFileForPath("logs" + File.separator + |
| | | "replInitRejectedEntries").getAbsolutePath(), |
| | | ExistingFileBehavior.OVERWRITE); |
| | | getFileForPath("logs" + File.separator + |
| | | "replInitRejectedEntries").getAbsolutePath(), |
| | | ExistingFileBehavior.OVERWRITE); |
| | | |
| | | // Process import |
| | | preBackendImport(backend); |
| | | backend.importLDIF(importConfig); |
| | | |
| | | stateSavingDisabled = false; |
| | |
| | | } |
| | | finally |
| | | { |
| | | if ((ieContext != null) && (ieContext.exception != null)) |
| | | de = ieContext.exception; |
| | | |
| | | // Cleanup |
| | | if (importConfig != null) |
| | | { |
| | |
| | | TRACER.debugInfo( |
| | | "After import, the replication plugin restarts connections" + |
| | | " to all RSs to provide new generation ID=" + generationId); |
| | | broker.setGenerationId(generationId); |
| | | } |
| | | catch (DirectoryException fe) |
| | | { |
| | |
| | | if (de == null) |
| | | de = fe; |
| | | } |
| | | |
| | | // Re-exchange generationID and state with RS |
| | | broker.reStart(); |
| | | |
| | | // Update the task that initiated the import |
| | | if ((ieContext != null ) && (ieContext.initializeTask != null)) |
| | | { |
| | | ((InitializeTask)ieContext.initializeTask). |
| | | updateTaskCompletionState(de); |
| | | } |
| | | releaseIEContext(); |
| | | } |
| | | // Sends up the root error. |
| | | if (de != null) |
| | | { |
| | | throw de; |
| | | } |
| | | |
| | | msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_END.get( |
| | | Short.toString(serverId), |
| | | baseDn.toString(), |
| | | Long.toString(initializeMessage.getRequestorID())); |
| | | logError(msg); |
| | | } |
| | | |
| | | /** |
| | |
| | | * @throws DirectoryException When an error occurred or no domain |
| | | * match the provided baseDn. |
| | | */ |
| | | public static ReplicationDomain retrievesReplicationDomain(DN baseDn) |
| | | public static LDAPReplicationDomain retrievesReplicationDomain(DN baseDn) |
| | | throws DirectoryException |
| | | { |
| | | ReplicationDomain replicationDomain = null; |
| | | LDAPReplicationDomain replicationDomain = null; |
| | | |
| | | // Retrieves the domain |
| | | DirectoryServer.getSynchronizationProviders(); |
| | |
| | | } |
| | | |
| | | // From the domainDN retrieves the replication domain |
| | | ReplicationDomain sdomain = |
| | | LDAPReplicationDomain sdomain = |
| | | MultimasterReplication.findDomain(baseDn, null); |
| | | if (sdomain == null) |
| | | { |
| | |
| | | return retrievesBackend(baseDn); |
| | | } |
| | | |
| | | /** |
| | | * Returns a boolean indicating if an import or export is currently |
| | | * processed. |
| | | * @return The status |
| | | */ |
| | | public boolean ieRunning() |
| | | { |
| | | return (ieContext != null); |
| | | } |
| | | /* |
| | | * <<Total Update |
| | | */ |
| | |
| | | { |
| | | // Check that there is not already a domain with the same DN |
| | | DN dn = configuration.getBaseDN(); |
| | | ReplicationDomain domain = MultimasterReplication.findDomain(dn, null); |
| | | LDAPReplicationDomain domain = MultimasterReplication.findDomain(dn, null); |
| | | if ((domain != null) && (domain.baseDn.equals(dn))) |
| | | { |
| | | Message message = ERR_SYNC_INVALID_DN.get(); |
| | |
| | | public ConfigChangeResult applyConfigurationChange( |
| | | ReplicationDomainCfg configuration) |
| | | { |
| | | // server id and base dn are readonly. |
| | | // isolationPolicy can be set immediately and will apply |
| | | // to the next updates. |
| | | // The other parameters needs to be renegociated with the ReplicationServer |
| | | // so that requires restarting the session with the ReplicationServer. |
| | | Boolean needToRestartSession = false; |
| | | Collection<String> newReplServers = configuration.getReplicationServer(); |
| | | |
| | | // A new session is necessary only when information regarding |
| | | // the connection is modified |
| | | if ((!(replicationServers.size() == newReplServers.size() |
| | | && replicationServers.containsAll(newReplServers))) || |
| | | window != configuration.getWindowSize() || |
| | | heartbeatInterval != configuration.getHeartbeatInterval()) |
| | | needToRestartSession = true; |
| | | |
| | | replicationServers = newReplServers; |
| | | window = configuration.getWindowSize(); |
| | | heartbeatInterval = configuration.getHeartbeatInterval(); |
| | | broker.changeConfig(replicationServers, maxReceiveQueue, maxReceiveDelay, |
| | | maxSendQueue, maxSendDelay, window, heartbeatInterval); |
| | | isolationpolicy = configuration.getIsolationPolicy(); |
| | | |
| | | // To be able to stop and restart the broker properly just |
| | | // disable and enable the domain. That way a new session |
| | | // with the new configuration is available. |
| | | if (needToRestartSession) |
| | | { |
| | | this.disable(); |
| | | this.enable(); |
| | | } |
| | | changeConfig( |
| | | configuration.getReplicationServer(), |
| | | configuration.getWindowSize(), |
| | | configuration.getHeartbeatInterval()); |
| | | |
| | | return new ConfigChangeResult(ResultCode.SUCCESS, false); |
| | | } |
| | |
| | | } |
| | | |
| | | /** |
| | | * Check if the domain is connected to a ReplicationServer. |
| | | * Starts the Replication Domain. |
| | | */ |
| | | public void start() |
| | | { |
| | | // Create the ServerStateFlush thread |
| | | flushThread = new ServerStateFlush(); |
| | | flushThread.start(); |
| | | |
| | | startListenService(); |
| | | } |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public void sessionInitiated( |
| | | ServerStatus initStatus, |
| | | ServerState replicationServerState, |
| | | ProtocolSession session) |
| | | { |
| | | super.sessionInitiated(initStatus, replicationServerState, session); |
| | | try |
| | | { |
| | | /* |
| | | * We must not publish changes to a replicationServer that has |
| | | * not seen all our previous changes because this could cause |
| | | * some other ldap servers to miss those changes. |
| | | * Check that the ReplicationServer has seen all our previous |
| | | * changes. |
| | | */ |
| | | ChangeNumber replServerMaxChangeNumber = |
| | | replicationServerState.getMaxChangeNumber(serverId); |
| | | |
| | | if (replServerMaxChangeNumber == null) |
| | | { |
| | | replServerMaxChangeNumber = new ChangeNumber(0, 0, serverId); |
| | | } |
| | | ChangeNumber ourMaxChangeNumber = |
| | | state.getMaxChangeNumber(serverId); |
| | | |
| | | if ((ourMaxChangeNumber != null) && |
| | | (!ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber))) |
| | | { |
| | | |
| | | // Replication server is missing some of our changes: let's |
| | | // send them to him. |
| | | |
| | | Message message = DEBUG_GOING_TO_SEARCH_FOR_CHANGES.get(); |
| | | logError(message); |
| | | |
| | | /* |
| | | * Get all the changes that have not been seen by this |
| | | * replication server and populate the replayOperations |
| | | * list. |
| | | */ |
| | | InternalSearchOperation op = searchForChangedEntries( |
| | | baseDn, replServerMaxChangeNumber, this); |
| | | if (op.getResultCode() != ResultCode.SUCCESS) |
| | | { |
| | | /* |
| | | * An error happened trying to search for the updates |
| | | * This server will start accepting again new updates but |
| | | * some inconsistencies will stay between servers. |
| | | * Log an error for the repair tool |
| | | * that will need to re-synchronize the servers. |
| | | */ |
| | | message = ERR_CANNOT_RECOVER_CHANGES.get( |
| | | baseDn.toNormalizedString()); |
| | | logError(message); |
| | | } else |
| | | { |
| | | for (FakeOperation replayOp : replayOperations) |
| | | { |
| | | ChangeNumber cn = replayOp.getChangeNumber(); |
| | | /* |
| | | * Because the entry returned by the search operation |
| | | * can contain old historical information, it is |
| | | * possible that some of the FakeOperation are |
| | | * actually older than the |
| | | * Only send the Operation if it was newer than |
| | | * the last ChangeNumber known by the Replication Server. |
| | | */ |
| | | if (cn.newer(replServerMaxChangeNumber)) |
| | | { |
| | | message = |
| | | DEBUG_SENDING_CHANGE.get( |
| | | replayOp.getChangeNumber().toString()); |
| | | logError(message); |
| | | session.publish(replayOp.generateMessage()); |
| | | } |
| | | } |
| | | message = DEBUG_CHANGES_SENT.get(); |
| | | logError(message); |
| | | } |
| | | replayOperations.clear(); |
| | | } |
| | | } catch (Exception e) |
| | | { |
| | | Message message = ERR_PUBLISHING_FAKE_OPS.get( |
| | | baseDn.toNormalizedString(), |
| | | e.getLocalizedMessage() + stackTraceToSingleLineString(e)); |
| | | logError(message); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Search for the changes that happened since fromChangeNumber |
| | | * based on the historical attribute. The only changes that will |
| | | * be send will be the one generated on the serverId provided in |
| | | * fromChangeNumber. |
| | | * @param baseDn the base DN |
| | | * @param fromChangeNumber The change number from which we want the changes |
| | | * @param resultListener that will process the entries returned. |
| | | * @return the internal search operation |
| | | * @throws Exception when raised. |
| | | */ |
| | | public static InternalSearchOperation searchForChangedEntries( |
| | | DN baseDn, |
| | | ChangeNumber fromChangeNumber, |
| | | InternalSearchListener resultListener) |
| | | throws Exception |
| | | { |
| | | InternalClientConnection conn = |
| | | InternalClientConnection.getRootConnection(); |
| | | Short serverId = fromChangeNumber.getServerId(); |
| | | |
| | | String maxValueForId = "ffffffffffffffff" + |
| | | String.format("%04x", serverId) + "ffffffff"; |
| | | |
| | | LDAPFilter filter = LDAPFilter.decode( |
| | | "(&(" + Historical.HISTORICALATTRIBUTENAME + ">=dummy:" |
| | | + fromChangeNumber + ")(" + Historical.HISTORICALATTRIBUTENAME + |
| | | "<=dummy:" + maxValueForId + "))"); |
| | | |
| | | LinkedHashSet<String> attrs = new LinkedHashSet<String>(1); |
| | | attrs.add(Historical.HISTORICALATTRIBUTENAME); |
| | | attrs.add(Historical.ENTRYUIDNAME); |
| | | attrs.add("*"); |
| | | return conn.processSearch( |
| | | new ASN1OctetString(baseDn.toString()), |
| | | SearchScope.WHOLE_SUBTREE, |
| | | DereferencePolicy.NEVER_DEREF_ALIASES, |
| | | 0, 0, false, filter, |
| | | attrs, |
| | | resultListener); |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public void handleInternalSearchEntry( |
| | | InternalSearchOperation searchOperation, |
| | | SearchResultEntry searchEntry) |
| | | { |
| | | /* |
| | | * This call back is called at session establishment phase |
| | | * for each entry that has been changed by this server and the changes |
| | | * have not been sent to any Replication Server. |
| | | * The role of this method is to build equivalent operation from |
| | | * the historical information and add them in the replayOperations |
| | | * table. |
| | | */ |
| | | Iterable<FakeOperation> updates = |
| | | Historical.generateFakeOperations(searchEntry); |
| | | for (FakeOperation op : updates) |
| | | { |
| | | replayOperations.add(op); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public void handleInternalSearchReference( |
| | | InternalSearchOperation searchOperation, |
| | | SearchResultReference searchReference) |
| | | { |
| | | // TODO to be implemented |
| | | } |
| | | |
| | | |
| | | /** |
| | | * This method should return the total number of objects in the |
| | | * replicated domain. |
| | | * This count will be used for reporting. |
| | | * |
| | | * @return true if the server is connected, false if not. |
| | | * @throws DirectoryException when needed. |
| | | * |
| | | * @return The number of objects in the replication domain. |
| | | */ |
| | | public boolean isConnected() |
| | | public long countEntries() throws DirectoryException |
| | | { |
| | | if (broker != null) |
| | | return broker.isConnected(); |
| | | else |
| | | Backend backend = retrievesBackend(baseDn); |
| | | |
| | | if (!backend.supportsLDIFExport()) |
| | | { |
| | | Message message = ERR_INIT_EXPORT_NOT_SUPPORTED.get( |
| | | backend.getBackendID().toString()); |
| | | logError(message); |
| | | throw new DirectoryException(ResultCode.OTHER, message); |
| | | } |
| | | |
| | | return backend.numSubordinates(baseDn, true) + 1; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public boolean processUpdate(UpdateMsg updateMsg) |
| | | { |
| | | if (updateMsg instanceof LDAPUpdateMsg) |
| | | { |
| | | LDAPUpdateMsg msg = (LDAPUpdateMsg) updateMsg; |
| | | |
| | | // put the UpdateMsg in the RemotePendingChanges list. |
| | | remotePendingChanges.putRemoteUpdate(msg); |
| | | |
| | | // Put update message into the replay queue |
| | | // (block until some place in the queue is available) |
| | | UpdateToReplay updateToReplay = new UpdateToReplay(msg, this); |
| | | updateToReplayQueue.offer(updateToReplay); |
| | | |
| | | return false; |
| | | } |
| | | |
| | | // unknown message type, this should not happen, just ignore it. |
| | | return true; |
| | | } |
| | | |
| | | /** |
| | | * Determine whether the connection to the replication server is encrypted. |
| | | * @return true if the connection is encrypted, false otherwise. |
| | | * Monitoring information for the LDAPReplicationDomain. |
| | | * |
| | | * @return Monitoring attributes specific to the LDAPReplicationDomain. |
| | | */ |
| | | public boolean isSessionEncrypted() |
| | | public Collection<Attribute> getAdditionalMonitoring() |
| | | { |
| | | if (broker != null) |
| | | return broker.isSessionEncrypted(); |
| | | else |
| | | return false; |
| | | } |
| | | ArrayList<Attribute> attributes = new ArrayList<Attribute>(); |
| | | |
| | | /** |
| | | * Gets the info for DSs in the topology (except us). |
| | | * @return The info for DSs in the topology (except us) |
| | | */ |
| | | public List<DSInfo> getDsList() |
| | | { |
| | | return dsList; |
| | | } |
| | | /* get number of changes in the pending list */ |
| | | ReplicationMonitor.addMonitorData( |
| | | attributes, "pending-updates", getPendingUpdatesCount()); |
| | | |
| | | /** |
| | | * Gets the info for RSs in the topology (except the one we are connected |
| | | * to). |
| | | * @return The info for RSs in the topology (except the one we are connected |
| | | * to) |
| | | */ |
| | | public List<RSInfo> getRsList() |
| | | { |
| | | return rsList; |
| | | } |
| | | /* get number of changes successfully */ |
| | | ReplicationMonitor.addMonitorData(attributes, "replayed-updates-ok", |
| | | getNumReplayedPostOpCalled()); |
| | | |
| | | /** |
| | | * Tells if assured replication is enabled for this domain. |
| | | * @return True if assured replication is enabled for this domain. |
| | | */ |
| | | public boolean isAssured() |
| | | { |
| | | return assured; |
| | | } |
| | | /* get number of modify conflicts */ |
| | | ReplicationMonitor.addMonitorData(attributes, "resolved-modify-conflicts", |
| | | getNumResolvedModifyConflicts()); |
| | | |
| | | /** |
| | | * Gives the mode for the assured replication of the domain. |
| | | * @return The mode for the assured replication of the domain. |
| | | */ |
| | | public AssuredMode getAssuredMode() |
| | | { |
| | | return assuredMode; |
| | | } |
| | | /* get number of naming conflicts */ |
| | | ReplicationMonitor.addMonitorData(attributes, "resolved-naming-conflicts", |
| | | getNumResolvedNamingConflicts()); |
| | | |
| | | /** |
| | | * Gives the assured level of the replication of the domain. |
| | | * @return The assured level of the replication of the domain. |
| | | */ |
| | | public byte getAssuredSdLevel() |
| | | { |
| | | return assuredSdLevel; |
| | | } |
| | | /* get number of unresolved naming conflicts */ |
| | | ReplicationMonitor.addMonitorData(attributes, "unresolved-naming-conflicts", |
| | | getNumUnresolvedNamingConflicts()); |
| | | |
| | | /** |
| | | * Gets the group id for this domain. |
| | | * @return The group id for this domain. |
| | | */ |
| | | public byte getGroupId() |
| | | { |
| | | return groupId; |
| | | } |
| | | |
| | | /** |
| | | * Gets the referrals URLs this domain publishes. |
| | | * @return The referrals URLs this domain publishes. |
| | | */ |
| | | public List<String> getRefUrls() |
| | | { |
| | | return refUrls; |
| | | } |
| | | |
| | | /** |
| | | * Gets the status for this domain. |
| | | * @return The status for this domain. |
| | | */ |
| | | public ServerStatus getStatus() |
| | | { |
| | | return status; |
| | | return attributes; |
| | | } |
| | | } |