[std/channels]fix recv leaks(Part One) (#17394)

This commit is contained in:
flywind
2021-03-17 17:08:54 +08:00
committed by GitHub
parent bebf2ce24a
commit e9b5543bd5
3 changed files with 9 additions and 26 deletions

View File

@@ -49,8 +49,7 @@ runnableExamples("--threads:on --gc:orc"):
createThread(worker1, firstWorker)
# Block until the message arrives, then print it out.
var dest = ""
chan.recv(dest)
let dest = chan.recv()
assert dest == "Hello World!"
# Wait for the thread to exit before moving on to the next example.
@@ -453,21 +452,10 @@ proc `=`*[T](dest: var Channel[T], src: Channel[T]) =
`=destroy`(dest)
dest.d = src.d
proc channelSend[T](chan: Channel[T], data: sink T, size: int, nonBlocking: bool): bool {.inline.} =
## Send item to the channel (FIFO queue)
## (Insert at last)
result = sendMpmc(chan.d, data.unsafeAddr, size, nonBlocking)
wasMoved(data)
proc channelReceive[T](chan: Channel[T], data: ptr T, size: int, nonBlocking: bool): bool {.inline.} =
## Receive an item from the channel
## (Remove the first item)
recvMpmc(chan.d, data, size, nonBlocking)
func trySend*[T](c: Channel[T], src: var Isolated[T]): bool {.inline.} =
## Sends item to the channel(non blocking).
var data = src.extract
result = channelSend(c, data, sizeof(data), true)
result = sendMpmc(c.d, data.addr, sizeof(T), true)
if result:
wasMoved(data)
@@ -477,26 +465,21 @@ template trySend*[T](c: Channel[T], src: T): bool =
func tryRecv*[T](c: Channel[T], dst: var T): bool {.inline.} =
## Receives item from the channel(non blocking).
channelReceive(c, dst.addr, sizeof(dst), true)
recvMpmc(c.d, dst.addr, sizeof(T), true)
func send*[T](c: Channel[T], src: sink Isolated[T]) {.inline.} =
## Sends item to the channel(blocking).
var data = src.extract
discard channelSend(c, data, sizeof(data), false)
discard sendMpmc(c.d, data.addr, sizeof(T), false)
wasMoved(data)
template send*[T](c: var Channel[T]; src: T) =
## Helper templates for `send`.
send(c, isolate(src))
func recv*[T](c: Channel[T], dst: var T) {.inline.} =
func recv*[T](c: Channel[T]): T {.inline.} =
## Receives item from the channel(blocking).
discard channelReceive(c, dst.addr, sizeof(dst), false)
func recvIso*[T](c: Channel[T]): Isolated[T] {.inline.} =
var dst: T
discard channelReceive(c, dst.addr, sizeof(dst), false)
result = isolate(dst)
discard recvMpmc(c.d, result.addr, sizeof(result), false)
func open*[T](c: Channel[T]): bool {.inline.} =
result = c.d.channelOpenMpmc()

View File

@@ -89,6 +89,7 @@ proc runSuite(
capacity(chan) == 1
isBuffered(chan) == false
isUnbuffered(chan) == true
else:
chan = allocChannel(size = int.sizeof.int, n = 7)
check:
@@ -109,7 +110,7 @@ proc runSuite(
discard pthread_join(threads[0], nil)
discard pthread_join(threads[1], nil)
freeChannel(chan)
freeChannel(chan)
# ----------------------------------------------------------------------------------

View File

@@ -24,8 +24,7 @@ var worker1: Thread[void]
createThread(worker1, firstWorker)
# Block until the message arrives, then print it out.
var dest = ""
chan.recv(dest)
let dest = chan.recv()
doAssert dest == "Hello World!"
# Wait for the thread to exit before moving on to the next example.