Bullet DSL
Bullet DSL is a configuration-based DSL that allows users to plug their data into the Bullet Backend. Instead of having users write their own code to set up the Backend on their data, users can now accomplish the same thing by simply providing the appropriate configuration to Bullet.
To support this, Bullet DSL provides two major components. The first is for reading data from a pluggable data source (the connectors for talking to various data sources), and the second is for converting data (the converters for understanding your data formats) into BulletRecords. By enabling Bullet DSL in the Backend and configuring Bullet DSL, your backend will use the two components to read from the configured data source and convert the data into BulletRecords, without you having to write any code.
There is also an optional minor component that acts as the glue between the connectors and the converters: the deserializers. They are needed when the data coming out of a connector is in a format that a converter cannot understand. Typically, this happens with serialized data that must be deserialized before a converter can read it.
The four interfaces that the DSL uses are:
- The BulletConnector : Bullet DSL's reading component
- The BulletDeserializer : Bullet DSL's optional deserializing component
- The BulletRecordConverter : Bullet DSL's converting component
- The Bullet Backend : The implementation of Bullet on a Stream Processor
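As a rough illustration of how these pieces fit together, the sketch below combines settings described later on this page into one configuration that reads Avro-serialized byte arrays from Kafka, deserializes them, and converts them into BulletRecords. The topic name, group id, and file names are placeholders, and using Kafka's ByteArrayDeserializer for the message values is an assumption about how the data was produced.

```yaml
# Connector: read raw bytes from Kafka
bullet.dsl.connector.class.name: "com.yahoo.bullet.dsl.connector.KafkaConnector"
bullet.dsl.connector.kafka.topics:
  - "my-topic"                                            # placeholder topic name
bullet.dsl.connector.kafka.bootstrap.servers: "localhost:9092"
bullet.dsl.connector.kafka.group.id: "my-bullet-group"    # placeholder group id
bullet.dsl.connector.kafka.key.deserializer: "org.apache.kafka.common.serialization.StringDeserializer"
# Assumption: values arrive as Avro-serialized byte arrays
bullet.dsl.connector.kafka.value.deserializer: "org.apache.kafka.common.serialization.ByteArrayDeserializer"

# Deserializer (optional glue): byte arrays -> Avro GenericRecords
bullet.dsl.deserializer.class.name: "com.yahoo.bullet.dsl.deserializer.AvroDeserializer"
bullet.dsl.deserializer.avro.schema.file: "file://example.avsc"

# Converter: Avro records -> BulletRecords
bullet.dsl.converter.class.name: "com.yahoo.bullet.dsl.converter.AvroBulletRecordConverter"
bullet.dsl.converter.schema.file: "your-schema-file.json"
```

Each group of settings is covered in its own section of this page.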
BulletConnector
BulletConnector is an abstract Java class that can be implemented to read data from different pluggable data sources. As with all our components, we provide and maintain implementations while providing an interface to add new ones. Currently, we support two BulletConnector implementations:
- KafkaConnector for connecting to Apache Kafka
- PulsarConnector for connecting to Apache Pulsar
When using Bullet DSL, you will need to specify the particular BulletConnector to use. For example, if you wanted to use the KafkaConnector, you would add the following to your configuration file:
# The fully qualified class name of the BulletConnector to use (required for Bullet DSL)
bullet.dsl.connector.class.name: "com.yahoo.bullet.dsl.connector.KafkaConnector"
# The read timeout duration in ms (defaults to 0)
bullet.dsl.connector.read.timeout.ms: 0
# Whether or not to asynchronously commit messages (defaults to true)
bullet.dsl.connector.async.commit.enable: true
All BulletConnector configuration can be found in the default configuration file along with specific default configuration for both implementations.
Note
If you have an unsupported data source and you want to use Bullet DSL, you will have to implement your own BulletConnector. If you do, please do consider contributing it back! Check out the BulletConnector interface here.
KafkaConnector
The KafkaConnector configuration requires a few settings that are necessary to read from Kafka: the bootstrap servers, the group id, the key/value deserializers, and the topics to subscribe to.
# The list of Kafka topics to subscribe to (required)
bullet.dsl.connector.kafka.topics:
- ""
# Whether or not the KafkaConsumer should seek to the end of its subscribed topics at initialization (defaults to false)
bullet.dsl.connector.kafka.start.at.end.enable: false
# Required consumer properties
bullet.dsl.connector.kafka.bootstrap.servers: "localhost:9092"
bullet.dsl.connector.kafka.group.id:
bullet.dsl.connector.kafka.key.deserializer: "org.apache.kafka.common.serialization.StringDeserializer"
bullet.dsl.connector.kafka.value.deserializer:
You can also pass additional Kafka properties to the KafkaConnector by prefixing them with bullet.dsl.connector.kafka.
For a complete list of properties, see the Kafka Consumer configs.
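For instance, standard Kafka consumer settings such as auto.offset.reset and max.poll.records could be passed through as sketched here. The values are only illustrative. The sketch assumes the connector hands everything after the prefix to the Kafka consumer unchanged.

```yaml
# Standard Kafka consumer properties, passed through with the connector prefix
bullet.dsl.connector.kafka.auto.offset.reset: "earliest"   # illustrative value
bullet.dsl.connector.kafka.max.poll.records: 500           # illustrative value
```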
PulsarConnector
The PulsarConnector configuration requires a few settings that are necessary to read from Pulsar. It also provides additional options to enable authentication and/or TLS.
# The list of Pulsar topics to subscribe to (required)
bullet.dsl.connector.pulsar.topics:
- ""
# The Pulsar Schema to use (required)
bullet.dsl.connector.pulsar.schema.type: "BYTES"
# The class name of the POJO to wrap (for JSON, AVRO, or PROTOBUF) or of the schema class (for CUSTOM); not needed for BYTES or STRING
bullet.dsl.connector.pulsar.schema.class.name:
# Required client property
bullet.dsl.connector.pulsar.client.serviceUrl: "pulsar://localhost:6650"
# Authentication properties (disabled by default)
bullet.dsl.connector.pulsar.auth.enable: false
bullet.dsl.connector.pulsar.auth.plugin.class.name:
bullet.dsl.connector.pulsar.auth.plugin.params.string:
# Required consumer properties
bullet.dsl.connector.pulsar.consumer.subscriptionName: ""
bullet.dsl.connector.pulsar.consumer.subscriptionType: "Shared"
Most importantly, the connector requires a Pulsar schema whose type is one of BYTES, STRING, JSON, AVRO, PROTOBUF, or CUSTOM (defaults to BYTES). For any type except CUSTOM, the connector loads the schema natively supported by Pulsar. For JSON, AVRO, and PROTOBUF, the POJO class to wrap must be specified. For a CUSTOM schema, the schema class must be specified instead.
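As a small sketch of the two cases that need a class name, the fragment below selects a JSON schema that wraps a POJO and shows the CUSTOM alternative in comments. Both class names are placeholders and do not refer to real classes.

```yaml
# JSON (likewise AVRO or PROTOBUF): name the POJO class that the schema wraps
bullet.dsl.connector.pulsar.schema.type: "JSON"
bullet.dsl.connector.pulsar.schema.class.name: "com.your.package.YourPOJO"     # placeholder

# CUSTOM: name the schema class itself instead
# bullet.dsl.connector.pulsar.schema.type: "CUSTOM"
# bullet.dsl.connector.pulsar.schema.class.name: "com.your.package.YourSchema" # placeholder
```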
You can also pass additional Pulsar Client and Consumer properties to the PulsarConnector by prefixing them with bullet.dsl.connector.pulsar.client and bullet.dsl.connector.pulsar.consumer, respectively.
For both lists of properties, see Pulsar ClientConfigurationData and ConsumerConfigurationData.
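For example, one client property and one consumer property might be passed through like this. operationTimeoutMs and receiverQueueSize are fields of Pulsar's ClientConfigurationData and ConsumerConfigurationData. The values are illustrative, and the sketch assumes the connector forwards the suffix after each prefix as-is.

```yaml
# Client property (a field of ClientConfigurationData)
bullet.dsl.connector.pulsar.client.operationTimeoutMs: 30000   # illustrative value
# Consumer property (a field of ConsumerConfigurationData)
bullet.dsl.connector.pulsar.consumer.receiverQueueSize: 1000   # illustrative value
```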
BulletRecordConverter
BulletRecordConverter is an abstract Java class that can be implemented to convert different types of data formats into BulletRecords.
Currently, we support four BulletRecordConverter implementations:
- POJOBulletRecordConverter for POJOs
- MapBulletRecordConverter for Java Maps of Objects
- JSONBulletRecordConverter for JSON Strings
- AvroBulletRecordConverter for Apache Avro
When using Bullet DSL, you will need to specify the appropriate BulletRecordConverter to use. The converters also accept an optional schema (see the Schema section for more details and the benefits of using one). For example, to use AvroBulletRecordConverter, you would add the following to your configuration file:
# The fully qualified class name of the BulletRecordConverter to use
bullet.dsl.converter.class.name: "com.yahoo.bullet.dsl.converter.AvroBulletRecordConverter"
# The path to the schema file to use
bullet.dsl.converter.schema.file: "your-schema-file.json"
Supported Converters
These are the converters that we currently maintain. If you need a converter that is not yet available, you can write your own (and hopefully contribute it back!). Check out the BulletRecordConverter interface here.
POJOBulletRecordConverter
The POJOBulletRecordConverter uses Java Reflection to convert POJOs into BulletRecords. In the configuration, you need to specify the POJO class you want to convert, and when the converter is created, it will inspect the POJO with Reflection. Without a schema, the converter will look through all fields and accept only the fields that have valid types. With a schema, the converter will only look for the fields referenced, but it will also accept getter methods. It is recommended to specify getters as references where possible.
bullet.dsl.converter.pojo.class.name: "com.your.package.YourPOJO"
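Put together with a schema, a POJO converter configuration might look like the sketch below. The converter's package is assumed to match that of the AvroBulletRecordConverter shown earlier, and the POJO class and schema file names are placeholders.

```yaml
# Assumption: same package as the other converters
bullet.dsl.converter.class.name: "com.yahoo.bullet.dsl.converter.POJOBulletRecordConverter"
# The POJO class to inspect with Reflection (placeholder)
bullet.dsl.converter.pojo.class.name: "com.your.package.YourPOJO"
# Optional schema; its references may name fields or getter methods
bullet.dsl.converter.schema.file: "your-schema-file.json"
```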
MapBulletRecordConverter
The MapBulletRecordConverter is used to convert Java Maps of Objects into BulletRecords. Without a schema, it simply inserts every entry in the Map into a BulletRecord without any type-checking. If the Map contains objects that are not types supported by the BulletRecord, you might have issues when serializing the record.
JSONBulletRecordConverter
The JSONBulletRecordConverter is used to convert String JSON representations of records into BulletRecords. Without a schema, it simply inserts every entry in the JSON object into a BulletRecord without any type-checking, and it uses the Double type for all numeric values (since it cannot guess whether records might need a wider type). If you want more specific numeric types for the fields in your record, use a schema and specify the appropriate types. If the JSON contains objects that are not types supported by the BulletRecord, you might have issues when serializing the record.
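A configuration along these lines could pair the JSON converter with a schema so that numeric fields get the types you declare. The converter's package is an assumption based on the other converters on this page, and the schema file name is a placeholder.

```yaml
# Assumption: same package as the other converters
bullet.dsl.converter.class.name: "com.yahoo.bullet.dsl.converter.JSONBulletRecordConverter"
# Schema declaring specific types (e.g. INTEGER or LONG) instead of the default Double
bullet.dsl.converter.schema.file: "your-schema-file.json"
```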
AvroBulletRecordConverter
The AvroBulletRecordConverter is used to convert Avro records into BulletRecords. Without a schema, it inserts every field into a BulletRecord without any type-checking. With a schema, you get type-checking, and you can also specify a RECORD field, and the converter will accept Avro Records in addition to Maps, flattening them into the BulletRecord. This converter also handles container types (such as Maps and Lists) that contain heterogeneous nested types, as well as those that have more nesting levels than the types we support. It maps them to the appropriate Bullet UNKNOWN_MAP, UNKNOWN_MAP_MAP, etc. types so that queries can still be written that pull out these fields, and if they are types that Bullet understands, the query can still execute.
Schema
The schema consists of a list of fields, each described by a name, reference, and type.
- name: The name of the field in the BulletRecord
- reference: The field to extract from the to-be-converted object
- type: The type of the field
When using the schema:
- The name of the field in the schema will be the name of the field in the BulletRecord.
- The reference of the field in the schema is the field/value to be extracted from an object when it is converted to a BulletRecord.
- If the reference is null, it is assumed that the name and the reference are the same.
- The type must be specified and can be used for type-checking. If you provide a schema and set the bullet.dsl.converter.schema.type.check.enable setting, then the converter will validate that the types in the source data match the given type here. Otherwise, the type provided will be assumed. This is useful when initially using the DSL and you are not sure of the types.
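As a brief sketch, type-checking against a schema could be switched on with the two settings named on this page. The schema file name is a placeholder.

```yaml
# Schema whose declared types the converter should validate against
bullet.dsl.converter.schema.file: "your-schema-file.json"
# Validate that the types in the source data match the schema
bullet.dsl.converter.schema.type.check.enable: true
```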
Types
- BOOLEAN
- INTEGER
- LONG
- FLOAT
- DOUBLE
- STRING
- BOOLEAN_MAP
- INTEGER_MAP
- LONG_MAP
- FLOAT_MAP
- DOUBLE_MAP
- STRING_MAP
- BOOLEAN_MAP_MAP
- INTEGER_MAP_MAP
- LONG_MAP_MAP
- FLOAT_MAP_MAP
- DOUBLE_MAP_MAP
- STRING_MAP_MAP
- BOOLEAN_LIST
- INTEGER_LIST
- LONG_LIST
- FLOAT_LIST
- DOUBLE_LIST
- STRING_LIST
- BOOLEAN_MAP_LIST
- INTEGER_MAP_LIST
- LONG_MAP_LIST
- FLOAT_MAP_LIST
- DOUBLE_MAP_LIST
- STRING_MAP_LIST
Special Type for a RECORD
There is a special case where if you omit the type and the name for an entry in the schema, the reference is assumed to be a map containing arbitrary fields with types in the list above. You can use this if you have a map field that contains various objects with one or more types in the list above and want to flatten that map out into the target record using the respective types of each field in the map. The names of the fields in the map will be used as the top-level names in the resulting record.
Example Schema
{
  "fields": [
    {
      "name": "myBool",
      "type": "BOOLEAN"
    },
    {
      "name": "myBoolMap",
      "type": "BOOLEAN_MAP"
    },
    {
      "name": "myLongMapMap",
      "type": "LONG_MAP_MAP"
    },
    {
      "name": "myIntFromSomeMap",
      "reference": "someMap.myInt",
      "type": "INTEGER"
    },
    {
      "name": "myIntFromSomeIntList",
      "reference": "someIntList.0",
      "type": "INTEGER"
    },
    {
      "name": "myIntFromSomeNestedMapsAndLists",
      "reference": "someMap.nestedMap.nestedList.0",
      "type": "INTEGER"
    },
    {
      "reference": "someMap"
    }
  ]
}
BulletDeserializer
BulletDeserializer is an abstract Java class that can be implemented to deserialize/transform the output of a BulletConnector into input for a BulletRecordConverter. It is an optional component, and whether it is necessary depends on the output of your data source. If one is not needed, the IdentityDeserializer can be used. For example, if your KafkaConnector outputs byte arrays that are actually Java-serialized Maps, and you're using a MapBulletRecordConverter, you would use the JavaDeserializer, which would deserialize the byte arrays into Java Maps for the converter.
Currently, we support two BulletDeserializer implementations:
- JavaDeserializer
- AvroDeserializer
For Bullet DSL, if you wanted to use AvroDeserializer, you would add the following to your configuration file:
# The fully qualified class name of the BulletDeserializer to use
bullet.dsl.deserializer.class.name: "com.yahoo.bullet.dsl.deserializer.AvroDeserializer"
JavaDeserializer
The JavaDeserializer uses Java Serialization to deserialize (Java-serialized) byte arrays into objects.
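For the Kafka scenario described earlier (byte arrays that are Java-serialized Maps), the relevant settings might look like the sketch below. The packages of JavaDeserializer and MapBulletRecordConverter are assumed to follow the other class names on this page. Using Kafka's ByteArrayDeserializer for the values is likewise an assumption about the data.

```yaml
# Kafka hands the raw byte arrays through untouched
bullet.dsl.connector.kafka.value.deserializer: "org.apache.kafka.common.serialization.ByteArrayDeserializer"
# Byte arrays -> Java Maps (package is an assumption)
bullet.dsl.deserializer.class.name: "com.yahoo.bullet.dsl.deserializer.JavaDeserializer"
# Java Maps -> BulletRecords (package is an assumption)
bullet.dsl.converter.class.name: "com.yahoo.bullet.dsl.converter.MapBulletRecordConverter"
```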
AvroDeserializer
The AvroDeserializer uses Avro to deserialize (Avro-serialized) byte arrays into Avro GenericRecords.
The deserializer must be given the Avro schema for the Avro records you want to deserialize. In the configuration, you can either provide the Avro schema file (note the file:// prefix) or the Avro class itself (the class must be in the classpath).
# The path to the Avro schema file to use prefixed by "file://"
bullet.dsl.deserializer.avro.schema.file: "file://example.avsc"
# The class name of the Avro record class to deserialize
bullet.dsl.deserializer.avro.class.name: "com.your.package.YourAvro"