Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,15 @@
import software.amazon.awssdk.services.glue.model.RegisterSchemaVersionResponse;
import software.amazon.awssdk.services.glue.model.RegistryId;
import software.amazon.awssdk.services.glue.model.SchemaId;
import software.amazon.awssdk.services.glue.model.SchemaVersionNumber;
import software.amazon.awssdk.services.glue.model.Compatibility;
import software.amazon.awssdk.services.glue.model.EntityNotFoundException;

import com.amazonaws.services.schemaregistry.common.compatibility.JsonSchemaCompatibilityChecker;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
import java.util.UUID;
Expand Down Expand Up @@ -293,6 +299,11 @@ public UUID registerSchemaVersion(String schemaDefinition, String schemaName, St
*/
public GetSchemaVersionResponse registerSchemaVersion(String schemaDefinition, String schemaName, String dataFormat) throws AWSSchemaRegistryException {

// Perform client-side JSON Schema compatibility check before registering
if ("JSON".equals(dataFormat)) {
validateJsonSchemaCompatibility(schemaDefinition, schemaName);
}

GetSchemaVersionResponse schemaVersionResponse = null;

try {
Expand Down Expand Up @@ -320,6 +331,51 @@ public GetSchemaVersionResponse registerSchemaVersion(String schemaDefinition, S
return schemaVersionResponse;
}

/**
* Validates JSON Schema compatibility client-side before registration.
* This is needed because AWS Glue service does not correctly enforce compatibility for JSON Schema.
*/
private void validateJsonSchemaCompatibility(String newSchemaDefinition, String schemaName) {
Compatibility compatibility = glueSchemaRegistryConfiguration.getCompatibilitySetting();
if (compatibility == null || compatibility == Compatibility.NONE) {
return;
}

try {
// Get the latest schema version
GetSchemaVersionRequest request = GetSchemaVersionRequest.builder()
.schemaId(getSchemaIdRequestObject(schemaName, glueSchemaRegistryConfiguration.getRegistryName()))
.schemaVersionNumber(SchemaVersionNumber.builder().latestVersion(true).build())
.build();

GetSchemaVersionResponse latestVersion = client.getSchemaVersion(request);
String previousSchemaDefinition = latestVersion.schemaDefinition();

// Perform compatibility check
JsonSchemaCompatibilityChecker checker = new JsonSchemaCompatibilityChecker();
List<String> errors = checker.checkCompatibility(newSchemaDefinition, previousSchemaDefinition, compatibility);

if (!errors.isEmpty()) {
String errorMessage = String.format(
"Schema compatibility check failed for schema '%s' with %s compatibility. Errors: %s",
schemaName, compatibility, String.join("; ", errors));
throw new AWSSchemaRegistryException(errorMessage);
}

log.debug("JSON Schema compatibility check passed for schema '{}'", schemaName);

} catch (EntityNotFoundException e) {
// No previous version exists, this is the first version - skip compatibility check
log.debug("No previous schema version found for '{}', skipping compatibility check", schemaName);
} catch (AWSSchemaRegistryException e) {
// Re-throw our own exceptions
throw e;
} catch (Exception e) {
// Log warning but don't fail - let AWS Glue handle it
log.warn("Could not perform client-side JSON Schema compatibility check: {}", e.getMessage());
}
}

private GetSchemaVersionResponse transformToGetSchemaVersionResponse(RegisterSchemaVersionResponse registerSchemaVersionResponse) {
return GetSchemaVersionResponse.builder()
.schemaVersionId(registerSchemaVersionResponse.schemaVersionId())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.amazonaws.services.schemaregistry.common.compatibility;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import software.amazon.awssdk.services.glue.model.Compatibility;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* Client-side JSON Schema compatibility checker.
*
* This provides compatibility validation that AWS Glue service should perform but doesn't
* correctly implement for JSON Schema (as of Dec 2024).
*
* Compatibility modes:
* - BACKWARD: New schema can read old data (new consumers, old producers)
* - FORWARD: Old schema can read new data (old consumers, new producers)
* - FULL: Both backward and forward compatible
*/
public class JsonSchemaCompatibilityChecker {

private static final ObjectMapper MAPPER = new ObjectMapper();

/**
* Check if newSchema is compatible with previousSchema according to the compatibility mode.
*
* @param newSchemaDefinition The new schema definition (JSON string)
* @param previousSchemaDefinition The previous schema definition (JSON string)
* @param compatibility The compatibility mode to check
* @return List of compatibility errors (empty if compatible)
*/
public List<String> checkCompatibility(String newSchemaDefinition,
String previousSchemaDefinition,
Compatibility compatibility) {
List<String> errors = new ArrayList<>();

if (compatibility == null || compatibility == Compatibility.NONE
|| compatibility == Compatibility.UNKNOWN_TO_SDK_VERSION) {
return errors;
}

try {
JsonNode newSchema = MAPPER.readTree(newSchemaDefinition);
JsonNode previousSchema = MAPPER.readTree(previousSchemaDefinition);

String compatStr = compatibility.toString();
boolean checkBackward = compatStr.startsWith("BACKWARD") || compatStr.startsWith("FULL");
boolean checkForward = compatStr.startsWith("FORWARD") || compatStr.startsWith("FULL");

if (checkBackward) {
errors.addAll(checkBackwardCompatibility(newSchema, previousSchema, ""));
}
if (checkForward) {
errors.addAll(checkForwardCompatibility(newSchema, previousSchema, ""));
}
} catch (Exception e) {
errors.add("Failed to parse schema: " + e.getMessage());
}

return errors;
}

/**
* BACKWARD: New schema can read old data.
* - Cannot add required fields (old data won't have them)
* - Can remove required fields (making them optional is fine)
* - Can add optional fields
*/
private List<String> checkBackwardCompatibility(JsonNode newSchema, JsonNode previousSchema, String path) {
List<String> errors = new ArrayList<>();

Set<String> newRequired = getRequiredFields(newSchema);
Set<String> previousRequired = getRequiredFields(previousSchema);

// Check for new required fields that weren't required before
for (String field : newRequired) {
if (!previousRequired.contains(field)) {
errors.add(String.format(
"BACKWARD incompatible: Field '%s%s' is now required but was not required in previous schema. " +
"Old data may not have this field.",
path.isEmpty() ? "" : path + ".", field));
}
}

// Check nested definitions
errors.addAll(checkDefinitionsCompatibility(newSchema, previousSchema, true));

return errors;
}

/**
* FORWARD: Old schema can read new data.
* - Cannot make required fields optional (old consumers expect them)
* - Cannot remove required fields (old consumers expect them)
* - Can add new required fields (old consumers ignore extra fields)
*/
private List<String> checkForwardCompatibility(JsonNode newSchema, JsonNode previousSchema, String path) {
List<String> errors = new ArrayList<>();

Set<String> newRequired = getRequiredFields(newSchema);
Set<String> previousRequired = getRequiredFields(previousSchema);

// Check for required fields that became optional or were removed
for (String field : previousRequired) {
if (!newRequired.contains(field)) {
errors.add(String.format(
"FORWARD incompatible: Field '%s%s' was required but is now optional or removed. " +
"Old consumers expect this field to be present.",
path.isEmpty() ? "" : path + ".", field));
}
}

// Check nested definitions
errors.addAll(checkDefinitionsCompatibility(newSchema, previousSchema, false));

return errors;
}

/**
* Check compatibility of nested definitions (for $ref support).
*/
private List<String> checkDefinitionsCompatibility(JsonNode newSchema, JsonNode previousSchema,
boolean isBackward) {
List<String> errors = new ArrayList<>();

JsonNode newDefs = getDefinitions(newSchema);
JsonNode prevDefs = getDefinitions(previousSchema);

if (newDefs == null || prevDefs == null) {
return errors;
}

Iterator<Map.Entry<String, JsonNode>> fields = prevDefs.fields();
while (fields.hasNext()) {
Map.Entry<String, JsonNode> entry = fields.next();
String defName = entry.getKey();
JsonNode prevDef = entry.getValue();
JsonNode newDef = newDefs.get(defName);

if (newDef != null) {
String defPath = "definitions." + defName;
if (isBackward) {
errors.addAll(checkBackwardCompatibility(newDef, prevDef, defPath));
} else {
errors.addAll(checkForwardCompatibility(newDef, prevDef, defPath));
}
}
}

return errors;
}

/**
* Extract required fields from a JSON Schema node.
*/
private Set<String> getRequiredFields(JsonNode schema) {
Set<String> required = new HashSet<>();

// Handle wrapper object (e.g., {"MyType": {...schema...}})
JsonNode schemaNode = unwrapSchema(schema);

JsonNode requiredNode = schemaNode.get("required");
if (requiredNode != null && requiredNode.isArray()) {
for (JsonNode field : requiredNode) {
required.add(field.asText());
}
}

return required;
}

/**
* Get definitions node (supports both "definitions" and "$defs").
*/
private JsonNode getDefinitions(JsonNode schema) {
JsonNode schemaNode = unwrapSchema(schema);

JsonNode defs = schemaNode.get("definitions");
if (defs != null) {
return defs;
}
return schemaNode.get("$defs");
}

/**
* Unwrap schema if it's wrapped in a named object.
* Handles schemas like: {"MyTypeName": {"type": "object", ...}}
*/
private JsonNode unwrapSchema(JsonNode schema) {
if (schema.has("type") || schema.has("$schema") || schema.has("properties")) {
return schema;
}

// Check if it's a wrapper with a single named schema
Iterator<Map.Entry<String, JsonNode>> fields = schema.fields();
JsonNode firstValue = null;
int count = 0;

while (fields.hasNext()) {
Map.Entry<String, JsonNode> entry = fields.next();
String key = entry.getKey();
// Skip known non-wrapper keys
if (key.equals("definitions") || key.equals("$defs")) {
continue;
}
firstValue = entry.getValue();
count++;
}

// If there's exactly one non-definition field and it looks like a schema, unwrap it
if (count == 1 && firstValue != null &&
(firstValue.has("type") || firstValue.has("properties"))) {
return firstValue;
}

return schema;
}
}
Loading
Loading