/* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Copyright 2008-2009 Sun Microsystems, Inc. */ package org.opends.server.replication.service; import static org.opends.messages.ReplicationMessages.*; import static org.opends.server.loggers.ErrorLogger.logError; import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; import static org.opends.server.loggers.debug.DebugLogger.getTracer; import static org.opends.server.replication.common.StatusMachine.*; import org.opends.server.replication.common.ChangeNumberGenerator; import java.io.BufferedOutputStream; import org.opends.server.types.Attribute; import org.opends.server.core.DirectoryServer; import java.util.Set; import org.opends.server.replication.common.DSInfo; import org.opends.server.replication.common.RSInfo; import java.util.HashMap; import java.util.Map; import org.opends.server.config.ConfigException; import java.util.Collection; import org.opends.server.replication.plugin.InitializeTargetTask; import org.opends.server.replication.plugin.InitializeTask; import org.opends.server.replication.protocol.ReplSessionSecurity; import org.opends.server.replication.protocol.ResetGenerationIdMsg; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.SortedMap; import java.util.TreeMap; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import org.opends.messages.Message; import org.opends.messages.MessageBuilder; import org.opends.server.api.DirectoryThread; import org.opends.server.backends.task.Task; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.replication.common.AssuredMode; import org.opends.server.replication.common.ChangeNumber; import org.opends.server.replication.common.ServerState; import org.opends.server.replication.common.ServerStatus; import org.opends.server.replication.common.StatusMachine; import org.opends.server.replication.common.StatusMachineEvent; import org.opends.server.replication.protocol.AckMsg; import org.opends.server.replication.protocol.ChangeStatusMsg; import org.opends.server.replication.protocol.DoneMsg; import org.opends.server.replication.protocol.EntryMsg; import org.opends.server.replication.protocol.ErrorMsg; import org.opends.server.replication.protocol.HeartbeatMsg; import org.opends.server.replication.protocol.InitializeRequestMsg; import org.opends.server.replication.protocol.InitializeTargetMsg; import org.opends.server.replication.protocol.ProtocolSession; import org.opends.server.replication.protocol.ProtocolVersion; import org.opends.server.replication.protocol.ReplicationMsg; import org.opends.server.replication.protocol.RoutableMsg; import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.types.DirectoryException; import org.opends.server.types.ResultCode; /** * This class should be used as a base for Replication implementations. *
* It is intended that developer in need of a replication mechanism * subclass this class with their own implementation. *
* The startup phase of the ReplicationDomain subclass, * should read the list of replication servers from the configuration, * instantiate a {@link ServerState} then start the publish service * by calling * {@link #startPublishService(Collection, int, long)}. * At this point it can start calling the {@link #publish(UpdateMsg)} * method if needed. *
* When the startup phase reach the point when the subclass is ready * to handle updates the Replication Domain implementation should call the * {@link #startListenService()} method. * At this point a Listener thread is created on the Replication Service * and which can start receiving updates. *
* When updates are received the Replication Service calls the * {@link #processUpdate(UpdateMsg)} method. * ReplicationDomain implementation should implement the appropriate code * for replaying the update on the local repository. * When fully done the subclass must call the * {@link #processUpdateDone(UpdateMsg, String)} method. * This allows to process the update asynchronously if necessary. * *
* To propagate changes to other replica, a ReplicationDomain implementation * must use the {@link #publish(UpdateMsg)} method. *
* If the Full Initialization process is needed then implementation * for {@link #importBackend(InputStream)} and * {@link #exportBackend(OutputStream)} must be * provided. *
* Full Initialization of a replica can be triggered by LDAP clients * by creating InitializeTasks or InitializeTargetTask. * Full initialization can also by triggered from the ReplicationDomain * implementation using methods {@link #initializeRemote(short)} * or {@link #initializeFromRemote(short)}. *
* At shutdown time, the {@link #stopDomain()} method should be called to
* cleanly stop the replication service.
*/
public abstract class ReplicationDomain
{
/**
* Current status for this replicated domain.
*/
private ServerStatus status = ServerStatus.NOT_CONNECTED_STATUS;
/**
* The tracer object for the debug logger.
*/
private static final DebugTracer TRACER = getTracer();
/**
* An identifier for the Replication Service.
* All Replication Domain using this identifier will be connected
* through the Replication Service.
*/
private final String serviceID;
/**
* The identifier of this Replication Domain inside the
* Replication Service.
* Each Domain must use a unique ServerID.
*/
private final short serverID;
/**
* The ReplicationBroker that is used by this ReplicationDomain to
* connect to the ReplicationService.
*/
private ReplicationBroker broker = null;
/**
* This Map is used to store all outgoing assured messages in order
* to be able to correlate all the coming back acks to the original
* operation.
*/
private final SortedMap
* The {@link #exportBackend(OutputStream)} will therefore be called
* on this server, and the {@link #importBackend(InputStream)}
* will be called on the remote server.
*
* The InputStream and OutpuStream given as a parameter to those
* methods will be connected through the replication protocol.
*
* @param target The server-id of the server that should be initialized.
* The target can be discovered using the
* {@link #getDsList()} method.
* @param initTask The task that triggers this initialization and that should
* be updated with its progress.
*
* @throws DirectoryException If it was not possible to publish the
* Initialization message to the Topology.
*/
public void initializeRemote(short target, Task initTask)
throws DirectoryException
{
initializeRemote(target, serverID, initTask);
}
/**
* Process the initialization of some other server or servers in the topology
* specified by the target argument when this initialization specifying the
* server that requests the initialization.
*
* @param target The target that should be initialized.
* @param requestorID The server that initiated the export.
* @param initTask The task that triggers this initialization and that should
* be updated with its progress.
*
* @exception DirectoryException When an error occurs.
*/
void initializeRemote(short target, short requestorID, Task initTask)
throws DirectoryException
{
Message msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START.get(
Short.toString(serverID),
serviceID,
Short.toString(requestorID));
logError(msg);
boolean contextAcquired=false;
acquireIEContext(false);
contextAcquired = true;
ieContext.exportTarget = target;
if (initTask != null)
{
ieContext.initializeTask = initTask;
}
// The number of entries to be exported is the number of entries under
// the base DN entry and the base entry itself.
long entryCount = this.countEntries();
ieContext.setCounters(entryCount, entryCount);
// Send start message to the peer
InitializeTargetMsg initializeMessage = new InitializeTargetMsg(
serviceID, serverID, target, requestorID, entryCount);
broker.publish(initializeMessage);
try
{
exportBackend(new BufferedOutputStream(new ReplOutputStream(this)));
// Notify the peer of the success
DoneMsg doneMsg = new DoneMsg(serverID,
initializeMessage.getDestination());
broker.publish(doneMsg);
releaseIEContext();
}
catch(DirectoryException de)
{
// Notify the peer of the failure
ErrorMsg errorMsg =
new ErrorMsg(target,
de.getMessageObject());
broker.publish(errorMsg);
if (contextAcquired)
releaseIEContext();
throw(de);
}
msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_END.get(
Short.toString(serverID),
serviceID,
Short.toString(requestorID));
logError(msg);
}
/**
* Get the ServerState maintained by the Concrete class.
*
* @return the ServerState maintained by the Concrete class.
*/
public ServerState getServerState()
{
return state;
}
private synchronized void acquireIEContext(boolean importInProgress)
throws DirectoryException
{
if (ieContext != null)
{
// Rejects 2 simultaneous exports
Message message = ERR_SIMULTANEOUS_IMPORT_EXPORT_REJECTED.get();
throw new DirectoryException(ResultCode.OTHER,
message);
}
ieContext = new IEContext(importInProgress);
}
private synchronized void releaseIEContext()
{
ieContext = null;
}
/**
* Processes an error message received while an import/export is
* on going.
* @param errorMsg The error message received.
*/
void abandonImportExport(ErrorMsg errorMsg)
{
// FIXME TBD Treat the case where the error happens while entries
// are being exported
if (debugEnabled())
TRACER.debugVerbose(
" abandonImportExport:" + this.serverID +
" serviceID: " + this.serviceID +
" Error Msg received: " + errorMsg);
if (ieContext != null)
{
ieContext.exception = new DirectoryException(ResultCode.OTHER,
errorMsg.getDetails());
if (ieContext.initializeTask instanceof InitializeTask)
{
// Update the task that initiated the import
((InitializeTask)ieContext.initializeTask).
updateTaskCompletionState(ieContext.exception);
releaseIEContext();
}
}
}
/**
* Receives bytes related to an entry in the context of an import to
* initialize the domain (called by ReplLDIFInputStream).
*
* @return The bytes. Null when the Done or Err message has been received
*/
byte[] receiveEntryBytes()
{
ReplicationMsg msg;
while (true)
{
try
{
msg = broker.receive();
if (debugEnabled())
TRACER.debugVerbose(
" sid:" + serverID +
" base DN:" + serviceID +
" Import EntryBytes received " + msg);
if (msg == null)
{
// The server is in the shutdown process
return null;
}
if (msg instanceof EntryMsg)
{
EntryMsg entryMsg = (EntryMsg)msg;
byte[] entryBytes = entryMsg.getEntryBytes();
ieContext.updateCounters(countEntryLimits(entryBytes));
return entryBytes;
}
else if (msg instanceof DoneMsg)
{
// This is the normal termination of the import
// No error is stored and the import is ended
// by returning null
return null;
}
else if (msg instanceof ErrorMsg)
{
// This is an error termination during the import
// The error is stored and the import is ended
// by returning null
ErrorMsg errorMsg = (ErrorMsg)msg;
ieContext.exception = new DirectoryException(
ResultCode.OTHER,
errorMsg.getDetails());
return null;
}
else
{
// Other messages received during an import are trashed
}
}
catch(Exception e)
{
// TODO: i18n
ieContext.exception = new DirectoryException(ResultCode.OTHER,
Message.raw("received an unexpected message type" +
e.getLocalizedMessage()));
}
}
}
/**
* Count the number of entries in the provided byte[].
* This is based on the hypothesis that the entries are separated
* by a "\n\n" String.
*
* @param entryBytes
* @return The number of entries in the provided byte[].
*/
private int countEntryLimits(byte[] entryBytes)
{
return countEntryLimits(entryBytes, 0, entryBytes.length);
}
/**
* Count the number of entries in the provided byte[].
* This is based on the hypothesis that the entries are separated
* by a "\n\n" String.
*
* @param entryBytes
* @return The number of entries in the provided byte[].
*/
private int countEntryLimits(byte[] entryBytes, int pos, int length)
{
int entryCount = 0;
int count = 0;
while (count<=length-2)
{
if ((entryBytes[pos+count] == '\n') && (entryBytes[pos+count+1] == '\n'))
{
entryCount++;
count++;
}
count++;
}
return entryCount;
}
/**
* Exports an entry in LDIF format.
*
* @param lDIFEntry The entry to be exported in byte[] form.
* @param pos The starting Position in the array.
* @param length Number of array elements to be copied.
*
* @throws IOException when an error occurred.
*/
void exportLDIFEntry(byte[] lDIFEntry, int pos, int length) throws IOException
{
// If an error was raised - like receiving an ErrorMsg
// we just let down the export.
if (ieContext.exception != null)
{
IOException ioe = new IOException(ieContext.exception.getMessage());
ieContext = null;
throw ioe;
}
EntryMsg entryMessage = new EntryMsg(
serverID, ieContext.exportTarget, lDIFEntry, pos, length);
broker.publish(entryMessage);
try
{
ieContext.updateCounters(countEntryLimits(lDIFEntry, pos, length));
}
catch (DirectoryException de)
{
throw new IOException(de.getMessage());
}
}
/**
* Initializes this domain from another source server.
*
* When this method is called, a request for initialization will
* be sent to the source server asking for initialization.
*
* The {@link #exportBackend(OutputStream)} will therefore be called
* on the source server, and the {@link #importBackend(InputStream)}
* will be called on his server.
*
* The InputStream and OutpuStream given as a parameter to those
* methods will be connected through the replication protocol.
*
* @param source The server-id of the source from which to initialize.
* The source can be discovered using the
* {@link #getDsList()} method.
*
* @throws DirectoryException If it was not possible to publish the
* Initialization message to the Topology.
*/
public void initializeFromRemote(short source)
throws DirectoryException
{
initializeFromRemote(source, null);
}
/**
* Initializes a remote server from this server.
*
* The {@link #exportBackend(OutputStream)} will therefore be called
* on this server, and the {@link #importBackend(InputStream)}
* will be called on the remote server.
*
* The InputStream and OutpuStream given as a parameter to those
* methods will be connected through the replication protocol.
*
* @param target The server-id of the server that should be initialized.
* The target can be discovered using the
* {@link #getDsList()} method.
*
* @throws DirectoryException If it was not possible to publish the
* Initialization message to the Topology.
*/
public void initializeRemote(short target) throws DirectoryException
{
initializeRemote(target, null);
}
/**
* Initializes this domain from another source server.
*
* When this method is called, a request for initialization will
* be sent to the source server asking for initialization.
*
* The {@link #exportBackend(OutputStream)} will therefore be called
* on the source server, and the {@link #importBackend(InputStream)}
* will be called on his server.
*
* The InputStream and OutpuStream given as a parameter to those
* methods will be connected through the replication protocol.
*
* @param source The server-id of the source from which to initialize.
* The source can be discovered using the
* {@link #getDsList()} method.
* @param initTask The task that launched the initialization
* and should be updated of its progress.
*
* @throws DirectoryException If it was not possible to publish the
* Initialization message to the Topology.
*/
public void initializeFromRemote(short source, Task initTask)
throws DirectoryException
{
if (debugEnabled())
TRACER.debugInfo("Entering initializeFromRemote");
acquireIEContext(true);
ieContext.initializeTask = initTask;
InitializeRequestMsg initializeMsg = new InitializeRequestMsg(
serviceID, serverID, source);
// Publish Init request msg
broker.publish(initializeMsg);
// .. we expect to receive entries or err after that
}
/**
* Initializes the domain's backend with received entries.
* @param initializeMessage The message that initiated the import.
* @exception DirectoryException Thrown when an error occurs.
*/
void initialize(InitializeTargetMsg initializeMessage)
throws DirectoryException
{
DirectoryException de = null;
Message msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_START.get(
Short.toString(serverID),
serviceID,
Long.toString(initializeMessage.getRequestorID()));
logError(msg);
// Go into full update status
setNewStatus(StatusMachineEvent.TO_FULL_UPDATE_STATUS_EVENT);
if (initializeMessage.getRequestorID() == serverID)
{
// The import responds to a request we did so the IEContext
// is already acquired
}
else
{
acquireIEContext(true);
}
ieContext.importSource = initializeMessage.getsenderID();
ieContext.entryLeftCount = initializeMessage.getEntryCount();
ieContext.setCounters(
initializeMessage.getEntryCount(),
initializeMessage.getEntryCount());
try
{
importBackend(new ReplInputStream(this));
broker.reStart();
}
catch (DirectoryException e)
{
de = e;
}
finally
{
if ((ieContext != null) && (ieContext.exception != null))
de = ieContext.exception;
// Update the task that initiated the import
if ((ieContext != null ) && (ieContext.initializeTask != null))
{
((InitializeTask)ieContext.initializeTask).
updateTaskCompletionState(de);
}
releaseIEContext();
}
// Sends up the root error.
if (de != null)
{
throw de;
}
msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_END.get(
Short.toString(serverID),
serviceID,
Long.toString(initializeMessage.getRequestorID()));
logError(msg);
}
/**
* Sets the status to a new value depending of the passed status machine
* event.
* @param event The event that may make the status be changed
*/
private void setNewStatus(StatusMachineEvent event)
{
ServerStatus newStatus =
StatusMachine.computeNewStatus(status, event);
if (newStatus == ServerStatus.INVALID_STATUS)
{
Message msg = ERR_DS_CANNOT_CHANGE_STATUS.get(serviceID,
Short.toString(serverID), status.toString(), event.toString());
logError(msg);
return;
}
if (newStatus != status)
{
// Reset status date
lastStatusChangeDate = new Date();
// Reset monitoring counters if reconnection
if (newStatus == ServerStatus.NOT_CONNECTED_STATUS)
resetMonitoringCounters();
// Store new status
status = newStatus;
if (debugEnabled())
TRACER.debugInfo("Replication domain " + serviceID +
" new status is: " + status);
// Perform whatever actions are needed to apply properties for being
// compliant with new status
updateDomainForNewStatus();
}
}
/**
* Returns a boolean indicating if an import or export is currently
* processed.
*
* @return The status
*/
public boolean ieRunning()
{
return (ieContext != null);
}
/**
* Check the value of the Replication Servers generation ID.
*
* @param generationID The expected value of the generation ID.
*
* @throws DirectoryException When the generation ID of the Replication
* Servers is not the expected value.
*/
private void checkGenerationID(long generationID) throws DirectoryException
{
boolean flag = false;
for (int i = 0; i< 10; i++)
{
for (RSInfo rsInfo : getRsList())
{
if (rsInfo.getGenerationId() == generationID)
{
flag = true;
break;
}
else
{
try
{
Thread.sleep(i*100);
} catch (InterruptedException e)
{
}
}
}
if (flag)
{
break;
}
}
if (!flag)
{
ResultCode resultCode = ResultCode.OTHER;
Message message = ERR_RESET_GENERATION_ID_FAILED.get(serviceID);
throw new DirectoryException(
resultCode, message);
}
}
/**
* Reset the Replication Log.
* Calling this method will remove all the Replication information that
* was kept on all the Replication Servers currently connected in the
* topology.
*
* @throws DirectoryException If this ReplicationDomain is not currently
* connected to a Replication Server or it
* was not possible to contact it.
*/
public void resetReplicationLog() throws DirectoryException
{
// Reset the Generation ID to -1 to clean the ReplicationServers.
resetGenerationId((long)-1);
// check that at least one ReplicationServer did change its generation-id
checkGenerationID(-1);
// Reconnect to the Replication Server so that it adopt our
// GenerationID.
disableService();
enableService();
resetGenerationId(getGenerationID());
// check that at least one ReplicationServer did change its generation-id
checkGenerationID(getGenerationID());
}
/**
* Reset the generationId of this domain in the whole topology.
* A message is sent to the Replication Servers for them to reset
* their change dbs.
*
* @param generationIdNewValue The new value of the generation Id.
* @throws DirectoryException When an error occurs
*/
public void resetGenerationId(Long generationIdNewValue)
throws DirectoryException
{
if (debugEnabled())
TRACER.debugInfo(
"Server id " + serverID + " and domain " + serviceID
+ "resetGenerationId" + generationIdNewValue);
if (!isConnected())
{
ResultCode resultCode = ResultCode.OTHER;
Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get(serviceID);
throw new DirectoryException(
resultCode, message);
}
ResetGenerationIdMsg genIdMessage = null;
if (generationIdNewValue == null)
{
genIdMessage = new ResetGenerationIdMsg(this.getGenerationID());
}
else
{
genIdMessage = new ResetGenerationIdMsg(generationIdNewValue);
}
broker.publish(genIdMessage);
}
/*
******** End of The total Update code *********
*/
/*
******* Start of Monitoring Code **********
*/
/**
* Get the maximum receive window size.
*
* @return The maximum receive window size.
*/
int getMaxRcvWindow()
{
if (broker != null)
return broker.getMaxRcvWindow();
else
return 0;
}
/**
* Get the current receive window size.
*
* @return The current receive window size.
*/
int getCurrentRcvWindow()
{
if (broker != null)
return broker.getCurrentRcvWindow();
else
return 0;
}
/**
* Get the maximum send window size.
*
* @return The maximum send window size.
*/
int getMaxSendWindow()
{
if (broker != null)
return broker.getMaxSendWindow();
else
return 0;
}
/**
* Get the current send window size.
*
* @return The current send window size.
*/
int getCurrentSendWindow()
{
if (broker != null)
return broker.getCurrentSendWindow();
else
return 0;
}
/**
* Get the number of times the replication connection was lost.
* @return The number of times the replication connection was lost.
*/
int getNumLostConnections()
{
if (broker != null)
return broker.getNumLostConnections();
else
return 0;
}
/**
* Determine whether the connection to the replication server is encrypted.
* @return true if the connection is encrypted, false otherwise.
*/
boolean isSessionEncrypted()
{
if (broker != null)
return broker.isSessionEncrypted();
else
return false;
}
/**
* This method is called when the ReplicationDomain has completed the
* processing of a received update synchronously.
* In such cases the processUpdateDone () is called and the state
* is updated automatically.
*
* @param msg The UpdateMessage that was processed.
*/
void processUpdateDoneSynchronous(UpdateMsg msg)
{
// Warning: in synchronous mode, no way to tell the replay of an update went
// wrong Just put null in processUpdateDone so that if assured replication
// is used the ack is sent without error at replay flag.
processUpdateDone(msg, null);
state.update(msg.getChangeNumber());
}
/**
* Check if the domain is connected to a ReplicationServer.
*
* @return true if the server is connected, false if not.
*/
public boolean isConnected()
{
if (broker != null)
return broker.isConnected();
else
return false;
}
/**
* Get the name of the replicationServer to which this domain is currently
* connected.
*
* @return the name of the replicationServer to which this domain
* is currently connected.
*/
public String getReplicationServer()
{
if (broker != null)
return broker.getReplicationServer();
else
return "Not connected";
}
/**
* Gets the number of updates sent in assured safe read mode.
* @return The number of updates sent in assured safe read mode.
*/
public int getAssuredSrSentUpdates()
{
return assuredSrSentUpdates.get();
}
/**
* Gets the number of updates sent in assured safe read mode that have been
* acknowledged without errors.
* @return The number of updates sent in assured safe read mode that have been
* acknowledged without errors.
*/
public int getAssuredSrAcknowledgedUpdates()
{
return assuredSrAcknowledgedUpdates.get();
}
/**
* Gets the number of updates sent in assured safe read mode that have not
* been acknowledged.
* @return The number of updates sent in assured safe read mode that have not
* been acknowledged.
*/
public int getAssuredSrNotAcknowledgedUpdates()
{
return assuredSrNotAcknowledgedUpdates.get();
}
/**
* Gets the number of updates sent in assured safe read mode that have not
* been acknowledged due to timeout error.
* @return The number of updates sent in assured safe read mode that have not
* been acknowledged due to timeout error.
*/
public int getAssuredSrTimeoutUpdates()
{
return assuredSrTimeoutUpdates.get();
}
/**
* Gets the number of updates sent in assured safe read mode that have not
* been acknowledged due to wrong status error.
* @return The number of updates sent in assured safe read mode that have not
* been acknowledged due to wrong status error.
*/
public int getAssuredSrWrongStatusUpdates()
{
return assuredSrWrongStatusUpdates.get();
}
/**
* Gets the number of updates sent in assured safe read mode that have not
* been acknowledged due to replay error.
* @return The number of updates sent in assured safe read mode that have not
* been acknowledged due to replay error.
*/
public int getAssuredSrReplayErrorUpdates()
{
return assuredSrReplayErrorUpdates.get();
}
/**
* Gets the number of updates sent in assured safe read mode that have not
* been acknowledged per server.
* @return The number of updates sent in assured safe read mode that have not
* been acknowledged per server.
*/
public Map
* After this method has been called, the Replication Service will start
* calling the {@link #processUpdate(UpdateMsg)}.
*
* This method must be called once and must be called after the
* {@link #startPublishService(Collection, int, long)}.
*
*/
public void startListenService()
{
//
// Create the listener thread
listenerThread = new ListenerThread(this);
listenerThread.start();
}
/**
* Temporarily disable the Replication Service.
* The Replication Service can be enabled again using
* {@link #enableService()}.
*
* It can be useful to disable the Replication Service when the
* repository where the replicated information is stored becomes
* temporarily unavailable and replicated updates can therefore not
* be replayed during a while.
*/
public void disableService()
{
// Stop the listener thread
if (listenerThread != null)
{
listenerThread.shutdown();
}
if (broker != null)
{
broker.stop();
}
// Wait for the listener thread to stop
if (listenerThread != null)
listenerThread.waitForShutdown();
}
/**
* Restart the Replication service after a {@link #disableService()}.
*
* The Replication Service will restart from the point indicated by the
* {@link ServerState} that was given as a parameter to the
* {@link #startPublishService(Collection, int, long)}
* at startup time.
* If some data have changed in the repository during the period of time when
* the Replication Service was disabled, this {@link ServerState} should
* therefore be updated by the Replication Domain subclass before calling
* this method.
*/
public void enableService()
{
broker.start();
// Create the listener thread
listenerThread = new ListenerThread(this);
listenerThread.start();
}
/**
* Definitively stops the Replication Service.
*/
public void stopDomain()
{
DirectoryServer.deregisterMonitorProvider(monitor.getMonitorInstanceName());
disableService();
domains.remove(serviceID);
}
/**
* Change the ReplicationDomain parameters.
*
* @param replicationServers The new list of Replication Servers that this
* domain should now use.
* @param windowSize The window size that this domain should use.
* @param heartbeatInterval The heartbeatInterval that this domain should
* use.
* @param groupId The new group id to use
*/
public void changeConfig(
Collection
* This method will be called by a single thread and should therefore
* should not be blocking.
*
* @param updateMsg The {@link UpdateMsg} that was received.
*
* @return A boolean indicating if the processing is completed at return
* time.
* If
* It is useful for implementation needing to process the update in an
* asynchronous way or using several threads, but must be called even
* by implementation doing it in a synchronous, single-threaded way.
*
* @param msg The UpdateMsg whose processing was completed.
* @param replayErrorMsg if not null, this means an error occurred during the
* replay of this update, and this is the matching human readable message
* describing the problem.
*/
public void processUpdateDone(UpdateMsg msg, String replayErrorMsg)
{
broker.updateWindowAfterReplay();
// Send an ack if it was requested and the group id is the same of the RS
// one. Only Safe Read mode makes sense in DS for returning an ack.
byte rsGroupId = broker.getRsGroupId();
if (msg.isAssured())
{
// Assured feature is supported starting from replication protocol V2
if (broker.getProtocolVersion() >=
ProtocolVersion.REPLICATION_PROTOCOL_V2)
{
AssuredMode msgAssuredMode = msg.getAssuredMode();
if (msgAssuredMode == AssuredMode.SAFE_READ_MODE)
{
if (rsGroupId == groupId)
{
// Send the ack
AckMsg ackMsg = new AckMsg(msg.getChangeNumber());
if (replayErrorMsg != null)
{
// Mark the error in the ack
// -> replay error occured
ackMsg.setHasReplayError(true);
// -> replay error occured in our server
List
* The Replication Service will handle the delivery of this {@link UpdateMsg}
* to all the participants of this Replication Domain.
* These members will be receive this {@link UpdateMsg} through a call
* of the {@link #processUpdate(UpdateMsg)} message.
*
* @param msg The UpdateMsg that should be pushed.
*/
public void publish(UpdateMsg msg)
{
// Publish the update
broker.publish(msg);
state.update(msg.getChangeNumber());
numSentUpdates.incrementAndGet();
}
/**
* Publish informations to the Replication Service (not assured mode).
*
* @param msg The byte array containing the informations that should
* be sent to the remote entities.
*/
public void publish(byte[] msg)
{
UpdateMsg update;
synchronized (this)
{
update = new UpdateMsg(generator.newChangeNumber(), msg);
// If assured replication is configured, this will prepare blocking
// mechanism. If assured replication is disabled, this returns
// immediately
prepareWaitForAckIfAssuredEnabled(update);
publish(update);
}
try
{
// If assured replication is enabled, this will wait for the matching
// ack or time out. If assured replication is disabled, this returns
// immediately
waitForAckIfAssuredEnabled(update);
} catch (TimeoutException ex)
{
// This exception may only be raised if assured replication is
// enabled
Message errorMsg = NOTE_DS_ACK_TIMEOUT.get(serviceID, Long.toString(
assuredTimeout), msg.toString());
logError(errorMsg);
}
}
/**
* This method should return the generationID to use for this
* ReplicationDomain.
* This method can be called at any time after the ReplicationDomain
* has been started.
*
* @return The GenerationID.
*/
public abstract long getGenerationID();
/**
* Subclasses should use this method to add additional monitoring
* information in the ReplicationDomain.
*
* @return Additional monitoring attributes that will be added in the
* ReplicationDomain monitoring entry.
*/
public Collection true is returned, no further
* processing is necessary.
*
* If false is returned, the subclass should
* call the method
* {@link #processUpdateDone(UpdateMsg, String)}
* and update the ServerState
* When this processing is complete.
*
*/
public abstract boolean processUpdate(UpdateMsg updateMsg);
/**
* This method must be called after each call to
* {@link #processUpdate(UpdateMsg)} when the processing of the update is
* completed.
*