Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AVRO-2928: Ruby implementation of single object encoding #956

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions lang/ruby/Manifest
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ lib/avro/data_file.rb
lib/avro/io.rb
lib/avro/ipc.rb
lib/avro/logical_types.rb
lib/avro/message.rb
lib/avro/protocol.rb
lib/avro/schema.rb
lib/avro/schema_compatibility.rb
Expand All @@ -27,6 +28,7 @@ test/test_fingerprints.rb
test/test_help.rb
test/test_io.rb
test/test_logical_types.rb
test/test_message.rb
test/test_protocol.rb
test/test_schema.rb
test/test_schema_compatibility.rb
Expand Down
97 changes: 97 additions & 0 deletions lang/ruby/lib/avro/message.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

module Avro
module Message
ENCODING_MODE = "BINARY".freeze
FINGERPRINT_PACK_MODE = "Q<".freeze
MAGIC_NUMBER_PACK_MODE = "C*".freeze
READ_MODE = "r".freeze
SINGLE_OBJECT_MAGIC_NUMBER = [0xC3, 0x01].freeze
WRITE_MODE = "w".freeze

class BadHeaderError < AvroError; end
class MissingSchemaError < AvroError; end

class BinaryMessageEncoder
def initialize(schema)
@schema = schema
@writer = Avro::IO::DatumWriter.new(schema)
end

def encode(datum)
buffer = StringIO.new("", WRITE_MODE)
buffer.set_encoding(ENCODING_MODE) if buffer.respond_to?(:set_encoding)

encoder = Avro::IO::BinaryEncoder.new(buffer)

buffer.write(SINGLE_OBJECT_MAGIC_NUMBER.pack(MAGIC_NUMBER_PACK_MODE))
buffer.write([@schema.crc_64_avro_fingerprint].pack(FINGERPRINT_PACK_MODE))
@writer.write(datum, encoder)

buffer.string
end
end

class BinaryMessageDecoder
def initialize(schema_store, reader_schema = nil)
@schema_store = schema_store
@reader_schema = reader_schema
end

def decode(encoded_datum)
buffer = StringIO.new(encoded_datum, READ_MODE)
magic_number = buffer.read(2).unpack(MAGIC_NUMBER_PACK_MODE)

if magic_number != SINGLE_OBJECT_MAGIC_NUMBER
raise BadHeaderError.new("Unrecognized header bytes #{magic_number.map { |i| i.to_s(16).rjust(2, "0").upcase }}")
end

writer_schema_fingerprint = buffer.read(8).unpack(FINGERPRINT_PACK_MODE).first
writer_schema = @schema_store.find_by_fingerprint(writer_schema_fingerprint)

if !writer_schema
raise MissingSchemaError.new("Cannot resolve schema for fingerprint: #{writer_schema_fingerprint}")
end

decoder = Avro::IO::BinaryDecoder.new(buffer)

reader = Avro::IO::DatumReader.new(writer_schema, @reader_schema)
reader.read(decoder)
end
end

class SchemaStore
def self.new
Cache.new
end

class Cache
def initialize
@schemas = {}
end

def add_schema(schema)
@schemas[schema.crc_64_avro_fingerprint] = schema
end

def find_by_fingerprint(fingerprint)
@schemas[fingerprint]
end
end
end
end
end
166 changes: 166 additions & 0 deletions lang/ruby/test/test_message.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

require 'test_help'
require 'avro/message'

class TestMessage < Test::Unit::TestCase
def test_decoder_without_reader_schema
writer_schema = Avro::Schema.parse(<<-JSON)
{
"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}
JSON

schema_store = Avro::Message::SchemaStore.new
schema_store.add_schema(writer_schema)

message_writer = Avro::Message::BinaryMessageEncoder.new(writer_schema)
message_reader = Avro::Message::BinaryMessageDecoder.new(schema_store)

datum_to_encode = { "name" => "Bob", "favorite_number" => 1, "favorite_color" => "Blue" }
encoded_datum = message_writer.encode(datum_to_encode)
decoded_datum = message_reader.decode(encoded_datum)

assert_equal(["C3", "01", "B2", "D1", "D8", "D3", "DE", "28", "33", "CE", "06", "42", "6F", "62", "00", "02", "00", "08", "42", "6C", "75", "65"].map { |hex| hex.to_i(16) },
encoded_datum.bytes)
assert_equal(datum_to_encode, decoded_datum)
end

def test_decoder_with_reader_schema
writer_schema = Avro::Schema.parse(<<-JSON)
{
"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}
JSON

reader_schema = Avro::Schema.parse(<<-JSON)
{
"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"}
]
}
JSON

schema_store = Avro::Message::SchemaStore.new
schema_store.add_schema(writer_schema)

message_writer = Avro::Message::BinaryMessageEncoder.new(writer_schema)
message_reader = Avro::Message::BinaryMessageDecoder.new(schema_store, reader_schema)

datum_to_encode = { "name" => "Bob", "favorite_number" => 1, "favorite_color" => "Blue" }
encoded_datum = message_writer.encode(datum_to_encode)
decoded_datum = message_reader.decode(encoded_datum)

assert_equal({ "name" => "Bob" }, decoded_datum)
end

def test_decoder_with_incompatible_reader_schema
writer_schema = Avro::Schema.parse(<<-JSON)
{
"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}
JSON

reader_schema = Avro::Schema.parse(<<-JSON)
{
"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "field_not_in_written_datum", "type": "string"}
]
}
JSON

schema_store = Avro::Message::SchemaStore.new
schema_store.add_schema(writer_schema)

message_writer = Avro::Message::BinaryMessageEncoder.new(writer_schema)
message_reader = Avro::Message::BinaryMessageDecoder.new(schema_store, reader_schema)

datum_to_encode = { "name" => "Bob", "favorite_number" => 1, "favorite_color" => "Blue" }
encoded_datum = message_writer.encode(datum_to_encode)
exception = assert_raise(Avro::AvroError) do
message_reader.decode(encoded_datum)
end

assert_match(/Missing data for "string" with no default/, exception.to_s)
end

def test_missing_schema
writer_schema = Avro::Schema.parse(<<-JSON)
{
"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}
JSON

schema_store = Avro::Message::SchemaStore.new

message_writer = Avro::Message::BinaryMessageEncoder.new(writer_schema)
message_reader = Avro::Message::BinaryMessageDecoder.new(schema_store)

datum_to_encode = { "name" => "Bob", "favorite_number" => 1, "favorite_color" => "Blue" }
encoded_datum = message_writer.encode(datum_to_encode)
exception = assert_raise(Avro::Message::MissingSchemaError) do
message_reader.decode(encoded_datum)
end

assert_match(/Cannot resolve schema for fingerprint: 14858264533127451058/, exception.to_s)
end

def test_bad_header
schema_store = Avro::Message::SchemaStore.new
message_reader = Avro::Message::BinaryMessageDecoder.new(schema_store)
exception = assert_raise(Avro::Message::BadHeaderError) do
message_reader.decode("invalid-data")
end

assert_match(/Unrecognized header bytes \[\"69\", \"6E\"\]/, exception.to_s)
end
end