As announced in the August HP press release, the upcoming version of Vertica will be able to integrate with Apache Kafka to provide real-time streaming. This post discusses what was shared in the Big Data Conference session on how Vertica and Kafka will integrate to provide a real-time, low-latency, and scalable streaming solution.
Notice: This feature has not yet been released; it may change before release, and this post is not a commitment to deliver it in any particular release.
Vertica has always been strong at loading data in batches. Organizations with a real-time requirement need to make these batches smaller and more frequent to get faster insight over more use cases. The Kafka integration will allow different processing systems to load data seamlessly into Vertica.
The solution presented at the Big Data Conference will leverage Kafka to stream data in parallel across all nodes with low latency and high throughput. Organizations will no longer have to build custom streaming data ingestion solutions and can move away from traditional methods.
In a traditional bulk load approach, data is loaded through a client interface such as ODBC or JDBC. This method is easy to set up, and many tools are available for the task. However, it supports only a limited number of streams and delivers lower throughput. The goal is to make loading data easier while overcoming challenges such as unpredictable bursts from different sources.
With a parallel COPY approach, complexity increases because files must be written to a staging area on local nodes, and further scripting is needed to load them in parallel. The benefits are higher throughput and, typically, better scalability.
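A parallel COPY of staged files might look like the following sketch. The table name, file paths, and glob pattern are illustrative, not from the session; the clauses themselves (ON ANY NODE, fjsonparser, REJECTED DATA, DIRECT) are standard Vertica COPY options.

```sql
-- Illustrative parallel COPY of staged JSON files; names and paths
-- are hypothetical examples, not from the original post.
COPY web_events
  FROM '/staging/web_events_*.json' ON ANY NODE
  PARSER fjsonparser()
  REJECTED DATA '/staging/rejects/web_events.rej'
  DIRECT;
```

The scripting burden mentioned above comes from everything around this statement: splitting the incoming stream into files, distributing them across node staging areas, and scheduling the loads.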
With Kafka, the goal is to get the best of both approaches.
Kafka is a distributed, scalable, partitioned, and replicated commit log service providing the functionality of a messaging system. In some organizations, it acts as the pipe through which all data flows. Processes that publish messages to Kafka are called producers; processes that subscribe to Kafka are called consumers.
The original use case for Kafka was website activity tracking at LinkedIn. Other use cases such as metrics, log aggregation, stream processing, or event sourcing are all possible with Kafka. Some other organizations leveraging Kafka are listed below; visit the Kafka wiki for a fuller list with use cases.
The producers generate data and connect to Kafka brokers. The data is written as messages to a topic (a subject of information), which in this solution aligns to a Vertica table. After the data is written to the Kafka cluster, the producer's work is done. Consumers then query Kafka to pull the data out. Topics are divided into partitions, which consumers can read in parallel.
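The partitioning scheme can be sketched with a minimal in-memory model (this is a toy illustration, not the Kafka API; all names are made up). Messages with the same key land in the same partition, and each consumer reads one partition from its own offset, independently of the others:

```python
# Toy model of a partitioned topic, illustrating why partitions
# allow consumers to read in parallel. Not a real Kafka client.

class Topic:
    def __init__(self, name, partitions):
        self.name = name
        self.partitions = [[] for _ in range(partitions)]

    def produce(self, key, message):
        # Same key -> same partition, preserving per-key ordering.
        self.partitions[hash(key) % len(self.partitions)].append(message)

    def consume(self, partition, offset):
        # Each consumer reads one partition from its own offset.
        return self.partitions[partition][offset:]

topic = Topic("web_events", partitions=3)
for i in range(9):
    topic.produce(key=f"user{i}", message=f"event-{i}")

# Three consumers could drain the three partitions concurrently.
total = sum(len(topic.consume(p, 0)) for p in range(3))
print(total)  # 9
```

More partitions mean more consumers can work at once, which is what lets a parallel COPY across Vertica nodes keep up with a high-volume topic.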
The advantage of Kafka is that data sources are decoupled from the targets. The traditional approach of writing a separate ETL flow for each pair of systems is no longer needed; Kafka decreases complexity because the source and target systems only need to know about Kafka.
In this integration, Vertica will act as a consumer of Kafka data. On the Vertica side, a parallel COPY operation will pull topics from Kafka. The expected supported formats are JSON and Avro, although if the solution is open-sourced, organizations will be able to write their own parsers. A load scheduler will monitor resource pools to optimize these loads. To close the loop, Vertica will also be able to act as a producer, exporting query results to Kafka.
From an engineering perspective, some basic requirements had to be met in this solution:
- Low Latency – data moves fast and must be processed from multiple topics efficiently
- Parallelism – resource contention must be handled for concurrent COPY statements
- Data Loss – mitigated through the use of atomic operations in microbatches
- Limited Resources – dedicated resource pool for Kafka loads
- Scalability – adding Vertica nodes or Kafka brokers must increase capacity, and the pipeline must stay balanced on both sides
- Monitoring/Usability – track and report on offsets and general use from internal system data
The concept of a microbatch covers how offsets are tracked and data is loaded. Initially, a set of offsets is fetched and a microbatch is started from those offsets (i.e., offset x). Data is then pulled from Kafka and the messages are loaded into Vertica. Once the microbatch is complete, the offsets are updated and the operation is committed. To avoid offset mismatches between clients and missed data, offsets are tracked internally in Vertica instead of in ZooKeeper.
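The microbatch pattern described above can be sketched as follows. The broker and database here are in-memory stand-ins (not real Kafka or Vertica APIs); the point is that the loaded rows and the new offset are committed together, so a crash mid-batch replays the batch rather than losing or duplicating data:

```python
# Sketch of a microbatch loop with the offset stored in the target,
# as described above. Stand-in data structures, not real APIs.

stream = [f"msg-{i}" for i in range(10)]   # stand-in for a Kafka partition
db = {"rows": [], "offset": 0}             # offset lives in the target DB

def run_microbatch(batch_size):
    start = db["offset"]                   # fetch last committed offset
    batch = stream[start:start + batch_size]
    # Rows and offset update belong to one atomic commit; if the batch
    # fails, neither is applied and the next run restarts from `start`.
    db["rows"].extend(batch)
    db["offset"] = start + len(batch)

run_microbatch(4)
run_microbatch(4)
print(db["offset"], len(db["rows"]))  # 8 8
```

Storing the offset in the same system as the data is what makes the commit atomic; tracking it in an external store such as ZooKeeper would reintroduce the window where data and offset can disagree.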
Inside the COPY statement, the Kafka source will take numerous arguments that specify the stream topic and offsets, brokers, duration, and execution parallelism. A parser, Avro or JSON, is part of the COPY, and the rest remains standard (e.g., REJECTED DATA and DIRECT).
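Since the feature was pre-release at the time of the session, the exact syntax was not final; a statement along these lines conveys the shape. The source and parser names (KafkaSource, KafkaJSONParser) and their argument names are assumptions here, as is the topic/partition/offset encoding:

```sql
-- Illustrative only: pre-release feature, so the source name, parser
-- name, and argument names below are assumptions, not final syntax.
COPY web_events
  SOURCE KafkaSource(stream='web_events|0|-2',   -- topic|partition|offset
                     brokers='kafka01:9092',
                     duration=INTERVAL '10 seconds')
  PARSER KafkaJSONParser()
  REJECTED DATA '/data/rejects/web_events.rej'
  DIRECT;
```

Everything outside the SOURCE and PARSER clauses is an ordinary COPY, which is what keeps the integration familiar to existing Vertica users.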
With this implementation, topics will be dynamically prioritized to allow for better management of topics with varying durations. After an initial batch, subsequent batches will be re-sorted based on their previous durations. Low-volume microbatches or topics therefore run first, leaving the remaining time for the high-volume microbatches.
If a lower-volume microbatch requires more time, it must stop within its allotted time and will receive more time in the subsequent iteration. The microbatch duration will be configurable within the COPY statement and across different scheduler instances. New system tables will offer visibility into scheduler history, offsets, and events.
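The re-sorting step can be sketched in a few lines. Under the assumption (stated above) that ordering is by the previous iteration's duration, short microbatches run first and long ones get the remaining time; the topic names and durations are invented for illustration:

```python
# Sketch of dynamic prioritization: order the next frame's microbatches
# by their previous duration, shortest first. Illustrative data only.

def schedule(prev_durations):
    """Return microbatch names ordered shortest-previous-duration first."""
    return [name for name, _ in sorted(prev_durations.items(),
                                       key=lambda kv: kv[1])]

# previous durations (seconds) per topic's microbatch
previous = {"clicks": 9.0, "metrics": 0.4, "logs": 2.5}
order = schedule(previous)
print(order)  # ['metrics', 'logs', 'clicks']
```

A batch that hits its time limit simply records a longer duration, so it drifts to the back of the next frame and inherits whatever time the quick batches leave over.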
In preliminary tests with six servers in a typical recommended configuration, a data set of 1.5 million 5 KB JSON messages was loaded at 1.2 TB/hour over a single partition and at 2.2 TB/hour over three partitions. These results include the time to load and parse plus metadata overhead, while using only 30% CPU and 14 GB of memory.
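For a sense of scale, a quick back-of-the-envelope calculation on the quoted numbers (assuming decimal units, i.e. 1 KB = 10^3 bytes and 1 TB = 10^12 bytes, which is an assumption on my part):

```python
# Back-of-the-envelope check of the quoted test figures,
# assuming decimal units (1 KB = 1e3 B, 1 TB = 1e12 B).
messages = 1_500_000
size_bytes = messages * 5_000              # ~7.5 GB of JSON
rate_single = 1.2e12 / 3600                # bytes/s at 1.2 TB/hour
rate_three = 2.2e12 / 3600                 # bytes/s at 2.2 TB/hour
t_single = size_bytes / rate_single
t_three = size_bytes / rate_three
print(round(t_single, 1))  # 22.5  -> ~22.5 s over one partition
print(round(t_three, 1))   # 12.3  -> ~12.3 s over three partitions
```

In other words, at the quoted rates the 7.5 GB test set drains in well under a minute, with headroom left in both CPU and memory.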
To try out this upcoming feature, a beta is available through a request form.