public class Querier extends Object implements Monoidal

This manages a RunningQuery that is currently being executed. It can consume(BulletRecord) records for the query and combine(byte[]) serialized data from another instance of the running query. It can also merge itself with another instance of the running query using Monoidal.merge(Monoidal). Use finish() to retrieve the final results and terminate the query.
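To make the consume/combine/finish lifecycle concrete, here is a minimal sketch built on a hypothetical `CountingQuerier` that only counts records. It is not the Bullet API. The class name, the 8-byte long encoding of the partial result, and the simplified method signatures are all assumptions that only mirror the method names above.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in for Querier that only counts records. The class name,
// the long-based encoding and the simplified signatures are assumptions.
class CountingQuerier {
    private long count = 0;
    private boolean finished = false;

    // Like consume(BulletRecord): incorporate one record unless finished.
    void consume(Object record) {
        if (!finished) {
            count++;
        }
    }

    // Like getData(): serialize the partial result as an 8-byte big-endian long.
    byte[] getData() {
        return ByteBuffer.allocate(Long.BYTES).putLong(count).array();
    }

    // Like combine(byte[]): fold in serialized data from another instance.
    void combine(byte[] data) {
        count += ByteBuffer.wrap(data).getLong();
    }

    // Like finish(): terminate the query and return the final result.
    long finish() {
        finished = true;
        return count;
    }
}
```

In this sketch, each Filter-stage partition would call getData() on its own instance. The Join stage would then combine() every payload into a fresh instance before calling finish().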
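Ideally, to implement Bullet, you would parallelize into two stages: a Filter stage, where your data is partitioned across workers and each worker runs its partition through all the queries, and a Join stage, where the data from the Filter stage for each query is collected and combined.

Filter stage: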
For each new query, check whether you already have Querier objects for that query (identified by the ID). If not, create an instance of Querier for the query in Querier.Mode.PARTITION mode if and only if you are going to persist the querier for the duration of the query. If you are throwing away the querier, for example after processing your partitioned data in mini-batches and recreating it for every new mini-batch, then you need not change the mode. If there are any exceptions or errors while initializing it, throw away the querier, since the errors are handled in the Join stage below.
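The following sketch shows one way to keep one querier per query ID and to discard it when initialization fails. Several pieces are hypothetical: `QuerierMode` mirrors Querier.Mode, `PartitionQuerier` simulates an initialization error by rejecting an empty query body, and the sketch assumes ALL is the mode you leave in place when the querier is rebuilt every mini-batch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical mirror of Querier.Mode.
enum QuerierMode { PARTITION, ALL }

// Hypothetical querier; its constructor rejects an empty query body to
// simulate an initialization error.
class PartitionQuerier {
    final String id;
    final QuerierMode mode;

    PartitionQuerier(QuerierMode mode, String id, String queryBody) {
        if (queryBody == null || queryBody.isEmpty()) {
            throw new IllegalArgumentException("Invalid query " + id);
        }
        this.mode = mode;
        this.id = id;
    }
}

class FilterStageQueries {
    // Assumption: ALL stands in for the unchanged (default) mode used when the
    // querier is recreated every mini-batch.
    static final QuerierMode DEFAULT_MODE = QuerierMode.ALL;

    final Map<String, PartitionQuerier> queriers = new HashMap<>();

    void onQuery(String id, String queryBody, boolean persistForWholeQuery) {
        if (queriers.containsKey(id)) {
            return; // a querier for this ID already exists
        }
        QuerierMode mode = persistForWholeQuery ? QuerierMode.PARTITION : DEFAULT_MODE;
        try {
            queriers.put(id, new PartitionQuerier(mode, id, queryBody));
        } catch (IllegalArgumentException e) {
            // Throw the querier away; the Join stage reports the error.
        }
    }
}
```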
For each BulletRecord, call consume(BulletRecord) on all the Querier objects unless isDone().
If isDone(), call getData() to emit the final data and also remove the Querier.
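Here is a minimal sketch of the per-record loop. It consumes the record on every querier, emits getData() once a querier is done, and removes that querier safely during iteration with Iterator.remove(). `LimitQuerier` is hypothetical: it is done after a fixed number of records, whereas a real query expires by time. The `emitted` list stands in for the channel to the Join stage.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical querier that is done after a fixed number of records.
class LimitQuerier {
    private final int limit;
    private final List<String> records = new ArrayList<>();

    LimitQuerier(int limit) { this.limit = limit; }

    boolean isDone() { return records.size() >= limit; }

    void consume(String record) {
        if (!isDone()) {
            records.add(record);
        }
    }

    List<String> getData() { return new ArrayList<>(records); }
}

class FilterWorker {
    final Map<String, LimitQuerier> queriers = new LinkedHashMap<>();
    final List<String> emitted = new ArrayList<>(); // stands in for the Join-stage channel

    void onRecord(String record) {
        Iterator<Map.Entry<String, LimitQuerier>> it = queriers.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, LimitQuerier> entry = it.next();
            LimitQuerier q = entry.getValue();
            q.consume(record);
            if (q.isDone()) {
                emitted.add(entry.getKey() + "=" + q.getData());
                it.remove(); // finished queries stop consuming records
            }
        }
    }
}
```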
If isClosed(), use getData() to emit the intermediate data to the Join stage for the query ID. Then, call reset().
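The emit-then-reset step can be sketched with a hypothetical `WindowedQuerier` that uses a tumbling window of a fixed number of records. This is an assumption for illustration; Bullet windows can also be time-based. The `toJoin` list stands in for the channel to the Join stage.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical querier with a tumbling window of `size` records.
class WindowedQuerier {
    private final int size;
    private final List<String> window = new ArrayList<>();

    WindowedQuerier(int size) { this.size = size; }

    void consume(String record) { window.add(record); }

    // The window is closed once it holds `size` records.
    boolean isClosed() { return window.size() >= size; }

    List<String> getData() { return new ArrayList<>(window); }

    // Start a fresh window; data already emitted is discarded.
    void reset() { window.clear(); }
}

class WindowEmitter {
    final List<List<String>> toJoin = new ArrayList<>(); // stands in for the Join stage

    void onRecord(WindowedQuerier q, String record) {
        q.consume(record);
        if (q.isClosed()) {
            toJoin.add(q.getData()); // emit the intermediate data
            q.reset();                // then reset for the next window
        }
    }
}
```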
If isClosed(), you should check whether isExceedingRateLimit() is true after calling getData(). If it is, you should cancel the query and emit a RateLimitError to the Join stage to kill the query. You can use getRateLimitError() to get the RateLimitError to pass to the Join stage.
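Below is a minimal sketch of the rate-limit check after an emit. `RateLimitedQuerier` is hypothetical and allows at most `maxEmits` getData() calls per fixed interval. Time is passed in explicitly so the behavior is deterministic. Its getRateLimitError() returns a plain string, whereas Bullet returns a RateLimitError object.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical rate-limited querier: at most maxEmits getData() calls per
// fixed interval of intervalMs milliseconds.
class RateLimitedQuerier {
    private final int maxEmits;
    private final long intervalMs;
    private long intervalStart;
    private int emits = 0;

    RateLimitedQuerier(int maxEmits, long intervalMs, long nowMs) {
        this.maxEmits = maxEmits;
        this.intervalMs = intervalMs;
        this.intervalStart = nowMs;
    }

    String getData(long nowMs) {
        if (nowMs - intervalStart >= intervalMs) { // a new interval starts counting afresh
            intervalStart = nowMs;
            emits = 0;
        }
        emits++;
        return "data";
    }

    boolean isExceedingRateLimit() { return emits > maxEmits; }

    // Stand-in for getRateLimitError().
    String getRateLimitError() {
        return "RateLimitError: " + emits + " emits in " + intervalMs + " ms";
    }
}

class RateLimitCheck {
    final List<String> toJoin = new ArrayList<>(); // stands in for the Join stage
    boolean cancelled = false;

    void emitWindow(RateLimitedQuerier q, long nowMs) {
        toJoin.add(q.getData(nowMs));
        if (q.isExceedingRateLimit()) {
            toJoin.add(q.getRateLimitError()); // lets the Join stage kill the query
            cancelled = true;                   // and stop running it here
        }
    }
}
```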
You should check isDone() and isClosed() and do the emits either on a timer or at fixed intervals, so that your queries are checked for results and maintain their windowing guarantees.
You can also use hasNewData() to check whether there is any new data to emit, if you need to know whether a successful consumption or combination happened.
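As an illustration of driving emits yourself, this sketch processes mini-batches and uses a hasNewData()-style flag to skip empty emits. `FilteringQuerier` and `MicroBatchDriver` are hypothetical. The prefix match stands in for real query filtering, and the result is just a count of matching records.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical querier: only records starting with `prefix` pass its filter.
class FilteringQuerier {
    private final String prefix;
    private int matched = 0;
    private boolean newData = false;

    FilteringQuerier(String prefix) { this.prefix = prefix; }

    void consume(String record) {
        if (record.startsWith(prefix)) { // non-matching records are ignored
            matched++;
            newData = true;
        }
    }

    // Like hasNewData(): true if something was consumed since the last reset().
    boolean hasNewData() { return newData; }

    int getData() { return matched; }

    void reset() {
        matched = 0;
        newData = false;
    }
}

class MicroBatchDriver {
    final List<Integer> emitted = new ArrayList<>();

    void runBatch(FilteringQuerier q, List<String> batch) {
        for (String record : batch) {
            q.consume(record);
        }
        if (q.hasNewData()) { // skip the emit when nothing in the batch matched
            emitted.add(q.getData());
            q.reset();
        }
    }
}
```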
If you do not want to call getData(), you can serialize the Querier using non-native serialization frameworks and use Monoidal.merge(Monoidal) in the Join stage to merge the instances into an empty Querier for the query. This is equivalent to calling combine(byte[]) with the output of getData().
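The equivalence between merging objects and combining serialized data can be checked on a toy monoid. `SumQuerier` is hypothetical: a running sum, serialized as an 8-byte long, with a `merge` that mirrors Monoidal.merge. The sketch skips the actual serialization framework and passes the object directly.

```java
import java.nio.ByteBuffer;

// Hypothetical monoid: a running sum with serialized and object-level merging.
class SumQuerier {
    long sum;

    void consume(long value) { sum += value; }

    // Like getData(): serialize the partial sum.
    byte[] getData() { return ByteBuffer.allocate(Long.BYTES).putLong(sum).array(); }

    // Like combine(byte[]): fold in a serialized partial sum.
    void combine(byte[] data) { sum += ByteBuffer.wrap(data).getLong(); }

    // Like Monoidal.merge: fold in another instance directly, with no byte[] step.
    void merge(SumQuerier other) { sum += other.sum; }
}
```

Merging a partition's object into an empty instance and combining its getData() bytes into another empty instance should give the same state.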
Filter stage pseudocode:

For each query:
    (String id, String queryBody, Metadata metadata) = Query
    if (metadata.hasSignal(Signal.KILL) || metadata.hasSignal(Signal.COMPLETE))
        remove Querier for id
    else
        // Use Querier.Mode.PARTITION if the querier persists for the whole query (see above)
        create new Querier(id, queryBody, config) and initialize it

For each record:
    for Querier q : queriers:
        if (q.isDone())
            emit(q.getData())
            remove q
            continue
        if (q.isClosed())
            emit(q.getData())
            q.reset()
        q.consume(record)
        if (q.isExceedingRateLimit())
            emit(q.getRateLimitError())
            remove q

Periodically:
    for Querier q : queriers:
        if (q.isDone())
            emit(q.getData())
            remove q
            continue
        if (q.isClosed())
            emit(q.getData())
            q.reset()
        if (q.isExceedingRateLimit())
            emit(q.getRateLimitError())
            remove q

Join stage:
For each new query, create an instance of Querier for the query in Querier.Mode.ALL mode. If there are any exceptions or errors while initializing it, make BulletError objects from them and return them as a Clip back through the PubSub.
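Here is a sketch of Join-stage query creation that turns initialization errors into an error response instead of registering the query. `JoinQuerier` is hypothetical and is assumed to run in Querier.Mode.ALL. It accepts only bodies starting with "SELECT". The `resultPubSub` list and its string messages stand in for emitting Clip.of(Meta.of(errors)) through the result Publisher.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical Join-stage querier (assumed to run in Querier.Mode.ALL);
// initialize() returns validation errors, if any.
class JoinQuerier {
    final String body;

    JoinQuerier(String body) { this.body = body; }

    Optional<List<String>> initialize() {
        List<String> errors = new ArrayList<>();
        if (!body.startsWith("SELECT")) {
            errors.add("Unsupported query: " + body);
        }
        if (errors.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(errors);
    }
}

class JoinStage {
    final Map<String, JoinQuerier> queriers = new HashMap<>();
    final List<String> resultPubSub = new ArrayList<>(); // stands in for the result Publisher

    void onQuery(String id, String body) {
        try {
            JoinQuerier q = new JoinQuerier(body);
            Optional<List<String>> errors = q.initialize();
            if (errors.isPresent()) {
                // Stand-in for emitting Clip.of(Meta.of(errors)) back through the PubSub.
                resultPubSub.add(id + " errors=" + errors.get());
                return;
            }
            queriers.put(id, q);
        } catch (RuntimeException e) {
            // Unexpected failures are reported the same way.
            resultPubSub.add(id + " errors=[" + e + "]");
        }
    }
}
```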
If a KILL message carrying a RateLimitError for a query arrives from the Filter stage, call finish() and add the RateLimitError to the Meta. Emit this through the regular PubSub Publisher for results. Also emit a KILL signal PubSubMessage to a Publisher for queries so that it is fed back to the Filter stage.
For each piece of data for a query ID from the Filter stage, call combine(byte[]) on the querier for that ID.
If isDone(), call getResult() to emit the final result and remove the querier. Emit a COMPLETE signal to the PubSub Publisher for queries to feed back the complete status to the Filter stage.
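This sketch shows the completion path in the Join stage: combine the data, and once the query is done, emit the result, remove the querier, and feed a COMPLETE signal back. `PartitionCountingQuerier` is hypothetical. Treating a query as done once a fixed number of partitions have reported is an assumption for illustration; a real query is done when it expires. The two lists stand in for the result and query Publishers.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical Join-stage querier that is done once it has combined data
// from a fixed number of Filter-stage partitions.
class PartitionCountingQuerier {
    private final int partitions;
    private int received = 0;
    private long total = 0;

    PartitionCountingQuerier(int partitions) { this.partitions = partitions; }

    void combine(long partialCount) {
        received++;
        total += partialCount;
    }

    boolean isDone() { return received >= partitions; }

    long getResult() { return total; }
}

class CompletingJoin {
    final Map<String, PartitionCountingQuerier> queriers = new HashMap<>();
    final List<String> resultPublisher = new ArrayList<>();
    final List<String> queryPublisher = new ArrayList<>(); // fed back to the Filter stage

    void onData(String id, long partialCount) {
        PartitionCountingQuerier q = queriers.get(id);
        if (q == null) {
            return; // query already finished or killed
        }
        q.combine(partialCount);
        if (q.isDone()) {
            resultPublisher.add(id + ":" + q.getResult());
            queriers.remove(id);
            queryPublisher.add(id + ":COMPLETE");
        }
    }
}
```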
If isClosed(), use getResult() to emit the intermediate result and call reset().
If isExceedingRateLimit(), you can use getRateLimitError() and then RateLimitError.makeMeta() to create a Meta, and add it to the emitted Clip with Clip.add(Meta). Make sure to remove the querier and send a KILL signal to a PubSub Publisher for queries to feed back the kill status to the Filter stage.
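Here is a sketch of the Join-stage rate-limit path: finish the query, attach the error metadata, remove the querier, and send a KILL signal. Every type here is hypothetical. `SketchClip.add` mirrors Clip.add(Meta), `getRateLimitErrorMeta()` stands in for getRateLimitError() followed by RateLimitError.makeMeta(), and the "rate" is simply the number of combined windows.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Clip: result records plus metadata entries.
class SketchClip {
    final List<String> records = new ArrayList<>();
    final List<String> meta = new ArrayList<>();

    // Mirrors Clip.add(Meta).
    SketchClip add(String metaEntry) {
        meta.add(metaEntry);
        return this;
    }
}

// Hypothetical querier that exceeds its "rate" after maxWindows combines.
class LimitedJoinQuerier {
    final List<String> seen = new ArrayList<>();
    final int maxWindows;

    LimitedJoinQuerier(int maxWindows) { this.maxWindows = maxWindows; }

    void combine(String data) { seen.add(data); }

    boolean isExceedingRateLimit() { return seen.size() > maxWindows; }

    // Stands in for getRateLimitError() followed by RateLimitError.makeMeta().
    String getRateLimitErrorMeta() { return "rate limit exceeded: " + seen.size() + " > " + maxWindows; }

    SketchClip finish() {
        SketchClip clip = new SketchClip();
        clip.records.addAll(seen);
        return clip;
    }
}

class RateLimitedJoin {
    final Map<String, LimitedJoinQuerier> queriers = new HashMap<>();
    final List<SketchClip> results = new ArrayList<>();
    final List<String> querySignals = new ArrayList<>(); // fed back to the Filter stage

    void onData(String id, String data) {
        LimitedJoinQuerier q = queriers.get(id);
        if (q == null) {
            return;
        }
        q.combine(data);
        if (q.isExceedingRateLimit()) {
            results.add(q.finish().add(q.getRateLimitErrorMeta()));
            queriers.remove(id);
            querySignals.add(id + ":KILL");
        }
    }
}
```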
You should also check isDone() and isClosed() periodically if your data volume is low, since otherwise these checks only run when data arrives.
If a query's window closes (isClosed()) at the same time as the data from the Filter stage partitions arrives, you should not immediately emit getResult() and then reset() as soon as isClosed() is true. There are two ways to handle this. You could delay the start of the query a little in the Join stage so that windows from the Filter stage always arrive a bit earlier. Or you could buffer the results in the Join stage for each window for a while as results trickle in. The issue with the latter approach is that you slowly add the buffer time to the duration of your windows in the Join stage, and eventually you get two windows in one. The former approach does not have this problem. However, it could cause results that are sent immediately (for record-based windows) to be dropped while the delay is happening. To solve these issues, you should buffer the final results for all queries for which shouldBuffer() is true. This should be true for time-based windows and false for all record-based windows or queries with no window. So you can use the negation of shouldBuffer() to find out if the latter queries can be delayed.
This delay ensures that results from the Filter phase are collected in their entirety before they are emitted from the Join phase. To aid you in doing this, you can buffer the query and use restart() to mark its delayed start.
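One way to realize the delayed-start approach is a pre-start buffer that holds new queries and a periodic tick that starts them with restart() once their delay has elapsed. The sketch assumes that queries with shouldBuffer() true (time-based windows) are the ones held back. It also assumes that record-based or unwindowed queries start immediately, since delaying them could drop results. `BufferableQuerier`, `DelayedStartJoin`, and the explicit time arguments are hypothetical.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical querier: shouldBuffer() is true for time-based windows.
class BufferableQuerier {
    final boolean timeWindowed;
    long startMs;

    BufferableQuerier(boolean timeWindowed, long nowMs) {
        this.timeWindowed = timeWindowed;
        this.startMs = nowMs;
    }

    boolean shouldBuffer() { return timeWindowed; }

    // Like restart(): mark the real start; consumed data is kept.
    void restart(long nowMs) { startMs = nowMs; }
}

class DelayedStartJoin {
    private static final class Pending {
        final BufferableQuerier querier;
        final long readyAtMs;

        Pending(BufferableQuerier querier, long readyAtMs) {
            this.querier = querier;
            this.readyAtMs = readyAtMs;
        }
    }

    final long delayMs;
    final Map<String, BufferableQuerier> running = new LinkedHashMap<>();
    private final Map<String, Pending> pending = new LinkedHashMap<>();

    DelayedStartJoin(long delayMs) { this.delayMs = delayMs; }

    void onQuery(String id, BufferableQuerier q, long nowMs) {
        if (q.shouldBuffer()) {
            pending.put(id, new Pending(q, nowMs + delayMs)); // hold back time-based windows
        } else {
            running.put(id, q); // record-based or no window: start at once
        }
    }

    // Call periodically (for example, from a timer) to start queries whose delay has elapsed.
    void tick(long nowMs) {
        Iterator<Map.Entry<String, Pending>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Pending> entry = it.next();
            if (entry.getValue().readyAtMs <= nowMs) {
                entry.getValue().querier.restart(nowMs);
                running.put(entry.getKey(), entry.getValue().querier);
                it.remove();
            }
        }
    }
}
```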
Join stage pseudocode:

For each query:
    (String id, String queryBody, Metadata metadata) = Query
    if (metadata.hasSignal(Signal.KILL))
        remove Querier for id
        return
    try {
        create new Querier(id, queryBody, config) in Querier.Mode.ALL mode
        initialize it, collecting any errors (see the note above regarding delaying the start)
        if errors present:
            emit(Clip.of(Meta.of(errors.get())))
    } catch (Exception e) {
        Clip clip = Clip.of(Meta.of(asList(BulletError.makeError(e, queryBody))))
        emit(clip)
    }

For each KILL message from the Filter stage:
    (String id, RateLimitError error) = KILL message
    querier = Querier for id
    Clip clip = querier.finish()
    clip.add(Meta.of(error))
    emit(clip)
    queryPubSubPublisher.emit((id, KILL signal))
    remove querier

For each piece of data from the Filter stage:
    (String id, byte[] data) = Data
    querier = Querier for id
    querier.combine(data)
    if (querier.isDone())
        Clip clip = querier.finish()
        emit(clip)
        queryPubSubPublisher.emit((id, COMPLETE signal))
        remove querier
        return
    else if (querier.isClosed())
        Clip clip = querier.getResult()
        // See the note above regarding buffering
        if querier.shouldBuffer()
            emit(clip)
        querier.reset()
    if (querier.isExceedingRateLimit())
        Clip clip = merge querier.finish() with querier.getRateLimitError()
        emit(clip)
        queryPubSubPublisher.emit((id, KILL signal))
        remove querier

Periodically:
    for Querier q : queriers:
        if (q.isDone())
            emit(q.finish())
            queryPubSubPublisher.emit((id of q, COMPLETE signal))
            remove q
            continue
        if (q.isClosed())
            emit(q.getResult())
            q.reset()
        if (q.isExceedingRateLimit())
            Clip clip = merge q.finish() with q.getRateLimitError()
            emit(clip)
            queryPubSubPublisher.emit((id of q, KILL signal))
            remove q

Modifier and Type | Class and Description |
---|---|
static class | Querier.Mode: This is used to determine if this operates in partitioned mode or not. |

Modifier and Type | Field and Description |
---|---|
static String | TRY_AGAIN_LATER |

Constructor and Description |
---|
Querier(Querier.Mode mode, RunningQuery query, BulletConfig config): Constructor that takes a Querier.Mode, a RunningQuery instance and a configuration to use. |
Querier(RunningQuery query, BulletConfig config): Constructor that takes a RunningQuery instance and a configuration to use. |

Modifier and Type | Method and Description |
---|---|
void | combine(byte[] data): Presents the query with a serialized data representation of a prior result for the query. |
void | consume(com.yahoo.bullet.record.BulletRecord record): Consume a BulletRecord for this query. |
Clip | finish(): Terminate the query and return the final result. |
byte[] | getData(): Get the result emitted so far after the last window. |
Meta | getMetadata(): Returns the Meta of the result so far. |
Query | getQuery(): Gets the Query instance for this querier. |
RateLimitError | getRateLimitError(): Returns a RateLimitError if a prior call to isExceedingRateLimit() found that the rate limit was exceeded. |
List<com.yahoo.bullet.record.BulletRecord> | getRecords(): Returns the List of BulletRecord result so far. |
Clip | getResult(): Gets the resulting Clip of the results so far. |
boolean | hasNewData(): Returns whether there is any new data to emit at all since the last reset(). |
boolean | isClosed(): Depending on the Querier.Mode.ALL mode this is operating in, returns true if and only if the query window is closed and you should emit the result at this time. |
boolean | isDone(): Returns true if the query has expired and will never accept any more data. |
boolean | isExceedingRateLimit(): Returns whether this is exceeding the rate limit. |
void | reset(): Resets this object. |
void | restart(): Forces a restart of a valid query to mark the correct start of this object if it was previously created but delayed in starting it (by using the negation of shouldBuffer()). |
boolean | shouldBuffer(): Returns whether this query should buffer before emitting the final results. |
String | toString() |
public static final String TRY_AGAIN_LATER

public Querier(RunningQuery query, BulletConfig config)
Constructor that takes a RunningQuery instance and a configuration to use. This also starts executing the query.
Parameters:
query - The running query.
config - The validated BulletConfig configuration to use.

public Querier(Querier.Mode mode, RunningQuery query, BulletConfig config)
Constructor that takes a Querier.Mode, a RunningQuery instance and a configuration to use. This also starts executing the query.
Parameters:
mode - The mode for this querier.
query - The running query.
config - The validated BulletConfig configuration to use.

public void restart()
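Forces a restart of a valid query to mark the correct start of this object if it was previously created but delayed in starting it (by using the negation of shouldBuffer()). You might use this if you were delaying the start of the query in the Join phase. This does not revalidate the query or reset any data this might have already consumed.

public void consume(com.yahoo.bullet.record.BulletRecord record)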
Consume a BulletRecord for this query. The record may or may not actually be incorporated into the query results. This depends on whether the query can accept more data, whether it has expired, and whether the record matches any query filtering criteria.

public void combine(byte[] data)
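Presents the query with a serialized data representation of a prior result for the query. … isClosed() or isDone().

public byte[] getData()
Get the result emitted so far after the last window.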

public List<com.yahoo.bullet.record.BulletRecord> getRecords()
Returns the List of BulletRecord result so far.
Specified by:
getRecords in interface Monoidal

public Meta getMetadata()
Returns the Meta of the result so far. See getResult() for the full result with the data.
Specified by:
getMetadata in interface Monoidal

public boolean isClosed()
Depending on the Querier.Mode.ALL mode this is operating in, returns true if and only if the query window is closed and you should emit the result at this time.

public void reset()
Resets this object. You should call this after calling getResult() or getData(), once you have verified that isClosed() is true.

public Query getQuery()
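Gets the Query instance for this querier.
Returns:
The Query instance for this object.

public boolean isDone()
Returns true if the query has expired and will never accept any more data.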
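
public boolean hasNewData()
Returns whether there is any new data to emit at all since the last reset(). Use this method if you are driving how data is consumed by this instance (for instance, in microbatches) and need to emit data outside the windowing standards.

public boolean isExceedingRateLimit()
Returns whether this is exceeding the rate limit.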

public RateLimitError getRateLimitError()
Returns a RateLimitError if a prior call to isExceedingRateLimit() found that the rate limit was exceeded.

public boolean shouldBuffer()
Returns whether this query should buffer before emitting the final results.
… isDone().

public Clip finish()
Terminate the query and return the final result.
Returns:
The Clip representing the final result.

Copyright © 2021. All rights reserved.