mirror of
https://github.com/nim-lang/Nim.git
synced 2025-12-29 01:14:41 +00:00
499 lines
14 KiB
Nim
499 lines
14 KiB
Nim
#
|
|
#
|
|
# The Nim Compiler
|
|
# (c) Copyright 2021 Andreas Prell, Mamy André-Ratsimbazafy & Nim Contributors
|
|
#
|
|
# See the file "copying.txt", included in this
|
|
# distribution, for details about the copyright.
|
|
#
|
|
|
|
|
|
# Based on https://github.com/mratsim/weave/blob/5696d94e6358711e840f8c0b7c684fcc5cbd4472/unused/channels/channels_legacy.nim
|
|
# Those are translations of @aprell (Andreas Prell) original channels from C to Nim
|
|
# (https://github.com/aprell/tasking-2.0/blob/master/src/channel_shm/channel.c)
|
|
# And in turn they are an implementation of Michael & Scott lock-based queues
|
|
# (note the paper has 2 channels: lock-free and lock-based) with additional caching:
|
|
# Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms
|
|
# Maged M. Michael, Michael L. Scott, 1996
|
|
# https://www.cs.rochester.edu/~scott/papers/1996_PODC_queues.pdf
|
|
|
|
## This module only works with `--gc:arc` or `--gc:orc`.
|
|
##
|
|
## .. warning:: This module is experimental and its interface may change.
|
|
##
|
|
## The following is a simple example of two different ways to use channels:
|
|
## blocking and non-blocking.
|
|
##
|
|
|
|
runnableExamples("--threads:on --gc:orc"):
|
|
import std/os
|
|
|
|
# In this example a channel is declared at module scope.
|
|
# Channels are generic, and they include support for passing objects between
|
|
# threads.
|
|
# Note that isolated data passed through channels is moved around.
|
|
var chan = newChannel[string]()
|
|
|
|
# This proc will be run in another thread using the threads module.
|
|
proc firstWorker() =
|
|
chan.send("Hello World!")
|
|
|
|
# This is another proc to run in a background thread. This proc takes a while
|
|
# to send the message since it sleeps for 2 seconds (or 2000 milliseconds).
|
|
proc secondWorker() =
|
|
sleep(2000)
|
|
chan.send("Another message")
|
|
|
|
# Launch the worker.
|
|
var worker1: Thread[void]
|
|
createThread(worker1, firstWorker)
|
|
|
|
# Block until the message arrives, then print it out.
|
|
let dest = chan.recv()
|
|
assert dest == "Hello World!"
|
|
|
|
# Wait for the thread to exit before moving on to the next example.
|
|
worker1.joinThread()
|
|
|
|
# Launch the other worker.
|
|
var worker2: Thread[void]
|
|
createThread(worker2, secondWorker)
|
|
# This time, use a non-blocking approach with tryRecv.
|
|
# Since the main thread is not blocked, it could be used to perform other
|
|
# useful work while it waits for data to arrive on the channel.
|
|
var messages: seq[string]
|
|
while true:
|
|
var msg = ""
|
|
if chan.tryRecv(msg):
|
|
messages.add msg # "Another message"
|
|
break
|
|
|
|
messages.add "Pretend I'm doing useful work..."
|
|
# For this example, sleep in order not to flood stdout with the above
|
|
# message.
|
|
sleep(400)
|
|
|
|
# Wait for the second thread to exit before cleaning up the channel.
|
|
worker2.joinThread()
|
|
|
|
# Clean up the channel.
|
|
assert chan.close()
|
|
|
|
assert messages[^1] == "Another message"
|
|
assert messages.len >= 2
|
|
|
|
|
|
when not defined(gcArc) and not defined(gcOrc) and not defined(nimdoc):
|
|
{.error: "This channel implementation requires --gc:arc or --gc:orc".}
|
|
|
|
import std/[locks, atomics, isolation]
|
|
import system/ansi_c
|
|
|
|
# Channel (Shared memory channels)
|
|
# ----------------------------------------------------------------------------------
|
|
|
|
const
|
|
cacheLineSize {.intdefine.} = 64 # TODO: some Samsung phone have 128 cache-line
|
|
nimChannelCacheSize* {.intdefine.} = 100
|
|
|
|
type
|
|
ChannelRaw = ptr ChannelObj
|
|
ChannelObj = object
|
|
headLock, tailLock: Lock
|
|
notFullCond, notEmptyCond: Cond
|
|
closed: Atomic[bool]
|
|
size: int
|
|
itemsize: int # up to itemsize bytes can be exchanged over this channel
|
|
head {.align: cacheLineSize.} : int # Items are taken from head and new items are inserted at tail
|
|
tail: int
|
|
buffer: ptr UncheckedArray[byte]
|
|
atomicCounter: Atomic[int]
|
|
|
|
ChannelCache = ptr ChannelCacheObj
|
|
ChannelCacheObj = object
|
|
next: ChannelCache
|
|
chanSize: int
|
|
chanN: int
|
|
numCached: int
|
|
cache: array[nimChannelCacheSize, ChannelRaw]
|
|
|
|
# ----------------------------------------------------------------------------------
|
|
|
|
proc numItems(chan: ChannelRaw): int {.inline.} =
|
|
result = chan.tail - chan.head
|
|
if result < 0:
|
|
inc(result, 2 * chan.size)
|
|
|
|
assert result <= chan.size
|
|
|
|
template isFull(chan: ChannelRaw): bool =
|
|
abs(chan.tail - chan.head) == chan.size
|
|
|
|
template isEmpty(chan: ChannelRaw): bool =
|
|
chan.head == chan.tail
|
|
|
|
# Unbuffered / synchronous channels
|
|
# ----------------------------------------------------------------------------------
|
|
|
|
template numItemsUnbuf(chan: ChannelRaw): int =
|
|
chan.head
|
|
|
|
template isFullUnbuf(chan: ChannelRaw): bool =
|
|
chan.head == 1
|
|
|
|
template isEmptyUnbuf(chan: ChannelRaw): bool =
|
|
chan.head == 0
|
|
|
|
# ChannelRaw kinds
|
|
# ----------------------------------------------------------------------------------
|
|
|
|
func isUnbuffered(chan: ChannelRaw): bool =
|
|
chan.size - 1 == 0
|
|
|
|
# ChannelRaw status and properties
|
|
# ----------------------------------------------------------------------------------
|
|
|
|
proc isClosed(chan: ChannelRaw): bool {.inline.} = load(chan.closed, moRelaxed)
|
|
|
|
proc peek(chan: ChannelRaw): int {.inline.} =
|
|
(if chan.isUnbuffered: numItemsUnbuf(chan) else: numItems(chan))
|
|
|
|
# Per-thread channel cache
|
|
# ----------------------------------------------------------------------------------
|
|
|
|
var channelCache {.threadvar.}: ChannelCache
|
|
var channelCacheLen {.threadvar.}: int
|
|
|
|
proc allocChannelCache(size, n: int): bool =
|
|
## Allocate a free list for storing channels of a given type
|
|
var p = channelCache
|
|
|
|
# Avoid multiple free lists for the exact same type of channel
|
|
while not p.isNil:
|
|
if size == p.chanSize and n == p.chanN:
|
|
return false
|
|
p = p.next
|
|
|
|
p = cast[ptr ChannelCacheObj](c_malloc(csize_t sizeof(ChannelCacheObj)))
|
|
if p.isNil:
|
|
raise newException(OutOfMemDefect, "Could not allocate memory")
|
|
|
|
p.chanSize = size
|
|
p.chanN = n
|
|
p.numCached = 0
|
|
|
|
p.next = channelCache
|
|
channelCache = p
|
|
inc channelCacheLen
|
|
result = true
|
|
|
|
proc freeChannelCache*() =
|
|
## Frees the entire channel cache, including all channels
|
|
var p = channelCache
|
|
var q: ChannelCache
|
|
|
|
while not p.isNil:
|
|
q = p.next
|
|
for i in 0 ..< p.numCached:
|
|
let chan = p.cache[i]
|
|
if not chan.buffer.isNil:
|
|
c_free(chan.buffer)
|
|
deinitLock(chan.headLock)
|
|
deinitLock(chan.tailLock)
|
|
deinitCond(chan.notFullCond)
|
|
deinitCond(chan.notEmptyCond)
|
|
c_free(chan)
|
|
c_free(p)
|
|
dec channelCacheLen
|
|
p = q
|
|
|
|
assert(channelCacheLen == 0)
|
|
channelCache = nil
|
|
|
|
# Channels memory ops
|
|
# ----------------------------------------------------------------------------------
|
|
|
|
proc allocChannel(size, n: int): ChannelRaw =
|
|
when nimChannelCacheSize > 0:
|
|
var p = channelCache
|
|
|
|
while not p.isNil:
|
|
if size == p.chanSize and n == p.chanN:
|
|
# Check if free list contains channel
|
|
if p.numCached > 0:
|
|
dec p.numCached
|
|
result = p.cache[p.numCached]
|
|
assert(result.isEmpty)
|
|
return
|
|
else:
|
|
# All the other lists in cache won't match
|
|
break
|
|
p = p.next
|
|
|
|
result = cast[ChannelRaw](c_malloc(csize_t sizeof(ChannelObj)))
|
|
if result.isNil:
|
|
raise newException(OutOfMemDefect, "Could not allocate memory")
|
|
|
|
# To buffer n items, we allocate for n
|
|
result.buffer = cast[ptr UncheckedArray[byte]](c_malloc(csize_t n*size))
|
|
if result.buffer.isNil:
|
|
raise newException(OutOfMemDefect, "Could not allocate memory")
|
|
|
|
initLock(result.headLock)
|
|
initLock(result.tailLock)
|
|
initCond(result.notFullCond)
|
|
initCond(result.notEmptyCond)
|
|
|
|
result.closed.store(false, moRelaxed) # We don't need atomic here, how to?
|
|
result.size = n
|
|
result.itemsize = size
|
|
result.head = 0
|
|
result.tail = 0
|
|
result.atomicCounter.store(0, moRelaxed)
|
|
|
|
when nimChannelCacheSize > 0:
|
|
# Allocate a cache as well if one of the proper size doesn't exist
|
|
discard allocChannelCache(size, n)
|
|
|
|
proc freeChannel(chan: ChannelRaw) =
|
|
if chan.isNil:
|
|
return
|
|
|
|
when nimChannelCacheSize > 0:
|
|
var p = channelCache
|
|
while not p.isNil:
|
|
if chan.itemsize == p.chanSize and
|
|
chan.size == p.chanN:
|
|
if p.numCached < nimChannelCacheSize:
|
|
# If space left in cache, cache it
|
|
p.cache[p.numCached] = chan
|
|
inc p.numCached
|
|
return
|
|
else:
|
|
# All the other lists in cache won't match
|
|
break
|
|
p = p.next
|
|
|
|
if not chan.buffer.isNil:
|
|
c_free(chan.buffer)
|
|
|
|
deinitLock(chan.headLock)
|
|
deinitLock(chan.tailLock)
|
|
deinitCond(chan.notFullCond)
|
|
deinitCond(chan.notEmptyCond)
|
|
|
|
c_free(chan)
|
|
|
|
# MPMC Channels (Multi-Producer Multi-Consumer)
|
|
# ----------------------------------------------------------------------------------
|
|
|
|
proc sendUnbufferedMpmc(chan: ChannelRaw, data: sink pointer, size: int, nonBlocking: bool): bool =
|
|
if nonBlocking and chan.isFullUnbuf:
|
|
return false
|
|
|
|
acquire(chan.headLock)
|
|
|
|
if nonBlocking and chan.isFullUnbuf:
|
|
# Another thread was faster
|
|
release(chan.headLock)
|
|
return false
|
|
|
|
while chan.isFullUnbuf:
|
|
wait(chan.notFullcond, chan.headLock)
|
|
|
|
assert chan.isEmptyUnbuf
|
|
assert size <= chan.itemsize
|
|
copyMem(chan.buffer, data, size)
|
|
|
|
chan.head = 1
|
|
|
|
signal(chan.notEmptyCond)
|
|
release(chan.headLock)
|
|
result = true
|
|
|
|
proc sendMpmc(chan: ChannelRaw, data: sink pointer, size: int, nonBlocking: bool): bool =
|
|
assert not chan.isNil
|
|
assert not data.isNil
|
|
|
|
if isUnbuffered(chan):
|
|
return sendUnbufferedMpmc(chan, data, size, nonBlocking)
|
|
|
|
if nonBlocking and chan.isFull:
|
|
return false
|
|
|
|
acquire(chan.tailLock)
|
|
|
|
if nonBlocking and chan.isFull:
|
|
# Another thread was faster
|
|
release(chan.tailLock)
|
|
return false
|
|
|
|
while chan.isFull:
|
|
wait(chan.notFullcond, chan.tailLock)
|
|
|
|
assert not chan.isFull
|
|
assert size <= chan.itemsize
|
|
|
|
let writeIdx = if chan.tail < chan.size: chan.tail
|
|
else: chan.tail - chan.size
|
|
|
|
copyMem(chan.buffer[writeIdx * chan.itemsize].addr, data, size)
|
|
|
|
inc chan.tail
|
|
if chan.tail == 2 * chan.size:
|
|
chan.tail = 0
|
|
|
|
signal(chan.notEmptyCond)
|
|
release(chan.tailLock)
|
|
result = true
|
|
|
|
proc recvUnbufferedMpmc(chan: ChannelRaw, data: pointer, size: int, nonBlocking: bool): bool =
|
|
if nonBlocking and chan.isEmptyUnbuf:
|
|
return false
|
|
|
|
acquire(chan.headLock)
|
|
|
|
if nonBlocking and chan.isEmptyUnbuf:
|
|
# Another thread was faster
|
|
release(chan.headLock)
|
|
return false
|
|
|
|
while chan.isEmptyUnbuf:
|
|
wait(chan.notEmptyCond, chan.headLock)
|
|
|
|
assert chan.isFullUnbuf
|
|
assert size <= chan.itemsize
|
|
|
|
copyMem(data, chan.buffer, size)
|
|
|
|
chan.head = 0
|
|
|
|
signal(chan.notFullCond)
|
|
release(chan.headLock)
|
|
result = true
|
|
|
|
proc recvMpmc(chan: ChannelRaw, data: pointer, size: int, nonBlocking: bool): bool =
|
|
assert not chan.isNil
|
|
assert not data.isNil
|
|
|
|
if isUnbuffered(chan):
|
|
return recvUnbufferedMpmc(chan, data, size, nonBlocking)
|
|
|
|
if nonBlocking and chan.isEmpty:
|
|
return false
|
|
|
|
acquire(chan.headLock)
|
|
|
|
if nonBlocking and chan.isEmpty:
|
|
# Another thread took the last data
|
|
release(chan.headLock)
|
|
return false
|
|
|
|
while chan.isEmpty:
|
|
wait(chan.notEmptyCond, chan.headLock)
|
|
|
|
assert not chan.isEmpty
|
|
assert size <= chan.itemsize
|
|
|
|
let readIdx = if chan.head < chan.size: chan.head
|
|
else: chan.head - chan.size
|
|
|
|
copyMem(data, chan.buffer[readIdx * chan.itemsize].addr, size)
|
|
|
|
inc chan.head
|
|
if chan.head == 2 * chan.size:
|
|
chan.head = 0
|
|
|
|
signal(chan.notFullCond)
|
|
release(chan.headLock)
|
|
result = true
|
|
|
|
proc channelCloseMpmc(chan: ChannelRaw): bool =
|
|
# Unsynchronized
|
|
|
|
if chan.isClosed:
|
|
# ChannelRaw already closed
|
|
return false
|
|
|
|
store(chan.closed, true, moRelaxed)
|
|
result = true
|
|
|
|
proc channelOpenMpmc(chan: ChannelRaw): bool =
|
|
# Unsynchronized
|
|
|
|
if not chan.isClosed:
|
|
# ChannelRaw already open
|
|
return false
|
|
|
|
store(chan.closed, false, moRelaxed)
|
|
result = true
|
|
|
|
# Public API
|
|
# ----------------------------------------------------------------------------------
|
|
|
|
type
|
|
Channel*[T] = object ## Typed channels
|
|
d: ChannelRaw
|
|
|
|
proc `=destroy`*[T](c: var Channel[T]) =
|
|
if c.d != nil:
|
|
if load(c.d.atomicCounter, moAcquire) == 0:
|
|
if c.d.buffer != nil:
|
|
freeChannel(c.d)
|
|
else:
|
|
atomicDec(c.d.atomicCounter)
|
|
|
|
proc `=copy`*[T](dest: var Channel[T], src: Channel[T]) =
|
|
## Shares `Channel` by reference counting.
|
|
if src.d != nil:
|
|
atomicInc(src.d.atomicCounter)
|
|
|
|
if dest.d != nil:
|
|
`=destroy`(dest)
|
|
dest.d = src.d
|
|
|
|
func trySend*[T](c: Channel[T], src: var Isolated[T]): bool {.inline.} =
|
|
## Sends item to the channel(non blocking).
|
|
var data = src.extract
|
|
result = sendMpmc(c.d, data.addr, sizeof(T), true)
|
|
if result:
|
|
wasMoved(data)
|
|
|
|
template trySend*[T](c: Channel[T], src: T): bool =
|
|
## Helper templates for `trySend`.
|
|
trySend(c, isolate(src))
|
|
|
|
func tryRecv*[T](c: Channel[T], dst: var T): bool {.inline.} =
|
|
## Receives item from the channel(non blocking).
|
|
recvMpmc(c.d, dst.addr, sizeof(T), true)
|
|
|
|
func send*[T](c: Channel[T], src: sink Isolated[T]) {.inline.} =
|
|
## Sends item to the channel(blocking).
|
|
var data = src.extract
|
|
discard sendMpmc(c.d, data.addr, sizeof(T), false)
|
|
wasMoved(data)
|
|
|
|
template send*[T](c: Channel[T]; src: T) =
|
|
## Helper templates for `send`.
|
|
send(c, isolate(src))
|
|
|
|
func recv*[T](c: Channel[T]): T {.inline.} =
|
|
## Receives item from the channel(blocking).
|
|
discard recvMpmc(c.d, result.addr, sizeof(result), false)
|
|
|
|
func open*[T](c: Channel[T]): bool {.inline.} =
|
|
result = c.d.channelOpenMpmc()
|
|
|
|
func close*[T](c: Channel[T]): bool {.inline.} =
|
|
result = c.d.channelCloseMpmc()
|
|
|
|
func peek*[T](c: Channel[T]): int {.inline.} = peek(c.d)
|
|
|
|
proc newChannel*[T](elements = 30): Channel[T] =
|
|
## Returns a new `Channel`. `elements` should be positive.
|
|
## `elements` is used to specify whether a channel is buffered or not.
|
|
## If `elements` = 1, the channel is unbuffered. If `elements` > 1, the
|
|
## channel is buffered.
|
|
assert elements >= 1, "Elements must be positive!"
|
|
result = Channel[T](d: allocChannel(sizeof(T), elements))
|