12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061 |
- package page_writer
- import (
- "sync/atomic"
- )
- func (up *UploadPipeline) LockForRead(startOffset, stopOffset int64) {
- startLogicChunkIndex := LogicChunkIndex(startOffset / up.ChunkSize)
- stopLogicChunkIndex := LogicChunkIndex(stopOffset / up.ChunkSize)
- if stopOffset%up.ChunkSize > 0 {
- stopLogicChunkIndex += 1
- }
- up.chunksLock.Lock()
- defer up.chunksLock.Unlock()
- for i := startLogicChunkIndex; i < stopLogicChunkIndex; i++ {
- if count, found := up.activeReadChunks[i]; found {
- up.activeReadChunks[i] = count + 1
- } else {
- up.activeReadChunks[i] = 1
- }
- }
- }
- func (up *UploadPipeline) UnlockForRead(startOffset, stopOffset int64) {
- startLogicChunkIndex := LogicChunkIndex(startOffset / up.ChunkSize)
- stopLogicChunkIndex := LogicChunkIndex(stopOffset / up.ChunkSize)
- if stopOffset%up.ChunkSize > 0 {
- stopLogicChunkIndex += 1
- }
- up.chunksLock.Lock()
- defer up.chunksLock.Unlock()
- for i := startLogicChunkIndex; i < stopLogicChunkIndex; i++ {
- if count, found := up.activeReadChunks[i]; found {
- if count == 1 {
- delete(up.activeReadChunks, i)
- } else {
- up.activeReadChunks[i] = count - 1
- }
- }
- }
- }
- func (up *UploadPipeline) IsLocked(logicChunkIndex LogicChunkIndex) bool {
- if count, found := up.activeReadChunks[logicChunkIndex]; found {
- return count > 0
- }
- return false
- }
- func (up *UploadPipeline) waitForCurrentWritersToComplete() {
- up.uploaderCountCond.L.Lock()
- t := int32(100)
- for {
- t = atomic.LoadInt32(&up.uploaderCount)
- if t <= 0 {
- break
- }
- up.uploaderCountCond.Wait()
- }
- up.uploaderCountCond.L.Unlock()
- }
|