| | |
| | | import java.net.UnknownHostException; |
| | | import java.util.ArrayList; |
| | | import java.util.Collection; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.concurrent.Semaphore; |
| | | import java.util.concurrent.TimeUnit; |
| | | import java.util.HashSet; |
| | | import java.util.Iterator; |
| | | import java.util.List; |
| | | import java.util.Set; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.concurrent.Semaphore; |
| | | import java.util.concurrent.TimeUnit; |
| | | |
| | | import org.opends.messages.Category; |
| | | import org.opends.messages.Message; |
| | | import org.opends.messages.MessageBuilder; |
| | | import org.opends.messages.Severity; |
| | | import org.opends.server.admin.server.ConfigurationChangeListener; |
| | | import org.opends.server.admin.std.server.ReplicationServerCfg; |
| | | import org.opends.server.api.Backend; |
| | |
| | | import org.opends.server.api.RestoreTaskListener; |
| | | import org.opends.server.config.ConfigException; |
| | | import org.opends.server.core.DirectoryServer; |
| | | import org.opends.server.core.WorkflowImpl; |
| | | import org.opends.server.core.networkgroups.NetworkGroup; |
| | | import org.opends.server.loggers.LogLevel; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import org.opends.server.replication.common.ExternalChangeLogSession; |
| | | import org.opends.server.replication.protocol.ProtocolSession; |
| | | import org.opends.server.replication.protocol.ReplServerStartMsg; |
| | | import org.opends.server.replication.protocol.ReplSessionSecurity; |
| | | import org.opends.server.replication.protocol.ReplicationMsg; |
| | | import org.opends.server.replication.protocol.ServerStartECLMsg; |
| | | import org.opends.server.replication.protocol.ServerStartMsg; |
| | | import org.opends.server.replication.protocol.StartECLSessionMsg; |
| | | import org.opends.server.types.BackupConfig; |
| | | import org.opends.server.types.ConfigChangeResult; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.DebugLogLevel; |
| | | import org.opends.server.types.DirectoryException; |
| | | import org.opends.server.types.Entry; |
| | | import org.opends.server.types.LDIFExportConfig; |
| | |
| | | import org.opends.server.types.RestoreConfig; |
| | | import org.opends.server.types.ResultCode; |
| | | import org.opends.server.util.LDIFReader; |
| | | import org.opends.server.util.ServerConstants; |
| | | import org.opends.server.util.TimeThread; |
| | | import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement; |
| | | |
| | | import com.sleepycat.je.DatabaseException; |
| | | |
| | |
| | | */ |
| | | private static final DebugTracer TRACER = getTracer(); |
| | | |
| | | private String externalChangeLogWorkflowID = "External Changelog Workflow ID"; |
| | | ECLWorkflowElement eclwe; |
| | | private static HashSet<Integer> localPorts = new HashSet<Integer>(); |
| | | |
| | | /** |
| | |
| | | ReplSessionSecurity.HANDSHAKE_TIMEOUT); |
| | | if (session == null) // Error, go back to accept |
| | | continue; |
| | | ServerHandler handler = new ServerHandler(session, queueSize); |
| | | handler.start(null, serverId, serverURL, rcvWindow, |
| | | false, this); |
| | | |
| | | ReplicationMsg msg = session.receive(); |
| | | |
| | | if (msg instanceof ServerStartMsg) |
| | | { |
| | | DataServerHandler handler = new DataServerHandler(session, |
| | | queueSize,serverURL,serverId,this,rcvWindow); |
| | | handler.startFromRemoteDS((ServerStartMsg)msg); |
| | | } |
| | | else if (msg instanceof ReplServerStartMsg) |
| | | { |
| | | ReplicationServerHandler handler = new ReplicationServerHandler( |
| | | session,queueSize,serverURL,serverId,this,rcvWindow); |
| | | handler.startFromRemoteRS((ReplServerStartMsg)msg); |
| | | } |
| | | else if (msg instanceof ServerStartECLMsg) |
| | | { |
| | | ECLServerHandler handler = new ECLServerHandler( |
| | | session,queueSize,serverURL,serverId,this,rcvWindow); |
| | | handler.startFromRemoteServer((ServerStartECLMsg)msg); |
| | | } |
| | | else |
| | | { |
| | | // We did not recognize the message, close session as what |
| | | // can happen after is undetermined and we do not want the server to |
| | | // be disturbed |
| | | ServerHandler.closeSession(session, null, null); |
| | | return; |
| | | } |
| | | } |
| | | catch (Exception e) |
| | | { |
| | |
| | | // shutdown or changing the port number process. |
| | | // Just log debug information and loop. |
| | | // Do not log the message during shutdown. |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | } |
| | | if (shutdown == false) { |
| | | Message message = |
| | | ERR_EXCEPTION_LISTENING.get(e.getLocalizedMessage()); |
| | |
| | | /** |
| | | * Establish a connection to the server with the address and port. |
| | | * |
| | | * @param serverURL The address and port for the server, separated by a |
| | | * @param remoteServerURL The address and port for the server, separated by a |
| | | * colon. |
| | | * @param baseDn The baseDn of the connection |
| | | */ |
| | | private void connect(String serverURL, String baseDn) |
| | | private void connect(String remoteServerURL, String baseDn) |
| | | { |
| | | int separator = serverURL.lastIndexOf(':'); |
| | | String port = serverURL.substring(separator + 1); |
| | | String hostname = serverURL.substring(0, separator); |
| | | boolean sslEncryption = replSessionSecurity.isSslEncryption(serverURL); |
| | | int separator = remoteServerURL.lastIndexOf(':'); |
| | | String port = remoteServerURL.substring(separator + 1); |
| | | String hostname = remoteServerURL.substring(0, separator); |
| | | boolean sslEncryption =replSessionSecurity.isSslEncryption(remoteServerURL); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("RS " + this.getMonitorInstanceName() + |
| | | " connects to " + serverURL); |
| | | " connects to " + remoteServerURL); |
| | | |
| | | try |
| | | { |
| | |
| | | socket.setTcpNoDelay(true); |
| | | socket.connect(ServerAddr, 500); |
| | | |
| | | /* |
| | | ServerHandler handler = new ServerHandler( |
| | | replSessionSecurity.createClientSession(serverURL, socket, |
| | | ReplSessionSecurity.HANDSHAKE_TIMEOUT), |
| | | queueSize); |
| | | handler.start(baseDn, serverId, this.serverURL, rcvWindow, |
| | | sslEncryption, this); |
| | | */ |
| | | |
| | | ReplicationServerHandler handler = new ReplicationServerHandler( |
| | | replSessionSecurity.createClientSession(remoteServerURL, |
| | | socket, |
| | | ReplSessionSecurity.HANDSHAKE_TIMEOUT), |
| | | queueSize, |
| | | this.serverURL, |
| | | serverId, |
| | | this, |
| | | rcvWindow); |
| | | handler.connect(baseDn, sslEncryption); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | |
| | | serverId , this); |
| | | listenThread.start(); |
| | | |
| | | // Initialize the External Changelog |
| | | // FIXME: how is WF creation enabled/disabled in the RS ? |
| | | initializeECL(); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("RS " +getMonitorInstanceName()+ |
| | | " successfully initialized"); |
| | |
| | | Message message = |
| | | ERR_COULD_NOT_BIND_CHANGELOG.get(changelogPort, e.getMessage()); |
| | | logError(message); |
| | | } catch (DirectoryException e) |
| | | { |
| | | //FIXME:DirectoryException is raised by initializeECL => fix err msg |
| | | Message message = Message.raw(Category.SYNC, Severity.SEVERE_ERROR, |
| | | "Directory Exception raised by ECL initialization: " + e.getMessage()); |
| | | logError(message); |
| | | } |
| | | } |
| | | |
| | | /** |
| | |  * Initializes the ECL access by creating a dedicated workflow element, |
| | |  * wrapping it in a workflow rooted at the external changelog root DN and |
| | |  * registering that workflow with the default, admin and internal network |
| | |  * groups. |
| | |  * <p> |
| | |  * Nothing is done when a workflow is already registered under the ECL |
| | |  * workflow ID. |
| | |  * |
| | |  * @throws DirectoryException If creating or registering the ECL workflow |
| | |  *                            fails. |
| | |  */ |
| | | private void initializeECL() |
| | |     throws DirectoryException |
| | | { |
| | |   // Already set up (finalizeECL() notes that another RS may share the |
| | |   // workflow, e.g. in unit tests): leave the existing workflow untouched. |
| | |   if (WorkflowImpl.getWorkflow(externalChangeLogWorkflowID) != null) |
| | |   { |
| | |     return; |
| | |   } |
| | | |
| | |   // Store the element in the instance field; declaring a local of the same |
| | |   // name here would shadow the field and leave it unset. |
| | |   eclwe = new ECLWorkflowElement(this); |
| | | |
| | |   // Create the workflow for the base DN and register the workflow with |
| | |   // the server. |
| | |   WorkflowImpl externalChangeLogWorkflow = new WorkflowImpl( |
| | |       externalChangeLogWorkflowID, |
| | |       DN.decode(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT), |
| | |       eclwe.getWorkflowElementID(), |
| | |       eclwe); |
| | |   externalChangeLogWorkflow.register(); |
| | | |
| | |   NetworkGroup defaultNetworkGroup = NetworkGroup.getDefaultNetworkGroup(); |
| | |   defaultNetworkGroup.registerWorkflow(externalChangeLogWorkflow); |
| | | |
| | |   // FIXME:ECL should the ECL Workflow be registered in adminNetworkGroup? |
| | |   NetworkGroup adminNetworkGroup = NetworkGroup.getAdminNetworkGroup(); |
| | |   adminNetworkGroup.registerWorkflow(externalChangeLogWorkflow); |
| | | |
| | |   // FIXME:ECL should the ECL Workflow be registered in internalNetworkGroup? |
| | |   NetworkGroup internalNetworkGroup = NetworkGroup.getInternalNetworkGroup(); |
| | |   internalNetworkGroup.registerWorkflow(externalChangeLogWorkflow); |
| | | } |
| | | |
| | | private void finalizeECL() |
| | | { |
| | | WorkflowImpl eclwf = |
| | | (WorkflowImpl)WorkflowImpl.getWorkflow(externalChangeLogWorkflowID); |
| | | |
| | | // do it only if not already done by another RS (unit test case) |
| | | // if (DirectoryServer.getWorkflowElement(externalChangeLogWorkflowID) |
| | | if (eclwf!=null) |
| | | { |
| | | |
| | | |
| | | // FIXME:ECL should the ECL Workflow be registered in internalNetworkGroup? |
| | | NetworkGroup internalNetworkGroup = NetworkGroup.getInternalNetworkGroup(); |
| | | internalNetworkGroup.deregisterWorkflow(externalChangeLogWorkflowID); |
| | | |
| | | // FIXME:ECL should the ECL Workflow be registered in adminNetworkGroup? |
| | | NetworkGroup adminNetworkGroup = NetworkGroup.getAdminNetworkGroup(); |
| | | adminNetworkGroup.deregisterWorkflow(externalChangeLogWorkflowID); |
| | | |
| | | NetworkGroup defaultNetworkGroup = NetworkGroup.getDefaultNetworkGroup(); |
| | | defaultNetworkGroup.deregisterWorkflow(externalChangeLogWorkflowID); |
| | | |
| | | eclwf.deregister(); |
| | | eclwf.finalizeWorkflow(); |
| | | } |
| | | |
| | | eclwe = (ECLWorkflowElement) |
| | | DirectoryServer.getWorkflowElement("EXTERNAL CHANGE LOG"); |
| | | if (eclwe!=null) |
| | | { |
| | | DirectoryServer.deregisterWorkflowElement(eclwe); |
| | | eclwe.finalizeWorkflowElement(); |
| | | } |
| | | |
| | | } |
| | | |
| | | /** |
| | | * Get the ReplicationServerDomain associated to the base DN given in |
| | | * parameter. |
| | | * |
| | |
| | | { |
| | | dbEnv.shutdown(); |
| | | } |
| | | |
| | | finalizeECL(); |
| | | } |
| | | |
| | | |
| | |
| | | } |
| | | |
| | | /** |
| | |  * Reports how many domains this replication server currently manages. |
| | |  * |
| | |  * @return the number of entries held in {@code baseDNs}. |
| | |  */ |
| | | public int getCacheSize() |
| | | { |
| | |   final int managedDomains = baseDNs.size(); |
| | |   return managedDomains; |
| | | } |
| | | |
| | | /** |
| | |  * Opens a new External Changelog session bound to this replication server. |
| | |  * |
| | |  * @param msg The message that specifies the ECL request. |
| | |  * @return The newly created session. |
| | |  * @throws DirectoryException When an error occurs. |
| | |  */ |
| | | public ExternalChangeLogSession createECLSession(StartECLSessionMsg msg) |
| | |     throws DirectoryException |
| | | { |
| | |   return new ExternalChangeLogSessionImpl(this, msg); |
| | | } |
| | | |
| | | /** |
| | |  * Returns the URL of this replication server. |
| | |  * |
| | |  * @return the value of the {@code serverURL} field. |
| | |  */ |
| | | public String getServerURL() |
| | | { |
| | |   return serverURL; |
| | | } |
| | | |
| | | /** |
| | | * This method allows to check if the Replication Server given |
| | | * as the parameter is running in the local JVM. |
| | | * |