Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions docs/use-cases/http/advanced-use-cases.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,28 @@ functions:
url: true
```

#### Invoke Method RESPONSE_STREAM

In order to support Lambda Function URLs invoke method as `RESPONSE_STREAM` you need to change to settings:

* BREF_STREAMED_MODE = 1 in the lambda environment
* invokeMode: RESPONSE_STREAM in the function url settings

Like the following sample config:

```yml filename="serverless.yml"
functions:
hello:
handler: MyApp\Handlers\MyLambdaUrlHandler
# ...
environment:
BREF_STREAMED_MODE: 1
url:
invokeMode: RESPONSE_STREAM
```

Be aware that you must implement an `HttpHandler` handler, or use something like Laravel Octane handler if you are using Laravel.

### API Gateway v1 REST API

The syntax is slightly different from API Gateway v2 HTTP APIs as we must use a different `events` configuration. Here is an example that sends all requests to a single Lambda function:
Expand Down
2 changes: 1 addition & 1 deletion src/Event/Http/HttpHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ abstract class HttpHandler implements Handler
abstract public function handleRequest(HttpRequestEvent $event, Context $context): HttpResponse;

/** {@inheritDoc} */
public function handle($event, Context $context): array
public function handle($event, Context $context): array|\Generator
{
// See https://bref.sh/docs/runtimes/http.html#cold-starts
if (isset($event['warmer']) && $event['warmer'] === true) {
Expand Down
91 changes: 74 additions & 17 deletions src/Event/Http/HttpResponse.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,21 @@
{
private int $statusCode;
private array $headers;
private string $body;
private string|\Generator $body;

/**
* @param array<string|string[]> $headers
*/
public function __construct(string $body, array $headers = [], int $statusCode = 200)
public function __construct(string|\Generator $body, array $headers = [], int $statusCode = 200)
{
$this->body = $body;
$this->headers = $headers;
$this->statusCode = $statusCode;
}

public function toApiGatewayFormat(bool $multiHeaders = false): array
public function toApiGatewayFormat(bool $multiHeaders = false): array|\Generator
{
$isStreamedMode = (bool) getenv('BREF_STREAMED_MODE');
$base64Encoding = (bool) getenv('BREF_BINARY_RESPONSES');

$headers = [];
Expand All @@ -47,19 +48,40 @@

// This is the format required by the AWS_PROXY lambda integration
// See https://stackoverflow.com/questions/43708017/aws-lambda-api-gateway-error-malformed-lambda-proxy-response
return [
'isBase64Encoded' => $base64Encoding,
'statusCode' => $this->statusCode,
$headersKey => $headers,
'body' => $base64Encoding ? base64_encode($this->body) : $this->body,
];

if ($isStreamedMode) {
return $this->yieldBody([
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably early return

'statusCode' => $this->statusCode,
$headersKey => $headers,
]);
} else {
if ($this->body instanceof \Generator) {
$dataChunk = '';

while ($this->body->valid()) {
$dataChunk .= $this->body->current();

$this->body->next();
}
} else {
$dataChunk = $this->body;
}

return [
'isBase64Encoded' => $base64Encoding,
'statusCode' => $this->statusCode,
$headersKey => $headers,
'body' => $base64Encoding ? base64_encode($dataChunk) : $dataChunk,
];
}
}

/**
* See https://docs.aws.amazon.com/apigateway/latest/developerguide/http-api-develop-integrations-lambda.html#http-api-develop-integrations-lambda.response
*/
public function toApiGatewayFormatV2(): array
public function toApiGatewayFormatV2(): array|\Generator
{
$isStreamedMode = (bool) getenv('BREF_STREAMED_MODE');
$base64Encoding = (bool) getenv('BREF_BINARY_RESPONSES');

$headers = [];
Expand All @@ -80,13 +102,33 @@
// serialized to `[]` (we want `{}`) so we force it to an empty object.
$headers = empty($headers) ? new \stdClass : $headers;

return [
'cookies' => $cookies,
'isBase64Encoded' => $base64Encoding,
'statusCode' => $this->statusCode,
'headers' => $headers,
'body' => $base64Encoding ? base64_encode($this->body) : $this->body,
];
if ($isStreamedMode) {
return $this->yieldBody([
'cookies' => $cookies,
'statusCode' => $this->statusCode,
'headers' => $headers,
]);
} else {
if ($this->body instanceof \Generator) {
$dataChunk = '';

while ($this->body->valid()) {
$dataChunk .= $this->body->current();

$this->body->next();
}
} else {
$dataChunk = $this->body;
}

return [
'cookies' => $cookies,
'isBase64Encoded' => $base64Encoding,
'statusCode' => $this->statusCode,
'headers' => $headers,
'body' => $base64Encoding ? base64_encode($dataChunk) : $dataChunk,
];
}
}

/**
Expand All @@ -98,4 +140,19 @@
$name = ucwords($name);
return str_replace(' ', '-', $name);
}

private function yieldBody($headersFormat): \Generator

Check failure on line 144 in src/Event/Http/HttpResponse.php

View workflow job for this annotation

GitHub Actions / PHP CodeSniffer

Method \Bref\Event\Http\HttpResponse::yieldBody() does not have parameter type hint nor @param annotation for its parameter $headersFormat.
{
yield json_encode($headersFormat);

yield "\0\0\0\0\0\0\0\0";

if ($this->body instanceof \Generator) {
foreach ($this->body as $dataChunk) {
yield $dataChunk;
}
} else {
yield $this->body;
}
}
}
129 changes: 128 additions & 1 deletion src/Runtime/LambdaRuntime.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use Bref\Event\Handler;
use CurlHandle;
use Exception;
use Generator;
use JsonException;
use Psr\Http\Server\RequestHandlerInterface;
use RuntimeException;
Expand Down Expand Up @@ -39,6 +40,8 @@ final class LambdaRuntime
private $curlHandleNext;
/** @var resource|CurlHandle|null */
private $curlHandleResult;
/** @var resource|CurlHandle|null */
private $curlStreamedHandleResult;
private string $apiUrl;
private Invoker $invoker;
private string $layer;
Expand Down Expand Up @@ -201,7 +204,12 @@ private function waitNextInvocation(): array
private function sendResponse(string $invocationId, mixed $responseData): void
{
$url = "http://$this->apiUrl/2018-06-01/runtime/invocation/$invocationId/response";
$this->postJson($url, $responseData);

if ($responseData instanceof Generator) {
$this->postStreamed($url, $responseData);
} else {
$this->postJson($url, $responseData);
}
}

/**
Expand Down Expand Up @@ -281,6 +289,117 @@ public function failInitialization(
exit(1);
}

/**
* @param string[] $headers
* @throws Exception
* @throws ResponseTooBig
*/
private function postStreamed(string $url, Generator $data, array $headers = []): void
{
if ($this->curlStreamedHandleResult === null) {
$this->curlStreamedHandleResult = curl_init();
curl_setopt($this->curlStreamedHandleResult, CURLOPT_UPLOAD, true);
curl_setopt($this->curlStreamedHandleResult, CURLOPT_CUSTOMREQUEST, 'POST');
curl_setopt($this->curlStreamedHandleResult, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_1);
curl_setopt($this->curlStreamedHandleResult, CURLOPT_RETURNTRANSFER, true);
curl_setopt($this->curlStreamedHandleResult, CURLOPT_INFILESIZE, -1);
}

curl_setopt($this->curlStreamedHandleResult, CURLOPT_URL, $url);
curl_setopt($this->curlStreamedHandleResult, CURLOPT_HTTPHEADER, [
'Lambda-Runtime-Function-Response-Mode: streaming',
'Content-Type: application/vnd.awslambda.http-integration-response',
'Transfer-Encoding: chunked',
...$headers,
]);

if (PHP_VERSION_ID < 80100) {
$buffer = '';
curl_setopt(
$this->curlStreamedHandleResult,
CURLOPT_READFUNCTION,
function ($ch, $fd, $length) use (&$data, &$buffer) {
if (strlen($buffer) < $length && $data->valid()) {
$buffer .= (string) $data->current();

/*
As this method needs to return an string, we need to wait for the next generator item to yield.
This can lead to the initial part of the buffer taking longer to load if the next chunk takes longer.
*/
$data->next();
}

$chunk = substr($buffer, 0, $length);
$buffer = substr($buffer, strlen($chunk));

return $chunk;
}
);
} else {
$buffer = '';
/*
* We use Fibers so we can suspend the yields and read data as needed.
* That way we don't block the response as more data comes.
*/
$fiber = new \Fiber(
function () use (&$data): void {
foreach ($data as $dataChunk) {
\Fiber::suspend((string) $dataChunk);
}

\Fiber::suspend(PHP_INT_MIN);
}
);

curl_setopt(
$this->curlStreamedHandleResult,
CURLOPT_READFUNCTION,
function ($ch, $fd, $length) use (&$fiber, &$buffer) {
if ($buffer === '') {
if ($fiber->isStarted() || $fiber->isSuspended()) {
$fiberChunk = $fiber->resume();
} elseif (! $fiber->isTerminated()) {
$fiberChunk = $fiber->start();
} else {
$fiberChunk = PHP_INT_MIN;
}

if ($fiberChunk !== PHP_INT_MIN) {
$buffer .= $fiberChunk;
}
}

$chunk = substr($buffer, 0, $length);
$buffer = substr($buffer, strlen($chunk));

return $chunk;
}
);
}

$body = curl_exec($this->curlStreamedHandleResult);

$statusCode = curl_getinfo($this->curlStreamedHandleResult, CURLINFO_HTTP_CODE);
if ($statusCode >= 400) {
// Re-open the connection in case of failure to start from a clean state
$this->closeCurlStreamedHandleResult();

if ($statusCode === 413) {
throw new ResponseTooBig;
}

try {
$error = json_decode($body, true, 512, JSON_THROW_ON_ERROR);
$errorMessage = "{$error['errorType']}: {$error['errorMessage']}";
} catch (JsonException) {
// In case we didn't get any JSON
$errorMessage = 'unknown error';
}

throw new Exception("Error $statusCode while calling the Lambda runtime API: $errorMessage");
}
}

/**
* @param string[] $headers
* @throws Exception
Expand Down Expand Up @@ -350,6 +469,14 @@ private function closeCurlHandleResult(): void
}
}

private function closeCurlStreamedHandleResult(): void
{
if ($this->curlStreamedHandleResult !== null) {
curl_close($this->curlStreamedHandleResult);
$this->curlStreamedHandleResult = null;
}
}

/**
* Ping a Bref server with a statsd request.
*
Expand Down
Loading
Loading