proof that refcounting can handle Nim's async (#12533)

This commit is contained in:
Andreas Rumpf
2019-10-28 16:13:38 +01:00
committed by GitHub
parent 160c07be00
commit c52a2c3ab0
3 changed files with 99 additions and 19 deletions

View File

@@ -263,7 +263,7 @@ proc liftIterSym*(g: ModuleGraph; n: PNode; owner: PSym): PNode =
proc freshVarForClosureIter*(g: ModuleGraph; s, owner: PSym): PNode =
let envParam = getHiddenParam(g, owner)
let obj = envParam.typ.skipTypes({tyOwned, tyRef})
let obj = envParam.typ.skipTypes({tyOwned, tyRef, tyPtr})
addField(obj, s, g.cache)
var access = newSymNode(envParam)
@@ -335,16 +335,16 @@ proc getEnvTypeForOwnerUp(c: var DetectionPass; owner: PSym;
info: TLineInfo): PType =
var r = c.getEnvTypeForOwner(owner, info)
result = newType(tyPtr, owner)
rawAddSon(result, r.skipTypes({tyOwned, tyRef}))
rawAddSon(result, r.skipTypes({tyOwned, tyRef, tyPtr}))
proc createUpField(c: var DetectionPass; dest, dep: PSym; info: TLineInfo) =
let refObj = c.getEnvTypeForOwner(dest, info) # getHiddenParam(dest).typ
let obj = refObj.skipTypes({tyOwned, tyRef})
let obj = refObj.skipTypes({tyOwned, tyRef, tyPtr})
# The assumption here is that gcDestructors means we cannot deal
# with cycles properly, so it's better to produce a weak ref (=ptr) here.
# This seems to be generally correct but since it's a bit risky it's disabled
# for now.
let fieldType = if false: # optSeqDestructors in c.graph.config.globalOptions:
let fieldType = if isDefined(c.graph.config, "nimCycleBreaker"):
c.getEnvTypeForOwnerUp(dep, info) #getHiddenParam(dep).typ
else:
c.getEnvTypeForOwner(dep, info)
@@ -354,7 +354,7 @@ proc createUpField(c: var DetectionPass; dest, dep: PSym; info: TLineInfo) =
let upIdent = getIdent(c.graph.cache, upName)
let upField = lookupInRecord(obj.n, upIdent)
if upField != nil:
if upField.typ.skipTypes({tyOwned, tyRef}) != fieldType.skipTypes({tyOwned, tyRef}):
if upField.typ.skipTypes({tyOwned, tyRef, tyPtr}) != fieldType.skipTypes({tyOwned, tyRef, tyPtr}):
localError(c.graph.config, dep.info, "internal error: up references do not agree")
else:
let result = newSym(skField, upIdent, obj.owner, obj.owner.info)
@@ -425,8 +425,8 @@ proc detectCapturedVars(n: PNode; owner: PSym; c: var DetectionPass) =
addClosureParam(c, owner, n.info)
if interestingIterVar(s):
if not c.capturedVars.containsOrIncl(s.id):
let obj = getHiddenParam(c.graph, owner).typ.skipTypes({tyOwned, tyRef})
#let obj = c.getEnvTypeForOwner(s.owner).skipTypes({tyOwned, tyRef})
let obj = getHiddenParam(c.graph, owner).typ.skipTypes({tyOwned, tyRef, tyPtr})
#let obj = c.getEnvTypeForOwner(s.owner).skipTypes({tyOwned, tyRef, tyPtr})
if s.name.id == getIdent(c.graph.cache, ":state").id:
obj.n[0].sym.id = -s.id
@@ -451,8 +451,8 @@ proc detectCapturedVars(n: PNode; owner: PSym; c: var DetectionPass) =
#echo "capturing ", n.info
# variable 's' is actually captured:
if interestingVar(s) and not c.capturedVars.containsOrIncl(s.id):
let obj = c.getEnvTypeForOwner(ow, n.info).skipTypes({tyOwned, tyRef})
#getHiddenParam(owner).typ.skipTypes({tyOwned, tyRef})
let obj = c.getEnvTypeForOwner(ow, n.info).skipTypes({tyOwned, tyRef, tyPtr})
#getHiddenParam(owner).typ.skipTypes({tyOwned, tyRef, tyPtr})
addField(obj, s, c.graph.cache)
# create required upFields:
var w = owner.skipGenericOwner
@@ -552,7 +552,7 @@ proc getUpViaParam(g: ModuleGraph; owner: PSym): PNode =
let p = getHiddenParam(g, owner)
result = p.newSymNode
if owner.isIterator:
let upField = lookupInRecord(p.typ.skipTypes({tyOwned, tyRef}).n, getIdent(g.cache, upName))
let upField = lookupInRecord(p.typ.skipTypes({tyOwned, tyRef, tyPtr}).n, getIdent(g.cache, upName))
if upField == nil:
localError(g.config, owner.info, "could not find up reference for closure iter")
else:
@@ -595,10 +595,10 @@ proc rawClosureCreation(owner: PSym;
# add ``env.param = param``
result.add(newAsgnStmt(fieldAccess, newSymNode(local), env.info))
let upField = lookupInRecord(env.typ.skipTypes({tyOwned, tyRef}).n, getIdent(d.graph.cache, upName))
let upField = lookupInRecord(env.typ.skipTypes({tyOwned, tyRef, tyPtr}).n, getIdent(d.graph.cache, upName))
if upField != nil:
let up = getUpViaParam(d.graph, owner)
if up != nil and upField.typ.skipTypes({tyOwned, tyRef}) == up.typ.skipTypes({tyOwned, tyRef}):
if up != nil and upField.typ.skipTypes({tyOwned, tyRef, tyPtr}) == up.typ.skipTypes({tyOwned, tyRef, tyPtr}):
result.add(newAsgnStmt(rawIndirectAccess(env, upField, env.info),
up, env.info))
#elif oldenv != nil and oldenv.typ == upField.typ:
@@ -629,7 +629,7 @@ proc closureCreationForIter(iter: PNode;
var vnode: PNode
if owner.isIterator:
let it = getHiddenParam(d.graph, owner)
addUniqueField(it.typ.skipTypes({tyOwned, tyRef}), v, d.graph.cache)
addUniqueField(it.typ.skipTypes({tyOwned, tyRef, tyPtr}), v, d.graph.cache)
vnode = indirectAccess(newSymNode(it), v, v.info)
else:
vnode = v.newSymNode
@@ -640,10 +640,10 @@ proc closureCreationForIter(iter: PNode;
if optOwnedRefs in d.graph.config.globalOptions:
createTypeBoundOps(d.graph, nil, vnode.typ, iter.info)
let upField = lookupInRecord(v.typ.skipTypes({tyOwned, tyRef}).n, getIdent(d.graph.cache, upName))
let upField = lookupInRecord(v.typ.skipTypes({tyOwned, tyRef, tyPtr}).n, getIdent(d.graph.cache, upName))
if upField != nil:
let u = setupEnvVar(owner, d, c, iter.info)
if u.typ.skipTypes({tyOwned, tyRef}) == upField.typ.skipTypes({tyOwned, tyRef}):
if u.typ.skipTypes({tyOwned, tyRef, tyPtr}) == upField.typ.skipTypes({tyOwned, tyRef, tyPtr}):
result.add(newAsgnStmt(rawIndirectAccess(vnode, upField, iter.info),
u, iter.info))
else:
@@ -655,7 +655,7 @@ proc accessViaEnvVar(n: PNode; owner: PSym; d: DetectionPass;
var access = setupEnvVar(owner, d, c, n.info)
if optOwnedRefs in d.graph.config.globalOptions:
access = c.unownedEnvVars[owner.id]
let obj = access.typ.skipTypes({tyOwned, tyRef})
let obj = access.typ.skipTypes({tyOwned, tyRef, tyPtr})
let field = getFieldFromObj(obj, n.sym)
if field != nil:
result = rawIndirectAccess(access, field, n.info)
@@ -664,7 +664,7 @@ proc accessViaEnvVar(n: PNode; owner: PSym; d: DetectionPass;
result = n
proc getStateField*(g: ModuleGraph; owner: PSym): PSym =
getHiddenParam(g, owner).typ.skipTypes({tyOwned, tyRef}).n.sons[0].sym
getHiddenParam(g, owner).typ.skipTypes({tyOwned, tyRef, tyPtr}).n.sons[0].sym
proc liftCapturedVars(n: PNode; owner: PSym; d: DetectionPass;
c: var LiftingPass): PNode
@@ -689,7 +689,7 @@ proc symToClosure(n: PNode; owner: PSym; d: DetectionPass;
while true:
if access.typ == wanted:
return makeClosure(d.graph, s, access, n.info)
let obj = access.typ.skipTypes({tyOwned, tyRef})
let obj = access.typ.skipTypes({tyOwned, tyRef, tyPtr})
let upField = lookupInRecord(obj.n, getIdent(d.graph.cache, upName))
if upField == nil:
localError(d.graph.config, n.info, "internal error: no environment found")

View File

@@ -16,7 +16,8 @@
const
CycleIncrease = 2 # is a multiplicative increase
InitialCycleThreshold = 4*1024*1024 # X MB because cycle checking is slow
InitialCycleThreshold = when defined(nimCycleBreaker): high(int)
else: 4*1024*1024 # X MB because cycle checking is slow
InitialZctThreshold = 500 # we collect garbage if the ZCT's size
# reaches this threshold
# this seems to be a good value
@@ -792,6 +793,12 @@ proc collectCT(gch: var GcHeap) =
collectCTBody(gch)
gch.zctThreshold = max(InitialZctThreshold, gch.zct.len * CycleIncrease)
proc GC_collectZct*() =
## Collect the ZCT (zero count table). Unstable, experimental API for
## testing purposes.
## DO NOT USE!
collectCTBody(gch)
when withRealTime:
proc toNano(x: int): Nanos {.inline.} =
result = x * 1000

View File

@@ -0,0 +1,73 @@
discard """
output: "50000"
cmd: "nim c -d:nimTypeNames -d:nimCycleBreaker $file"
"""
import asyncdispatch, asyncnet, nativesockets, net, strutils, os
var msgCount = 0
const
swarmSize = 500
messagesToSend = 100
var clientCount = 0
proc sendMessages(client: AsyncFD) {.async.} =
for i in 0 ..< messagesToSend:
await send(client, "Message " & $i & "\c\L")
proc launchSwarm(port: Port) {.async.} =
for i in 0 ..< swarmSize:
var sock = createAsyncNativeSocket()
await connect(sock, "localhost", port)
await sendMessages(sock)
closeSocket(sock)
proc readMessages(client: AsyncFD) {.async.} =
# wrapping the AsyncFd into a AsyncSocket object
var sockObj = newAsyncSocket(client)
var (ipaddr, port) = sockObj.getPeerAddr()
doAssert ipaddr == "127.0.0.1"
(ipaddr, port) = sockObj.getLocalAddr()
doAssert ipaddr == "127.0.0.1"
while true:
var line = await recvLine(sockObj)
if line == "":
closeSocket(client)
clientCount.inc
break
else:
if line.startswith("Message "):
msgCount.inc
else:
doAssert false
proc createServer(port: Port) {.async.} =
var server = createAsyncNativeSocket()
block:
var name: Sockaddr_in
name.sin_family = typeof(name.sin_family)(toInt(AF_INET))
name.sin_port = htons(uint16(port))
name.sin_addr.s_addr = htonl(INADDR_ANY)
if bindAddr(server.SocketHandle, cast[ptr SockAddr](addr(name)),
sizeof(name).Socklen) < 0'i32:
raiseOSError(osLastError())
discard server.SocketHandle.listen()
while true:
asyncCheck readMessages(await accept(server))
asyncCheck createServer(Port(10335))
asyncCheck launchSwarm(Port(10335))
while true:
poll()
GC_collectZct()
let (allocs, deallocs) = getMemCounters()
doAssert allocs < deallocs + 1000
if clientCount == swarmSize: break
assert msgCount == swarmSize * messagesToSend
echo msgCount