/*
|
* CDDL HEADER START
|
*
|
* The contents of this file are subject to the terms of the
|
* Common Development and Distribution License, Version 1.0 only
|
* (the "License"). You may not use this file except in compliance
|
* with the License.
|
*
|
* You can obtain a copy of the license at
|
* trunk/opends/resource/legal-notices/OpenDS.LICENSE
|
* or https://OpenDS.dev.java.net/OpenDS.LICENSE.
|
* See the License for the specific language governing permissions
|
* and limitations under the License.
|
*
|
* When distributing Covered Code, include this CDDL HEADER in each
|
* file and include the License file at
|
* trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
|
* add the following below this CDDL HEADER, with the fields enclosed
|
* by brackets "[]" replaced with your own identifying information:
|
* Portions Copyright [yyyy] [name of copyright owner]
|
*
|
* CDDL HEADER END
|
*
|
*
|
* Copyright 2009-2010 Sun Microsystems, Inc.
|
* Portions copyright 2011 ForgeRock AS
|
*/
|
package org.opends.server.replication.server;
|
|
import static org.opends.messages.ReplicationMessages.ERR_RS_DN_DOES_NOT_MATCH;
|
import static org.opends.server.loggers.debug.DebugLogger.*;
|
|
import java.util.ArrayList;
|
import java.util.SortedSet;
|
import java.util.TreeSet;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
import org.opends.messages.Message;
|
import org.opends.server.admin.std.server.MonitorProviderCfg;
|
import org.opends.server.api.MonitorProvider;
|
import org.opends.server.config.ConfigException;
|
import org.opends.server.core.DirectoryServer;
|
import org.opends.server.loggers.debug.DebugTracer;
|
import org.opends.server.replication.common.ChangeNumber;
|
import org.opends.server.replication.common.ServerState;
|
import org.opends.server.replication.protocol.UpdateMsg;
|
import org.opends.server.types.Attribute;
|
import org.opends.server.types.Attributes;
|
import org.opends.server.types.DirectoryException;
|
import org.opends.server.types.InitializationException;
|
import org.opends.server.types.ResultCode;
|
import org.opends.server.util.TimeThread;
|
|
/**
|
* This class implements a buffering/producer/consumer mechanism of
|
* replication changes (UpdateMsg) used inside the replication server.
|
*
|
* MessageHandlers are registered into Replication server domains.
|
* When an update message is received by a domain, the domain forwards
|
* the message to the registered message handlers.
|
* Messages are buffered into a queue.
|
* Consumers are expected to come and consume the UpdateMsg from the queue.
|
*/
|
public class MessageHandler extends MonitorProvider<MonitorProviderCfg>
|
{
|
|
/**
|
* The tracer object for the debug logger.
|
*/
|
protected static final DebugTracer TRACER = getTracer();
|
/**
|
* UpdateMsg queue.
|
*/
|
private final MsgQueue msgQueue = new MsgQueue();
|
/**
|
* Late queue.
|
*/
|
protected MsgQueue lateQueue = new MsgQueue();
|
/**
|
* Local hosting RS.
|
*/
|
protected ReplicationServer replicationServer = null;
|
/**
|
* The URL of the hosting replication server.
|
*/
|
protected String replicationServerURL = null;
|
/**
|
* The serverID of the hosting replication server.
|
*/
|
protected int replicationServerId;
|
/**
|
* Specifies the related replication server domain based on serviceId(baseDn).
|
*/
|
protected ReplicationServerDomain replicationServerDomain = null;
|
/**
|
* Number of update sent to the server.
|
*/
|
protected int outCount = 0;
|
/**
|
* Number of updates received from the server.
|
*/
|
protected int inCount = 0;
|
/**
|
* Specifies the max queue size for this handler.
|
*/
|
protected int maxQueueSize = 5000;
|
/**
|
* Specifies the max queue size in bytes for this handler.
|
*/
|
protected int maxQueueBytesSize = maxQueueSize * 100;
|
/**
|
* Specifies whether the consumer is following the producer (is not late).
|
*/
|
protected boolean following = false;
|
/**
|
* Specifies the current serverState of this handler.
|
*/
|
private ServerState serverState;
|
/**
|
* Specifies the identifier of the service (usually the baseDn of the domain).
|
*/
|
private String serviceId = null;
|
/**
|
* Specifies whether the consumer is still active or not.
|
* If not active, the handler will not return any message.
|
* Called at the beginning of shutdown process.
|
*/
|
private boolean activeConsumer = true;
|
/**
|
* Set when ServerHandler is stopping.
|
*/
|
private AtomicBoolean shuttingDown = new AtomicBoolean(false);
|
|
/**
 * Creates a new message handler hosted by the provided replication server.
 * The byte limit of the queue is set to 100 times the provided queue size.
 *
 * @param queueSize The maximum number of updates that will be kept
 *                  in memory by this handler.
 * @param replicationServerURL The URL of the hosting replication server.
 * @param replicationServerId The ID of the hosting replication server.
 * @param replicationServer The hosting replication server.
 */
public MessageHandler(
    int queueSize,
    String replicationServerURL,
    int replicationServerId,
    ReplicationServer replicationServer)
{
  // Identity of the hosting replication server.
  this.replicationServer = replicationServer;
  this.replicationServerId = replicationServerId;
  this.replicationServerURL = replicationServerURL;

  // Queue limits, in number of messages and in bytes.
  this.maxQueueSize = queueSize;
  this.maxQueueBytesSize = queueSize * 100;
}
|
|
/**
|
* Add an update to the list of updates that must be sent to the server
|
* managed by this Handler.
|
*
|
* @param update The update that must be added to the list of updates of
|
* this handler.
|
* @param sourceHandler The source handler that generated the update.
|
*/
|
public void add(UpdateMsg update, MessageHandler sourceHandler)
|
{
|
synchronized (msgQueue)
|
{
|
/*
|
* If queue was empty the writer thread was probably asleep
|
* waiting for some changes, wake it up
|
*/
|
if (msgQueue.isEmpty())
|
msgQueue.notify();
|
|
msgQueue.add(update);
|
|
/* TODO : size should be configurable
|
* and larger than max-receive-queue-size
|
*/
|
while ((msgQueue.count() > maxQueueSize) ||
|
(msgQueue.bytesCount() > maxQueueBytesSize))
|
{
|
setFollowing(false);
|
msgQueue.removeFirst();
|
}
|
}
|
}
|
/**
|
* Set the shut down flag to true and returns the previous value of the flag.
|
* @return The previous value of the shut down flag
|
*/
|
public boolean engageShutdown()
|
{
|
// Use thread safe boolean
|
return shuttingDown.getAndSet(true);
|
}
|
|
/**
|
* Get an approximation of the delay by looking at the age of the oldest
|
* message that has not been sent to this server.
|
* This is an approximation because the age is calculated using the
|
* clock of the server where the replicationServer is currently running
|
* while it should be calculated using the clock of the server
|
* that originally processed the change.
|
*
|
* The approximation error is therefore the time difference between
|
*
|
* @return the approximate delay for the connected server.
|
*/
|
public long getApproxDelay()
|
{
|
long olderUpdateTime = getOlderUpdateTime();
|
if (olderUpdateTime == 0)
|
return 0;
|
|
long currentTime = TimeThread.getTime();
|
return ((currentTime - olderUpdateTime) / 1000);
|
}
|
|
/**
|
* Get the age of the older change that has not yet been replicated
|
* to the server handled by this ServerHandler.
|
* @return The age if the older change has not yet been replicated
|
* to the server handled by this ServerHandler.
|
*/
|
public Long getApproxFirstMissingDate()
|
{
|
Long result = (long) 0;
|
|
// Get the older CN received
|
ChangeNumber olderUpdateCN = getOlderUpdateCN();
|
if (olderUpdateCN != null)
|
{
|
// If not present in the local RS db,
|
// then approximate with the older update time
|
result = olderUpdateCN.getTime();
|
}
|
return result;
|
}
|
|
/**
|
* Returns the Replication Server Domain to which belongs this handler.
|
*
|
* @param createIfNotExist Creates the domain if it does not exist.
|
* @param waitConnections Waits for the Connections with other RS to
|
* be established before returning.
|
* @return The replication server domain.
|
*/
|
public ReplicationServerDomain getDomain(
|
boolean createIfNotExist, boolean waitConnections)
|
{
|
if (replicationServerDomain==null)
|
{
|
replicationServerDomain =
|
replicationServer.getReplicationServerDomain(
|
serviceId, createIfNotExist, waitConnections);
|
}
|
return replicationServerDomain;
|
}
|
|
/**
|
* Get the count of updates received from the server.
|
* @return the count of update received from the server.
|
*/
|
public int getInCount()
|
{
|
return inCount;
|
}
|
|
/**
|
* Retrieves a set of attributes containing monitor data that should be
|
* returned to the client if the corresponding monitor entry is requested.
|
*
|
* @return A set of attributes containing monitor data that should be
|
* returned to the client if the corresponding monitor entry is
|
* requested.
|
*/
|
@Override
|
public ArrayList<Attribute> getMonitorData()
|
{
|
ArrayList<Attribute> attributes = new ArrayList<Attribute>();
|
attributes.add(Attributes.create("handler", getMonitorInstanceName()));
|
attributes.add(
|
Attributes.create("queue-size", String.valueOf(msgQueue.count())));
|
attributes.add(
|
Attributes.create(
|
"queue-size-bytes", String.valueOf(msgQueue.bytesCount())));
|
attributes.add(
|
Attributes.create(
|
"following", String.valueOf(following)));
|
return attributes;
|
}
|
|
/**
|
* Retrieves the name of this monitor provider. It should be unique among all
|
* monitor providers, including all instances of the same monitor provider.
|
*
|
* @return The name of this monitor provider.
|
*/
|
@Override
|
public String getMonitorInstanceName()
|
{
|
return "Message Handler";
|
}
|
|
/**
|
* Get the next update that must be sent to the consumer
|
* from the message queue or from the database.
|
*
|
* @param synchronous specifies what to do when the queue is empty.
|
* when true, the method blocks; when false the method return null.
|
* @return The next update that must be sent to the consumer.
|
* null when synchronous is false and queue is empty.
|
*/
|
protected UpdateMsg getnextMessage(boolean synchronous)
|
{
|
UpdateMsg msg;
|
while (activeConsumer == true)
|
{
|
if (following == false)
|
{
|
/* this server is late with regard to some other masters
|
* in the topology or just joined the topology.
|
* In such cases, we can't keep all changes in the queue
|
* without saturating the memory, we therefore use
|
* a lateQueue that is filled with a few changes from the changelogDB
|
* If this server is able to close the gap, it will start using again
|
* the regular msgQueue later.
|
*/
|
if (lateQueue.isEmpty())
|
{
|
/*
|
* Start from the server State
|
* Loop until the queue high mark or until no more changes
|
* for each known LDAP master
|
* get the next CSN after this last one :
|
* - try to get next from the file
|
* - if not found in the file
|
* - try to get the next from the queue
|
* select the smallest of changes
|
* check if it is in the memory tree
|
* yes : lock memory tree.
|
* check all changes from the list, remove the ones that
|
* are already sent
|
* unlock memory tree
|
* restart as usual
|
* load this change on the delayList
|
*
|
*/
|
ReplicationIteratorComparator comparator =
|
new ReplicationIteratorComparator();
|
SortedSet<ReplicationIterator> iteratorSortedSet =
|
new TreeSet<ReplicationIterator>(comparator);
|
try
|
{
|
/* fill the lateQueue */
|
for (int serverId : replicationServerDomain.getServers())
|
{
|
ChangeNumber lastCsn = serverState
|
.getMaxChangeNumber(serverId);
|
ReplicationIterator iterator = replicationServerDomain
|
.getChangelogIterator(serverId, lastCsn);
|
if (iterator != null)
|
{
|
if (iterator.getChange() != null)
|
{
|
iteratorSortedSet.add(iterator);
|
}
|
else
|
{
|
iterator.releaseCursor();
|
}
|
}
|
}
|
|
// The loop below relies on the fact that it is sorted based
|
// on the currentChange of each iterator to consider the next
|
// change across all servers.
|
//
|
// Hence it is necessary to remove and eventual add again an
|
// iterator when looping in order to keep consistent the order of
|
// the iterators (see ReplicationIteratorComparator.
|
while (!iteratorSortedSet.isEmpty()
|
&& (lateQueue.count() < 100)
|
&& (lateQueue.bytesCount() < 50000))
|
{
|
ReplicationIterator iterator = iteratorSortedSet
|
.first();
|
iteratorSortedSet.remove(iterator);
|
lateQueue.add(iterator.getChange());
|
if (iterator.next())
|
{
|
iteratorSortedSet.add(iterator);
|
}
|
else
|
{
|
iterator.releaseCursor();
|
}
|
}
|
}
|
finally
|
{
|
for (ReplicationIterator iterator : iteratorSortedSet)
|
{
|
iterator.releaseCursor();
|
}
|
}
|
|
/*
|
* If the late queue is empty then we could not find any
|
* messages in the replication log so the remote serevr is not
|
* late anymore.
|
*/
|
|
if (lateQueue.isEmpty())
|
{
|
synchronized (msgQueue)
|
{
|
if ((msgQueue.count() < maxQueueSize) &&
|
(msgQueue.bytesCount() < maxQueueBytesSize))
|
{
|
setFollowing(true);
|
}
|
}
|
} else
|
{
|
/*
|
* if the first change in the lateQueue is also on the regular
|
* queue, we can resume the processing from the regular queue
|
* -> set following to true and empty the lateQueue.
|
*/
|
msg = lateQueue.first();
|
synchronized (msgQueue)
|
{
|
if (msgQueue.contains(msg))
|
{
|
/* we finally catch up with the regular queue */
|
setFollowing(true);
|
lateQueue.clear();
|
UpdateMsg msg1;
|
do
|
{
|
msg1 = msgQueue.removeFirst();
|
} while (!msg.getChangeNumber().equals(msg1.getChangeNumber()));
|
this.updateServerState(msg);
|
return msg1;
|
}
|
}
|
}
|
} else
|
{
|
/* get the next change from the lateQueue */
|
msg = lateQueue.removeFirst();
|
this.updateServerState(msg);
|
return msg;
|
}
|
}
|
synchronized (msgQueue)
|
{
|
if (following == true)
|
{
|
try
|
{
|
while (msgQueue.isEmpty() && (following == true))
|
{
|
if (!synchronous)
|
return null;
|
msgQueue.wait(500);
|
if (!activeConsumer)
|
return null;
|
}
|
} catch (InterruptedException e)
|
{
|
return null;
|
}
|
msg = msgQueue.removeFirst();
|
|
if (this.updateServerState(msg))
|
{
|
/*
|
* Only push the message if it has not yet been seen
|
* by the other server.
|
* Otherwise just loop to select the next message.
|
*/
|
return msg;
|
}
|
}
|
}
|
/*
|
* Need to loop because following flag may have gone to false between
|
* the first check at the beginning of this method
|
* and the second check just above.
|
*/
|
}
|
return null;
|
}
|
|
/**
|
* Get the older Change Number for that server.
|
* Returns null when the queue is empty.
|
* @return The older change number.
|
*/
|
public ChangeNumber getOlderUpdateCN()
|
{
|
ChangeNumber result = null;
|
synchronized (msgQueue)
|
{
|
if (isFollowing())
|
{
|
if (msgQueue.isEmpty())
|
{
|
result = null;
|
} else
|
{
|
UpdateMsg msg = msgQueue.first();
|
result = msg.getChangeNumber();
|
}
|
} else
|
{
|
if (lateQueue.isEmpty())
|
{
|
// isFollowing is false AND lateQueue is empty
|
// We may be at the very moment when the writer has emptyed the
|
// lateQueue when it sent the last update. The writer will fill again
|
// the lateQueue when it will send the next update but we are not yet
|
// there. So let's take the last change not sent directly from
|
// the db.
|
|
ReplicationIteratorComparator comparator =
|
new ReplicationIteratorComparator();
|
SortedSet<ReplicationIterator> iteratorSortedSet =
|
new TreeSet<ReplicationIterator>(comparator);
|
try
|
{
|
// Build a list of candidates iterator (i.e. db i.e. server)
|
for (int serverId : replicationServerDomain.getServers())
|
{
|
// get the last already sent CN from that server
|
ChangeNumber lastCsn = serverState.getMaxChangeNumber(serverId);
|
// get an iterator in this server db from that last change
|
ReplicationIterator iterator =
|
replicationServerDomain.getChangelogIterator(serverId, lastCsn);
|
// if that iterator has changes, then it is a candidate
|
// it is added in the sorted list at a position given by its
|
// current change (see ReplicationIteratorComparator).
|
if (iterator != null)
|
{
|
if (iterator.getChange() != null)
|
{
|
iteratorSortedSet.add(iterator);
|
}
|
else
|
{
|
iterator.releaseCursor();
|
}
|
}
|
}
|
UpdateMsg msg = iteratorSortedSet.first().getChange();
|
result = msg.getChangeNumber();
|
} catch (Exception e)
|
{
|
result = null;
|
} finally
|
{
|
for (ReplicationIterator iterator : iteratorSortedSet)
|
{
|
iterator.releaseCursor();
|
}
|
}
|
} else
|
{
|
UpdateMsg msg = lateQueue.first();
|
result = msg.getChangeNumber();
|
}
|
}
|
}
|
return result;
|
}
|
|
/**
|
* Get the older update time for that server.
|
* @return The older update time.
|
*/
|
public long getOlderUpdateTime()
|
{
|
ChangeNumber olderUpdateCN = getOlderUpdateCN();
|
if (olderUpdateCN == null)
|
return 0;
|
return olderUpdateCN.getTime();
|
}
|
|
/**
|
* Get the count of updates sent to this server.
|
* @return The count of update sent to this server.
|
*/
|
public int getOutCount()
|
{
|
return outCount;
|
}
|
|
/**
|
* Get the number of message in the receive message queue.
|
* @return Size of the receive message queue.
|
*/
|
public int getRcvMsgQueueSize()
|
{
|
synchronized (msgQueue)
|
{
|
/*
|
* When the server is up to date or close to be up to date,
|
* the number of updates to be sent is the size of the receive queue.
|
*/
|
if (isFollowing())
|
return msgQueue.count();
|
else
|
{
|
/**
|
* When the server is not able to follow, the msgQueue
|
* may become too large and therefore won't contain all the
|
* changes. Some changes may only be stored in the backing DB
|
* of the servers.
|
* The total size of the receive queue is calculated by doing
|
* the sum of the number of missing changes for every dbHandler.
|
*/
|
ServerState dbState = replicationServerDomain.getDbServerState();
|
return ServerState.diffChanges(dbState, serverState);
|
}
|
}
|
}
|
|
/**
|
* Get the state of this server.
|
*
|
* @return ServerState the state for this server..
|
*/
|
public ServerState getServerState()
|
{
|
return serverState;
|
}
|
|
/**
|
* Get the name of the serviceId (usually baseDn) for this handler.
|
* @return The name of the serviceId.
|
*/
|
protected String getServiceId()
|
{
|
return serviceId;
|
}
|
|
/**
|
* Increase the counter of update received from the server.
|
*/
|
public void incrementInCount()
|
{
|
inCount++;
|
}
|
|
/**
|
* Increase the counter of updates sent to the server.
|
*/
|
public void incrementOutCount()
|
{
|
outCount++;
|
}
|
|
/**
|
* {@inheritDoc}
|
*/
|
@Override
|
public void initializeMonitorProvider(MonitorProviderCfg configuration)
|
throws ConfigException, InitializationException
|
{
|
// Nothing to do for now
|
}
|
|
/**
|
* Check if the LDAP server can follow the speed of the other servers.
|
* @return true when the server has all the not yet sent changes
|
* in its queue.
|
*/
|
public boolean isFollowing()
|
{
|
return following;
|
}
|
|
/**
|
* Set that the consumer is now becoming inactive and thus getNextMessage
|
* should not return any UpdateMsg any more.
|
* @param active the provided state of the consumer.
|
*/
|
public void setConsumerActive(boolean active)
|
{
|
this.activeConsumer = active;
|
}
|
|
/**
|
* Set the following flag of this server.
|
* @param following the value that should be set.
|
*/
|
private void setFollowing(boolean following)
|
{
|
this.following = following;
|
}
|
|
/**
|
* Set the initial value of the serverState for this handler.
|
* Expected to be done once, then the state will be updated using
|
* updateServerState() method.
|
* @param serverState the provided serverState.
|
* @exception DirectoryException raised when a problem occurs.
|
*/
|
public void setInitialServerState(ServerState serverState)
|
throws DirectoryException
|
{
|
this.serverState = serverState;
|
}
|
|
|
/**
|
* Set the serviceId (usually baseDn) for this handler. Expected to be done
|
* once and never changed during the handler life.
|
*
|
* @param serviceId The provided serviceId.
|
* @param isDataServer The handler is a dataServer
|
*
|
* @exception DirectoryException raised when a problem occurs.
|
*/
|
protected void setServiceIdAndDomain(String serviceId, boolean isDataServer)
|
throws DirectoryException
|
{
|
if (this.serviceId != null)
|
{
|
if (!this.serviceId.equalsIgnoreCase(serviceId))
|
{
|
Message message = ERR_RS_DN_DOES_NOT_MATCH.get(
|
this.serviceId.toString(),
|
serviceId.toString());
|
throw new DirectoryException(ResultCode.OTHER,
|
message, null);
|
}
|
}
|
else
|
{
|
this.serviceId = serviceId;
|
if (!serviceId.equalsIgnoreCase("cn=changelog"))
|
this.replicationServerDomain = getDomain(true, isDataServer);
|
}
|
}
|
|
/**
|
* Shutdown this handler.
|
*/
|
public void shutdown()
|
{
|
/*
|
* Shutdown ServerWriter
|
*/
|
synchronized (msgQueue)
|
{
|
msgQueue.clear();
|
msgQueue.notify();
|
msgQueue.notifyAll();
|
}
|
|
DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName());
|
}
|
|
/**
|
* Update the serverState with the last message sent.
|
*
|
* @param msg the last update sent.
|
* @return boolean indicating if the update was meaningful.
|
*/
|
public boolean updateServerState(UpdateMsg msg)
|
{
|
return serverState.update(msg.getChangeNumber());
|
}
|
|
/**
|
* Get the groupId of the hosting RS.
|
* @return the group id.
|
*/
|
public byte getLocalGroupId()
|
{
|
return replicationServer.getGroupId();
|
}
|
|
/**
|
* Get the serverId of the hosting replication server.
|
* @return the replication serverId.
|
*/
|
public int getReplicationServerId()
|
{
|
return this.replicationServerId;
|
}
|
}
|