Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@
import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.lock.ServiceLockData;
import org.apache.accumulo.core.lock.ServiceLockData.ThriftService;
import org.apache.accumulo.core.lock.ServiceLockPaths;
import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector;
import org.apache.accumulo.core.lock.ServiceLockPaths.ResourceGroupPredicate;
Expand All @@ -104,6 +103,7 @@
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
import org.apache.accumulo.core.metadata.schema.AmpleImpl;
import org.apache.accumulo.core.rpc.RpcService;
import org.apache.accumulo.core.rpc.SaslConnectionParams;
import org.apache.accumulo.core.rpc.SslConnectionParams;
import org.apache.accumulo.core.security.Authorizations;
Expand Down Expand Up @@ -469,9 +469,9 @@ public Map<String,Pair<UUID,ResourceGroupId>> getScanServers() {
Optional<ServiceLockData> sld = ServiceLock.getLockData(getZooCache(), path, stat);
if (sld.isPresent()) {
final ServiceLockData data = sld.orElseThrow();
final String addr = data.getAddressString(ThriftService.TABLET_SCAN);
final UUID uuid = data.getServerUUID(ThriftService.TABLET_SCAN);
final ResourceGroupId group = data.getGroup(ThriftService.TABLET_SCAN);
final String addr = data.getAddressString(RpcService.TABLET_SCAN);
final UUID uuid = data.getServerUUID(RpcService.TABLET_SCAN);
final ResourceGroupId group = data.getGroup(RpcService.TABLET_SCAN);
liveScanServers.put(addr, new Pair<>(uuid, group));
}
} catch (IllegalArgumentException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@
import org.apache.accumulo.core.data.InstanceId;
import org.apache.accumulo.core.data.ResourceGroupId;
import org.apache.accumulo.core.lock.ServiceLockData;
import org.apache.accumulo.core.lock.ServiceLockData.ThriftService;
import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector;
import org.apache.accumulo.core.lock.ServiceLockPaths.ResourceGroupPredicate;
import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
import org.apache.accumulo.core.rpc.RpcService;
import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
import org.apache.accumulo.core.tabletscan.thrift.TabletScanClientService;
import org.apache.accumulo.core.tabletserver.thrift.TabletServerClientService.Client;
Expand Down Expand Up @@ -570,7 +570,7 @@ private Set<ServerId> getServers(ServerId.Type type,
Optional<ServiceLockData> sld = context.getZooCache().getLockData(mon);
String location = null;
if (sld.isPresent()) {
location = sld.orElseThrow().getAddressString(ThriftService.NONE);
location = sld.orElseThrow().getAddressString(RpcService.NONE);
if (location != null && addressSelector.getPredicate().test(location)) {
HostAndPort hp = HostAndPort.fromString(location);
results.add(new ServerId(type, ResourceGroupId.DEFAULT, hp.getHost(), hp.getPort()));
Expand All @@ -584,7 +584,7 @@ private Set<ServerId> getServers(ServerId.Type type,
Optional<ServiceLockData> sld = context.getZooCache().getLockData(gc);
String location = null;
if (sld.isPresent()) {
location = sld.orElseThrow().getAddressString(ThriftService.GC);
location = sld.orElseThrow().getAddressString(RpcService.GC);
if (location != null && addressSelector.getPredicate().test(location)) {
HostAndPort hp = HostAndPort.fromString(location);
results.add(new ServerId(type, ResourceGroupId.DEFAULT, hp.getHost(), hp.getPort()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@
import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode;
import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
import org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException;
import org.apache.accumulo.core.rpc.clients.TServerClient.Exec;
import org.apache.accumulo.core.rpc.clients.TServerClient.ExecVoid;
import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
import org.apache.accumulo.core.rpc.clients.ThriftClientTypes.Exec;
import org.apache.accumulo.core.rpc.clients.ThriftClientTypes.ExecVoid;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.NamespacePermission;
import org.apache.accumulo.core.security.SystemPermission;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,43 +32,25 @@
import java.util.stream.Collectors;

import org.apache.accumulo.core.data.ResourceGroupId;
import org.apache.accumulo.core.rpc.RpcService;
import org.apache.accumulo.core.util.AddressUtil;

import com.google.common.net.HostAndPort;

public class ServiceLockData implements Comparable<ServiceLockData> {

/**
* Thrift Service list
*/
public static enum ThriftService {
CLIENT,
COORDINATOR,
COMPACTOR,
FATE_CLIENT,
FATE_WORKER,
GC,
MANAGER,
NONE,
TABLET_INGEST,
TABLET_MANAGEMENT,
TABLET_SCAN,
TSERV
}

/**
* An object that describes a process, the group assigned to that process, the Thrift service and
* the address to use to communicate with that service.
* An object that describes a process, the group assigned to that process, the Thrift rpc service
* and the address to use to communicate with that service.
*/
public static class ServiceDescriptor {

private final UUID uuid;
private final ThriftService service;
private final RpcService service;
private final String address;
private final ResourceGroupId group;

public ServiceDescriptor(UUID uuid, ThriftService service, String address,
ResourceGroupId group) {
public ServiceDescriptor(UUID uuid, RpcService service, String address, ResourceGroupId group) {
this.uuid = requireNonNull(uuid);
this.service = requireNonNull(service);
this.address = requireNonNull(address);
Expand All @@ -79,7 +61,7 @@ public UUID getUUID() {
return uuid;
}

public ThriftService getService() {
public RpcService getService() {
return service;
}

Expand Down Expand Up @@ -124,7 +106,7 @@ private String serialize() {
}

/**
* A set of ServiceDescriptor's
* A set of ServiceDescriptors
*/
public static class ServiceDescriptors {
private final Set<ServiceDescriptor> descriptors;
Expand All @@ -146,34 +128,34 @@ public Set<ServiceDescriptor> getServices() {
}
}

private final EnumMap<ThriftService,ServiceDescriptor> services;
private final EnumMap<RpcService,ServiceDescriptor> services;

public ServiceLockData(ServiceDescriptors sds) {
this.services = new EnumMap<>(ThriftService.class);
this.services = new EnumMap<>(RpcService.class);
sds.getServices().forEach(sd -> this.services.put(sd.getService(), sd));
}

public ServiceLockData(UUID uuid, String address, ThriftService service, ResourceGroupId group) {
public ServiceLockData(UUID uuid, String address, RpcService service, ResourceGroupId group) {
this(new ServiceDescriptors(new HashSet<>(
Collections.singleton(new ServiceDescriptor(uuid, service, address, group)))));
}

public String getAddressString(ThriftService service) {
public String getAddressString(RpcService service) {
ServiceDescriptor sd = services.get(service);
return sd == null ? null : sd.getAddress();
}

public HostAndPort getAddress(ThriftService service) {
public HostAndPort getAddress(RpcService service) {
String s = getAddressString(service);
return s == null ? null : AddressUtil.parseAddress(s);
}

public ResourceGroupId getGroup(ThriftService service) {
public ResourceGroupId getGroup(RpcService service) {
ServiceDescriptor sd = services.get(service);
return sd == null ? null : sd.getGroup();
}

public UUID getServerUUID(ThriftService service) {
public UUID getServerUUID(RpcService service) {
ServiceDescriptor sd = services.get(service);
return sd == null ? null : sd.getUUID();
}
Expand Down Expand Up @@ -227,15 +209,15 @@ private static ServiceDescriptors deserialize(ServiceDescriptorsGson json) {

private static class ServiceDescriptorGson {
private UUID uuid;
private ThriftService service;
private RpcService service;
private String address;
private String group;

// default constructor required for Gson
@SuppressWarnings("unused")
public ServiceDescriptorGson() {}

public ServiceDescriptorGson(UUID uuid, ThriftService service, String address, String group) {
public ServiceDescriptorGson(UUID uuid, RpcService service, String address, String group) {
this.uuid = uuid;
this.service = service;
this.address = address;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.lock.ServiceLockData;
import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector;
import org.apache.accumulo.core.lock.ServiceLockPaths.ResourceGroupPredicate;
import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
Expand All @@ -93,6 +92,7 @@
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.UserCompactionRequestedColumnFamily;
import org.apache.accumulo.core.rpc.RpcService;
import org.apache.accumulo.core.tabletserver.log.LogEntry;
import org.apache.accumulo.core.util.ColumnFQ;
import org.apache.accumulo.core.zookeeper.ZcStat;
Expand Down Expand Up @@ -849,7 +849,7 @@ private static Optional<TServerInstance> checkTabletServer(ClientContext context
ZcStat stat = new ZcStat();
log.trace("Checking server at ZK path: {}", slp);
return ServiceLock.getLockData(context.getZooCache(), slp, stat)
.map(sld -> sld.getAddress(ServiceLockData.ThriftService.TSERV))
.map(sld -> sld.getAddress(RpcService.TSERV))
.map(address -> new TServerInstance(address, stat.getEphemeralOwner()));
}

Expand Down
42 changes: 42 additions & 0 deletions core/src/main/java/org/apache/accumulo/core/rpc/RpcService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.rpc;

import org.apache.accumulo.core.lock.ServiceLockData;
import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;

/**
* This is an enum containing all rpc service types used by Thrift. These are used by
* {@link ThriftClientTypes} and {@link ServiceLockData}
*/
public enum RpcService {
CLIENT,
COORDINATOR,
COMPACTOR,
FATE_CLIENT,
FATE_WORKER,
GC,
MANAGER,
NONE,
TABLET_INGEST,
TABLET_MANAGEMENT,
TABLET_SCAN,
TSERV,
SERVER_PROCESS
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.clientImpl.thrift.ClientService.Client;
import org.apache.accumulo.core.lock.ServiceLockData.ThriftService;
import org.apache.accumulo.core.rpc.RpcService;
import org.apache.accumulo.core.util.Pair;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
Expand All @@ -36,15 +36,15 @@ public class ClientServiceThriftClient extends ThriftClientTypes<Client>
private static final Logger LOG = LoggerFactory.getLogger(ClientServiceThriftClient.class);
private final AtomicBoolean warnedAboutTServersBeingDown = new AtomicBoolean(false);

ClientServiceThriftClient(String serviceName) {
super(serviceName, new Client.Factory());
ClientServiceThriftClient(RpcService service) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my comment in ThriftClientTypes regarding the service name and the multiplexed connection. I don't think the constructors for the ThriftClientTypes need to take an argument at this point, there is only one valid argument, RpcService.CLIENT.

The argument was a String in case we needed to change it, but things are working as expected. I think we just want to use something meaningful and short.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it could change to

ClientServiceThriftClient() {
  super(RpcService.CLIENT, new Client.Factory());
}

super(service, new Client.Factory());
}

@Override
public Pair<String,Client> getThriftServerConnection(ClientContext context,
boolean preferCachedConnections) throws TTransportException {
return getThriftServerConnection(LOG, this, context, preferCachedConnections,
warnedAboutTServersBeingDown, ThriftService.CLIENT);
warnedAboutTServersBeingDown, RpcService.CLIENT);
Comment thread
ddanielr marked this conversation as resolved.
Outdated
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@
package org.apache.accumulo.core.rpc.clients;

import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService.Client;
import org.apache.accumulo.core.rpc.RpcService;

public class CompactionCoordinatorServiceThriftClient extends ThriftClientTypes<Client> {

CompactionCoordinatorServiceThriftClient(String serviceName) {
super(serviceName, new Client.Factory());
CompactionCoordinatorServiceThriftClient(RpcService service) {
super(service, new Client.Factory());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@
package org.apache.accumulo.core.rpc.clients;

import org.apache.accumulo.core.compaction.thrift.CompactorService.Client;
import org.apache.accumulo.core.rpc.RpcService;

public class CompactorServiceThriftClient extends ThriftClientTypes<Client> {

CompactorServiceThriftClient(String serviceName) {
super(serviceName, new Client.Factory());
CompactorServiceThriftClient(RpcService service) {
super(service, new Client.Factory());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,16 @@

import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.manager.thrift.FateService.Client;
import org.apache.accumulo.core.rpc.RpcService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FateThriftClient extends ThriftClientTypes<Client> implements ManagerClient<Client> {

private static Logger LOG = LoggerFactory.getLogger(FateThriftClient.class);

FateThriftClient(String serviceName) {
super(serviceName, new Client.Factory());
FateThriftClient(RpcService service) {
super(service, new Client.Factory());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@
package org.apache.accumulo.core.rpc.clients;

import org.apache.accumulo.core.manager.thrift.FateWorkerService;
import org.apache.accumulo.core.rpc.RpcService;

/**
* Client side object that can be used to interact with fate operations, which are supported by any
* manager process.
*/
public class FateWorkerThriftClient extends ThriftClientTypes<FateWorkerService.Client> {
FateWorkerThriftClient(String serviceName) {
super(serviceName, new FateWorkerService.Client.Factory());
FateWorkerThriftClient(RpcService service) {
super(service, new FateWorkerService.Client.Factory());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@
package org.apache.accumulo.core.rpc.clients;

import org.apache.accumulo.core.gc.thrift.GCMonitorService.Client;
import org.apache.accumulo.core.rpc.RpcService;

public class GCMonitorServiceThriftClient extends ThriftClientTypes<Client> {

GCMonitorServiceThriftClient(String serviceName) {
super(serviceName, new Client.Factory());
GCMonitorServiceThriftClient(RpcService service) {
super(service, new Client.Factory());
}

}
Loading