diff --git a/lang/ruby/Manifest b/lang/ruby/Manifest index 6ce443580a0..b93841d20a1 100644 --- a/lang/ruby/Manifest +++ b/lang/ruby/Manifest @@ -11,6 +11,7 @@ lib/avro/data_file.rb lib/avro/io.rb lib/avro/ipc.rb lib/avro/logical_types.rb +lib/avro/message.rb lib/avro/protocol.rb lib/avro/schema.rb lib/avro/schema_compatibility.rb @@ -27,6 +28,7 @@ test/test_fingerprints.rb test/test_help.rb test/test_io.rb test/test_logical_types.rb +test/test_message.rb test/test_protocol.rb test/test_schema.rb test/test_schema_compatibility.rb diff --git a/lang/ruby/lib/avro/message.rb b/lang/ruby/lib/avro/message.rb new file mode 100644 index 00000000000..406772f0ea0 --- /dev/null +++ b/lang/ruby/lib/avro/message.rb @@ -0,0 +1,97 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +module Avro + module Message + ENCODING_MODE = "BINARY".freeze + FINGERPRINT_PACK_MODE = "Q<".freeze + MAGIC_NUMBER_PACK_MODE = "C*".freeze + READ_MODE = "r".freeze + SINGLE_OBJECT_MAGIC_NUMBER = [0xC3, 0x01].freeze + WRITE_MODE = "w".freeze + + class BadHeaderError < AvroError; end + class MissingSchemaError < AvroError; end + + class BinaryMessageEncoder + def initialize(schema) + @schema = schema + @writer = Avro::IO::DatumWriter.new(schema) + end + + def encode(datum) + buffer = StringIO.new("", WRITE_MODE) + buffer.set_encoding(ENCODING_MODE) if buffer.respond_to?(:set_encoding) + + encoder = Avro::IO::BinaryEncoder.new(buffer) + + buffer.write(SINGLE_OBJECT_MAGIC_NUMBER.pack(MAGIC_NUMBER_PACK_MODE)) + buffer.write([@schema.crc_64_avro_fingerprint].pack(FINGERPRINT_PACK_MODE)) + @writer.write(datum, encoder) + + buffer.string + end + end + + class BinaryMessageDecoder + def initialize(schema_store, reader_schema = nil) + @schema_store = schema_store + @reader_schema = reader_schema + end + + def decode(encoded_datum) + buffer = StringIO.new(encoded_datum, READ_MODE) + magic_number = buffer.read(2).unpack(MAGIC_NUMBER_PACK_MODE) + + if magic_number != SINGLE_OBJECT_MAGIC_NUMBER + raise BadHeaderError.new("Unrecognized header bytes #{magic_number.map { |i| i.to_s(16).rjust(2, "0").upcase }}") + end + + writer_schema_fingerprint = buffer.read(8).unpack(FINGERPRINT_PACK_MODE).first + writer_schema = @schema_store.find_by_fingerprint(writer_schema_fingerprint) + + if !writer_schema + raise MissingSchemaError.new("Cannot resolve schema for fingerprint: #{writer_schema_fingerprint}") + end + + decoder = Avro::IO::BinaryDecoder.new(buffer) + + reader = Avro::IO::DatumReader.new(writer_schema, @reader_schema) + reader.read(decoder) + end + end + + class SchemaStore + def self.new + Cache.new + end + + class Cache + def initialize + @schemas = {} + end + + def add_schema(schema) + @schemas[schema.crc_64_avro_fingerprint] = schema + end + + def find_by_fingerprint(fingerprint) + @schemas[fingerprint] + end + end + end + end +end diff --git a/lang/ruby/test/test_message.rb b/lang/ruby/test/test_message.rb new file mode 100644 index 00000000000..1f767c30df7 --- /dev/null +++ b/lang/ruby/test/test_message.rb @@ -0,0 +1,166 @@ +# -*- coding: utf-8 -*- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +require 'test_help' +require 'avro/message' + +class TestMessage < Test::Unit::TestCase + def test_decoder_without_reader_schema + writer_schema = Avro::Schema.parse(<<-JSON) + { + "namespace": "example.avro", + "type": "record", + "name": "User", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "favorite_number", "type": ["int", "null"]}, + {"name": "favorite_color", "type": ["string", "null"]} + ] + } + JSON + + schema_store = Avro::Message::SchemaStore.new + schema_store.add_schema(writer_schema) + + message_writer = Avro::Message::BinaryMessageEncoder.new(writer_schema) + message_reader = Avro::Message::BinaryMessageDecoder.new(schema_store) + + datum_to_encode = { "name" => "Bob", "favorite_number" => 1, "favorite_color" => "Blue" } + encoded_datum = message_writer.encode(datum_to_encode) + decoded_datum = message_reader.decode(encoded_datum) + + assert_equal(["C3", "01", "B2", "D1", "D8", "D3", "DE", "28", "33", "CE", "06", "42", "6F", "62", "00", "02", "00", "08", "42", "6C", "75", "65"].map { |hex| hex.to_i(16) }, + encoded_datum.bytes) + assert_equal(datum_to_encode, decoded_datum) + end + + def test_decoder_with_reader_schema + writer_schema = Avro::Schema.parse(<<-JSON) + { + "namespace": "example.avro", + "type": "record", + "name": "User", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "favorite_number", "type": ["int", "null"]}, + {"name": "favorite_color", "type": ["string", "null"]} + ] + } + JSON + + reader_schema = Avro::Schema.parse(<<-JSON) + { + "namespace": "example.avro", + "type": "record", + "name": "User", + "fields": [ + {"name": "name", "type": "string"} + ] + } + JSON + + schema_store = Avro::Message::SchemaStore.new + schema_store.add_schema(writer_schema) + + message_writer = Avro::Message::BinaryMessageEncoder.new(writer_schema) + message_reader = Avro::Message::BinaryMessageDecoder.new(schema_store, reader_schema) + + datum_to_encode = { "name" => "Bob", "favorite_number" => 1, "favorite_color" => "Blue" } + encoded_datum = message_writer.encode(datum_to_encode) + decoded_datum = message_reader.decode(encoded_datum) + + assert_equal({ "name" => "Bob" }, decoded_datum) + end + + def test_decoder_with_incompatible_reader_schema + writer_schema = Avro::Schema.parse(<<-JSON) + { + "namespace": "example.avro", + "type": "record", + "name": "User", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "favorite_number", "type": ["int", "null"]}, + {"name": "favorite_color", "type": ["string", "null"]} + ] + } + JSON + + reader_schema = Avro::Schema.parse(<<-JSON) + { + "namespace": "example.avro", + "type": "record", + "name": "User", + "fields": [ + {"name": "field_not_in_written_datum", "type": "string"} + ] + } + JSON + + schema_store = Avro::Message::SchemaStore.new + schema_store.add_schema(writer_schema) + + message_writer = Avro::Message::BinaryMessageEncoder.new(writer_schema) + message_reader = Avro::Message::BinaryMessageDecoder.new(schema_store, reader_schema) + + datum_to_encode = { "name" => "Bob", "favorite_number" => 1, "favorite_color" => "Blue" } + encoded_datum = message_writer.encode(datum_to_encode) + exception = assert_raise(Avro::AvroError) do + message_reader.decode(encoded_datum) + end + + assert_match(/Missing data for "string" with no default/, exception.to_s) + end + + def test_missing_schema + writer_schema = Avro::Schema.parse(<<-JSON) + { + "namespace": "example.avro", + "type": "record", + "name": "User", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "favorite_number", "type": ["int", "null"]}, + {"name": "favorite_color", "type": ["string", "null"]} + ] + } + JSON + + schema_store = Avro::Message::SchemaStore.new + + message_writer = Avro::Message::BinaryMessageEncoder.new(writer_schema) + message_reader = Avro::Message::BinaryMessageDecoder.new(schema_store) + + datum_to_encode = { "name" => "Bob", "favorite_number" => 1, "favorite_color" => "Blue" } + encoded_datum = message_writer.encode(datum_to_encode) + exception = assert_raise(Avro::Message::MissingSchemaError) do + message_reader.decode(encoded_datum) + end + + assert_match(/Cannot resolve schema for fingerprint: 14858264533127451058/, exception.to_s) + end + + def test_bad_header + schema_store = Avro::Message::SchemaStore.new + message_reader = Avro::Message::BinaryMessageDecoder.new(schema_store) + exception = assert_raise(Avro::Message::BadHeaderError) do + message_reader.decode("invalid-data") + end + + assert_match(/Unrecognized header bytes \[\"69\", \"6E\"\]/, exception.to_s) + end +end