Skip to content

Commit c08f6e7

Browse files
committed
Merge branch 'develop'
2 parents 0689722 + d41842f commit c08f6e7

8 files changed

Lines changed: 182 additions & 41 deletions

File tree

src/main/scala/com/quantifind/kafka/OffsetGetter.scala

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package com.quantifind.kafka
33
import com.quantifind.kafka.core._
44
import com.quantifind.kafka.offsetapp.OffsetGetterArgs
55
import com.quantifind.kafka.OffsetGetter.{BrokerInfo, KafkaInfo, OffsetInfo}
6+
import com.quantifind.utils.ZkUtilsWrapper
67
import com.twitter.util.Time
78

89
import java.util.concurrent.atomic.AtomicBoolean
@@ -21,11 +22,9 @@ import scala.util.control.NonFatal
2122
case class Node(name: String, children: Seq[Node] = Seq())
2223

2324
case class TopicDetails(consumers: Seq[ConsumerDetail])
24-
2525
case class TopicDetailsWrapper(consumers: TopicDetails)
2626

2727
case class TopicAndConsumersDetails(active: Seq[KafkaInfo], inactive: Seq[KafkaInfo])
28-
2928
case class TopicAndConsumersDetailsWrapper(consumers: TopicAndConsumersDetails)
3029

