Pre-processors
Pre-processors let users add Flink operators or transformations to a stream before it is passed to the SQL query. Each stream registered on Dagger can have chained pre-processors, which transform the data before SQL processing.
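The idea of a per-stream chain can be sketched in Python as follows. This is a conceptual model only, not Dagger's Java/Flink API: records are plain dicts, a transformer is assumed to be a function from records to records, and the names `apply_chain`, `preprocess`, `drop_empty` and `tag_source` are hypothetical.

```python
from functools import reduce
from typing import Callable, Dict, Iterable, List

# A record is modeled as a plain dict; a transformer maps records to records.
Record = Dict[str, object]
Transformer = Callable[[Iterable[Record]], Iterable[Record]]


def apply_chain(stream: Iterable[Record], chain: List[Transformer]) -> List[Record]:
    """Run the transformers in list order, feeding each one's output to the next."""
    return list(reduce(lambda records, transformer: transformer(records), chain, stream))


def preprocess(streams: Dict[str, Iterable[Record]],
               chains: Dict[str, List[Transformer]]) -> Dict[str, List[Record]]:
    """Apply each table's chain; tables without a chain pass through unchanged."""
    return {name: apply_chain(records, chains.get(name, []))
            for name, records in streams.items()}


# Two hypothetical transformers used only for illustration.
def drop_empty(records: Iterable[Record]) -> Iterable[Record]:
    return (r for r in records if r)


def tag_source(records: Iterable[Record]) -> Iterable[Record]:
    return ({**r, "source": "data_stream"} for r in records)
```

For example, `preprocess({"data_stream": [{"data_1": 1}, {}]}, {"data_stream": [drop_empty, tag_source]})` keeps only the non-empty record and tags it. Because the chain runs in order, swapping the two transformers would tag the empty record first, and it would then survive the filter.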
Types of Pre-processors
Currently, there is only one type of pre-processor: transformers.
Data flow in pre-processors
In the diagram above:
- Data is ingested from two different streams.
- The InvalidRecordFilterTransformer is applied to both streams to filter out invalid records.
- The filtered records are then passed to further operators.
- Finally, the data is pushed to the sink.
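The flow described above can be sketched in Python as a rough model. This is not Dagger code: the `is_valid` flag, the function names, and the list standing in for the sink are hypothetical, and this section does not say which criterion the real InvalidRecordFilterTransformer uses to decide that a record is invalid.

```python
from typing import Dict, Iterable, List

Record = Dict[str, object]


def invalid_record_filter(records: Iterable[Record]) -> List[Record]:
    # Assumption: an upstream deserialization step marks each record with a
    # hypothetical boolean "is_valid" flag; unmarked records count as invalid.
    return [r for r in records if r.get("is_valid") is True]


def run_flow(stream_a: Iterable[Record], stream_b: Iterable[Record]) -> List[Record]:
    # 1. Data arrives from two different streams.
    # 2. The same filter is applied to each stream independently.
    filtered = invalid_record_filter(stream_a) + invalid_record_filter(stream_b)
    # 3. Stand-in for further operators: drop the internal flag from each record.
    processed = [{k: v for k, v in r.items() if k != "is_valid"} for r in filtered]
    # 4. Stand-in for the sink: collect the output records in a list.
    sink: List[Record] = []
    sink.extend(processed)
    return sink
```

Filtering each stream before anything downstream runs means later operators and the SQL query only ever see records that passed the check.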
Configuration
The following variables need to be configured as part of the PROCESSOR_PREPROCESSOR_CONFIG JSON:
table_transformers
A list of transformer configs, each pairing a table_name with the transformers applied to that table.
- Example value: [{"table_name": "testtable","transformers": [{"transformation_class": "InvalidRecordFilterTransformer"}]}]
- Type: required
table_name
Name of the table the transformers are applied to.
- Example value: testtable
- Type: required
transformers
The list of transformers to apply to the table.
- Example value: [{"transformation_class": "InvalidRecordFilterTransformer"}]
- Type: required
transformation_class
Fully qualified name of the class to be used for transformation.
- Example value: "InvalidRecordFilterTransformer"
- Type: required
transformation_arguments
A key-value map of the parameters required by the custom transformation class.
- Example value: {"table_name": "testtable"}
- Type: optional
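The required and optional fields listed above can be summarized as a small checker. This is a hypothetical sketch: `validate_preprocessor_config` is not part of Dagger, and it encodes only the rules stated in this section (it does not check whether a transformation_class actually exists).

```python
import json
from typing import List


def validate_preprocessor_config(raw: str) -> List[str]:
    """Hypothetical checker for the required/optional rules listed above."""
    config = json.loads(raw)
    tables = config.get("table_transformers") if isinstance(config, dict) else None
    if not isinstance(tables, list):
        return ["table_transformers: required list is missing"]
    errors: List[str] = []
    for i, table in enumerate(tables):
        where = f"table_transformers[{i}]"
        if not isinstance(table, dict):
            errors.append(f"{where}: must be an object")
            continue
        if not table.get("table_name"):
            errors.append(f"{where}.table_name: required")
        transformers = table.get("transformers")
        if not isinstance(transformers, list):
            errors.append(f"{where}.transformers: required list is missing")
            continue
        for j, transformer in enumerate(transformers):
            item = f"{where}.transformers[{j}]"
            if not isinstance(transformer, dict):
                errors.append(f"{item}: must be an object")
                continue
            if not transformer.get("transformation_class"):
                errors.append(f"{item}.transformation_class: required")
            # transformation_arguments is optional, but must be a map if present.
            arguments = transformer.get("transformation_arguments")
            if arguments is not None and not isinstance(arguments, dict):
                errors.append(f"{item}.transformation_arguments: must be a key-value map")
    return errors
```

Passing the table_transformers example value wrapped in an object returns an empty list, while omitting transformation_class or supplying transformation_arguments as a list produces one message per problem.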
Sample config
FLINK_SQL_QUERY = "SELECT data_1, data_2, event_timestamp from data_stream"
PROCESSOR_PREPROCESSOR_ENABLE = true
PROCESSOR_PREPROCESSOR_CONFIG = {
  "table_transformers": [{
    "table_name": "data_stream",
    "transformers": [{
      "transformation_class": "InvalidRecordFilterTransformer"
    }]
  }]
}
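In this sample, the table_name data_stream is the same table that FLINK_SQL_QUERY reads from. Assuming transformers are meant to target tables the query actually uses, a quick consistency check can catch typos in table_name. The sketch below is hypothetical and deliberately naive: `unqueried_tables` is not a Dagger feature, and it uses a regular expression rather than a SQL parser.

```python
import json
import re
from typing import Set

# Values copied from the sample config above.
FLINK_SQL_QUERY = "SELECT data_1, data_2, event_timestamp from data_stream"
PROCESSOR_PREPROCESSOR_CONFIG = json.dumps({
    "table_transformers": [{
        "table_name": "data_stream",
        "transformers": [{"transformation_class": "InvalidRecordFilterTransformer"}],
    }]
})


def unqueried_tables(query: str, config_json: str) -> Set[str]:
    """Naive check: configured table_name values that never follow FROM or JOIN in the query."""
    queried = set(re.findall(r"\b(?:from|join)\s+(\w+)", query, flags=re.IGNORECASE))
    configured = {t["table_name"] for t in json.loads(config_json)["table_transformers"]}
    return configured - queried
```

For the sample values the result is an empty set; pointing the same config at a query that reads a different table would return {"data_stream"}.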