DEGREE PROJECT IN INFORMATION AND COMMUNICATION TECHNOLOGY, SECOND CYCLE, 30 CREDITS
STOCKHOLM, SWEDEN 2016

Multi-Tenant Apache Kafka for Hops
Kafka Topic-Based Multi-Tenancy and ACL-Based Authorization for Hops

MISGANU DESSALEGN MURUTS

KTH SCHOOL OF INFORMATION AND COMMUNICATION TECHNOLOGY
Multi-Tenant Apache Kafka for Hops
Kafka Topic-Based Multi-Tenancy and ACL-Based Authorization for Hops

Misganu Dessalegn Muruts

Master of Science Thesis

Software Engineering of Distributed Systems
School of Information and Communication Technology
KTH Royal Institute of Technology
Stockholm, Sweden

15 November 2016

Examiner: Dr. Jim Dowling
Supervisor: Gautier Berthou

TRITA Number: TRITA-ICT-EX-2016:120
© Misganu Dessalegn Muruts, 15 November 2016
Abstract

Apache Kafka is a distributed, high-throughput and fault-tolerant publish/subscribe messaging system in the Hadoop ecosystem. It is used as a distributed data streaming and processing platform. Kafka topics are the units of message feeds in a Kafka cluster. A Kafka producer publishes messages to these topics, and a Kafka consumer subscribes to topics to pull those messages. Kafka is used more and more in the data infrastructure of many companies, so many Kafka clients now publish messages to Kafka topics and consume messages from them. Some of these client operations can be malicious. To mitigate this risk, clients must authenticate themselves, and their operations must be authorized before they can access a given topic. Since version 0.9, Kafka has shipped with a pluggable Authorizer interface for implementing access control list (ACL) based authorization of client operations. Kafka users can implement the interface differently to satisfy their security requirements. SimpleAclAuthorizer is the out-of-the-box implementation of the interface and uses Zookeeper to store ACLs.

HopsWorks is based on Hops, a next-generation Hadoop distribution. It supports project-based multi-tenancy, where projects are fully isolated at the level of the Hadoop Filesystem and YARN. In this project, we added Kafka topic-based multi-tenancy to Hops projects. A Kafka topic is created from inside a Hops project and persisted both in Zookeeper and in NDB Cluster. Persisting topics in the database enables topics to be shared across projects. ACLs are added to Kafka topics and persisted only in the database. Client access to Kafka topics is authorized based on these ACLs. ACLs are added, updated, listed and removed from the HopsWorks web UI. HopsACLAuthorizer, a Hops implementation of the Authorizer interface, authorizes Kafka client operations using the ACLs in the database. An Apache Avro schema registry for topics lets producers and consumers integrate better by agreeing on a pre-established message format. The result of this project is the first Hadoop distribution that supports Kafka multi-tenancy.

Keywords: Hadoop, Kafka, Hops, HopsWorks, Multi-Tenancy, Kafka Topics, Schema Registry, Messaging Systems, ACL Authorization
Acknowledgements

I would like to express my profound gratitude to Dr. Jim Dowling for his continuous support during my thesis period. He was open-minded about new ideas and gave insightful, tactful feedback on technical disagreements, and both were invaluable to the success of the project. Moreover, learning from him was a thrilling experience that I cannot pass over without singling it out.

My supervisor, Gautier Berthou, was also of much help in achieving the goals of the thesis. His door was always open when I needed to talk to him. I am also indebted to Ermias, Theofilos and the remaining Hops team members for their substantial assistance. Moreover, I enjoyed their friendship and, of course, the Friday fika.

My extended appreciation goes to the Swedish Institute (SI), which sponsored my two-year master's study in Sweden. My contract with SI did not end with the completion of my master's degree. I believe its support has laid a foundation for my future career.

Last but not least, my heartfelt gratitude goes to my family for standing by me and sharing my dream. The unfailing support they offer and the enduring confidence they have bestowed on me keep energizing me.
Contents

1 Introduction
   1.1 Background
   1.2 Problem
   1.3 Purpose
      1.3.1 Benefits, Ethics and Sustainability
   1.4 Delimitation
   1.5 Outline
2 Background
   2.1 What is Messaging
   2.2 Kafka Architecture
   2.3 Kafka Main Characteristics
   2.4 Kafka and other Messaging Services
   2.5 Apache Kafka Use Cases
   2.6 About SICS Hops Project
   2.7 Multi-Tenant Architecture
      2.7.1 Multi-Tenancy in Cloud Computing
      2.7.2 Multi-Tenancy in Database
3 Related Works
   3.1 Kafka Security
   3.2 ACL Based Authorization
      3.2.1 Kafka SimpleAclAuthorizer
      3.2.2 DefaultPrincipalBuilder
      3.2.3 Other Implementations
   3.3 Schema Registry
   3.4 Hops Project based Multi-tenancy
4 Methodology
   4.1 Goal
   4.2 Solution
   4.3 Methodology
5 Analysis
   5.1 Kafka Topic
      5.1.1 Existing Apache Kafka Topic Operations
      5.1.2 Hops New Topic Operations
   5.2 Topic Availability Invariant
      5.2.1 Topic Availability States
      5.2.2 Why Always to State Four
   5.3 Why Topic Sharing
   5.4 Hops Schema Registry
      5.4.1 Kafka Clients Access to Avro Schema
      5.4.2 Avro Schema Compatibility and Evolution
   5.5 Synchronizing Zookeeper and Database for Kafka Topics
      5.5.1 Topic Synchronization Failure Scenarios
      5.5.2 Possible Synchronization Approaches
   5.6 Hops ACL Definition
      5.6.1 The Role Concept
      5.6.2 Fine-grained ACLs and Role-based ACLs
   5.7 HopsAclAuthorizer Class
      5.7.1 HopsPrincipalBuilder Class
      5.7.2 When is a topic operation authorized
6 Test Configurations
   6.1 Kafka Broker Configurations
   6.2 Zookeeper Configuration
   6.3 Configuring Kafka Clients
   6.4 Local Resources
   6.5 Kafka Util Library
   6.6 Spark job
7 Conclusions, Recommendations and Future Works
   7.1 Conclusions and Recommendations
   7.2 Future Works
Bibliography
List of Figures

2.1 Forms of messaging
2.2 Kafka Architecture
2.3 Kafka consumer group abstraction
2.4 Shared database, shared schema

List of Tables

2.1 Degree of multi-tenancy in cloud computing
5.1 Sample Hops ACL definitions
5.2 Hops ACL definition levels

List of Listings

3.1 Schema example
6.1 Sample Kafka broker configuration
Chapter 1

Introduction

Nowadays, the amount of data flowing on the Internet or held in enterprise storage is large and complex. Such data is often generated or collected from sensors and mobile devices, social media user activities, traditional enterprises, e-commerce customer activities and other sources. This data can be structured, semi-structured or completely unstructured. That variety makes it challenging to manage and analyze the data and to generate insights from it.

Big Data refers to a volume of data so large that it cannot be stored and analyzed with traditional data processing methods and tools. The challenges in handling Big Data are defined by three properties of Big Data systems, often termed the 3 Vs [4, 5]. Big Data usually has one or more of them: high volume, high velocity and/or high variety.

• Volume - refers to the size of data. Data volume can grow rapidly and endlessly despite the limited capacity of data processing resources.
• Variety - refers to the various types of data sources and data formats. Data may come from business transactions, social media user activities and IoT sensors in different formats (e.g. text documents, audio, video, financial transactions and others).
• Velocity - refers to how quickly data is generated, delivered, stored and retrieved. Data can arrive in batches, periodically or in real time.

Two less frequently mentioned Vs are sometimes added as further characteristics of Big Data:

• Veracity - refers to the uncertainty or inaccuracy of the data, since data may come from unknown and ever-changing sources. Data filtering and structuring are needed to keep the data consistent [4, 5].
• Value - refers to the quality of the stored data and its commercial importance. Storing data wastes storage resources if the data cannot be analyzed to generate meaningful insights and used for better business operations.

In terms of its properties, Big Data can be described as a high volume of data, acquired at high velocity, in a variety of representations. It is valuable only when its opportunities are harnessed to empower organizations. It can support faster and better decision making and better customer experiences through data patterns and insights that would otherwise be inaccessible. Such volumes of data inundate businesses on a day-to-day basis and are difficult to analyze with traditional relational approaches. Industry and academia keep researching how to maximize the opportunities that big data offers, and this work is giving way to the emerging data-driven economy.

Echoing the motto 'data is the future oil', big companies have been investing in Big Data Analytics and making use of the data they have. Traditional relational databases have limited storage and processing capacity for big data, so they are becoming inadequate for this task. Even highly centralized enterprise data centers cannot process such huge volumes of data: big data is distributed data. Big Data Analytics is the modern approach to examining large data sets and uncovering the hidden patterns in them. In other words, it is the application of advanced analysis techniques to large data sets stored in a distributed environment.

Apache Hadoop has become the de facto Big Data Analytics platform. It is an open-source framework for distributed storage and distributed processing of large amounts of data on a cluster of commodity hardware. It does not need custom-designed hardware. Instead, it runs on inexpensive, readily available machines with modest performance.
It has many distributions, with Cloudera, Hortonworks and MapR being the most widely used.

The distributed nature of data in Big Data systems brings its own challenges and opportunities. Balancing the storage and processing load among dispersed commodity computers is one of the benefits that make it possible to analyze Big Data. On the other hand, the distributed processing components must communicate with each other and be coordinated, and these are among the challenges that must be addressed. These components can communicate by messaging, a mechanism that enables computer processes to send and receive signals or data among themselves. Different systems use different messaging platforms, but Apache Kafka is the one commonly used with Hadoop, and it is the focus of this thesis.
1.1 Background

In recent years, Apache Kafka has become the de facto high-throughput messaging system within the Hadoop ecosystem. Popular architectures for handling data in motion, such as the Dataflow and Lambda architectures, are built using Kafka. Despite Kafka's importance to Hadoop, its support for Hadoop is still at an early beta stage. Before version 0.9, Kafka gave little consideration to data security. The Kafka community introduced security features only with that version.

At KTH/SICS, we are developing the Hadoop Open Platform-as-a-Service (Hops), a next-generation architecture for Hadoop that improves scalability and security. Hops does not yet support Kafka. In this project, we will integrate a multi-tenant Kafka service into HopsWorks.

1.2 Problem

Apache Kafka is one of the important components of a company's data infrastructure. As a messaging service, it stores some of the company's critical data in a cluster of servers. Restricting access to the company's own employees does not guarantee that the data is not misused. Data must be secured against threats from both internal and external users. Even internal users have different duties and data requirements. Permission to access data should therefore be based on the data that each user needs for legitimate reasons.

The challenge is therefore how to secure this crucial data against internal and external threats that could otherwise compromise it.

1.3 Purpose

This project aims at building a multi-tenant Kafka cluster for HopsWorks. This Kafka cluster will support ACL-based user authorization. Kafka client connections are first authenticated, and their requests to access secured resources are then authorized using the access control lists defined for those resources. The main parts of this project are the following:

• Multi-tenant Kafka for HopsWorks: HopsWorks has the concept of projects.
A user must be either a project owner or a project member to perform any job on HopsWorks. For jobs that require access to Kafka topics, a user will be able to create and access topics within the project. The topic owner will also be able to share the topic with other projects if needed.
• Hops schema registry: a Kafka topic will have an associated Apache Avro schema for data integrity. A schema registry will be implemented to store and retrieve these schemas.
• ACL-based authorization: a topic will have associated access control lists (ACLs) that define which user can perform which operation on it. A Hops ACL-based authorizer implementation will authorize user requests based on the topic ACLs persisted in a database.

1.3.1 Benefits, Ethics and Sustainability

Data security and privacy are essential to the very existence of companies that own classified data. Every company needs a secure way of handling data, both during communication and in storage. Data in a network is often at higher risk than data in local storage, because it is exposed to a wider range of users. Securing such data is crucial for enterprises to function, since exposed data can even endanger their existence.

Integrating Kafka into HopsWorks can introduce security holes unless it is configured appropriately. For instance, if a topic is shared across projects and project members are allowed to perform topic operations, the data in the topic is open to all those users. This might not be desirable in an environment with classified data. Access to such data should be authorized so that intruders are blocked from causing security and privacy problems. Kafka is a scalable, high-throughput messaging system, which might seem to remove the need to share topics across projects. Even so, topic sharing is still essential for using resources efficiently.

1.4 Delimitation

This work builds on several previous works, mainly the Apache Kafka project and Hops. It does not change the Kafka project itself. It only provides a new implementation of the pluggable authorizer interface that Kafka offers.
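The pluggable authorizer is the only Kafka extension point this work relies on. The following Java sketch illustrates what an ACL-based authorization decision involves by evaluating one request against a list of topic ACLs. It is a simplified, hypothetical model. The `Acl` record, the `authorize` method and the `*` wildcard convention are assumptions made for illustration. They are not Kafka's `Authorizer` interface or the Hops implementation. The sketch does mirror one rule of Kafka's `SimpleAclAuthorizer`: a matching DENY ACL overrides any matching ALLOW ACL. It needs Java 16 or later for records.

```java
import java.util.List;

public class TopicAclCheck {

    public enum Permission { ALLOW, DENY }

    // One ACL entry (hypothetical model): a principal connecting from a host is
    // allowed or denied an operation on a topic. In this sketch "*" is a wildcard
    // for principal, host and topic, and the operation "All" matches any operation.
    public record Acl(String principal, String host, String operation,
                      String topic, Permission permission) {

        boolean matches(String p, String h, String op, String t) {
            return (principal.equals("*") || principal.equals(p))
                && (host.equals("*") || host.equals(h))
                && (operation.equals("All") || operation.equals(op))
                && (topic.equals("*") || topic.equals(t));
        }
    }

    // Deny overrides allow: any matching DENY entry rejects the request at once.
    // Otherwise at least one matching ALLOW entry is required, so a request that
    // matches no entry at all is rejected (default deny).
    public static boolean authorize(List<Acl> acls, String principal, String host,
                                    String operation, String topic) {
        boolean allowed = false;
        for (Acl acl : acls) {
            if (acl.matches(principal, host, operation, topic)) {
                if (acl.permission() == Permission.DENY) {
                    return false;
                }
                allowed = true;
            }
        }
        return allowed;
    }

    public static void main(String[] args) {
        List<Acl> acls = List.of(
            new Acl("User:alice", "*", "Write", "sensor-data", Permission.ALLOW),
            new Acl("*", "*", "Read", "sensor-data", Permission.ALLOW),
            new Acl("User:mallory", "*", "All", "sensor-data", Permission.DENY));

        System.out.println(authorize(acls, "User:alice", "10.0.0.5", "Write", "sensor-data"));  // true
        System.out.println(authorize(acls, "User:bob", "10.0.0.6", "Write", "sensor-data"));    // false
        System.out.println(authorize(acls, "User:mallory", "10.0.0.7", "Read", "sensor-data")); // false
    }
}
```

In the example, `User:mallory` is refused read access even though a wildcard ALLOW ACL grants reads to everyone, because the DENY ACL takes precedence. The evaluation in the sketch does not depend on where the ACLs are stored.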
Hops already provides projects as a service and can generate SSL keys and certificates on demand for both Hops project members and Kafka brokers. Building on these features, this thesis work adds a multi-tenant Kafka to HopsWorks.

1.5 Outline

As introduced above, this report discusses how to integrate a multi-tenant Kafka into HopsWorks. Chapter 2 discusses Apache Kafka in more depth, focusing on the main concepts relevant to the multi-tenancy feature. Chapter 3 summarizes related previous work on securing Kafka clusters, authorizing client requests to the cluster, and schema registries. Chapter 4 discusses the methodology used, the goals and the proposed solution of this project. Chapter 5 analyzes the solution thoroughly and covers the detailed thesis work. Chapter 6 describes the test configurations and the test files developed to test the multi-tenant Kafka. Finally, Chapter 7 concludes the report with conclusions, recommendations and future work.
Chapter 2

Background

Kafka is a distributed, fault-tolerant, high-throughput, partitioned and replicated publish/subscribe messaging system. It was originally developed at LinkedIn and is now open-sourced under the Apache License. This chapter explores Apache Kafka in detail, covering its architecture, design principles and characteristics. It also discusses multi-tenant architecture in both the cloud computing and the database domains.

2.1 What is Messaging

Messaging is a way to allow computer processes to communicate with one another by sending messages along a communication medium. These messages can contain information such as event notifications or service requests. The sender and receiver processes may have similar speeds, or they may exchange messages pairwise. In either case, a direct connection between the communicating parties can serve as the messaging medium. This is point-to-point messaging, as illustrated in figure 2.1a below.

Other situations call for a messaging service: the sender may be faster than the receiver, a sender may want to reach multiple receivers, or a receiver may want messages from multiple senders. Such a service implements standard messaging concepts such as batching, broadcasting and ordering, and it mediates the communication between the end parties. The hypothetical messaging system depicted in figure 2.1b is only an illustration of the latter scenario. It is otherwise an opaque messaging system that does not represent any particular messaging concepts or semantics.

These two modes of messaging are summarized here as a lead-in to Apache Kafka, which implements the latter mode. This chapter does not compare the two modes in depth. One notable difference between them is that the first mode supports only synchronous communication, while the latter can support both synchronous and asynchronous communication. Additionally, a messaging system can be seen as a level of indirection that decouples message senders from recipients. One advantage of this decoupling is that senders and recipients do not need to know about each other.

Messaging services can be implemented using either a publish/subscribe-based or a queue-based model [8, 9]. In a queue-based implementation, senders push messages into queues and receivers read those messages. Depending on the implementation of the queue, messages are either pulled by receivers or pushed by the queue. In some implementations, only a single sender can push messages to a queue and only a single receiver can read messages from it. In this approach, the queue is a message buffer that balances the speeds of the sender and receiver processes. It is simply a point-to-point link with a buffering capability that supports asynchronous messaging. If multiple receivers read from the queue, however, each receiver reads only a portion of the messages, because each message must be read by only one consumer. To achieve this, the queue must decide which messages are sent to which receivers, either round-robin or by following another algorithm.

In a publish/subscribe (pub/sub for short) messaging system, there is a set of topic channels to which senders push messages. Each message on a topic is broadcast to all receivers subscribed to that topic.
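The difference between the two delivery models can be made concrete with a small, self-contained Java sketch. It is purely illustrative and is not part of Kafka or Hops. The class and method names are hypothetical. `queueDeliver` hands each message to exactly one receiver, chosen round-robin as one possible queue policy. `pubSubDeliver` broadcasts every message on a single topic to all subscribers.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DeliveryModels {

    // Queue model: every message is delivered to exactly one receiver.
    // Receivers are picked round-robin, which is only one possible policy.
    // Assumes at least one receiver.
    public static Map<String, List<String>> queueDeliver(List<String> messages,
                                                         List<String> receivers) {
        Map<String, List<String>> inbox = new LinkedHashMap<>();
        for (String r : receivers) {
            inbox.put(r, new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            String receiver = receivers.get(i % receivers.size());
            inbox.get(receiver).add(messages.get(i));
        }
        return inbox;
    }

    // Pub/sub model (single topic): every message is broadcast to all subscribers.
    public static Map<String, List<String>> pubSubDeliver(List<String> messages,
                                                          List<String> subscribers) {
        Map<String, List<String>> inbox = new LinkedHashMap<>();
        for (String s : subscribers) {
            inbox.put(s, new ArrayList<>(messages));
        }
        return inbox;
    }

    public static void main(String[] args) {
        List<String> messages = List.of("m1", "m2", "m3", "m4");
        List<String> receivers = List.of("A", "B");
        System.out.println(queueDeliver(messages, receivers));  // {A=[m1, m3], B=[m2, m4]}
        System.out.println(pubSubDeliver(messages, receivers)); // {A=[m1, m2, m3, m4], B=[m1, m2, m3, m4]}
    }
}
```

With four messages and two receivers, the queue model splits the messages between A and B, whereas the pub/sub model gives each of them all four messages.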
The queue-based implementation is a degenerate case of pub/sub in which there is only one topic channel, one sender and one receiver. Kafka is implemented as a pub/sub system, but it also abstracts the queue-based approach, as discussed in section 2.2 below.

2.2 Kafka Architecture

Apache Kafka is an advanced messaging system whose operational architecture is shown in figure 2.2. It consists of the following main components: producer, consumer, broker and Zookeeper. It also supports a pluggable authorizer module.

• Message: the transferable unit of data in Kafka. It is an array of bytes.
• Broker: Kafka runs as a cluster of servers, each called a broker. Each broker needs a separate configuration. Kafka broker configuration is specified in the server.properties file, which must give each broker unique values for the properties broker.id, port and log.dir.
Figure 2.1: Forms of messaging ((a) Point to point messaging, (b) Pub/sub messaging service)

Figure 2.2: Kafka Architecture

• Topic: an abstraction into which messages are pushed and from which they are consumed. A topic is partitioned and replicated. Each topic partition has a leader broker and, if required, follower brokers.
• Producer: a Kafka client process that publishes messages to topics of its choice. It can choose which messages to publish to which topic partitions. Topic partitions are discussed later.
• Consumer: a Kafka client process that consumes messages from topics. Each consumer instance labels itself with a consumer group identifier.
• Apache Zookeeper: a high-performance distributed coordination service with a hierarchical namespace. The main Zookeeper roles in Apache Kafka are:
1. Coordination: managing cluster membership and electing a leader, watching for broker failures and replacing failed brokers, providing node watchers, and others.
2. Metadata storage: it stores topic information such as replication and partition information and resource ACLs, and it maintains consumer offsets. How Zookeeper stores this metadata is described in Kafka data structure in Zookeeper.
3. As of Apache Kafka version 0.9, it also provides quotas for clients.

For each topic, the Kafka cluster maintains a configurable number of partitions. A partition is an ordered sequence of messages that is continually appended to. Each message is identified by an offset and retained for a configurable number of days.

Kafka provides a single consumer abstraction, called the consumer group, that generalizes both the queue-based and the pub/sub implementations (see figure 2.3). Consumers label themselves with a consumer group, and each message is read by a single consumer within the consumer group. This abstraction allows a group of consumers to divide up the work of consuming and processing Kafka messages. Each consumer group subscribed to a topic receives messages from every partition of that topic, so every message is effectively broadcast to all consumer groups. Within a consumer group, however, only a single consumer instance processes a delivered message. If each consumer group consists of only a single consumer, Kafka behaves as a publish/subscribe system. Every message is broadcast to every single-instance consumer group and processed there. If all consumer instances belong to a single consumer group, Kafka degenerates to a queue system. Each message is then read by that one consumer group and processed by a single consumer instance within it. The consumer group is the main concept behind Kafka's horizontal scalability.
It allows a pool of processes to divide up the work of consuming and processing records, as discussed in section 2.3 below.

The number of topic partitions determines how many consumer instances in a consumer group can process messages in parallel. A topic with N partitions can support at most N consumers in a consumer group. If there are fewer partitions than consumers in a consumer group, the extra consumers are kept idle. Conversely, if there are more partitions than consumers, a single consumer instance may consume from more than one partition. In general, more partitions lead to higher throughput. However, availability and latency requirements might be compromised. More partitions mean more open file handles in brokers, higher end-to-end latency and higher memory requirements in the clients. Each partition is replicated to a configurable number of replica servers. One of the replica servers acts as the leader
for that partition, and the remaining servers follow the leader. If the leader fails, one of the followers is promoted to be the new leader of the partition.

Figure 2.3: Kafka consumer group abstraction

2.3 Kafka Main Characteristics

Kafka, often described as a distributed transaction log, was developed to provide messaging services. A few of its basic design principles are listed below.

• Scalability - Kafka scales horizontally by adding more commodity nodes on the fly, so the cluster can grow or shrink dynamically without downtime. Deploying many commodity nodes instead of a single high-performance server increases the I/O capacity available for handling a huge volume of data. For instance, a topic with N partitions can optimally support N consumers whether it is deployed on a single node or on a cluster of nodes. However, a single server supports only a limited number of I/O operations. Consumption is therefore faster when the topic partitions are distributed across a cluster of nodes. Scaling out gives both producers and consumers high throughput by distributing the load across the whole cluster.
• Durability - Unlike many other messaging systems, Kafka persists messages to disk with a strong durability guarantee. Persisting messages to disk has two advantages. It ensures that data is not lost when a broker goes down. It also lets data accumulate for batch processing.
• Fault Tolerance - Kafka can survive N-1 node failures in a cluster of N nodes if topics are replicated appropriately (i.e. with a replication factor of N). Each topic partition is replicated to a configurable number of replicas, and the leader of the partition keeps a list of in-sync replicas. The replicas in this list follow the leader so that each of them reflects the leader's state. If the leader broker goes down, one of the in-sync replicas is elected as the new leader of the partition and Kafka keeps serving. This fail-over process is transparent to the application.

2.4 Kafka and other Messaging Services

Unlike other messaging systems such as RabbitMQ and Apache ActiveMQ, Kafka provides the following [12, 13, 14]:

• It scales out horizontally very easily when more commodity servers are added. The main way to scale out consumption in Kafka is to add more consumers to a consumer group.
• It provides high throughput for both publishing and consuming.
• It persists messages to disk. This feature enables Kafka to process both batch and streaming data.
• It automatically rebalances consumers after failures. Each consumer in a consumer group processes different messages. If one of them fails, the consumer rebalance algorithm runs and assigns each partition that the failed consumer was processing to another consumer in the group.

2.5 Apache Kafka Use Cases

Apache Kafka is now a reliable and mature project deployed in the data pipelines of many industry leaders such as LinkedIn, Spotify, Yahoo, Netflix, PayPal and others. Some of the use cases [8, 16, 17] for which these companies use Apache Kafka are listed below.

• Messaging - it is used as a message broker, decoupling consumers from producers in large-scale message processing applications. It can also be used to replicate databases. The original database acts as a Kafka producer and the destination database acts as a Kafka consumer. In addition, DBAs can use Kafka to decouple applications from databases.
• Website activity tracking - it can be used to track user website activities (page views, searches), user social media activities (shares, posts, comments, tweets, etc.), user preferences on music streaming applications (