| | |
| | | import org.opends.messages.Message; |
| | | import org.opends.messages.MessageBuilder; |
| | | |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import static org.opends.server.loggers.ErrorLogger.logError; |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; |
| | |
| | | import org.opends.server.replication.protocol.RoutableMessage; |
| | | import org.opends.server.replication.protocol.UpdateMessage; |
| | | import org.opends.server.replication.protocol.ReplServerInfoMessage; |
| | | import org.opends.server.replication.protocol.ResetGenerationId; |
| | | import org.opends.server.types.DN; |
| | | |
| | | import com.sleepycat.je.DatabaseException; |
| | |
| | | new ConcurrentHashMap<Short, DbHandler>(); |
| | | // The replication server this cache belongs to; assigned in the constructor. |
| | | private ReplicationServer replicationServer; |
| | | |
| | | /* GenerationId management */ |
| | | // In memory generationId of this domain; -1 is the value used when it is reset. |
| | | private long generationId = -1; |
| | | // Saved status of the generationId; switched to true once a DbHandler |
| | | // has been created with the current generationId. |
| | | private boolean generationIdSavedStatus = false; |
| | | |
| | | /** |
| | | * The tracer object for the debug logger. |
| | | */ |
| | | private static final DebugTracer TRACER = getTracer(); |
| | | |
| | | /** |
| | | * Creates a new ReplicationCache associated to the DN baseDn. |
| | | * |
| | |
| | | { |
| | | this.baseDn = baseDn; |
| | | this.replicationServer = replicationServer; |
| | | } |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + this.replicationServer.getMonitorInstanceName() + |
| | | " Created Cache for " + baseDn + " " + |
| | | stackTraceToSingleLineString(new Exception())); |
| | | } |
| | | |
| | | /** |
| | | * Add an update that has been received to the list of |
| | |
| | | * other replication server before pushing it to the LDAP servers |
| | | */ |
| | | |
| | | short id = update.getChangeNumber().getServerId(); |
| | | sourceHandler.updateServerState(update); |
| | | sourceHandler.incrementInCount(); |
| | | |
| | |
| | | } |
| | | } |
| | | |
| | | // look for the dbHandler that is responsible for the master server which |
| | | // look for the dbHandler that is responsible for the LDAP server which |
| | | // generated the change. |
| | | DbHandler dbHandler = null; |
| | | synchronized (sourceDbHandlers) |
| | | { |
| | | short id = update.getChangeNumber().getServerId(); |
| | | dbHandler = sourceDbHandlers.get(id); |
| | | if (dbHandler == null) |
| | | { |
| | | try |
| | | { |
| | | dbHandler = replicationServer.newDbHandler(id, baseDn); |
| | | } catch (DatabaseException e) |
| | | dbHandler = replicationServer.newDbHandler(id, |
| | | baseDn, generationId); |
| | | generationIdSavedStatus = true; |
| | | } |
| | | catch (DatabaseException e) |
| | | { |
| | | /* |
| | | * Because of database problem we can't save any more changes |
| | |
| | | } |
| | | connectedServers.put(handler.getServerId(), handler); |
| | | |
| | | // It can be that the server that connects here is the |
| | | // first server connected for a domain. |
| | | // In that case, we will establish the appropriate connections |
| | | // to the other repl servers for this domain and receive |
| | | // their ReplServerInfo messages. |
| | | // FIXME: Is it necessary to end this above processing BEFORE listening |
| | | // to incoming messages for that domain ? But the replica |
| | | // would raise Read Timeout for replica that connects. |
| | | |
| | | // Update the remote replication servers with our list |
| | | // of connected LDAP servers |
| | | sendReplServerInfo(); |
| | |
| | | */ |
| | | public void stopServer(ServerHandler handler) |
| | | { |
| | |   // Stops the handler, unregisters it from this domain, re-evaluates the |
| | |   // generationId and finally notifies the remote replication servers. |
| | |   if (debugEnabled()) |
| | |   { |
| | |     TRACER.debugInfo( |
| | |         "In RS " + this.replicationServer.getMonitorInstanceName() + |
| | |         " for " + baseDn + " " + |
| | |         " stopServer " + handler.getMonitorInstanceName()); |
| | |   } |
| | | |
| | |   handler.stopHandler(); |
| | | |
| | |   // Remove the handler from the map that matches its kind. |
| | |   if (handler.isReplicationServer()) |
| | |   { |
| | |     replicationServers.remove(handler.getServerId()); |
| | |   } |
| | |   else |
| | |   { |
| | |     connectedServers.remove(handler.getServerId()); |
| | |   } |
| | | |
| | |   // The departure of this server may allow the generationId to be reset. |
| | |   // This is done BEFORE publishing our information so that the message |
| | |   // carries the up-to-date generationId and is sent only once. |
| | |   mayResetGenerationId(); |
| | | |
| | |   // Update the remote replication servers with our list |
| | |   // of connected LDAP servers |
| | |   sendReplServerInfo(); |
| | | } |
| | | |
| | | /** |
| | |  * Puts the generationId of this domain back to -1 when no LDAP server |
| | |  * is connected anywhere in the topology and the generationId has never |
| | |  * been saved. |
| | |  * <p> |
| | |  * LDAP servers are looked for first among the servers directly connected |
| | |  * to this replication server, then among the LDAP servers reported by the |
| | |  * remote replication servers that share our generationId. Remote |
| | |  * replication servers with a different generationId are ignored. |
| | |  */ |
| | | protected void mayResetGenerationId() |
| | | { |
| | |   if (debugEnabled()) |
| | |   { |
| | |     TRACER.debugInfo( |
| | |         "In RS " + this.replicationServer.getMonitorInstanceName() + |
| | |         " for " + baseDn + " " + |
| | |         " mayResetGenerationId generationIdSavedStatus=" + |
| | |         generationIdSavedStatus); |
| | |   } |
| | | |
| | |   // True as soon as one LDAP server is known to be connected somewhere. |
| | |   boolean ldapServerSeen = !connectedServers.isEmpty(); |
| | | |
| | |   if (ldapServerSeen) |
| | |   { |
| | |     // At least one LDAP server is directly connected to us. |
| | |     if (debugEnabled()) |
| | |     { |
| | |       TRACER.debugInfo( |
| | |           "In RS " + this.replicationServer.getMonitorInstanceName() + |
| | |           " for " + baseDn + " " + |
| | |           " has servers connected to it - will not reset generationId"); |
| | |     } |
| | |   } |
| | |   else |
| | |   { |
| | |     // Nobody is connected locally: ask the view we have of our peers. |
| | |     for (ServerHandler peer : replicationServers.values()) |
| | |     { |
| | |       if (generationId != peer.getGenerationId()) |
| | |       { |
| | |         // A peer with another generationId does not count. |
| | |         if (debugEnabled()) |
| | |         { |
| | |           TRACER.debugInfo( |
| | |               "In RS " + this.replicationServer.getMonitorInstanceName() + |
| | |               " for " + baseDn + " " + |
| | |               " mayResetGenerationId skip RS" + peer.getMonitorInstanceName() + |
| | |               " thas different genId"); |
| | |         } |
| | |         continue; |
| | |       } |
| | | |
| | |       if (peer.getRemoteLDAPServers().isEmpty()) |
| | |       { |
| | |         continue; |
| | |       } |
| | | |
| | |       ldapServerSeen = true; |
| | | |
| | |       if (debugEnabled()) |
| | |       { |
| | |         TRACER.debugInfo( |
| | |             "In RS " + this.replicationServer.getMonitorInstanceName() + |
| | |             " for " + baseDn + " " + |
| | |             " mayResetGenerationId RS" + peer.getMonitorInstanceName() + |
| | |             " has servers connected to it - will not reset generationId"); |
| | |       } |
| | |     } |
| | |   } |
| | | |
| | |   // Reset only when no LDAP server was found AND nothing was ever saved; |
| | |   // the next LDAP server to connect will then become the new reference. |
| | |   if (!(ldapServerSeen || this.generationIdSavedStatus)) |
| | |   { |
| | |     setGenerationId(-1, false); |
| | |   } |
| | | } |
| | | |
| | |
| | | // Update this server with the list of LDAP servers |
| | | // already connected |
| | | handler.sendInfo( |
| | | new ReplServerInfoMessage(getConnectedLDAPservers())); |
| | | new ReplServerInfoMessage(getConnectedLDAPservers(),generationId)); |
| | | |
| | | return true; |
| | | } |
| | |
| | | } |
| | | |
| | | /** |
| | |  * Sets the provided DbHandler associated to the provided serverId. |
| | |  * <p> |
| | |  * The handler is stored in the sourceDbHandlers map while holding the |
| | |  * lock on that map; any handler stored earlier under the same serverId |
| | |  * is replaced. |
| | |  * |
| | |  * @param serverId  the serverId for the server to which is |
| | |  *                  associated the DbHandler. |
| | |  * @param dbHandler the dbHandler associated to the serverId. |
| | |  * |
| | |  * @throws DatabaseException If a database error happened. |
| | |  */ |
| | | public void setDbHandler(short serverId, DbHandler dbHandler) |
| | |   throws DatabaseException |
| | | { |
| | |   synchronized (sourceDbHandlers) |
| | |   { |
| | |     sourceDbHandlers.put(serverId, dbHandler); |
| | |   } |
| | | } |
| | | |
| | | /** |
| | |  * Registers a DbHandler under the specified identifier. |
| | |  * Former name of {@link #setDbHandler(short, DbHandler)}, kept so that |
| | |  * callers still using it keep working; it simply delegates. |
| | |  * |
| | |  * @param id the identifier (serverId) under which the handler is stored. |
| | |  * @param db the DbHandler to store. |
| | |  * |
| | |  * @throws DatabaseException If a database error happened. |
| | |  */ |
| | | public void newDb(short id, DbHandler db) throws DatabaseException |
| | | { |
| | |   setDbHandler(id, db); |
| | | } |
| | | |
| | |
| | | } |
| | | |
| | | /** |
| | | * Process an InitializeRequestMessage. |
| | | * Processes a message coming from one server in the topology |
| | | * and potentially forwards it to one or all other servers. |
| | | * |
| | | * @param msg The message received and to be processed. |
| | | * @param senderHandler The server handler of the server that emitted |
| | |
| | | */ |
| | | public void process(RoutableMessage msg, ServerHandler senderHandler) |
| | | { |
| | | // A replication server is not expected to be the destination |
| | | // of a routable message except for an error message. |
| | | if (msg.getDestination() == this.replicationServer.getServerId()) |
| | | { |
| | | if (msg instanceof ErrorMessage) |
| | | { |
| | | ErrorMessage errorMsg = (ErrorMessage)msg; |
| | | logError(ERR_ERROR_MSG_RECEIVED.get( |
| | | errorMsg.getDetails())); |
| | | } |
| | | else |
| | | { |
| | | logError(NOTE_ERR_ROUTING_TO_SERVER.get( |
| | | msg.getClass().getCanonicalName())); |
| | | } |
| | | return; |
| | | } |
| | | |
| | | List<ServerHandler> servers = getDestinationServers(msg, senderHandler); |
| | | |
| | |
| | | { |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get()); |
| | | mb.append("serverID:" + msg.getDestination()); |
| | | mb.append(" unreachable server ID=" + msg.getDestination()); |
| | | mb.append(" unroutable message =" + msg); |
| | | ErrorMessage errMsg = new ErrorMessage( |
| | | msg.getsenderID(), mb.toMessage()); |
| | | this.replicationServer.getServerId(), |
| | | msg.getsenderID(), |
| | | mb.toMessage()); |
| | | |
| | | try |
| | | { |
| | | senderHandler.send(errMsg); |
| | |
| | | { |
| | | // TODO Handle error properly (sender timeout in addition) |
| | | /* |
| | | * An error happened trying the send back an error to this server. |
| | | * Log an error and close the connection to the sender server. |
| | | * An error happened trying to send an error msg to this server. |
| | | * Log an error and close the connection to this server. |
| | | */ |
| | | MessageBuilder mb2 = new MessageBuilder(); |
| | | mb2.append(ERR_CHANGELOG_ERROR_SENDING_ERROR.get(this.toString())); |
| | |
| | | private void sendReplServerInfo() |
| | | { |
| | | ReplServerInfoMessage info = |
| | | new ReplServerInfoMessage(getConnectedLDAPservers()); |
| | | new ReplServerInfoMessage(getConnectedLDAPservers(), generationId); |
| | | for (ServerHandler handler : replicationServers.values()) |
| | | { |
| | | try |
| | |
| | | } |
| | | |
| | | /** |
| | |  * Returns the in memory generationId currently held for this domain. |
| | |  * |
| | |  * @return The generationId of this domain. |
| | |  */ |
| | | public long getGenerationId() |
| | | { |
| | |   return this.generationId; |
| | | } |
| | | |
| | | /** |
| | |  * Returns the saved status currently recorded for the generationId |
| | |  * of this domain. |
| | |  * |
| | |  * @return The generationId saved status. |
| | |  */ |
| | | public boolean getGenerationIdSavedStatus() |
| | | { |
| | |   return this.generationIdSavedStatus; |
| | | } |
| | | |
| | | /** |
| | | * Sets the replication server informations for the provided |
| | | * handler from the provided ReplServerInfoMessage. |
| | | * |
| | |
| | | { |
| | | handler.setReplServerInfo(infoMsg); |
| | | } |
| | | |
| | | /** |
| | |  * Installs the provided value as the new in memory generationId. |
| | |  * <p> |
| | |  * Nothing is changed, not even the saved status, when the provided value |
| | |  * equals the current generationId. Otherwise, if the current generationId |
| | |  * is strictly positive, every connected LDAP server handler is asked to |
| | |  * reset its generationId before the new value and status are stored. |
| | |  * |
| | |  * @param generationId The new value of generationId. |
| | |  * @param savedStatus  The saved status of the generationId. |
| | |  */ |
| | | public synchronized void setGenerationId(long generationId, |
| | |     boolean savedStatus) |
| | | { |
| | |   if (debugEnabled()) |
| | |   { |
| | |     TRACER.debugInfo( |
| | |         "In " + this.replicationServer.getMonitorInstanceName() + |
| | |         " baseDN=" + baseDn + |
| | |         " RCache.set GenerationId=" + generationId); |
| | |   } |
| | | |
| | |   if (this.generationId != generationId) |
| | |   { |
| | |     // A real generationId (> 0) is being replaced: warn the LDAP servers. |
| | |     if (this.generationId > 0) |
| | |     { |
| | |       for (ServerHandler ldapHandler : connectedServers.values()) |
| | |       { |
| | |         ldapHandler.resetGenerationId(); |
| | |       } |
| | |     } |
| | | |
| | |     this.generationId = generationId; |
| | |     this.generationIdSavedStatus = savedStatus; |
| | |   } |
| | | } |
| | | |
| | | /** |
| | |  * Resets the generationId of this domain. |
| | |  * <p> |
| | |  * Every connected LDAP server handler is asked to reset its generationId. |
| | |  * When the request comes from an LDAP server, a ResetGenerationId message |
| | |  * is also sent to every replication server handler of this domain. The |
| | |  * source DbHandlers are then cleared and removed, clearGenerationId is |
| | |  * invoked on the replication server for this baseDn, and finally the in |
| | |  * memory generationId and its saved status are put back to their initial |
| | |  * values (-1 / false). |
| | |  * <p> |
| | |  * Failures met while sending or clearing are logged and do not interrupt |
| | |  * the reset. |
| | |  * |
| | |  * @param senderHandler The handler associated to the server |
| | |  *                      that requested to reset the generationId. |
| | |  */ |
| | | public void resetGenerationId(ServerHandler senderHandler) |
| | | { |
| | |   if (debugEnabled()) |
| | |   { |
| | |     TRACER.debugInfo( |
| | |         "In " + this.replicationServer.getMonitorInstanceName() + |
| | |         " baseDN=" + baseDn + |
| | |         " RCache.resetGenerationId"); |
| | |   } |
| | | |
| | |   // Notify the other LDAP servers that from now on |
| | |   // they have a bad generationId |
| | |   for (ServerHandler handler : connectedServers.values()) |
| | |   { |
| | |     handler.resetGenerationId(); |
| | |   } |
| | | |
| | |   // Propagate the reset message to the other replication servers |
| | |   // dealing with the same domain. This is only done when the request |
| | |   // comes from an LDAP server. |
| | |   if (senderHandler.isLDAPserver()) |
| | |   { |
| | |     for (ServerHandler handler : replicationServers.values()) |
| | |     { |
| | |       try |
| | |       { |
| | |         handler.sendGenerationId(new ResetGenerationId()); |
| | |       } |
| | |       catch (IOException e) |
| | |       { |
| | |         logError(ERR_CHANGELOG_ERROR_SENDING_INFO. |
| | |             get(handler.getMonitorInstanceName())); |
| | |       } |
| | |     } |
| | |   } |
| | | |
| | |   // Reset the localchange and state db for the current domain |
| | |   synchronized (sourceDbHandlers) |
| | |   { |
| | |     for (DbHandler dbHandler : sourceDbHandlers.values()) |
| | |     { |
| | |       try |
| | |       { |
| | |         dbHandler.clear(); |
| | |       } |
| | |       catch (Exception e) |
| | |       { |
| | |         // TODO: i18n |
| | |         logError(Message.raw( |
| | |             "Exception caught while clearing dbHandler:" + |
| | |             e.getLocalizedMessage())); |
| | |       } |
| | |     } |
| | |     sourceDbHandlers.clear(); |
| | | |
| | |     if (debugEnabled()) |
| | |     { |
| | |       TRACER.debugInfo( |
| | |           "In " + this.replicationServer.getMonitorInstanceName() + |
| | |           " baseDN=" + baseDn + |
| | |           " The source db handler has been cleared"); |
| | |     } |
| | |   } |
| | | |
| | |   try |
| | |   { |
| | |     replicationServer.clearGenerationId(baseDn); |
| | |   } |
| | |   catch (Exception e) |
| | |   { |
| | |     // TODO: i18n |
| | |     logError(Message.raw( |
| | |         "Exception caught while clearing generationId:" + |
| | |         e.getLocalizedMessage())); |
| | |   } |
| | | |
| | |   // Reset the in memory domain generationId. The saved status is reset |
| | |   // with it: the DbHandlers created with the previous generationId have |
| | |   // just been dropped, so leaving the status at its old value would keep |
| | |   // a stale "saved" flag next to an unset (-1) generationId. |
| | |   generationId = -1; |
| | |   generationIdSavedStatus = false; |
| | | } |
| | | |
| | | /** |
| | |  * Tells whether the provided server is in degraded state because its |
| | |  * generationId differs from the generationId of this domain. |
| | |  * <p> |
| | |  * The server is looked up among the replication servers first, then |
| | |  * among the connected LDAP servers. An unknown server is reported as |
| | |  * not degraded. |
| | |  * |
| | |  * @param serverId The serverId for which we want to know the state. |
| | |  * @return Whether it is degraded or not. |
| | |  */ |
| | | |
| | | public boolean isDegradedDueToGenerationId(short serverId) |
| | | { |
| | |   if (debugEnabled()) |
| | |   { |
| | |     TRACER.debugInfo( |
| | |         "In " + this.replicationServer.getMonitorInstanceName() + |
| | |         " baseDN=" + baseDn + |
| | |         " isDegraded serverId=" + serverId + |
| | |         " given local generation Id=" + this.generationId); |
| | |   } |
| | | |
| | |   ServerHandler peer = replicationServers.get(serverId); |
| | |   if (peer == null) |
| | |   { |
| | |     // Not a replication server: maybe a directly connected LDAP server. |
| | |     peer = connectedServers.get(serverId); |
| | |   } |
| | |   if (peer == null) |
| | |   { |
| | |     return false; |
| | |   } |
| | | |
| | |   if (debugEnabled()) |
| | |   { |
| | |     TRACER.debugInfo( |
| | |         "In " + this.replicationServer.getMonitorInstanceName() + |
| | |         " baseDN=" + baseDn + |
| | |         " Compute degradation of serverId=" + serverId + |
| | |         " LS server generation Id=" + peer.getGenerationId()); |
| | |   } |
| | | |
| | |   final boolean degraded = peer.getGenerationId() != this.generationId; |
| | |   return degraded; |
| | | } |
| | | |
| | | /** |
| | |  * Gives access to the replication server this cache is associated to. |
| | |  * |
| | |  * @return The replication server. |
| | |  */ |
| | | public ReplicationServer getReplicationServer() |
| | | { |
| | |   return this.replicationServer; |
| | | } |
| | | } |