
Real Time Streaming with Kafka


As announced in the August HP press release, the upcoming version of Vertica will be able to integrate with Apache Kafka to provide real-time streaming. This post discusses what was shared in the Big Data Conference session on how Vertica and Kafka will integrate to provide a real-time, low-latency, and scalable streaming solution.

Notice: This feature is not yet released; it may change before release, and this post is not a commitment to deliver it in any particular release.


Background

Vertica has always been strong at loading data in batches. Organizations with real-time requirements need to make these batches smaller and more frequent to gain faster insight across more use cases. The Kafka integration will allow different processing systems to load data seamlessly into Vertica.

The solution presented at the Big Data Conference will leverage Kafka to stream data in parallel across all nodes with low latency and high throughput. Organizations will no longer have to build custom streaming data ingestion solutions and can move away from traditional methods.

What’s Worked

In a traditional bulk load approach, data is loaded through a client such as ODBC or JDBC. This method is easy to set up, as there are many tools available for the task. However, it supports only a limited number of streams and delivers lower throughput. The goal is to make loading data easier while overcoming challenges such as unpredictable bursts from different sources.

Traditional ETL Approach
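For reference, a traditional client-side load might look like the minimal sketch below. It is illustrative only: the vertica_python driver, the connection details, and the web_events table are assumptions, not part of the announced feature.

# Minimal client-side bulk load sketch (assumes the vertica_python driver
# and an existing web_events table; connection details are placeholders).
import vertica_python

conn_info = {
    'host': 'vertica01', 'port': 5433,
    'user': 'dbadmin', 'password': '', 'database': 'analytics',
}

batch = "1|2015-09-01 10:00:00|/home\n2|2015-09-01 10:00:01|/cart\n"

with vertica_python.connect(**conn_info) as conn:
    cur = conn.cursor()
    # The whole batch flows through a single client connection, which is
    # what limits the number of streams and the overall throughput.
    cur.copy("COPY web_events FROM STDIN DELIMITER '|' ABORT ON ERROR", batch)
    conn.commit()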

With a parallel COPY approach, complexity increases because files are written to a staging area on local nodes, and it is further complicated by the scripting needed to load in parallel. The benefits are higher throughput and, typically, scalability.

Parallel COPY Approach
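The scripting involved typically amounts to fanning COPY statements out across nodes, along the lines of this sketch (again illustrative; the node names, staging paths, and web_events table are assumptions):

# Sketch of a scripted parallel COPY (illustrative; node names, staging
# paths, and the web_events table are assumptions).
from concurrent.futures import ThreadPoolExecutor
import vertica_python

CONN = {'host': 'vertica01', 'port': 5433, 'user': 'dbadmin',
        'password': '', 'database': 'analytics'}
NODES = ['v_analytics_node0001', 'v_analytics_node0002', 'v_analytics_node0003']

def load_from_node(node):
    with vertica_python.connect(**CONN) as conn:
        cur = conn.cursor()
        # Each node reads files from its own local staging area;
        # DIRECT loads straight to disk storage (ROS) rather than memory.
        cur.execute(
            "COPY web_events FROM '/staging/web_events_*.csv' "
            f"ON {node} DELIMITER '|' DIRECT"
        )
        conn.commit()

with ThreadPoolExecutor(max_workers=len(NODES)) as pool:
    list(pool.map(load_from_node, NODES))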

With Kafka, the goal is to get the best of both approaches.

Kafka

Kafka is a distributed, scalable, partitioned, and replicated commit log service providing the functionality of a messaging system. In some organizations, it acts as the pipe through which all data flows. Processes that publish messages to Kafka are called producers; processes that subscribe to Kafka are called consumers:

Apache Kafka Producer Consumer
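A minimal producer and consumer, using the kafka-python client (one of many available clients; the broker address and topic name are assumptions), looks like this:

# Minimal Kafka producer/consumer sketch with the kafka-python client.
# The broker address and the web_events topic are assumptions.
from kafka import KafkaProducer, KafkaConsumer
import json

# Producer: publishes JSON messages to the web_events topic.
producer = KafkaProducer(
    bootstrap_servers='kafka01:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
)
producer.send('web_events', {'user_id': 1, 'page': '/home'})
producer.flush()

# Consumer: subscribes to the same topic and reads messages as they arrive.
consumer = KafkaConsumer(
    'web_events',
    bootstrap_servers='kafka01:9092',
    auto_offset_reset='earliest',
    value_deserializer=lambda v: json.loads(v.decode('utf-8')),
)
for message in consumer:
    print(message.partition, message.offset, message.value)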

The original use case for Kafka was website activity tracking at LinkedIn. Other use cases, such as metrics, log aggregation, stream processing, and event sourcing, are also possible with Kafka.1 Some of the organizations leveraging Kafka are listed below; visit the Kafka wiki for a fuller list with use cases.

LinkedIn
Yahoo
Twitter
Netflix
Square
Spotify
Pinterest
Uber
Goldman Sachs
Tumblr
PayPal
Box
Airbnb
Coursera
Shopify
Cerner
Ancestry.com
DataSift
Hotels.com
Trivago

The producers generate data and connect to Kafka brokers. The data is written as messages to a topic (a subject of information), which in this solution aligns to a Vertica table. After data is written to the Kafka cluster, the producer has finished its work. Next, consumers query Kafka to pull data out. Topics are divided into partitions, which allows consumers to read in parallel, as in the sketch below.
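Partition-level reads are what make that parallelism possible. This sketch (kafka-python again; topic name and partition count are assumptions) assigns one partition to one reader, so a three-partition topic can be drained by three readers at once:

# Partition-level consumption sketch (kafka-python; names are assumptions).
from kafka import KafkaConsumer, TopicPartition

def read_partition(partition_id):
    consumer = KafkaConsumer(bootstrap_servers='kafka01:9092')
    tp = TopicPartition('web_events', partition_id)
    consumer.assign([tp])            # explicit assignment, no consumer group
    consumer.seek_to_beginning(tp)   # start from the earliest retained offset
    for message in consumer:
        print(partition_id, message.offset, message.value)

# In practice each call would run in its own process, thread, or host.
read_partition(0)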

The advantage with Kafka is that data sources are decoupled from the targets. Traditional approaches of writing multiple ETL flows for multiple systems are no longer needed. Kafka decreases the complexity as the source and target systems only need to know about Kafka:

Kafka Decoupling

A large Kafka ecosystem exists (stream processors, Hadoop integrations, management consoles, loggers, etc.), as well as support for a long list of clients (Python, .NET, Storm, Perl, Node.js, etc.).

Vertica Integration

In this integration, Vertica will act as a consumer of Kafka data. On the Vertica side, a parallel COPY operation will pull topics from Kafka. The expected supported formats are JSON and Avro, although if the solution is open-sourced, organizations will be able to write their own parsers. A load scheduler will monitor resource pools to optimize these loads. To close the loop, Vertica will also be able to act as a producer and export query results to Kafka.

Requirements

From an engineering perspective, some basic requirements had to be met in this solution:

  • Low Latency – data moves fast and must be processed from multiple topics efficiently
  • Parallelism – resource contention must be handled for concurrent COPY statements
  • Data Loss – mitigated through the use of atomic operations in microbatches
  • Limited Resources – dedicated resource pool for Kafka loads
  • Scalability – adding Vertica nodes or Kafka brokers must increase scalability and pipes must be equal on both sides
  • Monitoring/Usability – track and report on offsets and general use from internal system data

Microbatches

The concept of a microbatch covers how offsets are tracked and how data is loaded. Initially, a set of offsets is fetched and a microbatch is started from those offsets (i.e. offset x). Data is then pulled from Kafka and those messages are loaded into Vertica. Once the microbatch is complete, the offsets are updated and the operation is committed. To mitigate potential issues with client-side offset handling and missed data, offsets are tracked internally in Vertica instead of in ZooKeeper.

Kafka Microbatch
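As a rough illustration of that cycle, and not of the actual Vertica scheduler, the sketch below resumes from a stored offset, reads for a bounded duration, "loads" the batch, and only then advances the offset (kafka-python, the topic, and the in-memory offset store are all assumptions):

# Conceptual microbatch cycle (illustration only, not the Vertica scheduler).
from kafka import KafkaConsumer, TopicPartition

stored_offsets = {'web_events': 0}   # stand-in for offsets tracked inside Vertica

def run_microbatch(topic, duration_ms=10000):
    consumer = KafkaConsumer(bootstrap_servers='kafka01:9092')
    tp = TopicPartition(topic, 0)
    consumer.assign([tp])
    consumer.seek(tp, stored_offsets[topic])        # resume from the stored offset

    records = consumer.poll(timeout_ms=duration_ms) # bounded read: one microbatch
    batch = [m.value for msgs in records.values() for m in msgs]
    if not batch:
        return

    # In the real integration the COPY and the offset update commit atomically;
    # here the "load" is a print and the offset store is a dict.
    print('loading %d messages into Vertica' % len(batch))
    stored_offsets[topic] = consumer.position(tp)   # advance only after the load

run_microbatch('web_events')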

Inside the COPY statement, the Kafka source will take numerous arguments that indicate the stream topic and offsets, brokers, duration, and execution parallelism. A parser, Avro or JSON, is part of the COPY, and the rest remains standard (e.g. REJECTED DATA and DIRECT).
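Since the feature had not shipped at the time of the session, exact names may change, but a COPY along these lines (executed here through the vertica_python driver; the KafkaSource and KafkaJSONParser names and all parameter values are assumptions based on what was described) conveys the shape of it:

# Hedged sketch of a Kafka COPY; the source/parser names and parameters
# are assumptions and may differ in the released version.
import vertica_python

COPY_SQL = """
COPY web_events
SOURCE KafkaSource(stream='web_events|0|0',        -- topic|partition|start offset
                   brokers='kafka01:9092',
                   duration=interval '10 seconds') -- time bound for this microbatch
PARSER KafkaJSONParser()
REJECTED DATA AS TABLE web_events_rejects
DIRECT
"""

with vertica_python.connect(host='vertica01', port=5433, user='dbadmin',
                            password='', database='analytics') as conn:
    cur = conn.cursor()
    cur.execute(COPY_SQL)
    conn.commit()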

Implementation

With this implementation, topics will be dynamically prioritized to better manage topics whose loads vary in duration. After an initial batch, subsequent batches will be re-sorted based on their previous durations. Low-volume microbatches or topics will therefore run at the beginning, leaving the remaining time for the high-volume microbatches.

Kafka Dynamic Topic Prioritization

If a lower-volume microbatch requires more time, it will have to finish within its allotted time and will be given more time in the subsequent iteration. The microbatch duration will be configurable within the COPY statement and across different scheduler instances. New system tables will offer visibility into process monitoring, covering scheduler history, offsets, and events.
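A rough sketch of that reordering logic, purely as an illustration (the scheduler's actual internals were not detailed in the session), might look like this:

# Conceptual sketch of dynamic topic prioritization (illustration only;
# topic names, durations, and the frame budget are assumptions).

def plan_frame(topics, last_duration_s, frame_budget_s=60.0):
    """Order topics for the next frame: shortest previous duration first,
    so low-volume microbatches run early and the remaining budget is left
    for the high-volume ones."""
    ordered = sorted(topics, key=lambda t: last_duration_s.get(t, 0.0))
    remaining = frame_budget_s
    plan = []
    for i, topic in enumerate(ordered):
        if i == len(ordered) - 1:
            allotted = remaining   # the biggest topic gets the rest of the frame
        else:
            allotted = min(last_duration_s.get(topic, 0.0), remaining)
        plan.append((topic, round(allotted, 1)))
        remaining -= allotted
    return plan

print(plan_frame(['clicks', 'impressions', 'errors'],
                 {'clicks': 45.0, 'impressions': 10.0, 'errors': 2.0}))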

Results

In preliminary tests with 6 servers in a typical recommended configuration, a data set of 1.5 million 5 KB JSON messages was loaded at 1.2 TB/hour over a single partition and at 2.2 TB/hour over three partitions. These results included load time, parse time, and metadata overhead while utilizing only 30% CPU and 14 GB of memory.

Beta

To try out this upcoming feature, a beta is available through a request form.

1 Kafka 0.8.1 Documentation

About the author

Norbert Krupa

Norbert is the founder of vertica.tips and a Solutions Engineer at Talend. He is an HP Accredited Solutions Expert for Vertica Big Data Solutions. He has written the Vertica Diagnostic Queries, which aim to cover monitoring, diagnostics, and performance tuning. The views, opinions, and thoughts expressed here do not represent those of the author's employer.

2 Comments

  1. Max – October 13, 2015 at 4:38 AM

    When is the release of the Kafka integration expected? We have long been waiting for additions such as a seamless connection with Apache Spark, but almost three months have already passed since the announcement of Excavator and there has been no other information, official or otherwise. Just one post in this blog, nothing more.

    • Norbert Krupa – October 13, 2015 at 8:21 AM

      Max, I don’t work for HP, but according to a post on vertica-forums.com, it looks to be released at the end of October.

