mirror of
https://github.com/nim-lang/Nim.git
synced 2026-01-04 12:07:51 +00:00
@@ -93,8 +93,6 @@ type
|
||||
freeList: ptr FreeCell
|
||||
free: int # how many bytes remain
|
||||
acc: int # accumulator for small object allocation
|
||||
when defined(gcDestructors):
|
||||
sharedFreeList: ptr FreeCell # make no attempt at avoiding false sharing for now for this object field
|
||||
data {.align: MemAlign.}: UncheckedArray[byte] # start of usable memory
|
||||
|
||||
BigChunk = object of BaseChunk # not necessarily > PageSize!
|
||||
@@ -109,7 +107,9 @@ type
|
||||
MemRegion = object
|
||||
when not defined(gcDestructors):
|
||||
minLargeObj, maxLargeObj: int
|
||||
freeSmallChunks: array[0..max(1,SmallChunkSize div MemAlign-1), PSmallChunk]
|
||||
freeSmallChunks: array[0..max(1, SmallChunkSize div MemAlign-1), PSmallChunk]
|
||||
when defined(gcDestructors):
|
||||
sharedFreeLists: array[0..max(1, SmallChunkSize div MemAlign-1), ptr FreeCell]
|
||||
flBitmap: uint32
|
||||
slBitmap: array[RealFli, uint32]
|
||||
matrix: array[RealFli, array[MaxSli, PBigChunk]]
|
||||
@@ -775,8 +775,10 @@ when defined(gcDestructors):
|
||||
sysAssert c.next == nil, "c.next pointer must be nil"
|
||||
atomicPrepend a.sharedFreeListBigChunks, c
|
||||
|
||||
proc addToSharedFreeList(c: PSmallChunk; f: ptr FreeCell) {.inline.} =
|
||||
atomicPrepend c.sharedFreeList, f
|
||||
proc addToSharedFreeList(c: PSmallChunk; f: ptr FreeCell; size: int) {.inline.} =
|
||||
atomicPrepend c.owner.sharedFreeLists[size], f
|
||||
|
||||
const MaxSteps = 20
|
||||
|
||||
proc compensateCounters(a: var MemRegion; c: PSmallChunk; size: int) =
|
||||
# rawDealloc did NOT do the usual:
|
||||
@@ -785,24 +787,27 @@ when defined(gcDestructors):
|
||||
# Well, not for the entire list, but for `max` elements of the list because
|
||||
# we split the list in order to achieve bounded response times.
|
||||
var it = c.freeList
|
||||
var total = 0
|
||||
var x = 0
|
||||
while it != nil:
|
||||
inc total, size
|
||||
inc x, size
|
||||
let chunk = cast[PSmallChunk](pageAddr(it))
|
||||
inc(chunk.free, size)
|
||||
inc(chunk.free, x)
|
||||
it = it.next
|
||||
dec(a.occ, total)
|
||||
dec(a.occ, x)
|
||||
|
||||
proc freeDeferredObjects(a: var MemRegion; root: PBigChunk) =
|
||||
var it = root
|
||||
var maxIters = 20 # make it time-bounded
|
||||
var maxIters = MaxSteps # make it time-bounded
|
||||
while true:
|
||||
let rest = it.next.loada
|
||||
it.next.storea nil
|
||||
deallocBigChunk(a, cast[PBigChunk](it))
|
||||
if maxIters == 0:
|
||||
let rest = it.next.loada
|
||||
it.next.storea nil
|
||||
addToSharedFreeListBigChunks(a, rest)
|
||||
if rest != nil:
|
||||
addToSharedFreeListBigChunks(a, rest)
|
||||
sysAssert a.sharedFreeListBigChunks != nil, "re-enqueing failed"
|
||||
break
|
||||
it = it.next.loada
|
||||
it = rest
|
||||
dec maxIters
|
||||
if it == nil: break
|
||||
|
||||
@@ -826,8 +831,6 @@ proc rawAlloc(a: var MemRegion, requestedSize: int): pointer =
|
||||
sysAssert c.size == PageSize, "rawAlloc 3"
|
||||
c.size = size
|
||||
c.acc = size
|
||||
when defined(gcDestructors):
|
||||
c.sharedFreeList = nil
|
||||
c.free = SmallChunkSize - smallChunkOverhead() - size
|
||||
sysAssert c.owner == addr(a), "rawAlloc: No owner set!"
|
||||
c.next = nil
|
||||
@@ -844,10 +847,11 @@ proc rawAlloc(a: var MemRegion, requestedSize: int): pointer =
|
||||
when defined(gcDestructors):
|
||||
if c.freeList == nil:
|
||||
when hasThreadSupport:
|
||||
c.freeList = atomicExchangeN(addr c.sharedFreeList, nil, ATOMIC_RELAXED)
|
||||
# Steal the entire list from `sharedFreeList`:
|
||||
c.freeList = atomicExchangeN(addr a.sharedFreeLists[s], nil, ATOMIC_RELAXED)
|
||||
else:
|
||||
c.freeList = c.sharedFreeList
|
||||
c.sharedFreeList = nil
|
||||
c.freeList = a.sharedFreeLists[s]
|
||||
a.sharedFreeLists[s] = nil
|
||||
compensateCounters(a, c, size)
|
||||
if c.freeList == nil:
|
||||
sysAssert(c.acc + smallChunkOverhead() + size <= SmallChunkSize,
|
||||
@@ -914,7 +918,7 @@ proc rawDealloc(a: var MemRegion, p: pointer) =
|
||||
if isSmallChunk(c):
|
||||
# `p` is within a small chunk:
|
||||
var c = cast[PSmallChunk](c)
|
||||
var s = c.size
|
||||
let s = c.size
|
||||
# ^ We might access thread foreign storage here.
|
||||
# The other thread cannot possibly free this block as it's still alive.
|
||||
var f = cast[ptr FreeCell](p)
|
||||
@@ -950,7 +954,7 @@ proc rawDealloc(a: var MemRegion, p: pointer) =
|
||||
when logAlloc: cprintf("dealloc(pointer_%p) # SMALL FROM %p CALLER %p\n", p, c.owner, addr(a))
|
||||
|
||||
when defined(gcDestructors):
|
||||
addToSharedFreeList(c, f)
|
||||
addToSharedFreeList(c, f, s div MemAlign)
|
||||
sysAssert(((cast[int](p) and PageMask) - smallChunkOverhead()) %%
|
||||
s == 0, "rawDealloc 2")
|
||||
else:
|
||||
|
||||
54
tests/alloc/tmembug.nim
Normal file
54
tests/alloc/tmembug.nim
Normal file
@@ -0,0 +1,54 @@
|
||||
discard """
|
||||
joinable: false
|
||||
"""
|
||||
|
||||
import std / [atomics, strutils, sequtils]
|
||||
|
||||
type
|
||||
BackendMessage* = object
|
||||
field*: seq[int]
|
||||
|
||||
var
|
||||
chan1: Channel[BackendMessage]
|
||||
chan2: Channel[BackendMessage]
|
||||
|
||||
chan1.open()
|
||||
chan2.open()
|
||||
|
||||
proc routeMessage*(msg: BackendMessage) =
|
||||
discard chan2.trySend(msg)
|
||||
|
||||
var
|
||||
recv: Thread[void]
|
||||
stopToken: Atomic[bool]
|
||||
|
||||
proc recvMsg() =
|
||||
while not stopToken.load(moRelaxed):
|
||||
let resp = chan1.tryRecv()
|
||||
if resp.dataAvailable:
|
||||
routeMessage(resp.msg)
|
||||
echo "child consumes ", formatSize getOccupiedMem()
|
||||
|
||||
createThread[void](recv, recvMsg)
|
||||
|
||||
const MESSAGE_COUNT = 100
|
||||
|
||||
proc main() =
|
||||
let msg: BackendMessage = BackendMessage(field: (0..500).toSeq())
|
||||
for j in 0..0: #100:
|
||||
echo "New iteration"
|
||||
|
||||
for _ in 1..MESSAGE_COUNT:
|
||||
chan1.send(msg)
|
||||
echo "After sending"
|
||||
|
||||
var counter = 0
|
||||
while counter < MESSAGE_COUNT:
|
||||
let resp = recv(chan2)
|
||||
counter.inc
|
||||
echo "After receiving ", formatSize getOccupiedMem()
|
||||
|
||||
stopToken.store true, moRelaxed
|
||||
joinThreads(recv)
|
||||
|
||||
main()
|
||||
58
tests/alloc/tmembug2.nim
Normal file
58
tests/alloc/tmembug2.nim
Normal file
@@ -0,0 +1,58 @@
|
||||
discard """
|
||||
disabled: "true"
|
||||
"""
|
||||
|
||||
import std / [atomics, strutils, sequtils, isolation]
|
||||
|
||||
import threading / channels
|
||||
|
||||
type
|
||||
BackendMessage* = object
|
||||
field*: seq[int]
|
||||
|
||||
const MESSAGE_COUNT = 100
|
||||
|
||||
var
|
||||
chan1 = newChan[BackendMessage](MESSAGE_COUNT*2)
|
||||
chan2 = newChan[BackendMessage](MESSAGE_COUNT*2)
|
||||
|
||||
#chan1.open()
|
||||
#chan2.open()
|
||||
|
||||
proc routeMessage*(msg: BackendMessage) =
|
||||
var m = isolate(msg)
|
||||
discard chan2.trySend(m)
|
||||
|
||||
var
|
||||
thr: Thread[void]
|
||||
stopToken: Atomic[bool]
|
||||
|
||||
proc recvMsg() =
|
||||
while not stopToken.load(moRelaxed):
|
||||
var resp: BackendMessage
|
||||
if chan1.tryRecv(resp):
|
||||
#if resp.dataAvailable:
|
||||
routeMessage(resp)
|
||||
echo "child consumes ", formatSize getOccupiedMem()
|
||||
|
||||
createThread[void](thr, recvMsg)
|
||||
|
||||
proc main() =
|
||||
let msg: BackendMessage = BackendMessage(field: (0..5).toSeq())
|
||||
for j in 0..100:
|
||||
echo "New iteration"
|
||||
|
||||
for _ in 1..MESSAGE_COUNT:
|
||||
chan1.send(msg)
|
||||
echo "After sending"
|
||||
|
||||
var counter = 0
|
||||
while counter < MESSAGE_COUNT:
|
||||
let resp = recv(chan2)
|
||||
counter.inc
|
||||
echo "After receiving ", formatSize getOccupiedMem()
|
||||
|
||||
stopToken.store true, moRelaxed
|
||||
joinThreads(thr)
|
||||
|
||||
main()
|
||||
51
tests/threads/tmembug.nim
Normal file
51
tests/threads/tmembug.nim
Normal file
@@ -0,0 +1,51 @@
|
||||
|
||||
import std / [atomics, strutils, sequtils]
|
||||
|
||||
type
|
||||
BackendMessage* = object
|
||||
field*: seq[int]
|
||||
|
||||
var
|
||||
chan1: Channel[BackendMessage]
|
||||
chan2: Channel[BackendMessage]
|
||||
|
||||
chan1.open()
|
||||
chan2.open()
|
||||
|
||||
proc routeMessage*(msg: BackendMessage) =
|
||||
discard chan2.trySend(msg)
|
||||
|
||||
var
|
||||
recv: Thread[void]
|
||||
stopToken: Atomic[bool]
|
||||
|
||||
proc recvMsg() =
|
||||
while not stopToken.load(moRelaxed):
|
||||
let resp = chan1.tryRecv()
|
||||
if resp.dataAvailable:
|
||||
routeMessage(resp.msg)
|
||||
echo "child consumes ", formatSize getOccupiedMem()
|
||||
|
||||
createThread[void](recv, recvMsg)
|
||||
|
||||
const MESSAGE_COUNT = 100
|
||||
|
||||
proc main() =
|
||||
let msg: BackendMessage = BackendMessage(field: (0..500).toSeq())
|
||||
for j in 0..0: #100:
|
||||
echo "New iteration"
|
||||
|
||||
for _ in 1..MESSAGE_COUNT:
|
||||
chan1.send(msg)
|
||||
echo "After sending"
|
||||
|
||||
var counter = 0
|
||||
while counter < MESSAGE_COUNT:
|
||||
let resp = recv(chan2)
|
||||
counter.inc
|
||||
echo "After receiving ", formatSize getOccupiedMem()
|
||||
|
||||
stopToken.store true, moRelaxed
|
||||
joinThreads(recv)
|
||||
|
||||
main()
|
||||
Reference in New Issue
Block a user