As announced in the August HP press release, the upcoming version of Vertica will be able to integrate with Apache Kafka to provide real-time streaming. This post discusses what was shared in the Big Data Conference session on how Vertica and Kafka will integrate to provide a real-time, low-latency, and scalable streaming solution.
Notice: This feature is not yet released; it may change before release, and this post is not a commitment to deliver it in a particular release.
Background
Vertica has always been strong at loading data in batches. Organizations with a real-time requirement need to make these batches smaller and more frequent to get faster insight over more use cases. The Kafka integration will allow different processing systems to load data seamlessly into Vertica.
The solution presented at the Big Data Conference will leverage Kafka to stream data in parallel across all nodes with low latency and high throughput. Organizations will no longer have to build custom streaming data ingestion solutions and can move away from traditional methods.
What’s Worked
In a traditional bulk load approach, data is loaded through a client such as ODBC or JDBC. This method is easy to set up, as there are many tools available for the task; however, it supports only a limited number of parallel streams and delivers lower throughput. The goal is to make loading data easier while overcoming challenges such as unpredictable bursts from different sources.
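For reference, here is a minimal sketch of the client-side approach using the open-source vertica_python driver; the connection details and the web_events table are made up for illustration:

```python
# Traditional client-side batch load: every batch travels through a single
# client connection, which limits the number of streams and the throughput.
import vertica_python

conn_info = {
    'host': 'vertica-node1', 'port': 5433,      # hypothetical connection details
    'user': 'dbadmin', 'password': '', 'database': 'analytics',
}

rows = [(1, 'page_view'), (2, 'click')]         # data arriving from an application

with vertica_python.connect(**conn_info) as conn:
    cur = conn.cursor()
    # executemany pushes the batch over one stream; more streams means
    # managing more client connections yourself.
    cur.executemany("INSERT INTO web_events (id, event_type) VALUES (%s, %s)", rows)
    conn.commit()
```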
With a parallel COPY approach, complexity increases because files must be written to a staging area on the local nodes, and further scripting is needed to load them in parallel. The benefit is that throughput increases and the approach typically scales.
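And a rough sketch of the parallel COPY pattern, assuming files have already been staged on each node; the node names and the /staging path are illustrative:

```python
# Parallel COPY: each node loads its own staged file, so throughput scales with
# the cluster, but the staging and orchestration scripts must be maintained.
from concurrent.futures import ThreadPoolExecutor
import vertica_python

conn_info = {'host': 'vertica-node1', 'port': 5433,
             'user': 'dbadmin', 'password': '', 'database': 'analytics'}

nodes = ['v_analytics_node0001', 'v_analytics_node0002', 'v_analytics_node0003']

def copy_on_node(node):
    # Tell Vertica to read the file that was staged locally on that node.
    sql = (f"COPY web_events FROM '/staging/web_events.csv' "
           f"ON {node} DELIMITER ',' DIRECT")
    with vertica_python.connect(**conn_info) as conn:
        conn.cursor().execute(sql)
        conn.commit()

# One COPY per node, issued concurrently from the loading script.
with ThreadPoolExecutor(max_workers=len(nodes)) as pool:
    list(pool.map(copy_on_node, nodes))
```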
With Kafka, the goal is to get the best of both approaches.
Kafka
Kafka is a distributed, scalable, partitioned, and replicated commit log service providing the functionality of a messaging system. In some organizations, it acts as the pipe through which all data flows. Processes that publish messages to Kafka are called producers; processes that subscribe to Kafka are called consumers.
The original use case for Kafka was website activity tracking at LinkedIn. Other use cases such as metrics, log aggregation, stream processing, and event sourcing are all possible with Kafka. Some of the organizations leveraging Kafka are listed below; visit the Kafka wiki for a more complete list with use cases.
LinkedIn, Yahoo, Netflix, Square, Spotify, Uber, Goldman Sachs, Tumblr, PayPal, Box, Airbnb, Coursera, Shopify, Cerner, Ancestry.com, DataSift, Hotels.com, and Trivago.
The producers generate data and connect to Kafka brokers. The data is written as messages to a topic (a subject of information), which in this solution aligns to a Vertica table. Once the data is written to the Kafka cluster, the producer's work is done. Consumers then query Kafka to pull data out. Topics are divided into partitions, which allows consumers to read in parallel.
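To make the producer side concrete, here is a small sketch using the kafka-python client; the broker address and the web_events topic (matching a hypothetical Vertica table) are assumptions:

```python
# Producer: publishes JSON messages to a topic that maps to a Vertica table.
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['kafka-broker1:9092'],                 # hypothetical broker
    value_serializer=lambda v: json.dumps(v).encode('utf-8'), # messages as JSON
)

event = {'id': 1, 'event_type': 'page_view'}
# Keys determine the partition; spreading keys spreads messages across
# partitions so that consumers can read the topic in parallel.
producer.send('web_events', key=str(event['id']).encode('utf-8'), value=event)
producer.flush()   # the producer's job ends once the brokers have the messages
```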
The advantage of Kafka is that data sources are decoupled from the targets. The traditional approach of writing separate ETL flows between multiple systems is no longer needed. Kafka decreases the complexity because the source and target systems only need to know about Kafka.
A large Kafka ecosystem exists (stream processors, Hadoop integrations, management consoles, loggers, etc.), along with support for a long list of clients (Python, .NET, Storm, Perl, Node.js, etc.).
Vertica Integration
In this integration, Vertica will act as a consumer of Kafka data. On the Vertica side, a parallel COPY operation will pull topics from Kafka. The expected supported formats are JSON and Avro, although if the solution is open-sourced, organizations will be able to write their own parsers. A load scheduler will monitor resource pools to optimize these loads. To close the loop, Vertica will also be able to act as a producer, exporting query results back to Kafka.
Requirements
From an engineering perspective, some basic requirements had to be met in this solution:
- Low Latency – data moves fast and must be processed from multiple topics efficiently
- Parallelism – resource contention must be handled for concurrent COPY statements
- Data Loss – mitigated through the use of atomic operations in microbatches
- Limited Resources – a dedicated resource pool for Kafka loads (see the sketch after this list)
- Scalability – adding Vertica nodes or Kafka brokers must increase throughput, with the pipes kept equal on both sides
- Monitoring/Usability – track and report on offsets and general use from internal system data
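The dedicated resource pool mentioned above can already be created with standard Vertica DDL; the pool name and sizing below are purely illustrative:

```python
# Create a dedicated pool so Kafka microbatch loads cannot starve other queries.
import vertica_python

conn_info = {'host': 'vertica-node1', 'port': 5433,
             'user': 'dbadmin', 'password': '', 'database': 'analytics'}

with vertica_python.connect(**conn_info) as conn:
    cur = conn.cursor()
    cur.execute("""
        CREATE RESOURCE POOL kafka_pool    -- hypothetical name and sizing
            MEMORYSIZE '2G'
            PLANNEDCONCURRENCY 6
            QUEUETIMEOUT 300
    """)
    # Sessions issuing the Kafka COPY statements would then run in this pool.
    cur.execute("SET SESSION RESOURCE_POOL = kafka_pool")
```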
Microbatches
The concept of a microbatch covers how offsets are tracked and data is loaded. Initially, a set of offsets is fetched and a microbatch is started from those offsets (i.e., offset x). Data is then pulled from Kafka and those messages are loaded into Vertica. Once the microbatch is complete, the offsets are updated and the operation is committed. To mitigate issues with client-side offset tracking and missed data, offsets are tracked internally in Vertica instead of in ZooKeeper.
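Here is a rough Python sketch of that microbatch cycle with offsets kept in a Vertica table; the kafka_offsets table and the load_from_kafka helper are hypothetical, not part of the product:

```python
# One microbatch: read the last committed offset, load for at most the allotted
# duration, then commit the data and the new offset together.
def run_microbatch(conn, topic, partition_id, duration_ms):
    cur = conn.cursor()

    # 1. Fetch the last offset committed for this topic/partition from Vertica
    #    (kafka_offsets is a hypothetical bookkeeping table).
    cur.execute("SELECT last_offset FROM kafka_offsets "
                "WHERE topic = %s AND partition_id = %s", (topic, partition_id))
    start_offset = cur.fetchone()[0]

    # 2. Pull messages from Kafka starting at that offset and load them into
    #    the target table; load_from_kafka is a placeholder for that work.
    end_offset = load_from_kafka(cur, topic, partition_id, start_offset, duration_ms)

    # 3. Record the new offset, then commit: data and offset move together,
    #    so a failed batch simply replays from the old offset with no loss.
    cur.execute("UPDATE kafka_offsets SET last_offset = %s "
                "WHERE topic = %s AND partition_id = %s",
                (end_offset, topic, partition_id))
    conn.commit()
```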
Inside the COPY statement, the Kafka source will take numerous arguments that indicate the stream topic and offsets, the brokers, the duration, and the execution parallelism. A parser, Avro or JSON, is part of the COPY, and the rest remains standard (e.g., REJECTED DATA and DIRECT).
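Since the syntax has not been released, the following is only a guess at what such a COPY might look like, pieced together from the arguments described above; every source, parser, and parameter name here is an assumption:

```python
# Hypothetical shape of a Kafka COPY, run from Python for illustration only;
# the actual source, parser, and parameter names may differ at release.
import vertica_python

conn_info = {'host': 'vertica-node1', 'port': 5433,
             'user': 'dbadmin', 'password': '', 'database': 'analytics'}

copy_sql = """
    COPY web_events
    SOURCE KafkaSource(stream='web_events|0|-2',     -- topic|partition|offset
                       brokers='kafka-broker1:9092',
                       duration=INTERVAL '10 seconds',
                       executionparallelism=3)
    PARSER KafkaJSONParser()
    REJECTED DATA AS TABLE web_events_rejects
    DIRECT
"""

with vertica_python.connect(**conn_info) as conn:
    conn.cursor().execute(copy_sql)
    conn.commit()
```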
Implementation
With this implementation, topics will be dynamically prioritized to better manage the varying durations of topics. After an initial batch, subsequent batches will be re-sorted based on their previous durations. Low-volume microbatches or topics therefore run at the beginning, leaving the remaining time for the high-volume microbatches.
If a lower-volume microbatch requires more time, it must stop at its allotted time and will be given more time in the subsequent iteration. The microbatch duration will be configurable within the COPY statement and per scheduler instance. New system tables will offer visibility into process monitoring over the scheduler history, offsets, and events.
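A simplified sketch of that prioritization idea: sort microbatches by how long they took last time, run the short ones first, and give the rest of the frame to the heavy ones. The scheduler's real algorithm has not been published; this is only an illustration:

```python
import time

def run_frame(microbatches, frame_seconds, run_batch):
    """Run one scheduler frame, shortest-first by previous duration.

    `microbatches` is a list of dicts like {'topic': ..., 'last_duration': ...}
    and `run_batch` performs the actual load; both are illustrative stand-ins.
    """
    deadline = time.monotonic() + frame_seconds

    # Low-volume (historically fast) batches go first, leaving the remaining
    # time in the frame to the high-volume batches.
    for batch in sorted(microbatches, key=lambda b: b['last_duration']):
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break                      # out of time; picked up in the next frame
        started = time.monotonic()
        run_batch(batch, time_limit=remaining)
        # Record the observed duration so the next frame re-sorts accordingly.
        batch['last_duration'] = time.monotonic() - started
```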
Results
In preliminary tests with six servers in a typical recommended configuration, a data set of 1.5 million 5 KB JSON messages was loaded at 1.2 TB/hour over a single partition and at 2.2 TB/hour over three partitions. These results include the time to load and parse plus metadata overhead, while utilizing only 30% CPU and 14 GB of memory.
Beta
To try out this upcoming feature, a beta is available through a request form.
About the author
Norbert Krupa is the founder of vertica.tips and a Solutions Engineer at Talend. He is an HP Accredited Solutions Expert for Vertica Big Data Solutions and has written the Vertica Diagnostic Queries, which aim to cover monitoring, diagnostics, and performance tuning. The views, opinions, and thoughts expressed here do not represent those of the author's employer.
2 Comments
When is the release of the Kafka integration expected? We have been waiting a long time for must-have additions such as a seamless connection with Apache Spark, but almost three months have passed since the Excavator announcement and there has been no other information, official or otherwise. Just one post on this blog, nothing more.
Max, I don't work for HP, but according to a post on vertica-forums.com, it looks like it will be released at the end of October.