| | |
| | | */ |
| | | package org.opends.server.replication.server; |
| | | |
| | | import java.io.IOException; |
| | | import java.util.*; |
| | | import java.util.zip.DataFormatException; |
| | | |
| | | import org.opends.messages.Message; |
| | | import org.opends.server.replication.common.*; |
| | | import org.opends.server.replication.protocol.*; |
| | | import org.opends.server.types.*; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.loggers.ErrorLogger.*; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import static org.opends.server.replication.common.StatusMachine.*; |
| | | import static org.opends.server.replication.protocol.ProtocolVersion.*; |
| | | |
| | | import java.io.IOException; |
| | | import java.util.ArrayList; |
| | | import java.util.Date; |
| | | import java.util.HashSet; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.Set; |
| | | import java.util.zip.DataFormatException; |
| | | |
| | | import org.opends.messages.Message; |
| | | import org.opends.server.replication.common.AssuredMode; |
| | | import org.opends.server.replication.common.DSInfo; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.common.ServerStatus; |
| | | import org.opends.server.replication.common.StatusMachine; |
| | | import org.opends.server.replication.common.StatusMachineEvent; |
| | | import org.opends.server.replication.protocol.*; |
| | | import org.opends.server.types.Attribute; |
| | | import org.opends.server.types.AttributeBuilder; |
| | | import org.opends.server.types.Attributes; |
| | | import org.opends.server.types.DirectoryException; |
| | | import org.opends.server.types.ResultCode; |
| | | |
| | | /** |
| | | * This class defines a server handler, which handles all interaction with a |
| | | * peer server (RS or DS). |
| | |
| | | * @param newGenId The new generation id to take into account |
| | | * @throws IOException If IO error occurred. |
| | | */ |
| | | public void changeStatusForResetGenId(long newGenId) |
| | | throws IOException |
| | | public void changeStatusForResetGenId(long newGenId) throws IOException |
| | | { |
| | | StatusMachineEvent event; |
| | | final int localRsServerId = replicationServer.getServerId(); |
| | | |
| | | StatusMachineEvent event; |
| | | if (newGenId == -1) |
| | | { |
| | | // The generation id is being made invalid, let's put the DS |
| | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo( |
| | | "In RS " + |
| | | replicationServerDomain.getReplicationServer().getServerId() + |
| | | ". Closing connection to DS " + getServerId() + |
| | | "In RS " + localRsServerId + |
| | | ", closing connection to DS " + getServerId() + |
| | | " for baseDn " + getBaseDN() + |
| | | " to force reconnection as new local" + |
| | | " generationId and remote one match and DS is in bad gen id: " + |
| | |
| | | // would rewait the RSD lock that we already must have entering this |
| | | // method. This would lead to a reentrant lock which we do not want. |
| | | // So simply close the session, this will make the hang up appear |
| | | // after the reader thread that took the RSD lock realeases it. |
| | | if (session != null) |
| | | // after the reader thread that took the RSD lock releases it. |
| | | if (session != null |
| | | // V4 protocol introduced a StopMsg to properly close the |
| | | // connection between servers |
| | | && getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V4) |
| | | { |
| | | // V4 protocol introduces a StopMsg to properly close the |
| | | // connection between servers |
| | | if (getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V4) |
| | | try |
| | | { |
| | | try |
| | | { |
| | | session.publish(new StopMsg()); |
| | | } catch (IOException ioe) |
| | | { |
| | | // Anyway, going to close session, so nothing to do |
| | | } |
| | | session.publish(new StopMsg()); |
| | | } |
| | | catch (IOException ioe) |
| | | { |
| | | // Anyway, going to close session, so nothing to do |
| | | } |
| | | } |
| | | |
| | |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo( |
| | | "In RS " + |
| | | replicationServerDomain.getReplicationServer().getServerId() + |
| | | ". DS " + getServerId() + " for baseDn " + getBaseDN() + |
| | | " has already generation id " + newGenId + |
| | | " so no ChangeStatusMsg sent to him."); |
| | | TRACER.debugInfo("In RS " + localRsServerId + ". DS " |
| | | + getServerId() + " for baseDn " + getBaseDN() |
| | | + " has already generation id " + newGenId |
| | | + " so no ChangeStatusMsg sent to him."); |
| | | } |
| | | return; |
| | | } |
| | |
| | | } |
| | | } |
| | | |
| | | if ((event == StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT) && |
| | | (status == ServerStatus.FULL_UPDATE_STATUS)) |
| | | if (event == StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT |
| | | && status == ServerStatus.FULL_UPDATE_STATUS) |
| | | { |
| | | // Prevent useless error message (full update status cannot lead to bad |
| | | // gen status) |
| | | Message message = NOTE_BAD_GEN_ID_IN_FULL_UPDATE.get( |
| | | Integer.toString(replicationServerDomain. |
| | | getReplicationServer().getServerId()), |
| | | Integer.toString(localRsServerId), |
| | | getBaseDN(), |
| | | Integer.toString(serverId), |
| | | Long.toString(generationId), |
| | |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo( |
| | | "In RS " + |
| | | replicationServerDomain.getReplicationServer().getServerId() + |
| | | " Sending change status for reset gen id to " + getServerId() + |
| | | " for baseDn " + getBaseDN() + ":\n" + csMsg); |
| | | TRACER.debugInfo("In RS " + localRsServerId |
| | | + " Sending change status for reset gen id to " + getServerId() |
| | | + " for baseDn " + getBaseDN() + ":\n" + csMsg); |
| | | } |
| | | |
| | | session.publish(csMsg); |
| | |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo( |
| | | "In RS " + |
| | | replicationServerDomain.getReplicationServer().getServerId() + |
| | | " Sending change status from status analyzer to " + getServerId() + |
| | | " for baseDn " + getBaseDN() + ":\n" + csMsg); |
| | | TRACER.debugInfo("In RS " + replicationServer.getServerId() |
| | | + " Sending change status from status analyzer to " + getServerId() |
| | | + " for baseDn " + getBaseDN() + ":\n" + csMsg); |
| | | } |
| | | |
| | | session.publish(csMsg); |
| | |
| | | // Add the specific DS ones |
| | | attributes.add(Attributes.create("replica", serverURL)); |
| | | attributes.add(Attributes.create("connected-to", |
| | | this.replicationServerDomain.getReplicationServer() |
| | | .getMonitorInstanceName())); |
| | | this.replicationServer.getMonitorInstanceName())); |
| | | |
| | | MonitorData md = replicationServerDomain.getDomainMonitorData(); |
| | | |
| | | // Oldest missing update |
| | | Long approxFirstMissingDate = md.getApproxFirstMissingDate(serverId); |
| | | if ((approxFirstMissingDate != null) && (approxFirstMissingDate > 0)) |
| | | long approxFirstMissingDate = md.getApproxFirstMissingDate(serverId); |
| | | if (approxFirstMissingDate > 0) |
| | | { |
| | | Date date = new Date(approxFirstMissingDate); |
| | | attributes.add(Attributes.create( |
| | |
| | | } |
| | | |
| | | // Missing changes |
| | | long missingChanges = md.getMissingChanges(serverId); |
| | | attributes.add(Attributes.create("missing-changes", String |
| | | .valueOf(missingChanges))); |
| | | attributes.add(Attributes.create("missing-changes", |
| | | String.valueOf(md.getMissingChanges(serverId)))); |
| | | |
| | | // Replication delay |
| | | long delay = md.getApproxDelay(serverId); |
| | | attributes.add(Attributes.create("approximate-delay", String |
| | | .valueOf(delay))); |
| | | attributes.add(Attributes.create("approximate-delay", |
| | | String.valueOf(md.getApproxDelay(serverId)))); |
| | | |
| | | /* get the Server State */ |
| | | AttributeBuilder builder = new AttributeBuilder("server-state"); |
| | |
| | | { |
| | | Message errMessage = ERR_DS_DISCONNECTED_DURING_HANDSHAKE.get( |
| | | Integer.toString(inServerStartMsg.getServerId()), |
| | | Integer.toString(replicationServerDomain.getReplicationServer(). |
| | | getServerId())); |
| | | Integer.toString(replicationServer.getServerId())); |
| | | throw new DirectoryException(ResultCode.OTHER, errMessage); |
| | | } |
| | | catch (NotSupportedOldVersionPDUException e) |
| | | { |
| | | // We do not need to support DS V1 connection, we just accept RS V1 |
| | | // connection: |
| | | // We just trash the message, log the event for debug purpose and close |
| | | // the connection |
| | | throw new DirectoryException(ResultCode.OTHER, null, null); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | // We do not need to support DS V1 connection, we just accept RS V1 |
| | |
| | | } |
| | | finally |
| | | { |
| | | if ((replicationServerDomain != null) && |
| | | if (replicationServerDomain != null && |
| | | replicationServerDomain.hasLock()) |
| | | replicationServerDomain.release(); |
| | | } |
| | |
| | | { |
| | | // Peer DS uses protocol < V4 : send it a ReplServerStartMsg |
| | | startMsg = new ReplServerStartMsg(getReplicationServerId(), |
| | | getReplicationServerURL(), getBaseDN(), maxRcvWindow, |
| | | getReplicationServerURL(), getBaseDN(), maxRcvWindow, |
| | | replicationServerDomain.getDbServerState(), |
| | | localGenerationId, sslEncryption, getLocalGroupId(), |
| | | replicationServerDomain.getReplicationServer() |
| | | .getDegradedStatusThreshold()); |
| | | replicationServer.getDegradedStatusThreshold()); |
| | | } |
| | | else |
| | | { |
| | | // Peer DS uses protocol V4 : send it a ReplServerStartDSMsg |
| | | startMsg = new ReplServerStartDSMsg(getReplicationServerId(), |
| | | getReplicationServerURL(), getBaseDN(), maxRcvWindow, |
| | | getReplicationServerURL(), getBaseDN(), maxRcvWindow, |
| | | replicationServerDomain.getDbServerState(), |
| | | localGenerationId, sslEncryption, getLocalGroupId(), |
| | | replicationServerDomain.getReplicationServer() |
| | | .getDegradedStatusThreshold(), replicationServer.getWeight(), |
| | | replicationServer.getDegradedStatusThreshold(), |
| | | replicationServer.getWeight(), |
| | | replicationServerDomain.getConnectedLDAPservers().size()); |
| | | } |
| | | |
| | |
| | | { |
| | | if (serverId != 0) |
| | | { |
| | | StringBuilder builder = new StringBuilder("Replica DS("); |
| | | builder.append(serverId); |
| | | builder.append(") for domain \""); |
| | | builder.append(replicationServerDomain.getBaseDn()); |
| | | builder.append("\""); |
| | | return builder.toString(); |
| | | return "Replica DS(" + serverId + ") for domain \"" |
| | | + replicationServerDomain.getBaseDn() + "\""; |
| | | } |
| | | else |
| | | { |
| | | return "Unknown server"; |
| | | } |
| | | return "Unknown server"; |
| | | } |
| | | |
| | | /** |
| | |
| | | else |
| | | { |
| | | // We are an empty ReplicationServer |
| | | if ((generationId > 0) && (!getServerState().isEmpty())) |
| | | if (generationId > 0 && !getServerState().isEmpty()) |
| | | { |
| | | // If the LDAP server has already sent changes |
| | | // it is not expected to connect to an empty RS |