A PHP 7.4+ library to consume the Confluent Schema Registry REST API. It provides low level functions to create PSR-7 compliant requests that can be used as well as high level abstractions to ease developer experience.
Dependency | Version | Reason |
---|---|---|
php |
~7.4 | Anything lower has reached EOL |
guzzlephp/guzzle |
~7.0 | Using Request to build PSR-7 RequestInterface |
beberlei/assert |
~2.7|~3.0 | The de-facto standard assertions library for PHP |
flix-tech/avro-php |
^4.1 | Maintained fork of the only Avro PHP implementation: rg/avro-php |
Dependency | Version | Reason |
---|---|---|
doctrine/cache |
~1.3 | If you want to use the DoctrineCacheAdapter |
psr/cache |
^1.0 | If you want to use the CacheItemPoolAdapter |
psr/simple-cache |
^1.0 | If you want to use the SimpleCacheAdapter |
This library is installed via composer
.
composer require "flix-tech/confluent-schema-registry-api=^7.4"
NOTE
If you are still running on a version less than
5.0.3
we recommend updating it immediately since there was a critical bug with the exception handling.
This library follows strict semantic versioning, so you can expect any minor and patch release to be compatible, while major version upgrades will have incompatibilities that will be released in the UPGRADE.md file.
<?php
use GuzzleHttp\Client;
use FlixTech\SchemaRegistryApi\Registry\GuzzlePromiseAsyncRegistry;
use FlixTech\SchemaRegistryApi\Exception\SchemaRegistryException;
$registry = new GuzzlePromiseAsyncRegistry(
new Client(['base_uri' => 'registry.example.com'])
);
// Register a schema with a subject
$schema = AvroSchema::parse('{"type": "string"}');
// The promise will either contain a schema id as int when fulfilled,
// or a SchemaRegistryException instance when rejected.
// If the subject does not exist, it will be created implicitly
$promise = $registry->register('test-subject', $schema);
// If you want to resolve the promise, you might either get the value or an instance of a SchemaRegistryException
// It is more like an Either Monad, since returning Exceptions from rejection callbacks will throw them.
// We want to leave that decision to the user of the lib.
// TODO: Maybe return an Either Monad instead
$promise = $promise->then(
static function ($schemaIdOrSchemaRegistryException) {
if ($schemaIdOrSchemaRegistryException instanceof SchemaRegistryException) {
throw $schemaIdOrSchemaRegistryException;
}
return $schemaIdOrSchemaRegistryException;
}
);
// Resolve the promise
$schemaId = $promise->wait();
// Get a schema by schema id
$promise = $registry->schemaForId($schemaId);
// As above you could add additional callbacks to the promise
$schema = $promise->wait();
// Get the version of a schema for a given subject.
$version = $registry->schemaVersion(
'test-subject',
$schema
)->wait();
// You can also get a schema by subject and version
$schema = $registry->schemaForSubjectAndVersion('test-subject', $version)->wait();
// You can additionally just query for the currently latest schema within a subject.
// *NOTE*: Once you requested this it might not be the latest version anymore.
$latestSchema = $registry->latestVersion('test-subject')->wait();
// Sometimes you want to find out the global schema id for a given schema
$schemaId = $registry->schemaId('test-subject', $schema)->wait();
<?php
use FlixTech\SchemaRegistryApi\Registry\Decorators\BlockingDecorator;
use FlixTech\SchemaRegistryApi\Registry\GuzzlePromiseAsyncRegistry;
use GuzzleHttp\Client;
$registry = new BlockingDecorator(
new GuzzlePromiseAsyncRegistry(
new Client(['base_uri' => 'registry.example.com'])
)
);
// What the blocking registry does is actually resolving the promises
// with `wait` and adding a throwing rejection callback.
$schema = AvroSchema::parse('{"type": "string"}');
// This will be an int, and not a promise
$schemaId = $registry->register('test-subject', $schema);
There is a CachedRegistry
that accepts a CacheAdapter
together with a Registry
.
It supports both async and sync APIs.
NOTE:
From version 4.x of this library the API for the
CacheAdapterInterface
has been changed in order to allow caching of schema ids by hash of a given schema.
<?php
use FlixTech\SchemaRegistryApi\Registry\Decorators\BlockingDecorator;
use FlixTech\SchemaRegistryApi\Registry\GuzzlePromiseAsyncRegistry;
use FlixTech\SchemaRegistryApi\Registry\Decorators\CachingDecorator;
use FlixTech\SchemaRegistryApi\Registry\Cache\AvroObjectCacheAdapter;
use FlixTech\SchemaRegistryApi\Registry\Cache\DoctrineCacheAdapter;
use Doctrine\Common\Cache\ArrayCache;
use GuzzleHttp\Client;
$asyncApi = new GuzzlePromiseAsyncRegistry(
new Client(['base_uri' => 'registry.example.com'])
);
$syncApi = new BlockingDecorator($asyncApi);
$doctrineCachedSyncApi = new CachingDecorator(
$asyncApi,
new DoctrineCacheAdapter(
new ArrayCache()
)
);
// All adapters support both APIs, for async APIs additional fulfillment callbacks will be registered.
$avroObjectCachedAsyncApi = new CachingDecorator(
$asyncApi,
new AvroObjectCacheAdapter()
);
// NEW in version 4.x, passing in custom hash functions to cache schema ids via the schema hash
// By default the following function is used internally
$defaultHashFunction = static function (AvroSchema $schema) {
return md5((string) $schema);
};
// You can also define your own hash callable
$sha1HashFunction = static function (AvroSchema $schema) {
return sha1((string) $schema);
};
// Pass the hash function as optional 3rd parameter to the CachedRegistry constructor
$avroObjectCachedAsyncApi = new CachingDecorator(
$syncApi,
new AvroObjectCacheAdapter(),
$sha1HashFunction
);
There is a low-level API that provides simple functions that return PSR-7 request objects for the different endpoints of the registry. See Requests/Functions for more information.
There are also requests to use the new DELETE
API of the schema registry.
This library uses a Makefile
to run the test suite and requires docker
.
You can set the default variables by copying variables.mk.dist
to variables.mk
and change them to your liking.
PHP_VERSION=7.3 XDEBUG_VERSION=2.9.8 make docker
PHP_VERSION=7.3 make ci-local
This library uses a docker-compose
configuration to fire up a schema registry for integration testing, hence
docker-compose
from version 1.18.x is required to run those tests.
CONFLUENT_VERSION=latest
CONFLUENT_NETWORK_SUBNET=172.68.0.0/24
SCHEMA_REGISTRY_IPV4=172.68.0.103
KAFKA_BROKER_IPV4=172.68.0.102
ZOOKEEPER_IPV4=172.68.0.101
CONFLUENT_VERSION=5.2.3 make platform
make phpunit-integration
make clean
In order to contribute to this library, follow this workflow:
- Fork the repository
- Create a feature branch
- Work on the feature
- Run tests to verify that the tests are passing
- Open a PR to the upstream
- Be happy about contributing to open source!