Performance of Bullet on Spark

This section describes how we tune the performance of Bullet on Spark. This is not meant to be a rigorous benchmark.

Prerequisites

You should be familiar with Spark Streaming, Kafka and the Bullet on Spark architecture.

How was this tested?

All tests here were run using Bullet-Spark 0.1.2.

Tools used

Cluster

Data

Configuration

Here are the configurations we used to launch instances of Bullet Spark. The first configuration below was used for the tests on the smaller data; the second was used for the test on the larger data.

Settings:

bullet.spark.batch.duration.ms: 2000
bullet.spark.receiver.query.block.size: 1
bullet.result.metadata.enable: true
bullet.spark.metrics.enabled: true
bullet.spark.filter.parallel.enabled: true
bullet.spark.filter.parallelism: 16
bullet.spark.filter.parallel.query.min.size: 10
bullet.spark.query.union.checkpoint.duration.multiplier: 20
bullet.spark.join.checkpoint.duration.multiplier: 20
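
As a rough reading of these settings (our interpretation, so treat the derived numbers as a sketch rather than guarantees): the checkpoint multipliers scale the batch duration, and the parallel filter path only engages once enough queries are running at once.

# Derived values for this configuration (sketch; assumes the multipliers apply to the batch duration)
#   union/join checkpoint interval = 2000 ms batch x 20 = 40 s
#   16-way parallel filtering kicks in only once at least 10 queries are running concurrently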

Command line:

./spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --queue default \
  --executor-memory 12g \
  --executor-cores 2 \
  --num-executors 100 \
  --driver-cores 2 \
  --driver-memory 12g \
  --conf spark.streaming.backpressure.enabled=true \
  --conf spark.driver.extraJavaOptions="-XX:+UseG1GC" \
  --conf spark.executor.extraJavaOptions="-XX:+UseG1GC" \
  --conf spark.shuffle.consolidateFiles=true \
  --conf spark.dynamicAllocation.enabled=false \
  --conf spark.storage.memoryFraction=0.1 \
  --conf spark.shuffle.memoryFraction=0.8 \
  --conf spark.default.parallelism=20 \
  ...

Settings:

bullet.spark.batch.duration.ms: 5000
bullet.spark.receiver.query.block.size: 1
bullet.result.metadata.enable: true
bullet.spark.metrics.enabled: true
bullet.spark.filter.parallel.enabled: true
bullet.spark.filter.parallelism: 64
bullet.spark.filter.parallel.query.min.size: 10
bullet.spark.query.union.checkpoint.duration.multiplier: 20
bullet.spark.join.checkpoint.duration.multiplier: 20

Command line:

./spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --queue default \
  --executor-memory 12g \
  --executor-cores 2 \
  --num-executors 400 \
  --driver-cores 2 \
  --driver-memory 12g \
  --conf spark.streaming.backpressure.enabled=true \
  --conf spark.driver.extraJavaOptions="-XX:+UseG1GC" \
  --conf spark.executor.extraJavaOptions="-XX:+UseG1GC" \
  --conf spark.shuffle.consolidateFiles=true \
  --conf spark.dynamicAllocation.enabled=false \
  --conf spark.storage.memoryFraction=0.1 \
  --conf spark.shuffle.memoryFraction=0.8 \
  --conf spark.default.parallelism=50 \
  ...

Test 1: Latency of Bullet Spark

This test was done on the smaller data. We used a RAW query without any filtering to measure the latency added by Bullet Spark. This is not the end-to-end latency for a query; it is the latency from receiving the query to finishing the query, not including the time spent in Kafka. We ran this query 100 times.
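
A minimal sketch of the shape of such a query, assuming the JSON query format accepted by Bullet-Spark 0.1.2 (the size and duration values below are placeholders, not the exact values used in the test):

# An unfiltered RAW query: return up to "size" records within "duration" milliseconds.
# Placeholder values; the query is submitted through the Web Service and PubSub as usual.
RAW_QUERY='{"aggregation": {"type": "RAW", "size": 1}, "duration": 20000}'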

Result

This graph shows the latency of each attempt:

Bullet Spark Latency

Conclusion

The average latency was 1173 ms. This is effectively the floor for Bullet Spark in this setup: it cannot return data for meaningful queries any faster than this.

Test 2: Scalability for smaller data

This test was done on the smaller data. We wanted to measure how many queries can run simultaneously on Bullet Spark. We ran 400, 800, 1500 and 1100 queries, each for 10 minutes.
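
To give a sense of how such a load can be generated (a sketch only; submit-query.sh below is a hypothetical helper that posts one Bullet JSON query to the Web Service, not a script from this test):

# Fire NUM_QUERIES long-running RAW queries at once and let them run for 10 minutes (600000 ms).
NUM_QUERIES=400
for i in $(seq 1 "$NUM_QUERIES"); do
  ./submit-query.sh '{"aggregation": {"type": "RAW", "size": 500}, "duration": 600000}' &
done
wait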

Result

Figure 1. Spark Streaming UI

Figure 2. Queries running

Figure 3. CPU time

Figure 4. Heap usage

Figure 5. Garbage collection time

Figure 6. Garbage collection count

Figure 1 shows the Spark Streaming UI when running the test.

Figure 2 shows the simultaneous queries we ran.

Figure 3 shows the milliseconds of CPU time used per minute. For example, a value of 300K ms for a line (worker) means that the worker used 300K ms of CPU per minute, i.e. 300 s/60 s, or 5 (virtual) CPU cores during that minute.

Figure 4 shows raw heap utilization in bytes.

Figure 5 shows the time in milliseconds spent for garbage collection per minute.

Figure 6 shows the count of garbage collection events per minute.

Conclusion

The average processing time for each batch was 1 second 143 ms, which is below the 2-second batch duration. On average, 1 CPU core and 3 GB of memory were used in this experiment. CPU and memory usage climb slowly as the number of queries increases, but remain well within the resource limits. We can easily run up to 1500 RAW queries simultaneously in this test.

Test 3: Scalability for larger data

This test was done on the larger data. We ran 100, 400, 800 and 600 queries each for 10 minutes.

Result

Figure 7. Spark Streaming UI

Figure 8. Queries running

Figure 9. CPU time

Figure 10. Heap usage

Figure 11. Garbage collection time

Figure 12. Garbage collection count

Conclusion

The average processing time for each batch was 3 seconds 97 ms, which is below the 5-second batch duration. On average, 1.2 CPU cores and 5 GB of memory were used in this experiment. However, as the number of queries grows, memory usage on some executors climbs to 8-10 GB, which is close to our resource limits, and running more queries risks OutOfMemory errors. In this experiment, we can therefore only afford up to 800 simultaneous queries.