Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
477b438
Added support for Active/Passive LivyHA
RogPodge Aug 24, 2019
1a2bd91
Fixed error with the conf
RogPodge Aug 26, 2019
61e06f2
fixed error with livy conf
RogPodge Aug 26, 2019
945f5ce
formatting fixes
RogPodge Aug 26, 2019
de6f885
added back another missing configuration
RogPodge Aug 26, 2019
048c1a2
fixed style errors
RogPodge Aug 27, 2019
31df79a
spelling comment resolution
RogPodge Aug 27, 2019
dd5da8b
fixed string format error
RogPodge Aug 28, 2019
c73bdea
reverted spelling change and addressed threading code comment
RogPodge Sep 5, 2019
b8a4d17
style fix
RogPodge Sep 5, 2019
af0cf82
style fixes
RogPodge Sep 19, 2019
46276f2
Merge remote-tracking branch 'upstream/master'
RogPodge Feb 4, 2020
5c806fc
merged with master
RogPodge Mar 9, 2020
19e266b
added a unit test spec suite for the curator elector service
RogPodge Mar 10, 2020
972ca3d
retooled livy HA to also do domain redirection
RogPodge Mar 11, 2020
fdf449d
updated specs to account for the start()/init() split
RogPodge Mar 11, 2020
636ac05
scalastyle fixes
RogPodge Mar 12, 2020
2423e4d
fixed typo
RogPodge Mar 12, 2020
14741eb
fixed remaining scalastyle error
RogPodge Mar 12, 2020
de250f5
updated minicluster.scala to reflect the init() and start() split
RogPodge Mar 13, 2020
988c219
refactored code to idenitfy the current server based on hostname and …
RogPodge Mar 17, 2020
ff3fd42
Merge branch 'LivyHA' of https://github.com/RogPodge/incubator-livy i…
RogPodge Mar 17, 2020
dd47e37
fixed typo
RogPodge Mar 17, 2020
fe79e75
changed code to account for zookeeper manager changes
RogPodge Mar 17, 2020
8178eef
removed extraneous file
RogPodge Mar 17, 2020
eb44f46
fixed scalastyle error
RogPodge Mar 18, 2020
9b1f21b
reduced verbosity of redirect messages
RogPodge Mar 24, 2020
1643f05
Merge branch 'LivyHA' of https://github.com/RogPodge/incubator-livy i…
RogPodge Mar 24, 2020
6caf0cc
improved descriptiveness of log messages
RogPodge Mar 24, 2020
0e2379b
refactored variable name
RogPodge Mar 24, 2020
dd87210
A few renamings (Synchronizing with local commit 2ca2ae69)
Jul 23, 2020
112ab27
Fixing Livy HA by appending the query string to the redirect URL
Jul 23, 2020
cbdaaaf
Fixing Livy leader domain omission issue
Jul 23, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions conf/livy.conf.template
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,16 @@
# on user request and then livy server classpath automatically.
# livy.repl.enable-hive-context =

# High Availability mode of Livy. Possible values:
# off: Default. Turn off High Availability.
# on: Livy uses Zookeeper as a state store to ensure a livy server is always available with the
# correct state.
# Must set livy.server.ha.zookeeper-url to configure HA
# livy.server.ha.mode = off
Comment thread
RogPodge marked this conversation as resolved.

# For zookeeper, the address to the Zookeeper servers. e.g. host1:port1,host2:port2
# livy.server.ha.zookeeper-url =

# Recovery mode of Livy. Possible values:
# off: Default. Turn off recovery. Every time Livy shuts down, it stops and forgets all sessions.
# recovery: Livy persists session info to the state store. When Livy restarts, it recovers
Expand Down
13 changes: 13 additions & 0 deletions server/src/main/scala/org/apache/livy/LivyConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,19 @@ object LivyConf {
* zookeeper: Store state in a Zookeeper instance.
*/
val RECOVERY_STATE_STORE = Entry("livy.server.recovery.state-store", null)

/**
* High Availability mode of Livy. Possible values:
* off: Default. Turn off High Availability.
* on: Livy uses Zookeeper as a state store to ensure a livy server is always available with the
* correct state.
* Must set livy.server.ha.zookeeper-url to configure HA
*/
val HA_MODE = Entry("livy.server.ha.mode", "off")

// For zookeeper, the address to the Zookeeper servers. e.g. host1:port1,host2:port2
val HA_ZOOKEEPER_URL = Entry("livy.server.ha.zookeeper-url", "")

/**
* For filesystem state store, the path of the state store directory. Please don't use a
* filesystem that doesn't support atomic rename (e.g. S3). e.g. file:///tmp/livy or hdfs:///.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.livy.server

import java.io.Closeable
import java.io.IOException
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.recipes.leader.LeaderLatch
import org.apache.curator.framework.recipes.leader.LeaderLatchListener
import org.apache.curator.retry.RetryNTimes

import org.apache.livy.{LivyConf, Logging}
import org.apache.livy.LivyConf.Entry

object CuratorElectorService {
val HA_KEY_PREFIX_CONF = Entry("livy.server.ha.key-prefix", "livy_ha")
val HA_RETRY_CONF = Entry("livy.server.ha.retry-policy", "5,100")
}

class CuratorElectorService(livyConf : LivyConf, livyServer : LivyServer)
extends LeaderLatchListener
with Logging
{

import CuratorElectorService._

val haAddress = livyConf.get(LivyConf.HA_ZOOKEEPER_URL)
require(!haAddress.isEmpty, s"Please configure ${LivyConf.HA_ZOOKEEPER_URL.key}.")
val haKeyPrefix = livyConf.get(HA_KEY_PREFIX_CONF)
val retryValue = livyConf.get(HA_RETRY_CONF)
// a regex to match patterns like "m, n" where m and n both are integer values
val retryPattern = """\s*(\d+)\s*,\s*(\d+)\s*""".r

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it be easier to have two config values?
retry_count and sleep_between_retries_ms

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The 2 configs should be paired together in the config for clarity I believe. Also we're following the example ins the ZooKeeperStateStore.scala

@o-shevchenko o-shevchenko Aug 28, 2019

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi,
just FYI
I have moved this configuration to ZooKeeperManager in #189. Replaced with two separated configs.
Perhaps, we need to merge this PR after #189 or I need to refactor my code to use your approach if this PR will be merged first (but I think that two separated properties more clear then parsing one property) to don't do the same work (like configuration refactoring).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good, we can adjust based on whichever PR is merged first.

val retryPolicy = retryValue match {
case retryPattern(n, sleepMs) => new RetryNTimes(n.toInt, sleepMs.toInt)
case _ => throw new IllegalArgumentException(
s"$HA_KEY_PREFIX_CONF contains bad value: $retryValue. " +
"Correct format is <max retry count>,<sleep ms between retry>. e.g. 5,100")
}

val client: CuratorFramework = CuratorFrameworkFactory.newClient(haAddress, retryPolicy)
val leaderKey = s"/$haKeyPrefix/leader"

var server : LivyServer = livyServer
Comment thread
RogPodge marked this conversation as resolved.
Outdated

var leaderLatch = new LeaderLatch(client, leaderKey)
leaderLatch.addListener(this)

object HAState extends Enumeration{
type HAState = Value
val Active, Standby = Value
}
var currentState = HAState.Standby

def isLeader() {
Comment thread
RogPodge marked this conversation as resolved.
Outdated
transitionToActive();
}

def notLeader(){
Comment thread
RogPodge marked this conversation as resolved.
Outdated
Comment thread
RogPodge marked this conversation as resolved.
Outdated
transitionToStandby();
}

def start() : Unit = {
Comment thread
RogPodge marked this conversation as resolved.
Outdated
transitionToStandby()

client.start()
leaderLatch.start()
leaderLatch.await()

// This instance is now the leader. Joining the webserver to the main thread
info("starting join")
Comment thread
RogPodge marked this conversation as resolved.
Outdated
server.join()
info("join completed?")
close()
}

def close() : Unit = {
Comment thread
RogPodge marked this conversation as resolved.
Outdated
transitionToStandby();
leaderLatch.close();
}

def transitionToActive() : Unit = {
Comment thread
RogPodge marked this conversation as resolved.
Outdated
info("Transitioning to Active state")
if(currentState == HAState.Active){
Comment thread
RogPodge marked this conversation as resolved.
Outdated
info("Already in Active State")
}
else {
server.start()
currentState = HAState.Active
info("Transition complete")
}
}

def transitionToStandby() : Unit = {
Comment thread
RogPodge marked this conversation as resolved.
Outdated
info("Transitioning to Standby state")
if(currentState == HAState.Standby){
Comment thread
RogPodge marked this conversation as resolved.
Outdated
info("Already in Standby State");
}
else {
server.stop();
currentState = HAState.Standby
info("Transition complete");
}
}
}
28 changes: 21 additions & 7 deletions server/src/main/scala/org/apache/livy/server/LivyServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

import org.apache.curator.utils.CloseableUtils
import org.apache.hadoop.security.{SecurityUtil, UserGroupInformation}
import org.apache.hadoop.security.authentication.server._
import org.eclipse.jetty.servlet.FilterHolder
Expand All @@ -38,7 +39,7 @@ import org.scalatra.servlet.{MultipartConfig, ServletApiImplicits}
import org.apache.livy._
import org.apache.livy.server.batch.BatchSessionServlet
import org.apache.livy.server.interactive.InteractiveSessionServlet
import org.apache.livy.server.recovery.{SessionStore, StateStore}
import org.apache.livy.server.recovery.{SessionStore, StateStore, ZooKeeperStateStore}
import org.apache.livy.server.ui.UIServlet
import org.apache.livy.sessions.{BatchSessionManager, InteractiveSessionManager}
import org.apache.livy.sessions.SessionManager.SESSION_RECOVERY_MODE_OFF
Expand Down Expand Up @@ -394,16 +395,29 @@ class LivyServer extends Logging {
}
}

object HighAvailabilitySettings {
val HA_ON = "on"
val HA_OFF = "off"
}

object LivyServer {

def main(args: Array[String]): Unit = {
val server = new LivyServer()
try {
server.start()
server.join()
} finally {
server.stop()
val livyConf = new LivyConf().loadFromFile("livy.conf")

if(livyConf.get(LivyConf.HA_MODE) == HighAvailabilitySettings.HA_ON){
Comment thread
RogPodge marked this conversation as resolved.
Outdated
info("Starting HA connection")
val electorService: CuratorElectorService = new CuratorElectorService(livyConf, server)
electorService.start()
}
else {
try {
server.start()
server.join()
} finally {
server.stop()
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class ZooKeeperStateStore(
}

private val zkAddress = livyConf.get(LivyConf.RECOVERY_STATE_STORE_URL)
require(!zkAddress.isEmpty, s"Please config ${LivyConf.RECOVERY_STATE_STORE_URL.key}.")
require(!zkAddress.isEmpty, s"Please configure ${LivyConf.RECOVERY_STATE_STORE_URL.key}.")
Comment thread
RogPodge marked this conversation as resolved.
Outdated
private val zkKeyPrefix = livyConf.get(ZK_KEY_PREFIX_CONF)
private val retryValue = livyConf.get(ZK_RETRY_CONF)
// a regex to match patterns like "m, n" where m and n both are integer values
Expand Down