mirror of
https://github.com/nim-lang/Nim.git
synced 2026-04-21 14:55:24 +00:00
This commit is contained in:
committed by
Andreas Rumpf
parent
6f13184e40
commit
81f920a4ee
@@ -10,6 +10,7 @@
|
||||
include "system/inclrtl"
|
||||
|
||||
import os, tables, strutils, times, heapqueue, lists, options, asyncstreams
|
||||
import options, math
|
||||
import asyncfutures except callSoon
|
||||
|
||||
import nativesockets, net, deques
|
||||
@@ -157,9 +158,6 @@ export asyncfutures, asyncstreams
|
||||
## ----------------
|
||||
##
|
||||
## * The effect system (``raises: []``) does not work with async procedures.
|
||||
## * Can't await in a ``except`` body
|
||||
## * Forward declarations for async procs are broken,
|
||||
## link includes workaround: https://github.com/nim-lang/Nim/issues/3182.
|
||||
|
||||
# TODO: Check if yielded future is nil and throw a more meaningful exception
|
||||
|
||||
@@ -168,8 +166,10 @@ type
|
||||
timers*: HeapQueue[tuple[finishAt: float, fut: Future[void]]]
|
||||
callbacks*: Deque[proc ()]
|
||||
|
||||
proc processTimers(p: PDispatcherBase; didSomeWork: var bool) {.inline.} =
|
||||
#Process just part if timers at a step
|
||||
proc processTimers(
|
||||
p: PDispatcherBase, didSomeWork: var bool
|
||||
): Option[int] {.inline.} =
|
||||
# Pop the timers in the order in which they will expire (smaller `finishAt`).
|
||||
var count = p.timers.len
|
||||
let t = epochTime()
|
||||
while count > 0 and t >= p.timers[0].finishAt:
|
||||
@@ -177,22 +177,25 @@ proc processTimers(p: PDispatcherBase; didSomeWork: var bool) {.inline.} =
|
||||
dec count
|
||||
didSomeWork = true
|
||||
|
||||
# Return the number of miliseconds in which the next timer will expire.
|
||||
if p.timers.len == 0: return
|
||||
|
||||
let milisecs = (p.timers[0].finishAt - epochTime()) * 1000
|
||||
return some(ceil(milisecs).int)
|
||||
|
||||
proc processPendingCallbacks(p: PDispatcherBase; didSomeWork: var bool) =
|
||||
while p.callbacks.len > 0:
|
||||
var cb = p.callbacks.popFirst()
|
||||
cb()
|
||||
didSomeWork = true
|
||||
|
||||
proc adjustedTimeout(p: PDispatcherBase, timeout: int): int {.inline.} =
|
||||
# If dispatcher has active timers this proc returns the timeout
|
||||
# of the nearest timer. Returns `timeout` otherwise.
|
||||
result = timeout
|
||||
if p.timers.len > 0:
|
||||
let timerTimeout = p.timers[0].finishAt
|
||||
let curTime = epochTime()
|
||||
if timeout == -1 or (curTime + (timeout / 1000)) > timerTimeout:
|
||||
result = int((timerTimeout - curTime) * 1000)
|
||||
if result < 0: result = 0
|
||||
proc adjustTimeout(pollTimeout: int, nextTimer: Option[int]): int {.inline.} =
|
||||
if nextTimer.isNone():
|
||||
return pollTimeout
|
||||
|
||||
result = nextTimer.get()
|
||||
if pollTimeout == -1: return
|
||||
result = min(pollTimeout, result)
|
||||
|
||||
proc callSoon(cbproc: proc ()) {.gcsafe.}
|
||||
|
||||
@@ -299,7 +302,8 @@ when defined(windows) or defined(nimdoc):
|
||||
"No handles or timers registered in dispatcher.")
|
||||
|
||||
result = false
|
||||
let at = p.adjustedTimeout(timeout)
|
||||
let nextTimer = processTimers(p, result)
|
||||
let at = adjustTimeout(timeout, nextTimer)
|
||||
var llTimeout =
|
||||
if at == -1: winlean.INFINITE
|
||||
else: at.int32
|
||||
@@ -344,7 +348,7 @@ when defined(windows) or defined(nimdoc):
|
||||
else: raiseOSError(errCode)
|
||||
|
||||
# Timer processing.
|
||||
processTimers(p, result)
|
||||
discard processTimers(p, result)
|
||||
# Callback queue processing
|
||||
processPendingCallbacks(p, result)
|
||||
|
||||
@@ -1231,7 +1235,8 @@ else:
|
||||
|
||||
result = false
|
||||
var keys: array[64, ReadyKey]
|
||||
var count = p.selector.selectInto(p.adjustedTimeout(timeout), keys)
|
||||
let nextTimer = processTimers(p, result)
|
||||
var count = p.selector.selectInto(adjustTimeout(timeout, nextTimer), keys)
|
||||
for i in 0..<count:
|
||||
var custom = false
|
||||
let fd = keys[i].fd
|
||||
@@ -1270,7 +1275,7 @@ else:
|
||||
p.selector.updateHandle(SocketHandle(fd), newEvents)
|
||||
|
||||
# Timer processing.
|
||||
processTimers(p, result)
|
||||
discard processTimers(p, result)
|
||||
# Callback queue processing
|
||||
processPendingCallbacks(p, result)
|
||||
|
||||
|
||||
@@ -1,18 +1,19 @@
|
||||
discard """
|
||||
file: "t7758.nim"
|
||||
exitcode: 0
|
||||
disabled: true
|
||||
"""
|
||||
import asyncdispatch
|
||||
|
||||
proc task() {.async.} =
|
||||
await sleepAsync(1000)
|
||||
await sleepAsync(40)
|
||||
|
||||
when isMainModule:
|
||||
proc main() =
|
||||
var counter = 0
|
||||
var f = task()
|
||||
while not f.finished:
|
||||
inc(counter)
|
||||
poll()
|
||||
poll(10)
|
||||
|
||||
doAssert counter == 2
|
||||
doAssert counter <= 4
|
||||
|
||||
for i in 0 .. 10: main()
|
||||
Reference in New Issue
Block a user