| | |
| | | import static org.opends.server.loggers.ErrorLogger.logError; |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; |
| | | |
| | | |
| | | import java.io.IOException; |
| | |
| | | import org.opends.server.replication.protocol.DoneMessage; |
| | | import org.opends.server.replication.protocol.EntryMessage; |
| | | import org.opends.server.replication.protocol.ErrorMessage; |
| | | import org.opends.server.replication.protocol.ResetGenerationId; |
| | | import org.opends.server.replication.protocol.InitializeRequestMessage; |
| | | import org.opends.server.replication.protocol.InitializeTargetMessage; |
| | | import org.opends.server.replication.protocol.ProtocolSession; |
| | |
| | | * Constructor for the LDAP server reader part of the replicationServer. |
| | | * |
| | | * @param session The ProtocolSession from which to read the data. |
| | | * @param serverId The server ID of the server from which we read messages. |
| | | * @param handler The server handler for this server reader. |
| | | * @param replicationCache The ReplicationCache for this server reader. |
| | | */ |
| | |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | if (handler.isReplicationServer()) |
| | | { |
| | | TRACER.debugInfo("Replication server reader starting " + serverId); |
| | | } |
| | | else |
| | | { |
| | | TRACER.debugInfo("LDAP server reader starting " + serverId); |
| | | } |
| | | TRACER.debugInfo( |
| | | "In RS <" + replicationCache.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | (handler.isReplicationServer()?" RS ":" LS")+ |
| | | " reader starting for serverId=" + serverId); |
| | | } |
| | | /* |
| | | * wait on input stream |
| | |
| | | { |
| | | ReplicationMessage msg = session.receive(); |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | if (handler.isReplicationServer()) |
| | | { |
| | | TRACER.debugInfo( |
| | | "In RS <" + replicationCache.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | "> from RS server with serverId=" + serverId + |
| | | " receives " + msg); |
| | | } |
| | | else |
| | | { |
| | | TRACER.debugInfo( |
| | | "In RS <" + replicationCache.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | "> from LDAP server with serverId=" + serverId + |
| | | " receives " + msg); |
| | | } |
| | | } |
| | | if (msg instanceof AckMessage) |
| | | { |
| | | AckMessage ack = (AckMessage) msg; |
| | |
| | | } |
| | | else if (msg instanceof UpdateMessage) |
| | | { |
| | | UpdateMessage update = (UpdateMessage) msg; |
| | | handler.decAndCheckWindow(); |
| | | replicationCache.put(update, handler); |
| | | // Ignore update received from a replica with |
| | | // a bad generation ID |
| | | long referenceGenerationId = replicationCache.getGenerationId(); |
| | | if ((referenceGenerationId>0) && |
| | | (referenceGenerationId != handler.getGenerationId())) |
| | | { |
| | | logError(ERR_IGNORING_UPDATE_FROM.get( |
| | | msg.toString(), |
| | | handler.getMonitorInstanceName())); |
| | | } |
| | | else |
| | | { |
| | | UpdateMessage update = (UpdateMessage) msg; |
| | | handler.decAndCheckWindow(); |
| | | replicationCache.put(update, handler); |
| | | } |
| | | } |
| | | else if (msg instanceof WindowMessage) |
| | | { |
| | |
| | | ErrorMessage errorMsg = (ErrorMessage) msg; |
| | | handler.process(errorMsg); |
| | | } |
| | | else if (msg instanceof ResetGenerationId) |
| | | { |
| | | ResetGenerationId genIdMsg = (ResetGenerationId) msg; |
| | | replicationCache.resetGenerationId(this.handler); |
| | | } |
| | | else if (msg instanceof WindowProbe) |
| | | { |
| | | WindowProbe windowProbeMsg = (WindowProbe) msg; |
| | |
| | | { |
| | | ReplServerInfoMessage infoMsg = (ReplServerInfoMessage)msg; |
| | | handler.setReplServerInfo(infoMsg); |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | if (handler.isReplicationServer()) |
| | | TRACER.debugInfo( |
| | | "In RS " + replicationCache.getReplicationServer(). |
| | | getServerId() + |
| | | " Receiving replServerInfo from " + handler.getServerId() + |
| | | " baseDn=" + replicationCache.getBaseDn() + |
| | | " genId=" + infoMsg.getGenerationId()); |
| | | } |
| | | |
| | | if (replicationCache.getGenerationId()<0) |
| | | { |
| | | // Here is the case where a ReplicationServer receives from |
| | | // another ReplicationServer the generationId for a domain |
| | | // for which the generation ID has never been set. |
| | | replicationCache.setGenerationId(infoMsg.getGenerationId(), false); |
| | | } |
| | | else |
| | | { |
| | | if (infoMsg.getGenerationId()<0) |
| | | { |
| | | // Here is the case where another ReplicationServer |
| | | // signals that it has no generationId set for the domain. |
| | | // If we have generationId set locally and no server currently |
| | | // connected for that domain in the topology then we may also |
| | | // reset the generationId locally. |
| | | replicationCache.mayResetGenerationId(); |
| | | } |
| | | |
| | | if (replicationCache.getGenerationId() != infoMsg.getGenerationId()) |
| | | { |
| | | Message message = NOTE_BAD_GENERATION_ID.get( |
| | | replicationCache.getBaseDn().toNormalizedString(), |
| | | Short.toString(handler.getServerId()), |
| | | Long.toString(infoMsg.getGenerationId()), |
| | | Long.toString(replicationCache.getGenerationId())); |
| | | |
| | | ErrorMessage errorMsg = new ErrorMessage( |
| | | replicationCache.getReplicationServer().getServerId(), |
| | | handler.getServerId(), |
| | | message); |
| | | session.publish(errorMsg); |
| | | } |
| | | } |
| | | } |
| | | else if (msg == null) |
| | | { |
| | |
| | | * Log a message and exit from this loop |
| | | * So that this handler is stopped. |
| | | */ |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In RS <" + replicationCache.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | " reader IO EXCEPTION serverID=" + serverId |
| | | + stackTraceToSingleLineString(e) + e.getLocalizedMessage() + |
| | | e.getCause()); |
| | | Message message = NOTE_SERVER_DISCONNECT.get(handler.toString()); |
| | | logError(message); |
| | | } catch (ClassNotFoundException e) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In RS <" + replicationCache.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | " reader CNF EXCEPTION serverID=" + serverId |
| | | + stackTraceToSingleLineString(e)); |
| | | /* |
| | | * The remote server has sent an unknown message, |
| | | * close the connection. |
| | | */ |
| | | Message message = ERR_UNKNOWN_MESSAGE.get(handler.toString()); |
| | | logError(message); |
| | | } catch (Exception e) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In RS <" + replicationCache.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | " server reader EXCEPTION serverID=" + serverId |
| | | + stackTraceToSingleLineString(e)); |
| | | /* |
| | | * An unexpected exception was raised while handling messages from |
| | | * the remote server, close the connection. |
| | | */ |
| | | Message message = NOTE_READER_EXCEPTION.get(handler.toString()); |
| | | logError(message); |
| | |
| | | * happen. |
| | | * Attempt to close the socket and stop the server handler. |
| | | */ |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In RS <" + replicationCache.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | " reader CLOSE serverID=" + serverId |
| | | + stackTraceToSingleLineString(new Exception())); |
| | | try |
| | | { |
| | | session.close(); |
| | |
| | | replicationCache.stopServer(handler); |
| | | } |
| | | if (debugEnabled()) |
| | | { |
| | | if (handler.isReplicationServer()) |
| | | { |
| | | TRACER.debugInfo("Replication server reader stopping " + serverId); |
| | | } |
| | | else |
| | | { |
| | | TRACER.debugInfo("LDAP server reader stopping " + serverId); |
| | | } |
| | | } |
| | | TRACER.debugInfo( |
| | | "In RS <" + replicationCache.getReplicationServer(). |
| | | getMonitorInstanceName() + |
| | | (handler.isReplicationServer()?"RS":"LDAP") + |
| | | " server reader stopped for serverID=" + serverId |
| | | + stackTraceToSingleLineString(new Exception())); |
| | | } |
| | | } |