Improve documentation for threadpool (#16821)

This commit is contained in:
konsumlamm
2021-01-27 20:05:31 +01:00
committed by GitHub
parent 11a54ab2b2
commit c9801d7abc

View File

@@ -7,15 +7,16 @@
# distribution, for details about the copyright.
#
## Implements Nim's `spawn <manual_experimental.html#parallel-amp-spawn>`_.
##
## **See also:**
## * `threads module <threads.html>`_
## * `channels module <channels.html>`_
## * `locks module <locks.html>`_
## * `asyncdispatch module <asyncdispatch.html>`_
## Implements Nim's `parallel & spawn statements <manual_experimental.html#parallel-amp-spawn>`_.
##
## Unstable API.
##
## See also
## ========
## * `threads module <threads.html>`_ for basic thread support
## * `channels module <channels.html>`_ for message passing support
## * `locks module <locks.html>`_ for locks and condition variables
## * `asyncdispatch module <asyncdispatch.html>`_ for asynchronous IO
when not compileOption("threads"):
{.error: "Threadpool requires --threads:on option.".}
@@ -101,7 +102,7 @@ type
cv: Semaphore
idx: int
FlowVarBase* = ref FlowVarBaseObj ## Untyped base class for ``FlowVar[T]``.
FlowVarBase* = ref FlowVarBaseObj ## Untyped base class for `FlowVar[T] <#FlowVar>`_.
FlowVarBaseObj = object of RootObj
ready, usesSemaphore, awaited: bool
cv: Semaphore # for 'blockUntilAny' support
@@ -114,7 +115,7 @@ type
FlowVarObj[T] = object of FlowVarBaseObj
blob: T
FlowVar*{.compilerproc.}[T] = ref FlowVarObj[T] ## A data flow variable.
FlowVar*[T] {.compilerproc.} = ref FlowVarObj[T] ## A data flow variable.
ToFreeQueue = object
len: int
@@ -138,7 +139,7 @@ type
const threadpoolWaitMs {.intdefine.}: int = 100
proc blockUntil*(fv: var FlowVarBaseObj) =
## Waits until the value for the ``fv`` arrives.
## Waits until the value for `fv` arrives.
##
## Usually it is not necessary to call this explicitly.
if fv.usesSemaphore and not fv.awaited:
@@ -230,12 +231,12 @@ proc nimFlowVarSignal(fv: FlowVarBase) {.compilerproc.} =
signal(fv.cv)
proc awaitAndThen*[T](fv: FlowVar[T]; action: proc (x: T) {.closure.}) =
## Blocks until the ``fv`` is available and then passes its value
## to ``action``.
## Blocks until `fv` is available and then passes its value
## to `action`.
##
## Note that due to Nim's parameter passing semantics this
## means that ``T`` doesn't need to be copied so ``awaitAndThen`` can
## sometimes be more efficient than `^ proc <#^,FlowVar[T]>`_.
## Note that due to Nim's parameter passing semantics, this
## means that `T` doesn't need to be copied, so `awaitAndThen` can
## sometimes be more efficient than the `^ proc <#^,FlowVar[T]>`_.
blockUntil(fv[])
when defined(nimV2):
action(fv.blob)
@@ -266,15 +267,15 @@ proc `^`*[T](fv: FlowVar[T]): T =
finished(fv[])
proc blockUntilAny*(flowVars: openArray[FlowVarBase]): int =
## Awaits any of the given ``flowVars``. Returns the index of one ``flowVar``
## Awaits any of the given `flowVars`. Returns the index of one `flowVar`
## for which a value arrived.
##
## A ``flowVar`` only supports one call to ``blockUntilAny`` at the same time.
## That means if you ``blockUntilAny([a,b])`` and ``blockUntilAny([b,c])``
## the second call will only block until ``c``. If there is no ``flowVar`` left
## A `flowVar` only supports one call to `blockUntilAny` at the same time.
## That means if you `blockUntilAny([a,b])` and `blockUntilAny([b,c])`
## the second call will only block until `c`. If there is no `flowVar` left
## to be able to wait on, -1 is returned.
##
## **Note**: This results in non-deterministic behaviour and should be avoided.
## **Note:** This results in non-deterministic behaviour and should be avoided.
var ai: AwaitInfo
ai.cv.initSemaphore()
var conflicts = 0
@@ -295,9 +296,9 @@ proc blockUntilAny*(flowVars: openArray[FlowVarBase]): int =
destroySemaphore(ai.cv)
proc isReady*(fv: FlowVarBase): bool =
## Determines whether the specified ``FlowVarBase``'s value is available.
## Determines whether the specified `FlowVarBase`'s value is available.
##
## If ``true``, awaiting ``fv`` will not block.
## If `true`, awaiting `fv` will not block.
if fv.usesSemaphore and not fv.awaited:
acquire(fv.cv.L)
result = fv.cv.counter > 0
@@ -315,7 +316,7 @@ const
MaxDistinguishedThread* {.intdefine.} = 32 ## Maximum number of "distinguished" threads.
type
ThreadId* = range[0..MaxDistinguishedThread-1]
ThreadId* = range[0..MaxDistinguishedThread-1] ## A thread identifier.
var
currentPoolSize: int
@@ -402,7 +403,7 @@ proc setMinPoolSize*(size: range[1..MaxThreadPoolSize]) =
proc setMaxPoolSize*(size: range[1..MaxThreadPoolSize]) =
## Sets the maximum thread pool size. The default value of this
## is ``MaxThreadPoolSize`` (256).
## is `MaxThreadPoolSize <#MaxThreadPoolSize>`_.
maxPoolSize = size
if currentPoolSize > maxPoolSize:
for i in maxPoolSize..currentPoolSize-1:
@@ -442,43 +443,43 @@ proc setup() =
for i in 0..<currentPoolSize: activateWorkerThread(i)
proc preferSpawn*(): bool =
## Use this proc to determine quickly if a ``spawn`` or a direct call is
## Use this proc to determine quickly if a `spawn` or a direct call is
## preferable.
##
## If it returns ``true``, a ``spawn`` may make sense. In general
## it is not necessary to call this directly; use `spawnX template
## If it returns `true`, a `spawn` may make sense. In general
## it is not necessary to call this directly; use the `spawnX template
## <#spawnX.t>`_ instead.
result = gSomeReady.counter > 0
proc spawn*(call: sink typed): void {.magic: "Spawn".}
## Always spawns a new task, so that the ``call`` is never executed on
## Always spawns a new task, so that the `call` is never executed on
## the calling thread.
##
## ``call`` has to be proc call ``p(...)`` where ``p`` is gcsafe and has a
## return type that is either ``void`` or compatible with ``FlowVar[T]``.
## `call` has to be a proc call `p(...)` where `p` is gcsafe and has a
## return type that is either `void` or compatible with `FlowVar[T]`.
proc pinnedSpawn*(id: ThreadId; call: sink typed): void {.magic: "Spawn".}
## Always spawns a new task on the worker thread with ``id``, so that
## the ``call`` is **always** executed on the thread.
## Always spawns a new task on the worker thread with `id`, so that
## the `call` is **always** executed on the thread.
##
## ``call`` has to be proc call ``p(...)`` where ``p`` is gcsafe and has a
## return type that is either ``void`` or compatible with ``FlowVar[T]``.
## `call` has to be a proc call `p(...)` where `p` is gcsafe and has a
## return type that is either `void` or compatible with `FlowVar[T]`.
template spawnX*(call) =
## Spawns a new task if a CPU core is ready, otherwise executes the
## call in the calling thread.
##
## Usually it is advised to use `spawn proc <#spawn,sinktyped>`_ in order to
## not block the producer for an unknown amount of time.
## Usually, it is advised to use the `spawn proc <#spawn,sinktyped>`_
## in order to not block the producer for an unknown amount of time.
##
## ``call`` has to be proc call ``p(...)`` where ``p`` is gcsafe and has a
## return type that is either 'void' or compatible with ``FlowVar[T]``.
## `call` has to be a proc call `p(...)` where `p` is gcsafe and has a
## return type that is either 'void' or compatible with `FlowVar[T]`.
(if preferSpawn(): spawn call else: call)
proc parallel*(body: untyped) {.magic: "Parallel".}
## A parallel section can be used to execute a block in parallel.
##
## ``body`` has to be in a DSL that is a particular subset of the language.
## `body` has to be in a DSL that is a particular subset of the language.
##
## Please refer to `the manual <manual_experimental.html#parallel-amp-spawn>`_
## for further information.
@@ -585,7 +586,7 @@ proc nimSpawn4(fn: WorkerProc; data: pointer; id: ThreadId) {.compilerproc.} =
proc sync*() =
## A simple barrier to wait for all ``spawn``'ed tasks.
## A simple barrier to wait for all `spawn`ed tasks.
##
## If you need more elaborate waiting, you have to use an explicit barrier.
while true: