Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions api/src/main/java/org/apache/livy/LivyClientBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
public final class LivyClientBuilder {

public static final String LIVY_URI_KEY = "livy.uri";
public static final String LIVY_SESSION_ID_KEY = "livy.sessionId";

private static final ServiceLoader<LivyClientFactory> CLIENT_FACTORY_LOADER =
ServiceLoader.load(LivyClientFactory.class, classLoader());
Expand Down Expand Up @@ -96,11 +97,33 @@ public LivyClientBuilder(boolean loadDefaults) throws IOException {
}
}

/**
* Set the URI of the Livy server the client will connect to.
Comment thread
tmnd1991 marked this conversation as resolved.
Outdated
* If the URI contains <pre>sessions/{sessionId}</pre>,
* the client will connect to the specified existing session,
Comment thread
tmnd1991 marked this conversation as resolved.
Outdated
* otherwise it will create a new session.
*
Comment thread
mgaido91 marked this conversation as resolved.
* @param uri The URI of Livy server.
* @return The builder itself.
*/
public LivyClientBuilder setURI(URI uri) {
config.setProperty(LIVY_URI_KEY, uri.toString());
return this;
}

/**
* Sets the sessionId the client will connect to. If sessionId is set,
Comment thread
tmnd1991 marked this conversation as resolved.
Outdated
* all Spark configurations will be ignored and the original session ones will be used.
Comment thread
tmnd1991 marked this conversation as resolved.
Outdated
* If not set, a new session will be created when the client is built.
*
Comment thread
mgaido91 marked this conversation as resolved.
* @param sessionId The ID of the session to attach to.
* @return the builder itself.
*/
public LivyClientBuilder setSessionId(int sessionId) {
config.setProperty(LIVY_SESSION_ID_KEY, String.valueOf(sessionId));
return this;
}

public LivyClientBuilder setConf(String key, String value) {
if (value != null) {
config.setProperty(key, value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.livy.Job;
import org.apache.livy.JobHandle;
import org.apache.livy.LivyClient;
import org.apache.livy.LivyClientBuilder;
import org.apache.livy.client.common.Serializer;
import static org.apache.livy.client.common.HttpMessages.*;

Expand All @@ -59,9 +60,17 @@ class HttpClient implements LivyClient {
// unused.
Matcher m = Pattern.compile("(.*)" + LivyConnection.SESSIONS_URI + "/([0-9]+)")
.matcher(uri.getPath());
String sessionIdFromConf = httpConf.get(LivyClientBuilder.LIVY_SESSION_ID_KEY);

try {
if (m.matches()) {
if (sessionIdFromConf != null && m.matches()) {
throw new IllegalArgumentException(
"Cannot set existing session both from URI and configuration");
} else if (sessionIdFromConf != null) {
this.conn = new LivyConnection(uri, httpConf);
this.sessionId = Integer.parseInt(sessionIdFromConf);
conn.post(null, SessionInfo.class, "/%d/connect", sessionId);
} else if (m.matches()) {
URI base = new URI(uri.getScheme(), uri.getUserInfo(), uri.getHost(), uri.getPort(),
m.group(1), uri.getQuery(), uri.getFragment());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,15 +181,34 @@ class HttpClientSpec extends FunSpecLike with BeforeAndAfterAll with LivyBaseUni
testJob(false, response = Some(null))
}

withClient("should connect to existing sessions") {
var sid = client.asInstanceOf[HttpClient].getSessionId()
withClient("should connect to existing sessions using the URI") {
val sid = client.asInstanceOf[HttpClient].getSessionId()
val uri = s"http://${InetAddress.getLocalHost.getHostAddress}:${server.port}" +
s"${LivyConnection.SESSIONS_URI}/$sid"
val newClient = new LivyClientBuilder(false).setURI(new URI(uri)).build()
newClient.stop(false)
verify(session, never()).stop()
}

withClient("should connect to existing sessions using the conf") {
val sid = client.asInstanceOf[HttpClient].getSessionId()
val uri = s"http://${InetAddress.getLocalHost.getHostAddress}:${server.port}"
val newClient = new LivyClientBuilder(false).setURI(new URI(uri)).setSessionId(sid).build()
newClient.stop(false)
verify(session, never()).stop()
}

withClient("should throw an exception if the sessionId is set through conf and URI") {
val sid = client.asInstanceOf[HttpClient].getSessionId()
val uri = s"http://${InetAddress.getLocalHost.getHostAddress}:${server.port}" +
s"${LivyConnection.SESSIONS_URI}/$sid"
intercept[IllegalArgumentException]{
val newClient = new LivyClientBuilder(false).setURI(new URI(uri)).setSessionId(sid).build()
newClient.stop(false)
verify(session, never()).stop()
}
}

withClient("should tear down clients") {
client.stop(true)
verify(session, times(1)).stop()
Expand Down