# Example Queries
Dagger uses Apache Calcite as the querying framework; find its documentation here. Templates of some of the commonly used types of queries are listed below.
## Sample input schema
### Sample booking event schema

```protobuf
message SampleBookingInfo {
  string order_number = 1;
  string order_url = 2;
  string status = 3;
  google.protobuf.Timestamp event_timestamp = 4;
  string customer_id = 5;
  string driver_id = 6;
  string service_type = 7;
  string service_area_id = 8;
  repeated Item items = 9;
  map<string, string> metadata = 10;
}

message Item {
  string id = 1;
  int32 quantity = 2;
  string name = 3;
}
```
### Sample payment event schema

```protobuf
message SamplePaymentInfo {
  string order_id = 1;
  string customer_id = 2;
  google.protobuf.Timestamp event_timestamp = 3;
  int64 order_amount = 4;
}
```
## Influx Sink

- While using Dagger with the InfluxDB sink, `tag_` should be appended to the beginning of the columns you want as dimensions. Dimensions help you slice the data in InfluxDB-Grafana. InfluxDB tags are essentially the columns on which data is indexed. Find more on InfluxDB tags here.
- DO NOT use `tag_` for high-cardinality data points such as customer_id, merchant_id etc. unless you provide a filtering condition; this will create tag explosion and affect InfluxDB.
- Ensure there is at least one value field present in the query (not starting with `tag_`).
- In case you want your dimensions without the `tag_` prefix, you can use the `label_` prefix. The name of the dimension will not carry the `tag_` or `label_` prefix (a sketch follows the example query below).
### Example query

Here `booking` denotes the booking events stream with the sample booking schema.

```SQL
SELECT
  order_number,
  service_type AS tag_service_id,
  status
FROM `booking`
```
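The `label_` prefix described above works the same way; as a minimal sketch against the same booking stream, the query below exposes `service_type` as a dimension whose name carries no prefix:

```SQL
SELECT
  order_number,
  service_type AS label_service_type,
  status
FROM `booking`
```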
## Kafka Sink

- The `tag_` prefix should not be used before the dimensions.
- Ensure that the sink type is selected as Kafka.
- Dimensions & metrics from the SELECT section of the query should map exactly to the field names in the output proto (see the sketch after the example query below).
- Data types of the selected fields should exactly match the output proto fields.
- Unlike an Influx sink Dagger, high cardinality should not be an issue in a Kafka sink Dagger.
### Example query

Here `booking` denotes the booking events stream with the sample booking schema.

```SQL
SELECT
  order_number,
  service_type,
  status
FROM `booking`
```
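As a minimal sketch of the field mapping described above, an output proto for this example could look like the following; the message name `SampleBookingOutput` is hypothetical, and only the field names and types need to mirror the query:

```protobuf
// Hypothetical output proto for the Kafka sink example above.
// Each field name and type mirrors a column selected in the query.
message SampleBookingOutput {
  string order_number = 1;
  string service_type = 2;
  string status = 3;
}
```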
## BigQuery Sink

- The `tag_` prefix should not be used before the dimensions.
- Ensure that the sink type is selected as bigquery.
- Dimensions & metrics from the SELECT section of the query should map exactly to the field names in the output protobuf type configured for the BigQuery sink.
- Data types of the selected fields should exactly match the output protobuf type configured for the BigQuery sink.
### Example query

Here `booking` denotes the booking events stream with the sample booking schema.

```SQL
SELECT
  order_number,
  service_type,
  status
FROM `booking`
```
## Aggregation - Tumble window

- Use this for aggregating data points using a TUMBLE window function (data aggregated for all points in the window at the end of each cycle).
- Use the windowing functions TUMBLE_END(datetime, INTERVAL 'duration' unit) and TUMBLE(datetime, INTERVAL 'duration' unit) in the SELECT and GROUP BY sections respectively (duration is a number and unit can be SECOND/MINUTE/HOUR).
### Example query

Here `booking` denotes the booking events stream with the sample booking schema.

```SQL
SELECT
  count(1) AS booking_count,
  TUMBLE_END(rowtime, INTERVAL '60' SECOND) AS window_timestamp
FROM `booking`
GROUP BY TUMBLE (rowtime, INTERVAL '60' SECOND)
```
## Aggregation - Hop window

- Use this for aggregating data points using a HOP window function (data aggregated at every slide interval for all points in the window interval).
- Use the windowing functions HOP_END(datetime, INTERVAL 'slide duration' unit, INTERVAL 'window duration' unit) and HOP(datetime, INTERVAL 'slide duration' unit, INTERVAL 'window duration' unit) in the SELECT and GROUP BY sections respectively (both slide interval and window interval are numbers and units can be SECOND/MINUTE/HOUR).
### Example query

Here `booking` denotes the booking events stream with the sample booking schema.

```SQL
SELECT
  service_area_id AS tag_service_area_id,
  count(1) AS number_of_bookings,
  HOP_END(rowtime, INTERVAL '60' SECOND, INTERVAL '1' HOUR) AS window_timestamp
FROM `booking`
GROUP BY HOP(rowtime, INTERVAL '60' SECOND, INTERVAL '1' HOUR), service_area_id
```
## Subquery/Inner Query

- You can use as many inner queries as required.
- In case you want to use window aggregations in both the inner and outer query, use TUMBLE_ROWTIME to get the output rowtime from the inner window (a sketch follows the example query below).
### Example query

Here `booking` denotes the booking events stream with the sample booking schema.

```SQL
SELECT
  booking_count,
  cancelled_order,
  window_timestamp
FROM (
  SELECT
    count(1) AS booking_count,
    cast((Sum(Case When status in ('CUSTOMER_CANCELLED') Then 1 Else 0 End)) AS float) AS cancelled_order,
    TUMBLE_END(rowtime, INTERVAL '60' SECOND) AS window_timestamp
  FROM `booking`
  GROUP BY TUMBLE (rowtime, INTERVAL '60' SECOND)
)
```
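As a minimal sketch of the TUMBLE_ROWTIME usage mentioned above (assuming the same booking stream), the inner query below aggregates per minute and exposes its window rowtime, which the outer query then windows over five minutes:

```SQL
SELECT
  sum(booking_count) AS booking_count,
  TUMBLE_END(inner_rowtime, INTERVAL '5' MINUTE) AS window_timestamp
FROM (
  SELECT
    count(1) AS booking_count,
    -- TUMBLE_ROWTIME exposes the inner window's rowtime so the outer query can window on it
    TUMBLE_ROWTIME(rowtime, INTERVAL '60' SECOND) AS inner_rowtime
  FROM `booking`
  GROUP BY TUMBLE (rowtime, INTERVAL '60' SECOND)
)
GROUP BY TUMBLE (inner_rowtime, INTERVAL '5' MINUTE)
```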
## Feast Row Transformation

This sample query is for transforming data into Feature rows for Feast using the Features UDF.
### Example query

Here `booking` denotes the booking events stream with the sample booking schema.

```SQL
SELECT
  'MINUTE' AS granularity,
  customer_id AS entityKey,
  'customers' AS entityName,
  Features(LOWER(service_type), customer_id) AS features,
  TUMBLE_END(rowtime, INTERVAL '1' MINUTE) AS eventTimestamp
FROM `booking`
GROUP BY TUMBLE (rowtime, INTERVAL '1' MINUTE), customer_id
```
## Inner Join

- In order to keep the state size in check, we recommend using time interval joins.
- The time interval should be in the format table2.rowtime BETWEEN table1.rowtime - INTERVAL 'first duration' unit AND table1.rowtime + INTERVAL 'second duration' unit (both durations are numbers and units can be SECOND/MINUTE/HOUR).
### Example query (no WITH)

Here `booking` denotes the booking events stream with the sample booking schema and `payment` denotes the payment stream with the sample payment schema.

```SQL
SELECT
  booking.service_type AS tag_service_type,
  count(order_number) AS number_of_orders,
  sum(order_amount) AS total_amount,
  TUMBLE_END(booking.rowtime, INTERVAL '60' MINUTE) AS window_timestamp
FROM `booking`
JOIN `payment`
  ON booking.order_number = payment.order_id
  AND payment.rowtime BETWEEN booking.rowtime AND booking.rowtime + INTERVAL '5' MINUTE
GROUP BY TUMBLE (booking.rowtime, INTERVAL '60' MINUTE), booking.service_type
```
### Example query (using WITH)

Here `booking` denotes the booking events stream with the sample booking schema and `payment` denotes the payment stream with the sample payment schema.

```SQL
WITH booking_info AS (
  SELECT service_type, customer_id, order_number, rowtime FROM `booking`
),
payment_info AS (
  SELECT order_amount, order_id, rowtime FROM `payment`
)
SELECT
  booking_info.service_type AS tag_service_type,
  count(booking_info.order_number) AS number_of_orders,
  sum(payment_info.order_amount) AS total_amount,
  TUMBLE_END(booking_info.rowtime, INTERVAL '60' MINUTE) AS window_timestamp
FROM `booking_info`
JOIN `payment_info`
  ON booking_info.order_number = payment_info.order_id
  AND payment_info.rowtime BETWEEN booking_info.rowtime AND booking_info.rowtime + INTERVAL '5' MINUTE
GROUP BY TUMBLE (booking_info.rowtime, INTERVAL '60' MINUTE), booking_info.service_type
```
## Union

In a union, the fields selected from two streams can be combined to create a new stream. Ensure that the same field names are present in both streams' SELECT outputs.
### Example query

Here `booking` denotes the booking events stream with the sample booking schema and `payment` denotes the payment stream with the sample payment schema.

```SQL
SELECT
  customer_id,
  service_type,
  order_number,
  service_area_id,
  event_timestamp
FROM `booking`
UNION ALL
(SELECT
  customer_id,
  '' AS service_type,
  order_id AS order_number,
  '' AS service_area_id,
  event_timestamp
FROM `payment`)
```
## Unnest

- Use UNNEST to flatten arrays present in the schema.
- In the FROM section, use the UNNEST function after the table name with the following syntax: FROM booking, UNNEST(booking.items) AS items (id, quantity, name).
- All fields from the array object should be added while unnesting.

In the above example, to retrieve the unnested fields, use items.name, items.quantity, etc.
### Example query (list)

Here `booking` denotes the booking events stream with the sample booking schema.

```SQL
SELECT
  tag_itemname,
  num_orders,
  window_timestamp
FROM (
  SELECT
    items.name AS tag_itemname,
    count(1) AS num_orders,
    TUMBLE_END(rowtime, INTERVAL '5' MINUTE) AS window_timestamp
  FROM booking, UNNEST(booking.items) AS items (id, quantity, name)
  GROUP BY TUMBLE (rowtime, INTERVAL '5' MINUTE), items.name
)
WHERE num_orders > 100
```
### Example query (map)

In Dagger, maps are also deserialized as lists, so UNNEST works in the same way.

Here `booking` denotes the booking events stream with the sample booking schema.

```SQL
SELECT
  index_map.`key` AS data_in_key,
  index_map.`value` AS data_in_value
FROM booking, UNNEST(booking.metadata) AS index_map (`key`, `value`)
```