Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 36 additions & 14 deletions pkg/http/api/status/status.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,3 @@
// Copyright © 2022 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package status

import (
Expand All @@ -31,8 +17,28 @@
switch {
case cerrors.Is(err, pipeline.ErrNameMissing):
code = codes.InvalidArgument
case cerrors.Is(err, pipeline.ErrIDMissing):
code = codes.InvalidArgument
case cerrors.Is(err, pipeline.ErrNameAlreadyExists):
code = codes.AlreadyExists
case cerrors.Is(err, pipeline.ErrInvalidCharacters):
code = codes.InvalidArgument
case cerrors.Is(err, pipeline.ErrNameOverLimit):
code = codes.InvalidArgument
case cerrors.Is(err, pipeline.ErrIDOverLimit):
code = codes.InvalidArgument
case cerrors.Is(err, pipeline.ErrDescriptionOverLimit):
code = codes.InvalidArgument
case cerrors.Is(err, pipeline.ErrInstanceNotFound):
code = codes.NotFound
case cerrors.Is(err, pipeline.ErrConnectorIDNotFound):
code = codes.NotFound
case cerrors.Is(err, pipeline.ErrProcessorIDNotFound):
code = codes.NotFound
case cerrors.Is(err, pipeline.ErrInvalidPipelineStructure):
code = codes.FailedPrecondition
case cerrors.Is(err, pipeline.ErrInvalidDLQConfig):
code = codes.InvalidArgument
default:
code = codeFromError(err)
}
Expand All @@ -48,6 +54,14 @@
code = codes.InvalidArgument
case cerrors.Is(err, connector.ErrInstanceNotFound):
code = codes.NotFound
case cerrors.Is(err, connector.ErrNameMissing):
code = codes.InvalidArgument
case cerrors.Is(err, connector.ErrIDMissing):
code = codes.InvalidArgument
case cerrors.Is(err, connector.ErrNameAlreadyExists):

Check failure on line 61 in pkg/http/api/status/status.go

View workflow job for this annotation

GitHub Actions / test

undefined: connector.ErrNameAlreadyExists

Check failure on line 61 in pkg/http/api/status/status.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: connector.ErrNameAlreadyExists

Check failure on line 61 in pkg/http/api/status/status.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: connector.ErrNameAlreadyExists

Check failure on line 61 in pkg/http/api/status/status.go

View workflow job for this annotation

GitHub Actions / test

undefined: connector.ErrNameAlreadyExists

Check failure on line 61 in pkg/http/api/status/status.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: connector.ErrNameAlreadyExists

Check failure on line 61 in pkg/http/api/status/status.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: connector.ErrNameAlreadyExists
code = codes.AlreadyExists
case cerrors.Is(err, connector.ErrPipelineIDMissing):

Check failure on line 63 in pkg/http/api/status/status.go

View workflow job for this annotation

GitHub Actions / test

undefined: connector.ErrPipelineIDMissing

Check failure on line 63 in pkg/http/api/status/status.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: connector.ErrPipelineIDMissing

Check failure on line 63 in pkg/http/api/status/status.go

View workflow job for this annotation

GitHub Actions / test

undefined: connector.ErrPipelineIDMissing

Check failure on line 63 in pkg/http/api/status/status.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: connector.ErrPipelineIDMissing
code = codes.InvalidArgument
default:
code = codeFromError(err)
}
Expand All @@ -63,6 +77,14 @@
code = codes.InvalidArgument
case cerrors.Is(err, processor.ErrInstanceNotFound):
code = codes.NotFound
case cerrors.Is(err, processor.ErrNameMissing):

Check failure on line 80 in pkg/http/api/status/status.go

View workflow job for this annotation

GitHub Actions / test

undefined: processor.ErrNameMissing

Check failure on line 80 in pkg/http/api/status/status.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: processor.ErrNameMissing

Check failure on line 80 in pkg/http/api/status/status.go

View workflow job for this annotation

GitHub Actions / test

undefined: processor.ErrNameMissing

Check failure on line 80 in pkg/http/api/status/status.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: processor.ErrNameMissing
code = codes.InvalidArgument
case cerrors.Is(err, processor.ErrIDMissing):

Check failure on line 82 in pkg/http/api/status/status.go

View workflow job for this annotation

GitHub Actions / test

undefined: processor.ErrIDMissing

Check failure on line 82 in pkg/http/api/status/status.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: processor.ErrIDMissing

Check failure on line 82 in pkg/http/api/status/status.go

View workflow job for this annotation

GitHub Actions / test

undefined: processor.ErrIDMissing

Check failure on line 82 in pkg/http/api/status/status.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: processor.ErrIDMissing
code = codes.InvalidArgument
case cerrors.Is(err, processor.ErrNameAlreadyExists):

Check failure on line 84 in pkg/http/api/status/status.go

View workflow job for this annotation

GitHub Actions / test

undefined: processor.ErrNameAlreadyExists

Check failure on line 84 in pkg/http/api/status/status.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: processor.ErrNameAlreadyExists

Check failure on line 84 in pkg/http/api/status/status.go

View workflow job for this annotation

GitHub Actions / test

undefined: processor.ErrNameAlreadyExists

Check failure on line 84 in pkg/http/api/status/status.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: processor.ErrNameAlreadyExists
code = codes.AlreadyExists
case cerrors.Is(err, processor.ErrParentIDMissing):

Check failure on line 86 in pkg/http/api/status/status.go

