Nathaniel Wesley Filardo dcc1ea2a49 A generic fifo and fifosock wrapper, under telnet and http server (#2650)
* lua_modules/fifo: a generic queue & socket wrapper

One occasionally wants a generic fifo, so here's a plausible
implementation that's reasonably flexible in its usage.

One possible consumer of this is a variant of TerryE's two-level fifo
trick currently in the telnetd example.  Factor that out to fifosock for
more general use.

* lua_examples/telnet: use factored out fifosock

* lua_modules/http: improve implementation

Switch to fifosock for in-order sending and waiting for everything to be
sent before closing.

Fix header callback by moving the invocation of the handler higher

* fifosock: optimistically cork and delay tx

If we just pushed a little bit of data into a fifosock that had idled,
wait a tick (1 ms) before transmitting.  Hopefully, this means that
we let the rest of the system push more data in before we send the first
packet.  But in a high-throughput situation, where we are streaming data
without idling the fifo, there won't be any additional delay and we'll
coalesce during operation as usual.

The fifosocktest mocks up enough of tmr for this to run, but assumes
an arbitrarily slow processor. ;)
2019-02-16 13:51:40 +01:00

135 lines
4.1 KiB
Lua

-- Wrap a two-staged fifo around a socket's send; see
-- docs/lua-modules/fifosock.lua for more documentation.
--
-- See fifosocktest.lua for some examples of use or tricky cases.
--
-- Our fifos can take functions; these can be useful for either lazy
-- generators or callbacks for parts of the stream having been sent.
local BIGTHRESH = 256 -- how big is a "big" string?
local SPLITSLOP = 16 -- any slop in the big question?
local FSMALLLIM = 32 -- maximum number of small strings held
local COALIMIT = 3
local concat = table.concat
local insert = table.insert
local gc = collectgarbage
local fifo = require "fifo"
return function(sock)
-- the two fifos
local fsmall, lsmall, fbig = {}, 0, fifo.new()
-- ssend last aggregation string and aggregate count
local ssla, sslan = nil, 0
local ssend = function(s,islast)
local ns = nil
-- Optimistically, try coalescing FIFO dequeues. But, don't try to
-- coalesce function outputs, since functions might be staging their
-- execution on the send event implied by being called.
if type(s) == "function" then
if sslan ~= 0 then
sock:send(ssla)
ssla, sslan = nil, 0; gc()
return s, false -- stay as is and wait for :on("sent")
end
s, ns = s()
elseif type(s) == "string" and sslan < COALIMIT then
if sslan == 0
then ssla, sslan = s, 1
else ssla, sslan = ssla .. s, sslan + 1
end
if islast then
-- this is shipping; if there's room, steal the small fifo, too
if sslan < COALIMIT then
sock:send(ssla .. concat(fsmall))
fsmall, lsmall = {}, 0
else
sock:send(ssla)
end
ssla, sslan = "", 0; gc()
return nil, false
else
return nil, true
end
end
-- Either that was a function or we've hit our coalescing limit or
-- we didn't ship above. Ship now, if there's something to ship.
if s ~= nil then
if sslan == 0 then sock:send(s) else sock:send(ssla .. s) end
ssla, sslan = nil, 0; gc()
return ns or nil, false
elseif sslan ~= 0 then
assert (ns == nil)
sock:send(ssla)
ssla, sslan = nil, 0; gc()
return nil, false
else
assert (ns == nil)
return nil, true
end
end
-- Move fsmall to fbig; might send if fbig empty
local function promote(f)
if #fsmall == 0 then return end
local str = concat(fsmall)
fsmall, lsmall = {}, 0
fbig:queue(str, f or ssend)
end
local function sendnext()
if not fbig:dequeue(ssend) then promote() end
end
sock:on("sent", sendnext)
return function(s)
-- don't sweat the petty things
if s == nil or s == "" then return end
-- Function? Go ahead and queue this thing in the right place.
if type(s) == "function" then promote(); fbig:queue(s, ssend); return; end
s = tostring(s)
-- cork sending until the end in case we're the head of line
local corked = false
local function corker(t) corked = true; return t end
-- small fifo would overfill? promote it
if lsmall + #s > BIGTHRESH or #fsmall >= FSMALLLIM then promote(corker) end
-- big string? chunk and queue big components immediately
-- behind any promotion that just took place
while #s > BIGTHRESH + SPLITSLOP do
local pfx
pfx, s = s:sub(1,BIGTHRESH), s:sub(BIGTHRESH+1)
fbig:queue(pfx, corker)
end
-- Big string? queue and maybe tx now
if #s > BIGTHRESH then fbig:queue(s, corker)
-- small and fifo in immediate dequeue mode
elseif fbig._go and lsmall == 0 then fbig:queue(s, corker)
-- small and queue already moving; let it linger in the small fifo
else insert(fsmall, s) ; lsmall = lsmall + #s
end
-- if it happened that we corked the transmission above...
-- if we queued a good amount of data, go ahead and start transmitting;
-- otherwise, wait a tick and hopefully we will queue more in the interim
-- before transmitting.
if corked then
if #fbig <= COALIMIT
then tmr.create():alarm(1, tmr.ALARM_SINGLE, sendnext)
else sendnext()
end
end
end
end