Pre Processors

Pre processors let users add Flink operators/transformations before the stream is passed on to the SQL query. Each stream registered on dagger can have chained pre processors, which run and transform the data before SQL processing.
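
As a rough mental model, a pre-processor sits between the ingested stream and the table that the SQL query reads from. The sketch below is plain Flink code, not dagger's internal wiring, and the source, field values, and table name are made up for illustration:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class PreProcessorConcept {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Hypothetical input; on dagger this would be the registered source stream.
        DataStream<Row> rawStream = env
                .fromElements(Row.of("order-1", 100L), Row.of(null, 200L))
                .returns(Types.ROW(Types.STRING, Types.LONG));

        // Pre-processing step: a Flink transformation applied before any SQL runs.
        DataStream<Row> preProcessed = rawStream.filter(row -> row.getField(0) != null);

        // Only the pre-processed stream is exposed to the SQL query.
        tableEnv.createTemporaryView("data_stream", preProcessed);
        tableEnv.executeSql("SELECT * FROM data_stream").print();
    }
}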

Type of Preprocessors

Currently, only one type of pre-processor is supported: transformers.

Data flow in preprocessors

In the above diagram:

  • Data is ingested from two different streams.
  • The InvalidRecordFilterTransformer is applied to both streams to filter out invalid records (see the config sketch after this list).
  • The filtered records are then passed on to further operators.
  • The data is finally pushed to the sink.
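
A pre-processor config matching the flow above could look roughly like this; the table names data_stream_1 and data_stream_2 are placeholders for whatever names the two streams are registered under:

PROCESSOR_PREPROCESSOR_CONFIG = {
  "table_transformers": [{
    "table_name": "data_stream_1",
    "transformers": [{
      "transformation_class": "InvalidRecordFilterTransformer"
    }]
  }, {
    "table_name": "data_stream_2",
    "transformers": [{
      "transformation_class": "InvalidRecordFilterTransformer"
    }]
  }]
}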

Configuration

The following variables need to be configured as part of the PROCESSOR_PREPROCESSOR_CONFIG JSON:

table_transformers

A list of transformer configs.

  • Example value: [{"table_name": "testtable","transformers": [{"transformation_class": "InvalidRecordFilterTransformer"}]}]
  • Type: required

table_name

Table name for the transformer.

  • Example value: testtable
  • Type: required

transformers

List of transformers per table.

  • Example value: [{"transformation_class": "InvalidRecordFilterTransformer"}]
  • Type: required

transformation_class

Fully qualified name of the class to be used for transformation.

  • Example value: "InvalidRecordFilterTransformer"
  • Type: required

transformation_arguments

A key-value map of the parameters required by the custom transformation class.

  • Example value: {"table_name": "testtable"}
  • Type: optional
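
For example, a single transformer entry combining both fields could look like this; which argument keys are valid depends entirely on the transformation class being configured:

{
  "transformation_class": "InvalidRecordFilterTransformer",
  "transformation_arguments": {
    "table_name": "testtable"
  }
}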

Sample config

FLINK_SQL_QUERY = "SELECT data_1, data_2, event_timestamp from data_stream"
PROCESSOR_PREPROCESSOR_ENABLE = true
PROCESSOR_PREPROCESSOR_CONFIG = {
  "table_transformers": [{
    "table_name": "data_stream",
    "transformers": [{
      "transformation_class": "InvalidRecordFilterTransformer"
    }]
  }]
}
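
Since transformation_class is just a fully qualified class name, a custom pre-processor can be plugged in the same way. The sketch below is only an illustration, not dagger's exact contract: the Transformer and StreamInfo types, their package, and the transform(StreamInfo) signature are assumptions based on how recent dagger versions expose transformers, so verify them against the dagger version you run.

import org.apache.flink.types.Row;

import io.odpf.dagger.common.core.StreamInfo;   // assumed package; check your dagger version
import io.odpf.dagger.common.core.Transformer;  // assumed package; check your dagger version

// Hypothetical custom pre-processor: drops rows whose first column is null
// before they reach the SQL query.
public class NullKeyFilterTransformer implements Transformer {

    @Override
    public StreamInfo transform(StreamInfo streamInfo) {
        return new StreamInfo(
                streamInfo.getDataStream().filter((Row row) -> row.getField(0) != null),
                streamInfo.getColumnNames());
    }
}

The fully qualified name of such a class (e.g. com.example.NullKeyFilterTransformer, a hypothetical package) would then go in transformation_class, and any parameters it needs could be supplied through transformation_arguments.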