View workflow job for this annotation

GitHub Actions / test

undefined: processor.ErrParentIDMissing

Check failure on line 86 in pkg/http/api/status/status.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: processor.ErrParentIDMissing) (typecheck)

Check failure on line 86 in pkg/http/api/status/status.go

View workflow job for this annotation

GitHub Actions / test

undefined: processor.ErrParentIDMissing

Check failure on line 86 in pkg/http/api/status/status.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: processor.ErrParentIDMissing) (typecheck)
code = codes.InvalidArgument
default:
code = codeFromError(err)
}
Expand Down
44 changes: 23 additions & 21 deletions pkg/pipeline/errors.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,10 @@
// Copyright © 2022 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pipeline

import "github.com/conduitio/conduit/pkg/foundation/cerrors"
import (
"fmt"

"github.com/conduitio/conduit/pkg/foundation/cerrors"
)

var (
ErrGracefulShutdown = cerrors.New("graceful shutdown")
Expand All @@ -26,10 +16,22 @@ var (
ErrNameMissing = cerrors.New("must provide a pipeline name")
ErrIDMissing = cerrors.New("must provide a pipeline ID")
ErrNameAlreadyExists = cerrors.New("pipeline name already exists")
ErrInvalidCharacters = cerrors.New("pipeline ID contains invalid characters")
ErrNameOverLimit = cerrors.New("pipeline name is over the character limit (64)")
ErrIDOverLimit = cerrors.New("pipeline ID is over the character limit (64)")
ErrDescriptionOverLimit = cerrors.New("pipeline description is over the character limit (8192)")
ErrConnectorIDNotFound = cerrors.New("connector ID not found")
ErrProcessorIDNotFound = cerrors.New("processor ID not found")

// ErrInvalidCharacters is returned when a pipeline ID contains invalid characters.
ErrInvalidCharacters = fmt.Errorf("pipeline ID contains invalid characters, allowed characters: %s", idRegex.String())
// ErrNameOverLimit is returned when a pipeline name is over the character limit.
ErrNameOverLimit = fmt.Errorf("pipeline name is over the character limit (%d)", NameLengthLimit)
// ErrIDOverLimit is returned when a pipeline ID is over the character limit.
ErrIDOverLimit = fmt.Errorf("pipeline ID is over the character limit (%d)", IDLengthLimit)
// ErrDescriptionOverLimit is returned when a pipeline description is over the character limit.
ErrDescriptionOverLimit = fmt.Errorf("pipeline description is over the character limit (%d)", DescriptionLengthLimit)

ErrConnectorIDNotFound = cerrors.New("connector ID not found in pipeline")
ErrProcessorIDNotFound = cerrors.New("processor ID not found in pipeline")

// ErrInvalidPipelineStructure is returned when a pipeline has an invalid structure,
// e.g. trying to start a pipeline without any source connectors.
ErrInvalidPipelineStructure = cerrors.New("invalid pipeline structure")
// ErrInvalidDLQConfig is returned when the DLQ configuration is invalid.
ErrInvalidDLQConfig = cerrors.New("invalid DLQ configuration")
)
18 changes: 14 additions & 4 deletions pkg/pipeline/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,16 +178,16 @@ func (s *Service) UpdateDLQ(ctx context.Context, pipelineID string, cfg DLQ) (*I
}

if cfg.Plugin == "" {
return nil, cerrors.New("DLQ plugin must be provided")
return nil, cerrors.Errorf("%w: plugin must be provided", ErrInvalidDLQConfig)
}
if cfg.WindowSize < 0 {
return nil, cerrors.New("DLQ window size must be non-negative")
return nil, cerrors.Errorf("%w: window size must be non-negative", ErrInvalidDLQConfig)
}
if cfg.WindowNackThreshold < 0 {
return nil, cerrors.New("DLQ window nack threshold must be non-negative")
return nil, cerrors.Errorf("%w: window nack threshold must be non-negative", ErrInvalidDLQConfig)
}
if cfg.WindowSize > 0 && cfg.WindowSize <= cfg.WindowNackThreshold {
return nil, cerrors.New("DLQ window nack threshold must be lower than window size")
return nil, cerrors.Errorf("%w: window nack threshold must be lower than window size", ErrInvalidDLQConfig)
}

pl.DLQ = cfg
Expand All @@ -206,6 +206,11 @@ func (s *Service) AddConnector(ctx context.Context, pipelineID string, connector
if err != nil {
return nil, err
}
for _, id := range pl.ConnectorIDs {
if id == connectorID {
return nil, cerrors.Errorf("connector with ID %q already exists in pipeline %q", connectorID, pipelineID)
}
}
pl.ConnectorIDs = append(pl.ConnectorIDs, connectorID)
pl.UpdatedAt = time.Now()
err = s.store.Set(ctx, pl.ID, pl)
Expand Down Expand Up @@ -250,6 +255,11 @@ func (s *Service) AddProcessor(ctx context.Context, pipelineID string, processor
if err != nil {
return nil, err
}
for _, id := range pl.ProcessorIDs {
if id == processorID {
return nil, cerrors.Errorf("processor with ID %q already exists in pipeline %q", processorID, pipelineID)
}
}
pl.ProcessorIDs = append(pl.ProcessorIDs, processorID)
pl.UpdatedAt = time.Now()
err = s.store.Set(ctx, pl.ID, pl)
Expand Down
Loading