| | |
| | | package org.opends.server.replication.server; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.loggers.ErrorLogger.logError; |
| | | import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; |
| | | import static org.opends.server.loggers.debug.DebugLogger.getTracer; |
| | | import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; |
| | | import static org.opends.server.loggers.ErrorLogger.*; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | |
| | | import java.io.IOException; |
| | | import java.util.*; |
| | |
| | | * are removed and should they be needed again must be read from the backing |
| | | * file |
| | | * |
| | | * |
| | | * it runs a thread that is responsible for saving the messages |
| | | * received to the disk and for trimming them |
| | | * Decision to trim can be based on disk space or age of the message |
| | |
| | | TRACER.debugInfo("In " + "Replication Server " + |
| | | replicationServer.getReplicationPort() + " " + |
| | | baseDn + " " + replicationServer.getServerId() + |
| | | " for dn " + baseDn + ", update " + |
| | | update.getChangeNumber().toString() + |
| | | " for dn " + baseDn + ", update " + update.getChangeNumber() + |
| | | " will not be sent to replication server " + |
| | | Integer.toString(handler.getServerId()) + " with generation id " + |
| | | Long.toString(handler.getGenerationId()) + |
| | | " different from local " + |
| | | "generation id " + Long.toString(generationId)); |
| | | handler.getServerId() + " with generation id " + |
| | | handler.getGenerationId() + " different from local " + |
| | | "generation id " + generationId); |
| | | |
| | | continue; |
| | | } |
| | |
| | | { |
| | | if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS) |
| | | TRACER.debugInfo("In " + this + |
| | | " for dn " + baseDn + ", update " + |
| | | update.getChangeNumber().toString() + |
| | | " for dn " + baseDn + ", update " + update.getChangeNumber() + |
| | | " will not be sent to directory server " + |
| | | Integer.toString(handler.getServerId()) + " with generation id " + |
| | | Long.toString(handler.getGenerationId()) + |
| | | " different from local " + |
| | | "generation id " + Long.toString(generationId)); |
| | | handler.getServerId() + " with generation id " + |
| | | handler.getGenerationId() + " different from local " + |
| | | "generation id " + generationId); |
| | | if (dsStatus == ServerStatus.FULL_UPDATE_STATUS) |
| | | TRACER.debugInfo("In RS " + |
| | | replicationServer.getServerId() + |
| | | " for dn " + baseDn + ", update " + |
| | | update.getChangeNumber().toString() + |
| | | " will not be sent to directory server " + |
| | | Integer.toString(handler.getServerId()) + |
| | | " for dn " + baseDn + ", update " + update.getChangeNumber() + |
| | | " will not be sent to directory server " + handler.getServerId() + |
| | | " as it is in full update"); |
| | | } |
| | | |
| | |
| | | } |
| | | |
| | | List<Integer> expectedServers = new ArrayList<Integer>(); |
| | | if (interestedInAcks) |
| | | if (interestedInAcks && sourceHandler.isDataServer()) |
| | | { |
| | | if (sourceHandler.isDataServer()) |
| | | // Look for RS eligible for assured |
| | | for (ReplicationServerHandler handler : replicationServers.values()) |
| | | { |
| | | // Look for RS eligible for assured |
| | | for (ReplicationServerHandler handler : replicationServers.values()) |
| | | if (handler.getGroupId() == groupId |
| | | // No ack expected from a RS with different group id |
| | | && generationId > 0 && (generationId == handler.getGenerationId())) |
| | | // No ack expected from a RS with bad gen id |
| | | { |
| | | if (handler.getGroupId() == groupId) |
| | | // No ack expected from a RS with different group id |
| | | { |
| | | if ((generationId > 0) && |
| | | (generationId == handler.getGenerationId())) |
| | | // No ack expected from a RS with bad gen id |
| | | { |
| | | expectedServers.add(handler.getServerId()); |
| | | } |
| | | } |
| | | expectedServers.add(handler.getServerId()); |
| | | } |
| | | } |
| | | } |
| | |
| | | * Run when the assured timeout for an assured update message we are waiting |
| | | * acks for occurs. |
| | | */ |
| | | @Override |
| | | public void run() |
| | | { |
| | | ExpectedAcksInfo expectedAcksInfo = waitingAcks.get(cn); |
| | |
| | | ServerHandler origServer = expectedAcksInfo.getRequesterServer(); |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In RS " + Integer.toString(replicationServer.getServerId()) + |
| | | " for " + baseDn + |
| | | "In RS " + replicationServer.getServerId() + " for " + baseDn + |
| | | ", sending timeout for assured update with change " + " number " + |
| | | cn.toString() + " to server id " + |
| | | Integer.toString(origServer.getServerId())); |
| | | cn + " to server id " + origServer.getServerId()); |
| | | try |
| | | { |
| | | origServer.send(finalAck); |
| | |
| | | } |
| | | // Increment assured counters |
| | | boolean safeRead = |
| | | (expectedAcksInfo instanceof SafeReadExpectedAcksInfo); |
| | | expectedAcksInfo instanceof SafeReadExpectedAcksInfo; |
| | | if (safeRead) |
| | | { |
| | | origServer.incrementAssuredSrReceivedUpdatesTimeout(); |
| | |
| | | " has servers connected to it - will not reset generationId"); |
| | | } |
| | | |
| | | if ((!lDAPServersConnectedInTheTopology) && |
| | | (!this.generationIdSavedStatus) && |
| | | (generationId != -1)) |
| | | if (!lDAPServersConnectedInTheTopology && !this.generationIdSavedStatus |
| | | && generationId != -1) |
| | | { |
| | | changeGenerationId(-1, false); |
| | | } |
| | |
| | | { |
| | | ReplicationServerHandler oldHandler = |
| | | replicationServers.get(handler.getServerId()); |
| | | if ((oldHandler != null)) |
| | | if (oldHandler != null) |
| | | { |
| | | if (oldHandler.getServerAddressURL().equals( |
| | | handler.getServerAddressURL())) |
| | |
| | | // We log the error. The requestor will detect a timeout or |
| | | // any other failure on the connection. |
| | | logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get( |
| | | Integer.toString((msg.getDestination())))); |
| | | Integer.toString(msg.getDestination()))); |
| | | } |
| | | } |
| | | } |
| | |
| | | { |
| | | for (DataServerHandler handler : directoryServers.values()) |
| | | { |
| | | if ((notThisOne == null) || ((handler != notThisOne))) |
| | | if ((notThisOne == null) || (handler != notThisOne)) |
| | | // All except passed one |
| | | { |
| | | for (int i=1; i<=2; i++) |
| | | { |
| | | if (!handler.shuttingDown()) |
| | | if (!handler.shuttingDown() |
| | | && handler.getStatus() != ServerStatus.NOT_CONNECTED_STATUS) |
| | | { |
| | | if (handler.getStatus() != ServerStatus.NOT_CONNECTED_STATUS) |
| | | TopologyMsg topoMsg = createTopologyMsgForDS(handler.getServerId()); |
| | | try |
| | | { |
| | | TopologyMsg topoMsg=createTopologyMsgForDS(handler.getServerId()); |
| | | try |
| | | handler.sendTopoInfo(topoMsg); |
| | | break; |
| | | } |
| | | catch (IOException e) |
| | | { |
| | | if (i == 2) |
| | | { |
| | | handler.sendTopoInfo(topoMsg); |
| | | break; |
| | | } |
| | | catch (IOException e) |
| | | { |
| | | if (i==2) |
| | | { |
| | | Message message = ERR_EXCEPTION_SENDING_TOPO_INFO.get( |
| | | baseDn, |
| | | "directory", |
| | | Integer.toString(handler.getServerId()), |
| | | e.getMessage()); |
| | | logError(message); |
| | | } |
| | | Message message = |
| | | ERR_EXCEPTION_SENDING_TOPO_INFO |
| | | .get(baseDn, "directory", Integer.toString(handler |
| | | .getServerId()), e.getMessage()); |
| | | logError(message); |
| | | } |
| | | } |
| | | } |
| | |
| | | { |
| | | for (int i=1; i<=2; i++) |
| | | { |
| | | if (!handler.shuttingDown()) |
| | | if (!handler.shuttingDown() |
| | | && handler.getStatus() != ServerStatus.NOT_CONNECTED_STATUS) |
| | | { |
| | | if (handler.getStatus() != ServerStatus.NOT_CONNECTED_STATUS) |
| | | try |
| | | { |
| | | try |
| | | handler.sendTopoInfo(topoMsg); |
| | | break; |
| | | } |
| | | catch (IOException e) |
| | | { |
| | | if (i == 2) |
| | | { |
| | | handler.sendTopoInfo(topoMsg); |
| | | break; |
| | | } |
| | | catch (IOException e) |
| | | { |
| | | if (i==2) |
| | | { |
| | | Message message = ERR_EXCEPTION_SENDING_TOPO_INFO.get( |
| | | baseDn, |
| | | "replication", |
| | | Integer.toString(handler.getServerId()), |
| | | e.getMessage()); |
| | | logError(message); |
| | | } |
| | | Message message = |
| | | ERR_EXCEPTION_SENDING_TOPO_INFO.get(baseDn, "replication", |
| | | Integer.toString(handler.getServerId()), e.getMessage()); |
| | | logError(message); |
| | | } |
| | | } |
| | | } |
| | |
| | | */ |
| | | public boolean hasLock() |
| | | { |
| | | return (lock.getHoldCount() > 0); |
| | | return lock.getHoldCount() > 0; |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public boolean isRunningStatusAnalyzer() |
| | | { |
| | | return (statusAnalyzer != null); |
| | | return statusAnalyzer != null; |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public boolean isRunningMonitoringPublisher() |
| | | { |
| | | return (monitoringPublisher != null); |
| | | return monitoringPublisher != null; |
| | | } |
| | | |
| | | /** |
| | |
| | | } |
| | | |
| | | ChangeNumber changelogLastCN = db.getLastChange(); |
| | | if (changelogLastCN != null) |
| | | if (changelogLastCN != null |
| | | && (eligibleCN == null || changelogLastCN.newer(eligibleCN))) |
| | | { |
| | | if ((eligibleCN == null) || (changelogLastCN.newer(eligibleCN))) |
| | | { |
| | | eligibleCN = changelogLastCN; |
| | | } |
| | | eligibleCN = changelogLastCN; |
| | | } |
| | | |
| | | if ((heartbeatLastDN != null) && |
| | | ((eligibleCN == null) || (heartbeatLastDN.newer(eligibleCN)))) |
| | | if (heartbeatLastDN != null |
| | | && (eligibleCN == null || heartbeatLastDN.newer(eligibleCN))) |
| | | { |
| | | eligibleCN = heartbeatLastDN; |
| | | } |