diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/RPC.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/RPC.java index 2c544716f951..8f6711ccb845 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/RPC.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/RPC.java @@ -83,7 +83,7 @@ public class RPC { final static int RPC_SERVICE_CLASS_DEFAULT = 0; public enum RpcKind { RPC_BUILTIN ((short) 1), // Used for built in calls by tests - RPC_WRITABLE ((short) 2), // Use WritableRpcEngine + RPC_WRITABLE ((short) 2), // ignored RPC_PROTOCOL_BUFFER ((short) 3); // Use ProtobufRpcEngine final static short MAX_SIZE = RPC_PROTOCOL_BUFFER.value; // used for array size private final short value; @@ -216,8 +216,7 @@ static synchronized RpcEngine getProtocolEngine(Class protocol, Configuration conf) { RpcEngine engine = PROTOCOL_ENGINES.get(protocol); if (engine == null) { - Class impl = conf.getClass(ENGINE_PROP+"."+protocol.getName(), - WritableRpcEngine.class); + Class impl = conf.getClass(ENGINE_PROP+"."+protocol.getName(), ProtobufRpcEngine.class); engine = (RpcEngine)ReflectionUtils.newInstance(impl, conf); PROTOCOL_ENGINES.put(protocol, engine); } @@ -1034,6 +1033,19 @@ Map getProtocolImplMap(RPC.RpcKind rpcKind) { return protocolImplMapArray.get(rpcKind.ordinal()); } + /** + * Returns {@code true} only if at least one protocol has been registered + * on this server instance for the given {@link RPC.RpcKind}. + * Used to reject incoming requests for unsupported RPC kinds before any + * deserialization of the request payload takes place. + * @param rpcKind the RPC kind from the incoming request header. + * @return {@code true} if at least one protocol is registered for this kind. + */ + boolean hasRegisteredProtocols(RPC.RpcKind rpcKind) { + Map implMap = getProtocolImplMap(rpcKind); + return implMap != null && !implMap.isEmpty(); + } + // Register protocol and its impl for rpc calls void registerProtocolAndImpl(RpcKind rpcKind, Class protocolClass, Object protocolImpl) { diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/Server.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/Server.java index e0e4517ad584..2f767ae4bdf8 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/Server.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/Server.java @@ -2654,15 +2654,33 @@ private void checkRpcHeaders(RpcRequestHeaderProto header) private void processRpcRequest(RpcRequestHeaderProto header, RpcWritable.Buffer buffer) throws RpcServerException, InterruptedException { - Class rpcRequestClass = + if (header.getRpcKind() == RpcKindProto.RPC_WRITABLE) { + final String err = "WritableRpcEngine is not supported."; + LOG.warn("{} Client: {}", err, getHostAddress()); + throw new FatalRpcServerException(RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, err); + } + // Reject requests for RPC kinds with no registered protocols on this + // server instance. This prevents deserialization of untrusted payloads + // for unsupported kinds. See HADOOP-19864. + if (Server.this instanceof RPC.Server) { + RPC.Server server = (RPC.Server) Server.this; + final RPC.RpcKind kind = ProtoUtil.convert(header.getRpcKind()); + if (!server.hasRegisteredProtocols(kind)) { + final String err = "No protocols registered on this server for RpcKind " + + header.getRpcKind() + + ". Rejecting request without deserialization."; + LOG.info("{} Client: {}", err, getHostAddress()); + throw new FatalRpcServerException( + RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, err); + } + } + Class rpcRequestClass = getRpcRequestWrapper(header.getRpcKind()); if (rpcRequestClass == null) { - LOG.warn("Unknown rpc kind " + header.getRpcKind() + - " from client " + getHostAddress()); - final String err = "Unknown rpc kind in rpc header" + - header.getRpcKind(); + LOG.warn("Unknown rpc kind {} from client {}", header.getRpcKind(), getHostAddress()); throw new FatalRpcServerException( - RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, err); + RpcErrorCodeProto.FATAL_INVALID_RPC_HEADER, + "Unknown rpc kind in rpc header " + header.getRpcKind()); } Writable rpcRequest; try { //Read the rpc request @@ -2670,12 +2688,12 @@ private void processRpcRequest(RpcRequestHeaderProto header, } catch (RpcServerException rse) { // lets tests inject failures. throw rse; } catch (Throwable t) { // includes runtime exception from newInstance - LOG.warn("Unable to read call parameters for client " + - getHostAddress() + "on connection protocol " + - this.protocolName + " for rpcKind " + header.getRpcKind(), t); - String err = "IPC server unable to read call parameters: "+ t.getMessage(); + LOG.warn( + "Unable to read call parameters for client {} on connection protocol {} for rpcKind {}", + getHostAddress(), this.protocolName, header.getRpcKind(), t); throw new FatalRpcServerException( - RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST, err); + RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST, + "IPC server unable to read call parameters: "+ t.getMessage()); } CallerContext callerContext = null; diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/WritableRpcEngine.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/WritableRpcEngine.java deleted file mode 100644 index d23e59b4a1fa..000000000000 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ipc_/WritableRpcEngine.java +++ /dev/null @@ -1,630 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ipc_; - -import java.lang.reflect.Proxy; -import java.lang.reflect.Method; -import java.lang.reflect.InvocationTargetException; - -import java.net.InetSocketAddress; -import java.io.*; -import java.util.concurrent.atomic.AtomicBoolean; - -import javax.net.SocketFactory; - -import org.apache.hadoop.io.*; -import org.apache.hadoop.io.retry.RetryPolicy; -import org.apache.hadoop.ipc_.Client.ConnectionId; -import org.apache.hadoop.ipc_.RPC.RpcInvoker; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.SecretManager; -import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.hadoop.util.Time; -import org.apache.hadoop.conf.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** An RpcEngine implementation for Writable data. */ -@Deprecated -public class WritableRpcEngine implements RpcEngine { - private static final Logger LOG = LoggerFactory.getLogger(RPC.class); - - //writableRpcVersion should be updated if there is a change - //in format of the rpc messages. - - // 2L - added declared class to Invocation - public static final long writableRpcVersion = 2L; - - /** - * Whether or not this class has been initialized. - */ - private static boolean isInitialized = false; - - static { - ensureInitialized(); - } - - /** - * Initialize this class if it isn't already. - */ - public static synchronized void ensureInitialized() { - if (!isInitialized) { - initialize(); - } - } - - /** - * Register the rpcRequest deserializer for WritableRpcEngine - */ - private static synchronized void initialize() { - org.apache.hadoop.ipc_.Server.registerProtocolEngine(RPC.RpcKind.RPC_WRITABLE, - Invocation.class, new Server.WritableRpcInvoker()); - isInitialized = true; - } - - - /** A method invocation, including the method name and its parameters.*/ - private static class Invocation implements Writable, Configurable { - private String methodName; - private Class[] parameterClasses; - private Object[] parameters; - private Configuration conf; - private long clientVersion; - private int clientMethodsHash; - private String declaringClassProtocolName; - - //This could be different from static writableRpcVersion when received - //at server, if client is using a different version. - private long rpcVersion; - - @SuppressWarnings("unused") // called when deserializing an invocation - public Invocation() {} - - public Invocation(Method method, Object[] parameters) { - this.methodName = method.getName(); - this.parameterClasses = method.getParameterTypes(); - this.parameters = parameters; - rpcVersion = writableRpcVersion; - if (method.getDeclaringClass().equals(VersionedProtocol.class)) { - //VersionedProtocol is exempted from version check. - clientVersion = 0; - clientMethodsHash = 0; - } else { - this.clientVersion = RPC.getProtocolVersion(method.getDeclaringClass()); - this.clientMethodsHash = ProtocolSignature.getFingerprint(method - .getDeclaringClass().getMethods()); - } - this.declaringClassProtocolName = - RPC.getProtocolName(method.getDeclaringClass()); - } - - /** The name of the method invoked. */ - public String getMethodName() { return methodName; } - - /** The parameter classes. */ - public Class[] getParameterClasses() { return parameterClasses; } - - /** The parameter instances. */ - public Object[] getParameters() { return parameters; } - - private long getProtocolVersion() { - return clientVersion; - } - - @SuppressWarnings("unused") - private int getClientMethodsHash() { - return clientMethodsHash; - } - - /** - * Returns the rpc version used by the client. - * @return rpcVersion - */ - public long getRpcVersion() { - return rpcVersion; - } - - @Override - @SuppressWarnings("deprecation") - public void readFields(DataInput in) throws IOException { - rpcVersion = in.readLong(); - declaringClassProtocolName = UTF8.readString(in); - methodName = UTF8.readString(in); - clientVersion = in.readLong(); - clientMethodsHash = in.readInt(); - parameters = new Object[in.readInt()]; - parameterClasses = new Class[parameters.length]; - ObjectWritable objectWritable = new ObjectWritable(); - for (int i = 0; i < parameters.length; i++) { - parameters[i] = - ObjectWritable.readObject(in, objectWritable, this.conf); - parameterClasses[i] = objectWritable.getDeclaredClass(); - } - } - - @Override - @SuppressWarnings("deprecation") - public void write(DataOutput out) throws IOException { - out.writeLong(rpcVersion); - UTF8.writeString(out, declaringClassProtocolName); - UTF8.writeString(out, methodName); - out.writeLong(clientVersion); - out.writeInt(clientMethodsHash); - out.writeInt(parameterClasses.length); - for (int i = 0; i < parameterClasses.length; i++) { - ObjectWritable.writeObject(out, parameters[i], parameterClasses[i], - conf, true); - } - } - - @Override - public String toString() { - StringBuilder buffer = new StringBuilder(); - buffer.append(methodName); - buffer.append("("); - for (int i = 0; i < parameters.length; i++) { - if (i != 0) - buffer.append(", "); - buffer.append(parameters[i]); - } - buffer.append(")"); - buffer.append(", rpc version="+rpcVersion); - buffer.append(", client version="+clientVersion); - buffer.append(", methodsFingerPrint="+clientMethodsHash); - return buffer.toString(); - } - - @Override - public void setConf(Configuration conf) { - this.conf = conf; - } - - @Override - public Configuration getConf() { - return this.conf; - } - - } - - private static ClientCache CLIENTS=new ClientCache(); - - private static class Invoker implements RpcInvocationHandler { - private Client.ConnectionId remoteId; - private Client client; - private boolean isClosed = false; - private final AtomicBoolean fallbackToSimpleAuth; - private final AlignmentContext alignmentContext; - - public Invoker(Class protocol, - InetSocketAddress address, UserGroupInformation ticket, - Configuration conf, SocketFactory factory, - int rpcTimeout, AtomicBoolean fallbackToSimpleAuth, - AlignmentContext alignmentContext) - throws IOException { - this.remoteId = Client.ConnectionId.getConnectionId(address, protocol, - ticket, rpcTimeout, null, conf); - this.client = CLIENTS.getClient(conf, factory); - this.fallbackToSimpleAuth = fallbackToSimpleAuth; - this.alignmentContext = alignmentContext; - } - - @Override - public Object invoke(Object proxy, Method method, Object[] args) - throws Throwable { - long startTime = 0; - if (LOG.isDebugEnabled()) { - startTime = Time.monotonicNow(); - } - - ObjectWritable value = (ObjectWritable) - client.call(RPC.RpcKind.RPC_WRITABLE, new Invocation(method, args), - remoteId, fallbackToSimpleAuth, alignmentContext); - if (LOG.isDebugEnabled()) { - long callTime = Time.monotonicNow() - startTime; - LOG.debug("Call: " + method.getName() + " " + callTime); - } - return value.get(); - } - - /* close the IPC client that's responsible for this invoker's RPCs */ - @Override - synchronized public void close() { - if (!isClosed) { - isClosed = true; - CLIENTS.stopClient(client); - } - } - - @Override - public ConnectionId getConnectionId() { - return remoteId; - } - } - - // for unit testing only - static Client getClient(Configuration conf) { - return CLIENTS.getClient(conf); - } - - /** - * Construct a client-side proxy object that implements the named protocol, - * talking to a server at the named address. - * @param Generics Type T - * @param protocol input protocol. - * @param clientVersion input clientVersion. - * @param addr input addr. - * @param ticket input ticket. - * @param conf input configuration. - * @param factory input factory. - * @param rpcTimeout input rpcTimeout. - * @param connectionRetryPolicy input connectionRetryPolicy. - * @throws IOException raised on errors performing I/O. - */ - @Override - public ProtocolProxy getProxy(Class protocol, long clientVersion, - InetSocketAddress addr, UserGroupInformation ticket, - Configuration conf, SocketFactory factory, - int rpcTimeout, RetryPolicy connectionRetryPolicy) - throws IOException { - return getProxy(protocol, clientVersion, addr, ticket, conf, factory, - rpcTimeout, connectionRetryPolicy, null, null); - } - - /** - * Construct a client-side proxy object with a ConnectionId. - * - * @param Generics Type T. - * @param protocol input protocol. - * @param clientVersion input clientVersion. - * @param connId input ConnectionId. - * @param conf input Configuration. - * @param factory input factory. - * @throws IOException raised on errors performing I/O. - * @return ProtocolProxy. - */ - @Override - public ProtocolProxy getProxy(Class protocol, long clientVersion, - Client.ConnectionId connId, Configuration conf, SocketFactory factory) - throws IOException { - return getProxy(protocol, clientVersion, connId.getAddress(), - connId.getTicket(), conf, factory, connId.getRpcTimeout(), - connId.getRetryPolicy(), null, null); - } - - /** - * Construct a client-side proxy object that implements the named protocol, - * talking to a server at the named address. - * @param Generics Type. - * @param protocol input protocol. - * @param clientVersion input clientVersion. - * @param addr input addr. - * @param ticket input ticket. - * @param conf input configuration. - * @param factory input factory. - * @param rpcTimeout input rpcTimeout. - * @param connectionRetryPolicy input connectionRetryPolicy. - * @param fallbackToSimpleAuth input fallbackToSimpleAuth. - * @param alignmentContext input alignmentContext. - * @return ProtocolProxy. - */ - @Override - @SuppressWarnings("unchecked") - public ProtocolProxy getProxy(Class protocol, long clientVersion, - InetSocketAddress addr, UserGroupInformation ticket, - Configuration conf, SocketFactory factory, - int rpcTimeout, RetryPolicy connectionRetryPolicy, - AtomicBoolean fallbackToSimpleAuth, - AlignmentContext alignmentContext) - throws IOException { - - if (connectionRetryPolicy != null) { - throw new UnsupportedOperationException( - "Not supported: connectionRetryPolicy=" + connectionRetryPolicy); - } - - T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(), - new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf, - factory, rpcTimeout, fallbackToSimpleAuth, alignmentContext)); - return new ProtocolProxy(protocol, proxy, true); - } - - /* Construct a server for a protocol implementation instance listening on a - * port and address. */ - @Override - public RPC.Server getServer(Class protocolClass, - Object protocolImpl, String bindAddress, int port, - int numHandlers, int numReaders, int queueSizePerHandler, - boolean verbose, Configuration conf, - SecretManager secretManager, - String portRangeConfig, AlignmentContext alignmentContext) - throws IOException { - return new Server(protocolClass, protocolImpl, conf, bindAddress, port, - numHandlers, numReaders, queueSizePerHandler, verbose, secretManager, - portRangeConfig, alignmentContext); - } - - - /** An RPC Server. */ - @Deprecated - public static class Server extends RPC.Server { - /** - * Construct an RPC server. - * @param instance the instance whose methods will be called - * @param conf the configuration to use - * @param bindAddress the address to bind on to listen for connection - * @param port the port to listen for connections on - * - * @deprecated Use #Server(Class, Object, Configuration, String, int) - * @throws IOException raised on errors performing I/O. - */ - @Deprecated - public Server(Object instance, Configuration conf, String bindAddress, - int port) throws IOException { - this(null, instance, conf, bindAddress, port); - } - - - /** Construct an RPC server. - * @param protocolClass class - * @param protocolImpl the instance whose methods will be called - * @param conf the configuration to use - * @param bindAddress the address to bind on to listen for connection - * @param port the port to listen for connections on - * @throws IOException raised on errors performing I/O. - */ - public Server(Class protocolClass, Object protocolImpl, - Configuration conf, String bindAddress, int port) - throws IOException { - this(protocolClass, protocolImpl, conf, bindAddress, port, 1, -1, -1, - false, null, null); - } - - /** - * Construct an RPC server. - * @param protocolImpl the instance whose methods will be called - * @param conf the configuration to use - * @param bindAddress the address to bind on to listen for connection - * @param port the port to listen for connections on - * @param numHandlers the number of method handler threads to run - * @param verbose whether each call should be logged - * @param numReaders input numberReaders. - * @param queueSizePerHandler input queueSizePerHandler. - * @param secretManager input secretManager. - * - * @deprecated use Server#Server(Class, Object, - * Configuration, String, int, int, int, int, boolean, SecretManager) - * @throws IOException raised on errors performing I/O. - */ - @Deprecated - public Server(Object protocolImpl, Configuration conf, String bindAddress, - int port, int numHandlers, int numReaders, int queueSizePerHandler, - boolean verbose, SecretManager secretManager) - throws IOException { - this(null, protocolImpl, conf, bindAddress, port, - numHandlers, numReaders, queueSizePerHandler, verbose, - secretManager, null); - - } - - /** - * Construct an RPC server. - * @param protocolClass - the protocol being registered - * can be null for compatibility with old usage (see below for details) - * @param protocolImpl the protocol impl that will be called - * @param conf the configuration to use - * @param bindAddress the address to bind on to listen for connection - * @param port the port to listen for connections on - * @param numHandlers the number of method handler threads to run - * @param verbose whether each call should be logged - * @param secretManager input secretManager. - * @param queueSizePerHandler input queueSizePerHandler. - * @param portRangeConfig input portRangeConfig. - * @param numReaders input numReaders. - * - * @deprecated use Server#Server(Class, Object, - * Configuration, String, int, int, int, int, boolean, SecretManager) - * @throws IOException raised on errors performing I/O. - */ - @Deprecated - public Server(Class protocolClass, Object protocolImpl, - Configuration conf, String bindAddress, int port, - int numHandlers, int numReaders, int queueSizePerHandler, - boolean verbose, SecretManager secretManager, - String portRangeConfig) - throws IOException { - this(null, protocolImpl, conf, bindAddress, port, - numHandlers, numReaders, queueSizePerHandler, verbose, - secretManager, null, null); - } - - /** - * Construct an RPC server. - * @param protocolClass - the protocol being registered - * can be null for compatibility with old usage (see below for details) - * @param protocolImpl the protocol impl that will be called - * @param conf the configuration to use - * @param bindAddress the address to bind on to listen for connection - * @param port the port to listen for connections on - * @param numHandlers the number of method handler threads to run - * @param verbose whether each call should be logged - * @param alignmentContext provides server state info on client responses - * @param numReaders input numReaders. - * @param portRangeConfig input portRangeConfig. - * @param queueSizePerHandler input queueSizePerHandler. - * @param secretManager input secretManager. - * @throws IOException raised on errors performing I/O. - */ - public Server(Class protocolClass, Object protocolImpl, - Configuration conf, String bindAddress, int port, - int numHandlers, int numReaders, int queueSizePerHandler, - boolean verbose, SecretManager secretManager, - String portRangeConfig, AlignmentContext alignmentContext) - throws IOException { - super(bindAddress, port, null, numHandlers, numReaders, - queueSizePerHandler, conf, - serverNameFromClass(protocolImpl.getClass()), secretManager, - portRangeConfig); - setAlignmentContext(alignmentContext); - this.verbose = verbose; - - - Class[] protocols; - if (protocolClass == null) { // derive protocol from impl - /* - * In order to remain compatible with the old usage where a single - * target protocolImpl is suppled for all protocol interfaces, and - * the protocolImpl is derived from the protocolClass(es) - * we register all interfaces extended by the protocolImpl - */ - protocols = RPC.getProtocolInterfaces(protocolImpl.getClass()); - - } else { - if (!protocolClass.isAssignableFrom(protocolImpl.getClass())) { - throw new IOException("protocolClass "+ protocolClass + - " is not implemented by protocolImpl which is of class " + - protocolImpl.getClass()); - } - // register protocol class and its super interfaces - registerProtocolAndImpl(RPC.RpcKind.RPC_WRITABLE, protocolClass, protocolImpl); - protocols = RPC.getProtocolInterfaces(protocolClass); - } - for (Class p : protocols) { - if (!p.equals(VersionedProtocol.class)) { - registerProtocolAndImpl(RPC.RpcKind.RPC_WRITABLE, p, protocolImpl); - } - } - - } - - private static void log(String value) { - if (value!= null && value.length() > 55) - value = value.substring(0, 55)+"..."; - LOG.info(value); - } - - @Deprecated - static class WritableRpcInvoker implements RpcInvoker { - - @Override - public Writable call(org.apache.hadoop.ipc_.RPC.Server server, - String protocolName, Writable rpcRequest, long receivedTime) - throws IOException, RPC.VersionMismatch { - - Invocation call = (Invocation)rpcRequest; - if (server.verbose) log("Call: " + call); - - // Verify writable rpc version - if (call.getRpcVersion() != writableRpcVersion) { - // Client is using a different version of WritableRpc - throw new RpcServerException( - "WritableRpc version mismatch, client side version=" - + call.getRpcVersion() + ", server side version=" - + writableRpcVersion); - } - - long clientVersion = call.getProtocolVersion(); - final String protoName; - ProtoClassProtoImpl protocolImpl; - if (call.declaringClassProtocolName.equals(VersionedProtocol.class.getName())) { - // VersionProtocol methods are often used by client to figure out - // which version of protocol to use. - // - // Versioned protocol methods should go the protocolName protocol - // rather than the declaring class of the method since the - // the declaring class is VersionedProtocol which is not - // registered directly. - // Send the call to the highest protocol version - VerProtocolImpl highest = server.getHighestSupportedProtocol( - RPC.RpcKind.RPC_WRITABLE, protocolName); - if (highest == null) { - throw new RpcServerException("Unknown protocol: " + protocolName); - } - protocolImpl = highest.protocolTarget; - } else { - protoName = call.declaringClassProtocolName; - - // Find the right impl for the protocol based on client version. - ProtoNameVer pv = - new ProtoNameVer(call.declaringClassProtocolName, clientVersion); - protocolImpl = - server.getProtocolImplMap(RPC.RpcKind.RPC_WRITABLE).get(pv); - if (protocolImpl == null) { // no match for Protocol AND Version - VerProtocolImpl highest = - server.getHighestSupportedProtocol(RPC.RpcKind.RPC_WRITABLE, - protoName); - if (highest == null) { - throw new RpcServerException("Unknown protocol: " + protoName); - } else { // protocol supported but not the version that client wants - throw new RPC.VersionMismatch(protoName, clientVersion, - highest.version); - } - } - } - - // Invoke the protocol method - Exception exception = null; - Call currentCall = Server.getCurCall().get(); - try { - Method method = - protocolImpl.protocolClass.getMethod(call.getMethodName(), - call.getParameterClasses()); - method.setAccessible(true); - server.rpcDetailedMetrics.init(protocolImpl.protocolClass); - currentCall.setDetailedMetricsName(call.getMethodName()); - Object value = - method.invoke(protocolImpl.protocolImpl, call.getParameters()); - if (server.verbose) log("Return: "+value); - return new ObjectWritable(method.getReturnType(), value); - - } catch (InvocationTargetException e) { - Throwable target = e.getTargetException(); - if (target instanceof IOException) { - exception = (IOException)target; - throw (IOException)target; - } else { - IOException ioe = new IOException(target.toString()); - ioe.setStackTrace(target.getStackTrace()); - exception = ioe; - throw ioe; - } - } catch (Throwable e) { - if (!(e instanceof IOException)) { - LOG.error("Unexpected throwable object ", e); - } - IOException ioe = new IOException(e.toString()); - ioe.setStackTrace(e.getStackTrace()); - exception = ioe; - throw ioe; - } finally { - if (exception != null) { - currentCall.setDetailedMetricsName( - exception.getClass().getSimpleName()); - } - } - } - } - } - - @Override - public ProtocolProxy getProtocolMetaInfoProxy( - ConnectionId connId, Configuration conf, SocketFactory factory) - throws IOException { - throw new UnsupportedOperationException("This proxy is not supported"); - } -} diff --git a/hadoop-hdds/interface-client/src/main/proto/RpcHeader.proto b/hadoop-hdds/interface-client/src/main/proto/RpcHeader.proto index a803ff68f97c..72e6f9c924f5 100644 --- a/hadoop-hdds/interface-client/src/main/proto/RpcHeader.proto +++ b/hadoop-hdds/interface-client/src/main/proto/RpcHeader.proto @@ -47,7 +47,7 @@ package hadoop.common; */ enum RpcKindProto { RPC_BUILTIN = 0; // Used for built in calls by tests - RPC_WRITABLE = 1; // Use WritableRpcEngine + RPC_WRITABLE = 1; // ignored RPC_PROTOCOL_BUFFER = 2; // Use ProtobufRpcEngine }