Build Your Own Redis: ECHO [4/4]
This is the fourth article in a series where we’ll build a toy Redis clone in Ruby. If you’d like to code along, try the Build your own Redis challenge!
Previous article: Concurrent Clients
The ECHO command
The ECHO command does exactly what it says:
➜ rohitpaulk.com git:(master) ✗ redis-cli
127.0.0.1:6379> echo hey
"hey"
127.0.0.1:6379> echo hello
"hello"
Since this command takes an argument sent by the client, we’ll first need to be able to decode RESP.
Integrated Test
Like we did in the previous article, let’s start out with an integrated test.
 require "redis"
 require "minitest/autorun"

 # 6379 for official redis, 6380 for ours
 SERVER_PORT = ENV["SERVER_PORT"]

 class TestRedisServer < Minitest::Test
   def test_responds_to_ping
     r = Redis.new(port: SERVER_PORT)
     assert_equal "PONG", r.ping
   end

   def test_multiple_commands_from_same_client
     r = Redis.new(port: SERVER_PORT)

     # The Redis client re-connects on timeout by default, without_reconnect
     # prevents that.
     r.without_reconnect do
       assert_equal "PONG", r.ping
       assert_equal "PONG", r.ping
     end
   end

   def test_multiple_clients
     r1 = Redis.new(port: SERVER_PORT)
     r2 = Redis.new(port: SERVER_PORT)

     assert_equal "PONG", r1.ping
     assert_equal "PONG", r2.ping
   end
+
+  def test_responds_to_echo
+    r = Redis.new(port: SERVER_PORT)
+    assert_equal "hey", r.echo("hey")
+    assert_equal "hello", r.echo("hello")
+  end
 end
When run against our version of Redis, this test fails:
➜ SERVER_PORT=6380 ruby _files/redis/integrated_test_4.rb
Run options: --seed 31312
# Running:
..F.
1) Failure:
TestRedisServer#test_responds_to_echo [_files/redis/integrated_test_4.rb:34]:
Expected: "hey"
Actual: "PONG"
4 runs, 6 assertions, 1 failures, 0 errors, 0 skips
This is expected. In the second article of this series we returned PONG as a reply for every command, remember?
Decoding RESP
In the last article, we looked at how RESP works. Now that we need to decode RESP in our program, let’s implement a RESPDecoder.
For a command like ECHO hey, here’s what the client will send us:
*2\r\n$4\r\nECHO\r\n$3\r\nhey\r\n
Quick recap of what this means:
- *: denotes that the data type is an array
- 2: the number of elements in the array
- \r\n: delimiter; the actual elements start after this
- $4\r\nECHO\r\n: "ECHO", encoded as a Bulk String
- $3\r\nhey\r\n: "hey", encoded as a Bulk String
That’s 23 (1 + 1 + 2 + 10 + 9) bytes in total.
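To double-check that byte count, here is a small sketch that builds the same bytes a client sends: an Array whose elements are Bulk Strings. The helper names encode_bulk_string and encode_command are hypothetical, written for this illustration rather than taken from any client library.

```ruby
# Hypothetical helpers (not part of the article's code) that build the bytes a
# client sends for a command: an Array whose elements are Bulk Strings.
def encode_bulk_string(str)
  # The length prefix counts bytes, not characters
  "$#{str.bytesize}\r\n#{str}\r\n"
end

def encode_command(*parts)
  "*#{parts.length}\r\n" + parts.map { |part| encode_bulk_string(part) }.join
end

payload = encode_command("ECHO", "hey")
p payload           # prints "*2\r\n$4\r\nECHO\r\n$3\r\nhey\r\n"
p payload.bytesize  # prints 23
```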
Since we’re using TCP, which delivers a stream of bytes rather than discrete messages, there’s no guarantee that all these bytes will arrive at the same time. We might receive *2\r\n first, and $4\r\nECHO\r\n$3\r\nhey\r\n later.
In an event loop, we don’t have the luxury of waiting until the client sends us complete RESP: blocking on one client would stall every other client. We’ll have to maintain a buffer, and design a RESP decoder that is capable of identifying incomplete input.
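A small sketch of that situation, assuming a connected UNIX socket pair as a stand-in for a TCP connection (both deliver a byte stream with no message boundaries). readpartial returns whatever has arrived so far, so the receiving side has to accumulate chunks in a buffer, much like the Client class we’ll write later.

```ruby
require "socket"

# A connected pair of stream sockets stands in for a TCP connection here:
# both deliver a stream of bytes with no message boundaries.
client_end, server_end = UNIXSocket.pair
buffer = ""

# The client sends one command in two separate writes
client_end.write("*2\r\n")
first_chunk = server_end.readpartial(1024)
buffer += first_chunk
p buffer  # prints "*2\r\n", which is not yet a complete command

client_end.write("$4\r\nECHO\r\n$3\r\nhey\r\n")
buffer += server_end.readpartial(1024)
p buffer  # prints "*2\r\n$4\r\nECHO\r\n$3\r\nhey\r\n"

client_end.close
server_end.close
```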
Let’s now write our RESP decoder. It’ll support decoding Simple Strings, Bulk Strings and Arrays. If it encounters an incomplete RESP string, it’ll raise an IncompleteRESP exception.
Note: What follows is a LOT of code. Feel free to skip over to the implementation of ECHO if you aren’t interested in the nitty-gritty of parsing RESP.
Simple Strings
Starting out with a basic spec for decoding simple strings:
require "minitest/autorun"

class TestRESPDecoder < Minitest::Test
  def test_simple_string
    assert_equal "OK", RESPDecoder.decode("+OK\r\n")
    assert_equal "HEY", RESPDecoder.decode("+HEY\r\n")
    assert_raises(IncompleteRESP) { RESPDecoder.decode("+") }
    assert_raises(IncompleteRESP) { RESPDecoder.decode("+OK") }
    assert_raises(IncompleteRESP) { RESPDecoder.decode("+OK\r") }
  end
end
Now the implementation:
require "stringio"

class IncompleteRESP < StandardError; end

class RESPDecoder
  def self.decode(resp_str)
    resp_io = StringIO.new(resp_str)
    first_char = resp_io.read(1)
    if first_char == "+"
      self.decode_simple_string(resp_io)
    else
      raise RuntimeError.new("Unhandled first_char: #{first_char}")
    end
  rescue EOFError
    # readline hit the end of the input before finding anything to read
    raise IncompleteRESP
  end

  def self.decode_simple_string(resp_io)
    read = resp_io.readline("\r\n")
    # readline returns the remaining input if "\r\n" hasn't arrived yet
    if read[-2..-1] != "\r\n"
      raise IncompleteRESP
    end

    read[0..-3]
  end
end
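decode_simple_string relies on two behaviors of StringIO#readline that are easy to miss: it does not raise when the separator is missing, and it raises EOFError only when nothing at all is left. A quick standalone check (the input strings are made up for illustration):

```ruby
require "stringio"

# Separator present: readline returns the line including its "\r\n"
complete = StringIO.new("OK\r\nrest").readline("\r\n")
p complete  # prints "OK\r\n"

# Separator missing: readline returns whatever is left, so decode_simple_string
# has to check for the trailing "\r\n" itself
partial = StringIO.new("OK\r").readline("\r\n")
p partial   # prints "OK\r"

# Nothing left: readline raises EOFError, which decode turns into IncompleteRESP
eof_raised =
  begin
    StringIO.new("").readline("\r\n")
    false
  rescue EOFError
    true
  end
p eof_raised  # prints true
```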
Bulk Strings
Let’s extend our test suite to cover bulk strings:
 require "minitest/autorun"

 class TestRESPDecoder < Minitest::Test
   def test_simple_string
     assert_equal "OK", RESPDecoder.decode("+OK\r\n")
     assert_equal "HEY", RESPDecoder.decode("+HEY\r\n")
     assert_raises(IncompleteRESP) { RESPDecoder.decode("+") }
     assert_raises(IncompleteRESP) { RESPDecoder.decode("+OK") }
     assert_raises(IncompleteRESP) { RESPDecoder.decode("+OK\r") }
   end
+
+  def test_bulk_string
+    assert_equal "OK", RESPDecoder.decode("$2\r\nOK\r\n")
+    assert_equal "HEY", RESPDecoder.decode("$3\r\nHEY\r\n")
+    assert_raises(IncompleteRESP) { RESPDecoder.decode("$") }
+    assert_raises(IncompleteRESP) { RESPDecoder.decode("$2") }
+    assert_raises(IncompleteRESP) { RESPDecoder.decode("$2\r") }
+    assert_raises(IncompleteRESP) { RESPDecoder.decode("$2\r\n") }
+    assert_raises(IncompleteRESP) { RESPDecoder.decode("$2\r\nOK") }
+    assert_raises(IncompleteRESP) { RESPDecoder.decode("$2\r\nOK\r") }
+  end
 end
Now the implementation:
 require "stringio"

 class IncompleteRESP < StandardError; end

 class RESPDecoder
   def self.decode(resp_str)
     resp_io = StringIO.new(resp_str)
     first_char = resp_io.read(1)
     if first_char == "+"
       self.decode_simple_string(resp_io)
+    elsif first_char == "$"
+      self.decode_bulk_string(resp_io)
     else
       raise RuntimeError.new("Unhandled first_char: #{first_char}")
     end
   rescue EOFError
     raise IncompleteRESP
   end

   def self.decode_simple_string(resp_io)
     read = resp_io.readline("\r\n")
     if read[-2..-1] != "\r\n"
       raise IncompleteRESP
     end

     read[0..-3]
   end
+
+  def self.decode_bulk_string(resp_io)
+    byte_count_with_clrf = resp_io.readline("\r\n")
+    if byte_count_with_clrf[-2..-1] != "\r\n"
+      raise IncompleteRESP
+    end
+    byte_count = byte_count_with_clrf.to_i
+    str = resp_io.read(byte_count)
+
+    # Exactly the advertised number of bytes must be present
+    raise IncompleteRESP unless str && str.bytesize == byte_count
+
+    # Consume the ending CRLF
+    raise IncompleteRESP unless resp_io.read(2) == "\r\n"
+
+    str
+  end
 end
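The length prefix is what lets decode_bulk_string skip searching for a terminator: it reads exactly byte_count bytes, so the payload may itself contain \r\n, and the count is in bytes rather than characters. A standalone illustration using the same StringIO calls (the payload is made up for the example):

```ruby
require "stringio"

# A payload containing CRLF and a two-byte UTF-8 character (U+00E9)
payload = "a\r\n\u00e9"
p payload.length    # prints 4 (the character count)
p payload.bytesize  # prints 5 (the byte count)

io = StringIO.new("$#{payload.bytesize}\r\n#{payload}\r\n")
io.read(1)                             # consume the "$" type marker
byte_count = io.readline("\r\n").to_i  # "5\r\n".to_i is 5
body = io.read(byte_count)             # reads exactly 5 bytes, CRLF included
trailer = io.read(2)                   # the closing "\r\n"

# read(n) returns raw bytes; tag them as UTF-8 to compare with the original
decoded = body.force_encoding(Encoding::UTF_8)
p decoded == payload  # prints true
```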
Arrays
And now for the last data type: Arrays.
 require "minitest/autorun"

 class TestRESPDecoder < Minitest::Test
   def test_simple_string
     assert_equal "OK", RESPDecoder.decode("+OK\r\n")
     assert_equal "HEY", RESPDecoder.decode("+HEY\r\n")
     assert_raises(IncompleteRESP) { RESPDecoder.decode("+") }
     assert_raises(IncompleteRESP) { RESPDecoder.decode("+OK") }
     assert_raises(IncompleteRESP) { RESPDecoder.decode("+OK\r") }
   end

   def test_bulk_string
     assert_equal "OK", RESPDecoder.decode("$2\r\nOK\r\n")
     assert_equal "HEY", RESPDecoder.decode("$3\r\nHEY\r\n")
     assert_raises(IncompleteRESP) { RESPDecoder.decode("$") }
     assert_raises(IncompleteRESP) { RESPDecoder.decode("$2") }
     assert_raises(IncompleteRESP) { RESPDecoder.decode("$2\r") }
     assert_raises(IncompleteRESP) { RESPDecoder.decode("$2\r\n") }
     assert_raises(IncompleteRESP) { RESPDecoder.decode("$2\r\nOK") }
     assert_raises(IncompleteRESP) { RESPDecoder.decode("$2\r\nOK\r") }
   end
+
+  def test_arrays
+    assert_equal ["PING"], RESPDecoder.decode("*1\r\n$4\r\nPING\r\n")
+    assert_equal ["ECHO", "hey"], RESPDecoder.decode("*2\r\n$4\r\nECHO\r\n$3\r\nhey\r\n")
+    assert_raises(IncompleteRESP) { RESPDecoder.decode("*") }
+    assert_raises(IncompleteRESP) { RESPDecoder.decode("*1") }
+    assert_raises(IncompleteRESP) { RESPDecoder.decode("*1\r\n") }
+    assert_raises(IncompleteRESP) { RESPDecoder.decode("*1\r\n$4") }
+    assert_raises(IncompleteRESP) { RESPDecoder.decode("*2\r\n$4\r\nECHO\r\n") }
+  end
 end
Since arrays can contain anything (simple strings, bulk strings, or even other arrays), we’ll reuse the existing code for parsing simple strings and bulk strings, and decode each element by recursing into the same type-dispatching method.
 require "stringio"

 class IncompleteRESP < StandardError; end

 class RESPDecoder
   def self.decode(resp_str)
     resp_io = StringIO.new(resp_str)
+    self.do_decode(resp_io)
+  end
+
+  def self.do_decode(resp_io)
     first_char = resp_io.read(1)
+    raise IncompleteRESP if first_char.nil?
+
     if first_char == "+"
       self.decode_simple_string(resp_io)
     elsif first_char == "$"
       self.decode_bulk_string(resp_io)
+    elsif first_char == "*"
+      self.decode_array(resp_io)
     else
       raise RuntimeError.new("Unhandled first_char: #{first_char}")
     end
   rescue EOFError
     raise IncompleteRESP
   end

   def self.decode_simple_string(resp_io)
     read = resp_io.readline("\r\n")
     if read[-2..-1] != "\r\n"
       raise IncompleteRESP
     end

     read[0..-3]
   end

   def self.decode_bulk_string(resp_io)
-    byte_count_with_clrf = resp_io.readline("\r\n")
-    if byte_count_with_clrf[-2..-1] != "\r\n"
-      raise IncompleteRESP
-    end
-    byte_count = byte_count_with_clrf.to_i
+    byte_count = read_int_with_clrf(resp_io)
     str = resp_io.read(byte_count)

     # Exactly the advertised number of bytes must be present
     raise IncompleteRESP unless str && str.bytesize == byte_count

     # Consume the ending CRLF
     raise IncompleteRESP unless resp_io.read(2) == "\r\n"

     str
   end
+
+  def self.decode_array(resp_io)
+    element_count = read_int_with_clrf(resp_io)
+
+    # Recurse, using do_decode
+    element_count.times.map { self.do_decode(resp_io) }
+  end
+
+  # Reads an integer terminated by CRLF, e.g. the "2" in "*2\r\n"
+  def self.read_int_with_clrf(resp_io)
+    int_with_clrf = resp_io.readline("\r\n")
+    if int_with_clrf[-2..-1] != "\r\n"
+      raise IncompleteRESP
+    end
+    int_with_clrf.to_i
+  end
 end
Implementing ECHO
Now that we’ve got a RESP decoder in place, let’s get back to the original task: implementing the ECHO command.
Refactor: Extract out a Client class
Since we know we’ll need to read small chunks and maintain a buffer, let’s start by extracting a Client class that holds that state.
 require "socket"

+class Client
+  attr_reader :socket
+
+  def initialize(socket)
+    @socket = socket
+    @buffer = ""
+  end
+
+  def read_available
+    @buffer += @socket.readpartial(1024)
+  end
+
+  def write(msg)
+    @socket.write(msg)
+  end
+end
+
 class RedisServer
   def initialize(port)
     @server = TCPServer.new(port)
-    @clients = []
+    @sockets_to_clients = {}
   end

   def listen
     loop do
-      fds_to_watch = [@server, *@clients]
+      fds_to_watch = [@server, *@sockets_to_clients.keys]
       ready_to_read, _, _ = IO.select(fds_to_watch)
       ready_to_read.each do |ready|
         if ready == @server
-          @clients << @server.accept
+          client_socket = @server.accept
+          @sockets_to_clients[client_socket] = Client.new(client_socket)
         else
           # If not the server, this must be one of the existing clients
+          client = @sockets_to_clients[ready]
           handle_client(client)
         end
       end
     end
   end

   def handle_client(client)
-    client.readpartial(1024) # TODO: Read actual command
+    client.read_available # TODO: Read actual command

     # TODO: Handle commands other than PING
     client.write("+PONG\r\n")
+  rescue Errno::ECONNRESET, EOFError
+    # If the client has disconnected, let's
+    # remove from our list of active clients
+    @sockets_to_clients.delete(client.socket)
   end
 end
Running our test suite to make sure the refactor worked:
➜ SERVER_PORT=6380 ruby _files/redis/integrated_test_4.rb
Run options: --seed 4190
# Running:
..F.
1) Failure:
TestRedisServer#test_responds_to_echo [_files/redis/integrated_test_4.rb:34]:
Expected: "hey"
Actual: "PONG"
4 runs, 6 assertions, 1 failures, 0 errors, 0 skips
The failure is expected; let’s fix it next by actually parsing commands.
Parsing commands using RESPDecoder
We’ll use the RESPDecoder we built to try decoding each client’s buffer.
 require "socket"
+require_relative "resp_decoder_3"
+
+class Command
+  attr_reader :action
+  attr_reader :args
+
+  def initialize(action, args)
+    @action = action
+    @args = args
+  end
+end

 class Client
   attr_reader :socket

   def initialize(socket)
     @socket = socket
     @buffer = ""
   end

   def read_available
     @buffer += @socket.readpartial(1024)
   end

+  def consume_command!
+    array = RESPDecoder.decode(@buffer)
+    @buffer = "" # Reset buffer after consuming command
+    Command.new(array.first, array[1..-1])
+  rescue IncompleteRESP
+    # The client hasn't sent us a complete command yet
+    return nil
+  end
+
   def write(msg)
     @socket.write(msg)
   end
 end

 class RedisServer
   def initialize(port)
     @server = TCPServer.new(port)
     @sockets_to_clients = {}
   end

   def listen
     loop do
       fds_to_watch = [@server, *@sockets_to_clients.keys]
       ready_to_read, _, _ = IO.select(fds_to_watch)
       ready_to_read.each do |ready|
         if ready == @server
           client_socket = @server.accept
           @sockets_to_clients[client_socket] = Client.new(client_socket)
         else
           # If not the server, this must be one of the existing clients
           client = @sockets_to_clients[ready]
           handle_client(client)
         end
       end
     end
   end

   def handle_client(client)
-    client.read_available # TODO: Read actual command
-
-    # TODO: Handle commands other than PING
-    client.write("+PONG\r\n")
+    client.read_available
+    loop do
+      command = client.consume_command!
+      break unless command
+      handle_command(client, command)
+    end
   rescue Errno::ECONNRESET, EOFError
     # If the client has disconnected, let's
     # remove from our list of active clients
     @sockets_to_clients.delete(client.socket)
   end
+
+  def handle_command(client, command)
+    if command.action.downcase == "ping"
+      client.write("+PONG\r\n")
+    elsif command.action.downcase == "echo"
+      client.write("+#{command.args.first}\r\n")
+    else
+      raise RuntimeError.new("Unhandled command: #{command.action}")
+    end
+  end
 end
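Each time we read from the client, we try to decode the contents of its buffer, handling commands until decoding fails. If we run into IncompleteRESP, we stop and get back to the event loop; the partial bytes stay in the buffer until the rest of the command arrives.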
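As written, consume_command! resets @buffer to an empty string after decoding one command, so if a client pipelines several commands into a single write, the bytes after the first command are discarded. One possible refinement, sketched here under our own assumptions: have the decoder report how many bytes it consumed (StringIO#pos gives this) and keep only the remaining bytes. PrefixDecoder and drain_commands are hypothetical names, and the decoder is a condensed variant of the one built above, not a drop-in replacement.

```ruby
require "stringio"

class IncompleteRESP < StandardError; end

# Hypothetical variant of the article's decoder (not the author's code): it
# decodes one value from the front of a buffer and also reports how many
# bytes that value occupied, via StringIO#pos.
module PrefixDecoder
  def self.decode_prefix(buffer)
    io = StringIO.new(buffer)
    value = decode_value(io)
    [value, io.pos]
  end

  def self.decode_value(io)
    case io.read(1)
    when "+"
      read_line(io)
    when "$"
      byte_count = read_line(io).to_i
      str = io.read(byte_count)
      raise IncompleteRESP unless str && str.bytesize == byte_count
      raise IncompleteRESP unless io.read(2) == "\r\n"
      str
    when "*"
      Array.new(read_line(io).to_i) { decode_value(io) }
    when nil
      raise IncompleteRESP
    else
      raise ArgumentError, "Unhandled RESP type"
    end
  end

  # Returns the line without its CRLF; raises IncompleteRESP if the CRLF
  # hasn't arrived yet.
  def self.read_line(io)
    line = io.gets("\r\n")
    raise IncompleteRESP unless line && line.end_with?("\r\n")
    line.chomp("\r\n")
  end
end

# Decodes every complete command at the front of the buffer and returns them
# along with the leftover bytes, which belong to a command still arriving.
def drain_commands(buffer)
  commands = []
  loop do
    command, consumed = PrefixDecoder.decode_prefix(buffer)
    commands << command
    buffer = buffer.byteslice(consumed, buffer.bytesize - consumed)
  end
rescue IncompleteRESP
  [commands, buffer]
end

commands, leftover = drain_commands("*1\r\n$4\r\nPING\r\n*2\r\n$4\r\nECHO\r\n$3\r\nhey\r\n*1\r\n$4\r\nPI")
p commands  # prints [["PING"], ["ECHO", "hey"]]
p leftover  # prints "*1\r\n$4\r\nPI"
```

With a change along these lines, the existing loop in handle_client would go on to handle every buffered command instead of only the first.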
Tests pass!
➜ SERVER_PORT=6380 ruby _files/redis/integrated_test_4.rb
Run options: --seed 17051
# Running:
....
4 runs, 7 assertions, 0 failures, 0 errors, 0 skips
Our Redis server is now capable of serving multiple clients with the PING and ECHO commands.
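One caveat about the ECHO handler: it replies with a Simple String, and RESP Simple Strings cannot contain CR or LF, so echoing a message that contains \r\n would produce a malformed reply. Real Redis replies to ECHO with a Bulk String. A sketch of that alternative follows; Command and FakeClient here are minimal stand-ins assumed for the example (FakeClient records writes in memory instead of using a socket), not the classes from the article.

```ruby
# Minimal stand-ins for the article's Command and Client classes, so the
# sketch runs on its own. FakeClient records writes instead of using a socket.
Command = Struct.new(:action, :args)

FakeClient = Struct.new(:written) do
  def write(msg)
    written << msg
  end
end

# Variant of handle_command (not the author's code) that replies to ECHO with
# a Bulk String: "$<byte count>\r\n<bytes>\r\n".
def handle_command(client, command)
  case command.action.downcase
  when "ping"
    client.write("+PONG\r\n")
  when "echo"
    message = command.args.first
    client.write("$#{message.bytesize}\r\n#{message}\r\n")
  else
    raise RuntimeError, "Unhandled command: #{command.action}"
  end
end

client = FakeClient.new(String.new)
handle_command(client, Command.new("ECHO", ["hello\r\nworld"]))
p client.written  # prints "$12\r\nhello\r\nworld\r\n"
```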
The next article is in progress; it’ll be about implementing the GET & SET commands. If you’d like access to a draft version, please let me know.