Kafka PubSub

The Kafka implementation of the Bullet PubSub uses Apache Kafka as the backing PubSub queue. It works with any Backend and with the Web Service.

How does it work?

By default, the implementation expects you to create two topics in a Kafka cluster: one for queries and another for results. The Web Service publishes queries to the queries topic and reads results from the results topic. Similarly, the Backend reads queries from the queries topic and writes results to the results topic. All messages are sent as PubSubMessages.

Two topics are not required, though they make the setup simpler to reason about. You can use a single topic, but then you should use multiple partitions and configure your Web Service and Backend to produce to and consume from the right partitions. See the setup section for more details.

Kafka Client API

The Bullet Kafka implementation uses the Kafka 2.6.0 client APIs. Generally, the usual Kafka forward and backward compatibility between clients and brokers should work as expected.

Setup

Before setting up, you will need a Kafka cluster set up with your topic(s) created. This cluster need only be a couple of machines if it is devoted to Bullet. However, this depends on your query and result volumes. Generally, these are at most a few hundred to a few thousand messages per second, and a small Kafka cluster will suffice.

To set up Kafka, follow the instructions here.

Plug into the Backend

Depending on how your Backend is built, either add Bullet Kafka to your classpath or include it in your build tool. Head over to our releases page to get the artifacts. If you're adding Bullet Kafka to the classpath instead of building a fat jar, you will need the jar with the classifier fat, since it contains Bullet Kafka and all its dependencies.

Configure the Backend to use the Kafka PubSub:

bullet.pubsub.context.name: "QUERY_PROCESSING"
bullet.pubsub.class.name: "com.yahoo.bullet.kafka.KafkaPubSub"
bullet.pubsub.kafka.bootstrap.servers: "server1:port1,server2:port2,..."
bullet.pubsub.kafka.request.topic.name: "your-query-topic"
bullet.pubsub.kafka.response.topic.name: "your-result-topic"

You will then need to configure the Publishers and Subscribers. For details on what to configure and what the defaults are, see the configuration file.

Plug into the Web Service

You will need to head over to our releases page and get the JAR artifact with the fat classifier. For example, you can download the artifact for the 1.3.0 release directly from Maven Central.

You should then plug in this JAR to your Web Service following the instructions here.

For configuration, you should follow the steps here to create and provide a YAML file to the Web Service. Remember to change the context to QUERY_SUBMISSION.

bullet.pubsub.context.name: "QUERY_SUBMISSION"
bullet.pubsub.class.name: "com.yahoo.bullet.kafka.KafkaPubSub"
bullet.pubsub.kafka.bootstrap.servers: "server1:port1,server2:port2,..."
bullet.pubsub.kafka.request.topic.name: "your-query-topic"
bullet.pubsub.kafka.response.topic.name: "your-result-topic"

As with the Backend, you will then need to configure the Publishers and Subscribers. See the configuration file. Remember that the Subscribers in your Backend read what the Publishers in your Web Service write and vice versa, so make sure to match up the topics and settings accordingly if you have any custom changes.

Passthrough Configuration

You can pass additional Kafka Producer or Consumer properties to the PubSub Publishers and Subscribers by prefixing them with either bullet.pubsub.kafka.producer. for Producers or bullet.pubsub.kafka.consumer. for Consumers. The PubSub configuration uses and provides a few defaults for settings it considers important to manage. You can tweak them and add others. For a list of properties that you can configure, see the Producer or Consumer configs in Kafka.
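As an illustration of the prefixing, the fragment below passes a few standard Kafka client properties through to the Publishers and Subscribers. The names after each prefix (acks, compression.type, max.poll.records, fetch.min.bytes) are ordinary Kafka Producer and Consumer configs; the values are placeholders chosen for the example, not recommendations.

```yaml
# Intended for the Kafka Producer as acks and compression.type
bullet.pubsub.kafka.producer.acks: "all"
bullet.pubsub.kafka.producer.compression.type: "lz4"
# Intended for the Kafka Consumer as max.poll.records and fetch.min.bytes
bullet.pubsub.kafka.consumer.max.poll.records: "500"
bullet.pubsub.kafka.consumer.fetch.min.bytes: "1"
```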

Types for the properties

All Kafka properties are better off specified as Strings since Kafka type casts them accordingly. If you provide types, you might run into issues where YAML types do not match what the Kafka client is expecting.
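To make the difference concrete, the sketch below contrasts a quoted value with an unquoted one. The property names are standard Kafka configs used purely for illustration; how an unquoted value ends up typed depends on the YAML parser in use.

```yaml
# Preferred: quoted values stay Strings and Kafka converts them itself
bullet.pubsub.kafka.producer.batch.size: "65536"
bullet.pubsub.kafka.consumer.session.timeout.ms: "30000"

# Riskier: an unquoted value is typed by the YAML parser (a number here)
# before it ever reaches the Kafka client
# bullet.pubsub.kafka.producer.batch.size: 65536
```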

Partitions

You may choose to partition your topics for a couple of reasons:

  1. You may have one topic for both queries and responses and use partitions as a way to separate them.
  2. You may use two topics and partition one or both for scalability when reading and writing.
  3. You may use two topics and partition one or both for sharding across multiple Web Service instances (and multiple instances in your Backend).

You can accomplish all this with partition maps. You can configure which partitions your Publishers (Web Service or Backend) write to using bullet.pubsub.kafka.request.partitions and which partitions your Subscribers read from using bullet.pubsub.kafka.response.partitions. Providing these to an instance of the Web Service or the Backend in the YAML file ensures that the Publishers in that instance only write to these request partitions and the Subscribers only read from the response partitions. The Publishers randomly add one of the response partitions to each message sent, so that the response arrives only at one of the partitions this instance's Subscribers are waiting on. For more details, see the configuration file. You can disable this latter behavior by turning off bullet.pubsub.kafka.partition.routing.enable and letting Kafka decide which partition to send the message to. This is useful if you are primarily using Bullet for asynchronous queries and do not care which Web Service instance (assuming you have many) handles the results.
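As a sketch of the sharding case, the fragment below could be given to one Web Service instance out of two. It assumes that the partition settings accept a YAML list of partition numbers and that both topics have four partitions. Neither assumption comes from this page, so check the configuration file for the exact format.

```yaml
# Web Service instance A (hypothetical layout: 4 partitions per topic)
bullet.pubsub.kafka.request.topic.name: "your-query-topic"
bullet.pubsub.kafka.response.topic.name: "your-result-topic"
# Publishers in this instance write queries only to these partitions
bullet.pubsub.kafka.request.partitions:
  - 0
  - 1
# Subscribers in this instance read results only from these partitions
bullet.pubsub.kafka.response.partitions:
  - 0
  - 1
# Set to false to let Kafka pick the partition instead
bullet.pubsub.kafka.partition.routing.enable: true
```

In this hypothetical layout, a second instance B would list partitions 2 and 3 for both settings.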

Security

If you're using secure Kafka, you will need to do the necessary metadata setup to make sure your principals have access to your topic(s) for reading and writing. If you're using SSL for securing your Kafka cluster, you will need to add the necessary SSL certificates to the keystore for your JVM before launching the Web Service or the Backend.

Storm

We have tested Kafka with Bullet Storm using Kerberos and SSL from the Storm cluster and SSL from the Web Service.

SSL

For SSL, you will need to configure the various SSL credentials for both your Publishers and Subscribers. One option is to provide a keystore and a truststore with their passwords, as required by Kafka. If your SSL keys and certs expire, you will need to update these stores and restart the JVMs using them. Alternatively, if you are using Kafka Client 2.6 or above, we provide an SSLEngineFactory that automatically refreshes the SSL cert and key that take the place of the keystore. Note that the truststore is still provided as a file in the JKS format because it is not expected to change. See below for a sample configuration:

bullet.pubsub.kafka.producer.security.protocol: "SSL"
# This class will automatically refresh certs in the Kafka client producer at a configurable interval
bullet.pubsub.kafka.producer.ssl.engine.factory.class: "com.yahoo.bullet.kafka.CertRefreshingSSLEngineFactory"
bullet.pubsub.kafka.producer.ssl.truststore.location: "my-truststore.jks"
bullet.pubsub.kafka.producer.ssl.truststore.password: "changeit"
bullet.pubsub.kafka.producer.ssl.cert.refreshing.cert.location: "my-cert.pem"
bullet.pubsub.kafka.producer.ssl.cert.refreshing.key.location: "my-key.pem"
bullet.pubsub.kafka.producer.ssl.cert.refreshing.refresh.interval.ms: 3600000

bullet.pubsub.kafka.consumer.security.protocol: "SSL"
# This class will automatically refresh certs in the Kafka client consumer at a configurable interval
bullet.pubsub.kafka.consumer.ssl.engine.factory.class: "com.yahoo.bullet.kafka.CertRefreshingSSLEngineFactory"
bullet.pubsub.kafka.consumer.ssl.truststore.location: "my-truststore.jks"
bullet.pubsub.kafka.consumer.ssl.truststore.password: "changeit"
bullet.pubsub.kafka.consumer.ssl.cert.refreshing.cert.location: "my-cert.pem"
bullet.pubsub.kafka.consumer.ssl.cert.refreshing.key.location: "my-key.pem"
bullet.pubsub.kafka.consumer.ssl.cert.refreshing.refresh.interval.ms: 3600000
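
For the plain keystore and truststore route described above, a sketch for the Publishers might look like the following. The ssl.keystore.* and ssl.truststore.* names are standard Kafka client SSL settings; the file names and passwords are placeholders. The Subscribers would take the same settings under the bullet.pubsub.kafka.consumer. prefix.

```yaml
bullet.pubsub.kafka.producer.security.protocol: "SSL"
bullet.pubsub.kafka.producer.ssl.keystore.location: "my-keystore.jks"
bullet.pubsub.kafka.producer.ssl.keystore.password: "changeit"
bullet.pubsub.kafka.producer.ssl.key.password: "changeit"
bullet.pubsub.kafka.producer.ssl.truststore.location: "my-truststore.jks"
bullet.pubsub.kafka.producer.ssl.truststore.password: "changeit"
```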

Kerberos

For Kerberos, you may need to add a JAAS config file to the Storm BlobStore and make it available to your worker JVMs. To do this, you will need a JAAS configuration entry. For example, if your Kerberos KDC is shared with your Storm cluster's KDC, you might add a jaas_file.conf with:

KafkaClient {
   org.apache.storm.security.auth.kerberos.AutoTGTKrb5LoginModule required
   serviceName="kafka";
};

Put this file into Storm's BlobStore using:

storm blobstore create --file jaas_file.conf --acl o::rwa,u:$USER:rwa --repl-fctr 3 jaas_file.conf

Then, when launching your topology, pass the following arguments to the storm jar command:

-c topology.blobstore.map='{"jaas_file.conf": {} }' \
-c topology.worker.childopts="-Djava.security.auth.login.config=./jaas_file.conf" \

This adds the JAAS file to all your worker JVMs. You can refresh Kerberos credentials periodically and push credentials to Storm as mentioned here.
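
The JAAS entry only covers the login side. The Kafka clients themselves typically also need a SASL security protocol, which could be passed through as sketched below. This assumes your brokers expose a SASL_SSL listener with the GSSAPI mechanism, which this page does not state; SASL_PLAINTEXT is the standard Kafka alternative if your cluster does not layer Kerberos over SSL.

```yaml
# Assumption: Kerberos (GSSAPI) over SSL on the broker side
bullet.pubsub.kafka.producer.security.protocol: "SASL_SSL"
bullet.pubsub.kafka.producer.sasl.mechanism: "GSSAPI"
bullet.pubsub.kafka.consumer.security.protocol: "SASL_SSL"
bullet.pubsub.kafka.consumer.sasl.mechanism: "GSSAPI"
```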