mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Nicolas Capponi
16.05.2014 3f27a7ede5ca9df06137254aa32d41d023ac105d
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2014 ForgeRock AS.
 */
package org.opends.server.replication.server.changelog.file;
 
import static com.forgerock.opendj.util.Validator.*;
 
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
 
import java.io.Closeable;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
 
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.types.ByteString;
import org.opends.server.types.ByteStringBuilder;
import org.opends.server.util.StaticUtils;
 
import com.forgerock.opendj.util.Pair;
 
/**
 * A log reader with binary search support.
 * <p>
 * The log file contains record offsets at fixed block size : given a block size N,
 * an offset is written at every N bytes. The offset contains the number of bytes to
 * reach the beginning of previous record (or next record if offset equals 0).
 * <p>
 * The reader provides both sequential access, using the {@code readRecord()} method,
 * and reasonably fast random access, using the {@code seekToRecord(K, boolean)} method.
 *
 * @param <K>
 *          Type of the key of a record, which must be comparable.
 * @param <V>
 *          Type of the value of a record.
 */
class BlockLogReader<K extends Comparable<K>, V> implements Closeable
{
  static final int SIZE_OF_BLOCK_OFFSET = 4;
 
  static final int SIZE_OF_RECORD_SIZE = 4;
 
  /**
   * Size of a block, after which an offset to the nearest record is written.
   * <p>
   * This value has been fixed based on performance tests. See
   * <a href="https://bugster.forgerock.org/jira/browse/OPENDJ-1472">
   * OPENDJ-1472</a> for details.
   */
  static final int BLOCK_SIZE = 256;
 
  private final int blockSize;
 
  private final RecordParser<K, V> parser;
 
  private final RandomAccessFile reader;
 
  private final File file;
 
  /**
   * Creates a reader for the provided file, file reader and parser.
   *
   * @param <K>
   *          Type of the key of a record, which must be comparable.
   * @param <V>
   *          Type of the value of a record.
   * @param file
   *          The log file to read.
   * @param reader
   *          The random access reader on the log file.
   * @param parser
   *          The parser to decode the records read.
   * @return a new log reader
   */
  static <K extends Comparable<K>, V> BlockLogReader<K, V> newReader(
      final File file, final RandomAccessFile reader, final RecordParser<K, V> parser)
  {
    return new BlockLogReader<K, V>(file, reader, parser, BLOCK_SIZE);
  }
 
  /**
   * Creates a reader for the provided file, file reader, parser and block size.
   * <p>
   * This method is intended for tests only, to allow tuning of the block size.
   *
   * @param <K>
   *          Type of the key of a record, which must be comparable.
   * @param <V>
   *          Type of the value of a record.
   * @param file
   *          The log file to read.
   * @param reader
   *          The random access reader on the log file.
   * @param parser
   *          The parser to decode the records read.
   * @param blockSize
   *          The size of each block, or frequency at which the record offset is
   *          present in the log file.
   * @return a new log reader
   */
  static <K extends Comparable<K>, V> BlockLogReader<K, V> newReaderForTests(
      final File file, final RandomAccessFile reader, final RecordParser<K, V> parser, int blockSize)
  {
    return new BlockLogReader<K, V>(file, reader, parser, blockSize);
  }
 
  private BlockLogReader(
      final File file, final RandomAccessFile reader, final RecordParser<K, V> parser, final int blockSize)
  {
    this.file = file;
    this.reader = reader;
    this.parser = parser;
    this.blockSize = blockSize;
  }
 
  /**
   * Position the reader to the record corresponding to the provided key and
   * matching and positioning strategies. Returns the last record read.
   *
   * @param key
   *          Key to use as a start position. Key must not be {@code null}.
   * @param matchStrategy
   *          The key matching strategy.
   * @param positionStrategy
   *          The positioning strategy.
   * @return The pair (key_found, last_record_read). key_found is a boolean
   *         indicating if reader is successfully positioned. last_record_read
   *         is the last record that was read. When key_found is equals to
   *         {@code false}, then last_record_read is always {@code null}. When
   *         key_found is equals to {@code true}, last_record_read can be valued
   *         or be {@code null}
   * @throws ChangelogException
   *           If an error occurs when seeking the key.
   */
  public Pair<Boolean, Record<K,V>> seekToRecord(
      final K key,
      final KeyMatchingStrategy matchStrategy,
      final PositionStrategy positionStrategy)
          throws ChangelogException
  {
    ensureNotNull(key);
    final long markerPosition = searchClosestBlockStartToKey(key);
    if (markerPosition >= 0)
    {
      return positionToKeySequentially(markerPosition, key, matchStrategy, positionStrategy);
    }
    return Pair.of(false, null);
  }
 
