| | |
| | | |
| | | import java.io.File; |
| | | import java.io.UnsupportedEncodingException; |
| | | import java.util.List; |
| | | import java.util.AbstractMap.SimpleImmutableEntry; |
| | | import java.util.*; |
| | | import java.util.Map.Entry; |
| | | import java.util.concurrent.CopyOnWriteArrayList; |
| | | import java.util.concurrent.TimeUnit; |
| | | import java.util.concurrent.atomic.AtomicBoolean; |
| | |
| | | |
| | | try |
| | | { |
| | | EnvironmentConfig envConfig = new EnvironmentConfig(); |
| | | |
| | | /* |
| | | * Create the DB Environment that will be used for all the |
| | | * ReplicationServer activities related to the db |
| | | */ |
| | | envConfig.setAllowCreate(true); |
| | | envConfig.setTransactional(true); |
| | | envConfig.setConfigParam(STATS_COLLECT, "false"); |
| | | envConfig.setConfigParam(CLEANER_THREADS, "2"); |
| | | envConfig.setConfigParam(CHECKPOINTER_HIGH_PRIORITY, "true"); |
| | | /* |
| | | * Tests have shown that since the parsing of the Replication log is |
| | | * always done sequentially, it is not necessary to use a large DB cache. |
| | | */ |
| | | if (Runtime.getRuntime().maxMemory() > 256 * 1024 * 1024) |
| | | { |
| | | /* |
| | | * If the JVM is reasonably large then we can safely default to bigger |
| | | * read buffers. This will result in more scalable checkpointer and |
| | | * cleaner performance. |
| | | */ |
| | | envConfig.setConfigParam(CLEANER_LOOK_AHEAD_CACHE_SIZE, mb(2)); |
| | | envConfig.setConfigParam(LOG_ITERATOR_READ_SIZE, mb(2)); |
| | | envConfig.setConfigParam(LOG_FAULT_READ_SIZE, kb(4)); |
| | | |
| | | /* |
| | | * The cache size must be bigger in order to accommodate the larger |
| | | * buffers - see OPENDJ-943. |
| | | */ |
| | | envConfig.setConfigParam(MAX_MEMORY, mb(16)); |
| | | } |
| | | else |
| | | { |
| | | /* |
| | | * Use 5M so that the replication can be used with 64M total for the |
| | | * JVM. |
| | | */ |
| | | envConfig.setConfigParam(MAX_MEMORY, mb(5)); |
| | | } |
| | | |
| | | // Since records are always added at the end of the Replication log and |
| | | // deleted at the beginning of the Replication log, this should never |
| | | // cause any deadlock. |
| | | envConfig.setTxnTimeout(0, TimeUnit.SECONDS); |
| | | envConfig.setLockTimeout(0, TimeUnit.SECONDS); |
| | | |
| | | // Since replication provides durability, we can reduce the DB durability |
| | | // level so that we are immune to application / JVM crashes. |
| | | envConfig.setDurability(Durability.COMMIT_WRITE_NO_SYNC); |
| | | |
| | | dbEnvironment = new Environment(new File(path), envConfig); |
| | | dbEnvironment = openJEEnvironment(path); |
| | | |
| | | /* |
| | | * One database is created to store the update from each LDAP server in |
| | |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Open a JE environment. |
| | | * <p> |
| | | * protected so it can be overridden by tests. |
| | | * |
| | | * @param path |
| | | * the path to the JE environment in the filesystem |
| | | * @return the opened JE environment |
| | | */ |
| | | protected Environment openJEEnvironment(String path) |
| | | { |
| | | final EnvironmentConfig envConfig = new EnvironmentConfig(); |
| | | |
| | | /* |
| | | * Create the DB Environment that will be used for all the |
| | | * ReplicationServer activities related to the db |
| | | */ |
| | | envConfig.setAllowCreate(true); |
| | | envConfig.setTransactional(true); |
| | | envConfig.setConfigParam(STATS_COLLECT, "false"); |
| | | envConfig.setConfigParam(CLEANER_THREADS, "2"); |
| | | envConfig.setConfigParam(CHECKPOINTER_HIGH_PRIORITY, "true"); |
| | | /* |
| | | * Tests have shown that since the parsing of the Replication log is |
| | | * always done sequentially, it is not necessary to use a large DB cache. |
| | | */ |
| | | if (Runtime.getRuntime().maxMemory() > 256 * 1024 * 1024) |
| | | { |
| | | /* |
| | | * If the JVM is reasonably large then we can safely default to bigger |
| | | * read buffers. This will result in more scalable checkpointer and |
| | | * cleaner performance. |
| | | */ |
| | | envConfig.setConfigParam(CLEANER_LOOK_AHEAD_CACHE_SIZE, mb(2)); |
| | | envConfig.setConfigParam(LOG_ITERATOR_READ_SIZE, mb(2)); |
| | | envConfig.setConfigParam(LOG_FAULT_READ_SIZE, kb(4)); |
| | | |
| | | /* |
| | | * The cache size must be bigger in order to accommodate the larger |
| | | * buffers - see OPENDJ-943. |
| | | */ |
| | | envConfig.setConfigParam(MAX_MEMORY, mb(16)); |
| | | } |
| | | else |
| | | { |
| | | /* |
| | | * Use 5M so that the replication can be used with 64M total for the |
| | | * JVM. |
| | | */ |
| | | envConfig.setConfigParam(MAX_MEMORY, mb(5)); |
| | | } |
| | | |
| | | // Since records are always added at the end of the Replication log and |
| | | // deleted at the beginning of the Replication log, this should never |
| | | // cause any deadlock. |
| | | envConfig.setTxnTimeout(0, TimeUnit.SECONDS); |
| | | envConfig.setLockTimeout(0, TimeUnit.SECONDS); |
| | | |
| | | // Since replication provides durability, we can reduce the DB durability |
| | | // level so that we are immune to application / JVM crashes. |
| | | envConfig.setDurability(Durability.COMMIT_WRITE_NO_SYNC); |
| | | |
| | | return new Environment(new File(path), envConfig); |
| | | } |
| | | |
| | | private String kb(int sizeInKb) |
| | | { |
| | | return String.valueOf(sizeInKb * 1024); |
| | |
| | | return String.valueOf(sizeInMb * 1024 * 1024); |
| | | } |
| | | |
| | | private Database openDatabase(String databaseName) throws ChangelogException, |
| | | RuntimeException |
| | | /** |
| | | * Open a JE database. |
| | | * <p> |
| | | * protected so it can be overridden by tests. |
| | | * |
| | | * @param databaseName |
| | | * the databaseName to open |
| | | * @return the opened JE database |
| | | * @throws ChangelogException |
| | | * if a problem happened opening the database |
| | | * @throws RuntimeException |
| | | * if a problem happened with the JE database |
| | | */ |
| | | protected Database openDatabase(String databaseName) |
| | | throws ChangelogException, RuntimeException |
| | | { |
| | | if (isShuttingDown.get()) |
| | | { |
| | |
| | | */ |
| | | public ChangelogState readChangelogState() throws ChangelogException |
| | | { |
| | | return decodeChangelogState(readWholeState()); |
| | | } |
| | | |
| | | /** |
| | | * Decode the whole changelog state DB. |
| | | * |
| | | * @param wholeState |
| | | * the whole changelog state DB as a Map. |
| | | * The Map is only used as a convenient collection of key => data objects |
| | | * @return the decoded changelog state |
| | | * @throws ChangelogException |
| | | * if a problem occurred while decoding |
| | | */ |
| | | ChangelogState decodeChangelogState(Map<byte[], byte[]> wholeState) |
| | | throws ChangelogException |
| | | { |
| | | try |
| | | { |
| | | final ChangelogState result = new ChangelogState(); |
| | | for (Entry<byte[], byte[]> entry : wholeState.entrySet()) |
| | | { |
| | | final String stringKey = toString(entry.getKey()); |
| | | final String stringData = toString(entry.getValue()); |
| | | |
| | | if (logger.isTraceEnabled()) |
| | | { |
| | | debug("read (key, value)=(" + stringKey + ", " + stringData + ")"); |
| | | } |
| | | |
| | | final String[] str = stringData.split(FIELD_SEPARATOR, 3); |
| | | if (str[0].equals(GENERATION_ID_TAG)) |
| | | { |
| | | final long generationId = toLong(str[1]); |
| | | final DN baseDN = DN.valueOf(str[2]); |
| | | |
| | | if (logger.isTraceEnabled()) |
| | | { |
| | | debug("has read generationId: baseDN=" + baseDN + " generationId=" |
| | | + generationId); |
| | | } |
| | | result.setDomainGenerationId(baseDN, generationId); |
| | | } |
| | | else |
| | | { |
| | | final int serverId = toInt(str[0]); |
| | | final DN baseDN = DN.valueOf(str[1]); |
| | | |
| | | if (logger.isTraceEnabled()) |
| | | { |
| | | debug("has read replica: baseDN=" + baseDN + " serverId=" + serverId); |
| | | } |
| | | result.addServerIdToDomain(serverId, baseDN); |
| | | } |
| | | } |
| | | return result; |
| | | } |
| | | catch (DirectoryException e) |
| | | { |
| | | throw new ChangelogException(e.getMessageObject(), e); |
| | | } |
| | | } |
| | | |
| | | private Map<byte[], byte[]> readWholeState() throws ChangelogException |
| | | { |
| | | DatabaseEntry key = new DatabaseEntry(); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | Cursor cursor = changelogStateDb.openCursor(null, null); |
| | | |
| | | try |
| | | { |
| | | final ChangelogState result = new ChangelogState(); |
| | | final Map<byte[], byte[]> results = new LinkedHashMap<byte[], byte[]>(); |
| | | |
| | | OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT); |
| | | while (status == OperationStatus.SUCCESS) |
| | | { |
| | | final String stringData = toString(data.getData()); |
| | | |
| | | if (logger.isTraceEnabled()) |
| | | debug("read (" + GENERATION_ID_TAG + " generationId baseDN) OR " |
| | | + "(serverId baseDN): " + stringData); |
| | | |
| | | final String[] str = stringData.split(FIELD_SEPARATOR, 3); |
| | | if (str[0].equals(GENERATION_ID_TAG)) |
| | | { |
| | | long generationId = toLong(str[1]); |
| | | DN baseDN = DN.valueOf(str[2]); |
| | | |
| | | if (logger.isTraceEnabled()) |
| | | debug("has read baseDN=" + baseDN + " generationId=" +generationId); |
| | | |
| | | result.setDomainGenerationId(baseDN, generationId); |
| | | } |
| | | else |
| | | { |
| | | int serverId = toInt(str[0]); |
| | | DN baseDN = DN.valueOf(str[1]); |
| | | |
| | | if (logger.isTraceEnabled()) |
| | | debug("has read: baseDN=" + baseDN + " serverId=" + serverId); |
| | | |
| | | result.addServerIdToDomain(serverId, baseDN); |
| | | } |
| | | |
| | | results.put(key.getData(), data.getData()); |
| | | status = cursor.getNext(key, data, LockMode.DEFAULT); |
| | | } |
| | | |
| | | return result; |
| | | return results; |
| | | } |
| | | catch (RuntimeException e) |
| | | { |
| | | final LocalizableMessage message = ERR_JEB_DATABASE_EXCEPTION.get(e.getMessage()); |
| | | throw new ChangelogException(message, e); |
| | | } |
| | | catch (DirectoryException e) |
| | | { |
| | | throw new ChangelogException(e.getMessageObject(), e); |
| | | } |
| | | finally |
| | | { |
| | | close(cursor); |
| | |
| | | try |
| | | { |
| | | return Integer.parseInt(data); |
| | | } catch (NumberFormatException e) |
| | | } |
| | | catch (NumberFormatException e) |
| | | { |
| | | // should never happen |
| | | // TODO: i18n |
| | |
| | | } |
| | | } |
| | | |
| | | private byte[] toBytes(String s) |
| | | /** |
| | | * Converts the string to a UTF8-encoded byte array. |
| | | * |
| | | * @param s |
| | | * the string to convert |
| | | * @return the byte array representation of the UTF8-encoded string |
| | | */ |
| | | static byte[] toBytes(String s) |
| | | { |
| | | try |
| | | { |
| | |
| | | } |
| | | |
| | | /** |
| | | * Finds or creates the database used to store changes from the server with |
| | | * the given serverId and the given baseDN. |
| | | * Finds or creates the database used to store changes for a replica with the |
| | | * given baseDN and serverId. |
| | | * |
| | | * @param serverId |
| | | * The server id that identifies the server. |
| | |
| | | * @throws ChangelogException |
| | | * in case of underlying Exception. |
| | | */ |
| | | public Database getOrAddDb(int serverId, DN baseDN, long generationId) |
| | | public Database getOrAddReplicationDB(int serverId, DN baseDN, long generationId) |
| | | throws ChangelogException |
| | | { |
| | | if (logger.isTraceEnabled()) |
| | | { |
| | | debug("ReplicationDbEnv.getOrAddDb(" + serverId + ", " + baseDN + ", " |
| | | + generationId + ")"); |
| | | } |
| | | try |
| | | { |
| | | // JNR: redundant info is stored between the key and data down below. |
| | | // It is probably ok since "changelogstate" DB does not receive a high |
| | | // volume of inserts. |
| | | final String serverIdToBaseDn = buildServerIdKey(baseDN, serverId); |
| | | Entry<String, String> replicaEntry = toReplicaEntry(baseDN, serverId); |
| | | |
| | | // Opens the DB for the changes received from this server on this domain. |
| | | Database db = openDatabase(serverIdToBaseDn); |
| | | final Database replicaDB = openDatabase(replicaEntry.getKey()); |
| | | |
| | | putInChangelogStateDBIfNotExist(serverIdToBaseDn, serverIdToBaseDn); |
| | | putInChangelogStateDBIfNotExist(buildGenIdKey(baseDN), |
| | | buildGenIdData(baseDN, generationId)); |
| | | return db; |
| | | putInChangelogStateDBIfNotExist(replicaEntry); |
| | | putInChangelogStateDBIfNotExist(toGenIdEntry(baseDN, generationId)); |
| | | return replicaDB; |
| | | } |
| | | catch (RuntimeException e) |
| | | { |
| | |
| | | } |
| | | } |
| | | |
| | | private String buildGenIdKey(DN baseDN) |
| | | /** |
| | | * Return an entry to store in the changelog state database representing a |
| | | * replica in the topology. |
| | | * |
| | | * @param baseDN |
| | | * the replica's baseDN |
| | | * @param serverId |
| | | * the replica's serverId |
| | | * @return a database entry for the replica |
| | | */ |
| | | Entry<String, String> toReplicaEntry(DN baseDN, int serverId) |
| | | { |
| | | return GENERATION_ID_TAG + FIELD_SEPARATOR + baseDN.toNormalizedString(); |
| | | final String key = serverId + FIELD_SEPARATOR + baseDN.toNormalizedString(); |
| | | return new SimpleImmutableEntry<String, String>(key, key); |
| | | } |
| | | |
| | | private String buildServerIdKey(DN baseDN, int serverId) |
| | | /** |
| | | * Return an entry to store in the changelog state database representing the |
| | | * domain generation id. |
| | | * |
| | | * @param baseDN |
| | | * the domain's baseDN |
| | | * @param generationId |
| | | * the domain's generationId |
| | | * @return a database entry for the generationId |
| | | */ |
| | | Entry<String, String> toGenIdEntry(DN baseDN, long generationId) |
| | | { |
| | | return serverId + FIELD_SEPARATOR + baseDN.toNormalizedString(); |
| | | final String normDn = baseDN.toNormalizedString(); |
| | | final String key = GENERATION_ID_TAG + FIELD_SEPARATOR + normDn; |
| | | final String data = GENERATION_ID_TAG + FIELD_SEPARATOR + generationId |
| | | + FIELD_SEPARATOR + normDn; |
| | | return new SimpleImmutableEntry<String, String>(key, data); |
| | | } |
| | | |
| | | private String buildGenIdData(DN baseDN, long generationId) |
| | | private void putInChangelogStateDBIfNotExist(Entry<String, String> entry) |
| | | throws RuntimeException |
| | | { |
| | | return GENERATION_ID_TAG + FIELD_SEPARATOR + generationId + FIELD_SEPARATOR |
| | | + baseDN.toNormalizedString(); |
| | | } |
| | | |
| | | private void putInChangelogStateDBIfNotExist(String keyString, |
| | | String dataString) throws RuntimeException |
| | | { |
| | | DatabaseEntry key = new DatabaseEntry(toBytes(keyString)); |
| | | DatabaseEntry key = new DatabaseEntry(toBytes(entry.getKey())); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | if (changelogStateDb.get(null, key, data, LockMode.DEFAULT) == NOTFOUND) |
| | | { |
| | | Transaction txn = dbEnvironment.beginTransaction(null, null); |
| | | try |
| | | { |
| | | data.setData(toBytes(dataString)); |
| | | data.setData(toBytes(entry.getValue())); |
| | | if (logger.isTraceEnabled()) |
| | | debug("putting record in the changelogstate Db key=[" + keyString |
| | | + "] value=[" + dataString + "]"); |
| | | { |
| | | debug("putting record in the changelogstate Db key=[" |
| | | + entry.getKey() + "] value=[" + entry.getValue() + "]"); |
| | | } |
| | | changelogStateDb.put(txn, key, data); |
| | | txn.commit(Durability.COMMIT_WRITE_NO_SYNC); |
| | | } |
| | |
| | | */ |
| | | public void clearGenerationId(DN baseDN) throws ChangelogException |
| | | { |
| | | deleteFromChangelogStateDB(buildGenIdKey(baseDN), |
| | | final int unusedGenId = 0; |
| | | deleteFromChangelogStateDB(toGenIdEntry(baseDN, unusedGenId), |
| | | "clearGenerationId(baseDN=" + baseDN + ")"); |
| | | } |
| | | |
| | |
| | | */ |
| | | public void clearServerId(DN baseDN, int serverId) throws ChangelogException |
| | | { |
| | | deleteFromChangelogStateDB(buildServerIdKey(baseDN, serverId), |
| | | deleteFromChangelogStateDB(toReplicaEntry(baseDN, serverId), |
| | | "clearServerId(baseDN=" + baseDN + " , serverId=" + serverId + ")"); |
| | | } |
| | | |
| | | private void deleteFromChangelogStateDB(String keyString, |
| | | private void deleteFromChangelogStateDB(Entry<String, ?> entry, |
| | | String methodInvocation) throws ChangelogException |
| | | { |
| | | if (logger.isTraceEnabled()) |
| | | { |
| | | debug(methodInvocation + " starting"); |
| | | } |
| | | |
| | | try |
| | | { |
| | | final DatabaseEntry key = new DatabaseEntry(toBytes(keyString)); |
| | | final DatabaseEntry key = new DatabaseEntry(toBytes(entry.getKey())); |
| | | final DatabaseEntry data = new DatabaseEntry(); |
| | | if (changelogStateDb.get(null, key, data, LockMode.DEFAULT) == SUCCESS) |
| | | { |
| | |
| | | changelogStateDb.delete(txn, key); |
| | | txn.commit(Durability.COMMIT_WRITE_NO_SYNC); |
| | | if (logger.isTraceEnabled()) |
| | | { |
| | | debug(methodInvocation + " succeeded"); |
| | | } |
| | | } |
| | | catch (RuntimeException dbe) |
| | | { |
| | |
| | | else |
| | | { |
| | | if (logger.isTraceEnabled()) |
| | | debug(methodInvocation + " failed: key=[ " + keyString |
| | | { |
| | | debug(methodInvocation + " failed: key=[ " + entry.getKey() |
| | | + "] not found"); |
| | | } |
| | | } |
| | | } |
| | | catch (RuntimeException dbe) |
| | |
| | | try |
| | | { |
| | | if (txn != null) |
| | | { |
| | | txn.abort(); |
| | | } |
| | | } |
| | | catch(Exception e) |
| | | { /* do nothing */ } |