Code cleanup.
MessageHandler.java:
Removed the instance members replicationServerId and replicationServerURL from the classes in the org.opends.server.replication.server package (they duplicated information already held by replicationServer).
Added getReplicationServerURL().
ECLServerHandler.java:
Removed duplicated code in the constructors.
In getNextECLUpdate(), tried to make the code easier to read.
*.java:
Changed code as a consequence of the changes to MessageHandler.
Made some fields private.
In getMonitorData(), changed return type from ArrayList to List.
| | |
| | | package org.opends.server.replication.server; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.loggers.ErrorLogger.logError; |
| | | import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; |
| | | import static org.opends.server.loggers.ErrorLogger.*; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import static org.opends.server.replication.common.StatusMachine.*; |
| | | import static org.opends.server.replication.protocol.ProtocolVersion.*; |
| | | |
| | |
| | | */ |
| | | public class DataServerHandler extends ServerHandler |
| | | { |
| | | // Temporary generationId received in handshake/phase1, |
| | | // and used after handshake/phase2 |
| | | long tmpGenerationId; |
| | | /** |
| | | * Temporary generationId received in handshake/phase1, and used after |
| | | * handshake/phase2. |
| | | */ |
| | | private long tmpGenerationId; |
| | | |
| | | // Status of this DS (only used if this server handler represents a DS) |
| | | /** Status of this DS (only used if this server handler represents a DS). */ |
| | | private ServerStatus status = ServerStatus.INVALID_STATUS; |
| | | |
| | | // Referrals URLs this DS is exporting |
| | | /** Referrals URLs this DS is exporting. */ |
| | | private List<String> refUrls = new ArrayList<String>(); |
| | | // Assured replication enabled on DS or not |
| | | /** Assured replication enabled on DS or not. */ |
| | | private boolean assuredFlag = false; |
| | | // DS assured mode (relevant if assured replication enabled) |
| | | /** DS assured mode (relevant if assured replication enabled). */ |
| | | private AssuredMode assuredMode = AssuredMode.SAFE_DATA_MODE; |
| | | // DS safe data level (relevant if assured mode is safe data) |
| | | /** DS safe data level (relevant if assured mode is safe data). */ |
| | | private byte safeDataLevel = (byte) -1; |
| | | private Set<String> eclIncludes = new HashSet<String>(); |
| | | private Set<String> eclIncludesForDeletes = new HashSet<String>(); |
| | |
| | | * Creates a new data server handler. |
| | | * @param session The session opened with the remote data server. |
| | | * @param queueSize The queue size. |
| | | * @param replicationServerURL The URL of the hosting RS. |
| | | * @param replicationServerId The serverID of the hosting RS. |
| | | * @param replicationServer The hosting RS. |
| | | * @param rcvWindowSize The receiving window size. |
| | | */ |
| | | public DataServerHandler( |
| | | Session session, |
| | | int queueSize, |
| | | String replicationServerURL, |
| | | int replicationServerId, |
| | | ReplicationServer replicationServer, |
| | | int rcvWindowSize) |
| | | { |
| | | super(session, queueSize, replicationServerURL, replicationServerId, |
| | | replicationServer, rcvWindowSize); |
| | | super(session, queueSize, replicationServer, rcvWindowSize); |
| | | } |
| | | |
| | | /** |
| | |
| | | * requested. |
| | | */ |
| | | @Override |
| | | public ArrayList<Attribute> getMonitorData() |
| | | public List<Attribute> getMonitorData() |
| | | { |
| | | // Get the generic ones |
| | | ArrayList<Attribute> attributes = super.getMonitorData(); |
| | | List<Attribute> attributes = super.getMonitorData(); |
| | | |
| | | // Add the specific DS ones |
| | | attributes.add(Attributes.create("replica", serverURL)); |
| | |
| | | * Gets the status of the connected DS. |
| | | * @return The status of the connected DS. |
| | | */ |
| | | @Override |
| | | public ServerStatus getStatus() |
| | | { |
| | | return status; |
| | |
| | | replicationServerDomain.buildAndSendTopoInfoToRSs(); |
| | | } |
| | | |
| | | // Send our own TopologyMsg to DS |
| | | /** Send our own TopologyMsg to DS. */ |
| | | private TopologyMsg sendTopoToRemoteDS() throws IOException |
| | | { |
| | | TopologyMsg outTopoMsg = replicationServerDomain |
| | |
| | | logError(message); |
| | | |
| | | super.finalizeStart(); |
| | | |
| | | } |
| | | catch(DirectoryException de) |
| | | { |
| | |
| | | if (getProtocolVersion() < ProtocolVersion.REPLICATION_PROTOCOL_V4) |
| | | { |
| | | // Peer DS uses protocol < V4 : send it a ReplServerStartMsg |
| | | startMsg = new ReplServerStartMsg(replicationServerId, |
| | | replicationServerURL, getBaseDN(), maxRcvWindow, |
| | | startMsg = new ReplServerStartMsg(getReplicationServerId(), |
| | | getReplicationServerURL(), getBaseDN(), maxRcvWindow, |
| | | replicationServerDomain.getDbServerState(), |
| | | localGenerationId, sslEncryption, getLocalGroupId(), |
| | | replicationServerDomain.getReplicationServer() |
| | |
| | | else |
| | | { |
| | | // Peer DS uses protocol V4 : send it a ReplServerStartDSMsg |
| | | startMsg = new ReplServerStartDSMsg(replicationServerId, |
| | | replicationServerURL, getBaseDN(), maxRcvWindow, |
| | | startMsg = new ReplServerStartDSMsg(getReplicationServerId(), |
| | | getReplicationServerURL(), getBaseDN(), maxRcvWindow, |
| | | replicationServerDomain.getDbServerState(), |
| | | localGenerationId, sslEncryption, getLocalGroupId(), |
| | | replicationServerDomain.getReplicationServer() |
| | |
| | | */ |
| | | public DSInfo toDSInfo() |
| | | { |
| | | return new DSInfo(serverId, serverURL, replicationServerId, generationId, |
| | | status, assuredFlag, assuredMode, safeDataLevel, groupId, refUrls, |
| | | eclIncludes, eclIncludesForDeletes, getProtocolVersion()); |
| | | return new DSInfo(serverId, serverURL, getReplicationServerId(), |
| | | generationId, status, assuredFlag, assuredMode, safeDataLevel, groupId, |
| | | refUrls, eclIncludes, eclIncludesForDeletes, getProtocolVersion()); |
| | | } |
| | | |
| | | /** |
| | |
| | | if (getProtocolVersion() < ProtocolVersion.REPLICATION_PROTOCOL_V4) |
| | | { |
| | | // Peer DS uses protocol < V4 : send it a ReplServerStartMsg |
| | | startMsg = new ReplServerStartMsg(replicationServerId, |
| | | replicationServerURL, getBaseDN(), maxRcvWindow, |
| | | startMsg = new ReplServerStartMsg(getReplicationServerId(), |
| | | getReplicationServerURL(), getBaseDN(), maxRcvWindow, |
| | | replicationServerDomain.getDbServerState(), |
| | | localGenerationId, sslEncryption, getLocalGroupId(), |
| | | replicationServerDomain.getReplicationServer() |
| | |
| | | else |
| | | { |
| | | // Peer DS uses protocol V4 : send it a ReplServerStartDSMsg |
| | | startMsg = new ReplServerStartDSMsg(replicationServerId, |
| | | replicationServerURL, getBaseDN(), maxRcvWindow, |
| | | startMsg = new ReplServerStartDSMsg(getReplicationServerId(), |
| | | getReplicationServerURL(), getBaseDN(), maxRcvWindow, |
| | | new ServerState(), localGenerationId, sslEncryption, |
| | | getLocalGroupId(), 0, replicationServer.getWeight(), 0); |
| | | } |
| | |
| | | * Creates a new handler object to a remote replication server. |
| | | * @param session The session with the remote RS. |
| | | * @param queueSize The queue size to manage updates to that RS. |
| | | * @param replicationServerURL The hosting local RS URL. |
| | | * @param replicationServerId The hosting local RS serverId. |
| | | * @param replicationServer The hosting local RS object. |
| | | * @param rcvWindowSize The receiving window size. |
| | | */ |
| | | public ECLServerHandler( |
| | | Session session, |
| | | int queueSize, |
| | | String replicationServerURL, |
| | | int replicationServerId, |
| | | ReplicationServer replicationServer, |
| | | int rcvWindowSize) |
| | | { |
| | | super(session, queueSize, replicationServerURL, replicationServerId, |
| | | replicationServer, rcvWindowSize); |
| | | super(session, queueSize, replicationServer, rcvWindowSize); |
| | | try |
| | | { |
| | | setBaseDNAndDomain(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT, true); |
| | |
| | | |
| | | /** |
| | | * Creates a new handler object to a remote replication server. |
| | | * @param replicationServerURL The hosting local RS URL. |
| | | * @param replicationServerId The hosting local RS serverId. |
| | | * @param replicationServer The hosting local RS object. |
| | | * @param startECLSessionMsg the start parameters. |
| | | * @throws DirectoryException when an error occurs. |
| | | */ |
| | | public ECLServerHandler( |
| | | String replicationServerURL, |
| | | int replicationServerId, |
| | | ReplicationServer replicationServer, |
| | | StartECLSessionMsg startECLSessionMsg) |
| | | throws DirectoryException |
| | | { |
| | | // queueSize is hard coded to 1 else super class hangs for some reason |
| | | super(null, 1, replicationServerURL, replicationServerId, |
| | | replicationServer, 0); |
| | | try |
| | | { |
| | | setBaseDNAndDomain(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT, true); |
| | | } |
| | | catch(DirectoryException de) |
| | | { |
| | | // no chance to have a bad domain set here |
| | | } |
| | | this(null, 1, replicationServer, 0); |
| | | initialize(startECLSessionMsg); |
| | | } |
| | | |
| | |
| | | |
| | | // Creates the table that will contain the real-time info for each |
| | | // and every domain. |
| | | HashSet<DomainContext> tmpSet = new HashSet<DomainContext>(); |
| | | Set<DomainContext> tmpSet = new HashSet<DomainContext>(); |
| | | String missingDomains = ""; |
| | | if (rsdi != null) |
| | | { |
| | |
| | | newDomainCtxt.currentState = new ServerState(); |
| | | |
| | | // Creates an unconnected SH for the domain |
| | | MessageHandler mh = new MessageHandler(maxQueueSize, |
| | | replicationServerURL, replicationServerId, replicationServer); |
| | | MessageHandler mh = |
| | | new MessageHandler(maxQueueSize, replicationServer); |
| | | mh.setInitialServerState(newDomainCtxt.startState); |
| | | mh.setBaseDNAndDomain(rsd.getBaseDn(), false); |
| | | // register the unconnected into the domain |
| | |
| | | * requested. |
| | | */ |
| | | @Override |
| | | public ArrayList<Attribute> getMonitorData() |
| | | public List<Attribute> getMonitorData() |
| | | { |
| | | // Get the generic ones |
| | | ArrayList<Attribute> attributes = super.getMonitorData(); |
| | | List<Attribute> attributes = super.getMonitorData(); |
| | | |
| | | // Add the specific RS ones |
| | | attributes.add(Attributes.create("External-Changelog-Server", |
| | | serverURL)); |
| | | attributes.add(Attributes.create("External-Changelog-Server", serverURL)); |
| | | |
| | | // TODO:ECL No monitoring exists for ECL. |
| | | return attributes; |
| | |
| | | // take the oldest |
| | | // if one domain has no msg, still is candidate |
| | | |
| | | int iDom; |
| | | boolean continueLooping = true; |
| | | while (continueLooping && searchPhase == INIT_PHASE) |
| | | { |
| | |
| | | // Default is not to loop, with one exception |
| | | continueLooping = false; |
| | | |
| | | iDom = getOldestChangeFromDomainCtxts(); |
| | | |
| | | // iDom == -1 means that there is no oldest change to process |
| | | int iDom = getOldestChangeFromDomainCtxts(); |
| | | if (iDom == -1) |
| | | { |
| | | { // there is no oldest change to process |
| | | closeInitPhase(); |
| | | |
| | | // signals end of phase 1 to the caller |
| | |
| | | } |
| | | |
| | | // Build the ECLUpdateMsg to be returned |
| | | DomainContext oldestContext = domainCtxts[iDom]; |
| | | String suffix = oldestContext.rsd.getBaseDn(); |
| | | oldestChange = new ECLUpdateMsg( |
| | | (LDAPUpdateMsg)domainCtxts[iDom].nextMsg, |
| | | (LDAPUpdateMsg)oldestContext.nextMsg, |
| | | null, // cookie will be set later |
| | | domainCtxts[iDom].rsd.getBaseDn(), |
| | | suffix, |
| | | 0); // draftChangeNumber may be set later |
| | | domainCtxts[iDom].nextMsg = null; |
| | | oldestContext.nextMsg = null; |
| | | |
| | | if (draftCompat) |
| | | { |
| | |
| | | // replogcn : the oldest change from the changelog db |
| | | ChangeNumber cnFromChangelogDb = |
| | | oldestChange.getUpdateMsg().getChangeNumber(); |
| | | String dnFromChangelogDb = domainCtxts[iDom].rsd.getBaseDn(); |
| | | String dnFromChangelogDb = suffix; |
| | | |
| | | while (true) |
| | | { |
| | |
| | | draftCNDb.add( |
| | | oldestChange.getDraftChangeNumber(), |
| | | this.previousCookie.toString(), |
| | | domainCtxts[iDom].rsd.getBaseDn(), |
| | | suffix, |
| | | oldestChange.getUpdateMsg().getChangeNumber()); |
| | | |
| | | break; |
| | |
| | | |
| | | // Set and test the domain of the oldestChange see if we reached |
| | | // the end of the phase for this domain |
| | | domainCtxts[iDom].currentState.update( |
| | | oldestContext.currentState.update( |
| | | oldestChange.getUpdateMsg().getChangeNumber()); |
| | | |
| | | if (domainCtxts[iDom].currentState.cover(domainCtxts[iDom].stopState)) |
| | | if (oldestContext.currentState.cover(oldestContext.stopState)) |
| | | { |
| | | domainCtxts[iDom].active = false; |
| | | oldestContext.active = false; |
| | | } |
| | | if (draftCompat && (lastDraftCN>0) && |
| | | (oldestChange.getDraftChangeNumber()>lastDraftCN)) |
| | | { |
| | | domainCtxts[iDom].active = false; |
| | | oldestContext.active = false; |
| | | } |
| | | if (domainCtxts[iDom].active) |
| | | if (oldestContext.active) |
| | | { |
| | | // populates the table with the next eligible msg from iDom |
| | | // in non blocking mode, return null when no more eligible msg |
| | | domainCtxts[iDom].getNextEligibleMessageForDomain(operationId); |
| | | oldestContext.getNextEligibleMessageForDomain(operationId); |
| | | } |
| | | } // phase == INIT_PHASE |
| | | } // while (...) |
| | |
| | | } |
| | | |
| | | // take the oldest one |
| | | iDom = getOldestChangeFromDomainCtxts(); |
| | | |
| | | int iDom = getOldestChangeFromDomainCtxts(); |
| | | if (iDom != -1) |
| | | { |
| | | String suffix = this.domainCtxts[iDom].rsd.getBaseDn(); |
| | | DomainContext oldestContext = domainCtxts[iDom]; |
| | | String suffix = oldestContext.rsd.getBaseDn(); |
| | | |
| | | oldestChange = new ECLUpdateMsg( |
| | | (LDAPUpdateMsg)domainCtxts[iDom].nextMsg, |
| | | (LDAPUpdateMsg)oldestContext.nextMsg, |
| | | null, // set later |
| | | suffix, 0); |
| | | domainCtxts[iDom].nextMsg = null; // clean |
| | | oldestContext.nextMsg = null; // clean |
| | | |
| | | domainCtxts[iDom].currentState.update( |
| | | oldestContext.currentState.update( |
| | | oldestChange.getUpdateMsg().getChangeNumber()); |
| | | |
| | | if (draftCompat) |
| | |
| | | draftCNDb.add( |
| | | oldestChange.getDraftChangeNumber(), |
| | | this.previousCookie.toString(), |
| | | domainCtxts[iDom].rsd.getBaseDn(), |
| | | suffix, |
| | | oldestChange.getUpdateMsg().getChangeNumber()); |
| | | } |
| | | } |
| | |
| | | * |
| | | * |
| | | * Copyright 2009-2010 Sun Microsystems, Inc. |
| | | * Portions Copyright 2012 ForgeRock AS |
| | | * Portions Copyright 2012-2013 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.server; |
| | | |
| | | |
| | | import org.opends.server.replication.common.ExternalChangeLogSession; |
| | | import org.opends.server.replication.protocol.ECLUpdateMsg; |
| | | import org.opends.server.replication.protocol.StartECLSessionMsg; |
| | |
| | | * This class implements a session used to search the external changelog |
| | | * in the Directory Server. |
| | | */ |
| | | public class ExternalChangeLogSessionImpl |
| | | implements ExternalChangeLogSession |
| | | public class ExternalChangeLogSessionImpl implements ExternalChangeLogSession |
| | | { |
| | | |
| | | ECLServerHandler handler; |
| | | |
| | | private ECLServerHandler handler; |
| | | |
| | | /** |
| | | * Create a new external changelog session. |
| | |
| | | StartECLSessionMsg startECLSessionMsg) |
| | | throws DirectoryException |
| | | { |
| | | this.handler = new ECLServerHandler( |
| | | rs.getServerURL(), |
| | | rs.getServerId(), |
| | | rs, |
| | | startECLSessionMsg); |
| | | this.handler = new ECLServerHandler(rs, startECLSessionMsg); |
| | | } |
| | | |
| | | /** |
| | |
| | | * @return the next available message from the ECL. |
| | | * @throws DirectoryException when needed. |
| | | */ |
| | | @Override |
| | | public ECLUpdateMsg getNextUpdate() |
| | | throws DirectoryException |
| | | { |
| | |
| | | /** |
| | | * Close the session. |
| | | */ |
| | | @Override |
| | | public void close() |
| | | { |
| | | // ECL is a special case in the sense that there is no |
| | |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | |
| | | import java.util.ArrayList; |
| | | import java.util.List; |
| | | import java.util.SortedSet; |
| | | import java.util.TreeSet; |
| | | import java.util.concurrent.atomic.AtomicBoolean; |
| | |
| | | */ |
| | | protected ReplicationServer replicationServer = null; |
| | | /** |
| | | * The URL of the hosting replication server. |
| | | */ |
| | | protected String replicationServerURL = null; |
| | | /** |
| | | * The serverID of the hosting replication server. |
| | | */ |
| | | protected int replicationServerId; |
| | | /** |
| | | * Specifies the related replication server domain based on baseDn. |
| | | */ |
| | | protected ReplicationServerDomain replicationServerDomain = null; |
| | |
| | | /** |
| | | * Specifies whether the consumer is following the producer (is not late). |
| | | */ |
| | | protected boolean following = false; |
| | | private boolean following = false; |
| | | /** |
| | | * Specifies the current serverState of this handler. |
| | | */ |
| | |
| | | * Creates a new server handler instance with the provided socket. |
| | | * @param queueSize The maximum number of updates that will be kept |
| | | * in memory by this ServerHandler. |
| | | * @param replicationServerURL The URL of the hosting replication server. |
| | | * @param replicationServerId The ID of the hosting replication server. |
| | | * @param replicationServer The hosting replication server. |
| | | */ |
| | | public MessageHandler( |
| | | int queueSize, |
| | | String replicationServerURL, |
| | | int replicationServerId, |
| | | ReplicationServer replicationServer) |
| | | public MessageHandler(int queueSize, ReplicationServer replicationServer) |
| | | { |
| | | this.maxQueueSize = queueSize; |
| | | this.maxQueueBytesSize = queueSize * 100; |
| | | this.replicationServerURL = replicationServerURL; |
| | | this.replicationServerId = replicationServerId; |
| | | this.replicationServer = replicationServer; |
| | | } |
| | | |
| | |
| | | * requested. |
| | | */ |
| | | @Override |
| | | public ArrayList<Attribute> getMonitorData() |
| | | public List<Attribute> getMonitorData() |
| | | { |
| | | ArrayList<Attribute> attributes = new ArrayList<Attribute>(); |
| | | List<Attribute> attributes = new ArrayList<Attribute>(); |
| | | attributes.add(Attributes.create("handler", getMonitorInstanceName())); |
| | | attributes.add( |
| | | Attributes.create("queue-size", String.valueOf(msgQueue.count()))); |
| | | attributes.add( |
| | | Attributes.create( |
| | | "queue-size-bytes", String.valueOf(msgQueue.bytesCount()))); |
| | | attributes.add( |
| | | Attributes.create( |
| | | "following", String.valueOf(following))); |
| | | attributes.add(Attributes.create("following", String.valueOf(following))); |
| | | return attributes; |
| | | } |
| | | |
| | |
| | | return msgQueue.count(); |
| | | else |
| | | { |
| | | /** |
| | | /* |
| | | * When the server is not able to follow, the msgQueue |
| | | * may become too large and therefore won't contain all the |
| | | * changes. Some changes may only be stored in the backing DB |
| | |
| | | */ |
| | | public void shutdown() |
| | | { |
| | | /* |
| | | * Shutdown ServerWriter |
| | | */ |
| | | synchronized (msgQueue) |
| | | { |
| | | msgQueue.clear(); |
| | |
| | | */ |
| | | public int getReplicationServerId() |
| | | { |
| | | return this.replicationServerId; |
| | | return this.replicationServer.getServerId(); |
| | | } |
| | | |
| | | /** |
| | | * Get the server URL of the hosting replication server. |
| | | * <p> |
| | | * The URL is obtained from the hosting replication server object on each |
| | | * call; this handler does not keep its own copy of the value. |
| | | * |
| | | * @return the replication server URL. |
| | | */ |
| | | public String getReplicationServerURL() |
| | | { |
| | | return this.replicationServer.getServerURL(); |
| | | } |
| | | } |
| | |
| | | |
| | | if (msg instanceof ServerStartMsg) |
| | | { |
| | | DataServerHandler handler = new DataServerHandler(session, |
| | | queueSize,serverURL,serverId,this,rcvWindow); |
| | | DataServerHandler handler = new DataServerHandler( |
| | | session, queueSize, this, rcvWindow); |
| | | handler.startFromRemoteDS((ServerStartMsg)msg); |
| | | } |
| | | else if (msg instanceof ReplServerStartMsg) |
| | | { |
| | | ReplicationServerHandler handler = new ReplicationServerHandler( |
| | | session,queueSize,serverURL,serverId,this,rcvWindow); |
| | | session, queueSize, this, rcvWindow); |
| | | handler.startFromRemoteRS((ReplServerStartMsg)msg); |
| | | } |
| | | else if (msg instanceof ServerStartECLMsg) |
| | | { |
| | | ECLServerHandler handler = new ECLServerHandler( |
| | | session,queueSize,serverURL,serverId,this,rcvWindow); |
| | | session, queueSize, this, rcvWindow); |
| | | handler.startFromRemoteServer((ServerStartECLMsg)msg); |
| | | } |
| | | else |
| | |
| | | session = replSessionSecurity.createClientSession(socket, timeoutMS); |
| | | |
| | | ReplicationServerHandler handler = new ReplicationServerHandler( |
| | | session, queueSize, this.serverURL, serverId, this, |
| | | rcvWindow); |
| | | session, queueSize, this, rcvWindow); |
| | | handler.connect(baseDn, sslEncryption); |
| | | } |
| | | catch (Exception e) |
| | |
| | | import static org.opends.server.replication.protocol.ProtocolVersion.*; |
| | | |
| | | import java.io.IOException; |
| | | import java.util.ArrayList; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.Set; |
| | |
| | | private ReplServerStartMsg sendStartToRemote() throws IOException |
| | | { |
| | | ReplServerStartMsg outReplServerStartMsg = new ReplServerStartMsg( |
| | | replicationServerId, replicationServerURL, getBaseDN(), |
| | | getReplicationServerId(), getReplicationServerURL(), getBaseDN(), |
| | | maxRcvWindow, replicationServerDomain.getDbServerState(), |
| | | localGenerationId, sslEncryption, |
| | | getLocalGroupId(), replicationServerDomain.getReplicationServer() |
| | |
| | | * Creates a new handler object to a remote replication server. |
| | | * @param session The session with the remote RS. |
| | | * @param queueSize The queue size to manage updates to that RS. |
| | | * @param replicationServerURL The hosting local RS URL. |
| | | * @param replicationServerId The hosting local RS serverId. |
| | | * @param replicationServer The hosting local RS object. |
| | | * @param rcvWindowSize The receiving window size. |
| | | */ |
| | | public ReplicationServerHandler( |
| | | Session session, |
| | | int queueSize, |
| | | String replicationServerURL, |
| | | int replicationServerId, |
| | | ReplicationServer replicationServer, |
| | | int rcvWindowSize) |
| | | { |
| | | super(session, queueSize, replicationServerURL, replicationServerId, |
| | | replicationServer, rcvWindowSize); |
| | | super(session, queueSize, replicationServer, rcvWindowSize); |
| | | } |
| | | |
| | | /** |
| | |
| | | * requested. |
| | | */ |
| | | @Override |
| | | public ArrayList<Attribute> getMonitorData() |
| | | public List<Attribute> getMonitorData() |
| | | { |
| | | // Get the generic ones |
| | | ArrayList<Attribute> attributes = super.getMonitorData(); |
| | | List<Attribute> attributes = super.getMonitorData(); |
| | | |
| | | // Add the specific RS ones |
| | | attributes.add(Attributes.create("Replication-Server", serverURL)); |
| | |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | |
| | | import java.io.IOException; |
| | | import java.util.ArrayList; |
| | | import java.util.List; |
| | | import java.util.Random; |
| | | import java.util.concurrent.Semaphore; |
| | | import java.util.concurrent.TimeUnit; |
| | |
| | | /** |
| | | * The thread that will send heartbeats. |
| | | */ |
| | | HeartbeatThread heartbeatThread = null; |
| | | private HeartbeatThread heartbeatThread; |
| | | |
| | | /** |
| | | * Set when ServerWriter is stopping. |
| | |
| | | * communicate with the remote entity. |
| | | * @param queueSize The maximum number of updates that will be kept |
| | | * in memory by this ServerHandler. |
| | | * @param replicationServerURL The URL of the hosting replication server. |
| | | * @param replicationServerId The serverId of the hosting replication server. |
| | | * @param replicationServer The hosting replication server. |
| | | * @param rcvWindowSize The window size to receive from the remote server. |
| | | */ |
| | | public ServerHandler( |
| | | Session session, |
| | | int queueSize, |
| | | String replicationServerURL, |
| | | int replicationServerId, |
| | | ReplicationServer replicationServer, |
| | | int rcvWindowSize) |
| | | { |
| | | super(queueSize, replicationServerURL, |
| | | replicationServerId, replicationServer); |
| | | super(queueSize, replicationServer); |
| | | this.session = session; |
| | | this.rcvWindowSizeHalf = rcvWindowSize / 2; |
| | | this.maxRcvWindow = rcvWindowSize; |
| | |
| | | * requested. |
| | | */ |
| | | @Override |
| | | public ArrayList<Attribute> getMonitorData() |
| | | public List<Attribute> getMonitorData() |
| | | { |
| | | // Get the generic ones |
| | | ArrayList<Attribute> attributes = super.getMonitorData(); |
| | | List<Attribute> attributes = super.getMonitorData(); |
| | | |
| | | attributes.add(Attributes.create("server-id", String.valueOf(serverId))); |
| | | attributes.add(Attributes.create("domain-name", getBaseDN())); |
| | |
| | | getBaseDN(), |
| | | serverId, |
| | | session.getReadableRemoteAddress(), |
| | | replicationServerId); |
| | | getReplicationServerId()); |
| | | throw new DirectoryException(ResultCode.OTHER, message); |
| | | } |
| | | } |