| | |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Portions Copyright 2006-2007 Sun Microsystems, Inc. |
| | | * Portions Copyright 2006-2008 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.replication.server; |
| | | import org.opends.messages.Message; |
| | | import org.opends.messages.MessageBuilder; |
| | | |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import static org.opends.server.loggers.ErrorLogger.logError; |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.messages.ToolMessages.*; |
| | | import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; |
| | | |
| | | import java.io.IOException; |
| | | import java.util.ArrayList; |
| | | import java.util.HashMap; |
| | | import java.util.LinkedHashSet; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.Set; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.concurrent.Semaphore; |
| | | import java.util.concurrent.TimeUnit; |
| | | import java.util.Iterator; |
| | | |
| | | import org.opends.server.replication.common.ChangeNumber; |
| | | import org.opends.server.replication.common.ServerState; |
| | |
| | | import org.opends.server.replication.protocol.RoutableMessage; |
| | | import org.opends.server.replication.protocol.UpdateMessage; |
| | | import org.opends.server.replication.protocol.ReplServerInfoMessage; |
| | | import org.opends.server.replication.protocol.MonitorMessage; |
| | | import org.opends.server.replication.protocol.MonitorRequestMessage; |
| | | import org.opends.server.replication.protocol.ResetGenerationId; |
| | | import org.opends.server.types.DN; |
| | | |
| | | import org.opends.server.types.DirectoryException; |
| | | import org.opends.server.types.ResultCode; |
| | | import org.opends.server.util.TimeThread; |
| | | import com.sleepycat.je.DatabaseException; |
| | | |
| | | /** |
| | |
| | | */ |
| | | // Tracer used by this class to emit debug messages. |
| | | private static final DebugTracer TRACER = getTracer(); |
| | | |
| | | /* Monitor data management */ |
| | | |
| | | // Lifetime of the cached remote monitor data: after a refresh, |
| | | // retrievesRemoteMonitorData() adds this value to the current time to |
| | | // compute validityDate. |
| | | // TODO: Remote monitor data cache lifetime is 500 ms/should be configurable |
| | | private long remoteMonitorDataLifeTime = 500; |
| | | |
| | | /* Search operations on monitor data are processed by a worker thread. |
| | | * Requests are sent to the other RSes, and responses are received by the |
| | | * listener threads. |
| | | * The worker thread is woken up through this semaphore, or on timeout. |
| | | * The field is only non-null while a refresh is in progress: it is set by |
| | | * retrievesRemoteMonitorData() and reset to null at the end of |
| | | * waitMonitorDataResponses(). |
| | | * NOTE(review): the field is neither volatile nor guarded by a lock in the |
| | | * code visible here although several threads read and write it - confirm. |
| | | */ |
| | | Semaphore remoteMonitorResponsesSemaphore; |
| | | |
| | | /* Time until which the cached monitor data are considered valid. It is |
| | | * compared against TimeThread.getTime(); the initial value 0 forces a |
| | | * refresh on first use. |
| | | */ |
| | | private long validityDate = 0; |
| | | |
| | | // For each LDAP server (keyed by server id), its server state as reported |
| | | // in the MonitorMessage responses of the remote replication servers |
| | | private HashMap<Short, ServerState> LDAPStates = |
| | | new HashMap<Short, ServerState>(); |
| | | |
| | | // For each LDAP server (keyed by server id), the newest change number |
| | | // known for it, i.e. the last CN it published |
| | | private HashMap<Short, ChangeNumber> maxCNs = |
| | | new HashMap<Short, ChangeNumber>(); |
| | | |
| | | // For each LDAP server (keyed by server id), an approximation of the date |
| | | // of the first missing change |
| | | private HashMap<Short, Long> approxFirstMissingDate = |
| | | new HashMap<Short, Long>(); |
| | | |
| | | /** |
| | | * Creates a new ReplicationServerDomain associated to the DN baseDn. |
| | | * |
| | |
| | | } |
| | | else |
| | | { |
| | | if (!rsh.getRemoteLDAPServers().isEmpty()) |
| | | if (rsh.hasRemoteLDAPServers()) |
| | | { |
| | | lDAPServersConnectedInTheTopology = true; |
| | | |
| | |
| | | // server connected |
| | | for (ServerHandler rsh : replicationServers.values()) |
| | | { |
| | | if (!rsh.getRemoteLDAPServers().isEmpty()) |
| | | if (rsh.hasRemoteLDAPServers()) |
| | | { |
| | | servers.add(rsh); |
| | | } |
| | |
| | | */ |
| | | public void process(RoutableMessage msg, ServerHandler senderHandler) |
| | | { |
| | | // A replication server is not expected to be the destination |
| | | // of a routable message except for an error message. |
| | | // Test the message for which a ReplicationServer is expected |
| | | // to be the destination |
| | | if (msg.getDestination() == this.replicationServer.getServerId()) |
| | | { |
| | | if (msg instanceof ErrorMessage) |
| | | { |
| | | ErrorMessage errorMsg = (ErrorMessage)msg; |
| | | logError(ERR_ERROR_MSG_RECEIVED.get( |
| | | errorMsg.getDetails())); |
| | | errorMsg.getDetails())); |
| | | } |
| | | else if (msg instanceof MonitorRequestMessage) |
| | | { |
| | | MonitorRequestMessage replServerMonitorRequestMsg = |
| | | (MonitorRequestMessage) msg; |
| | | |
| | | MonitorMessage monitorMsg = |
| | | new MonitorMessage( |
| | | replServerMonitorRequestMsg.getDestination(), |
| | | replServerMonitorRequestMsg.getsenderID()); |
| | | |
| | | // Populate the RS state in the msg from the DbState |
| | | monitorMsg.setReplServerState(this.getDbServerState()); |
| | | |
| | | // Populate for each connected LDAP Server |
| | | // from the states stored in the serverHandler. |
| | | // - the server state |
| | | // - the older missing change |
| | | for (ServerHandler lsh : this.connectedServers.values()) |
| | | { |
| | | monitorMsg.setLDAPServerState( |
| | | lsh.getServerId(), |
| | | lsh.getServerState(), |
| | | lsh.getApproxFirstMissingDate()); |
| | | } |
| | | try |
| | | { |
| | | senderHandler.send(monitorMsg); |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | // We log the error. The requestor will detect a timeout or |
| | | // any other failure on the connection. |
| | | logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get( |
| | | Short.toString((msg.getDestination())))); |
| | | } |
| | | } |
| | | else if (msg instanceof MonitorMessage) |
| | | { |
| | | MonitorMessage monitorMsg = |
| | | (MonitorMessage) msg; |
| | | |
| | | receivesMonitorDataResponse(monitorMsg); |
| | | } |
| | | else |
| | | { |
| | |
| | | { |
| | | return replicationServer; |
| | | } |
| | | |
| | | /* |
| | | * Monitor Data generation |
| | | */ |
| | | |
| | | /** |
| | | * Refreshes the cached monitor data when they are no longer valid. |
| | | * <p> |
| | | * The cache is first rebuilt from the local information (the state of the |
| | | * directly connected LDAP servers and the state of the local database), |
| | | * then a monitor request is sent to every remote replication server and |
| | | * the calling thread waits for their responses or for a timeout. When the |
| | | * method returns, the validity date of the cache has been pushed forward, |
| | | * whether or not all the responses were received. |
| | | * |
| | | * @throws DirectoryException When the monitor requests cannot be sent. |
| | | */ |
| | | protected void retrievesRemoteMonitorData() |
| | |   throws DirectoryException |
| | | { |
| | |   if (validityDate > TimeThread.getTime()) |
| | |   { |
| | |     // The current data are still valid. No need to renew them. |
| | |     return; |
| | |   } |
| | | |
| | |   // Clean |
| | |   // NOTE(review): approxFirstMissingDate is not cleared here, so entries |
| | |   // of servers that are gone may survive a refresh - confirm intent. |
| | |   this.LDAPStates.clear(); |
| | |   this.maxCNs.clear(); |
| | | |
| | |   // Init the maxCNs of our direct LDAP servers from the server state held |
| | |   // by their handlers |
| | |   for (ServerHandler rs : connectedServers.values()) |
| | |   { |
| | |     short serverID = rs.getServerId(); |
| | |     ChangeNumber cn = rs.getServerState().getMaxChangeNumber(serverID); |
| | |     if (cn == null) |
| | |     { |
| | |       // we have nothing for that server yet |
| | |       cn = new ChangeNumber(0, 0 , serverID); |
| | |     } |
| | |     this.maxCNs.put(serverID, cn); |
| | |   } |
| | | |
| | |   // Raise these values with what the local database knows. Only servers |
| | |   // already registered above are updated. |
| | |   ServerState replServerState = this.getDbServerState(); |
| | |   Iterator<Short> it = replServerState.iterator(); |
| | |   while (it.hasNext()) |
| | |   { |
| | |     short sid = it.next(); |
| | |     ChangeNumber receivedCN = replServerState.getMaxChangeNumber(sid); |
| | |     ChangeNumber maxCN = this.maxCNs.get(sid); |
| | |     if ((maxCN != null) && (receivedCN.newer(maxCN))) |
| | |     { |
| | |       // We found a newer one; put() replaces the previous value |
| | |       this.maxCNs.put(sid, receivedCN); |
| | |     } |
| | |   } |
| | | |
| | |   // Send Request to the other Replication Servers |
| | |   if (remoteMonitorResponsesSemaphore == null) |
| | |   { |
| | |     // Start without any permit: every response releases one permit and |
| | |     // the wait below needs one permit per remote replication server, so |
| | |     // it only succeeds once all of them have answered. With no remote |
| | |     // server at all, acquiring 0 permits succeeds immediately. |
| | |     remoteMonitorResponsesSemaphore = new Semaphore(0); |
| | | |
| | |     try |
| | |     { |
| | |       sendMonitorDataRequest(); |
| | |     } |
| | |     catch (DirectoryException e) |
| | |     { |
| | |       // Do not leave a stale semaphore behind: it would make the next |
| | |       // caller believe that a refresh is still running. |
| | |       remoteMonitorResponsesSemaphore = null; |
| | |       throw e; |
| | |     } |
| | | |
| | |     // Wait responses from them or timeout |
| | |     waitMonitorDataResponses(replicationServers.size()); |
| | |   } |
| | |   else |
| | |   { |
| | |     // The processing of renewing the monitor cache is already running. |
| | |     // We make this thread wait until it is finished. |
| | |     // NOTE(review): each iteration consumes one permit and resets the |
| | |     // shared semaphore, which can disturb the thread that started the |
| | |     // refresh - confirm whether concurrent callers are expected. |
| | |     while (remoteMonitorResponsesSemaphore!=null) |
| | |     { |
| | |       waitMonitorDataResponses(1); |
| | |     } |
| | |   } |
| | | |
| | |   // Now we have the expected answers, or an error occurred |
| | |   validityDate = TimeThread.getTime() + remoteMonitorDataLifeTime; |
| | | |
| | |   if (debugEnabled()) |
| | |   { |
| | |     debugMonitorData(); |
| | |   } |
| | | } |
| | | |
| | | /** |
| | |  * Traces the content of the monitor data cache (the LDAP server states |
| | |  * followed by the max change numbers) as a single debug message. |
| | |  */ |
| | | private void debugMonitorData() |
| | | { |
| | |   StringBuilder dump = new StringBuilder(" Monitor data="); |
| | | |
| | |   // One " LDAPState(id)=state" item per known LDAP server |
| | |   for (Map.Entry<Short, ServerState> entry : LDAPStates.entrySet()) |
| | |   { |
| | |     dump.append(" LDAPState("); |
| | |     dump.append(entry.getKey()); |
| | |     dump.append(")="); |
| | |     dump.append(entry.getValue().toString()); |
| | |   } |
| | | |
| | |   // One " maxCNs(id)=cn" item per known LDAP server |
| | |   for (Map.Entry<Short, ChangeNumber> entry : maxCNs.entrySet()) |
| | |   { |
| | |     dump.append(" maxCNs("); |
| | |     dump.append(entry.getKey()); |
| | |     dump.append(")="); |
| | |     dump.append(entry.getValue().toString()); |
| | |   } |
| | | |
| | |   dump.append("--"); |
| | | |
| | |   String instanceName = this.replicationServer.getMonitorInstanceName(); |
| | |   TRACER.debugInfo("In " + instanceName + " baseDN=" + baseDn + dump); |
| | | } |
| | | |
| | | /** |
| | |  * Sends a MonitorRequest message to every connected replication server. |
| | |  * The first failure stops the loop: it is logged and reported to the |
| | |  * caller, and the remaining servers are not contacted. |
| | |  * |
| | |  * @throws DirectoryException when a request cannot be built or sent. |
| | |  */ |
| | | protected void sendMonitorDataRequest() |
| | |   throws DirectoryException |
| | | { |
| | |   try |
| | |   { |
| | |     Iterator<? extends ServerHandler> handlers = |
| | |       replicationServers.values().iterator(); |
| | |     while (handlers.hasNext()) |
| | |     { |
| | |       ServerHandler remoteRS = handlers.next(); |
| | | |
| | |       // The request goes from this replication server to the remote one |
| | |       MonitorRequestMessage request = new MonitorRequestMessage( |
| | |           this.replicationServer.getServerId(), |
| | |           remoteRS.getServerId()); |
| | |       remoteRS.send(request); |
| | |     } |
| | |   } |
| | |   catch(Exception sendFailure) |
| | |   { |
| | |     Message message = ERR_SENDING_REMOTE_MONITOR_DATA_REQUEST.get(); |
| | |     logError(message); |
| | |     throw new DirectoryException(ResultCode.OTHER, message, sendFailure); |
| | |   } |
| | | } |
| | | |
| | | /** |
| | | * Wait for the expected count of received MonitorMessage. |
| | | * @param expectedResponses The number of expected answers. |
| | | * @throws DirectoryException When an error occurs. |
| | | */ |
| | | protected void waitMonitorDataResponses(int expectedResponses) |
| | | throws DirectoryException |
| | | { |
| | | try |
| | | { |
| | | boolean allPermitsAcquired = |
| | | remoteMonitorResponsesSemaphore.tryAcquire( |
| | | expectedResponses, |
| | | (long) 500, TimeUnit.MILLISECONDS); |
| | | |
| | | if (!allPermitsAcquired) |
| | | { |
| | | logError(ERR_MISSING_REMOTE_MONITOR_DATA.get()); |
| | | } |
| | | else |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + this.replicationServer.getMonitorInstanceName() + |
| | | "Successfully received all " + replicationServers.size() |
| | | + " expected monitor messages"); |
| | | } |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e.getMessage())); |
| | | } |
| | | finally |
| | | { |
| | | remoteMonitorResponsesSemaphore = null; |
| | | } |
| | | } |
| | | |
| | | /** |
| | |  * Processes a Monitor message received from a remote Replication Server |
| | |  * and stores the data received. |
| | |  * <p> |
| | |  * The message is ignored when no refresh of the monitor data is in |
| | |  * progress. Otherwise one permit is released on the responses semaphore |
| | |  * once the message has been handled, even when its processing failed, so |
| | |  * that the thread waiting for the responses is not held until its timeout |
| | |  * because of this message. A processing failure is logged. |
| | |  * |
| | |  * @param msg The message to be processed. |
| | |  */ |
| | | public void receivesMonitorDataResponse(MonitorMessage msg) |
| | | { |
| | |   // Work on a local reference: the waiting thread resets the field to |
| | |   // null when it stops waiting. |
| | |   final Semaphore responsesSemaphore = remoteMonitorResponsesSemaphore; |
| | |   if (responsesSemaphore == null) |
| | |   { |
| | |     // Ignoring the remote monitor data: nobody is waiting for them (late |
| | |     // response, or an error occurred previously) |
| | |     return; |
| | |   } |
| | | |
| | |   try |
| | |   { |
| | |     // Here is the RS state : list <serverID, lastChangeNumber> |
| | |     // For each LDAP Server, we keep the max CN across the RSes |
| | |     ServerState replServerState = msg.getReplServerState(); |
| | |     Iterator<Short> it = replServerState.iterator(); |
| | |     while (it.hasNext()) |
| | |     { |
| | |       short sid = it.next(); |
| | |       ChangeNumber receivedCN = replServerState.getMaxChangeNumber(sid); |
| | |       ChangeNumber maxCN = this.maxCNs.get(sid); |
| | |       if (receivedCN.newer(maxCN)) |
| | |       { |
| | |         // We found a newer one; put() replaces the previous value |
| | |         this.maxCNs.put(sid, receivedCN); |
| | |       } |
| | |     } |
| | | |
| | |     // Store the LDAP servers states |
| | |     Iterator<Short> sidIterator = msg.iterator(); |
| | |     while (sidIterator.hasNext()) |
| | |     { |
| | |       short sid = sidIterator.next(); |
| | |       ServerState ss = msg.getLDAPServerState(sid); |
| | |       this.LDAPStates.put(sid, ss); |
| | |       this.approxFirstMissingDate.put(sid, |
| | |           msg.getApproxFirstMissingDate(sid)); |
| | |     } |
| | |   } |
| | |   catch (Exception e) |
| | |   { |
| | |     // The processing of this message is aborted. Calling notifyAll() on |
| | |     // the semaphore would throw IllegalMonitorStateException and would not |
| | |     // wake a thread blocked in tryAcquire(); the failure is logged and the |
| | |     // waiter is notified through the permit released below. |
| | |     logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e.getMessage())); |
| | |   } |
| | |   finally |
| | |   { |
| | |     // Decreases the number of expected responses and potentially |
| | |     // wakes up the waiting requestor thread. |
| | |     responsesSemaphore.release(); |
| | |   } |
| | | } |
| | | |
| | | /** |
| | |  * Returns the cached state of the LDAP server with the provided serverId. |
| | |  * |
| | |  * @param serverId The server ID. |
| | |  * @return The server state, or null when the cache holds no state for |
| | |  *         this server. |
| | |  */ |
| | | public ServerState getServerState(short serverId) |
| | | { |
| | |   final Short key = Short.valueOf(serverId); |
| | |   return LDAPStates.get(key); |
| | | } |
| | | |
| | | /** |
| | |  * Returns the highest known change number of the LDAP server with the |
| | |  * provided serverId. |
| | |  * |
| | |  * @param serverId The server ID. |
| | |  * @return The highest change number, or null when the cache holds none |
| | |  *         for this server. |
| | |  */ |
| | | public ChangeNumber getMaxCN(short serverId) |
| | | { |
| | |   final Short key = Short.valueOf(serverId); |
| | |   return maxCNs.get(key); |
| | | } |
| | | |
| | | /** |
| | |  * Returns an approximation of the date of the oldest missing change for |
| | |  * the LDAP server with the provided serverId. |
| | |  * |
| | |  * @param serverId The server ID. |
| | |  * @return The approximation of the date of the oldest missing change, or |
| | |  *         null when the cache holds none for this server. |
| | |  */ |
| | | public Long getApproxFirstMissingDate(short serverId) |
| | | { |
| | |   final Short key = Short.valueOf(serverId); |
| | |   return approxFirstMissingDate.get(key); |
| | | } |
| | | |
| | | /** |
| | |  * Computes the number of changes missing on a server with the provided |
| | |  * state, compared with the cached max change numbers. |
| | |  * |
| | |  * @param state The provided server state. |
| | |  * @return The number of missing changes. |
| | |  */ |
| | | public int getMissingChanges(ServerState state) |
| | | { |
| | |   // Walk through the max CN recorded for each server id and add up, for |
| | |   // each of them, the distance between that max CN and the newest CN the |
| | |   // provided state holds for the same server id. |
| | |   int total = 0; |
| | |   for (Map.Entry<Short, ChangeNumber> entry : maxCNs.entrySet()) |
| | |   { |
| | |     Short sid = entry.getKey(); |
| | |     ChangeNumber highestKnown = entry.getValue(); |
| | | |
| | |     ChangeNumber lastSeen = state.getMaxChangeNumber(sid); |
| | |     if (lastSeen == null) |
| | |     { |
| | |       // The provided state has nothing from that server id |
| | |       lastSeen = new ChangeNumber(0,0, sid); |
| | |     } |
| | | |
| | |     total += ChangeNumber.diffSeqNum(highestKnown, lastSeen); |
| | |   } |
| | |   return total; |
| | | } |
| | | } |