mirror of
https://github.com/nim-lang/Nim.git
synced 2026-04-19 14:00:35 +00:00
Don't need ref string; use PPipeline instead of ref TPipeline
This commit is contained in:
@@ -19,18 +19,18 @@ import sockets, os, strutils, parseutils
|
||||
const
|
||||
redisNil* = "\0\0"
|
||||
|
||||
type
|
||||
TPipeline = object
|
||||
type
|
||||
PPipeline = ref object
|
||||
enabled: bool
|
||||
buffer: ref string
|
||||
buffer: string
|
||||
expected: int ## number of replies expected if pipelined
|
||||
|
||||
type
|
||||
TRedis* {.pure, final.} = object
|
||||
socket: TSocket
|
||||
connected: bool
|
||||
pipeline: ref TPipeline
|
||||
|
||||
pipeline: PPipeline
|
||||
|
||||
TRedisStatus* = string
|
||||
TRedisInteger* = biggestInt
|
||||
TRedisString* = string ## Bulk reply
|
||||
@@ -39,10 +39,9 @@ type
|
||||
EInvalidReply* = object of ESynch ## Invalid reply from redis
|
||||
ERedis* = object of ESynch ## Error in redis
|
||||
|
||||
proc newPipeline(): ref TPipeLine =
|
||||
proc newPipeline(): PPipeline =
|
||||
new(result)
|
||||
result.buffer = new string
|
||||
result.buffer[] = ""
|
||||
result.buffer = ""
|
||||
result.enabled = false
|
||||
result.expected = 0
|
||||
|
||||
@@ -52,10 +51,10 @@ proc open*(host = "localhost", port = 6379.TPort): TRedis =
|
||||
if result.socket == InvalidSocket:
|
||||
OSError(OSLastError())
|
||||
result.socket.connect(host, port)
|
||||
result.pipeline = newPipeline()
|
||||
result.pipeline = newPipeline()
|
||||
|
||||
proc raiseInvalidReply(expected, got: char) =
|
||||
raise newException(EInvalidReply,
|
||||
raise newException(EInvalidReply,
|
||||
"Expected '$1' at the beginning of a status reply got '$2'" %
|
||||
[$expected, $got])
|
||||
|
||||
@@ -77,12 +76,12 @@ proc parseStatus(r: TRedis, lineIn: string = ""): TRedisStatus =
|
||||
raise newException(ERedis, strip(line))
|
||||
if line[0] != '+':
|
||||
raiseInvalidReply('+', line[0])
|
||||
|
||||
|
||||
return line.substr(1) # Strip '+'
|
||||
|
||||
|
||||
proc parseInteger(r: TRedis, lineIn: string = ""): TRedisInteger =
|
||||
if r.pipeline.enabled: return -1
|
||||
|
||||
|
||||
var line = lineIn
|
||||
if line == "":
|
||||
r.socket.readLine(line)
|
||||
@@ -97,10 +96,10 @@ proc parseInteger(r: TRedis, lineIn: string = ""): TRedisInteger =
|
||||
raise newException(ERedis, strip(line))
|
||||
if line[0] != ':':
|
||||
raiseInvalidReply(':', line[0])
|
||||
|
||||
|
||||
# Strip ':'
|
||||
if parseBiggestInt(line, result, 1) == 0:
|
||||
raise newException(EInvalidReply, "Unable to parse integer.")
|
||||
raise newException(EInvalidReply, "Unable to parse integer.")
|
||||
|
||||
proc recv(sock: TSocket, size: int): TaintedString =
|
||||
result = newString(size).TaintedString
|
||||
@@ -109,19 +108,19 @@ proc recv(sock: TSocket, size: int): TaintedString =
|
||||
|
||||
proc parseSingleString(r: TRedis, line:string, allowMBNil = False): TRedisString =
|
||||
if r.pipeline.enabled: return ""
|
||||
|
||||
|
||||
# Error.
|
||||
if line[0] == '-':
|
||||
raise newException(ERedis, strip(line))
|
||||
|
||||
|
||||
# Some commands return a /bulk/ value or a /multi-bulk/ nil. Odd.
|
||||
if allowMBNil:
|
||||
if line == "*-1":
|
||||
return RedisNil
|
||||
|
||||
|
||||
if line[0] != '$':
|
||||
raiseInvalidReply('$', line[0])
|
||||
|
||||
|
||||
var numBytes = parseInt(line.substr(1))
|
||||
if numBytes == -1:
|
||||
return RedisNil
|
||||
@@ -144,7 +143,7 @@ proc parseArrayLines(r: TRedis, countLine:string): TRedisList =
|
||||
if not isNil(parsed):
|
||||
for item in parsed:
|
||||
result.add(item)
|
||||
|
||||
|
||||
proc parseBulkString(r: TRedis, allowMBNil = False, lineIn:string = ""): TRedisString =
|
||||
if r.pipeline.enabled: return ""
|
||||
|
||||
@@ -158,7 +157,7 @@ proc parseArray(r: TRedis): TRedisList =
|
||||
if r.pipeline.enabled: return @[]
|
||||
var line = TaintedString""
|
||||
r.socket.readLine(line)
|
||||
|
||||
|
||||
return r.parseArrayLines(line)
|
||||
|
||||
proc parseNext(r: TRedis): TRedisList =
|
||||
@@ -172,7 +171,7 @@ proc parseNext(r: TRedis): TRedisList =
|
||||
of ':': @[$(r.parseInteger(line))]
|
||||
of '$': @[r.parseBulkString(true,line)]
|
||||
of '*': r.parseArrayLines(line)
|
||||
else:
|
||||
else:
|
||||
raise newException(EInvalidReply, "parseNext failed on line: " & line)
|
||||
nil
|
||||
r.pipeline.expected -= 1
|
||||
@@ -180,14 +179,14 @@ proc parseNext(r: TRedis): TRedisList =
|
||||
|
||||
proc flushPipeline*(r: TRedis, wasMulti = false): TRedisList =
|
||||
## Send buffered commands, clear buffer, return results
|
||||
if r.pipeline.buffer[].len > 0:
|
||||
r.socket.send(r.pipeline.buffer[])
|
||||
r.pipeline.buffer[] = ""
|
||||
|
||||
if r.pipeline.buffer.len > 0:
|
||||
r.socket.send(r.pipeline.buffer)
|
||||
r.pipeline.buffer = ""
|
||||
|
||||
var prevState = r.pipeline.enabled
|
||||
r.pipeline.enabled = false
|
||||
result = @[]
|
||||
|
||||
|
||||
var tot = r.pipeline.expected
|
||||
|
||||
for i in 0..tot-1:
|
||||
@@ -195,13 +194,13 @@ proc flushPipeline*(r: TRedis, wasMulti = false): TRedisList =
|
||||
if ret.len == 1 and (ret[0] == "OK" or ret[0] == "QUEUED"):
|
||||
# Skip acknowledgement replies in multi
|
||||
if not wasMulti: result.add(ret)
|
||||
else:
|
||||
else:
|
||||
result.add(ret)
|
||||
|
||||
r.pipeline.expected = 0
|
||||
r.pipeline.enabled = prevState
|
||||
|
||||
proc setPipeline*(r: TRedis, state: bool) =
|
||||
proc sePPipeline*(r: TRedis, state: bool) =
|
||||
## Enable or disable command pipelining (reduces network roundtrips).
|
||||
## Note that when enabled, you must call flushPipeline to actually send commands, except
|
||||
## for multi/exec() which enable and flush the pipeline automatically.
|
||||
@@ -217,9 +216,9 @@ proc sendCommand(r: TRedis, cmd: string, args: varargs[string]) =
|
||||
for i in items(args):
|
||||
request.add("$" & $i.len() & "\c\L")
|
||||
request.add(i & "\c\L")
|
||||
|
||||
|
||||
if r.pipeline.enabled:
|
||||
r.pipeline.buffer[].add(request)
|
||||
r.pipeline.buffer.add(request)
|
||||
r.pipeline.expected += 1
|
||||
else:
|
||||
r.socket.send(request)
|
||||
@@ -234,10 +233,10 @@ proc sendCommand(r: TRedis, cmd: string, arg1: string,
|
||||
for i in items(args):
|
||||
request.add("$" & $i.len() & "\c\L")
|
||||
request.add(i & "\c\L")
|
||||
|
||||
|
||||
if r.pipeline.enabled:
|
||||
r.pipeline.expected += 1
|
||||
r.pipeline.buffer[].add(request)
|
||||
r.pipeline.buffer.add(request)
|
||||
else:
|
||||
r.socket.send(request)
|
||||
|
||||
@@ -280,7 +279,7 @@ proc persist*(r: TRedis, key: string): bool =
|
||||
## Returns `true` when the timeout was removed.
|
||||
r.sendCommand("PERSIST", key)
|
||||
return r.parseInteger() == 1
|
||||
|
||||
|
||||
proc randomKey*(r: TRedis): TRedisString =
|
||||
## Return a random key from the keyspace
|
||||
r.sendCommand("RANDOMKEY")
|
||||
@@ -292,7 +291,7 @@ proc rename*(r: TRedis, key, newkey: string): TRedisStatus =
|
||||
## **WARNING:** Overwrites `newkey` if it exists!
|
||||
r.sendCommand("RENAME", key, newkey)
|
||||
raiseNoOK(r.parseStatus())
|
||||
|
||||
|
||||
proc renameNX*(r: TRedis, key, newkey: string): bool =
|
||||
## Same as ``rename`` but doesn't continue if `newkey` exists.
|
||||
## Returns `true` if key was renamed.
|
||||
@@ -303,12 +302,12 @@ proc ttl*(r: TRedis, key: string): TRedisInteger =
|
||||
## Get the time to live for a key
|
||||
r.sendCommand("TTL", key)
|
||||
return r.parseInteger()
|
||||
|
||||
|
||||
proc keyType*(r: TRedis, key: string): TRedisStatus =
|
||||
## Determine the type stored at key
|
||||
r.sendCommand("TYPE", key)
|
||||
return r.parseStatus()
|
||||
|
||||
|
||||
|
||||
# Strings
|
||||
|
||||
@@ -321,12 +320,12 @@ proc decr*(r: TRedis, key: string): TRedisInteger =
|
||||
## Decrement the integer value of a key by one
|
||||
r.sendCommand("DECR", key)
|
||||
return r.parseInteger()
|
||||
|
||||
|
||||
proc decrBy*(r: TRedis, key: string, decrement: int): TRedisInteger =
|
||||
## Decrement the integer value of a key by the given number
|
||||
r.sendCommand("DECRBY", key, $decrement)
|
||||
return r.parseInteger()
|
||||
|
||||
|
||||
proc get*(r: TRedis, key: string): TRedisString =
|
||||
## Get the value of a key. Returns `redisNil` when `key` doesn't exist.
|
||||
r.sendCommand("GET", key)
|
||||
@@ -358,7 +357,7 @@ proc incrBy*(r: TRedis, key: string, increment: int): TRedisInteger =
|
||||
r.sendCommand("INCRBY", key, $increment)
|
||||
return r.parseInteger()
|
||||
|
||||
proc setk*(r: TRedis, key, value: string) =
|
||||
proc setk*(r: TRedis, key, value: string) =
|
||||
## Set the string value of a key.
|
||||
##
|
||||
## NOTE: This function had to be renamed due to a clash with the `set` type.
|
||||
@@ -371,18 +370,18 @@ proc setNX*(r: TRedis, key, value: string): bool =
|
||||
r.sendCommand("SETNX", key, value)
|
||||
return r.parseInteger() == 1
|
||||
|
||||
proc setBit*(r: TRedis, key: string, offset: int,
|
||||
proc setBit*(r: TRedis, key: string, offset: int,
|
||||
value: string): TRedisInteger =
|
||||
## Sets or clears the bit at offset in the string value stored at key
|
||||
r.sendCommand("SETBIT", key, $offset, value)
|
||||
return r.parseInteger()
|
||||
|
||||
|
||||
proc setEx*(r: TRedis, key: string, seconds: int, value: string): TRedisStatus =
|
||||
## Set the value and expiration of a key
|
||||
r.sendCommand("SETEX", key, $seconds, value)
|
||||
raiseNoOK(r.parseStatus())
|
||||
|
||||
proc setRange*(r: TRedis, key: string, offset: int,
|
||||
proc setRange*(r: TRedis, key: string, offset: int,
|
||||
value: string): TRedisInteger =
|
||||
## Overwrite part of a string at key starting at the specified offset
|
||||
r.sendCommand("SETRANGE", key, $offset, value)
|
||||
@@ -435,7 +434,7 @@ proc hMGet*(r: TRedis, key: string, fields: varargs[string]): TRedisList =
|
||||
r.sendCommand("HMGET", key, fields)
|
||||
return r.parseArray()
|
||||
|
||||
proc hMSet*(r: TRedis, key: string,
|
||||
proc hMSet*(r: TRedis, key: string,
|
||||
fieldValues: openarray[tuple[field, value: string]]) =
|
||||
## Set multiple hash fields to multiple values
|
||||
var args = @[key]
|
||||
@@ -449,7 +448,7 @@ proc hSet*(r: TRedis, key, field, value: string): TRedisInteger =
|
||||
## Set the string value of a hash field
|
||||
r.sendCommand("HSET", key, field, value)
|
||||
return r.parseInteger()
|
||||
|
||||
|
||||
proc hSetNX*(r: TRedis, key, field, value: string): TRedisInteger =
|
||||
## Set the value of a hash field, only if the field does **not** exist
|
||||
r.sendCommand("HSETNX", key, field, value)
|
||||
@@ -459,7 +458,7 @@ proc hVals*(r: TRedis, key: string): TRedisList =
|
||||
## Get all the values in a hash
|
||||
r.sendCommand("HVALS", key)
|
||||
return r.parseArray()
|
||||
|
||||
|
||||
# Lists
|
||||
|
||||
proc bLPop*(r: TRedis, keys: varargs[string], timeout: int): TRedisList =
|
||||
@@ -500,7 +499,7 @@ proc lInsert*(r: TRedis, key: string, before: bool, pivot, value: string):
|
||||
var pos = if before: "BEFORE" else: "AFTER"
|
||||
r.sendCommand("LINSERT", key, pos, pivot, value)
|
||||
return r.parseInteger()
|
||||
|
||||
|
||||
proc lLen*(r: TRedis, key: string): TRedisInteger =
|
||||
## Get the length of a list
|
||||
r.sendCommand("LLEN", key)
|
||||
@@ -548,12 +547,12 @@ proc rPop*(r: TRedis, key: string): TRedisString =
|
||||
## Remove and get the last element in a list
|
||||
r.sendCommand("RPOP", key)
|
||||
return r.parseBulkString()
|
||||
|
||||
|
||||
proc rPopLPush*(r: TRedis, source, destination: string): TRedisString =
|
||||
## Remove the last element in a list, append it to another list and return it
|
||||
r.sendCommand("RPOPLPUSH", source, destination)
|
||||
return r.parseBulkString()
|
||||
|
||||
|
||||
proc rPush*(r: TRedis, key, value: string, create: bool = True): TRedisInteger =
|
||||
## Append a value to a list. Returns the length of the list after the push.
|
||||
## The ``create`` param specifies whether a list should be created if it
|
||||
@@ -671,16 +670,16 @@ proc zinterstore*(r: TRedis, destination: string, numkeys: string,
|
||||
## a new key
|
||||
var args = @[destination, numkeys]
|
||||
for i in items(keys): args.add(i)
|
||||
|
||||
|
||||
if weights.len != 0:
|
||||
args.add("WITHSCORE")
|
||||
for i in items(weights): args.add(i)
|
||||
if aggregate.len != 0:
|
||||
args.add("AGGREGATE")
|
||||
args.add(aggregate)
|
||||
|
||||
|
||||
r.sendCommand("ZINTERSTORE", args)
|
||||
|
||||
|
||||
return r.parseInteger()
|
||||
|
||||
proc zrange*(r: TRedis, key: string, start: string, stop: string,
|
||||
@@ -692,18 +691,18 @@ proc zrange*(r: TRedis, key: string, start: string, stop: string,
|
||||
r.sendCommand("ZRANGE", "WITHSCORES", key, start, stop)
|
||||
return r.parseArray()
|
||||
|
||||
proc zrangebyscore*(r: TRedis, key: string, min: string, max: string,
|
||||
proc zrangebyscore*(r: TRedis, key: string, min: string, max: string,
|
||||
withScore: bool = false, limit: bool = False,
|
||||
limitOffset: int = 0, limitCount: int = 0): TRedisList =
|
||||
## Return a range of members in a sorted set, by score
|
||||
var args = @[key, min, max]
|
||||
|
||||
|
||||
if withScore: args.add("WITHSCORE")
|
||||
if limit:
|
||||
if limit:
|
||||
args.add("LIMIT")
|
||||
args.add($limitOffset)
|
||||
args.add($limitCount)
|
||||
|
||||
|
||||
r.sendCommand("ZRANGEBYSCORE", args)
|
||||
return r.parseArray()
|
||||
|
||||
@@ -738,19 +737,19 @@ proc zrevrange*(r: TRedis, key: string, start: string, stop: string,
|
||||
else: r.sendCommand("ZREVRANGE", key, start, stop)
|
||||
return r.parseArray()
|
||||
|
||||
proc zrevrangebyscore*(r: TRedis, key: string, min: string, max: string,
|
||||
proc zrevrangebyscore*(r: TRedis, key: string, min: string, max: string,
|
||||
withScore: bool = false, limit: bool = False,
|
||||
limitOffset: int = 0, limitCount: int = 0): TRedisList =
|
||||
## Return a range of members in a sorted set, by score, with
|
||||
## scores ordered from high to low
|
||||
var args = @[key, min, max]
|
||||
|
||||
|
||||
if withScore: args.add("WITHSCORE")
|
||||
if limit:
|
||||
if limit:
|
||||
args.add("LIMIT")
|
||||
args.add($limitOffset)
|
||||
args.add($limitCount)
|
||||
|
||||
|
||||
r.sendCommand("ZREVRANGEBYSCORE", args)
|
||||
return r.parseArray()
|
||||
|
||||
@@ -771,16 +770,16 @@ proc zunionstore*(r: TRedis, destination: string, numkeys: string,
|
||||
## Add multiple sorted sets and store the resulting sorted set in a new key
|
||||
var args = @[destination, numkeys]
|
||||
for i in items(keys): args.add(i)
|
||||
|
||||
|
||||
if weights.len != 0:
|
||||
args.add("WEIGHTS")
|
||||
for i in items(weights): args.add(i)
|
||||
if aggregate.len != 0:
|
||||
args.add("AGGREGATE")
|
||||
args.add(aggregate)
|
||||
|
||||
|
||||
r.sendCommand("ZUNIONSTORE", args)
|
||||
|
||||
|
||||
return r.parseInteger()
|
||||
|
||||
|
||||
@@ -809,7 +808,7 @@ proc subscribe*(r: TRedis, channel: openarray[string]): ???? =
|
||||
return ???
|
||||
|
||||
proc unsubscribe*(r: TRedis, [channel: openarray[string], : string): ???? =
|
||||
## Stop listening for messages posted to the given channels
|
||||
## Stop listening for messages posted to the given channels
|
||||
r.socket.send("UNSUBSCRIBE $# $#\c\L" % [[channel.join(), ])
|
||||
return ???
|
||||
|
||||
@@ -824,15 +823,16 @@ proc discardMulti*(r: TRedis) =
|
||||
|
||||
proc exec*(r: TRedis): TRedisList =
|
||||
## Execute all commands issued after MULTI
|
||||
r.sendCommand("EXEC")
|
||||
r.sendCommand("EXEC")
|
||||
r.pipeline.enabled = false
|
||||
# Will reply with +OK for MULTI/EXEC and +QUEUED for every command
|
||||
# between, then with the results
|
||||
return r.flushPipeline(true)
|
||||
|
||||
|
||||
proc multi*(r: TRedis) =
|
||||
## Mark the start of a transaction block
|
||||
r.setPipeline(true)
|
||||
r.sePPipeline(true)
|
||||
r.sendCommand("MULTI")
|
||||
raiseNoOK(r.parseStatus())
|
||||
|
||||
@@ -960,7 +960,7 @@ proc slaveof*(r: TRedis, host: string, port: string) =
|
||||
|
||||
iterator hPairs*(r: TRedis, key: string): tuple[key, value: string] =
|
||||
## Iterator for keys and values in a hash.
|
||||
var
|
||||
var
|
||||
contents = r.hGetAll(key)
|
||||
k = ""
|
||||
for i in items(contents):
|
||||
@@ -969,7 +969,7 @@ iterator hPairs*(r: TRedis, key: string): tuple[key, value: string] =
|
||||
else:
|
||||
yield (k, i)
|
||||
k = ""
|
||||
|
||||
|
||||
proc someTests(r: TRedis) =
|
||||
#r.auth("pass")
|
||||
|
||||
@@ -989,22 +989,22 @@ proc someTests(r: TRedis) =
|
||||
discard r.lpush("mylist","itemb")
|
||||
r.ltrim("mylist",0,1)
|
||||
var p = r.lrange("mylist", 0, -1)
|
||||
|
||||
|
||||
for i in items(p):
|
||||
if not isNil(i):
|
||||
echo(" ", i)
|
||||
|
||||
echo(" ", i)
|
||||
|
||||
echo(r.debugObject("mylist"))
|
||||
|
||||
r.configSet("timeout", "299")
|
||||
for i in items(r.configGet("timeout")): echo ">> ", i
|
||||
|
||||
echo r.echoServ("BLAH")
|
||||
|
||||
|
||||
|
||||
|
||||
when false:
|
||||
var r = open()
|
||||
|
||||
|
||||
# Test with no pipelining
|
||||
echo("----------------------------------------------")
|
||||
echo("Testing without pipelining.")
|
||||
@@ -1015,15 +1015,15 @@ when false:
|
||||
echo()
|
||||
echo("Testing with pipelining.")
|
||||
echo()
|
||||
r.setPipeline(true)
|
||||
r.sePPipeline(true)
|
||||
r.someTests()
|
||||
var list = r.flushPipeline()
|
||||
r.setPipeline(false)
|
||||
r.sePPipeline(false)
|
||||
echo("-- list length is " & $list.len & " --")
|
||||
for item in list:
|
||||
if not isNil(item):
|
||||
echo item
|
||||
|
||||
|
||||
# Test with multi/exec() (automatic pipelining)
|
||||
echo("************************************************")
|
||||
echo("Testing with transaction (automatic pipelining)")
|
||||
|
||||
Reference in New Issue
Block a user