| | |
| | | * Portions Copyright 2006-2007 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.replication.server; |
| | | |
| | | import org.opends.messages.*; |
| | | import org.opends.messages.MessageBuilder; |
| | | |
| | | import static org.opends.server.loggers.ErrorLogger.logError; |
| | |
| | | import java.util.HashMap; |
| | | import java.util.LinkedHashSet; |
| | | import java.util.Map; |
| | | import java.util.Set; |
| | | import java.util.SortedSet; |
| | | import java.util.TreeSet; |
| | | import java.util.concurrent.Semaphore; |
| | |
| | | private short replicationServerId; |
| | | |
| | | private short protocolVersion; |
| | | private long generationId=-1; |
| | | |
| | | |
| | | /** |
| | |
| | | * Then create the reader and writer thread. |
| | | * |
| | | * @param baseDn baseDn of the ServerHandler when this is an outgoing conn. |
| | | * null if this is an incoming connection. |
| | | * null if this is an incoming connection (listen). |
| | | * @param replicationServerId The identifier of the replicationServer that |
| | | * creates this server handler. |
| | | * @param replicationServerURL The URL of the replicationServer that creates |
| | |
| | | int windowSize, boolean sslEncryption, |
| | | ReplicationServer replicationServer) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In " + replicationServer.getMonitorInstanceName() + |
| | | " starts a new LS or RS " + |
| | | ((baseDn == null)?"incoming connection":"outgoing connection")); |
| | | |
| | | this.replicationServerId = replicationServerId; |
| | | rcvWindowSizeHalf = windowSize/2; |
| | | maxRcvWindow = windowSize; |
| | | rcvWindow = windowSize; |
| | | long localGenerationId=-1; |
| | | try |
| | | { |
| | | if (baseDn != null) |
| | | { |
| | | // This is an outgoing connection. Publish our start message. |
| | | this.baseDn = baseDn; |
| | | replicationCache = replicationServer.getReplicationCache(baseDn); |
| | | |
| | | // Get or create the ReplicationCache |
| | | replicationCache = replicationServer.getReplicationCache(baseDn, true); |
| | | localGenerationId = replicationCache.getGenerationId(); |
| | | |
| | | ServerState localServerState = replicationCache.getDbServerState(); |
| | | ReplServerStartMessage msg = |
| | | new ReplServerStartMessage(replicationServerId, replicationServerURL, |
| | | baseDn, windowSize, localServerState, |
| | | protocolVersion, sslEncryption); |
| | | protocolVersion, localGenerationId, |
| | | sslEncryption); |
| | | |
| | | session.publish(msg); |
| | | } |
| | | |
| | |
| | | ReplicationMessage msg = session.receive(); |
| | | if (msg instanceof ServerStartMessage) |
| | | { |
| | | // The remote server is an LDAP Server |
| | | // The remote server is an LDAP Server. |
| | | ServerStartMessage receivedMsg = (ServerStartMessage) msg; |
| | | |
| | | generationId = receivedMsg.getGenerationId(); |
| | | protocolVersion = ProtocolVersion.minWithCurrent( |
| | | receivedMsg.getVersion()); |
| | | serverId = receivedMsg.getServerId(); |
| | |
| | | |
| | | serverIsLDAPserver = true; |
| | | |
| | | // This an incoming connection. Publish our start message |
| | | replicationCache = replicationServer.getReplicationCache(this.baseDn); |
| | | // Get or Create the ReplicationCache |
| | | replicationCache = replicationServer.getReplicationCache(this.baseDn, |
| | | true); |
| | | localGenerationId = replicationCache.getGenerationId(); |
| | | |
| | | ServerState localServerState = replicationCache.getDbServerState(); |
| | | // This an incoming connection. Publish our start message |
| | | ReplServerStartMessage myStartMsg = |
| | | new ReplServerStartMessage(replicationServerId, replicationServerURL, |
| | | this.baseDn, windowSize, localServerState, |
| | | protocolVersion, sslEncryption); |
| | | protocolVersion, localGenerationId, |
| | | sslEncryption); |
| | | session.publish(myStartMsg); |
| | | sendWindowSize = receivedMsg.getWindowSize(); |
| | | |
| | | /* Up to this point the session is encrypted; from here on it depends on the negotiation */ |
| | | if (!sslEncryption) |
| | | { |
| | | session.stopEncryption(); |
| | | } |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | Set<String> ss = this.serverState.toStringSet(); |
| | | Set<String> lss = replicationCache.getDbServerState().toStringSet(); |
| | | TRACER.debugInfo("In " + replicationCache.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | ", SH received START from LS serverId=" + serverId + |
| | | " baseDN=" + this.baseDn + |
| | | " generationId=" + generationId + |
| | | " localGenerationId=" + localGenerationId + |
| | | " state=" + ss + |
| | | " and sent ReplServerStart with state=" + lss); |
| | | } |
| | | |
| | | /* |
| | | * If we have already a generationID set for the domain |
| | | * then |
| | | * if the connecting replica has not the same |
| | | * then it is degraded locally and notified by an error message |
| | | * else |
| | | * we set the generationID from the one received |
| | | * (unsaved yet on disk . will be set with the 1rst change received) |
| | | */ |
| | | if (localGenerationId>0) |
| | | { |
| | | if (generationId != localGenerationId) |
| | | { |
| | | Message message = NOTE_BAD_GENERATION_ID.get( |
| | | receivedMsg.getBaseDn().toNormalizedString(), |
| | | Short.toString(receivedMsg.getServerId()), |
| | | Long.toString(generationId), |
| | | Long.toString(localGenerationId)); |
| | | |
| | | ErrorMessage errorMsg = |
| | | new ErrorMessage(replicationServerId, serverId, message); |
| | | session.publish(errorMsg); |
| | | } |
| | | } |
| | | else |
| | | { |
| | | replicationCache.setGenerationId(generationId, false); |
| | | } |
| | | } |
| | | else if (msg instanceof ReplServerStartMessage) |
| | | { |
| | |
| | | ReplServerStartMessage receivedMsg = (ReplServerStartMessage) msg; |
| | | protocolVersion = ProtocolVersion.minWithCurrent( |
| | | receivedMsg.getVersion()); |
| | | generationId = receivedMsg.getGenerationId(); |
| | | serverId = receivedMsg.getServerId(); |
| | | serverURL = receivedMsg.getServerURL(); |
| | | int separator = serverURL.lastIndexOf(':'); |
| | |
| | | this.baseDn = receivedMsg.getBaseDn(); |
| | | if (baseDn == null) |
| | | { |
| | | replicationCache = replicationServer.getReplicationCache(this.baseDn); |
| | | // Get or create the ReplicationCache |
| | | replicationCache = replicationServer.getReplicationCache(this.baseDn, |
| | | true); |
| | | localGenerationId = replicationCache.getGenerationId(); |
| | | ServerState serverState = replicationCache.getDbServerState(); |
| | | |
| | | // The session initiator decides whether to use SSL. |
| | |
| | | new ReplServerStartMessage(replicationServerId, |
| | | replicationServerURL, |
| | | this.baseDn, windowSize, serverState, |
| | | protocolVersion, sslEncryption); |
| | | protocolVersion, |
| | | localGenerationId, |
| | | sslEncryption); |
| | | session.publish(outMsg); |
| | | } |
| | | else |
| | |
| | | } |
| | | this.serverState = receivedMsg.getServerState(); |
| | | sendWindowSize = receivedMsg.getWindowSize(); |
| | | |
| | | /* Up to this point the session is encrypted; from here on it depends on the negotiation */ |
| | | if (!sslEncryption) |
| | | { |
| | | session.stopEncryption(); |
| | | } |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | Set<String> ss = this.serverState.toStringSet(); |
| | | Set<String> lss = replicationCache.getDbServerState().toStringSet(); |
| | | TRACER.debugInfo("In " + replicationCache.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | ", SH received START from RS serverId=" + serverId + |
| | | " baseDN=" + this.baseDn + |
| | | " generationId=" + generationId + |
| | | " localGenerationId=" + localGenerationId + |
| | | " state=" + ss + |
| | | " and sent ReplServerStart with state=" + lss); |
| | | } |
| | | |
| | | // if the remote RS and the local RS have the same genID |
| | | // then it's ok and nothing else to do |
| | | if (generationId == localGenerationId) |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In " + replicationCache.getReplicationServer(). |
| | | getMonitorInstanceName() + " RS with serverID=" + serverId + |
| | | " is connected with the right generation ID"); |
| | | } |
| | | } |
| | | else |
| | | { |
| | | if (localGenerationId>0) |
| | | { |
| | | // if the local RS is initialized |
| | | if (generationId>0) |
| | | { |
| | | // if the remote RS is initialized |
| | | if (generationId != localGenerationId) |
| | | { |
| | | // if the 2 RS have different generationID |
| | | if (replicationCache.getGenerationIdSavedStatus()) |
| | | { |
| | | // if the present RS has received changes regarding its |
| | | // gen ID and so won't change without a reset |
| | | // then we are just degrading the peer. |
| | | Message message = NOTE_BAD_GENERATION_ID.get( |
| | | this.baseDn.toNormalizedString(), |
| | | Short.toString(receivedMsg.getServerId()), |
| | | Long.toString(generationId), |
| | | Long.toString(localGenerationId)); |
| | | |
| | | ErrorMessage errorMsg = |
| | | new ErrorMessage(replicationServerId, serverId, message); |
| | | session.publish(errorMsg); |
| | | } |
| | | else |
| | | { |
| | | // The present RS has never received changes regarding its |
| | | // gen ID. |
| | | // |
| | | // Example case: |
| | | // - we are in RS1 |
| | | // - RS2 has genId2 from LS2 (genId2 <=> no data in LS2) |
| | | // - RS1 has genId1 from LS1 /genId1 comes from data in suffix |
| | | // - we are in RS1 and we receive a START msg from RS2 |
| | | // - Each RS keeps its genID / is degraded and when LS2 will |
| | | // be populated from LS1 everything will becomes ok. |
| | | // |
| | | // Issue: |
| | | // FIXME : Would it be a good idea in some cases to just |
| | | // set the gen ID received from the peer RS |
| | | // specially if the peer has a non nul state and |
| | | // we have a nul state ? |
| | | // replicationCache.setGenerationId(generationId, false); |
| | | Message message = NOTE_BAD_GENERATION_ID.get( |
| | | this.baseDn.toNormalizedString(), |
| | | Short.toString(receivedMsg.getServerId()), |
| | | Long.toString(generationId), |
| | | Long.toString(localGenerationId)); |
| | | |
| | | ErrorMessage errorMsg = |
| | | new ErrorMessage(replicationServerId, serverId, message); |
| | | session.publish(errorMsg); |
| | | } |
| | | } |
| | | } |
| | | else |
| | | { |
| | | // The remote has no genId. We don't change anything for the |
| | | // current RS. |
| | | } |
| | | } |
| | | else |
| | | { |
| | | // The local RS is not initialized - take the one received |
| | | replicationCache.setGenerationId(generationId, false); |
| | | } |
| | | } |
| | | } |
| | | else |
| | | { |
| | |
| | | return; // we did not recognize the message, ignore it |
| | | } |
| | | |
| | | if (!sslEncryption) |
| | | { |
| | | session.stopEncryption(); |
| | | } |
| | | |
| | | replicationCache = replicationServer.getReplicationCache(this.baseDn); |
| | | // Get or create the ReplicationCache |
| | | replicationCache = replicationServer.getReplicationCache(this.baseDn, |
| | | true); |
| | | |
| | | boolean started; |
| | | if (serverIsLDAPserver) |
| | |
| | | |
| | | if (started) |
| | | { |
| | | writer = new ServerWriter(session, serverId, this, replicationCache); |
| | | // sendWindow MUST be created before starting the writer |
| | | sendWindow = new Semaphore(sendWindowSize); |
| | | |
| | | reader = new ServerReader(session, serverId, this, |
| | | replicationCache); |
| | | writer = new ServerWriter(session, serverId, this, replicationCache); |
| | | reader = new ServerReader(session, serverId, this, replicationCache); |
| | | |
| | | reader.start(); |
| | | writer.start(); |
| | |
| | | // the connection is not valid, close it. |
| | | try |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In " + replicationCache.getReplicationServer(). |
| | | getMonitorInstanceName() + " RS failed to start locally " + |
| | | " the connection from serverID="+serverId); |
| | | } |
| | | session.close(); |
| | | } catch (IOException e1) |
| | | { |
| | |
| | | { |
| | | // some problem happened, reject the connection |
| | | MessageBuilder mb = new MessageBuilder(); |
| | | mb.append(ERR_CHANGELOG_CONNECTION_ERROR.get(this.toString())); |
| | | mb.append(ERR_CHANGELOG_CONNECTION_ERROR.get( |
| | | this.getMonitorInstanceName())); |
| | | mb.append(stackTraceToSingleLineString(e)); |
| | | logError(mb.toMessage()); |
| | | try |
| | |
| | | // ignore |
| | | } |
| | | } |
| | | sendWindow = new Semaphore(sendWindowSize); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public void add(UpdateMessage update, ServerHandler sourceHandler) |
| | | { |
| | | /* |
| | | * Ignore updates from a server that is degraded due to |
| | | * its inconsistent generationId |
| | | */ |
| | | long referenceGenerationId = replicationCache.getGenerationId(); |
| | | if ((referenceGenerationId>0) && |
| | | (referenceGenerationId != generationId)) |
| | | { |
| | | logError(ERR_IGNORING_UPDATE_TO.get( |
| | | update.getDn(), |
| | | this.getMonitorInstanceName())); |
| | | |
| | | return; |
| | | } |
| | | |
| | | synchronized (msgQueue) |
| | | { |
| | | /* |
| | |
| | | if (serverIsLDAPserver) |
| | | return "Remote LDAP Server " + str; |
| | | else |
| | | return "Remote Replication Server " + str; |
| | | return "Remote Repl Server " + str; |
| | | } |
| | | |
| | | /** |
| | |
| | | attributes.add(attr); |
| | | |
| | | attributes.add(new Attribute("ssl-encryption", |
| | | String.valueOf(session.isEncrypted()))); |
| | | String.valueOf(session.isEncrypted()))); |
| | | |
| | | attributes.add(new Attribute("generation-id", |
| | | String.valueOf(generationId))); |
| | | |
| | | return attributes; |
| | | } |
| | |
| | | public void process(RoutableMessage msg) |
| | | { |
| | |   // Handles a routable message received on this handler: traces it when |
| | |   // debugging is enabled, then passes it to the replication cache |
| | |   // together with this handler. |
| | |   if (debugEnabled()) |
| | |   { |
| | |     // Braces keep BOTH traces behind the debug check. Without them only |
| | |     // the first statement was guarded, and the second trace string was |
| | |     // built and handed to the tracer for every message. |
| | |     TRACER.debugInfo("SH(" + replicationServerId + ") receives " + |
| | |         msg + " from " + serverId); |
| | |     TRACER.debugInfo("In " + replicationCache.getReplicationServer(). |
| | |         getMonitorInstanceName() + |
| | |         " SH for remote server " + this.getMonitorInstanceName() + |
| | |         " processes received msg=" + msg); |
| | |   } |
| | |   replicationCache.process(msg, this); |
| | | } |
| | | |
| | |
| | | public void sendInfo(ReplServerInfoMessage info) |
| | |     throws IOException |
| | | { |
| | |   // Publishes the given replication server info message on this |
| | |   // handler's session, tracing it first when debugging is enabled. |
| | |   if (debugEnabled()) |
| | |   { |
| | |     final String trace = |
| | |         "In " + replicationCache.getReplicationServer(). |
| | |             getMonitorInstanceName() |
| | |         + " SH for remote server " + this.getMonitorInstanceName() |
| | |         + " sends message=" + info; |
| | |     TRACER.debugInfo(trace); |
| | |   } |
| | | |
| | |   session.publish(info); |
| | | } |
| | | |
| | |
| | | */ |
| | | public void setReplServerInfo(ReplServerInfoMessage infoMsg) |
| | | { |
| | |   // Records what the info message reports: the servers connected to the |
| | |   // sender and its generationId. The trace is debug-only; the two |
| | |   // assignments below always run. |
| | |   if (debugEnabled()) |
| | |   { |
| | |     final String trace = |
| | |         "In " + replicationCache.getReplicationServer(). |
| | |             getMonitorInstanceName() |
| | |         + " SH for remote server " + this.getMonitorInstanceName() |
| | |         + " sets replServerInfo " + "<" + infoMsg + ">"; |
| | |     TRACER.debugInfo(trace); |
| | |   } |
| | | |
| | |   remoteLDAPservers = infoMsg.getConnectedServers(); |
| | |   generationId = infoMsg.getGenerationId(); |
| | | } |
| | | |
| | | /** |
| | |
| | | public void send(RoutableMessage msg) throws IOException |
| | | { |
| | |   // Publishes a routable message on this handler's session, tracing it |
| | |   // first when debugging is enabled. An IOException raised by the |
| | |   // session is propagated to the caller. |
| | |   if (debugEnabled()) |
| | |   { |
| | |     // Braces keep BOTH traces behind the debug check. Without them only |
| | |     // the first statement was guarded, and the second trace string was |
| | |     // built and handed to the tracer for every message. |
| | |     TRACER.debugInfo("SH(" + replicationServerId + ") forwards " + |
| | |         msg + " to " + serverId); |
| | |     TRACER.debugInfo("In " + replicationCache.getReplicationServer(). |
| | |         getMonitorInstanceName() + |
| | |         " SH for remote server " + this.getMonitorInstanceName() + |
| | |         " sends message=" + msg); |
| | |   } |
| | |   session.publish(msg); |
| | | } |
| | | |
| | |
| | | checkWindow(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Gets the generationId currently recorded by this handler. |
| | | * |
| | | * @return The generationId held by this handler. |
| | | */ |
| | | public long getGenerationId() |
| | | { |
| | |   return this.generationId; |
| | | } |
| | | |
| | | /** |
| | | * Notifies the remote server that the generationId of this domain has |
| | | * been reset, by publishing an error message on the session. |
| | | * <p> |
| | | * This is best effort: any exception raised while building or publishing |
| | | * the notification is caught and only traced at debug level; it is never |
| | | * propagated to the caller. |
| | | */ |
| | | public void resetGenerationId() |
| | | { |
| | |   // Notify the peer that it is now invalid regarding the generationId. |
| | |   // We are now waiting for a startServer message from this server with |
| | |   // a valid generationId. |
| | |   try |
| | |   { |
| | |     Message message = NOTE_RESET_GENERATION_ID.get(baseDn.toString()); |
| | |     // Id order (local replication server id first, remote server id |
| | |     // second) matches the other ErrorMessage instances this handler |
| | |     // publishes to its peer; the two ids were previously swapped here. |
| | |     ErrorMessage errorMsg = |
| | |         new ErrorMessage(replicationServerId, serverId, message); |
| | |     session.publish(errorMsg); |
| | |   } |
| | |   catch (Exception e) |
| | |   { |
| | |     // Deliberately not rethrown: the notification is best effort. Leave |
| | |     // a debug trace so the failure is not lost entirely. |
| | |     // FIXME Also report this through the error log. |
| | |     if (debugEnabled()) |
| | |     { |
| | |       TRACER.debugInfo("SH(" + replicationServerId + |
| | |           ") failed to send the generationId reset notification to " + |
| | |           serverId + ": " + stackTraceToSingleLineString(e)); |
| | |     } |
| | |   } |
| | | } |
| | | |
| | | /** |
| | | * Publishes a generationId reset message to the peer server over this |
| | | * handler's session. |
| | | * The peer is expected to be a replication server. |
| | | * |
| | | * @param msg The ResetGenerationId message to be sent. |
| | | * @throws IOException If the message cannot be published on the session. |
| | | */ |
| | | public void sendGenerationId(ResetGenerationId msg) |
| | |     throws IOException |
| | | { |
| | |   session.publish(msg); |
| | | } |
| | | } |