Allow QUEUED reply only if pipelined; don't return status replies from flushPipeline; Rewrite someTests

This commit is contained in:
Jason Livesay
2014-04-17 03:47:44 -07:00
parent ebe174c868
commit 1068022dfb

View File

@@ -58,8 +58,10 @@ proc raiseInvalidReply(expected, got: char) =
"Expected '$1' at the beginning of a status reply got '$2'" %
[$expected, $got])
proc raiseNoOK(status: string) =
if status != "QUEUED" and status != "OK":
proc raiseNoOK(status: string, pipelineEnabled:bool) =
if pipelineEnabled and not (status == "QUEUED" or status == "PIPELINED"):
raise newException(EInvalidReply, "Expected \"QUEUED\" or \"PIPELINED\" got \"$1\"" % status)
elif not pipelineEnabled and status != "OK":
raise newException(EInvalidReply, "Expected \"OK\" got \"$1\"" % status)
template readSocket(r: TRedis, dummyVal:expr): stmt =
@@ -71,7 +73,7 @@ template readSocket(r: TRedis, dummyVal:expr): stmt =
proc parseStatus(r: TRedis, line: string = ""): TRedisStatus =
if r.pipeline.enabled:
return "OK"
return "PIPELINED"
if line == "":
raise newException(ERedis, "Server closed connection prematurely")
@@ -84,14 +86,14 @@ proc parseStatus(r: TRedis, line: string = ""): TRedisStatus =
return line.substr(1) # Strip '+'
proc readStatus(r:TRedis): TRedisStatus =
r.readSocket("OK")
r.readSocket("PIPELINED")
return r.parseStatus(line)
proc parseInteger(r: TRedis, line: string = ""): TRedisInteger =
if r.pipeline.enabled: return -1
if line == "+QUEUED": # inside of multi
return -1
#if line == "+QUEUED": # inside of multi
# return -1
if line == "":
raise newException(ERedis, "Server closed connection prematurely")
@@ -193,7 +195,6 @@ proc flushPipeline*(r: TRedis, wasMulti = false): TRedisList =
r.socket.send(r.pipeline.buffer)
r.pipeline.buffer = ""
var prevState = r.pipeline.enabled
r.pipeline.enabled = false
result = @[]
@@ -201,23 +202,21 @@ proc flushPipeline*(r: TRedis, wasMulti = false): TRedisList =
for i in 0..tot-1:
var ret = r.readNext()
if ret.len == 1 and (ret[0] == "OK" or ret[0] == "QUEUED"):
# Skip acknowledgement replies in multi
if not wasMulti: result.add(ret)
else:
result.add(ret)
for item in ret:
var isOK = item.contains("OK")
if not (item.contains("OK") or item.contains("QUEUED")):
result.add(item)
r.pipeline.expected = 0
r.pipeline.enabled = prevState
proc setPipeline*(r: TRedis, state: bool) =
## Enable or disable command pipelining (reduces network roundtrips).
proc startPipelining*(r: TRedis) =
## Enable command pipelining (reduces network roundtrips).
## Note that when enabled, you must call flushPipeline to actually send commands, except
## for multi/exec() which enable and flush the pipeline automatically.
## Commands return immediately with dummy values; actual results returned from
## flushPipeline() or exec()
r.pipeline.expected = 0
r.pipeline.enabled = state
r.pipeline.enabled = true
proc sendCommand(r: TRedis, cmd: string, args: varargs[string]) =
var request = "*" & $(1 + args.len()) & "\c\L"
@@ -300,7 +299,7 @@ proc rename*(r: TRedis, key, newkey: string): TRedisStatus =
##
## **WARNING:** Overwrites `newkey` if it exists!
r.sendCommand("RENAME", key, newkey)
raiseNoOK(r.readStatus())
raiseNoOK(r.readStatus(), r.pipeline.enabled)
proc renameNX*(r: TRedis, key, newkey: string): bool =
## Same as ``rename`` but doesn't continue if `newkey` exists.
@@ -372,7 +371,7 @@ proc setk*(r: TRedis, key, value: string) =
##
## NOTE: This function had to be renamed due to a clash with the `set` type.
r.sendCommand("SET", key, value)
raiseNoOK(r.readStatus())
raiseNoOK(r.readStatus(), r.pipeline.enabled)
proc setNX*(r: TRedis, key, value: string): bool =
## Set the value of a key, only if the key does not exist. Returns `true`
@@ -389,7 +388,7 @@ proc setBit*(r: TRedis, key: string, offset: int,
proc setEx*(r: TRedis, key: string, seconds: int, value: string): TRedisStatus =
## Set the value and expiration of a key
r.sendCommand("SETEX", key, $seconds, value)
raiseNoOK(r.readStatus())
raiseNoOK(r.readStatus(), r.pipeline.enabled)
proc setRange*(r: TRedis, key: string, offset: int,
value: string): TRedisInteger =
@@ -452,7 +451,7 @@ proc hMSet*(r: TRedis, key: string,
args.add(field)
args.add(value)
r.sendCommand("HMSET", args)
raiseNoOK(r.readStatus())
raiseNoOK(r.readStatus(), r.pipeline.enabled)
proc hSet*(r: TRedis, key, field, value: string): TRedisInteger =
## Set the string value of a hash field
@@ -546,12 +545,12 @@ proc lRem*(r: TRedis, key: string, value: string, count: int = 0): TRedisInteger
proc lSet*(r: TRedis, key: string, index: int, value: string) =
## Set the value of an element in a list by its index
r.sendCommand("LSET", key, $index, value)
raiseNoOK(r.readStatus())
raiseNoOK(r.readStatus(), r.pipeline.enabled)
proc lTrim*(r: TRedis, key: string, start, stop: int) =
proc lTrim*(r: TRedis, key: string, start, stop: int) =
## Trim a list to the specified range
r.sendCommand("LTRIM", key, $start, $stop)
raiseNoOK(r.readStatus())
raiseNoOK(r.readStatus(), r.pipeline.enabled)
proc rPop*(r: TRedis, key: string): TRedisString =
## Remove and get the last element in a list
@@ -829,7 +828,7 @@ proc unsubscribe*(r: TRedis, [channel: openarray[string], : string): ???? =
proc discardMulti*(r: TRedis) =
## Discard all commands issued after MULTI
r.sendCommand("DISCARD")
raiseNoOK(r.readStatus())
raiseNoOK(r.readStatus(), r.pipeline.enabled)
proc exec*(r: TRedis): TRedisList =
## Execute all commands issued after MULTI
@@ -842,26 +841,26 @@ proc exec*(r: TRedis): TRedisList =
proc multi*(r: TRedis) =
## Mark the start of a transaction block
r.setPipeline(true)
r.startPipelining()
r.sendCommand("MULTI")
raiseNoOK(r.readStatus())
raiseNoOK(r.readStatus(), r.pipeline.enabled)
proc unwatch*(r: TRedis) =
## Forget about all watched keys
r.sendCommand("UNWATCH")
raiseNoOK(r.readStatus())
raiseNoOK(r.readStatus(), r.pipeline.enabled)
proc watch*(r: TRedis, key: varargs[string]) =
## Watch the given keys to determine execution of the MULTI/EXEC block
r.sendCommand("WATCH", key)
raiseNoOK(r.readStatus())
raiseNoOK(r.readStatus(), r.pipeline.enabled)
# Connection
proc auth*(r: TRedis, password: string) =
## Authenticate to the server
r.sendCommand("AUTH", password)
raiseNoOK(r.readStatus())
raiseNoOK(r.readStatus(), r.pipeline.enabled)
proc echoServ*(r: TRedis, message: string): TRedisString =
## Echo the given string
@@ -876,7 +875,7 @@ proc ping*(r: TRedis): TRedisStatus =
proc quit*(r: TRedis) =
## Close the connection
r.sendCommand("QUIT")
raiseNoOK(r.readStatus())
raiseNoOK(r.readStatus(), r.pipeline.enabled)
proc select*(r: TRedis, index: int): TRedisStatus =
## Change the selected database for the current connection
@@ -888,12 +887,12 @@ proc select*(r: TRedis, index: int): TRedisStatus =
proc bgrewriteaof*(r: TRedis) =
## Asynchronously rewrite the append-only file
r.sendCommand("BGREWRITEAOF")
raiseNoOK(r.readStatus())
raiseNoOK(r.readStatus(), r.pipeline.enabled)
proc bgsave*(r: TRedis) =
## Asynchronously save the dataset to disk
r.sendCommand("BGSAVE")
raiseNoOK(r.readStatus())
raiseNoOK(r.readStatus(), r.pipeline.enabled)
proc configGet*(r: TRedis, parameter: string): TRedisList =
## Get the value of a configuration parameter
@@ -903,12 +902,12 @@ proc configGet*(r: TRedis, parameter: string): TRedisList =
proc configSet*(r: TRedis, parameter: string, value: string) =
## Set a configuration parameter to the given value
r.sendCommand("CONFIG", "SET", parameter, value)
raiseNoOK(r.readStatus())
raiseNoOK(r.readStatus(), r.pipeline.enabled)
proc configResetStat*(r: TRedis) =
## Reset the stats returned by INFO
r.sendCommand("CONFIG", "RESETSTAT")
raiseNoOK(r.readStatus())
raiseNoOK(r.readStatus(), r.pipeline.enabled)
proc dbsize*(r: TRedis): TRedisInteger =
## Return the number of keys in the selected database
@@ -927,12 +926,12 @@ proc debugSegfault*(r: TRedis) =
proc flushall*(r: TRedis): TRedisStatus =
## Remove all keys from all databases
r.sendCommand("FLUSHALL")
raiseNoOK(r.readStatus())
raiseNoOK(r.readStatus(), r.pipeline.enabled)
proc flushdb*(r: TRedis): TRedisStatus =
## Remove all keys from the current database
r.sendCommand("FLUSHDB")
raiseNoOK(r.readStatus())
raiseNoOK(r.readStatus(), r.pipeline.enabled)
proc info*(r: TRedis): TRedisString =
## Get information and statistics about the server
@@ -948,13 +947,13 @@ discard """
proc monitor*(r: TRedis) =
## Listen for all requests received by the server in real time
r.socket.send("MONITOR\c\L")
raiseNoOK(r.readStatus())
raiseNoOK(r.readStatus(), r.pipeline.enabled)
"""
proc save*(r: TRedis) =
## Synchronously save the dataset to disk
r.sendCommand("SAVE")
raiseNoOK(r.readStatus())
raiseNoOK(r.readStatus(), r.pipeline.enabled)
proc shutdown*(r: TRedis) =
## Synchronously save the dataset to disk and then shut down the server
@@ -966,7 +965,7 @@ proc shutdown*(r: TRedis) =
proc slaveof*(r: TRedis, host: string, port: string) =
## Make the server a slave of another instance, or promote it as master
r.sendCommand("SLAVEOF", host, port)
raiseNoOK(r.readStatus())
raiseNoOK(r.readStatus(), r.pipeline.enabled)
iterator hPairs*(r: TRedis, key: string): tuple[key, value: string] =
## Iterator for keys and values in a hash.
@@ -979,68 +978,73 @@ iterator hPairs*(r: TRedis, key: string): tuple[key, value: string] =
else:
yield (k, i)
k = ""
proc someTests(r: TRedis) =
#r.auth("pass")
proc someTests(r: TRedis, how: string):seq[string] =
var list:seq[string] = @[]
case how
of "pipelined":
r.startPipelining()
of "multi":
r.multi()
r.setk("nim:test", "Testing something.")
r.setk("nim:utf8", "こんにちは")
r.setk("nim:esc", "\\ths ągt\\")
r.setk("nim:int", "1")
echo(r.get("nim:esc"))
echo(r.incr("nim:int"))
echo r.get("nim:int")
echo r.get("nim:utf8")
echo r.hSet("test1", "name", "A Test")
list.add(r.get("nim:esc"))
list.add($(r.incr("nim:int")))
list.add(r.get("nim:int"))
list.add(r.get("nim:utf8"))
list.add($(r.hSet("test1", "name", "A Test")))
var res = r.hGetAll("test1")
echo repr(r.get("blahasha"))
echo r.randomKey()
discard r.lpush("mylist","itema")
discard r.lpush("mylist","itemb")
for r in res:
list.add(r)
list.add(r.get("invalid_key"))
list.add($(r.lpush("mylist","itema")))
list.add($(r.lpush("mylist","itemb")))
r.ltrim("mylist",0,1)
var p = r.lrange("mylist", 0, -1)
for i in items(p):
if not isNil(i):
echo(" ", i)
echo(r.debugObject("mylist"))
list.add(i)
list.add(r.debugObject("mylist"))
r.configSet("timeout", "299")
for i in items(r.configGet("timeout")): echo ">> ", i
var g = r.configGet("timeout")
for i in items(g):
list.add(i)
echo r.echoServ("BLAH")
when false:
var r = open()
# Test with no pipelining
echo("----------------------------------------------")
echo("Testing without pipelining.")
r.someTests()
list.add(r.echoServ("BLAH"))
# Test with pipelining enabled
echo("//////////////////////////////////////////////")
echo()
echo("Testing with pipelining.")
echo()
r.setPipeline(true)
r.someTests()
var list = r.flushPipeline()
r.setPipeline(false)
echo("-- list length is " & $list.len & " --")
for item in list:
if not isNil(item):
echo item
# Test with multi/exec() (automatic pipelining)
echo("************************************************")
echo("Testing with transaction (automatic pipelining)")
r.multi()
r.someTests()
list = r.exec()
echo("-- list length is " & $list.len & " --")
for item in list:
if not isNil(item):
echo item
case how
of "normal":
return list
of "pipelined":
return r.flushPipeline()
of "multi":
return r.exec()
proc assertListsIdentical(listA, listB: seq[string]) =
assert(listA.len == listB.len)
var i = 0
for item in listA:
assert(item == listB[i])
i = i + 1
when isMainModule:
when false:
var r = open()
# Test with no pipelining
var listNormal = r.someTests("normal")
# Test with pipelining enabled
var listPipelined = r.someTests("pipelined")
assertListsIdentical(listNormal, listPipelined)
# Test with multi/exec() (automatic pipelining)
var listMulti = r.someTests("multi")
assertListsIdentical(listNormal, listMulti)