-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathreader.go
More file actions
112 lines (98 loc) · 3.17 KB
/
Copy pathreader.go
File metadata and controls
112 lines (98 loc) · 3.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package main
import (
"io"
"os"
"sync"
)
// readerBufPool recycles block-read buffers so the reader does not allocate a
// fresh 10 MiB slice for every block.
//
// Each buffer created by New has length 0 and capacity 10 MiB. Before it is
// handed out, its whole capacity is passed to TryLockBuffer (asked to pin it in
// RAM so it is not swapped; presumably best-effort, given the name) and to
// AdviseHugePage (per the original design, an MADV_HUGEPAGE hint to reduce TLB
// misses; advisory only). Both helpers are defined elsewhere in the package.
//
// NOTE(review): the pool stores []byte values rather than *[]byte, so every
// Put boxes a slice header (staticcheck SA6002). Switching would require
// changing getReaderBuf/putReaderBuf together.
var readerBufPool = sync.Pool{
	New: func() interface{} {
		const capacity = 10 * 1024 * 1024
		fresh := make([]byte, 0, capacity)
		whole := fresh[:cap(fresh)]
		TryLockBuffer(whole)
		AdviseHugePage(whole)
		return fresh
	},
}
// getReaderBuf returns a byte slice of length size to read one block into.
//
// Requests that fit in a pooled buffer (at most 10 MiB, the capacity that
// readerBufPool.New allocates) are served from readerBufPool. The returned
// slice may still contain bytes from an earlier use, so callers should only
// trust the bytes they overwrite.
//
// Larger requests bypass the pool entirely. Previously they still called
// readerBufPool.Get, which either threw away a perfectly good pooled buffer or
// made New allocate, lock and madvise 10 MiB just to discard it. Such oversized
// buffers are passed to TryLockBuffer, and putReaderBuf does not keep them.
//
// If a pooled buffer turns out to be smaller than size, it is dropped rather
// than put back, so undersized buffers do not keep causing misses. A fresh,
// explicitly locked buffer is allocated instead.
func getReaderBuf(size uint32) []byte {
	// Must match the capacity used by readerBufPool.New and putReaderBuf.
	const poolBufCap = 10 * 1024 * 1024

	if size <= poolBufCap {
		buf := readerBufPool.Get().([]byte)
		if cap(buf) >= int(size) {
			return buf[:size]
		}
		// Undersized pooled buffer: let the GC take it (not returned to pool).
	}

	// Either too large for any pooled buffer or the pooled one was too small.
	// Locking is explicit here; it does not happen automatically.
	newBuf := make([]byte, size)
	TryLockBuffer(newBuf)
	return newBuf
}
// putReaderBuf hands buf back to readerBufPool so getReaderBuf can reuse it.
//
// Only buffers with capacity of at most 10 MiB (the size readerBufPool.New
// allocates) are pooled, truncated to length 0. Larger buffers are simply
// dropped and left to the GC, so one oversized request does not keep a huge
// buffer alive inside the pool.
//
// NOTE(review): nothing here calls munlock. Go's GC frees objects back to the
// Go heap, not to the kernel, so a buffer that TryLockBuffer managed to mlock
// is NOT automatically unlocked when it is collected. Its pages may stay
// locked (and count against RLIMIT_MEMLOCK). Confirm against TryLockBuffer.
func putReaderBuf(buf []byte) {
	const poolBufCap = 10 * 1024 * 1024
	if cap(buf) > poolBufCap {
		return
	}
	readerBufPool.Put(buf[:0])
}
// BlockData is one block produced by SequentialReader and sent on its channel.
type BlockData struct {
	// BlockIdx is the zero-based block index; the block's bytes start at
	// offset BlockIdx*blockSize in the source file.
	BlockIdx uint32
	// Data holds the bytes actually read for this block. It can be shorter
	// than the block size (for example the final block). Its backing array
	// comes from getReaderBuf, so cap(Data) may exceed len(Data).
	Data []byte
}
// SequentialReader reads a file block by block, in increasing index order, on
// a background goroutine (see Start) and delivers each block over a buffered
// channel (see Blocks).
type SequentialReader struct {
	// file is the source read with ReadAt.
	file *os.File

	// blockSize is the byte length of each block (the last may be shorter),
	// lastBlockNum is the index of the final block to read, and skipIdx is
	// the first block index Start reads; blocks before it are skipped.
	blockSize, lastBlockNum, skipIdx uint32

	// blockChan carries blocks to the consumer; Start's goroutine closes it.
	blockChan chan BlockData
	// wg tracks the reader goroutine so Wait can block on it.
	wg sync.WaitGroup
}
// NewSequentialReader prepares a reader for file without doing any I/O; call
// Start to begin reading.
//
// Blocks are blockSize bytes long (the last one may be shorter) and are read
// from index skipIdx up to and including the block that contains byte
// fileSize-1. bufferAhead is the capacity of the block channel, i.e. how many
// blocks may be read ahead of the consumer; it must not be negative (make
// panics on a negative channel size). blockSize must be non-zero for a
// non-empty file, since it is used as a divisor.
//
// An empty file (fileSize == 0) is treated as a single, empty block 0.
// Otherwise fileSize-1 would wrap around to the maximum uint64 and produce a
// huge last block index, which makes Start effectively loop forever.
func NewSequentialReader(file *os.File, blockSize uint32, fileSize uint64, skipIdx uint32, bufferAhead int) *SequentialReader {
	var lastBlock uint32
	if fileSize > 0 {
		// NOTE(review): a file with more than 2^32 blocks would be silently
		// truncated by this uint32 conversion.
		lastBlock = uint32((fileSize - 1) / uint64(blockSize))
	}
	return &SequentialReader{
		file:         file,
		blockSize:    blockSize,
		lastBlockNum: lastBlock,
		skipIdx:      skipIdx,
		blockChan:    make(chan BlockData, bufferAhead),
	}
}
// Start launches a goroutine that reads blocks skipIdx..lastBlockNum in order
// and sends each one on the channel returned by Blocks.
//
// The channel is closed when the goroutine exits: either after the last block
// has been sent, or after a read error, which is only logged via Log. A
// consumer therefore cannot tell a failed read from normal completion by
// looking at the channel alone.
//
// Sends block when the channel buffer is full, so the consumer must keep
// draining Blocks(); otherwise the goroutine never exits and Wait never
// returns. Start must be called at most once, because the channel is closed
// on exit. Each BlockData.Data is backed by a buffer from getReaderBuf.
// NOTE(review): presumably consumers return it with putReaderBuf; confirm at
// call sites.
func (sr *SequentialReader) Start() {
	sr.wg.Add(1)
	go func() {
		defer sr.wg.Done()
		defer close(sr.blockChan)
		for blockIdx := sr.skipIdx; blockIdx <= sr.lastBlockNum; blockIdx++ {
			// Read each block directly into its own buffer. The buffer is handed
			// off on the channel, so reading straight into it (instead of into a
			// shared scratch buffer and then copying) saves a full block-sized
			// memcpy per block.
			offset := int64(blockIdx) * int64(sr.blockSize)
			buf := getReaderBuf(sr.blockSize)
			n, err := sr.file.ReadAt(buf, offset)
			// io.EOF just means a short (final) block; the n bytes read are valid.
			if err != nil && err != io.EOF {
				// The buffer is never handed off, so recycle it here.
				putReaderBuf(buf)
				Log("sequential reader error reading block %d: %s\n", blockIdx, err)
				return
			}
			// We won't read this source range again; drop its clean page-cache
			// pages so a large transfer doesn't evict useful read-ahead pages.
			AdviseDontNeed(sr.file, offset, int64(n))
			sr.blockChan <- BlockData{BlockIdx: blockIdx, Data: buf[:n]}
			// Stop explicitly at the last block: if lastBlockNum is MaxUint32,
			// blockIdx++ would wrap to 0 and the loop condition would never fail.
			if blockIdx == sr.lastBlockNum {
				break
			}
		}
	}()
}
// Blocks exposes the receive-only end of the channel that Start's goroutine
// fills. Consumers range over it; it is closed when that goroutine exits.
func (sr *SequentialReader) Blocks() <-chan BlockData {
	var recvOnly <-chan BlockData = sr.blockChan
	return recvOnly
}
// Wait blocks until the goroutine launched by Start has returned. If Start
// was never called, the wait group is at zero and Wait returns immediately.
func (sr *SequentialReader) Wait() {
	group := &sr.wg
	group.Wait()
}