| | |
| | | * Portions Copyright 2006-2007 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.replication.server; |
| | | import org.opends.messages.Message; |
| | | |
| | | import org.opends.messages.*; |
| | | import static org.opends.server.loggers.ErrorLogger.logError; |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | |
| | | import org.opends.messages.MessageBuilder; |
| | | import static org.opends.server.util.StaticUtils.getFileForPath; |
| | | import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; |
| | | |
| | | import java.io.File; |
| | | import java.io.IOException; |
| | |
| | | import org.opends.server.types.ConfigChangeResult; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.ResultCode; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | |
| | | import com.sleepycat.je.DatabaseException; |
| | | |
| | |
| | | private int queueSize; |
| | | private String dbDirname = null; |
| | | private long trimAge; // the time (in sec) after which the changes must |
| | | // be deleted from the persistent storage. |
| | | private int replicationPort; |
| | | // de deleted from the persistent storage. |
| | | private boolean stopListen = false; |
| | | private ReplSessionSecurity replSessionSecurity; |
| | | |
| | | /** |
| | | * The tracer object for the debug logger. |
| | | */ |
| | | private static final DebugTracer TRACER = getTracer(); |
| | | |
| | | /** |
| | | * Creates a new Replication server using the provided configuration entry. |
| | | * |
| | | * @param configuration The configuration of this replication server. |
| | |
| | | // The socket has probably been closed as part of the |
| | | // shutdown or changing the port number process. |
| | | // just log debug information and loop. |
| | | Message message = DEBUG_REPLICATION_PORT_IOEXCEPTION.get(); |
| | | Message message = ERR_EXCEPTION_LISTENING.get(e.getLocalizedMessage()); |
| | | logError(message); |
| | | } |
| | | } |
| | |
| | | String hostname = serverURL.substring(0, separator); |
| | | boolean sslEncryption = replSessionSecurity.isSslEncryption(serverURL); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("RS " + this.getMonitorInstanceName() + |
| | | " connects to " + serverURL); |
| | | |
| | | try |
| | | { |
| | | InetSocketAddress ServerAddr = new InetSocketAddress( |
| | |
| | | listenSocket.bind(new InetSocketAddress(changelogPort)); |
| | | |
| | | /* |
| | | * create working threads |
| | | * creates working threads |
| | | * We must first connect, then start to listen. |
| | | */ |
| | | listenThread = |
| | | new ReplicationServerListenThread("Replication Server Listener", this); |
| | | listenThread.start(); |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("RS " +getMonitorInstanceName()+ |
| | | " creates connect threads"); |
| | | connectThread = |
| | | new ReplicationServerConnectThread("Replication Server Connect", this); |
| | | connectThread.start(); |
| | | |
| | | // FIXME : Is it better to have the time to receive the ReplServerInfo |
| | | // from all the other replication servers since this info is necessary |
| | | // to route an early received total update request. |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("RS " +getMonitorInstanceName()+ |
| | | " creates listen threads"); |
| | | |
| | | listenThread = |
| | | new ReplicationServerListenThread("Replication Server Listener", this); |
| | | listenThread.start(); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("RS " +getMonitorInstanceName()+ |
| | | " successfully initialized"); |
| | | |
| | | } catch (DatabaseException e) |
| | | { |
| | | Message message = ERR_COULD_NOT_INITIALIZE_DB.get(dbDirname); |
| | | Message message = ERR_COULD_NOT_INITIALIZE_DB.get( |
| | | getFileForPath(dbDirname).getAbsolutePath()); |
| | | logError(message); |
| | | } catch (ReplicationDBException e) |
| | | { |
| | | Message message = ERR_COULD_NOT_READ_DB.get(dbDirname); |
| | | Message message = ERR_COULD_NOT_READ_DB.get(dbDirname, |
| | | e.getLocalizedMessage()); |
| | | logError(message); |
| | | } catch (UnknownHostException e) |
| | | { |
| | |
| | | * Get the ReplicationCache associated to the base DN given in parameter. |
| | | * |
| | | * @param baseDn The base Dn for which the ReplicationCache must be returned. |
| | | * @param create Specifies whether to create the ReplicationCache if it does |
| | | * not already exist. |
| | | * @return The ReplicationCache associated to the base DN given in parameter. |
| | | */ |
| | | public ReplicationCache getReplicationCache(DN baseDn) |
| | | public ReplicationCache getReplicationCache(DN baseDn, boolean create) |
| | | { |
| | | ReplicationCache replicationCache; |
| | | |
| | | synchronized (baseDNs) |
| | | { |
| | | replicationCache = baseDNs.get(baseDn); |
| | | if (replicationCache == null) |
| | | if ((replicationCache == null) && (create)) |
| | | { |
| | | replicationCache = new ReplicationCache(baseDn, this); |
| | | baseDNs.put(baseDn, replicationCache); |
| | | baseDNs.put(baseDn, replicationCache); |
| | | } |
| | | } |
| | | |
| | | return replicationCache; |
| | |
| | | */ |
| | | public void shutdown() |
| | | { |
| | | if (shutdown) |
| | | return; |
| | | |
| | | shutdown = true; |
| | | |
| | | // shutdown the connect thread |
| | |
| | | // replication Server service is closing anyway. |
| | | } |
| | | |
| | | // shutdown the listen thread |
| | | if (listenThread != null) |
| | | { |
| | | listenThread.interrupt(); |
| | | } |
| | | |
| | | // shutdown all the ChangelogCaches |
| | | for (ReplicationCache replicationCache : baseDNs.values()) |
| | | { |
| | |
| | | * |
| | | * @param id The serverId for which the dbHandler must be created. |
| | | * @param baseDn The DN for which the dbHandler must be created. |
| | | * @param generationId The generationId for this server and this domain. |
| | | * @return The new DB handler for this ReplicationServer and the serverId and |
| | | * DN given in parameter. |
| | | * @throws DatabaseException in case of underlying database problem. |
| | | */ |
| | | DbHandler newDbHandler(short id, DN baseDn) throws DatabaseException |
| | | public DbHandler newDbHandler(short id, DN baseDn, long generationId) |
| | | throws DatabaseException |
| | | { |
| | | return new DbHandler(id, baseDn, this, dbEnv); |
| | | return new DbHandler(id, baseDn, this, dbEnv, generationId); |
| | | } |
| | | |
| | | /** |
| | | * Clears the generationId for the domain related to the provided baseDn. |
| | | * @param baseDn The baseDn for which to delete the generationId. |
| | | * @throws DatabaseException When it occurs. |
| | | */ |
| | | public void clearGenerationId(DN baseDn) |
| | | throws DatabaseException |
| | | { |
| | | try |
| | | { |
| | | dbEnv.clearGenerationId(baseDn); |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | TRACER.debugInfo( |
| | | "In RS <" + getMonitorInstanceName() + |
| | | " Exception in clearGenerationId" + |
| | | stackTraceToSingleLineString(e) + e.getLocalizedMessage()); |
| | | } |
| | | } |
| | | |
| | | /** |
| | |
| | | Attribute bases = new Attribute(baseType, "base-dn", baseValues); |
| | | attributes.add(bases); |
| | | |
| | | // Publish to monitor the generation ID by domain |
| | | AttributeType generationIdType= |
| | | DirectoryServer.getAttributeType("base-dn-generation-id", true); |
| | | LinkedHashSet<AttributeValue> generationIdValues = |
| | | new LinkedHashSet<AttributeValue>(); |
| | | for (DN base : baseDNs.keySet()) |
| | | { |
| | | long generationId=-1; |
| | | ReplicationCache cache = getReplicationCache(base, false); |
| | | if (cache != null) |
| | | generationId = cache.getGenerationId(); |
| | | generationIdValues.add(new AttributeValue(generationIdType, |
| | | base.toString() + " " + generationId)); |
| | | } |
| | | Attribute generationIds = new Attribute(generationIdType, "generation-id", |
| | | generationIdValues); |
| | | attributes.add(generationIds); |
| | | |
| | | return attributes; |
| | | } |
| | | |
| | | /** |
| | | * Get the value of generationId for the replication domain |
| | | * associated with the provided baseDN. |
| | | * |
| | | * @param baseDN The baseDN of the domain. |
| | | * @return The value of the generationID. |
| | | */ |
| | | public long getGenerationId(DN baseDN) |
| | | { |
| | | ReplicationCache rc = this.getReplicationCache(baseDN, false); |
| | | if (rc!=null) |
| | | return rc.getGenerationId(); |
| | | return -1; |
| | | } |
| | | |
| | | /** |
| | | * Get the serverId for this replication server. |
| | | * |
| | | * @return The value of the serverId. |
| | | * |
| | | */ |
| | | public short getServerId() |
| | | { |
| | | return serverId; |
| | | } |
| | | } |