-
Notifications
You must be signed in to change notification settings - Fork 1.2k
improve http graceful termination #4957
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,7 +17,8 @@ | |
|
|
||
| package org.apache.openwhisk.http | ||
|
|
||
| import akka.actor.ActorSystem | ||
| import akka.Done | ||
| import akka.actor.{ActorSystem, CoordinatedShutdown} | ||
| import akka.event.Logging | ||
| import akka.http.scaladsl.{Http, HttpConnectionContext} | ||
| import akka.http.scaladsl.model.{HttpRequest, _} | ||
|
|
@@ -29,17 +30,23 @@ import kamon.metric.MeasurementUnit | |
| import spray.json._ | ||
| import org.apache.openwhisk.common.Https.HttpsConfig | ||
| import org.apache.openwhisk.common._ | ||
|
|
||
| import akka.pattern.after | ||
| import org.apache.openwhisk.common.TransactionId.systemPrefix | ||
| import pureconfig._ | ||
| import pureconfig.generic.auto._ | ||
| import scala.collection.immutable.Seq | ||
| import scala.concurrent.duration.DurationInt | ||
| import scala.concurrent.{Await, Future} | ||
| import scala.concurrent.duration.{DurationInt, FiniteDuration} | ||
| import scala.concurrent.Future | ||
| import scala.util.{Failure, Success} | ||
|
|
||
| case class BasicHttpServiceConfig(shutdownUnreadyDelay: FiniteDuration, shutdownTerminationLimit: FiniteDuration) | ||
|
|
||
| /** | ||
| * This trait extends the Akka Directives and Actor with logging and transaction counting | ||
| * facilities common to all OpenWhisk REST services. | ||
| */ | ||
| trait BasicHttpService extends Directives { | ||
|
|
||
| implicit val logging: Logging | ||
| val OW_EXTRA_LOGGING_HEADER = "X-OW-EXTRA-LOGGING" | ||
|
|
||
| /** | ||
|
|
@@ -156,24 +163,50 @@ trait BasicHttpService extends Directives { | |
| } | ||
|
|
||
| object BasicHttpService { | ||
| implicit val tid = TransactionId(systemPrefix + "http_service") | ||
| //start with ready true | ||
| protected[http] var ready = true | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is this used for anything?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The |
||
|
|
||
| /** | ||
| * Starts an HTTP(S) route handler on given port and registers a shutdown hook. | ||
| */ | ||
| def startHttpService(route: Route, port: Int, config: Option[HttpsConfig] = None, interface: String = "0.0.0.0")( | ||
| def startHttpService(route: Route, port: Int, httpsConfig: Option[HttpsConfig] = None, interface: String = "0.0.0.0")( | ||
| implicit actorSystem: ActorSystem, | ||
| materializer: ActorMaterializer): Unit = { | ||
| val connectionContext = config.map(Https.connectionContext(_)).getOrElse(HttpConnectionContext) | ||
| materializer: ActorMaterializer, | ||
| logging: Logging): Unit = { | ||
| val connectionContext = httpsConfig.map(Https.connectionContext(_)).getOrElse(HttpConnectionContext) | ||
| val httpBinding = Http().bindAndHandle(route, interface, port, connectionContext = connectionContext) | ||
| logging.info(this, "starting http service...") | ||
| addShutdownHook(httpBinding) | ||
| } | ||
|
|
||
| def addShutdownHook(binding: Future[Http.ServerBinding])(implicit actorSystem: ActorSystem, | ||
| materializer: ActorMaterializer): Unit = { | ||
| def addShutdownHook(binding: Future[Http.ServerBinding], | ||
| httpServiceConfig: BasicHttpServiceConfig = | ||
| loadConfigOrThrow[BasicHttpServiceConfig]("whisk.http"))(implicit actorSystem: ActorSystem, | ||
| materializer: ActorMaterializer, | ||
| logging: Logging): Unit = { | ||
| implicit val executionContext = actorSystem.dispatcher | ||
| sys.addShutdownHook { | ||
| Await.result(binding.map(_.unbind()), 30.seconds) | ||
| Await.result(actorSystem.whenTerminated, 30.seconds) | ||
|
|
||
| CoordinatedShutdown(actorSystem).addTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, "http_unready") { () => | ||
| logging.info(this, "shutdown unready...") | ||
| //return 503 status at /ready endpoint for some time before actual termination begins | ||
| ready = false | ||
| after(httpServiceConfig.shutdownUnreadyDelay, actorSystem.scheduler) { | ||
| logging.info(this, "shutdown unready complete...") | ||
| Future.successful(Done) | ||
| } | ||
| } | ||
| CoordinatedShutdown(actorSystem).addTask(CoordinatedShutdown.PhaseServiceUnbind, "http_termination") { () => | ||
| logging.info(this, "shutdown terminating...") | ||
| binding | ||
| .flatMap(_.terminate(hardDeadline = httpServiceConfig.shutdownTerminationLimit)) | ||
| .andThen { | ||
| case Success(_) => logging.info(this, "shutdown termination complete...") | ||
| case Failure(t) => logging.info(this, s"shutdown termination failed... ${t}") | ||
| } | ||
| .map { _ => | ||
| Done | ||
| } | ||
| } | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,15 +18,16 @@ | |
| package org.apache.openwhisk.http | ||
|
|
||
| import akka.event.Logging | ||
| import org.apache.openwhisk.common.{MetricsRoute, TransactionId} | ||
| import akka.http.scaladsl.model.StatusCodes | ||
| import org.apache.openwhisk.common.{Logging, MetricsRoute, TransactionId} | ||
|
|
||
| /** | ||
| * This trait extends the BasicHttpService with a standard "ping" endpoint which | ||
| * responds to health queries, intended for monitoring. | ||
| */ | ||
| trait BasicRasService extends BasicHttpService { | ||
| class BasicRasService(implicit val logging: Logging) extends BasicHttpService { | ||
|
|
||
| override def routes(implicit transid: TransactionId) = ping ~ MetricsRoute() | ||
| override def routes(implicit transid: TransactionId) = ping ~ ready ~ MetricsRoute() | ||
|
|
||
| override def loglevelForRoute(route: String): Logging.LogLevel = { | ||
| if (route == "/ping" || route == "/metrics") { | ||
|
|
@@ -39,4 +40,14 @@ trait BasicRasService extends BasicHttpService { | |
| val ping = path("ping") { | ||
| get { complete("pong") } | ||
| } | ||
| val ready = path("ready") { | ||
| get { | ||
| if (BasicHttpService.ready) { | ||
| complete("ok") | ||
| } else { | ||
| logging.warn(this, "not ready...") | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can this just be a debug? I think ready route returning 503 should be self explanatory |
||
| complete(StatusCodes.ServiceUnavailable, "notready") | ||
| } | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems it would wait for 15 seconds, and start the termination phase.
But if there are still client connections connected, are they getting connection closed error?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This only allows an unready stage, which only affects the added /ready endpoint to return 503 (no other endpoints affected). This does not affect how akka http handles existing client connections during shutdown, so if akka allows gracefully terminating in flight requests, then no connection closed errors, but no new connections allowed. The purpose here is to allow external systems (e.g. kubernetes or reverse proxies) to probe the service and stop routing to this instance for some time before akka shutdown initiates.