| | |
| | |       if (debugEnabled()) |
| | |         TRACER.debugInfo("In " + |
| | |             ((handler != null) ? handler.toString() : "Replication Server") + |
| | |             " closing session with err=" + providedMsg); |
| | | logError(providedMsg); |
| | | } |
| | | |
| | |
| | | */ |
| | | protected AtomicInteger assuredSrSentUpdatesTimeout = new AtomicInteger(); |
| | | /** |
| | | * Number of updates received from the server in assured safe data mode. |
| | | */ |
| | | protected int assuredSdReceivedUpdates = 0; |
| | | /** |
| | |
| | | /** |
| | | * The initial size of the sending window. |
| | | */ |
| | | protected int sendWindowSize; |
| | | /** |
| | |    * Remote generation id. |
| | | */ |
| | |
| | | /** |
| | | * Group id of this remote server. |
| | | */ |
| | | protected byte groupId = -1; |
| | | /** |
| | |    * Whether SSL encryption is used after the negotiation with the peer. |
| | | */ |
| | |
| | | closeSession(localSession, reason, this); |
| | | } |
| | | |
| | | if (replicationServerDomain != null && replicationServerDomain.hasLock()) |
| | | replicationServerDomain.release(); |
| | | |
| | | // If generation id of domain was changed, set it back to old value |
| | |
| | |       // peer server, and the last topo message sent may have failed to |
| | |       // be sent: in that case, restore the old value of the generation |
| | |       // id for the replication server domain. |
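| | |       // (-100 appears to be a sentinel meaning "no previous generation |
| | |       // id was saved", in which case there is nothing to restore.) |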
| | |       if (oldGenerationId != -100 && replicationServerDomain != null) |
| | |       { |
| | |         replicationServerDomain.changeGenerationId(oldGenerationId, false); |
| | | } |
| | | } |
| | | |
| | |
| | | @Override |
| | | public boolean engageShutdown() |
| | | { |
| | |     // Thread-safe test-and-set: returns true if shutdown was already engaged. |
| | | return shuttingDown.getAndSet(true); |
| | | } |
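| | | |
| | |   // Illustrative caller sketch (hypothetical, not part of the original |
| | |   // source): getAndSet(true) returns the previous value, so only the |
| | |   // first caller observes false and performs the actual teardown: |
| | |   // |
| | |   //   if (!handler.engageShutdown()) |
| | |   //   { |
| | |   //     handler.doShutdownWork(); // hypothetical helper |
| | |   //   } |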
| | | |
| | |
| | | // sendWindow MUST be created before starting the writer |
| | | sendWindow = new Semaphore(sendWindowSize); |
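| | |     // Flow-control sketch (assumed semantics, for illustration only): |
| | |     // each update sent consumes one permit, and a window message from |
| | |     // the peer returns credit, along the lines of: |
| | |     // |
| | |     //   sendWindow.acquire();        // before writing one update |
| | |     //   sendWindow.release(credit);  // on receiving a window message |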
| | | |
| | | writer = new ServerWriter(session, this, replicationServerDomain); |
| | | reader = new ServerReader(session, this); |
| | | |
| | | session.setName("Replication server RS(" + getReplicationServerId() |
| | | + ") session thread to " + this + " at " |
| | | + session.getReadableRemoteAddress()); |
| | | session.start(); |
| | | try |
| | |
| | | // Create a thread to send heartbeat messages. |
| | | if (heartbeatInterval > 0) |
| | | { |
| | | String threadName = "Replication server RS(" + getReplicationServerId() |
| | | + ") heartbeat publisher to " + this + " at " |
| | | + session.getReadableRemoteAddress(); |
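| | |       // (Presumably the interval is divided by 3 so that heartbeats are |
| | |       // published several times within the peer's detection window, |
| | |       // making a single delayed beat unlikely to trigger a failure.) |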
| | | heartbeatThread = new HeartbeatThread(threadName, session, |
| | | heartbeatInterval / 3); |
| | |
| | | */ |
| | | public boolean isReplicationServer() |
| | | { |
| | | return !this.isDataServer(); |
| | | } |
| | | |
| | | |
| | |
| | | // it will be created and locked later in the method |
| | | if (!timedout) |
| | | { |
| | | if (!replicationServerDomain.hasLock()) |
| | | replicationServerDomain.lock(); |
| | | return; |
| | | } |
| | | |
| | |     /** |
| | |      * Take the lock on the domain. |
| | |      * WARNING: Here we try to acquire the lock with a timeout. This |
| | |      * prevents a deadlock that may happen if there are cross |
| | |      * connection attempts (for the same domain) from this replication |
| | |      * server and from a peer one. |
| | |      * Here is the scenario: |
| | |      * - RS1 connect thread takes the domain lock and starts a |
| | |      * connection to RS2 |
| | |      * - at the same time RS2 connect thread takes its domain lock and |
| | |      * starts a connection to RS1 |
| | |      * - RS2 listen thread starts processing the received |
| | |      * ReplServerStartMsg from RS1 and wants to acquire the lock on |
| | |      * the domain (here) but cannot, as RS2 connect thread already |
| | |      * has it |
| | |      * - RS1 listen thread starts processing the received |
| | |      * ReplServerStartMsg from RS2 and wants to acquire the lock on |
| | |      * the domain (here) but cannot, as RS1 connect thread already |
| | |      * has it |
| | |      * => Deadlock: 4 threads are locked. |
| | |      * To prevent this, both listen threads try to acquire the lock |
| | |      * with a timeout. The random part of the timeout should let one |
| | |      * connection attempt be aborted while the other one has time to |
| | |      * finish. |
| | |      * Warning: the minimum time (3s) should be big enough to let |
| | |      * connections terminate in normal situations. The added random |
| | |      * time should span a wide enough range so that one listen thread |
| | |      * is very likely to time out well before the peer one. When the |
| | |      * first listen thread times out, the remote connect thread |
| | |      * releases the lock, allowing the peer listen thread to take the |
| | |      * lock it was waiting for and process the connection attempt. |
| | |      */ |
| | | Random random = new Random(); |
| | | int randomTime = random.nextInt(6); // Random from 0 to 5 |
| | | // Wait at least 3 seconds + (0 to 5 seconds) |
| | | long timeout = 3000 + (randomTime * 1000); |
| | | boolean lockAcquired = replicationServerDomain.tryLock(timeout); |
| | | if (!lockAcquired) |
| | | { |
| | |       // Timed out: the lock could not be acquired in time. |
| | | Message message = WARN_TIMEOUT_WHEN_CROSS_CONNECTION.get( |
| | | getBaseDN(), |
| | | serverId, |
| | | session.getReadableRemoteAddress(), |
| | | getReplicationServerId()); |
| | | throw new DirectoryException(ResultCode.OTHER, message); |
| | | } |
| | | } |
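| | | |
| | |   // Standalone sketch of the randomized-timeout pattern above |
| | |   // (hypothetical names, plain java.util.concurrent primitives; not |
| | |   // part of this class). Both sides of a cross-connection run the |
| | |   // same code; the differing random timeouts let one side back off |
| | |   // first and release the deadlock. (Note that tryLock with a |
| | |   // timeout may also throw InterruptedException.) |
| | |   // |
| | |   //   ReentrantLock domainLock = getDomainLock(); // hypothetical accessor |
| | |   //   long timeoutMs = 3000 + new Random().nextInt(6) * 1000L; |
| | |   //   if (!domainLock.tryLock(timeoutMs, TimeUnit.MILLISECONDS)) |
| | |   //   { |
| | |   //     throw new IllegalStateException("cross-connection timed out"); |
| | |   //   } |
| | |   //   try { /* process the connection attempt */ } |
| | |   //   finally { domainLock.unlock(); } |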
| | | |
| | |
| | | session.close(); |
| | | } |
| | | |
| | | // Stop the heartbeat thread. |
| | | if (heartbeatThread != null) |
| | | { |
| | | heartbeatThread.shutdown(); |
| | |
| | | */ |
| | | try |
| | | { |
| | | if (writer != null && !Thread.currentThread().equals(writer)) |
| | | { |
| | | |
| | | writer.join(SHUTDOWN_JOIN_TIMEOUT); |
| | | } |
| | | if (reader != null && !Thread.currentThread().equals(reader)) |
| | | { |
| | | reader.join(SHUTDOWN_JOIN_TIMEOUT); |
| | | } |
| | |
| | | { |
| | | // loop until not interrupted |
| | | } |
| | | } while ((interrupted || !acquired) && !shutdownWriter); |
| | | if (msg != null) |
| | | { |
| | | incrementOutCount(); |
| | |
| | | if (msg.getAssuredMode() == AssuredMode.SAFE_READ_MODE) |
| | | { |
| | | incrementAssuredSrSentUpdates(); |
| | | } else if (!isDataServer()) |
| | | { |
| | | incrementAssuredSdSentUpdates(); |
| | | } |
| | | } |
| | | } |
| | |
| | | */ |
| | | public RSInfo toRSInfo() |
| | | { |
| | | |
| | | return new RSInfo(serverId, serverURL, generationId, groupId, weight); |
| | | } |
| | | |
| | | /** |
| | | * Starts the monitoring publisher for the domain if not already started. |
| | | */ |
| | | protected void createMonitoringPublisher() |
| | | { |
| | | if (!replicationServerDomain.isRunningMonitoringPublisher()) |
| | | { |
| | | replicationServerDomain.startMonitoringPublisher(); |
| | | } |
| | | } |
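| | |   // (Note: the check above is a non-atomic check-then-act; presumably |
| | |   // startMonitoringPublisher() itself tolerates concurrent callers.) |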
| | | /** |
| | | * Update the send window size based on the credit specified in the |
| | | * given window message. |
| | | * |
| | |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName() |
| | | + ", " + getClass().getSimpleName() + " " + this + ":" |
| | | + "\nSH START HANDSHAKE RECEIVED:\n" + inStartMsg |
| | | + "\nAND REPLIED:\n" + outStartMsg); |
| | | } |
| | | } |
| | | |
| | |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName() |
| | | + ", " + getClass().getSimpleName() + " " + this + ":" |
| | | + "\nSH START HANDSHAKE SENT:\n" + outStartMsg + "\nAND RECEIVED:\n" |
| | | + inStartMsg); |
| | | } |
| | | } |
| | | |
| | |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName() |
| | | + ", " + getClass().getSimpleName() + " " + this + ":" |
| | | + "\nSH TOPO HANDSHAKE RECEIVED:\n" + inTopoMsg + "\nAND REPLIED:\n" |
| | | + outTopoMsg); |
| | | } |
| | | } |
| | | |
| | |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName() |
| | | + ", " + getClass().getSimpleName() + " " + this + ":" |
| | | + "\nSH TOPO HANDSHAKE SENT:\n" + outTopoMsg + "\nAND RECEIVED:\n" |
| | | + inTopoMsg); |
| | | } |
| | | } |
| | | |
| | |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName() |
| | | + ", " + getClass().getSimpleName() + " " + this + " :" |
| | | + "\nSH SESSION HANDSHAKE RECEIVED:\n" + inStartSessionMsg |
| | | + "\nAND REPLIED:\n" + outTopoMsg); |
| | | } |
| | | } |
| | | |
| | |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName() |
| | | + ", " + getClass().getSimpleName() + " " + this + " :" |
| | | + "\nSH SESSION HANDSHAKE RECEIVED A STOP MESSAGE"); |
| | | } |
| | | } |
| | | |
| | |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName() |
| | | + ", " + getClass().getSimpleName() + " " + this + " :" |
| | | + "\nSH SESSION HANDSHAKE RECEIVED:\n" + inStartECLSessionMsg); |
| | | } |
| | | } |
| | | |
| | |
| | | */ |
| | | public long getReferenceGenId() |
| | | { |
| | | if (replicationServerDomain != null) |
| | | return replicationServerDomain.getGenerationId(); |
| | | return -1; |
| | | } |
| | | |
| | | /** |
| | |
| | | * @param update the update message received. |
| | |    * @throws IOException if an I/O error occurs. |
| | | */ |
| | | public void put(UpdateMsg update) throws IOException |
| | | { |
| | |     if (replicationServerDomain != null) |
| | | replicationServerDomain.put(update, this); |