* * * * *
Timing out Lua coroutines
Time for “that's for another post [1]”—this time, handling timeouts in Lua
[2] coroutines.
So, I have this Lua coroutine running, and I want to make a DNS (Domain Name
Service) request:
> host = dns.request("www.conman.org.","a")
>
It takes a non-trivial amount of time—time that could be used to run other
coroutines. But the operation could take longer than expected (say, two
seconds, one second longer than we want to wait) or never even complete
(because we lost the request packet, or the reply packet). In that case, we
want to timeout the operation.
Step one—let's set a limit to how long we can run:
> timeout(1) -- time out in one second
> host,err = dns.request("www.conman.org.","a") -- DNS query
> timeout(0) -- no longer need the timeout
>
The trick now is to implement the code behind timeout(). First we need to
have some way of storing the coroutines (technically, a reference to the
coroutine) that are waiting to timeout, and some easy way to determine which
ones are timed out. For this, I used a binary heap [3], or techically, a
“binary min heap” to store the coroutine reference. The “min” variation
because the nodes are ordered by the “awake” time, and times in the future
are larger (and thus, the next coroutine that will timeout next will always
appear in the first spot in the binary min heap).
Along with the awake value and the coroutine reference, I have a trigger
flag. This flag is important for the “cancelling a timeout” case. An earlier
version of the code searched sequentially through the list, making
“cancelling” an expensive operation (O(n) [4]) compared to “setting” a
timeout (O(log(n))). I decided it was easier to just have a flag, and keep a
secondary index, keyed by coroutine, to the correct node in the binary min
tree. That way, cancelling a timeout is cheap (an amortized O(1)) with the
following code:
> function timeout(when)
> local co = coroutine.running()
>
> if when == 0 then
> if TQUEUE[co] then -- guard to make sure we have an entry
> TQUEUE[co].trigger = false -- disarm timeout trigger
> end
> else
> TQUEUE:insert(when,co) -- otherwise, set the timeout
> end
> end
>
(I should note that the code I presented last Thursday was buggy [5]—I
realized I wasn't keeping the invariant condition necessary for a binary min
heap (child nodes have a larger value than the parent node) by setting the
awake field to 0 (a child would then have a smaller value than its parent)—it
didn't break the code but it did make it behave a bit oddly)
I also maintain a run queue—a list of coroutines that are ready to run, used
in the main loop:
> function mainloop()
> local timeout = -1 -- the timeout to wait for new packets
> local now = clock.get()
>
> while #TQUEUE > 0 do -- check timeout queue
> local obj = TQUEUE[1] -- next coroutine to timeout
>
> if obj.trigger then -- do we trigger?
> timeout = obj.awake - now -- if so, when
> if timeout > 0 then -- if now now (or before)
> break -- stop looking through the timeout queue
> end
>
> table.insert(RQUEUE,obj) -- insert coroutine into the run queue
> end
>
> TQUEUE:delete() -- delete the entry from the timeout queue
> end
>
> if #RQUEUE > 0 then -- if there's anything in the run queue
> timeout = 0 -- we set our timeout
> end
>
> -- ---------------------------------------------------
> -- Receive packets and process---see this post [6] for some details; the actual
> -- code this isn't
> -- ---------------------------------------------------
>
> for _,event in ipairs(poll:events(timeout)) do
> event.obj(event)
> end
>
> -- -----------------------------------------------------------
> -- run each coroutine in the run queue (some details snipped)
> -- -----------------------------------------------------------
>
> while #RQUEUE > 0 do
> local obj = table.remove(RQUEUE,1)
> if coroutine.status(obj.co) == 'suspended' then
> coroutine.resume(obj.co)
> end
> end
>
> return mainloop()
> end
>
The code initially sets the timeout to wait for network activity to -1, or
“wait indefinitely.” It then runs through the timeout queue, checking each
item to see if there's a coroutine that's triggered to run. If it comes
across a coroutine that has been “disarmed” then it's simply ignored and
deleted from the timeout queue. Otherwise, it checks to see if the awake time
is still in the future and if so, it's done with the timeout queue;
otherwise, it moves the coroutine reference to the run queue and deletes it
from the timeout queue.
Eventually, it'll get out of tha loop with a timeout of “indefinite” (if
there were no coroutines waiting for a timeout) or some fixed amount of time
until the next coroutine times out. It then checks the run queue, and if
there is anything in that list, it sets the network activity timeout to 0 (if
there is no packets, return immedately).
It then checks for network packets and for each one, process the packet
(which in turn might schedule more coroutines to run). After that, it then
goes through the run queue, resuming each coroutine in the list. After that,
it starts over again.
[1]
gopher://gopher.conman.org/0Phlog:2015/02/12.1
[2]
http://www.lua.org/
[3]
http://en.wikipedia.org/wiki/Binary_heap
[4]
http://en.wikipedia.org/wiki/Big_O_notation
[5]
gopher://gopher.conman.org/0Phlog:2015/02/12.1
[6]
gopher://gopher.conman.org/0Phlog:2015/02/05.1
Email author at
[email protected]