/*
|
* CDDL HEADER START
|
*
|
* The contents of this file are subject to the terms of the
|
* Common Development and Distribution License, Version 1.0 only
|
* (the "License"). You may not use this file except in compliance
|
* with the License.
|
*
|
* You can obtain a copy of the license at
|
* trunk/opends/resource/legal-notices/OpenDS.LICENSE
|
* or https://OpenDS.dev.java.net/OpenDS.LICENSE.
|
* See the License for the specific language governing permissions
|
* and limitations under the License.
|
*
|
* When distributing Covered Code, include this CDDL HEADER in each
|
* file and include the License file at
|
* trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
|
* add the following below this CDDL HEADER, with the fields enclosed
|
* by brackets "[]" replaced with your own identifying information:
|
* Portions Copyright [yyyy] [name of copyright owner]
|
*
|
* CDDL HEADER END
|
*
|
*
|
* Portions Copyright 2006-2008 Sun Microsystems, Inc.
|
*/
|
package org.opends.server.replication.server;
|
import org.opends.messages.Message;
|
import org.opends.messages.MessageBuilder;
|
|
import static org.opends.server.loggers.debug.DebugLogger.*;
|
|
import org.opends.server.loggers.debug.DebugTracer;
|
import static org.opends.server.loggers.ErrorLogger.logError;
|
import static org.opends.messages.ReplicationMessages.*;
|
import static org.opends.messages.ToolMessages.*;
|
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
|
|
import java.io.IOException;
|
import java.util.ArrayList;
|
import java.util.HashMap;
|
import java.util.LinkedHashSet;
|
import java.util.List;
|
import java.util.Map;
|
import java.util.Set;
|
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.Semaphore;
|
import java.util.concurrent.TimeUnit;
|
import java.util.Iterator;
|
|
import org.opends.server.replication.common.ChangeNumber;
|
import org.opends.server.replication.common.ServerState;
|
import org.opends.server.replication.protocol.AckMessage;
|
import org.opends.server.replication.protocol.ErrorMessage;
|
import org.opends.server.replication.protocol.RoutableMessage;
|
import org.opends.server.replication.protocol.UpdateMessage;
|
import org.opends.server.replication.protocol.ReplServerInfoMessage;
|
import org.opends.server.replication.protocol.MonitorMessage;
|
import org.opends.server.replication.protocol.MonitorRequestMessage;
|
import org.opends.server.replication.protocol.ResetGenerationId;
|
import org.opends.server.types.DN;
|
import org.opends.server.types.DirectoryException;
|
import org.opends.server.types.ResultCode;
|
import org.opends.server.util.TimeThread;
|
import com.sleepycat.je.DatabaseException;
|
|
/**
|
* This class define an in-memory cache that will be used to store
|
* the messages that have been received from an LDAP server or
|
* from another replication server and that should be forwarded to
|
* other servers.
|
*
|
* The size of the cache is set by configuration.
|
* If the cache becomes bigger than the configured size, the older messages
|
* are removed and should they be needed again must be read from the backing
|
* file
|
*
|
*
|
* it runs a thread that is responsible for saving the messages
|
* received to the disk and for trimming them
|
* Decision to trim can be based on disk space or age of the message
|
*/
|
public class ReplicationServerDomain
|
{
|
private Object flowControlLock = new Object();
|
private DN baseDn = null;
|
|
/*
|
* The following map contains one balanced tree for each replica ID
|
* to which we are currently publishing
|
* the first update in the balanced tree is the next change that we
|
* must push to this particular server
|
*
|
* We add new TreeSet in the HashMap when a new server register
|
* to this replication server.
|
*
|
*/
|
private Map<Short, ServerHandler> connectedServers =
|
new ConcurrentHashMap<Short, ServerHandler>();
|
|
/*
|
* This map contains one ServerHandler for each replication servers
|
* with which we are connected (so normally all the replication servers)
|
* the first update in the balanced tree is the next change that we
|
* must push to this particular server
|
*
|
* We add new TreeSet in the HashMap when a new replication server register
|
* to this replication server.
|
*/
|
|
private Map<Short, ServerHandler> replicationServers =
|
new ConcurrentHashMap<Short, ServerHandler>();
|
|
/*
|
* This map contains the List of updates received from each
|
* LDAP server
|
*/
|
private Map<Short, DbHandler> sourceDbHandlers =
|
new ConcurrentHashMap<Short, DbHandler>();
|
private ReplicationServer replicationServer;
|
|
/* GenerationId management */
|
private long generationId = -1;
|
private boolean generationIdSavedStatus = false;
|
|
/**
|
* The tracer object for the debug logger.
|
*/
|
private static final DebugTracer TRACER = getTracer();
|
|
/* Monitor data management */
|
|
// TODO: Remote monitor data cache lifetime is 500 ms/should be configurable
|
private long remoteMonitorDataLifeTime = 500;
|
|
/* Search op on monitor data is processed by a worker thread.
|
* Requests are sent to the other RS,and responses are received by the
|
* listener threads.
|
* The worker thread is awoke on this semaphore, or on timeout.
|
*/
|
Semaphore remoteMonitorResponsesSemaphore;
|
|
/* The date of the last time they have been elaborated */
|
private long validityDate = 0;
|
|
// For each LDAP server, its server state
|
private HashMap<Short, ServerState> LDAPStates =
|
new HashMap<Short, ServerState>();
|
|
// For each LDAP server, the last CN it published
|
private HashMap<Short, ChangeNumber> maxCNs =
|
new HashMap<Short, ChangeNumber>();
|
|
// For each LDAP server, an approximation of the date of the first missing
|
// change
|
private HashMap<Short, Long> approxFirstMissingDate =
|
new HashMap<Short, Long>();
|
|
/**
|
* Creates a new ReplicationServerDomain associated to the DN baseDn.
|
*
|
* @param baseDn The baseDn associated to the ReplicationServerDomain.
|
* @param replicationServer the ReplicationServer that created this
|
* replicationServer cache.
|
*/
|
public ReplicationServerDomain(DN baseDn, ReplicationServer replicationServer)
|
{
|
this.baseDn = baseDn;
|
this.replicationServer = replicationServer;
|
|
if (debugEnabled())
|
TRACER.debugInfo(
|
"In " + this.replicationServer.getMonitorInstanceName() +
|
" Created Cache for " + baseDn + " " +
|
stackTraceToSingleLineString(new Exception()));
|
}
|
|
/**
|
* Add an update that has been received to the list of
|
* updates that must be forwarded to all other servers.
|
*
|
* @param update The update that has been received.
|
* @param sourceHandler The ServerHandler for the server from which the
|
* update was received
|
* @throws IOException When an IO exception happens during the update
|
* processing.
|
*/
|
public void put(UpdateMessage update, ServerHandler sourceHandler)
|
throws IOException
|
{
|
/*
|
* TODO : In case that the source server is a LDAP server this method
|
* should check that change did get pushed to at least one
|
* other replication server before pushing it to the LDAP servers
|
*/
|
|
short id = update.getChangeNumber().getServerId();
|
sourceHandler.updateServerState(update);
|
sourceHandler.incrementInCount();
|
|
if (update.isAssured())
|
{
|
int count = this.NumServers();
|
if (count > 1)
|
{
|
if (sourceHandler.isReplicationServer())
|
ServerHandler.addWaitingAck(update, sourceHandler.getServerId(),
|
this, count - 1);
|
else
|
sourceHandler.addWaitingAck(update, count - 1);
|
}
|
else
|
{
|
sourceHandler.sendAck(update.getChangeNumber());
|
}
|
}
|
|
// look for the dbHandler that is responsible for the LDAP server which
|
// generated the change.
|
DbHandler dbHandler = null;
|
synchronized (sourceDbHandlers)
|
{
|
dbHandler = sourceDbHandlers.get(id);
|
if (dbHandler == null)
|
{
|
try
|
{
|
dbHandler = replicationServer.newDbHandler(id,
|
baseDn, generationId);
|
generationIdSavedStatus = true;
|
}
|
catch (DatabaseException e)
|
{
|
/*
|
* Because of database problem we can't save any more changes
|
* from at least one LDAP server.
|
* This replicationServer therefore can't do it's job properly anymore
|
* and needs to close all its connections and shutdown itself.
|
*/
|
MessageBuilder mb = new MessageBuilder();
|
mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
|
mb.append(stackTraceToSingleLineString(e));
|
logError(mb.toMessage());
|
replicationServer.shutdown();
|
return;
|
}
|
sourceDbHandlers.put(id, dbHandler);
|
}
|
}
|
|
// Publish the messages to the source handler
|
dbHandler.add(update);
|
|
|
/*
|
* Push the message to the replication servers
|
*/
|
if (!sourceHandler.isReplicationServer())
|
{
|
for (ServerHandler handler : replicationServers.values())
|
{
|
handler.add(update, sourceHandler);
|
}
|
}
|
|
/*
|
* Push the message to the LDAP servers
|
*/
|
for (ServerHandler handler : connectedServers.values())
|
{
|
// don't forward the change to the server that just sent it
|
if (handler == sourceHandler)
|
{
|
continue;
|
}
|
|
handler.add(update, sourceHandler);
|
}
|
|
|
|
}
|
|
/**
|
* Create initialize context necessary for finding the changes
|
* that must be sent to a given LDAP or replication server.
|
*
|
* @param handler handler for the server that must be started
|
* @throws Exception when method has failed
|
* @return A boolean indicating if the start was successfull.
|
*/
|
public boolean startServer(ServerHandler handler) throws Exception
|
{
|
/*
|
* create the balanced tree that will be used to forward changes
|
*/
|
synchronized (connectedServers)
|
{
|
ServerHandler oldHandler = connectedServers.get(handler.getServerId());
|
|
if (connectedServers.containsKey(handler.getServerId()))
|
{
|
// looks like two LDAP servers have the same serverId
|
// log an error message and drop this connection.
|
Message message = ERR_DUPLICATE_SERVER_ID.get(
|
oldHandler.toString(), handler.toString(), handler.getServerId());
|
logError(message);
|
return false;
|
}
|
connectedServers.put(handler.getServerId(), handler);
|
|
// It can be that the server that connects here is the
|
// first server connected for a domain.
|
// In that case, we will establish the appriopriate connections
|
// to the other repl servers for this domain and receive
|
// their ReplServerInfo messages.
|
// FIXME: Is it necessary to end this above processing BEFORE listening
|
// to incoming messages for that domain ? But the replica
|
// would raise Read Timeout for replica that connects.
|
|
// Update the remote replication servers with our list
|
// of connected LDAP servers
|
sendReplServerInfo();
|
|
return true;
|
}
|
}
|
|
/**
|
* Stop operations with a given server.
|
*
|
* @param handler the server for which we want to stop operations
|
*/
|
public void stopServer(ServerHandler handler)
|
{
|
if (debugEnabled())
|
TRACER.debugInfo(
|
"In RS " + this.replicationServer.getMonitorInstanceName() +
|
" for " + baseDn + " " +
|
" stopServer " + handler.getMonitorInstanceName());
|
|
handler.stopHandler();
|
|
if (handler.isReplicationServer())
|
{
|
replicationServers.remove(handler.getServerId());
|
}
|
else
|
{
|
connectedServers.remove(handler.getServerId());
|
}
|
|
mayResetGenerationId();
|
|
// Update the remote replication servers with our list
|
// of connected LDAP servers
|
sendReplServerInfo();
|
}
|
|
/**
|
* Resets the generationId for this domain if there is no LDAP
|
* server currently connected and if the generationId has never
|
* been saved.
|
*/
|
protected void mayResetGenerationId()
|
{
|
if (debugEnabled())
|
TRACER.debugInfo(
|
"In RS " + this.replicationServer.getMonitorInstanceName() +
|
" for " + baseDn + " " +
|
" mayResetGenerationId generationIdSavedStatus=" +
|
generationIdSavedStatus);
|
|
// If there is no more any LDAP server connected to this domain in the
|
// topology and the generationId has never been saved, then we can reset
|
// it and the next LDAP server to connect will become the new reference.
|
boolean lDAPServersConnectedInTheTopology = false;
|
if (connectedServers.isEmpty())
|
{
|
for (ServerHandler rsh : replicationServers.values())
|
{
|
if (generationId != rsh.getGenerationId())
|
{
|
if (debugEnabled())
|
TRACER.debugInfo(
|
"In RS " + this.replicationServer.getMonitorInstanceName() +
|
" for " + baseDn + " " +
|
" mayResetGenerationId skip RS" + rsh.getMonitorInstanceName() +
|
" thas different genId");
|
}
|
else
|
{
|
if (rsh.hasRemoteLDAPServers())
|
{
|
lDAPServersConnectedInTheTopology = true;
|
|
if (debugEnabled())
|
TRACER.debugInfo(
|
"In RS " + this.replicationServer.getMonitorInstanceName() +
|
" for " + baseDn + " " +
|
" mayResetGenerationId RS" + rsh.getMonitorInstanceName() +
|
" has servers connected to it - will not reset generationId");
|
}
|
}
|
}
|
}
|
else
|
{
|
lDAPServersConnectedInTheTopology = true;
|
if (debugEnabled())
|
TRACER.debugInfo(
|
"In RS " + this.replicationServer.getMonitorInstanceName() +
|
" for " + baseDn + " " +
|
" has servers connected to it - will not reset generationId");
|
}
|
|
if ((!lDAPServersConnectedInTheTopology) && (!this.generationIdSavedStatus))
|
{
|
setGenerationId(-1, false);
|
}
|
}
|
|
/**
|
* Create initialize context necessary for finding the changes
|
* that must be sent to a given replication server.
|
*
|
* @param handler the server ID to which we want to forward changes
|
* @throws Exception in case of errors
|
* @return A boolean indicating if the start was successfull.
|
*/
|
public boolean startReplicationServer(ServerHandler handler) throws Exception
|
{
|
/*
|
* create the balanced tree that will be used to forward changes
|
*/
|
synchronized (replicationServers)
|
{
|
ServerHandler oldHandler = replicationServers.get(handler.getServerId());
|
if ((oldHandler != null))
|
{
|
if (oldHandler.getServerAddressURL().equals(
|
handler.getServerAddressURL()))
|
{
|
// this is the same server, this means that our ServerStart messages
|
// have been sent at about the same time and 2 connections
|
// have been established.
|
// Silently drop this connection.
|
}
|
else
|
{
|
// looks like two replication servers have the same serverId
|
// log an error message and drop this connection.
|
Message message = ERR_DUPLICATE_REPLICATION_SERVER_ID.
|
get(oldHandler.getServerAddressURL(),
|
handler.getServerAddressURL(), handler.getServerId());
|
logError(message);
|
}
|
return false;
|
}
|
replicationServers.put(handler.getServerId(), handler);
|
|
// Update this server with the list of LDAP servers
|
// already connected
|
handler.sendInfo(
|
new ReplServerInfoMessage(getConnectedLDAPservers(),generationId));
|
|
return true;
|
}
|
}
|
|
/**
|
* Get the next update that need to be sent to a given LDAP server.
|
* This call is blocking when no update is available or when dependencies
|
* do not allow to send the next available change
|
*
|
* @param handler The server handler for the target directory server.
|
*
|
* @return the update that must be forwarded
|
*/
|
public UpdateMessage take(ServerHandler handler)
|
{
|
UpdateMessage msg;
|
/*
|
* Get the balanced tree that we use to sort the changes to be
|
* sent to the replica from the cookie
|
*
|
* The next change to send is always the first one in the tree
|
* So this methods simply need to check that dependencies are OK
|
* and update this replicaId RUV
|
*
|
* TODO : dependency :
|
* before forwarding change, we should check that the dependency
|
* that is indicated in this change is OK (change already in the RUV)
|
*/
|
msg = handler.take();
|
synchronized (flowControlLock)
|
{
|
if (handler.restartAfterSaturation(null))
|
flowControlLock.notifyAll();
|
}
|
return msg;
|
}
|
|
/**
|
* Return a Set of String containing the lists of Replication servers
|
* connected to this server.
|
* @return the set of connected servers
|
*/
|
public Set<String> getChangelogs()
|
{
|
LinkedHashSet<String> mySet = new LinkedHashSet<String>();
|
|
for (ServerHandler handler : replicationServers.values())
|
{
|
mySet.add(handler.getServerAddressURL());
|
}
|
|
return mySet;
|
}
|
|
|
/**
|
* Return a Set containing the servers known by this replicationServer.
|
* @return a set containing the servers known by this replicationServer.
|
*/
|
public Set<Short> getServers()
|
{
|
return sourceDbHandlers.keySet();
|
}
|
|
/**
|
* Returns as a set of String the list of LDAP servers connected to us.
|
* Each string is the serverID of a connected LDAP server.
|
*
|
* @return The set of connected LDAP servers
|
*/
|
public List<String> getConnectedLDAPservers()
|
{
|
List<String> mySet = new ArrayList<String>(0);
|
|
for (ServerHandler handler : connectedServers.values())
|
{
|
mySet.add(String.valueOf(handler.getServerId()));
|
}
|
return mySet;
|
}
|
|
/**
|
* Creates and returns an iterator.
|
*
|
* @param serverId Identifier of the server for which the iterator is created.
|
* @param changeNumber Starting point for the iterator.
|
* @return the created ReplicationIterator.
|
*/
|
public ReplicationIterator getChangelogIterator(short serverId,
|
ChangeNumber changeNumber)
|
{
|
DbHandler handler = sourceDbHandlers.get(serverId);
|
if (handler == null)
|
return null;
|
|
try
|
{
|
return handler.generateIterator(changeNumber);
|
}
|
catch (Exception e) {
|
return null;
|
}
|
}
|
|
/**
|
* Returns the change count for that ReplicationServerDomain.
|
*
|
* @return the change count.
|
*/
|
public long getChangesCount()
|
{
|
long entryCount = 0;
|
for (DbHandler dbHandler : sourceDbHandlers.values())
|
{
|
entryCount += dbHandler.getChangesCount();
|
}
|
return entryCount;
|
}
|
|
/**
|
* Get the baseDn.
|
* @return Returns the baseDn.
|
*/
|
public DN getBaseDn()
|
{
|
return baseDn;
|
}
|
|
/**
|
* Sets the provided DbHandler associated to the provided serverId.
|
*
|
* @param serverId the serverId for the server to which is
|
* associated the Dbhandler.
|
* @param dbHandler the dbHandler associated to the serverId.
|
*
|
* @throws DatabaseException If a database error happened.
|
*/
|
public void setDbHandler(short serverId, DbHandler dbHandler)
|
throws DatabaseException
|
{
|
synchronized (sourceDbHandlers)
|
{
|
sourceDbHandlers.put(serverId , dbHandler);
|
}
|
}
|
|
/**
|
* Get the number of currently connected servers.
|
*
|
* @return the number of currently connected servers.
|
*/
|
private int NumServers()
|
{
|
return replicationServers.size() + connectedServers.size();
|
}
|
|
|
/**
|
* Add an ack to the list of ack received for a given change.
|
*
|
* @param message The ack message received.
|
* @param fromServerId The identifier of the server that sent the ack.
|
*/
|
public void ack(AckMessage message, short fromServerId)
|
{
|
/*
|
* there are 2 possible cases here :
|
* - the message that was acked comes from a server to which
|
* we are directly connected.
|
* In this case, we can find the handler from the connectedServers map
|
* - the message that was acked comes from a server to which we are not
|
* connected.
|
* In this case we need to find the replication server that forwarded
|
* the change and send back the ack to this server.
|
*/
|
ServerHandler handler = connectedServers.get(
|
message.getChangeNumber().getServerId());
|
if (handler != null)
|
handler.ack(message, fromServerId);
|
else
|
{
|
ServerHandler.ackChangelog(message, fromServerId);
|
}
|
}
|
|
/**
|
* Retrieves the destination handlers for a routable message.
|
*
|
* @param msg The message to route.
|
* @param senderHandler The handler of the server that published this message.
|
* @return The list of destination handlers.
|
*/
|
protected List<ServerHandler> getDestinationServers(RoutableMessage msg,
|
ServerHandler senderHandler)
|
{
|
List<ServerHandler> servers =
|
new ArrayList<ServerHandler>();
|
|
if (msg.getDestination() == RoutableMessage.THE_CLOSEST_SERVER)
|
{
|
// TODO Import from the "closest server" to be implemented
|
}
|
else if (msg.getDestination() == RoutableMessage.ALL_SERVERS)
|
{
|
if (!senderHandler.isReplicationServer())
|
{
|
// Send to all replication servers with a least one remote
|
// server connected
|
for (ServerHandler rsh : replicationServers.values())
|
{
|
if (rsh.hasRemoteLDAPServers())
|
{
|
servers.add(rsh);
|
}
|
}
|
}
|
|
// Sends to all connected LDAP servers
|
for (ServerHandler destinationHandler : connectedServers.values())
|
{
|
// Don't loop on the sender
|
if (destinationHandler == senderHandler)
|
continue;
|
servers.add(destinationHandler);
|
}
|
}
|
else
|
{
|
// Destination is one server
|
ServerHandler destinationHandler =
|
connectedServers.get(msg.getDestination());
|
if (destinationHandler != null)
|
{
|
servers.add(destinationHandler);
|
}
|
else
|
{
|
// the targeted server is NOT connected
|
// Let's search for THE changelog server that MAY
|
// have the targeted server connected.
|
if (senderHandler.isLDAPserver())
|
{
|
for (ServerHandler h : replicationServers.values())
|
{
|
// Send to all replication servers with a least one remote
|
// server connected
|
if (h.isRemoteLDAPServer(msg.getDestination()))
|
{
|
servers.add(h);
|
}
|
}
|
}
|
}
|
}
|
return servers;
|
}
|
|
/**
|
* Processes a message coming from one server in the topology
|
* and potentially forwards it to one or all other servers.
|
*
|
* @param msg The message received and to be processed.
|
* @param senderHandler The server handler of the server that emitted
|
* the message.
|
*/
|
public void process(RoutableMessage msg, ServerHandler senderHandler)
|
{
|
// Test the message for which a ReplicationServer is expected
|
// to be the destination
|
if (msg.getDestination() == this.replicationServer.getServerId())
|
{
|
if (msg instanceof ErrorMessage)
|
{
|
ErrorMessage errorMsg = (ErrorMessage)msg;
|
logError(ERR_ERROR_MSG_RECEIVED.get(
|
errorMsg.getDetails()));
|
}
|
else if (msg instanceof MonitorRequestMessage)
|
{
|
MonitorRequestMessage replServerMonitorRequestMsg =
|
(MonitorRequestMessage) msg;
|
|
MonitorMessage monitorMsg =
|
new MonitorMessage(
|
replServerMonitorRequestMsg.getDestination(),
|
replServerMonitorRequestMsg.getsenderID());
|
|
// Populate the RS state in the msg from the DbState
|
monitorMsg.setReplServerState(this.getDbServerState());
|
|
// Populate for each connected LDAP Server
|
// from the states stored in the serverHandler.
|
// - the server state
|
// - the older missing change
|
for (ServerHandler lsh : this.connectedServers.values())
|
{
|
monitorMsg.setLDAPServerState(
|
lsh.getServerId(),
|
lsh.getServerState(),
|
lsh.getApproxFirstMissingDate());
|
}
|
try
|
{
|
senderHandler.send(monitorMsg);
|
}
|
catch(Exception e)
|
{
|
// We log the error. The requestor will detect a timeout or
|
// any other failure on the connection.
|
logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get(
|
Short.toString((msg.getDestination()))));
|
}
|
}
|
else if (msg instanceof MonitorMessage)
|
{
|
MonitorMessage monitorMsg =
|
(MonitorMessage) msg;
|
|
receivesMonitorDataResponse(monitorMsg);
|
}
|
else
|
{
|
logError(NOTE_ERR_ROUTING_TO_SERVER.get(
|
msg.getClass().getCanonicalName()));
|
}
|
return;
|
}
|
|
List<ServerHandler> servers = getDestinationServers(msg, senderHandler);
|
|
if (servers.isEmpty())
|
{
|
MessageBuilder mb = new MessageBuilder();
|
mb.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get());
|
mb.append(" In Replication Server=" + this.replicationServer.
|
getMonitorInstanceName());
|
mb.append(" domain =" + this.baseDn);
|
mb.append(" unroutable message =" + msg.toString());
|
mb.append(" routing table is empty");
|
ErrorMessage errMsg = new ErrorMessage(
|
this.replicationServer.getServerId(),
|
msg.getsenderID(),
|
mb.toMessage());
|
logError(mb.toMessage());
|
try
|
{
|
senderHandler.send(errMsg);
|
}
|
catch(IOException ioe)
|
{
|
// TODO Handle error properly (sender timeout in addition)
|
/*
|
* An error happened trying to send an error msg to this server.
|
* Log an error and close the connection to this server.
|
*/
|
MessageBuilder mb2 = new MessageBuilder();
|
mb2.append(ERR_CHANGELOG_ERROR_SENDING_ERROR.get(this.toString()));
|
mb2.append(stackTraceToSingleLineString(ioe));
|
logError(mb2.toMessage());
|
senderHandler.shutdown();
|
}
|
}
|
else
|
{
|
for (ServerHandler targetHandler : servers)
|
{
|
try
|
{
|
targetHandler.send(msg);
|
}
|
catch(IOException ioe)
|
{
|
/*
|
* An error happened trying the send a routabled message
|
* to its destination server.
|
* Send back an error to the originator of the message.
|
*/
|
MessageBuilder mb = new MessageBuilder();
|
mb.append(ERR_CHANGELOG_ERROR_SENDING_MSG.get(this.toString()));
|
mb.append(stackTraceToSingleLineString(ioe));
|
mb.append(" ");
|
mb.append(msg.getClass().getCanonicalName());
|
logError(mb.toMessage());
|
|
MessageBuilder mb1 = new MessageBuilder();
|
mb1.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get());
|
mb1.append("serverID:" + msg.getDestination());
|
ErrorMessage errMsg = new ErrorMessage(
|
msg.getsenderID(), mb1.toMessage());
|
try
|
{
|
senderHandler.send(errMsg);
|
}
|
catch(IOException ioe1)
|
{
|
// an error happened on the sender session trying to recover
|
// from an error on the receiver session.
|
// We don't have much solution left beside closing the sessions.
|
senderHandler.shutdown();
|
targetHandler.shutdown();
|
}
|
// TODO Handle error properly (sender timeout in addition)
|
}
|
}
|
}
|
|
}
|
|
/**
|
* Send back an ack to the server that sent the change.
|
*
|
* @param changeNumber The ChangeNumber of the change that must be acked.
|
* @param isLDAPserver This boolean indicates if the server that sent the
|
* change was an LDAP server or a ReplicationServer.
|
*/
|
public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver)
|
{
|
short serverId = changeNumber.getServerId();
|
sendAck(changeNumber, isLDAPserver, serverId);
|
}
|
|
/**
|
*
|
* Send back an ack to a server that sent the change.
|
*
|
* @param changeNumber The ChangeNumber of the change that must be acked.
|
* @param isLDAPserver This boolean indicates if the server that sent the
|
* change was an LDAP server or a ReplicationServer.
|
* @param serverId The identifier of the server from which we
|
* received the change..
|
*/
|
public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver,
|
short serverId)
|
{
|
ServerHandler handler;
|
if (isLDAPserver)
|
handler = connectedServers.get(serverId);
|
else
|
handler = replicationServers.get(serverId);
|
|
// TODO : check for null handler and log error
|
try
|
{
|
handler.sendAck(changeNumber);
|
} catch (IOException e)
|
{
|
/*
|
* An error happened trying the send back an ack to this server.
|
* Log an error and close the connection to this server.
|
*/
|
MessageBuilder mb = new MessageBuilder();
|
mb.append(ERR_CHANGELOG_ERROR_SENDING_ACK.get(this.toString()));
|
mb.append(stackTraceToSingleLineString(e));
|
logError(mb.toMessage());
|
handler.shutdown();
|
}
|
}
|
|
/**
|
* Shutdown this ReplicationServerDomain.
|
*/
|
public void shutdown()
|
{
|
// Close session with other changelogs
|
for (ServerHandler serverHandler : replicationServers.values())
|
{
|
serverHandler.shutdown();
|
}
|
|
// Close session with other LDAP servers
|
for (ServerHandler serverHandler : connectedServers.values())
|
{
|
serverHandler.shutdown();
|
}
|
|
// Shutdown the dbHandlers
|
synchronized (sourceDbHandlers)
|
{
|
for (DbHandler dbHandler : sourceDbHandlers.values())
|
{
|
dbHandler.shutdown();
|
}
|
sourceDbHandlers.clear();
|
}
|
}
|
|
/**
|
* Returns the ServerState describing the last change from this replica.
|
*
|
* @return The ServerState describing the last change from this replica.
|
*/
|
public ServerState getDbServerState()
|
{
|
ServerState serverState = new ServerState();
|
for (DbHandler db : sourceDbHandlers.values())
|
{
|
serverState.update(db.getLastChange());
|
}
|
return serverState;
|
}
|
|
/**
|
* {@inheritDoc}
|
*/
|
@Override
|
public String toString()
|
{
|
return "ReplicationServerDomain " + baseDn;
|
}
|
|
/**
|
* Check if some server Handler should be removed from flow control state.
|
* @throws IOException If an error happened.
|
*/
|
public void checkAllSaturation() throws IOException
|
{
|
for (ServerHandler handler : replicationServers.values())
|
{
|
handler.checkWindow();
|
}
|
|
for (ServerHandler handler : connectedServers.values())
|
{
|
handler.checkWindow();
|
}
|
}
|
|
/**
|
* Check if a server that was in flow control can now restart
|
* sending updates.
|
* @param sourceHandler The server that must be checked.
|
* @return true if the server can restart sending changes.
|
* false if the server can't restart sending changes.
|
*/
|
public boolean restartAfterSaturation(ServerHandler sourceHandler)
|
{
|
for (ServerHandler handler : replicationServers.values())
|
{
|
if (!handler.restartAfterSaturation(sourceHandler))
|
return false;
|
}
|
|
for (ServerHandler handler : connectedServers.values())
|
{
|
if (!handler.restartAfterSaturation(sourceHandler))
|
return false;
|
}
|
return true;
|
}
|
|
/**
|
* Send a ReplServerInfoMessage to all the connected replication servers
|
* in order to let them know our connected LDAP servers.
|
*/
|
private void sendReplServerInfo()
|
{
|
ReplServerInfoMessage info =
|
new ReplServerInfoMessage(getConnectedLDAPservers(), generationId);
|
for (ServerHandler handler : replicationServers.values())
|
{
|
try
|
{
|
handler.sendInfo(info);
|
}
|
catch (IOException e)
|
{
|
/*
|
* An error happened trying the send back an ack to this server.
|
* Log an error and close the connection to this server.
|
*/
|
MessageBuilder mb = new MessageBuilder();
|
mb.append(ERR_CHANGELOG_ERROR_SENDING_INFO.get(this.toString()));
|
mb.append(stackTraceToSingleLineString(e));
|
logError(mb.toMessage());
|
handler.shutdown();
|
}
|
}
|
}
|
|
/**
|
* Get the generationId associated to this domain.
|
*
|
* @return The generationId
|
*/
|
public long getGenerationId()
|
{
|
return generationId;
|
}
|
|
/**
|
* Get the generationId saved status.
|
*
|
* @return The generationId saved status.
|
*/
|
public boolean getGenerationIdSavedStatus()
|
{
|
return generationIdSavedStatus;
|
}
|
|
/**
|
* Sets the provided value as the new in memory generationId.
|
*
|
* @param generationId The new value of generationId.
|
* @param savedStatus The saved status of the generationId.
|
*/
|
synchronized public void setGenerationId(long generationId,
|
boolean savedStatus)
|
{
|
if (debugEnabled())
|
TRACER.debugInfo(
|
"In " + this.replicationServer.getMonitorInstanceName() +
|
" baseDN=" + baseDn +
|
" RCache.set GenerationId=" + generationId);
|
|
if (generationId == this.generationId)
|
return;
|
|
if (this.generationId>0)
|
{
|
for (ServerHandler handler : connectedServers.values())
|
{
|
handler.warnBadGenerationId();
|
}
|
}
|
|
this.generationId = generationId;
|
this.generationIdSavedStatus = savedStatus;
|
|
}
|
|
/**
|
* Resets the generationID.
|
*
|
* @param senderHandler The handler associated to the server
|
* that requested to reset the generationId.
|
* @param genIdMsg The reset generation ID msg received.
|
*/
|
public void resetGenerationId(ServerHandler senderHandler,
|
ResetGenerationId genIdMsg)
|
{
|
long newGenId = genIdMsg.getGenerationId();
|
|
if (debugEnabled())
|
TRACER.debugInfo(
|
"In " + this.replicationServer.getMonitorInstanceName() +
|
" baseDN=" + baseDn +
|
" RCache.resetGenerationId received new ref genId=" + newGenId);
|
|
// Notifies the remote LDAP servers that from now on
|
// they have a generationId different from the reference one
|
for (ServerHandler handler : connectedServers.values())
|
{
|
if (newGenId != handler.getGenerationId())
|
{
|
handler.warnBadGenerationId();
|
}
|
}
|
|
// If we are the first replication server warned,
|
// then forwards the reset message to the remote replication servers
|
for (ServerHandler rsHandler : replicationServers.values())
|
{
|
try
|
{
|
rsHandler.setGenerationId(newGenId);
|
if (senderHandler.isLDAPserver())
|
{
|
rsHandler.forwardGenerationIdToRS(genIdMsg);
|
}
|
}
|
catch (IOException e)
|
{
|
logError(ERR_CHANGELOG_ERROR_SENDING_INFO.
|
get(rsHandler.getMonitorInstanceName()));
|
}
|
}
|
|
clearDbs();
|
|
// Reset the in memory domain generationId
|
generationId = newGenId;
|
|
}
|
|
/**
|
* Clears the Db associated with that cache.
|
*/
|
public void clearDbs()
|
{
|
// Reset the localchange and state db for the current domain
|
synchronized (sourceDbHandlers)
|
{
|
for (DbHandler dbHandler : sourceDbHandlers.values())
|
{
|
try
|
{
|
dbHandler.clear();
|
}
|
catch (Exception e)
|
{
|
// TODO: i18n
|
logError(Message.raw(
|
"Exception caught while clearing dbHandler:" +
|
e.getLocalizedMessage()));
|
}
|
}
|
sourceDbHandlers.clear();
|
|
if (debugEnabled())
|
TRACER.debugInfo(
|
"In " + this.replicationServer.getMonitorInstanceName() +
|
" baseDN=" + baseDn +
|
" The source db handler has been cleared");
|
}
|
try
|
{
|
replicationServer.clearGenerationId(baseDn);
|
}
|
catch (Exception e)
|
{
|
// TODO: i18n
|
logError(Message.raw(
|
"Exception caught while clearing generationId:" +
|
e.getLocalizedMessage()));
|
}
|
}
|
|
/**
|
* Returns whether the provided server is in degraded
|
* state due to the fact that the peer server has an invalid
|
* generationId for this domain.
|
*
|
* @param serverId The serverId for which we want to know the
|
* the state.
|
* @return Whether it is degraded or not.
|
*/
|
|
public boolean isDegradedDueToGenerationId(short serverId)
|
{
|
if (debugEnabled())
|
TRACER.debugInfo(
|
"In " + this.replicationServer.getMonitorInstanceName() +
|
" baseDN=" + baseDn +
|
" isDegraded serverId=" + serverId +
|
" given local generation Id=" + this.generationId);
|
|
ServerHandler handler = replicationServers.get(serverId);
|
if (handler == null)
|
{
|
handler = connectedServers.get(serverId);
|
if (handler == null)
|
{
|
return false;
|
}
|
}
|
|
if (debugEnabled())
|
TRACER.debugInfo(
|
"In " + this.replicationServer.getMonitorInstanceName() +
|
" baseDN=" + baseDn +
|
" Compute degradation of serverId=" + serverId +
|
" LS server generation Id=" + handler.getGenerationId());
|
return (handler.getGenerationId() != this.generationId);
|
}
|
|
/**
|
* Return the associated replication server.
|
* @return The replication server.
|
*/
|
public ReplicationServer getReplicationServer()
|
{
|
return replicationServer;
|
}
|
|
/*
|
* Monitor Data generation
|
*/
|
|
/**
|
* Retrieves the remote monitor data.
|
*
|
* @throws DirectoryException When an error occurs.
|
*/
|
protected void retrievesRemoteMonitorData()
|
throws DirectoryException
|
{
|
if (validityDate > TimeThread.getTime())
|
{
|
// The current data are still valid. No need to renew them.
|
return;
|
}
|
|
// Clean
|
this.LDAPStates.clear();
|
this.maxCNs.clear();
|
|
// Init the maxCNs of our direct LDAP servers from our own dbstate
|
for (ServerHandler rs : connectedServers.values())
|
{
|
short serverID = rs.getServerId();
|
ChangeNumber cn = rs.getServerState().getMaxChangeNumber(serverID);
|
if (cn == null)
|
{
|
// we have nothing in db for that server
|
cn = new ChangeNumber(0, 0 , serverID);
|
}
|
this.maxCNs.put(serverID, cn);
|
}
|
|
ServerState replServerState = this.getDbServerState();
|
Iterator<Short> it = replServerState.iterator();
|
while (it.hasNext())
|
{
|
short sid = it.next();
|
ChangeNumber receivedCN = replServerState.getMaxChangeNumber(sid);
|
ChangeNumber maxCN = this.maxCNs.get(sid);
|
if ((maxCN != null) && (receivedCN.newer(maxCN)))
|
{
|
// We found a newer one
|
this.maxCNs.remove(sid);
|
this.maxCNs.put(sid, receivedCN);
|
}
|
}
|
|
// Send Request to the other Replication Servers
|
if (remoteMonitorResponsesSemaphore == null)
|
{
|
remoteMonitorResponsesSemaphore = new Semaphore(
|
replicationServers.size() -1);
|
|
sendMonitorDataRequest();
|
|
// Wait reponses from them or timeout
|
waitMonitorDataResponses(replicationServers.size());
|
}
|
else
|
{
|
// The processing of renewing the monitor cache is already running
|
// We'll make it sleeping until the end
|
while (remoteMonitorResponsesSemaphore!=null)
|
{
|
waitMonitorDataResponses(1);
|
}
|
}
|
|
// Now we have the expected answers of an error occured
|
validityDate = TimeThread.getTime() + remoteMonitorDataLifeTime;
|
|
if (debugEnabled())
|
{
|
debugMonitorData();
|
}
|
}
|
|
private void debugMonitorData()
|
{
|
String mds = " Monitor data=";
|
Iterator<Short> ite = LDAPStates.keySet().iterator();
|
while (ite.hasNext())
|
{
|
Short sid = ite.next();
|
ServerState ss = LDAPStates.get(sid);
|
mds += " LDAPState(" + sid + ")=" + ss.toString();
|
}
|
Iterator<Short> itc = maxCNs.keySet().iterator();
|
while (itc.hasNext())
|
{
|
Short sid = itc.next();
|
ChangeNumber cn = maxCNs.get(sid);
|
mds += " maxCNs(" + sid + ")=" + cn.toString();
|
}
|
|
mds += "--";
|
TRACER.debugInfo(
|
"In " + this.replicationServer.getMonitorInstanceName() +
|
" baseDN=" + baseDn +
|
mds);
|
}
|
|
/**
|
* Sends a MonitorRequest message to all connected RS.
|
* @throws DirectoryException when a problem occurs.
|
*/
|
protected void sendMonitorDataRequest()
|
throws DirectoryException
|
{
|
try
|
{
|
for (ServerHandler rs : replicationServers.values())
|
{
|
MonitorRequestMessage msg = new
|
MonitorRequestMessage(this.replicationServer.getServerId(),
|
rs.getServerId());
|
rs.send(msg);
|
}
|
}
|
catch(Exception e)
|
{
|
Message message = ERR_SENDING_REMOTE_MONITOR_DATA_REQUEST.get();
|
logError(message);
|
throw new DirectoryException(ResultCode.OTHER,
|
message, e);
|
}
|
}
|
|
/**
|
* Wait for the expected count of received MonitorMessage.
|
* @param expectedResponses The number of expected answers.
|
* @throws DirectoryException When an error occurs.
|
*/
|
protected void waitMonitorDataResponses(int expectedResponses)
|
throws DirectoryException
|
{
|
try
|
{
|
boolean allPermitsAcquired =
|
remoteMonitorResponsesSemaphore.tryAcquire(
|
expectedResponses,
|
(long) 500, TimeUnit.MILLISECONDS);
|
|
if (!allPermitsAcquired)
|
{
|
logError(ERR_MISSING_REMOTE_MONITOR_DATA.get());
|
}
|
else
|
{
|
if (debugEnabled())
|
TRACER.debugInfo(
|
"In " + this.replicationServer.getMonitorInstanceName() +
|
"Successfully received all " + replicationServers.size()
|
+ " expected monitor messages");
|
}
|
}
|
catch(Exception e)
|
{
|
logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e.getMessage()));
|
}
|
finally
|
{
|
remoteMonitorResponsesSemaphore = null;
|
}
|
}
|
|
/**
|
* Processes a Monitor message receives from a remote Replication Server
|
* and stores the data received.
|
*
|
* @param msg The message to be processed.
|
*/
|
public void receivesMonitorDataResponse(MonitorMessage msg)
|
{
|
if (remoteMonitorResponsesSemaphore == null)
|
{
|
// Ignoring the remote monitor data because an error occured previously
|
return;
|
}
|
|
try
|
{
|
// Here is the RS state : list <serverID, lastChangeNumber>
|
// For each LDAP Server, we keep the max CN accross the RSes
|
ServerState replServerState = msg.getReplServerState();
|
Iterator<Short> it = replServerState.iterator();
|
while (it.hasNext())
|
{
|
short sid = it.next();
|
ChangeNumber receivedCN = replServerState.getMaxChangeNumber(sid);
|
ChangeNumber maxCN = this.maxCNs.get(sid);
|
if (receivedCN.newer(maxCN))
|
{
|
// We found a newer one
|
this.maxCNs.remove(sid);
|
this.maxCNs.put(sid, receivedCN);
|
}
|
}
|
|
// Store the LDAP servers states
|
Iterator<Short> sidIterator = msg.iterator();
|
while (sidIterator.hasNext())
|
{
|
short sid = sidIterator.next();
|
ServerState ss = msg.getLDAPServerState(sid);
|
this.LDAPStates.put(sid, ss);
|
this.approxFirstMissingDate.put(sid,
|
msg.getApproxFirstMissingDate(sid));
|
}
|
|
// Decreases the number of expected responses and potentially
|
// wakes up the waiting requestor thread.
|
remoteMonitorResponsesSemaphore.release();
|
}
|
catch (Exception e)
|
{
|
// If an exception occurs while processing one of the expected message,
|
// the processing is aborted and the waiting thread is awoke.
|
remoteMonitorResponsesSemaphore.notifyAll();
|
}
|
}
|
|
/**
|
* Get the state of the LDAP server with the provided serverId.
|
* @param serverId The server ID.
|
* @return The server state.
|
*/
|
public ServerState getServerState(short serverId)
|
{
|
return LDAPStates.get(serverId);
|
}
|
|
/**
|
* Get the highest know change number of the LDAP server with the provided
|
* serverId.
|
* @param serverId The server ID.
|
* @return The highest change number.
|
*/
|
public ChangeNumber getMaxCN(short serverId)
|
{
|
return maxCNs.get(serverId);
|
}
|
|
/**
|
* Get an approximation of the date of the oldest missing changes.
|
* serverId.
|
* @param serverId The server ID.
|
* @return The approximation of the date of the oldest missing change.
|
*/
|
public Long getApproxFirstMissingDate(short serverId)
|
{
|
return approxFirstMissingDate.get(serverId);
|
}
|
|
/**
|
* Get the number of missing change for the server with the provided state.
|
* @param state The provided server state.
|
* @return The number of missing changes.
|
*/
|
public int getMissingChanges(ServerState state)
|
{
|
// Traverse the max Cn transmitted by each server
|
// For each server, get the highest CN know from the current server
|
// Sum the difference betwenn the max and the last
|
int missingChanges = 0;
|
Iterator<Short> itc = maxCNs.keySet().iterator();
|
while (itc.hasNext())
|
{
|
Short sid = itc.next();
|
ChangeNumber maxCN = maxCNs.get(sid);
|
ChangeNumber last = state.getMaxChangeNumber(sid);
|
if (last == null)
|
{
|
last = new ChangeNumber(0,0, sid);
|
}
|
int missingChangesFromSID = ChangeNumber.diffSeqNum(maxCN, last);
|
missingChanges += missingChangesFromSID;
|
}
|
return missingChanges;
|
}
|
}
|