mirror of
https://github.com/nim-lang/Nim.git
synced 2025-12-31 02:12:11 +00:00
@@ -341,8 +341,6 @@
|
||||
|
||||
- Added `jscore.debugger` to [call any available debugging functionality, such as breakpoints.](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/debugger).
|
||||
|
||||
- Added `std/channels`.
|
||||
|
||||
- Added `htmlgen.portal` for [making "SPA style" pages using HTML only](https://web.dev/hands-on-portals).
|
||||
|
||||
- `std/times`:
|
||||
|
||||
@@ -8016,7 +8016,7 @@ Threads
|
||||
|
||||
To enable thread support the `--threads:on`:option: command-line switch needs to
|
||||
be used. The system_ module then contains several threading primitives.
|
||||
See the `threads <threads.html>`_ and `channels <channels_builtin.html>`_ modules
|
||||
See the `channels <channels_builtin.html>`_ modules
|
||||
for the low-level thread API. There are also high-level parallelism constructs
|
||||
available. See `spawn <manual_experimental.html#parallel-amp-spawn>`_ for
|
||||
further details.
|
||||
|
||||
@@ -1,498 +0,0 @@
|
||||
#
|
||||
#
|
||||
# The Nim Compiler
|
||||
# (c) Copyright 2021 Andreas Prell, Mamy André-Ratsimbazafy & Nim Contributors
|
||||
#
|
||||
# See the file "copying.txt", included in this
|
||||
# distribution, for details about the copyright.
|
||||
#
|
||||
|
||||
|
||||
# Based on https://github.com/mratsim/weave/blob/5696d94e6358711e840f8c0b7c684fcc5cbd4472/unused/channels/channels_legacy.nim
|
||||
# Those are translations of @aprell (Andreas Prell) original channels from C to Nim
|
||||
# (https://github.com/aprell/tasking-2.0/blob/master/src/channel_shm/channel.c)
|
||||
# And in turn they are an implementation of Michael & Scott lock-based queues
|
||||
# (note the paper has 2 channels: lock-free and lock-based) with additional caching:
|
||||
# Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms
|
||||
# Maged M. Michael, Michael L. Scott, 1996
|
||||
# https://www.cs.rochester.edu/~scott/papers/1996_PODC_queues.pdf
|
||||
|
||||
## This module only works with `--gc:arc` or `--gc:orc`.
|
||||
##
|
||||
## .. warning:: This module is experimental and its interface may change.
|
||||
##
|
||||
## The following is a simple example of two different ways to use channels:
|
||||
## blocking and non-blocking.
|
||||
##
|
||||
|
||||
runnableExamples("--threads:on --gc:orc"):
|
||||
import std/os
|
||||
|
||||
# In this example a channel is declared at module scope.
|
||||
# Channels are generic, and they include support for passing objects between
|
||||
# threads.
|
||||
# Note that isolated data passed through channels is moved around.
|
||||
var chan = newChannel[string]()
|
||||
|
||||
# This proc will be run in another thread using the threads module.
|
||||
proc firstWorker() =
|
||||
chan.send("Hello World!")
|
||||
|
||||
# This is another proc to run in a background thread. This proc takes a while
|
||||
# to send the message since it sleeps for 2 seconds (or 2000 milliseconds).
|
||||
proc secondWorker() =
|
||||
sleep(2000)
|
||||
chan.send("Another message")
|
||||
|
||||
# Launch the worker.
|
||||
var worker1: Thread[void]
|
||||
createThread(worker1, firstWorker)
|
||||
|
||||
# Block until the message arrives, then print it out.
|
||||
let dest = chan.recv()
|
||||
assert dest == "Hello World!"
|
||||
|
||||
# Wait for the thread to exit before moving on to the next example.
|
||||
worker1.joinThread()
|
||||
|
||||
# Launch the other worker.
|
||||
var worker2: Thread[void]
|
||||
createThread(worker2, secondWorker)
|
||||
# This time, use a non-blocking approach with tryRecv.
|
||||
# Since the main thread is not blocked, it could be used to perform other
|
||||
# useful work while it waits for data to arrive on the channel.
|
||||
var messages: seq[string]
|
||||
while true:
|
||||
var msg = ""
|
||||
if chan.tryRecv(msg):
|
||||
messages.add msg # "Another message"
|
||||
break
|
||||
|
||||
messages.add "Pretend I'm doing useful work..."
|
||||
# For this example, sleep in order not to flood stdout with the above
|
||||
# message.
|
||||
sleep(400)
|
||||
|
||||
# Wait for the second thread to exit before cleaning up the channel.
|
||||
worker2.joinThread()
|
||||
|
||||
# Clean up the channel.
|
||||
assert chan.close()
|
||||
|
||||
assert messages[^1] == "Another message"
|
||||
assert messages.len >= 2
|
||||
|
||||
|
||||
when not defined(gcArc) and not defined(gcOrc) and not defined(nimdoc):
|
||||
{.error: "This channel implementation requires --gc:arc or --gc:orc".}
|
||||
|
||||
import std/[locks, atomics, isolation]
|
||||
import system/ansi_c
|
||||
|
||||
# Channel (Shared memory channels)
|
||||
# ----------------------------------------------------------------------------------
|
||||
|
||||
const
|
||||
cacheLineSize {.intdefine.} = 64 # TODO: some Samsung phone have 128 cache-line
|
||||
nimChannelCacheSize* {.intdefine.} = 100
|
||||
|
||||
type
|
||||
ChannelRaw = ptr ChannelObj
|
||||
ChannelObj = object
|
||||
headLock, tailLock: Lock
|
||||
notFullCond, notEmptyCond: Cond
|
||||
closed: Atomic[bool]
|
||||
size: int
|
||||
itemsize: int # up to itemsize bytes can be exchanged over this channel
|
||||
head {.align: cacheLineSize.} : int # Items are taken from head and new items are inserted at tail
|
||||
tail: int
|
||||
buffer: ptr UncheckedArray[byte]
|
||||
atomicCounter: Atomic[int]
|
||||
|
||||
ChannelCache = ptr ChannelCacheObj
|
||||
ChannelCacheObj = object
|
||||
next: ChannelCache
|
||||
chanSize: int
|
||||
chanN: int
|
||||
numCached: int
|
||||
cache: array[nimChannelCacheSize, ChannelRaw]
|
||||
|
||||
# ----------------------------------------------------------------------------------
|
||||
|
||||
proc numItems(chan: ChannelRaw): int {.inline.} =
|
||||
result = chan.tail - chan.head
|
||||
if result < 0:
|
||||
inc(result, 2 * chan.size)
|
||||
|
||||
assert result <= chan.size
|
||||
|
||||
template isFull(chan: ChannelRaw): bool =
|
||||
abs(chan.tail - chan.head) == chan.size
|
||||
|
||||
template isEmpty(chan: ChannelRaw): bool =
|
||||
chan.head == chan.tail
|
||||
|
||||
# Unbuffered / synchronous channels
|
||||
# ----------------------------------------------------------------------------------
|
||||
|
||||
template numItemsUnbuf(chan: ChannelRaw): int =
|
||||
chan.head
|
||||
|
||||
template isFullUnbuf(chan: ChannelRaw): bool =
|
||||
chan.head == 1
|
||||
|
||||
template isEmptyUnbuf(chan: ChannelRaw): bool =
|
||||
chan.head == 0
|
||||
|
||||
# ChannelRaw kinds
|
||||
# ----------------------------------------------------------------------------------
|
||||
|
||||
func isUnbuffered(chan: ChannelRaw): bool =
|
||||
chan.size - 1 == 0
|
||||
|
||||
# ChannelRaw status and properties
|
||||
# ----------------------------------------------------------------------------------
|
||||
|
||||
proc isClosed(chan: ChannelRaw): bool {.inline.} = load(chan.closed, moRelaxed)
|
||||
|
||||
proc peek(chan: ChannelRaw): int {.inline.} =
|
||||
(if chan.isUnbuffered: numItemsUnbuf(chan) else: numItems(chan))
|
||||
|
||||
# Per-thread channel cache
|
||||
# ----------------------------------------------------------------------------------
|
||||
|
||||
var channelCache {.threadvar.}: ChannelCache
|
||||
var channelCacheLen {.threadvar.}: int
|
||||
|
||||
proc allocChannelCache(size, n: int): bool =
|
||||
## Allocate a free list for storing channels of a given type
|
||||
var p = channelCache
|
||||
|
||||
# Avoid multiple free lists for the exact same type of channel
|
||||
while not p.isNil:
|
||||
if size == p.chanSize and n == p.chanN:
|
||||
return false
|
||||
p = p.next
|
||||
|
||||
p = cast[ptr ChannelCacheObj](c_malloc(csize_t sizeof(ChannelCacheObj)))
|
||||
if p.isNil:
|
||||
raise newException(OutOfMemDefect, "Could not allocate memory")
|
||||
|
||||
p.chanSize = size
|
||||
p.chanN = n
|
||||
p.numCached = 0
|
||||
|
||||
p.next = channelCache
|
||||
channelCache = p
|
||||
inc channelCacheLen
|
||||
result = true
|
||||
|
||||
proc freeChannelCache*() =
|
||||
## Frees the entire channel cache, including all channels
|
||||
var p = channelCache
|
||||
var q: ChannelCache
|
||||
|
||||
while not p.isNil:
|
||||
q = p.next
|
||||
for i in 0 ..< p.numCached:
|
||||
let chan = p.cache[i]
|
||||
if not chan.buffer.isNil:
|
||||
c_free(chan.buffer)
|
||||
deinitLock(chan.headLock)
|
||||
deinitLock(chan.tailLock)
|
||||
deinitCond(chan.notFullCond)
|
||||
deinitCond(chan.notEmptyCond)
|
||||
c_free(chan)
|
||||
c_free(p)
|
||||
dec channelCacheLen
|
||||
p = q
|
||||
|
||||
assert(channelCacheLen == 0)
|
||||
channelCache = nil
|
||||
|
||||
# Channels memory ops
|
||||
# ----------------------------------------------------------------------------------
|
||||
|
||||
proc allocChannel(size, n: int): ChannelRaw =
|
||||
when nimChannelCacheSize > 0:
|
||||
var p = channelCache
|
||||
|
||||
while not p.isNil:
|
||||
if size == p.chanSize and n == p.chanN:
|
||||
# Check if free list contains channel
|
||||
if p.numCached > 0:
|
||||
dec p.numCached
|
||||
result = p.cache[p.numCached]
|
||||
assert(result.isEmpty)
|
||||
return
|
||||
else:
|
||||
# All the other lists in cache won't match
|
||||
break
|
||||
p = p.next
|
||||
|
||||
result = cast[ChannelRaw](c_malloc(csize_t sizeof(ChannelObj)))
|
||||
if result.isNil:
|
||||
raise newException(OutOfMemDefect, "Could not allocate memory")
|
||||
|
||||
# To buffer n items, we allocate for n
|
||||
result.buffer = cast[ptr UncheckedArray[byte]](c_malloc(csize_t n*size))
|
||||
if result.buffer.isNil:
|
||||
raise newException(OutOfMemDefect, "Could not allocate memory")
|
||||
|
||||
initLock(result.headLock)
|
||||
initLock(result.tailLock)
|
||||
initCond(result.notFullCond)
|
||||
initCond(result.notEmptyCond)
|
||||
|
||||
result.closed.store(false, moRelaxed) # We don't need atomic here, how to?
|
||||
result.size = n
|
||||
result.itemsize = size
|
||||
result.head = 0
|
||||
result.tail = 0
|
||||
result.atomicCounter.store(0, moRelaxed)
|
||||
|
||||
when nimChannelCacheSize > 0:
|
||||
# Allocate a cache as well if one of the proper size doesn't exist
|
||||
discard allocChannelCache(size, n)
|
||||
|
||||
proc freeChannel(chan: ChannelRaw) =
|
||||
if chan.isNil:
|
||||
return
|
||||
|
||||
when nimChannelCacheSize > 0:
|
||||
var p = channelCache
|
||||
while not p.isNil:
|
||||
if chan.itemsize == p.chanSize and
|
||||
chan.size == p.chanN:
|
||||
if p.numCached < nimChannelCacheSize:
|
||||
# If space left in cache, cache it
|
||||
p.cache[p.numCached] = chan
|
||||
inc p.numCached
|
||||
return
|
||||
else:
|
||||
# All the other lists in cache won't match
|
||||
break
|
||||
p = p.next
|
||||
|
||||
if not chan.buffer.isNil:
|
||||
c_free(chan.buffer)
|
||||
|
||||
deinitLock(chan.headLock)
|
||||
deinitLock(chan.tailLock)
|
||||
deinitCond(chan.notFullCond)
|
||||
deinitCond(chan.notEmptyCond)
|
||||
|
||||
c_free(chan)
|
||||
|
||||
# MPMC Channels (Multi-Producer Multi-Consumer)
|
||||
# ----------------------------------------------------------------------------------
|
||||
|
||||
proc sendUnbufferedMpmc(chan: ChannelRaw, data: sink pointer, size: int, nonBlocking: bool): bool =
|
||||
if nonBlocking and chan.isFullUnbuf:
|
||||
return false
|
||||
|
||||
acquire(chan.headLock)
|
||||
|
||||
if nonBlocking and chan.isFullUnbuf:
|
||||
# Another thread was faster
|
||||
release(chan.headLock)
|
||||
return false
|
||||
|
||||
while chan.isFullUnbuf:
|
||||
wait(chan.notFullcond, chan.headLock)
|
||||
|
||||
assert chan.isEmptyUnbuf
|
||||
assert size <= chan.itemsize
|
||||
copyMem(chan.buffer, data, size)
|
||||
|
||||
chan.head = 1
|
||||
|
||||
signal(chan.notEmptyCond)
|
||||
release(chan.headLock)
|
||||
result = true
|
||||
|
||||
proc sendMpmc(chan: ChannelRaw, data: sink pointer, size: int, nonBlocking: bool): bool =
|
||||
assert not chan.isNil
|
||||
assert not data.isNil
|
||||
|
||||
if isUnbuffered(chan):
|
||||
return sendUnbufferedMpmc(chan, data, size, nonBlocking)
|
||||
|
||||
if nonBlocking and chan.isFull:
|
||||
return false
|
||||
|
||||
acquire(chan.tailLock)
|
||||
|
||||
if nonBlocking and chan.isFull:
|
||||
# Another thread was faster
|
||||
release(chan.tailLock)
|
||||
return false
|
||||
|
||||
while chan.isFull:
|
||||
wait(chan.notFullcond, chan.tailLock)
|
||||
|
||||
assert not chan.isFull
|
||||
assert size <= chan.itemsize
|
||||
|
||||
let writeIdx = if chan.tail < chan.size: chan.tail
|
||||
else: chan.tail - chan.size
|
||||
|
||||
copyMem(chan.buffer[writeIdx * chan.itemsize].addr, data, size)
|
||||
|
||||
inc chan.tail
|
||||
if chan.tail == 2 * chan.size:
|
||||
chan.tail = 0
|
||||
|
||||
signal(chan.notEmptyCond)
|
||||
release(chan.tailLock)
|
||||
result = true
|
||||
|
||||
proc recvUnbufferedMpmc(chan: ChannelRaw, data: pointer, size: int, nonBlocking: bool): bool =
|
||||
if nonBlocking and chan.isEmptyUnbuf:
|
||||
return false
|
||||
|
||||
acquire(chan.headLock)
|
||||
|
||||
if nonBlocking and chan.isEmptyUnbuf:
|
||||
# Another thread was faster
|
||||
release(chan.headLock)
|
||||
return false
|
||||
|
||||
while chan.isEmptyUnbuf:
|
||||
wait(chan.notEmptyCond, chan.headLock)
|
||||
|
||||
assert chan.isFullUnbuf
|
||||
assert size <= chan.itemsize
|
||||
|
||||
copyMem(data, chan.buffer, size)
|
||||
|
||||
chan.head = 0
|
||||
|
||||
signal(chan.notFullCond)
|
||||
release(chan.headLock)
|
||||
result = true
|
||||
|
||||
proc recvMpmc(chan: ChannelRaw, data: pointer, size: int, nonBlocking: bool): bool =
|
||||
assert not chan.isNil
|
||||
assert not data.isNil
|
||||
|
||||
if isUnbuffered(chan):
|
||||
return recvUnbufferedMpmc(chan, data, size, nonBlocking)
|
||||
|
||||
if nonBlocking and chan.isEmpty:
|
||||
return false
|
||||
|
||||
acquire(chan.headLock)
|
||||
|
||||
if nonBlocking and chan.isEmpty:
|
||||
# Another thread took the last data
|
||||
release(chan.headLock)
|
||||
return false
|
||||
|
||||
while chan.isEmpty:
|
||||
wait(chan.notEmptyCond, chan.headLock)
|
||||
|
||||
assert not chan.isEmpty
|
||||
assert size <= chan.itemsize
|
||||
|
||||
let readIdx = if chan.head < chan.size: chan.head
|
||||
else: chan.head - chan.size
|
||||
|
||||
copyMem(data, chan.buffer[readIdx * chan.itemsize].addr, size)
|
||||
|
||||
inc chan.head
|
||||
if chan.head == 2 * chan.size:
|
||||
chan.head = 0
|
||||
|
||||
signal(chan.notFullCond)
|
||||
release(chan.headLock)
|
||||
result = true
|
||||
|
||||
proc channelCloseMpmc(chan: ChannelRaw): bool =
|
||||
# Unsynchronized
|
||||
|
||||
if chan.isClosed:
|
||||
# ChannelRaw already closed
|
||||
return false
|
||||
|
||||
store(chan.closed, true, moRelaxed)
|
||||
result = true
|
||||
|
||||
proc channelOpenMpmc(chan: ChannelRaw): bool =
|
||||
# Unsynchronized
|
||||
|
||||
if not chan.isClosed:
|
||||
# ChannelRaw already open
|
||||
return false
|
||||
|
||||
store(chan.closed, false, moRelaxed)
|
||||
result = true
|
||||
|
||||
# Public API
|
||||
# ----------------------------------------------------------------------------------
|
||||
|
||||
type
|
||||
Channel*[T] = object ## Typed channels
|
||||
d: ChannelRaw
|
||||
|
||||
proc `=destroy`*[T](c: var Channel[T]) =
|
||||
if c.d != nil:
|
||||
if load(c.d.atomicCounter, moAcquire) == 0:
|
||||
if c.d.buffer != nil:
|
||||
freeChannel(c.d)
|
||||
else:
|
||||
atomicDec(c.d.atomicCounter)
|
||||
|
||||
proc `=copy`*[T](dest: var Channel[T], src: Channel[T]) =
|
||||
## Shares `Channel` by reference counting.
|
||||
if src.d != nil:
|
||||
atomicInc(src.d.atomicCounter)
|
||||
|
||||
if dest.d != nil:
|
||||
`=destroy`(dest)
|
||||
dest.d = src.d
|
||||
|
||||
func trySend*[T](c: Channel[T], src: var Isolated[T]): bool {.inline.} =
|
||||
## Sends item to the channel(non blocking).
|
||||
var data = src.extract
|
||||
result = sendMpmc(c.d, data.addr, sizeof(T), true)
|
||||
if result:
|
||||
wasMoved(data)
|
||||
|
||||
template trySend*[T](c: Channel[T], src: T): bool =
|
||||
## Helper templates for `trySend`.
|
||||
trySend(c, isolate(src))
|
||||
|
||||
func tryRecv*[T](c: Channel[T], dst: var T): bool {.inline.} =
|
||||
## Receives item from the channel(non blocking).
|
||||
recvMpmc(c.d, dst.addr, sizeof(T), true)
|
||||
|
||||
func send*[T](c: Channel[T], src: sink Isolated[T]) {.inline.} =
|
||||
## Sends item to the channel(blocking).
|
||||
var data = src.extract
|
||||
discard sendMpmc(c.d, data.addr, sizeof(T), false)
|
||||
wasMoved(data)
|
||||
|
||||
template send*[T](c: Channel[T]; src: T) =
|
||||
## Helper templates for `send`.
|
||||
send(c, isolate(src))
|
||||
|
||||
func recv*[T](c: Channel[T]): T {.inline.} =
|
||||
## Receives item from the channel(blocking).
|
||||
discard recvMpmc(c.d, result.addr, sizeof(result), false)
|
||||
|
||||
func open*[T](c: Channel[T]): bool {.inline.} =
|
||||
result = c.d.channelOpenMpmc()
|
||||
|
||||
func close*[T](c: Channel[T]): bool {.inline.} =
|
||||
result = c.d.channelCloseMpmc()
|
||||
|
||||
func peek*[T](c: Channel[T]): int {.inline.} = peek(c.d)
|
||||
|
||||
proc newChannel*[T](elements = 30): Channel[T] =
|
||||
## Returns a new `Channel`. `elements` should be positive.
|
||||
## `elements` is used to specify whether a channel is buffered or not.
|
||||
## If `elements` = 1, the channel is unbuffered. If `elements` > 1, the
|
||||
## channel is buffered.
|
||||
assert elements >= 1, "Elements must be positive!"
|
||||
result = Channel[T](d: allocChannel(sizeof(T), elements))
|
||||
@@ -1,33 +0,0 @@
|
||||
discard """
|
||||
timeout: 60.0 # but typically < 1s (in isolation but other tests running in parallel can affect this since based on epochTime)
|
||||
disabled: "freebsd"
|
||||
matrix: "--gc:arc --threads:on; --gc:arc --threads:on -d:danger"
|
||||
"""
|
||||
|
||||
when true:
|
||||
# bug #17380: this was either blocking (without -d:danger) or crashing with SIGSEGV (with -d:danger)
|
||||
import std/[channels, isolation]
|
||||
const
|
||||
N1 = 10
|
||||
N2 = 100
|
||||
var
|
||||
sender: array[N1, Thread[void]]
|
||||
receiver: array[5, Thread[void]]
|
||||
|
||||
var chan = newChannel[seq[string]](N1 * N2) # large enough to not block
|
||||
proc sendHandler() =
|
||||
chan.send(isolate(@["Hello, Nim"]))
|
||||
proc recvHandler() =
|
||||
template fn =
|
||||
let x = chan.recv()
|
||||
fn()
|
||||
|
||||
template benchmark() =
|
||||
for t in mitems(sender):
|
||||
t.createThread(sendHandler)
|
||||
joinThreads(sender)
|
||||
for t in mitems(receiver):
|
||||
t.createThread(recvHandler)
|
||||
joinThreads(receiver)
|
||||
for i in 0..<N2:
|
||||
benchmark()
|
||||
@@ -1,322 +0,0 @@
|
||||
discard """
|
||||
targets: "c cpp"
|
||||
matrix: "--gc:orc --threads:on; --gc:orc --threads:on -d:blockingTest"
|
||||
disabled: "windows"
|
||||
disabled: "bsd"
|
||||
disabled: "osx"
|
||||
"""
|
||||
|
||||
include std/channels
|
||||
|
||||
import std/unittest
|
||||
|
||||
|
||||
type
|
||||
ChannelBufKind = enum
|
||||
Unbuffered # Unbuffered (blocking) channel
|
||||
Buffered # Buffered (non-blocking channel)
|
||||
|
||||
|
||||
proc capacity(chan: ChannelRaw): int {.inline.} = chan.size
|
||||
func isBuffered(chan: ChannelRaw): bool =
|
||||
chan.size - 1 > 0
|
||||
|
||||
when defined(blockingTest):
|
||||
const nonBlocking = false
|
||||
else:
|
||||
const nonBlocking = true
|
||||
|
||||
type
|
||||
Pthread {.importc: "pthread_t", header: "<sys/types.h>".} = distinct culong
|
||||
PthreadAttr* {.byref, importc: "pthread_attr_t", header: "<sys/types.h>".} = object
|
||||
Errno* = distinct cint
|
||||
|
||||
proc pthread_create[T](
|
||||
thread: var Pthread,
|
||||
attr: ptr PthreadAttr, # In Nim this is a var and how Nim sets a custom stack
|
||||
fn: proc (x: ptr T): pointer {.thread, noconv.},
|
||||
arg: ptr T
|
||||
): Errno {.header: "<sys/types.h>".}
|
||||
|
||||
proc pthread_join(
|
||||
thread: Pthread,
|
||||
thread_exit_status: ptr pointer
|
||||
): Errno {.header: "<pthread.h>".}
|
||||
|
||||
template channel_send_loop(chan: ChannelRaw,
|
||||
data: sink pointer,
|
||||
size: int,
|
||||
body: untyped): untyped =
|
||||
while not sendMpmc(chan, data, size, nonBlocking):
|
||||
body
|
||||
|
||||
template channel_receive_loop(chan: ChannelRaw,
|
||||
data: pointer,
|
||||
size: int,
|
||||
body: untyped): untyped =
|
||||
while not recvMpmc(chan, data, size, nonBlocking):
|
||||
body
|
||||
|
||||
|
||||
# Without threads:on or release,
|
||||
# worker threads will crash on popFrame
|
||||
|
||||
import std/unittest
|
||||
|
||||
type ThreadArgs = object
|
||||
ID: int
|
||||
chan: ChannelRaw
|
||||
|
||||
template Worker(id: int, body: untyped): untyped {.dirty.} =
|
||||
if args.ID == id:
|
||||
body
|
||||
|
||||
|
||||
const Sender = 1
|
||||
const Receiver = 0
|
||||
|
||||
proc runSuite(
|
||||
name: string,
|
||||
fn: proc(args: ptr ThreadArgs): pointer {.noconv, gcsafe.}
|
||||
) =
|
||||
var chan: ChannelRaw
|
||||
|
||||
for i in Unbuffered .. Buffered:
|
||||
if i == Unbuffered:
|
||||
chan = allocChannel(size = 32, n = 1)
|
||||
check:
|
||||
peek(chan) == 0
|
||||
capacity(chan) == 1
|
||||
isBuffered(chan) == false
|
||||
isUnbuffered(chan) == true
|
||||
|
||||
else:
|
||||
chan = allocChannel(size = int.sizeof.int, n = 7)
|
||||
check:
|
||||
peek(chan) == 0
|
||||
capacity(chan) == 7
|
||||
isBuffered(chan) == true
|
||||
isUnbuffered(chan) == false
|
||||
|
||||
var threads: array[2, Pthread]
|
||||
var args = [
|
||||
ThreadArgs(ID: 0, chan: chan),
|
||||
ThreadArgs(ID: 1, chan: chan)
|
||||
]
|
||||
|
||||
discard pthread_create(threads[0], nil, fn, args[0].addr)
|
||||
discard pthread_create(threads[1], nil, fn, args[1].addr)
|
||||
|
||||
discard pthread_join(threads[0], nil)
|
||||
discard pthread_join(threads[1], nil)
|
||||
|
||||
freeChannel(chan)
|
||||
|
||||
# ----------------------------------------------------------------------------------
|
||||
|
||||
proc thread_func(args: ptr ThreadArgs): pointer {.noconv.} =
|
||||
|
||||
# Worker RECEIVER:
|
||||
# ---------
|
||||
# <- chan
|
||||
# <- chan
|
||||
# <- chan
|
||||
#
|
||||
# Worker SENDER:
|
||||
# ---------
|
||||
# chan <- 42
|
||||
# chan <- 53
|
||||
# chan <- 64
|
||||
#
|
||||
|
||||
Worker(Receiver):
|
||||
var val: int
|
||||
for j in 0 ..< 3:
|
||||
channel_receive_loop(args.chan, val.addr, val.sizeof.int):
|
||||
# Busy loop, normally it should yield
|
||||
discard
|
||||
check: val == 42 + j*11
|
||||
|
||||
Worker(Sender):
|
||||
var val: int
|
||||
check: peek(args.chan) == 0
|
||||
for j in 0 ..< 3:
|
||||
val = 42 + j*11
|
||||
channel_send_loop(args.chan, val.addr, val.sizeof.int):
|
||||
# Busy loop, normally it should yield
|
||||
discard
|
||||
|
||||
return nil
|
||||
|
||||
runSuite("[ChannelRaw] 2 threads can send data", thread_func)
|
||||
|
||||
# ----------------------------------------------------------------------------------
|
||||
|
||||
iterator pairs(chan: ChannelRaw, T: typedesc): (int, T) =
|
||||
var i = 0
|
||||
var x: T
|
||||
while not isClosed(chan) or peek(chan) > 0:
|
||||
let r = recvMpmc(chan, x.addr, x.sizeof.int, true)
|
||||
# printf("x: %d, r: %d\n", x, r)
|
||||
if r:
|
||||
yield (i, x)
|
||||
inc i
|
||||
|
||||
proc thread_func_2(args: ptr ThreadArgs): pointer {.noconv.} =
|
||||
# Worker RECEIVER:
|
||||
# ---------
|
||||
# <- chan until closed and empty
|
||||
#
|
||||
# Worker SENDER:
|
||||
# ---------
|
||||
# chan <- 42, 53, 64, ...
|
||||
|
||||
const N = 100
|
||||
|
||||
Worker(Receiver):
|
||||
for j, val in pairs(args.chan, int):
|
||||
# TODO: Need special handling that doesn't allocate
|
||||
# in thread with no GC
|
||||
# when check fails
|
||||
#
|
||||
check: val == 42 + j*11
|
||||
|
||||
Worker(Sender):
|
||||
var val: int
|
||||
check: peek(args.chan) == 0
|
||||
for j in 0 ..< N:
|
||||
val = 42 + j*11
|
||||
channel_send_loop(args.chan, val.addr, int.sizeof.int):
|
||||
discard
|
||||
discard channelCloseMpmc(args.chan)
|
||||
|
||||
return nil
|
||||
|
||||
runSuite("[ChannelRaw] channel_close, freeChannel, channelCache", thread_func_2)
|
||||
|
||||
# ----------------------------------------------------------------------------------
|
||||
|
||||
proc isCached(chan: ChannelRaw): bool =
|
||||
assert not chan.isNil
|
||||
|
||||
var p = channelCache
|
||||
while not p.isNil:
|
||||
if chan.itemsize == p.chanSize and
|
||||
chan.size == p.chanN:
|
||||
for i in 0 ..< p.numCached:
|
||||
if chan == p.cache[i]:
|
||||
return true
|
||||
# No more channel in cache can match
|
||||
return false
|
||||
p = p.next
|
||||
return false
|
||||
|
||||
block: # [ChannelRaw] ChannelRaw caching implementation
|
||||
|
||||
# Start from clean cache slate
|
||||
freeChannelCache()
|
||||
|
||||
block: # Explicit caches allocation
|
||||
check:
|
||||
allocChannelCache(int sizeof(char), 4)
|
||||
allocChannelCache(int sizeof(int), 8)
|
||||
allocChannelCache(int sizeof(ptr float64), 16)
|
||||
|
||||
# Don't create existing channel cache
|
||||
not allocChannelCache(int sizeof(char), 4)
|
||||
not allocChannelCache(int sizeof(int), 8)
|
||||
not allocChannelCache(int sizeof(ptr float64), 16)
|
||||
|
||||
check:
|
||||
channelCacheLen == 3
|
||||
|
||||
# ---------------------------------
|
||||
var chan, stash: array[10, ChannelRaw]
|
||||
|
||||
block: # Implicit caches allocation
|
||||
|
||||
chan[0] = allocChannel(sizeof(char), 4)
|
||||
chan[1] = allocChannel(sizeof(int32), 8)
|
||||
chan[2] = allocChannel(sizeof(ptr float64), 16)
|
||||
|
||||
chan[3] = allocChannel(sizeof(char), 5)
|
||||
chan[4] = allocChannel(sizeof(int64), 8)
|
||||
chan[5] = allocChannel(sizeof(ptr float32), 24)
|
||||
|
||||
# We have caches ready to store specific channel kinds
|
||||
check: channelCacheLen == 6 # Cumulated with previous test
|
||||
# But they are not in cache while in use
|
||||
check:
|
||||
not chan[0].isCached
|
||||
not chan[1].isCached
|
||||
not chan[2].isCached
|
||||
not chan[3].isCached
|
||||
not chan[4].isCached
|
||||
not chan[5].isCached
|
||||
|
||||
block: # Freed channels are returned to cache
|
||||
stash[0..5] = chan.toOpenArray(0, 5)
|
||||
for i in 0 .. 5:
|
||||
# Free the channels
|
||||
freeChannel(chan[i])
|
||||
|
||||
check:
|
||||
stash[0].isCached
|
||||
stash[1].isCached
|
||||
stash[2].isCached
|
||||
stash[3].isCached
|
||||
stash[4].isCached
|
||||
stash[5].isCached
|
||||
|
||||
block: # Cached channels are being reused
|
||||
|
||||
chan[6] = allocChannel(sizeof(char), 4)
|
||||
chan[7] = allocChannel(sizeof(int32), 8)
|
||||
chan[8] = allocChannel(sizeof(ptr float32), 16)
|
||||
chan[9] = allocChannel(sizeof(ptr float64), 16)
|
||||
|
||||
# All (itemsize, queue size, implementation) were already allocated
|
||||
check: channelCacheLen == 6
|
||||
|
||||
# We reused old channels from cache
|
||||
check:
|
||||
chan[6] == stash[0]
|
||||
chan[7] == stash[1]
|
||||
chan[8] == stash[2]
|
||||
# chan[9] - required a fresh alloc
|
||||
|
||||
block: # Clearing the cache
|
||||
|
||||
stash[6..9] = chan.toOpenArray(6, 9)
|
||||
|
||||
for i in 6 .. 9:
|
||||
freeChannel(chan[i])
|
||||
|
||||
check:
|
||||
stash[6].isCached
|
||||
stash[7].isCached
|
||||
stash[8].isCached
|
||||
stash[9].isCached
|
||||
|
||||
freeChannelCache()
|
||||
|
||||
# Check that nothing is cached anymore
|
||||
for i in 0 .. 9:
|
||||
check: not stash[i].isCached
|
||||
# And length is reset to 0
|
||||
check: channelCacheLen == 0
|
||||
|
||||
# Cache can grow again
|
||||
chan[0] = allocChannel(sizeof((int, float, int32, uint)), 1)
|
||||
chan[1] = allocChannel(sizeof(int32), 0)
|
||||
chan[2] = allocChannel(sizeof(int32), 0)
|
||||
|
||||
check: channelCacheLen == 2
|
||||
|
||||
# Interleave cache clear and channel free
|
||||
freeChannelCache()
|
||||
check: channelCacheLen == 0
|
||||
|
||||
freeChannel(chan[0])
|
||||
freeChannel(chan[1])
|
||||
freeChannel(chan[2])
|
||||
@@ -1,67 +0,0 @@
|
||||
discard """
|
||||
matrix: "--threads:on --gc:orc; --threads:on --gc:arc"
|
||||
disabled: "freebsd"
|
||||
"""
|
||||
|
||||
import std/channels
|
||||
import std/os
|
||||
|
||||
var chan = newChannel[string]()
|
||||
|
||||
# This proc will be run in another thread using the threads module.
|
||||
proc firstWorker() =
|
||||
chan.send("Hello World!")
|
||||
|
||||
# This is another proc to run in a background thread. This proc takes a while
|
||||
# to send the message since it sleeps for 2 seconds (or 2000 milliseconds).
|
||||
proc secondWorker() =
|
||||
sleep(2000)
|
||||
chan.send("Another message")
|
||||
|
||||
|
||||
# Launch the worker.
|
||||
var worker1: Thread[void]
|
||||
createThread(worker1, firstWorker)
|
||||
|
||||
# Block until the message arrives, then print it out.
|
||||
let dest = chan.recv()
|
||||
doAssert dest == "Hello World!"
|
||||
|
||||
# Wait for the thread to exit before moving on to the next example.
|
||||
worker1.joinThread()
|
||||
|
||||
# Launch the other worker.
|
||||
var worker2: Thread[void]
|
||||
createThread(worker2, secondWorker)
|
||||
# This time, use a non-blocking approach with tryRecv.
|
||||
# Since the main thread is not blocked, it could be used to perform other
|
||||
# useful work while it waits for data to arrive on the channel.
|
||||
|
||||
var messages: seq[string]
|
||||
var msg = ""
|
||||
while true:
|
||||
let tried = chan.tryRecv(msg)
|
||||
if tried:
|
||||
messages.add move(msg)
|
||||
break
|
||||
|
||||
messages.add "Pretend I'm doing useful work..."
|
||||
# For this example, sleep in order not to flood stdout with the above
|
||||
# message.
|
||||
sleep(400)
|
||||
|
||||
# Wait for the second thread to exit before cleaning up the channel.
|
||||
worker2.joinThread()
|
||||
|
||||
# Clean up the channel.
|
||||
doAssert chan.close()
|
||||
doAssert messages[^1] == "Another message"
|
||||
doAssert messages.len >= 2
|
||||
|
||||
|
||||
block:
|
||||
let chan0 = newChannel[int]()
|
||||
let chan1 = chan0
|
||||
block:
|
||||
let chan3 = chan0
|
||||
let chan4 = chan0
|
||||
Reference in New Issue
Block a user