Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"@version": 1.1,
"edc": "https://w3id.org/edc/v0.0.1/ns/",
"dct": "http://purl.org/dc/terms/",
"dcat": "http://www.w3.org/ns/dcat#",
"QuerySpec": {
"@id": "edc:QuerySpec",
"@context": {
Expand Down Expand Up @@ -31,6 +32,14 @@
"conformsTo": {
"@id": "dct:conformsTo",
"@type": "@id"
},
"endpointURL": {
"@id": "dcat:endpointURL",
"@type": "@id"
},
"format": {
"@id": "dct:format",
"@type": "@id"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"@version": 1.1,
"edc": "https://w3id.org/edc/v0.0.1/ns/",
"dct": "http://purl.org/dc/terms/",
"dcat": "http://www.w3.org/ns/dcat#",
"Asset": {
"@id": "edc:Asset",
"@context": {
Expand Down Expand Up @@ -56,7 +57,15 @@
"properties": {
"@id": "edc:properties",
"@context": {
"@vocab": "https://w3id.org/edc/v0.0.1/ns/"
"@vocab": "https://w3id.org/edc/v0.0.1/ns/",
"endpointURL": {
"@id": "dcat:endpointURL",
"@type": "@id"
},
"format": {
"@id": "dct:format",
"@type": "@id"
}
}
},
"privateProperties": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ public AssetService assetService() {
var assetObservable = new AssetObservableImpl();
assetObservable.registerListener(new AssetEventListener(eventRouter));
return new AssetServiceImpl(assetIndex, contractNegotiationStore, transactionContext, assetObservable,
new AssetQueryValidator());
new AssetQueryValidator(), monitor);
}

@Provider
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.eclipse.edc.connector.controlplane.contract.spi.types.negotiation.ContractNegotiationStates;
import org.eclipse.edc.connector.controlplane.services.query.QueryValidator;
import org.eclipse.edc.connector.controlplane.services.spi.asset.AssetService;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.query.Criterion;
import org.eclipse.edc.spi.query.QuerySpec;
import org.eclipse.edc.spi.result.ServiceResult;
Expand All @@ -31,6 +32,8 @@
import java.util.function.Predicate;

import static java.lang.String.format;
import static org.eclipse.edc.jsonld.spi.PropertyAndTypeNames.DCAT_ENDPOINT_URL_ATTRIBUTE;
import static org.eclipse.edc.jsonld.spi.PropertyAndTypeNames.DCT_FORMAT_ATTRIBUTE;

public class AssetServiceImpl implements AssetService {

Expand All @@ -41,15 +44,17 @@
private final TransactionContext transactionContext;
private final AssetObservable observable;
private final QueryValidator queryValidator;
private final Monitor monitor;

public AssetServiceImpl(AssetIndex index, ContractNegotiationStore contractNegotiationStore,
TransactionContext transactionContext, AssetObservable observable,
QueryValidator queryValidator) {
QueryValidator queryValidator, Monitor monitor) {
this.index = index;
this.contractNegotiationStore = contractNegotiationStore;
this.transactionContext = transactionContext;
this.observable = observable;
this.queryValidator = queryValidator;
this.monitor = monitor;
}

@Override
Expand All @@ -72,14 +77,14 @@
return ServiceResult.badRequest(DUPLICATED_KEYS_MESSAGE);
}

return transactionContext.execute(() -> {
var createResult = index.create(asset);
if (createResult.succeeded()) {
observable.invokeForEach(l -> l.created(asset));
return ServiceResult.success(asset);
}
return ServiceResult.fromFailure(createResult);
});
logWarningWhenCatalogAssetPropertiesAreNotSet(asset);

Check notice

Code scanning / CodeQL

Deprecated method or constructor invocation Note

Invoking
AssetServiceImpl.logWarningWhenCatalogAssetPropertiesAreNotSet
should be avoided because it has been deprecated.

return transactionContext.execute(() ->
index.create(asset)
.onSuccess(i -> observable.invokeForEach(l -> l.created(asset)))
.flatMap(ServiceResult::from)
.map(i -> asset)
);
}

@Override
Expand Down Expand Up @@ -108,11 +113,21 @@
return ServiceResult.badRequest(DUPLICATED_KEYS_MESSAGE);
}

return transactionContext.execute(() -> {
var updatedAsset = index.updateAsset(asset);
updatedAsset.onSuccess(a -> observable.invokeForEach(l -> l.updated(a)));
return ServiceResult.from(updatedAsset);
});
logWarningWhenCatalogAssetPropertiesAreNotSet(asset);

Check notice

Code scanning / CodeQL

Deprecated method or constructor invocation Note

Invoking
AssetServiceImpl.logWarningWhenCatalogAssetPropertiesAreNotSet
should be avoided because it has been deprecated.

return transactionContext.execute(() ->
index.updateAsset(asset)
.onSuccess(a -> observable.invokeForEach(l -> l.updated(a)))
.flatMap(ServiceResult::from)
);
}

@Deprecated(since = "management-api:v4")
private void logWarningWhenCatalogAssetPropertiesAreNotSet(Asset asset) {
if (asset.isCatalog() && (asset.getProperty(DCAT_ENDPOINT_URL_ATTRIBUTE) == null || asset.getProperty(DCT_FORMAT_ATTRIBUTE) == null)) {
monitor.warning("The 'CatalogAsset' type is expecting 'endpointURL' and 'format' properties " +
"please adapt your clients accordingly");
}
}

private List<Asset> queryAssets(QuerySpec query) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.eclipse.edc.connector.controlplane.services.query.QueryValidator;
import org.eclipse.edc.connector.controlplane.services.spi.asset.AssetService;
import org.eclipse.edc.policy.model.Policy;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.query.QuerySpec;
import org.eclipse.edc.spi.result.Result;
import org.eclipse.edc.spi.result.ServiceFailure;
Expand Down Expand Up @@ -51,6 +52,7 @@
import static org.eclipse.edc.spi.result.ServiceFailure.Reason.NOT_FOUND;
import static org.mockito.AdditionalMatchers.and;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA;
Expand All @@ -69,9 +71,10 @@ class AssetServiceImplTest {
private final TransactionContext dummyTransactionContext = new NoopTransactionContext();
private final AssetObservable observable = mock();
private final QueryValidator queryValidator = mock();
private final Monitor monitor = mock();

private final AssetService service = new AssetServiceImpl(index, contractNegotiationStore, dummyTransactionContext,
observable, queryValidator);
observable, queryValidator, monitor);

@Test
void findById_shouldRelyOnAssetIndex() {
Expand Down Expand Up @@ -146,6 +149,16 @@ void shouldNotCreateAssetIfItAlreadyExists() {
assertThat(inserted).isFailed().extracting(ServiceFailure::getReason).isEqualTo(CONFLICT);
}

@Test
void shouldLogWarning_whenAssetCatalogAndPropertiesNotSet() {
var asset = createAssetBuilder("assetId").property(Asset.PROPERTY_IS_CATALOG, "true").build();
when(index.create(asset)).thenReturn(StoreResult.success());

service.create(asset);

verify(monitor).warning(anyString());
}

@Test
void shouldFail_whenPropertiesAreDuplicated() {
var asset = createAssetBuilder("assetId").property("property", "value").privateProperty("property", "other-value").build();
Expand Down Expand Up @@ -246,41 +259,54 @@ private static Stream<Arguments> nonFinalStates() {
}
}

@Test
void updateAsset_shouldUpdateWhenExists() {
var asset = createAsset("assetId");
when(index.updateAsset(asset)).thenReturn(StoreResult.success(asset));
@Nested
class Update {
@Test
void shouldUpdateWhenExists() {
var asset = createAsset("assetId");
when(index.updateAsset(asset)).thenReturn(StoreResult.success(asset));

var updated = service.update(asset);
var updated = service.update(asset);

assertThat(updated.succeeded()).isTrue();
verify(index).updateAsset(eq(asset));
verifyNoMoreInteractions(index);
verify(observable).invokeForEach(any());
}
assertThat(updated.succeeded()).isTrue();
verify(index).updateAsset(eq(asset));
verifyNoMoreInteractions(index);
verify(observable).invokeForEach(any());
}

@Test
void updateAsset_shouldReturnNotFound_whenNotExists() {
var asset = createAsset("assetId");
when(index.updateAsset(eq(asset))).thenReturn(StoreResult.notFound("test"));
@Test
void shouldReturnNotFound_whenNotExists() {
var asset = createAsset("assetId");
when(index.updateAsset(eq(asset))).thenReturn(StoreResult.notFound("test"));

var updated = service.update(asset);
var updated = service.update(asset);

assertThat(updated.failed()).isTrue();
assertThat(updated.reason()).isEqualTo(NOT_FOUND);
verify(index, times(1)).updateAsset(asset);
verifyNoMoreInteractions(index);
verify(observable, never()).invokeForEach(any());
}
assertThat(updated.failed()).isTrue();
assertThat(updated.reason()).isEqualTo(NOT_FOUND);
verify(index, times(1)).updateAsset(asset);
verifyNoMoreInteractions(index);
verify(observable, never()).invokeForEach(any());
}

@Test
void updateAsset_shouldFail_whenPropertiesAreDuplicated() {
var asset = createAssetBuilder("assetId").property("property", "value").privateProperty("property", "other-value").build();
@Test
void shouldLogWarning_whenAssetCatalogAndPropertiesNotSet() {
var asset = createAssetBuilder("assetId").property(Asset.PROPERTY_IS_CATALOG, "true").build();
when(index.updateAsset(asset)).thenReturn(StoreResult.success(asset));

var result = service.update(asset);
service.update(asset);

verify(monitor).warning(anyString());
}

assertThat(result).isFailed().extracting(ServiceFailure::getReason).isEqualTo(BAD_REQUEST);
verifyNoInteractions(index);
@Test
void shouldFail_whenPropertiesAreDuplicated() {
var asset = createAssetBuilder("assetId").property("property", "value").privateProperty("property", "other-value").build();

var result = service.update(asset);

assertThat(result).isFailed().extracting(ServiceFailure::getReason).isEqualTo(BAD_REQUEST);
verifyNoInteractions(index);
}
}

@NotNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ plugins {
}

dependencies {
api(project(":spi:common:json-ld-spi"))
api(project(":spi:control-plane:catalog-spi"))
api(project(":spi:control-plane:contract-spi"))
api(project(":spi:control-plane:transfer-spi"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.runtime.metamodel.annotation.Provider;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.query.CriterionOperatorRegistry;
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.edc.spi.system.ServiceExtensionContext;
Expand Down Expand Up @@ -52,6 +53,8 @@ public class CatalogCoreExtension implements ServiceExtension {

@Inject
private PolicyEngine policyEngine;
@Inject
private Monitor monitor;

@Override
public String name() {
Expand All @@ -67,7 +70,7 @@ public void initialize(ServiceExtensionContext context) {
public DatasetResolver datasetResolver() {
var contractDefinitionResolver = new ContractDefinitionResolverImpl(contractDefinitionStore, policyEngine, policyDefinitionStore);
return new DatasetResolverImpl(contractDefinitionResolver, assetIndex, policyDefinitionStore,
distributionResolver, criterionOperatorRegistry);
distributionResolver, criterionOperatorRegistry, monitor);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.runtime.metamodel.annotation.Provider;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.edc.spi.system.ServiceExtensionContext;

Expand All @@ -30,6 +31,8 @@ public class CatalogDefaultServicesExtension implements ServiceExtension {

@Inject
private DataFlowController dataFlowController;
@Inject
private Monitor monitor;

private DataServiceRegistry dataServiceRegistry;

Expand All @@ -50,7 +53,7 @@ public DataServiceRegistry dataServiceRegistry() {

@Provider(isDefault = true)
public DistributionResolver distributionResolver() {
return new DefaultDistributionResolver(dataServiceRegistry, dataFlowController);
return new DefaultDistributionResolver(dataServiceRegistry, dataFlowController, monitor);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@
import org.eclipse.edc.connector.controlplane.contract.spi.types.offer.ContractDefinition;
import org.eclipse.edc.connector.controlplane.policy.spi.PolicyDefinition;
import org.eclipse.edc.connector.controlplane.policy.spi.store.PolicyDefinitionStore;
import org.eclipse.edc.dataaddress.httpdata.spi.HttpDataAddressSchema;
import org.eclipse.edc.participant.spi.ParticipantAgent;
import org.eclipse.edc.participantcontext.spi.types.ParticipantContext;
import org.eclipse.edc.policy.model.Policy;
import org.eclipse.edc.policy.model.PolicyType;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.query.CriterionOperatorRegistry;
import org.eclipse.edc.spi.query.QuerySpec;
import org.jetbrains.annotations.NotNull;
Expand All @@ -43,7 +43,9 @@
import java.util.stream.Stream;

import static java.lang.Integer.MAX_VALUE;
import static org.eclipse.edc.jsonld.spi.PropertyAndTypeNames.DCAT_ENDPOINT_URL_ATTRIBUTE;
import static org.eclipse.edc.participantcontext.spi.types.ParticipantResource.filterByParticipantContextId;
import static org.eclipse.edc.spi.constants.CoreConstants.EDC_NAMESPACE;

public class DatasetResolverImpl implements DatasetResolver {

Expand All @@ -52,15 +54,17 @@
private final PolicyDefinitionStore policyDefinitionStore;
private final DistributionResolver distributionResolver;
private final CriterionOperatorRegistry criterionOperatorRegistry;
private final Monitor monitor;

public DatasetResolverImpl(ContractDefinitionResolver contractDefinitionResolver, AssetIndex assetIndex,
PolicyDefinitionStore policyDefinitionStore, DistributionResolver distributionResolver,
CriterionOperatorRegistry criterionOperatorRegistry) {
CriterionOperatorRegistry criterionOperatorRegistry, Monitor monitor) {
this.contractDefinitionResolver = contractDefinitionResolver;
this.assetIndex = assetIndex;
this.policyDefinitionStore = policyDefinitionStore;
this.distributionResolver = distributionResolver;
this.criterionOperatorRegistry = criterionOperatorRegistry;
this.monitor = monitor;
}

@Override
Expand Down Expand Up @@ -104,11 +108,20 @@
return Dataset.Builder.newInstance();
}

var endpointUrl = asset.getPropertyAsString(DCAT_ENDPOINT_URL_ATTRIBUTE);
if (endpointUrl == null) {
monitor.warning("""
Asset %s has no 'endpointURL' property and the DataAddress baseUrl is used instead, please adapt it as the
DataAddress will be removed from Asset in the forthcoming versions"""
.formatted(asset.getId()));
endpointUrl = asset.getDataAddress().getStringProperty(EDC_NAMESPACE + "baseUrl", null);

Check notice

Code scanning / CodeQL

Deprecated method or constructor invocation Note

Invoking
Asset.getDataAddress
should be avoided because it has been deprecated.
}

return Catalog.Builder.newInstance()
.dataService(DataService.Builder.newInstance()
.id(Base64.getUrlEncoder().encodeToString(asset.getId().getBytes()))
.endpointDescription(asset.getDescription())
.endpointUrl(asset.getDataAddress().getStringProperty(HttpDataAddressSchema.BASE_URL, null))
.endpointUrl(endpointUrl)
.build());
}

Expand Down
Loading
Loading