Skip to content

Commit 2a5d0cb

Browse files
committed
feat(#541): upload dataset in chunks
Upload datasets in chunks and fix the maximum payload size in the Kubernetes setup. Signed-off-by: Tobias Anker <tobias.anker@kitsunemimi.moe>
1 parent 2099830 commit 2a5d0cb

9 files changed

Lines changed: 53 additions & 20 deletions

File tree

.github/workflows/build_test.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -831,7 +831,7 @@ jobs:
831831

832832

833833
docker_build:
834-
# if: ${{ github.event_name == 'push' && github.ref == 'refs/heads/develop' || github.event_name == 'push' && startsWith(github.ref, 'refs/tags/') }}
834+
if: ${{ github.event_name == 'push' && github.ref == 'refs/heads/develop' || github.event_name == 'push' && startsWith(github.ref, 'refs/tags/') }}
835835
name: "Build Docker-images"
836836
needs: [ sdk_api_tests, cli_api_tests ]
837837
runs-on: ubuntu-24.04
@@ -912,7 +912,7 @@ jobs:
912912

913913

914914
docker_merge:
915-
# if: ${{ github.event_name == 'push' && github.ref == 'refs/heads/develop' || github.event_name == 'push' && startsWith(github.ref, 'refs/tags/') }}
915+
if: ${{ github.event_name == 'push' && github.ref == 'refs/heads/develop' || github.event_name == 'push' && startsWith(github.ref, 'refs/tags/') }}
916916
name: "Merge and push Docker-image"
917917
runs-on: ubuntu-24.04
918918
needs: [ docker_build ]
@@ -964,7 +964,7 @@ jobs:
964964
965965
966966
kubernetes_test:
967-
# if: ${{ github.event_name == 'push' && github.ref == 'refs/heads/develop' || github.event_name == 'push' && startsWith(github.ref, 'refs/tags/') }}
967+
if: ${{ github.event_name == 'push' && github.ref == 'refs/heads/develop' || github.event_name == 'push' && startsWith(github.ref, 'refs/tags/') }}
968968
needs: [ docker_merge ]
969969
runs-on: ubuntu-22.04
970970
strategy:

deploy/k8s/openhanami/templates/hanami-ingress.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ metadata:
66
nginx.ingress.kubernetes.io/ssl-passthrough: "true"
77
nginx.ingress.kubernetes.io/force-ssl-redirect: "true"
88
nginx.ingress.kubernetes.io/backend-protocol: HTTPS
9+
nginx.ingress.kubernetes.io/proxy-body-size: 1G
910
spec:
1011
ingressClassName: nginx
1112
rules:

deploy/k8s/openhanami/templates/hanami-nginx-config.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ data:
4646
# HSTS (ngx_http_headers_module is required) (15768000 seconds = 6 months)
4747
add_header Strict-Transport-Security max-age=15768000;
4848
49+
client_max_body_size 1G;
4950
# OCSP Stapling ---
5051
# fetch OCSP records from URL in ssl_certificate and cache them
5152
# ssl_stapling on;

src/hanami/src/api/http_endpoints/dataset/create_dataset_v1_0.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,10 @@ pub async fn upload_binary(mut payload: Multipart, path: Path<(String, String)>,
108108
while let Some(chunk) = field.next().await {
109109
let data = match chunk {
110110
Ok(value) => value,
111-
Err(_) => return Err(ErrorResponse::BadRequest("Failed to read chunk.".to_string())),
111+
Err(e) => {
112+
error!("{}", e);
113+
return Err(ErrorResponse::BadRequest("Failed to read chunk.".to_string()));
114+
}
112115
};
113116

114117
let _ = f.write_all(&data).await;

src/hanami/src/api/http_server.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
use actix_web::{App, HttpServer};
1616
use actix_web::middleware::{Logger, from_fn};
17+
use actix_web::web::PayloadConfig;
1718
use apistos::app::OpenApiWrapper;
1819
use apistos::spec::Spec;
1920
use apistos::web::{post, put, get, delete, resource, scope, Scope};
@@ -173,6 +174,7 @@ pub async fn run_server() -> Result<(), impl Error> {
173174
.document(spec)
174175
.wrap(from_fn(authorization_middleware))
175176
.wrap(Logger::default())
177+
.app_data(PayloadConfig::new(1 << 30)) // 1GB max payload-size
176178
.service(v1alpha_routes())
177179
.build("/openapi.json")
178180
})

src/sdk/go/hanami_sdk/http_request.go

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,11 @@ import (
3131
"io"
3232
"os"
3333
"mime/multipart"
34+
"path/filepath"
3435
)
3536

37+
const chunkSize = 1024 * 1024 // 1 MiB
38+
3639
type RequestError struct {
3740
StatusCode int
3841
Err string
@@ -192,20 +195,31 @@ func UploadFiles(address, token, path string, filePaths []string, skipTlsVerific
192195
writer := multipart.NewWriter(body)
193196
outputMap := map[string]interface{}{}
194197

195-
for _, path := range filePaths {
198+
// Open and stream each file into multipart writer
199+
for idx, path := range filePaths {
196200
file, err := os.Open(path)
197201
if err != nil {
198-
return outputMap, fmt.Errorf("failed to open file %s: %w", path, err)
202+
return outputMap, fmt.Errorf("failed to open file %s: %v", path, err)
199203
}
200204
defer file.Close()
201205

202-
part, err := writer.CreateFormFile("file", path)
206+
part, err := writer.CreateFormFile(fmt.Sprintf("file%d", idx), filepath.Base(path))
203207
if err != nil {
204-
return outputMap, fmt.Errorf("failed to create form file for %s: %w", path, err)
208+
return outputMap, fmt.Errorf("failed to create form part for %s: %v", path, err)
205209
}
206210

207-
if _, err := io.Copy(part, file); err != nil {
208-
return outputMap, fmt.Errorf("failed to copy file data for %s: %w", path, err)
211+
buf := make([]byte, chunkSize)
212+
for {
213+
n, err := file.Read(buf)
214+
if err != nil && err != io.EOF {
215+
return outputMap, fmt.Errorf("error reading file %s: %v", path, err)
216+
}
217+
if n == 0 {
218+
break
219+
}
220+
if _, err := part.Write(buf[:n]); err != nil {
221+
return outputMap, fmt.Errorf("error writing part for %s: %v", path, err)
222+
}
209223
}
210224
}
211225

src/sdk/python/hanami_sdk/hanami_sdk/hanami_request.py

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import requests
1616
import json
1717
import os
18+
from requests_toolbelt import MultipartEncoder
1819

1920
from . import hanami_exceptions
2021

@@ -95,19 +96,28 @@ def upload_files(token: str,
9596
path: str,
9697
file_paths,
9798
verify: bool):
98-
files = []
99-
for file_path in file_paths:
100-
file = open(file_path, 'rb')
101-
files.append(('file', (os.path.basename(file_path), file)))
102-
10399
url = f'{address}{path}'
104100
bearer_token = "Bearer " + token
105-
headers = {'Authorization': bearer_token}
101+
102+
fields = {}
103+
open_files = []
104+
105+
for i, file_path in enumerate(file_paths):
106+
f = open(file_path, 'rb')
107+
open_files.append(f) # Keep open until after upload!
108+
fields[f'file{i}'] = (os.path.basename(file_path), f, 'application/octet-stream')
109+
110+
encoder = MultipartEncoder(fields=fields)
111+
headers = {
112+
'Authorization': bearer_token,
113+
'Content-Type': encoder.content_type
114+
}
115+
106116
try:
107-
response = requests.post(url, headers=headers, files=files, verify=verify)
117+
response = requests.post(url, data=encoder, headers=headers, verify=verify)
108118
return json.loads(_handle_response(response))
109119
except requests.exceptions.RequestException as e:
110120
raise e
111121
finally:
112-
for _, (_, fileobj) in files:
113-
fileobj.close()
122+
for f in open_files:
123+
f.close()

src/sdk/python/hanami_sdk/requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,4 @@ protobuf==3.20.3
33
requests==2.32.3
44
simplejson==3.20.1
55
websockets==15.0.1
6+
requests_toolbelt==1.0.0

src/sdk/python/hanami_sdk/setup.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ def run(self):
3232
'protobuf==3.20.3',
3333
'requests==2.32.3',
3434
'simplejson==3.20.1',
35-
'websockets==15.0.1'],
35+
'websockets==15.0.1',
36+
'requests_toolbelt==1.0.0'],
3637
cmdclass={
3738
'install': GenerateProtobufMessages,
3839
},

0 commit comments

Comments
 (0)