3130
case class ConsumerDetail(name: String)
@@ -34,7 +33,7 @@ trait OffsetGetter extends Logging {
3433

3534
val consumerMap: mutable.Map[Int, Option[SimpleConsumer]] = mutable.Map()
3635

37-
def zkUtils: ZkUtils
36+
def zkUtils: ZkUtilsWrapper
3837

3938
// kind of interface methods
4039
def getTopicList(group: String): List[String]
@@ -217,7 +216,7 @@ object OffsetGetter {
217216
}
218217

219218
val kafkaOffsetListenerStarted: AtomicBoolean = new AtomicBoolean(false)
220-
var zkUtils: ZkUtils = null
219+
var zkUtils: ZkUtilsWrapper = null
221220
var consumerConnector: ConsumerConnector = null
222221
var newKafkaConsumer: KafkaConsumer[String, String] = null
223222

@@ -231,7 +230,7 @@ object OffsetGetter {
231230
def getInstance(args: OffsetGetterArgs): OffsetGetter = {
232231

233232
if (kafkaOffsetListenerStarted.compareAndSet(false, true)) {
234-
zkUtils = createZkUtils(args)
233+
zkUtils = new ZkUtilsWrapper(createZkUtils(args))
235234

236235
args.offsetStorage.toLowerCase match {
237236

@@ -245,7 +244,7 @@ object OffsetGetter {
245244

246245
args.offsetStorage.toLowerCase match {
247246
case "kafka" =>
248-
new KafkaOffsetGetter(zkUtils, args)
247+
new KafkaOffsetGetter(args)
249248
case "storm" =>
250249
new StormOffsetGetter(zkUtils, args.stormZKOffsetBase)
251250
case _ =>

src/main/scala/com/quantifind/kafka/core/KafkaOffsetGetter.scala

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,13 @@ import scala.concurrent.{Await, Future, duration}
2525
/**
2626
* Created by rcasey on 11/16/2016.
2727
*/
28-
class KafkaOffsetGetter(theZkUtils: ZkUtils, args: OffsetGetterArgs) extends OffsetGetter {
28+
class KafkaOffsetGetter(args: OffsetGetterArgs) extends OffsetGetter {
2929

3030
import KafkaOffsetGetter._
3131

32-
override val zkUtils = theZkUtils
32+
// TODO: We will get all data from the Kafka broker in this class. This is here simply to satisfy
33+
// the OffsetGetter dependency until it can be refactored.
34+
override val zkUtils = null
3335

3436
override def processPartition(group: String, topic: String, pid: Int): Option[OffsetInfo] = {
3537

@@ -47,8 +49,13 @@ class KafkaOffsetGetter(theZkUtils: ZkUtils, args: OffsetGetterArgs) extends Off
4749
val lag: Long = logEndOffset.get - committedOffset
4850
val logEndOffsetReported: Long = if (lag < 0) committedOffset - lag else logEndOffset.get
4951

50-
val client: Option[ClientGroup] = Option(clients.filter(c => (c.group == group && c.topicPartitions.contains(topicAndPartition))).head)
51-
val clientString: Option[String] = if (client.isDefined) Option(client.get.clientId + client.get.clientHost) else Option("NA")
52+
// Get client information if we can find an associated client
53+
var clientString: Option[String] = Option("NA")
54+
val filteredClients = clients.filter(c => (c.group == group && c.topicPartitions.contains(topicAndPartition)))
55+
if (!filteredClients.isEmpty) {
56+
val client: ClientGroup = filteredClients.head
57+
clientString = Option(client.clientId + client.clientHost)
58+
}
5259

5360
OffsetInfo(group = group,
5461
topic = topic,

src/main/scala/com/quantifind/kafka/core/StormOffsetGetter.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,11 @@ package com.quantifind.kafka.core
22

33
import com.quantifind.kafka.OffsetGetter
44
import com.quantifind.kafka.OffsetGetter.OffsetInfo
5+
import com.quantifind.utils.ZkUtilsWrapper
56
import com.twitter.util.Time
67
import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
78
import kafka.common.TopicAndPartition
8-
import kafka.utils.{Json, ZkUtils}
9+
import kafka.utils.{Json}
910
import org.I0Itec.zkclient.exception.ZkNoNodeException
1011
import org.apache.zookeeper.data.Stat
1112

@@ -15,7 +16,7 @@ import scala.util.control.NonFatal
1516
/**
1617
* a version that manages offsets saved by Storm Kafka Spout
1718
*/
18-
class StormOffsetGetter(theZkUtils: ZkUtils, zkOffsetBase: String) extends OffsetGetter {
19+
class StormOffsetGetter(theZkUtils: ZkUtilsWrapper, zkOffsetBase: String) extends OffsetGetter {
1920

2021
override val zkUtils = theZkUtils
2122

src/main/scala/com/quantifind/kafka/core/ZKOffsetGetter.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
package com.quantifind.kafka.core
22

33
import com.quantifind.kafka.OffsetGetter
4-
import OffsetGetter.OffsetInfo
4+
import com.quantifind.kafka.OffsetGetter.OffsetInfo
5+
import com.quantifind.utils.ZkUtilsWrapper
6+
57
import com.twitter.util.Time
68
import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
79
import kafka.common.TopicAndPartition
810
import kafka.utils.ZkUtils
9-
import org.I0Itec.zkclient.exception.ZkNoNodeException
1011
import org.apache.zookeeper.data.Stat
12+
import org.I0Itec.zkclient.exception.ZkNoNodeException
1113

1214
import scala.collection._
1315
import scala.util.control.NonFatal
@@ -17,7 +19,7 @@ import scala.util.control.NonFatal
1719
* User: pierre
1820
* Date: 1/22/14
1921
*/
20-
class ZKOffsetGetter(theZkUtils: ZkUtils) extends OffsetGetter {
22+
class ZKOffsetGetter(theZkUtils: ZkUtilsWrapper) extends OffsetGetter {
2123

2224
override val zkUtils = theZkUtils
2325

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
package com.quantifind.utils
2+
3+
import java.util
4+
5+
import kafka.api.{ApiVersion, LeaderAndIsr}
6+
import kafka.cluster.{EndPoint, BrokerEndPoint, Cluster, Broker}
7+
import kafka.common.TopicAndPartition
8+
import kafka.consumer.ConsumerThreadId
9+
import kafka.controller.{ReassignedPartitionsContext, LeaderIsrAndControllerEpoch}
10+
import kafka.utils.ZkUtils
11+
import org.I0Itec.zkclient.ZkClient
12+
import org.apache.kafka.common.protocol.SecurityProtocol
13+
import org.apache.zookeeper.data.{ACL, Stat}
14+
15+
import scala.collection.mutable
16+
17+
/*
18+
This class is mainly to help us mock the ZkUtils class. It is really painful to get powermock to work with scalatest,
19+
so we created this class with a little help from IntelliJ to auto-generate the delegation code
20+
*/
21+
class ZkUtilsWrapper(zkUtils: ZkUtils) {
22+
23+
val ConsumersPath = ZkUtils.ConsumersPath
24+
25+
val delegator = zkUtils
26+
27+
//def updatePersistentPath(path: String, data: String, acls: util.List[ACL]): Unit = delegator.updatePersistentPath(path, data, acls)
28+
29+
//def updatePartitionReassignmentData(partitionsToBeReassigned: collection.Map[TopicAndPartition, Seq[Int]]): Unit = delegator.updatePartitionReassignmentData(partitionsToBeReassigned)
30+
31+
//def updateEphemeralPath(path: String, data: String, acls: util.List[ACL]): Unit = delegator.updateEphemeralPath(path, data, acls)
32+
33+
//def setupCommonPaths(): Unit = delegator.setupCommonPaths()
34+
35+
//def replicaAssignmentZkData(map: collection.Map[String, Seq[Int]]): String = delegator.replicaAssignmentZkData(map)
36+
37+
//def registerBrokerInZk(id: Int, host: String, port: Int, advertisedEndpoints: collection.Map[SecurityProtocol, EndPoint], jmxPort: Int, rack: Option[String], apiVersion: ApiVersion): Unit = delegator.registerBrokerInZk(id, host, port, advertisedEndpoints, jmxPort, rack, apiVersion)
38+
39+
def readDataMaybeNull(path: String): (Option[String], Stat) = delegator.readDataMaybeNull(path)
40+
41+
def readData(path: String): (String, Stat) = delegator.readData(path)
42+
43+
//def pathExists(path: String): Boolean = delegator.pathExists(path)
44+
45+
//def parseTopicsData(jsonData: String): Seq[String] = delegator.parseTopicsData(jsonData)
46+
47+
//def parsePartitionReassignmentDataWithoutDedup(jsonData: String): Seq[(TopicAndPartition, Seq[Int])] = delegator.parsePartitionReassignmentDataWithoutDedup(jsonData)
48+
49+
//def parsePartitionReassignmentData(jsonData: String): collection.Map[TopicAndPartition, Seq[Int]] = delegator.parsePartitionReassignmentData(jsonData)
50+
51+
//def makeSurePersistentPathExists(path: String, acls: util.List[ACL]): Unit = delegator.makeSurePersistentPathExists(path, acls)
52+
53+
//def leaderAndIsrZkData(leaderAndIsr: LeaderAndIsr, controllerEpoch: Int): String = delegator.leaderAndIsrZkData(leaderAndIsr, controllerEpoch)
54+
55+
//def getTopicsByConsumerGroup(consumerGroup: String): Seq[String] = delegator.getTopicsByConsumerGroup(consumerGroup)
56+
57+
//def getSortedBrokerList(): Seq[Int] = delegator.getSortedBrokerList()
58+
59+
//def getSequenceId(path: String, acls: util.List[ACL]): Int = delegator.getSequenceId(path, acls)
60+
61+
//def getReplicasForPartition(topic: String, partition: Int): Seq[Int] = delegator.getReplicasForPartition(topic, partition)
62+
63+
//def getReplicaAssignmentForTopics(topics: Seq[String]): mutable.Map[TopicAndPartition, Seq[Int]] = delegator.getReplicaAssignmentForTopics(topics)
64+
65+
//def getPartitionsUndergoingPreferredReplicaElection(): collection.Set[TopicAndPartition] = delegator.getPartitionsUndergoingPreferredReplicaElection()
66+
67+
def getPartitionsForTopics(topics: Seq[String]): mutable.Map[String, Seq[Int]] = delegator.getPartitionsForTopics(topics)
68+
69+
//def getPartitionsBeingReassigned(): collection.Map[TopicAndPartition, ReassignedPartitionsContext] = delegator.getPartitionsBeingReassigned()
70+
71+
//def getPartitionLeaderAndIsrForTopics(zkClient: ZkClient, topicAndPartitions: collection.Set[TopicAndPartition]): mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = delegator.getPartitionLeaderAndIsrForTopics(zkClient, topicAndPartitions)
72+
73+
//def getPartitionAssignmentForTopics(topics: Seq[String]): mutable.Map[String, collection.Map[Int, Seq[Int]]] = delegator.getPartitionAssignmentForTopics(topics)
74+
75+
def getLeaderForPartition(topic: String, partition: Int): Option[Int] = delegator.getLeaderForPartition(topic, partition)
76+
77+
//def getLeaderAndIsrForPartition(topic: String, partition: Int): Option[LeaderAndIsr] = delegator.getLeaderAndIsrForPartition(topic, partition)
78+
79+
//def getInSyncReplicasForPartition(topic: String, partition: Int): Seq[Int] = delegator.getInSyncReplicasForPartition(topic, partition)
80+
81+
//def getEpochForPartition(topic: String, partition: Int): Int = delegator.getEpochForPartition(topic, partition)
82+
83+
//def getController(): Int = delegator.getController()
84+
85+
def getConsumersPerTopic(group: String, excludeInternalTopics: Boolean): mutable.Map[String, List[ConsumerThreadId]] = delegator.getConsumersPerTopic(group, excludeInternalTopics)
86+
87+
//def getConsumersInGroup(group: String): Seq[String] = delegator.getConsumersInGroup(group)
88+
89+
//def getConsumerPartitionOwnerPath(group: String, topic: String, partition: Int): String = delegator.getConsumerPartitionOwnerPath(group, topic, partition)
90+
91+
//def getConsumerGroups(): Seq[String] = delegator.getConsumerGroups()
92+
93+
//def getChildrenParentMayNotExist(path: String): Seq[String] = delegator.getChildrenParentMayNotExist(path)
94+
95+
def getChildren(path: String): Seq[String] = delegator.getChildren(path)
96+
97+
//def getBrokerSequenceId(MaxReservedBrokerId: Int): Int = delegator.getBrokerSequenceId(MaxReservedBrokerId)
98+
99+
//def getBrokerInfo(brokerId: Int): Option[Broker] = delegator.getBrokerInfo(brokerId)
100+
101+
//def getAllTopics(): Seq[String] = delegator.getAllTopics()
102+
103+
//def getAllPartitions(): collection.Set[TopicAndPartition] = delegator.getAllPartitions()
104+
105+
//def getAllEntitiesWithConfig(entityType: String): Seq[String] = delegator.getAllEntitiesWithConfig(entityType)
106+
107+
//def getAllConsumerGroupsForTopic(topic: String): collection.Set[String] = delegator.getAllConsumerGroupsForTopic(topic)
108+
109+
def getAllBrokersInCluster(): Seq[Broker] = delegator.getAllBrokersInCluster()
110+
111+
//def getAllBrokerEndPointsForChannel(protocolType: SecurityProtocol): Seq[BrokerEndPoint] = delegator.getAllBrokerEndPointsForChannel(protocolType)
112+
113+
//def formatAsReassignmentJson(partitionsToBeReassigned: collection.Map[TopicAndPartition, Seq[Int]]): String = delegator.formatAsReassignmentJson(partitionsToBeReassigned)
114+
115+
//def deletePathRecursive(path: String): Unit = delegator.deletePathRecursive(path)
116+
117+
//def deletePath(path: String): Boolean = delegator.deletePath(path)
118+
119+
//def deletePartition(brokerId: Int, topic: String): Unit = delegator.deletePartition(brokerId, topic)
120+
121+
//def createSequentialPersistentPath(path: String, data: String, acls: util.List[ACL]): String = delegator.createSequentialPersistentPath(path, data, acls)
122+
123+
//def createPersistentPath(path: String, data: String, acls: util.List[ACL]): Unit = delegator.createPersistentPath(path, data, acls)
124+
125+
//def createEphemeralPathExpectConflict(path: String, data: String, acls: util.List[ACL]): Unit = delegator.createEphemeralPathExpectConflict(path, data, acls)
126+
127+
//def conditionalUpdatePersistentPathIfExists(path: String, data: String, expectVersion: Int): (Boolean, Int) = delegator.conditionalUpdatePersistentPathIfExists(path, data, expectVersion)
128+
129+
//def conditionalUpdatePersistentPath(path: String, data: String, expectVersion: Int, optionalChecker: Option[(ZkUtils, String, String) => (Boolean, Int)]): (Boolean, Int) = delegator.conditionalUpdatePersistentPath(path, data, expectVersion, optionalChecker)
130+
131+
//def conditionalDeletePath(path: String, expectedVersion: Int): Boolean = delegator.conditionalDeletePath(path, expectedVersion)
132+
133+
def close(): Unit = delegator.close()
134+
135+
}
Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,27 @@
1-
/*
21
package com.quantifind.kafka.core
32

3+
import com.quantifind.kafka.offsetapp.OffsetGetterArgs
44
import kafka.api.{OffsetRequest, OffsetResponse, PartitionOffsetsResponse}
55
import kafka.common.{OffsetAndMetadata, TopicAndPartition}
66
import kafka.coordinator.GroupTopicPartition
77
import kafka.consumer.SimpleConsumer
88
import kafka.utils.ZkUtils
9-
import org.I0Itec.zkclient.ZkClient
109
import org.mockito.Matchers._
1110
import org.mockito.Mockito._
12-
import org.mockito.{Matchers => MockitoMatchers, Mockito}
11+
import org.mockito.{Mockito, Matchers => MockitoMatchers}
1312
import org.scalatest._
1413

1514
class KafkaOffsetGetterSpec extends FlatSpec with ShouldMatchers {
1615

1716
trait Fixture {
1817

19-
val mockedZkClient = Mockito.mock(classOf[ZkClient])
2018
val mockedZkUtil = Mockito.mock(classOf[ZkUtils])
2119
val mockedConsumer = Mockito.mock(classOf[SimpleConsumer])
2220
val testPartitionLeader = 1
2321

24-
val offsetGetter = new KafkaOffsetGetter(mockedZkUtil)
22+
val args = new OffsetGetterArgs
23+
24+
val offsetGetter = new KafkaOffsetGetter(args)
2525
offsetGetter.consumerMap += (testPartitionLeader -> Some(mockedConsumer))
2626
}
2727

@@ -30,16 +30,21 @@ class KafkaOffsetGetterSpec extends FlatSpec with ShouldMatchers {
3030
val testGroup = "testgroup"
3131
val testTopic = "testtopic"
3232
val testPartition = 1
33+
val committedOffset = 100
34+
val logEndOffset = 102
3335

3436
val topicAndPartition = TopicAndPartition(testTopic, testPartition)
3537
val groupTopicPartition = GroupTopicPartition(testGroup, topicAndPartition)
36-
val offsetAndMetadata = OffsetAndMetadata(100, "meta", System.currentTimeMillis)
38+
val offsetAndMetadata = OffsetAndMetadata(committedOffset, "meta", System.currentTimeMillis)
3739
KafkaOffsetGetter.committedOffsetMap += (groupTopicPartition -> offsetAndMetadata)
3840

39-
when(mockedZkUtil.getLeaderForPartition(MockitoMatchers.eq(mockedZkClient), MockitoMatchers.eq(testTopic), MockitoMatchers.eq(testPartition)))
41+
//topicPartitionOffsetsMap
42+
KafkaOffsetGetter.topicPartitionOffsetsMap += (topicAndPartition -> logEndOffset)
43+
44+
when(mockedZkUtil.getLeaderForPartition(MockitoMatchers.eq(testTopic), MockitoMatchers.eq(testPartition)))
4045
.thenReturn(Some(testPartitionLeader))
4146

42-
val partitionErrorAndOffsets = Map(topicAndPartition -> PartitionOffsetsResponse(0,Seq(102)))
47+
val partitionErrorAndOffsets = Map(topicAndPartition -> PartitionOffsetsResponse(0,Seq(logEndOffset)))
4348
val offsetResponse = OffsetResponse(1, partitionErrorAndOffsets)
4449
when(mockedConsumer.getOffsetsBefore(any[OffsetRequest])).thenReturn(offsetResponse)
4550

@@ -48,11 +53,10 @@ class KafkaOffsetGetterSpec extends FlatSpec with ShouldMatchers {
4853
offsetInfo.topic shouldBe testTopic
4954
offsetInfo.group shouldBe testGroup
5055
offsetInfo.partition shouldBe testPartition
51-
offsetInfo.offset shouldBe 100
52-
offsetInfo.logSize shouldBe 102
56+
offsetInfo.offset shouldBe committedOffset
57+
offsetInfo.logSize shouldBe logEndOffset
5358
case None => fail("Failed to build offset data")
5459
}
5560

5661
}
57-
}
58-
*/
62+
}

src/test/scala/com/quantifind/kafka/core/StormOffsetGetterSpec.scala

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
1-
/*
21
package com.quantifind.kafka.core
32

43
import com.quantifind.utils.ZkUtilsWrapper
5-
import org.I0Itec.zkclient.ZkClient
64
import org.apache.zookeeper.data.Stat
75
import org.mockito.Matchers._
86
import org.mockito.Mockito._
@@ -13,11 +11,10 @@ class StormOffsetGetterSpec extends FlatSpec with ShouldMatchers {
1311

1412
trait Fixture {
1513

16-
val mockedZkClient = Mockito.mock(classOf[ZkClient])
1714
val zkOffsetBase = "/stormconsumers"
1815
val mockedZkUtil = Mockito.mock(classOf[ZkUtilsWrapper])
1916

20-
val offsetGetter = new StormOffsetGetter(mockedZkClient, zkOffsetBase, mockedZkUtil)
17+
val offsetGetter = new StormOffsetGetter(mockedZkUtil, zkOffsetBase)
2118
}
2219

2320
"StormOffsetGetter" should "be able to extract topic from persisted spout state" in new Fixture {
@@ -38,12 +35,11 @@ class StormOffsetGetterSpec extends FlatSpec with ShouldMatchers {
3835
}
3936
}"""
4037
val ret = (spoutState, Mockito.mock(classOf[Stat]))
41-
when(mockedZkUtil.readData(MockitoMatchers.eq(mockedZkClient), anyString)).thenReturn(ret)
38+
when(mockedZkUtil.readData(anyString)).thenReturn(ret)
4239

4340
val topics = offsetGetter.getTopicList(testGroup)
4441

4542
topics.size shouldBe 1
4643
topics(0) shouldBe testTopic
4744
}
48-
}
49-
*/
45+
}

0 commit comments

Comments
 (0)