Merge branch 'more_concurrency' into devel

Conflicts:
	doc/tut1.txt
	lib/core/locks.nim
	lib/pure/collections/tables.nim
	lib/pure/selectors.nim
This commit is contained in:
Araq
2015-06-30 12:50:24 +02:00
13 changed files with 584 additions and 265 deletions

View File

@@ -187,7 +187,7 @@ proc genDeref*(n: PNode): PNode =
result.add n
proc callCodegenProc*(name: string, arg1: PNode;
arg2, arg3: PNode = nil): PNode =
arg2, arg3, optionalArgs: PNode = nil): PNode =
result = newNodeI(nkCall, arg1.info)
let sym = magicsys.getCompilerProc(name)
if sym == nil:
@@ -197,6 +197,9 @@ proc callCodegenProc*(name: string, arg1: PNode;
result.add arg1
if arg2 != nil: result.add arg2
if arg3 != nil: result.add arg3
if optionalArgs != nil:
for i in 1..optionalArgs.len-3:
result.add optionalArgs[i]
result.typ = sym.typ.sons[0]
proc callProc(a: PNode): PNode =
@@ -503,7 +506,7 @@ proc wrapProcForSpawn*(owner: PSym; spawnExpr: PNode; retType: PType;
barrier, dest: PNode = nil): PNode =
# if 'barrier' != nil, then it is in a 'parallel' section and we
# generate quite different code
let n = spawnExpr[1]
let n = spawnExpr[^2]
let spawnKind = spawnResult(retType, barrier!=nil)
case spawnKind
of srVoid:
@@ -589,7 +592,7 @@ proc wrapProcForSpawn*(owner: PSym; spawnExpr: PNode; retType: PType;
fvField = newDotExpr(scratchObj, field)
fvAsExpr = indirectAccess(castExpr, field, n.info)
# create flowVar:
result.add newFastAsgnStmt(fvField, callProc(spawnExpr[2]))
result.add newFastAsgnStmt(fvField, callProc(spawnExpr[^1]))
if barrier == nil:
result.add callCodegenProc("nimFlowVarCreateSemaphore", fvField)
@@ -604,7 +607,7 @@ proc wrapProcForSpawn*(owner: PSym; spawnExpr: PNode; retType: PType;
let wrapper = createWrapperProc(fn, threadParam, argsParam,
varSection, varInit, call,
barrierAsExpr, fvAsExpr, spawnKind)
result.add callCodegenProc("nimSpawn", wrapper.newSymNode,
genAddrOf(scratchObj.newSymNode))
result.add callCodegenProc("nimSpawn" & $spawnExpr.len, wrapper.newSymNode,
genAddrOf(scratchObj.newSymNode), nil, spawnExpr)
if spawnKind == srFlowVar: result.add fvField

View File

@@ -1727,13 +1727,17 @@ proc semMagic(c: PContext, n: PNode, s: PSym, flags: TExprFlags): PNode =
dec c.inParallelStmt
of mSpawn:
result = setMs(n, s)
result.sons[1] = semExpr(c, n.sons[1])
if not result[1].typ.isEmptyType:
if spawnResult(result[1].typ, c.inParallelStmt > 0) == srFlowVar:
result.typ = createFlowVar(c, result[1].typ, n.info)
for i in 1 .. <n.len:
result.sons[i] = semExpr(c, n.sons[i])
let typ = result[^1].typ
if not typ.isEmptyType:
if spawnResult(typ, c.inParallelStmt > 0) == srFlowVar:
result.typ = createFlowVar(c, typ, n.info)
else:
result.typ = result[1].typ
result.add instantiateCreateFlowVarCall(c, result[1].typ, n.info).newSymNode
result.typ = typ
result.add instantiateCreateFlowVarCall(c, typ, n.info).newSymNode
else:
result.add emptyNode
of mProcCall:
result = setMs(n, s)
result.sons[1] = semExpr(c, n.sons[1])

View File

@@ -241,9 +241,10 @@ proc useVar(a: PEffects, n: PNode) =
message(n.info, warnUninit, s.name.s)
# prevent superfluous warnings about the same variable:
a.init.add s.id
if {sfGlobal, sfThread} * s.flags == {sfGlobal} and s.kind in {skVar, skLet}:
if {sfGlobal, sfThread} * s.flags != {} and s.kind in {skVar, skLet}:
if s.guard != nil: guardGlobal(a, n, s.guard)
if (tfHasGCedMem in s.typ.flags or s.typ.isGCedMem):
if {sfGlobal, sfThread} * s.flags == {sfGlobal} and
(tfHasGCedMem in s.typ.flags or s.typ.isGCedMem):
#if warnGcUnsafe in gNotes: warnAboutGcUnsafe(n)
markGcUnsafe(a, s)

View File

@@ -592,7 +592,7 @@ the ``of`` operator can be used to determine the object's type.
.. code-block:: nim
type
Person {.inheritable.} = object
Person = object of RootObj
name*: string # the * means that `name` is accessible from other modules
age: int # no * means that the field is hidden

View File

@@ -1334,11 +1334,11 @@ define operators which accept Slice objects to define ranges.
b = "Slices are useless."
echo a[7..12] # --> 'a prog'
b[11.. ^2] = "useful"
b[11..^2] = "useful"
echo b # --> 'Slices are useful.'
In the previous example slices are used to modify a part of a string, and even
a negative index is used. The slice's bounds can hold any value supported by
In the previous example slices are used to modify a part of a string. The
slice's bounds can hold any value supported by
their type, but it is the proc using the slice object which defines what values
are accepted.

View File

@@ -13,8 +13,7 @@ include "system/syslocks"
type
Lock* = SysLock ## Nim lock; whether this is re-entrant
## or not is unspecified!
## or not is unspecified!
Cond* = SysCond ## Nim condition variable
{.deprecated: [TLock: Lock, TCond: Cond].}

View File

@@ -122,7 +122,6 @@ export Port, SocketFlag
## Limitations/Bugs
## ----------------
##
## * ``except`` statement (without `try`) does not work inside async procedures.
## * The effect system (``raises: []``) does not work with async procedures.
## * Can't await in a ``except`` body
@@ -933,7 +932,7 @@ else:
result = newRawSocket(domain, sockType, protocol).AsyncFD
result.SocketHandle.setBlocking(false)
when defined(macosx):
result.SocketHandle.setSockOptInt(SOL_SOCKET, SO_NOSIGPIPE, 1)
result.SocketHandle.setSockOptInt(SOL_SOCKET, SO_NOSIGPIPE, 1)
register(result)
proc newAsyncRawSocket*(domain: Domain = AF_INET,
@@ -942,7 +941,7 @@ else:
result = newRawSocket(domain, sockType, protocol).AsyncFD
result.SocketHandle.setBlocking(false)
when defined(macosx):
result.SocketHandle.setSockOptInt(SOL_SOCKET, SO_NOSIGPIPE, 1)
result.SocketHandle.setSockOptInt(SOL_SOCKET, SO_NOSIGPIPE, 1)
register(result)
proc closeSocket*(sock: AsyncFD) =

View File

@@ -0,0 +1,154 @@
#
#
# Nim's Runtime Library
# (c) Copyright 2015 Andreas Rumpf
#
# See the file "copying.txt", included in this
# distribution, for details about the copyright.
#
## Shared string support for Nim.
# Nominal upper index used only to declare the unchecked array type below;
# a smaller bound is chosen when compiling for 16-bit CPUs.
const ArrayDummySize = when defined(cpu16): 10_000 else: 100_000_000
type
  UncheckedCharArray {.unchecked.} = array[0..ArrayDummySize, char]
type
  # Manually allocated storage block (see `newBuffer`): three int header
  # fields followed by the character data.
  Buffer = ptr object
    # reference counter, modified with atomicInc/atomicDec in this module
    refcount: int
    # `capacity`: number of data bytes allocated; `realLen`: bytes in use
    capacity, realLen: int
    data: UncheckedCharArray
  SharedString* = object ## A string that can be shared. Slicing is O(1).
    # nil when the string has no storage (empty string)
    buffer: Buffer
    # `first`: offset of the string's first char inside `buffer.data`;
    # `len`: number of chars that belong to this string
    first, len: int
proc decRef(b: Buffer) {.inline.} =
  ## Atomically decrements the reference counter of `b` and releases the
  ## shared allocation once the decremented value is zero or negative.
  let remaining = atomicDec(b.refcount)
  if remaining <= 0:
    deallocShared(b)
proc incRef(b: Buffer) {.inline.} =
  ## Atomically increments the reference counter of `b`.
  atomicInc(b.refcount)
{.experimental.}
proc `=destroy`*(s: SharedString) =
  ## Destructor hook: drops this string's reference on its buffer, if it
  ## has one. A string without storage needs no cleanup.
  if s.buffer.isNil: return
  decRef(s.buffer)
# Disabled (`when false`) assignment hook, kept for reference: it would make
# `dest` share `src`'s buffer instead of copying the characters.
when false:
  proc `=`*(dest: var SharedString; src: SharedString) =
    # the source is retained before the old destination buffer is released,
    # so the order also works when both refer to the same buffer
    incRef(src.buffer)
    if not dest.buffer.isNil:
      decRef(dest.buffer)
    dest.buffer = src.buffer
    dest.first = src.first
    dest.len = src.len
# Returns the number of characters in `s` (the object's `len` field).
proc len*(s: SharedString): int = s.len
proc `[]`*(s: SharedString; i: Natural): char =
  ## Returns the character at position `i` of `s`.
  ## Raises ``IndexError`` if `i` is not below ``s.len``.
  if i >= s.len:
    raise newException(IndexError, "index out of bounds")
  # positions are relative to the string's own start inside the buffer
  result = s.buffer.data[s.first + i]
proc `[]=`*(s: var SharedString; i: Natural; value: char) =
  ## Overwrites the character at position `i` of `s` with `value`.
  ## Raises ``IndexError`` if `i` is not below ``s.len``.
  ## The write goes directly into the underlying buffer.
  if i >= s.len:
    raise newException(IndexError, "index out of bounds")
  s.buffer.data[s.first + i] = value
proc `[]`*(s: SharedString; ab: Slice[int]): SharedString =
  ## Returns the substring ``s[ab.a .. ab.b]`` as a view onto the same buffer;
  ## no characters are copied. The result is clamped to the end of `s`. A
  ## start index outside ``0 ..< s.len`` or an empty range yields an empty
  ## string without storage.
  # NOTE(review): the view does not take its own reference on the buffer (the
  # `incRef` below is commented out in the original) -- confirm the intended
  # ownership rules before relying on a slice outliving its source.
  #incRef(src.buffer)
  if ab.a >= 0 and ab.a < s.len:
    # never hand out more characters than remain after `ab.a`
    let count = min(s.len - ab.a, ab.b - ab.a + 1)
    if count > 0:
      result.buffer = s.buffer
      # `ab.a` is relative to `s`, which may itself start inside the buffer
      result.first = s.first + ab.a
      result.len = count
  # else: produce empty string ;-)
proc newBuffer(cap, len: int): Buffer =
  ## Allocates a zeroed shared-heap buffer with room for `cap` characters, of
  ## which `len` are recorded as being in use. Requires ``cap >= len``.
  assert cap >= len
  # three int header fields (refcount, capacity, realLen) plus the data bytes
  result = cast[Buffer](allocShared0(sizeof(int)*3 + cap))
  result.refcount = 0
  result.capacity = cap
  result.realLen = len
proc newSharedString*(len: Natural): SharedString =
  ## Creates a shared string of `len` characters. The storage comes from
  ## `newBuffer`, which allocates zeroed memory.
  result.first = 0
  result.len = len
  if len == 0: return
  # optimization: Don't have an underlying buffer when 'len == 0'
  result.buffer = newBuffer(len, len)
proc newSharedString*(s: string): SharedString =
  ## Creates a shared string holding a copy of the characters of `s`.
  let n = s.len
  result.first = 0
  result.len = n
  if n != 0:
    # optimization: Don't have an underlying buffer when 'len == 0'
    let buf = newBuffer(n, n)
    copyMem(addr buf.data[0], cstring(s), n)
    result.buffer = buf
# `load(x)` reads `x`: with a sequentially consistent atomic load when
# `atomicLoadN` is declared, otherwise with a plain (non-atomic) read.
when declared(atomicLoadN):
  template load(x): expr = atomicLoadN(addr x, ATOMIC_SEQ_CST)
else:
  # XXX Fixme
  template load(x): expr = x
proc add*(s: var SharedString; t: cstring; len: Natural) =
  ## Appends the first `len` bytes pointed to by `t` to `s`.
  ##
  ## The bytes are written in place only when `s` starts at offset 0 of its
  ## buffer, covers all bytes in use, the buffer has spare capacity and its
  ## reference counter is not above 1. Otherwise a new buffer is allocated
  ## and the old one is released via `decRef`.
  if len == 0: return
  let newLen = s.len + len
  if s.buffer.isNil:
    s.buffer = newBuffer(len, len)
    copyMem(addr s.buffer.data[0], t, len)
    s.first = 0
    s.len = len
  elif newLen >= s.buffer.capacity or s.first != 0 or
      s.len != s.buffer.realLen or load(s.buffer.refcount) > 1:
    let oldBuf = s.buffer
    let fresh = newBuffer(max(oldBuf.capacity * 3 div 2, newLen), newLen)
    copyMem(addr fresh.data[0], addr oldBuf.data[s.first], s.len)
    copyMem(addr fresh.data[s.len], t, len)
    s.buffer = fresh
    # the characters now start at the beginning of the new buffer, so the
    # view has to be re-based and its length updated
    s.first = 0
    s.len = newLen
    # release the old buffer only after copying: `t` may point into it
    decRef(oldBuf)
  else:
    copyMem(addr s.buffer.data[s.len], t, len)
    s.buffer.realLen += len
    s.len = newLen
proc add*(s: var SharedString; t: string) =
  ## Appends all characters of the Nim string `t` to `s`.
  s.add(t.cstring, t.len)
proc rawData*(s: var SharedString): pointer =
  ## Returns the address of the first character of `s` inside its buffer,
  ## or nil when `s` has no buffer.
  if not s.buffer.isNil:
    result = addr s.buffer.data[s.first]
proc add*(s: var SharedString; t: SharedString) =
  ## Appends the characters of the shared string `t` to `s`.
  ## Does nothing when `t` has no storage.
  if t.buffer.isNil: return
  # the source bytes live in `t`'s buffer, starting at `t`'s own offset
  s.add(cast[cstring](addr t.buffer.data[t.first]), t.len)
proc `$`*(s: SharedString): string =
  ## Converts `s` into an ordinary Nim string by copying its characters.
  let n = s.len
  result = newString(n)
  if n <= 0: return
  copyMem(addr result[0], addr s.buffer.data[s.first], n)
proc `==`*(s: SharedString; t: string): bool =
  ## Compares a shared string with a Nim string by length and content.
  ## A shared string without storage equals only the empty string.
  if s.buffer.isNil:
    return t.len == 0
  result = t.len == s.len and
           equalMem(addr s.buffer.data[s.first], cstring(t), t.len)
proc `==`*(s, t: SharedString): bool =
  ## Compares two shared strings by length and content. A string without
  ## storage equals only another empty string.
  if s.buffer.isNil:
    result = t.len == 0
  elif t.buffer.isNil:
    # mirror of the case above; avoids touching `t.buffer` when it is nil
    result = s.len == 0
  else:
    result = t.len == s.len and equalMem(addr s.buffer.data[s.first],
                                         addr t.buffer.data[t.first], t.len)
iterator items*(s: SharedString): char =
  ## Yields the characters of `s` in order. Yields nothing when `s` has no
  ## storage.
  # keep the buffer *pointer*: the nil check must come before any dereference
  # and the character array must not be copied by value
  let buf = s.buffer
  let x = s.first
  if buf != nil:
    for i in 0..<s.len:
      yield buf.data[i+x]
import hashes
proc hash*(s: SharedString): THash =
  ## Computes a hash of `s` by mixing the hash of every character.
  var acc: THash = 0
  for ch in s:
    acc = acc !& hash(ch)
  result = !$acc

View File

@@ -0,0 +1,105 @@
#
#
# Nim's Runtime Library
# (c) Copyright 2015 Andreas Rumpf
#
# See the file "copying.txt", included in this
# distribution, for details about the copyright.
#
## Shared table support for Nim. Use plain old non GC'ed keys and values or
## you'll be in trouble. Uses a single lock to protect the table, lockfree
## implementations welcome but if lock contention is so high that you need a
## lockfree hash table, you're doing it wrong.
import
hashes, math, locks
type
  # One slot of the table; the included `tableimpl` treats ``hcode == 0`` as
  # an empty slot.
  KeyValuePair[A, B] = tuple[hcode: THash, key: A, val: B]
  # Pointer to the manually allocated slot array; the number of slots that
  # are actually allocated is kept in `dataLen`.
  KeyValuePairSeq[A, B] = ptr array[10_000_000, KeyValuePair[A, B]]
  SharedTable* [A, B] = object ## generic hash SharedTable
    data: KeyValuePairSeq[A, B]
    # `counter`: number of stored entries; `dataLen`: number of slots
    counter, dataLen: int
    # acquired by `withLock` around every table operation in this module
    lock: TLock
# Highest valid slot index of `t`; the included implementation uses it as a
# bit mask, which is why `dataLen` has to be a power of two.
template maxHash(t): expr = t.dataLen-1
include tableimpl
proc enlarge[A, B](t: var SharedTable[A, B]) =
  # Grows the slot array by `growthFactor` and re-inserts every filled slot
  # of the old array into the new one.
  let oldSize = t.dataLen
  let size = oldSize * growthFactor
  var n = cast[KeyValuePairSeq[A, B]](allocShared0(
                                      sizeof(KeyValuePair[A, B]) * size))
  t.dataLen = size
  # after the swap `t.data` is the new zeroed array and `n` is the old one
  swap(t.data, n)
  for i in 0..<oldSize:
    if isFilled(n[i].hcode):
      # rawGetKnownHC reports a missing key as `-1 - insertIndex`; undo that
      var j = -1 - rawGetKnownHC(t, n[i].key, n[i].hcode)
      rawInsert(t, t.data, n[i].key, n[i].val, n[i].hcode, j)
  deallocShared(n)
template withLock(t, x: untyped) =
  ## Runs `x` while holding ``t.lock``. The lock is released even when `x`
  ## raises, so a failing ``hash`` or ``==`` cannot leave the table locked.
  acquire(t.lock)
  try:
    x
  finally:
    release(t.lock)

proc mget*[A, B](t: var SharedTable[A, B], key: A): var B =
  ## retrieves the value at ``t[key]``. The value can be modified.
  ## If `key` is not in `t`, the ``KeyError`` exception is raised.
  # declared outside the locked section so it is still visible afterwards
  var found = false
  withLock t:
    var hc: THash
    var index = rawGet(t, key, hc)
    if index >= 0:
      found = true
      result = t.data[index].val
  # the exception is raised only after the lock has been released
  if not found:
    when compiles($key):
      raise newException(KeyError, "key not found: " & $key)
    else:
      raise newException(KeyError, "key not found")
proc mgetOrPut*[A, B](t: var SharedTable[A, B], key: A, val: B): var B =
  ## retrieves value at ``t[key]`` or puts ``val`` if not present, either way
  ## returning a value which can be modified. **Note**: This is inherently
  ## unsafe in the context of multi-threading since it returns a pointer
  ## to ``B``.
  withLock t:
    # shared implementation from `tableimpl`; grows the table via `enlarge`
    mgetOrPutImpl(enlarge)
proc hasKeyOrPut*[A, B](t: var SharedTable[A, B], key: A, val: B): bool =
  ## returns true iff `key` is in the table, otherwise inserts `val`.
  withLock t:
    # shared implementation from `tableimpl`; grows the table via `enlarge`
    hasKeyOrPutImpl(enlarge)
proc `[]=`*[A, B](t: var SharedTable[A, B], key: A, val: B) =
  ## puts a (key, value)-pair into `t`. The value of an existing `key` is
  ## overwritten.
  withLock t:
    # shared implementation from `tableimpl`; grows the table via `enlarge`
    putImpl(enlarge)
proc add*[A, B](t: var SharedTable[A, B], key: A, val: B) =
  ## puts a new (key, value)-pair into `t` even if ``t[key]`` already exists.
  withLock t:
    # shared implementation from `tableimpl`; grows the table via `enlarge`
    addImpl(enlarge)
proc del*[A, B](t: var SharedTable[A, B], key: A) =
  ## deletes `key` from hash table `t`. Does nothing if `key` is not present.
  withLock t:
    # shared implementation from `tableimpl`
    delImpl()
proc initSharedTable*[A, B](initialSize=64): SharedTable[A, B] =
  ## creates a new hash table that is empty.
  ##
  ## `initialSize` needs to be a power of two. If you need to accept runtime
  ## values for this you could use the ``nextPowerOfTwo`` proc from the
  ## `math <math.html>`_ module or the ``rightSize`` proc from the
  ## `tables` module.
  ##
  ## The slot array is allocated with ``allocShared0`` and the lock is
  ## initialized; release both with ``deinitSharedTable``.
  assert isPowerOfTwo(initialSize)
  result.counter = 0
  result.dataLen = initialSize
  # zeroed memory: every slot starts with hcode == 0, i.e. empty
  result.data = cast[KeyValuePairSeq[A, B]](allocShared0(
                                      sizeof(KeyValuePair[A, B]) * initialSize))
  initLock result.lock
proc deinitSharedTable*[A, B](t: var SharedTable[A, B]) =
  ## Frees the slot array of `t` and deinitializes its lock. The fields of
  ## `t` are not reset, so `t` must not be used afterwards.
  deallocShared(t.data)
  deinitLock t.lock

View File

@@ -0,0 +1,132 @@
#
#
# Nim's Runtime Library
# (c) Copyright 2015 Andreas Rumpf
#
# See the file "copying.txt", included in this
# distribution, for details about the copyright.
#
## An ``include`` file for the different table implementations.
# hcode for real keys cannot be zero. hcode==0 signifies an empty slot. These
# two procs retain clarity of that encoding without the space cost of an enum.
proc isEmpty(hcode: Hash): bool {.inline.} =
  ## True when `hcode` marks an unused slot (the value 0).
  hcode == 0
proc isFilled(hcode: Hash): bool {.inline.} =
  ## True when `hcode` belongs to an occupied slot (any non-zero value).
  hcode != 0
const
  # factor by which the `enlarge` procs multiply the number of slots
  growthFactor = 2
proc mustRehash(length, counter: int): bool {.inline.} =
  ## Decides whether a table with `length` slots holding `counter` entries
  ## has to grow: true when it is more than two thirds full or fewer than
  ## four slots remain free.
  assert(length > counter)
  let freeSlots = length - counter
  result = (counter * 3 > length * 2) or (freeSlots < 4)
proc nextTry(h, maxHash: Hash): Hash {.inline.} =
  ## Next slot index of the linear probe sequence: ``h + 1`` wrapped around
  ## by masking with `maxHash`.
  let candidate = h + 1
  result = candidate and maxHash
# Lookup body for a key whose hash code `hc` is already known. Being a dirty
# template it uses `t`, `key`, `hc` and `result` of the enclosing proc.
# Returns the slot index when found, otherwise `-1 - insertIndex`.
template rawGetKnownHCImpl() {.dirty.} =
  var h: Hash = hc and maxHash(t)   # start with real hash value
  while isFilled(t.data[h].hcode):
    # Compare hc THEN key with boolean short circuit. This makes the common case
    # zero ==key's for missing (e.g.inserts) and exactly one ==key for present.
    # It does slow down succeeding lookups by one extra Hash cmp&and..usually
    # just a few clock cycles, generally worth it for any non-integer-like A.
    if t.data[h].hcode == hc and t.data[h].key == key:
      return h
    h = nextTry(h, maxHash(t))
  result = -1 - h                   # < 0 => MISSING; insert idx = -1 - result
# Lookup body that first computes the hash code of `key` into `hc` (never 0,
# because 0 marks an empty slot) and then performs the known-hash lookup.
template rawGetImpl() {.dirty.} =
  hc = hash(key)
  if hc == 0:       # This almost never taken branch should be very predictable.
    hc = 314159265  # Value doesn't matter; Any non-zero favorite is fine.
  rawGetKnownHCImpl()
# Unlike rawGetImpl this never compares keys: it computes `hc` and probes
# forward to the first empty slot, whose index becomes `result`.
template rawGetDeepImpl() {.dirty.} =   # Search algo for unconditional add
  hc = hash(key)
  if hc == 0:
    # 0 is reserved for empty slots, so substitute a non-zero value
    hc = 314159265
  var h: Hash = hc and maxHash(t)
  while isFilled(t.data[h].hcode):
    h = nextTry(h, maxHash(t))
  result = h
# Stores `key`, `val` and `hc` into slot `h` of `data`; all four names come
# from the enclosing proc. Writing a non-zero `hc` marks the slot as filled.
template rawInsertImpl() {.dirty.} =
  data[h].key = key
  data[h].val = val
  data[h].hcode = hc
proc rawGetKnownHC[X, A](t: X, key: A, hc: Hash): int {.inline.} =
  # Looks up `key` with the precomputed hash code `hc`; returns the slot
  # index, or `-1 - insertIndex` when the key is missing.
  rawGetKnownHCImpl()
proc rawGetDeep[X, A](t: X, key: A, hc: var Hash): int {.inline.} =
  # Stores the hash code of `key` in `hc` and returns the index of the first
  # empty slot of its probe sequence (existing equal keys are not detected).
  rawGetDeepImpl()
proc rawGet[X, A](t: X, key: A, hc: var Hash): int {.inline.} =
  # Stores the hash code of `key` in `hc` and returns the slot index of
  # `key`, or `-1 - insertIndex` when the key is missing.
  rawGetImpl()
proc rawInsert[X, A, B](t: var X, data: var KeyValuePairSeq[A, B],
                        key: A, val: B, hc: Hash, h: Hash) =
  # Writes the entry (`hc`, `key`, `val`) into slot `h` of `data`; `t` itself
  # is not touched by the implementation template.
  rawInsertImpl()
# Body of `add`: inserts (`key`, `val`) unconditionally, so duplicate keys are
# possible. `enlarge` is the table-specific growth routine.
template addImpl(enlarge) {.dirty, immediate.} =
  # grow first so the probe below runs against the final slot array
  if mustRehash(t.dataLen, t.counter): enlarge(t)
  var hc: Hash
  var j = rawGetDeep(t, key, hc)
  rawInsert(t, t.data, key, val, hc, j)
  inc(t.counter)
# Inserts a key that a preceding lookup reported as missing. Expects `hc` and
# a negative `index` (the `-1 - insertIndex` encoding) in the caller's scope
# and leaves `index` as the real slot index of the new entry.
template maybeRehashPutImpl(enlarge) {.dirty, immediate.} =
  if mustRehash(t.dataLen, t.counter):
    enlarge(t)
    # the slot array changed, so the insertion point must be looked up again
    index = rawGetKnownHC(t, key, hc)
  index = -1 - index                  # important to transform for mgetOrPutImpl
  rawInsert(t, t.data, key, val, hc, index)
  inc(t.counter)
# Body of `[]=`: overwrites the value of an existing `key`, otherwise inserts
# a new entry (growing the table with `enlarge` when necessary).
template putImpl(enlarge) {.dirty, immediate.} =
  var hc: Hash
  var index = rawGet(t, key, hc)
  if index >= 0: t.data[index].val = val
  else: maybeRehashPutImpl(enlarge)
# Body of `mgetOrPut`: inserts (`key`, `val`) when `key` is missing and sets
# `result` to the value slot of `key` in either case.
template mgetOrPutImpl(enlarge) {.dirty, immediate.} =
  var hc: Hash
  var index = rawGet(t, key, hc)
  if index < 0:
    # not present: insert (flipping index)
    maybeRehashPutImpl(enlarge)
  # either way return modifiable val
  result = t.data[index].val
# Body of `hasKeyOrPut`: sets `result` to true when `key` is already present;
# otherwise inserts (`key`, `val`) and sets `result` to false.
template hasKeyOrPutImpl(enlarge) {.dirty, immediate.} =
  var hc: Hash
  var index = rawGet(t, key, hc)
  if index < 0:
    result = false
    maybeRehashPutImpl(enlarge)
  else: result = true
# Body of `del`: removes `key` from `t` (no-op when it is absent) and then
# closes the gap by moving later entries of the same collision cluster back,
# so that linear probing still finds them.
template delImpl() {.dirty, immediate.} =
  var hc: Hash
  var i = rawGet(t, key, hc)
  let msk = maxHash(t)
  if i >= 0:
    t.data[i].hcode = 0
    dec(t.counter)
    block outer:
      while true:         # KnuthV3 Algo6.4R adapted for i=i+1 instead of i=i-1
        var j = i         # The correctness of this depends on (h+1) in nextTry,
        var r = j         # though may be adaptable to other simple sequences.
        t.data[i].hcode = 0              # mark current EMPTY
        while true:
          i = (i + 1) and msk            # increment mod table size
          if isEmpty(t.data[i].hcode):   # end of collision cluster; So all done
            break outer
          r = t.data[i].hcode and msk    # "home" location of key@i
          # leave the scan once the entry at `i` is one that has to be moved
          # into the hole at `j`; otherwise keep scanning forward
          if not ((i >= r and r > j) or (r > j and j > i) or (j > i and i >= r)):
            break
        shallowCopy(t.data[j], t.data[i]) # data[j] will be marked EMPTY next loop

View File

@@ -68,65 +68,20 @@
import
hashes, math
{.pragma: myShallow.}
type
KeyValuePair[A, B] = tuple[hcode: Hash, key: A, val: B]
KeyValuePairSeq[A, B] = seq[KeyValuePair[A, B]]
Table* {.myShallow.}[A, B] = object ## generic hash table
Table*[A, B] = object ## generic hash table
data: KeyValuePairSeq[A, B]
counter: int
TableRef*[A,B] = ref Table[A, B]
{.deprecated: [TTable: Table, PTable: TableRef].}
when not defined(nimhygiene):
{.pragma: dirty.}
template maxHash(t): expr {.immediate.} = high(t.data)
template dataLen(t): expr = len(t.data)
# hcode for real keys cannot be zero. hcode==0 signifies an empty slot. These
# two procs retain clarity of that encoding without the space cost of an enum.
proc isEmpty(hcode: Hash): bool {.inline.} =
result = hcode == 0
proc isFilled(hcode: Hash): bool {.inline.} =
result = hcode != 0
proc len*[A, B](t: Table[A, B]): int =
## returns the number of keys in `t`.
result = t.counter
iterator pairs*[A, B](t: Table[A, B]): (A, B) =
## iterates over any (key, value) pair in the table `t`.
for h in 0..high(t.data):
if isFilled(t.data[h].hcode): yield (t.data[h].key, t.data[h].val)
iterator mpairs*[A, B](t: var Table[A, B]): (A, var B) =
## iterates over any (key, value) pair in the table `t`. The values
## can be modified.
for h in 0..high(t.data):
if isFilled(t.data[h].hcode): yield (t.data[h].key, t.data[h].val)
iterator keys*[A, B](t: Table[A, B]): A =
## iterates over any key in the table `t`.
for h in 0..high(t.data):
if isFilled(t.data[h].hcode): yield t.data[h].key
iterator values*[A, B](t: Table[A, B]): B =
## iterates over any value in the table `t`.
for h in 0..high(t.data):
if isFilled(t.data[h].hcode): yield t.data[h].val
iterator mvalues*[A, B](t: var Table[A, B]): var B =
## iterates over any value in the table `t`. The values can be modified.
for h in 0..high(t.data):
if isFilled(t.data[h].hcode): yield t.data[h].val
const
growthFactor = 2
proc mustRehash(length, counter: int): bool {.inline.} =
assert(length > counter)
result = (length * 2 < counter * 3) or (length - counter < 4)
include tableimpl
proc rightSize*(count: Natural): int {.inline.} =
## Return the value of `initialSize` to support `count` items.
@@ -137,49 +92,9 @@ proc rightSize*(count: Natural): int {.inline.} =
## Internally, we want mustRehash(rightSize(x), x) == false.
result = nextPowerOfTwo(count * 3 div 2 + 4)
proc nextTry(h, maxHash: Hash): Hash {.inline.} =
result = (h + 1) and maxHash
template rawGetKnownHCImpl() {.dirty.} =
var h: Hash = hc and high(t.data) # start with real hash value
while isFilled(t.data[h].hcode):
# Compare hc THEN key with boolean short circuit. This makes the common case
# zero ==key's for missing (e.g.inserts) and exactly one ==key for present.
# It does slow down succeeding lookups by one extra Hash cmp&and..usually
# just a few clock cycles, generally worth it for any non-integer-like A.
if t.data[h].hcode == hc and t.data[h].key == key:
return h
h = nextTry(h, high(t.data))
result = -1 - h # < 0 => MISSING; insert idx = -1 - result
template rawGetImpl() {.dirty.} =
hc = hash(key)
if hc == 0: # This almost never taken branch should be very predictable.
hc = 314159265 # Value doesn't matter; Any non-zero favorite is fine.
rawGetKnownHCImpl()
template rawGetDeepImpl() {.dirty.} = # Search algo for unconditional add
hc = hash(key)
if hc == 0:
hc = 314159265
var h: Hash = hc and high(t.data)
while isFilled(t.data[h].hcode):
h = nextTry(h, high(t.data))
result = h
template rawInsertImpl() {.dirty.} =
data[h].key = key
data[h].val = val
data[h].hcode = hc
proc rawGetKnownHC[A, B](t: Table[A, B], key: A, hc: Hash): int {.inline.} =
rawGetKnownHCImpl()
proc rawGetDeep[A, B](t: Table[A, B], key: A, hc: var Hash): int {.inline.} =
rawGetDeepImpl()
proc rawGet[A, B](t: Table[A, B], key: A, hc: var Hash): int {.inline.} =
rawGetImpl()
proc len*[A, B](t: Table[A, B]): int =
## returns the number of keys in `t`.
result = t.counter
proc `[]`*[A, B](t: Table[A, B], key: A): B =
## retrieves the value at ``t[key]``. If `key` is not in `t`,
@@ -219,9 +134,35 @@ proc contains*[A, B](t: Table[A, B], key: A): bool =
## alias of `hasKey` for use with the `in` operator.
return hasKey[A, B](t, key)
proc rawInsert[A, B](t: var Table[A, B], data: var KeyValuePairSeq[A, B],
key: A, val: B, hc: Hash, h: Hash) =
rawInsertImpl()
iterator pairs*[A, B](t: Table[A, B]): (A, B) =
## iterates over any (key, value) pair in the table `t`.
for h in 0..high(t.data):
if isFilled(t.data[h].hcode): yield (t.data[h].key, t.data[h].val)
iterator mpairs*[A, B](t: var Table[A, B]): (A, var B) =
## iterates over any (key, value) pair in the table `t`. The values
## can be modified.
for h in 0..high(t.data):
if isFilled(t.data[h].hcode): yield (t.data[h].key, t.data[h].val)
iterator keys*[A, B](t: Table[A, B]): A =
## iterates over any key in the table `t`.
for h in 0..high(t.data):
if isFilled(t.data[h].hcode): yield t.data[h].key
iterator values*[A, B](t: Table[A, B]): B =
## iterates over any value in the table `t`.
for h in 0..high(t.data):
if isFilled(t.data[h].hcode): yield t.data[h].val
iterator mvalues*[A, B](t: var Table[A, B]): var B =
## iterates over any value in the table `t`. The values can be modified.
for h in 0..high(t.data):
if isFilled(t.data[h].hcode): yield t.data[h].val
proc del*[A, B](t: var Table[A, B], key: A) =
## deletes `key` from hash table `t`.
delImpl()
proc enlarge[A, B](t: var Table[A, B]) =
var n: KeyValuePairSeq[A, B]
@@ -232,81 +173,26 @@ proc enlarge[A, B](t: var Table[A, B]) =
var j = -1 - rawGetKnownHC(t, n[i].key, n[i].hcode)
rawInsert(t, t.data, n[i].key, n[i].val, n[i].hcode, j)
template addImpl() {.dirty.} =
if mustRehash(len(t.data), t.counter): enlarge(t)
var hc: Hash
var j = rawGetDeep(t, key, hc)
rawInsert(t, t.data, key, val, hc, j)
inc(t.counter)
template maybeRehashPutImpl() {.dirty.} =
if mustRehash(len(t.data), t.counter):
enlarge(t)
index = rawGetKnownHC(t, key, hc)
index = -1 - index # important to transform for mgetOrPutImpl
rawInsert(t, t.data, key, val, hc, index)
inc(t.counter)
template putImpl() {.dirty.} =
var hc: Hash
var index = rawGet(t, key, hc)
if index >= 0: t.data[index].val = val
else: maybeRehashPutImpl()
template mgetOrPutImpl() {.dirty.} =
var hc: Hash
var index = rawGet(t, key, hc)
if index < 0: maybeRehashPutImpl() # not present: insert (flipping index)
result = t.data[index].val # either way return modifiable val
template hasKeyOrPutImpl() {.dirty.} =
var hc: Hash
var index = rawGet(t, key, hc)
if index < 0:
result = false
maybeRehashPutImpl()
else: result = true
proc mgetOrPut*[A, B](t: var Table[A, B], key: A, val: B): var B =
## retrieves value at ``t[key]`` or puts ``val`` if not present, either way
## returning a value which can be modified.
mgetOrPutImpl()
mgetOrPutImpl(enlarge)
proc hasKeyOrPut*[A, B](t: var Table[A, B], key: A, val: B): bool =
## returns true iff `key` is in the table, otherwise inserts `value`.
hasKeyOrPutImpl()
hasKeyOrPutImpl(enlarge)
proc `[]=`*[A, B](t: var Table[A, B], key: A, val: B) =
## puts a (key, value)-pair into `t`.
putImpl()
putImpl(enlarge)
proc add*[A, B](t: var Table[A, B], key: A, val: B) =
## puts a new (key, value)-pair into `t` even if ``t[key]`` already exists.
addImpl()
addImpl(enlarge)
template doWhile(a: expr, b: stmt): stmt =
while true:
b
if not a: break
proc del*[A, B](t: var Table[A, B], key: A) =
## deletes `key` from hash table `t`.
var hc: Hash
var i = rawGet(t, key, hc)
let msk = high(t.data)
if i >= 0:
t.data[i].hcode = 0
dec(t.counter)
while true: # KnuthV3 Algo6.4R adapted for i=i+1 instead of i=i-1
var j = i # The correctness of this depends on (h+1) in nextTry,
var r = j # though may be adaptable to other simple sequences.
t.data[i].hcode = 0 # mark current EMPTY
doWhile ((i >= r and r > j) or (r > j and j > i) or (j > i and i >= r)):
i = (i + 1) and msk # increment mod table size
if isEmpty(t.data[i].hcode): # end of collision cluster; So all done
return
r = t.data[i].hcode and msk # "home" location of key@i
shallowCopy(t.data[j], t.data[i]) # data[j] will be marked EMPTY next loop
proc len*[A, B](t: TableRef[A, B]): int =
## returns the number of keys in `t`.
result = t.counter
proc initTable*[A, B](initialSize=64): Table[A, B] =
## creates a new hash table that is empty.
@@ -360,10 +246,6 @@ proc indexBy*[A, B, C](collection: A, index: proc(x: B): C): Table[C, B] =
for item in collection:
result[index(item)] = item
proc len*[A, B](t: TableRef[A, B]): int =
## returns the number of keys in `t`.
result = t.counter
iterator pairs*[A, B](t: TableRef[A, B]): (A, B) =
## iterates over any (key, value) pair in the table `t`.
for h in 0..high(t.data):
@@ -462,8 +344,7 @@ type
OrderedKeyValuePair[A, B] = tuple[
hcode: Hash, next: int, key: A, val: B]
OrderedKeyValuePairSeq[A, B] = seq[OrderedKeyValuePair[A, B]]
OrderedTable* {.
myShallow.}[A, B] = object ## table that remembers insertion order
OrderedTable* [A, B] = object ## table that remembers insertion order
data: OrderedKeyValuePairSeq[A, B]
counter, first, last: int
OrderedTableRef*[A, B] = ref OrderedTable[A, B]
@@ -569,20 +450,20 @@ proc enlarge[A, B](t: var OrderedTable[A, B]) =
proc `[]=`*[A, B](t: var OrderedTable[A, B], key: A, val: B) =
## puts a (key, value)-pair into `t`.
putImpl()
putImpl(enlarge)
proc add*[A, B](t: var OrderedTable[A, B], key: A, val: B) =
## puts a new (key, value)-pair into `t` even if ``t[key]`` already exists.
addImpl()
addImpl(enlarge)
proc mgetOrPut*[A, B](t: var OrderedTable[A, B], key: A, val: B): var B =
## retrieves value at ``t[key]`` or puts ``value`` if not present, either way
## returning a value which can be modified.
mgetOrPutImpl()
mgetOrPutImpl(enlarge)
proc hasKeyOrPut*[A, B](t: var OrderedTable[A, B], key: A, val: B): bool =
## returns true iff `key` is in the table, otherwise inserts `value`.
hasKeyOrPutImpl()
hasKeyOrPutImpl(enlarge)
proc initOrderedTable*[A, B](initialSize=64): OrderedTable[A, B] =
## creates a new ordered hash table that is empty.
@@ -757,7 +638,7 @@ proc sort*[A, B](t: OrderedTableRef[A, B],
# ------------------------------ count tables -------------------------------
type
CountTable* {.myShallow.}[
CountTable* [
A] = object ## table that counts the number of each key
data: seq[tuple[key: A, val: int]]
counter: int

View File

@@ -128,6 +128,7 @@ type
initialized: bool # whether it has even been initialized
shutdown: bool # the pool requests to shut down this worker thread
q: ToFreeQueue
readyForTask: Semaphore
proc await*(fv: FlowVarBase) =
## waits until the value for the flowVar arrives. Usually it is not necessary
@@ -273,6 +274,10 @@ proc nimArgsPassingDone(p: pointer) {.compilerProc.} =
const
MaxThreadPoolSize* = 256 ## maximal size of the thread pool. 256 threads
## should be good enough for anybody ;-)
MaxDistinguishedThread* = 32 ## maximal number of "distinguished" threads.
type
ThreadId* = range[0..MaxDistinguishedThread-1]
var
currentPoolSize: int
@@ -298,10 +303,25 @@ proc slave(w: ptr Worker) {.thread.} =
w.shutdown = false
atomicDec currentPoolSize
proc distinguishedSlave(w: ptr Worker) {.thread.} =
  # Thread body of a "distinguished" (pinned) worker: an endless loop that
  # announces readiness, waits for a task and runs it.
  while true:
    # publish that this worker can take a task (atomically where supported)
    when declared(atomicStoreN):
      atomicStoreN(addr(w.ready), true, ATOMIC_SEQ_CST)
    else:
      w.ready = true
    # wake a spawner that is waiting on `readyForTask` (see nimSpawn4)
    signal(w.readyForTask)
    await(w.taskArrived)
    # presumably `ready` is cleared by whoever handed over the task -- TODO confirm
    assert(not w.ready)
    w.f(w, w.data)
    if w.q.len != 0: w.cleanFlowVars
var
workers: array[MaxThreadPoolSize, TThread[ptr Worker]]
workersData: array[MaxThreadPoolSize, Worker]
distinguished: array[MaxDistinguishedThread, TThread[ptr Worker]]
distinguishedData: array[MaxDistinguishedThread, Worker]
proc setMinPoolSize*(size: range[1..MaxThreadPoolSize]) =
## sets the minimal thread pool size. The default value of this is 4.
minPoolSize = size
@@ -315,7 +335,7 @@ proc setMaxPoolSize*(size: range[1..MaxThreadPoolSize]) =
let w = addr(workersData[i])
w.shutdown = true
proc activateThread(i: int) {.noinline.} =
proc activateWorkerThread(i: int) {.noinline.} =
workersData[i].taskArrived = createSemaphore()
workersData[i].taskStarted = createSemaphore()
workersData[i].initialized = true
@@ -323,10 +343,19 @@ proc activateThread(i: int) {.noinline.} =
initLock(workersData[i].q.lock)
createThread(workers[i], slave, addr(workersData[i]))
proc activateDistinguishedThread(i: int) {.noinline.} =
  ## Sets up the semaphores and the queue lock of distinguished worker `i`,
  ## marks it initialized and starts its thread running `distinguishedSlave`.
  let w = addr(distinguishedData[i])
  w.taskArrived = createSemaphore()
  w.taskStarted = createSemaphore()
  w.initialized = true
  w.q.empty = createSemaphore()
  initLock(w.q.lock)
  w.readyForTask = createSemaphore()
  createThread(distinguished[i], distinguishedSlave, w)
proc setup() =
currentPoolSize = min(countProcessors(), MaxThreadPoolSize)
readyWorker = addr(workersData[0])
for i in 0.. <currentPoolSize: activateThread(i)
for i in 0.. <currentPoolSize: activateWorkerThread(i)
proc preferSpawn*(): bool =
## Use this proc to determine quickly if a 'spawn' or a direct call is
@@ -340,6 +369,13 @@ proc spawn*(call: expr): expr {.magic: "Spawn".}
## is gcsafe and has a return type that is either 'void' or compatible
## with ``FlowVar[T]``.
proc pinnedSpawn*(id: ThreadId; call: expr): expr {.magic: "Spawn".}
## always spawns a new task on the worker thread with ``id``, so that
## the 'call' is **always** executed on
## this thread. 'call' has to be a proc call 'p(...)' where 'p'
## is gcsafe and has a return type that is either 'void' or compatible
## with ``FlowVar[T]``.
template spawnX*(call: expr): expr =
## spawns a new task if a CPU core is ready, otherwise executes the
## call in the calling thread. Usually it is advised to
@@ -360,7 +396,7 @@ var
initLock stateLock
proc nimSpawn(fn: WorkerProc; data: pointer) {.compilerProc.} =
proc nimSpawn3(fn: WorkerProc; data: pointer) {.compilerProc.} =
# implementation of 'spawn' that is used by the code generator.
while true:
if selectWorker(readyWorker, fn, data): return
@@ -377,7 +413,7 @@ proc nimSpawn(fn: WorkerProc; data: pointer) {.compilerProc.} =
of doCreateThread:
if currentPoolSize < maxPoolSize:
if not workersData[currentPoolSize].initialized:
activateThread(currentPoolSize)
activateWorkerThread(currentPoolSize)
let w = addr(workersData[currentPoolSize])
atomicInc currentPoolSize
if selectWorker(w, fn, data):
@@ -394,6 +430,21 @@ proc nimSpawn(fn: WorkerProc; data: pointer) {.compilerProc.} =
# other thread succeeded, so we don't need to do anything here.
await(gSomeReady)
var
distinguishedLock: TLock
initLock distinguishedLock
proc nimSpawn4(fn: WorkerProc; data: pointer; id: ThreadId) {.compilerProc.} =
  # implementation of 'pinnedSpawn': hands the task (fn, data) to the
  # distinguished worker `id`, starting that worker on first use.
  let w = addr(distinguishedData[id])
  # the lock serializes the lazy creation of the worker thread
  acquire(distinguishedLock)
  if not w.initialized:
    activateDistinguishedThread(id)
  release(distinguishedLock)
  # retry until the worker accepts the task, blocking on `readyForTask`
  # between attempts
  while not selectWorker(w, fn, data):
    await(w.readyForTask)
proc sync*() =
## a simple barrier to wait for all spawn'ed tasks. If you need more elaborate
## waiting, you have to use an explicit barrier.

View File

@@ -11,11 +11,11 @@
import tables, os, unsigned, hashes
when defined(linux):
when defined(linux):
import posix, epoll
elif defined(windows):
elif defined(windows):
import winlean
else:
else:
import posix
proc hash*(x: SocketHandle): Hash {.borrow.}
@@ -25,19 +25,20 @@ type
Event* = enum
EvRead, EvWrite, EvError
SelectorKey* = ref object
SelectorKey* = object
fd*: SocketHandle
events*: set[Event] ## The events which ``fd`` listens for.
data*: RootRef ## User object.
data*: pointer ## User object.
ReadyInfo* = tuple[key: SelectorKey, events: set[Event]]
when defined(nimdoc):
type
Selector* = ref object
## An object which holds file descriptors to be checked for read/write
## status.
fds: Table[SocketHandle, SelectorKey]
proc register*(s: Selector, fd: SocketHandle, events: set[Event],
data: RootRef): SelectorKey {.discardable.} =
@@ -57,7 +58,7 @@ when defined(nimdoc):
proc select*(s: Selector, timeout: int): seq[ReadyInfo] =
## The ``events`` field of the returned ``key`` contains the original events
## for which the ``fd`` was bound. This is contrary to the ``events`` field
## of the ``TReadyInfo`` tuple which determines which events are ready
## of the ``ReadyInfo`` tuple which determines which events are ready
## on the ``fd``.
proc newSelector*(): Selector =
@@ -72,11 +73,11 @@ when defined(nimdoc):
elif defined(linux):
type
Selector* = ref object
Selector* = object
epollFD: cint
events: array[64, epoll_event]
fds: Table[SocketHandle, SelectorKey]
fds: SharedTable[SocketHandle, SelectorKey]
proc createEventStruct(events: set[Event], fd: SocketHandle): epoll_event =
if EvRead in events:
result.events = EPOLLIN
@@ -84,21 +85,19 @@ elif defined(linux):
result.events = result.events or EPOLLOUT
result.events = result.events or EPOLLRDHUP
result.data.fd = fd.cint
proc register*(s: Selector, fd: SocketHandle, events: set[Event],
data: RootRef): SelectorKey {.discardable.} =
data: pointer) =
var event = createEventStruct(events, fd)
if events != {}:
if epoll_ctl(s.epollFD, EPOLL_CTL_ADD, fd, addr(event)) != 0:
raiseOSError(osLastError())
var key = SelectorKey(fd: fd, events: events, data: data)
s.fds[fd] = key
result = key
proc update*(s: Selector, fd: SocketHandle,
events: set[Event]): SelectorKey {.discardable.} =
proc update*(s: var Selector, fd: SocketHandle, events: set[Event]) =
if s.fds[fd].events != events:
if events == {}:
# This fd is idle -- it should not be registered to epoll.
@@ -108,7 +107,7 @@ elif defined(linux):
# are therefore constantly ready. (leading to 100% CPU usage).
if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fd, nil) != 0:
raiseOSError(osLastError())
s.fds[fd].events = events
s.fds.mget(fd).events = events
else:
var event = createEventStruct(events, fd)
if s.fds[fd].events == {}:
@@ -119,22 +118,20 @@ elif defined(linux):
else:
if epoll_ctl(s.epollFD, EPOLL_CTL_MOD, fd, addr(event)) != 0:
raiseOSError(osLastError())
s.fds[fd].events = events
result = s.fds[fd]
proc unregister*(s: Selector, fd: SocketHandle): SelectorKey {.discardable.} =
s.fds.mget(fd).events = events
proc unregister*(s: var Selector, fd: SocketHandle) =
if epoll_ctl(s.epollFD, EPOLL_CTL_DEL, fd, nil) != 0:
let err = osLastError()
if err.cint notin {ENOENT, EBADF}: # TODO: Why do we sometimes get an EBADF? Is this normal?
if err.cint notin {ENOENT, EBADF}:
# TODO: Why do we sometimes get an EBADF? Is this normal?
raiseOSError(err)
result = s.fds[fd]
s.fds.del(fd)
proc close*(s: Selector) =
proc close*(s: var Selector) =
  ## Releases the resources owned by the selector: the shared table of
  ## registered descriptors and the epoll file descriptor.
  ##
  ## Raises an OS error if closing the epoll descriptor fails.
  # ``s.events`` is an array stored inline in the ``Selector`` object. It is
  # never obtained from ``alloc``, so it must not be handed to ``dealloc``;
  # the storage goes away together with the selector itself.
  deinitSharedTable(s.fds)
  if s.epollFD.close() != 0: raiseOSError(osLastError())
proc epollHasFd(s: Selector, fd: SocketHandle): bool =
result = true
var event = createEventStruct(s.fds[fd].events, fd)
@@ -142,9 +139,9 @@ elif defined(linux):
let err = osLastError()
if err.cint in {ENOENT, EBADF}:
return false
raiseOSError(osLastError())
proc select*(s: Selector, timeout: int): seq[ReadyInfo] =
raiseOSError(err)
proc select*(s: var Selector, timeout: int): seq[ReadyInfo] =
##
## The ``events`` field of the returned ``key`` contains the original events
## for which the ``fd`` was bound. This is contrary to the ``events`` field
@@ -156,11 +153,11 @@ elif defined(linux):
let err = osLastError()
if err.cint == EINTR:
return @[]
raiseOSError(osLastError())
raiseOSError(err)
if evNum == 0: return @[]
for i in 0 .. <evNum:
let fd = s.events[i].data.fd.SocketHandle
var evSet: set[Event] = {}
if (s.events[i].events and EPOLLERR) != 0 or (s.events[i].events and EPOLLHUP) != 0: evSet = evSet + {EvError}
if (s.events[i].events and EPOLLIN) != 0: evSet = evSet + {EvRead}
@@ -170,14 +167,12 @@ elif defined(linux):
result.add((selectorKey, evSet))
#echo("Epoll: ", result[i].key.fd, " ", result[i].events, " ", result[i].key.events)
proc newSelector*(): Selector =
new result
result.epollFD = epoll_create(64)
#result.events = cast[array[64, epoll_event]](alloc0(sizeof(epoll_event)*64))
result.fds = initTable[SocketHandle, SelectorKey]()
if result.epollFD < 0:
raiseOSError(osLastError())
result.fds = initSharedTable[SocketHandle, SelectorKey]()
proc contains*(s: Selector, fd: SocketHandle): bool =
## Determines whether selector contains a file descriptor.
@@ -196,31 +191,27 @@ elif defined(linux):
elif not defined(nimdoc):
# TODO: kqueue for bsd/mac os x.
import sharedtables
type
Selector* = ref object
fds: Table[SocketHandle, SelectorKey]
Selector* = object
fds: SharedTable[SocketHandle, SelectorKey]
proc register*(s: Selector, fd: SocketHandle, events: set[Event],
data: RootRef): SelectorKey {.discardable.} =
if s.fds.hasKey(fd):
proc register*(s: var Selector, fd: SocketHandle, events: set[Event],
               data: pointer) =
  ## Adds ``fd`` to the selector together with the ``events`` it listens for
  ## and the user supplied ``data``.
  ##
  ## Raises ``ValueError`` when ``hasKeyOrPut`` reports that ``fd`` was
  ## already present.
  let key = SelectorKey(fd: fd, events: events, data: data)
  let alreadyRegistered = hasKeyOrPut(s.fds, fd, key)
  if alreadyRegistered:
    raise newException(ValueError, "File descriptor already exists.")
var sk = SelectorKey(fd: fd, events: events, data: data)
s.fds[fd] = sk
result = sk
proc update*(s: Selector, fd: SocketHandle,
events: set[Event]): SelectorKey {.discardable.} =
if not s.fds.hasKey(fd):
raise newException(ValueError, "File descriptor not found.")
proc update*(s: var Selector, fd: SocketHandle, events: set[Event]) =
  ## Replaces the event set stored for ``fd`` with ``events``.
  # The explicit "not found" check below is disabled, so a missing ``fd`` is
  # left to ``mget`` to deal with.
  #if not s.fds.hasKey(fd):
  #  raise newException(ValueError, "File descriptor not found.")
  mget(s.fds, fd).events = events
s.fds[fd].events = events
result = s.fds[fd]
proc unregister*(s: Selector, fd: SocketHandle): SelectorKey {.discardable.} =
result = s.fds[fd]
proc unregister*(s: var Selector, fd: SocketHandle) =
  ## Removes ``fd`` from the selector's table of registered descriptors.
  del(s.fds, fd)
proc close*(s: Selector) = discard
proc close*(s: var Selector) =
  ## Tears down the shared table that holds the registered descriptors.
  deinitSharedTable(s.fds)
proc timeValFromMilliseconds(timeout: int): TimeVal =
if timeout != -1:
@@ -228,19 +219,19 @@ elif not defined(nimdoc):
result.tv_sec = seconds.int32
result.tv_usec = ((timeout - seconds * 1000) * 1000).int32
proc createFdSet(rd, wr: var TFdSet, fds: Table[SocketHandle, SelectorKey],
proc createFdSet(rd, wr: var TFdSet, fds: SharedTable[SocketHandle, SelectorKey],
m: var int) =
FD_ZERO(rd); FD_ZERO(wr)
for k, v in pairs(fds):
if EvRead in v.events:
if EvRead in v.events:
m = max(m, int(k))
FD_SET(k, rd)
if EvWrite in v.events:
m = max(m, int(k))
FD_SET(k, wr)
proc getReadyFDs(rd, wr: var TFdSet, fds: Table[SocketHandle, SelectorKey]):
seq[ReadyInfo] =
proc getReadyFDs(rd, wr: var TFdSet,
fds: SharedTable[SocketHandle, SelectorKey]): seq[ReadyInfo] =
result = @[]
for k, v in pairs(fds):
var events: set[Event] = {}
@@ -250,20 +241,20 @@ elif not defined(nimdoc):
events = events + {EvWrite}
result.add((v, events))
proc select(fds: Table[SocketHandle, SelectorKey], timeout = 500):
seq[ReadyInfo] =
proc select(fds: var SharedTable[SocketHandle, SelectorKey],
timeout = 500): seq[ReadyInfo] =
var tv {.noInit.}: TimeVal = timeValFromMilliseconds(timeout)
var rd, wr: TFdSet
var m = 0
createFdSet(rd, wr, fds, m)
var retCode = 0
if timeout != -1:
retCode = int(select(cint(m+1), addr(rd), addr(wr), nil, addr(tv)))
else:
retCode = int(select(cint(m+1), addr(rd), addr(wr), nil, nil))
if retCode < 0:
raiseOSError(osLastError())
elif retCode == 0:
@@ -275,8 +266,7 @@ elif not defined(nimdoc):
result = select(s.fds, timeout)
proc newSelector*(): Selector =
new result
result.fds = initTable[SocketHandle, SelectorKey]()
result.fds = initSharedTable[SocketHandle, SelectorKey]()
proc contains*(s: Selector, fd: SocketHandle): bool =
  ## Reports whether ``fd`` is present in the selector's table.
  result = hasKey(s.fds, fd)
@@ -300,17 +290,17 @@ when not defined(testing) and isMainModule and not defined(nimdoc):
# Select()
import sockets
type
SockWrapper = ref object of RootObj
SockWrapper = object
sock: Socket
var sock = socket()
if sock == sockets.invalidSocket: raiseOSError(osLastError())
#sock.setBlocking(false)
sock.connect("irc.freenode.net", Port(6667))
var selector = newSelector()
var data = SockWrapper(sock: sock)
let key = selector.register(sock.getFD, {EvWrite}, data)
let key = selector.register(sock.getFD, {EvWrite}, addr data)
var i = 0
while true:
let ready = selector.select(1000)