  /**
   * Position the reader to the provided file position.
   *
   * @param filePosition
   *            offset from the beginning of the file, in bytes.
   * @throws ChangelogException
   *            If an error occurs.
   */
  public void seekToPosition(final long filePosition) throws ChangelogException
  {
    try
    {
      reader.seek(filePosition);
    }
    catch (IOException e)
    {
      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_SEEK.get(filePosition, file.getPath()), e);
    }
  }
 
  /**
   * Read a record from current position.
   *
   * @return the record read
   * @throws ChangelogException
   *            If an error occurs during read.
   */
  public Record<K,V> readRecord() throws ChangelogException
  {
    return readRecord(-1);
  }
 
  /**
   * Returns the file position for this reader.
   *
   * @return the position of reader on the log file
   * @throws ChangelogException
   *          If an error occurs.
   */
  public long getFilePosition() throws ChangelogException
  {
    try
    {
      return reader.getFilePointer();
    }
    catch (IOException e)
    {
      throw new ChangelogException(
          ERR_CHANGELOG_UNABLE_TO_GET_CURSOR_READER_POSITION_LOG_FILE.get(file.getPath()), e);
    }
  }
 
  /** {@inheritDoc} */
  @Override
  public void close() throws IOException
  {
    reader.close();
  }
 
  /**
   * Read a record, either from the provided start of block position or from
   * the current position.
   *
   * @param blockStartPosition
   *          The position of start of block, where a record offset is written.
   *          If provided value is -1, then record is read from current position instead.
   * @return the record read
   * @throws ChangelogException
   *           If an error occurs during read.
   */
  private Record<K,V> readRecord(final long blockStartPosition) throws ChangelogException
  {
    try
    {
      if (blockStartPosition != -1)
      {
        positionToRecordFromBlockStart(blockStartPosition);
      }
      final ByteString recordData = readNextRecord();
      return recordData != null ? parser.decodeRecord(recordData) : null;
    }
    catch (Exception io)
    {
      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DECODE_RECORD.get(reader.toString()), io);
    }
  }
 
  /**
   * Position to the record given by the offset read from provided block
   * start.
   *
   * @param blockStartPosition
   *          Position of read pointer in the file, expected to be the start of
   *          a block where a record offset is written.
   * @throws IOException
   *           If an error occurs during read.
   */
  private void positionToRecordFromBlockStart(final long blockStartPosition) throws IOException
  {
    reader.seek(blockStartPosition);
    if (blockStartPosition > 0)
    {
      final byte[] offsetData = new byte[SIZE_OF_BLOCK_OFFSET];
      reader.readFully(offsetData);
      final int offsetToRecord = ByteString.wrap(offsetData).toInt();
      if (offsetToRecord > 0)
      {
        reader.seek(blockStartPosition - offsetToRecord);
      } // if offset is zero, reader is already well positioned
    }
  }
 
  /**
   * Reads the next record.
   *
   * @return the bytes of the next record, or {@code null} if no record is available
   * @throws IOException
   *            If an error occurs while reading.
   */
  private ByteString readNextRecord() throws IOException
  {
    try
    {
      // read length of record
      final long filePosition = reader.getFilePointer();
      int distanceToBlockStart = getDistanceToNextBlockStart(filePosition, blockSize);
      final int recordLength = readRecordLength(distanceToBlockStart);
 
      // read the record
      long currentPosition = reader.getFilePointer();
      distanceToBlockStart = getDistanceToNextBlockStart(currentPosition, blockSize);
      final ByteStringBuilder recordBytes =
          new ByteStringBuilder(getLengthOfStoredRecord(recordLength, distanceToBlockStart));
      int remainingBytesToRead = recordLength;
      while (distanceToBlockStart < remainingBytesToRead)
      {
        if (distanceToBlockStart != 0)
        {
          recordBytes.append(reader, distanceToBlockStart);
        }
        // skip the offset
        reader.skipBytes(SIZE_OF_BLOCK_OFFSET);
 
        // next step
        currentPosition += distanceToBlockStart + SIZE_OF_BLOCK_OFFSET;
        remainingBytesToRead -= distanceToBlockStart;
        distanceToBlockStart = blockSize - SIZE_OF_BLOCK_OFFSET;
      }
      if (remainingBytesToRead > 0)
      {
        // last bytes of the record
        recordBytes.append(reader, remainingBytesToRead);
      }
      return recordBytes.toByteString();
    }
    catch (EOFException e)
    {
      // end of stream, no record or uncomplete record
      return null;
    }
  }
 
  /**
   * Returns the total length in bytes taken by a record when stored in log file,
   * including size taken by block offsets.
   *
   * @param recordLength
   *            The length of record to write.
   * @param distanceToBlockStart
   *            Distance before the next block start.
   * @return the length in bytes necessary to store the record in the log
   */
  int getLengthOfStoredRecord(int recordLength, int distanceToBlockStart)
  {
    int totalLength = recordLength;
    if (recordLength > distanceToBlockStart)
    {
      totalLength += SIZE_OF_BLOCK_OFFSET;
      final int remainingBlocks = (recordLength - distanceToBlockStart -1) / (blockSize - SIZE_OF_BLOCK_OFFSET);
      totalLength += remainingBlocks * SIZE_OF_BLOCK_OFFSET;
    }
    return totalLength;
  }
 
  /** Read the length of a record. */
  private int readRecordLength(final int distanceToBlockStart) throws IOException
  {
    final ByteStringBuilder lengthBytes = new ByteStringBuilder(SIZE_OF_RECORD_SIZE);
    if (distanceToBlockStart > 0 && distanceToBlockStart < SIZE_OF_RECORD_SIZE)
    {
      lengthBytes.append(reader, distanceToBlockStart);
      // skip the offset
      reader.skipBytes(SIZE_OF_BLOCK_OFFSET);
      lengthBytes.append(reader, SIZE_OF_RECORD_SIZE - distanceToBlockStart);
      return lengthBytes.toByteString().toInt();
    }
    else
    {
      if (distanceToBlockStart == 0)
      {
        // skip the offset
        reader.skipBytes(SIZE_OF_BLOCK_OFFSET);
      }
      lengthBytes.append(reader, SIZE_OF_RECORD_SIZE);
      return lengthBytes.toByteString().toInt();
    }
  }
 
  /**
   * Search the closest block start to the provided key, using binary search.
   * <p>
   * Note that position of reader is modified by this method.
   *
   * @param key
   *          The key to search
   * @return the file position of block start that must be used to find the given key,
   *      or a negative number if no position could be found.
   * @throws ChangelogException
   *          if a problem occurs
   */
  long searchClosestBlockStartToKey(K key) throws ChangelogException
  {
    final long maxPos = getFileLength() - 1;
    long lowPos = 0L;
    long highPos = getClosestBlockStartStrictlyAfterPosition(maxPos);
 
    while (lowPos <= highPos)
    {
      final long middlePos = Math.min((lowPos + highPos) / 2, maxPos);
      final long middleBlockStartPos = getClosestBlockStartBeforeOrAtPosition(middlePos);
      final Record<K, V> middleRecord = readRecord(middleBlockStartPos);
      if (middleRecord == null)
      {
        return -1;
      }
 
      final int keyComparison = middleRecord.getKey().compareTo(key);
      if (keyComparison < 0)
      {
        if (middleBlockStartPos <= lowPos)
        {
          return lowPos;
        }
        lowPos = middleBlockStartPos;
      }
      else if (keyComparison > 0)
      {
        if (middleBlockStartPos >= highPos)
        {
          return highPos;
        }
        highPos = middleBlockStartPos;
      }
      else
      {
        return middleBlockStartPos;
      }
    }
    // Unable to find a position where key can be found
    return -1;
  }
 
  private long getFileLength() throws ChangelogException
  {
    try
    {
      return reader.length();
    }
    catch (IOException e)
    {
      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_RETRIEVE_FILE_LENGTH.get(file.getPath()), e);
    }
  }
 
  /**
   * Position before, at or after provided key, starting from provided block
   * start position and reading sequentially until key is found according to
   * matching and positioning strategies.
   *
   * @param blockStartPosition
   *          Position of read pointer in the file, expected to be the start of
   *          a block where a record offset is written
   * @param key
   *          The key to find
   * @param matchStrategy
   *          The key matching strategy
   * @param positionStrategy
   *          The positioning strategy
   * @return The pair ({@code true}, selected record) if reader is successfully
   *         positioned (selected record may be null if end of file is reached),
   *         ({@code false}, null) otherwise.
   * @throws ChangelogException
   *           If an error occurs.
   */
   Pair<Boolean, Record<K,V>> positionToKeySequentially(final long blockStartPosition, final K key,
       final KeyMatchingStrategy matchStrategy, final PositionStrategy positionStrategy) throws ChangelogException
   {
    Record<K,V> record = readRecord(blockStartPosition);
    Record<K,V> previousRecord = null;
    long previousPosition = blockStartPosition;
    boolean matchingKeyIsLowerThanAnyRecord = true;
    while (record != null)
    {
      final int keysComparison = record.getKey().compareTo(key);
      if (keysComparison <= 0)
      {
        matchingKeyIsLowerThanAnyRecord = false;
      }
      if ((keysComparison == 0 && matchStrategy == EQUAL_TO_KEY)
          || (keysComparison >= 0 && matchStrategy != EQUAL_TO_KEY))
      {
        return getMatchingRecord(matchStrategy, positionStrategy, keysComparison, matchingKeyIsLowerThanAnyRecord,
            record, previousRecord, previousPosition);
      }
      previousRecord = record;
      previousPosition = getFilePosition();
      record = readRecord();
    }
 
    if (matchStrategy == LESS_THAN_OR_EQUAL_TO_KEY)
    {
      return getRecordNoMatchForLessStrategy(positionStrategy, previousRecord, previousPosition);
    }
    return Pair.of(false, null);
  }
 
  private Pair<Boolean,Record<K,V>> getMatchingRecord(KeyMatchingStrategy matchStrategy,
      PositionStrategy positionStrategy, int keysComparison, boolean matchKeyIsLowerThanAnyRecord,
      Record<K, V> currentRecord, Record<K, V> previousRecord, long previousPosition)
          throws ChangelogException
  {
    Record<K, V> record = currentRecord;
 
    if (positionStrategy == AFTER_MATCHING_KEY)
    {
      if (matchStrategy == LESS_THAN_OR_EQUAL_TO_KEY && matchKeyIsLowerThanAnyRecord)
      {
        return Pair.of(false, null);
      }
      if (keysComparison == 0)
      {
        // skip matching key
        record = readRecord();
      }
    }
    else if (positionStrategy == ON_MATCHING_KEY && matchStrategy == LESS_THAN_OR_EQUAL_TO_KEY && keysComparison > 0)
    {
      seekToPosition(previousPosition);
      return Pair.of(previousRecord != null, previousRecord);
    }
    return Pair.of(true, record);
  }
 
  private Pair<Boolean, Record<K, V>> getRecordNoMatchForLessStrategy(
      final PositionStrategy positionStrategy, final Record<K, V> previousRecord, final long previousPosition)
          throws ChangelogException
  {
    if (positionStrategy == ON_MATCHING_KEY)
    {
      seekToPosition(previousPosition);
      return Pair.of(previousRecord != null, previousRecord);
    }
    else
    {
      return Pair.of(true, null);
    }
  }
 
  /**
   * Returns the closest start of block which has a position lower than or equal
   * to the provided file position.
   *
   * @param filePosition
   *          The position of reader on file.
   * @return the file position of the block start.
   */
  long getClosestBlockStartBeforeOrAtPosition(final long filePosition)
  {
    final int dist = getDistanceToNextBlockStart(filePosition, blockSize);
    return dist == 0 ? filePosition : filePosition + dist - blockSize;
  }
 
  /**
   * Returns the closest start of block which has a position strictly
   * higher than the provided file position.
   *
   * @param filePosition
   *           The position of reader on file.
   * @return the file position of the block start.
   */
  long getClosestBlockStartStrictlyAfterPosition(final long filePosition)
  {
    final int dist = getDistanceToNextBlockStart(filePosition, blockSize);
    return dist == 0 ? filePosition + blockSize : filePosition + dist;
  }
 
  /**
   * Returns the distance to next block for the provided file position.
   *
   * @param filePosition
   *            offset from the beginning of the file, in bytes.
   * @param blockSize
   *            Size of each block in bytes.
   * @return the distance to next block in bytes
   */
  static int getDistanceToNextBlockStart(final long filePosition, final int blockSize)
  {
    if (filePosition == 0)
    {
      return blockSize;
    }
    final int distance = (int) (filePosition % blockSize);
    return distance == 0 ? 0 : blockSize - distance;
  }
 
  /**
   * Check if the log file is valid, by reading the latest records at the end of
   * the log file.
   * <p>
   * The intent of this method is to allow self-recovery in case a partial
   * record as been written at the end of file (eg, after a server crash).
   * <p>
   * Any unexpected exception is considered as a severe error where
   * self-recovery is not appropriate and thus will lead to a
   * ChangelogException.
   *
   * @return -1 if log is valid, or a positive number if log is invalid, where
   *         the number represents the last valid position in the log file.
   * @throws ChangelogException
   *           if an error occurs while checking the log
   */
 long checkLogIsValid() throws ChangelogException
 {
   try
   {
     final long fileSize = getFileLength();
     final long lastBlockStart = getClosestBlockStartBeforeOrAtPosition(fileSize);
     positionToRecordFromBlockStart(lastBlockStart);
 
     long lastValidPosition = lastBlockStart;
     for (ByteString recordData = readNextRecord(); recordData != null; recordData = readNextRecord()) {
       parser.decodeRecord(recordData);
       lastValidPosition = reader.getFilePointer();
     }
 
     final boolean isFileValid = lastValidPosition == fileSize;
     return isFileValid ? -1 : lastValidPosition;
   }
   catch (Exception e)
   {
     throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_RECOVER_LOG_FILE.get(
         file.getPath(),
         StaticUtils.stackTraceToSingleLineString(e)));
   }
 }
 
}