Transformers

This page contains references for all the custom transformers available on Dagger.

List of Transformers

ClearColumnTransformer

  • Transformation Class:
    • ClearColumnTransformer
  • Contract:
    • After selecting columns via SQL, you need to reselect the desired columns with the help of an internal source. The following transformation arguments can be passed:
      • targetColumnName: The field that needs to be cleared.
  • Functionality:
    • Allows clearing the specified column of the data produced by the dagger; a before/after sketch follows the example below.
    • Can be used only on post-processor
  • Example:
    • SQL:
      SELECT
        event_timestamp,
        data1,
        data2
      FROM
        data_stream
    • POST PROCESSOR CONFIG:
      {  "internal_source": [    {      "output_field": "*",      "value": "*",      "type": "sql"    }  ],  "transformers": [    {      "transformation_class": "ClearColumnTransformer",      "transformation_arguments": {        "targetColumnName": "data1"      }    }  ]}

DeDuplicationTransformer

  • Transformation Class:
    • DeDuplicationTransformer
  • Contract:
    • After selecting columns via SQL, you need to reselect the desired columns with the help of an internal source. The following transformation arguments can be passed:
      • key_column: The column whose value is used as the deduplication key; subsequent events with the same key are dropped.
      • ttl_in_seconds: How long to keep a key in memory. Once a key is cleared from memory, data with the same key will be sent again.
  • Functionality:
    • Allows deduplication of the data produced by the dagger, i.e., records with the same key will not be sent again until the TTL expires; a sketch of this behaviour follows the example below.
    • Can be used both on post-processor and pre-processor
  • Example:
    • SQL:
      SELECT
        data1,
        data2
      FROM
        data_stream
    • POST PROCESSOR CONFIG:
      {  "internal_source": [    {      "output_field": "data1",      "value": "data1",      "type": "sql"    },    {      "output_field": "data2",      "value": "data2",      "type": "sql"    }  ],  "transformers": [    {      "transformation_arguments": {        "key_column": "data1",        "ttl_in_seconds": "3600"      },      "transformation_class": "DeDuplicationTransformer"    }  ]}

FeatureTransformer

  • Transformation Class:
    • FeatureTransformer
  • Contract:
    • After selecting columns via SQL, you need to reselect the desired columns with the help of an internal source. The following transformation arguments can be passed:
      • keyColumnName: This value will be used to form the key of the feature.
      • valueColumnName: This value will be used as a value in the feature.
  • Functionality:
    • Converts columns to Feast Features in post-processors; a conceptual sketch follows the example below.
    • Can be used only on post-processor
  • Example:
    • SQL:
      SELECT
        data1,
        features
      FROM
        data_stream
    • POST PROCESSOR CONFIG:
      {  "internal_source": [    {      "output_field": "*",      "value": "*",      "type": "sql"    }  ],  "transformers": [    {      "transformation_arguments": {        "keyColumnName": "data1",        "valueColumnName": "features"      },      "transformation_class": "FeatureTransformer"    }  ]}

FeatureWithTypeTransformer

  • Transformation Class:
    • FeatureWithTypeTransformer
  • Contract:
    • After selecting columns via SQL, you need to reselect the desired columns with the help of an internal source. The following transformation arguments can be passed:
      • outputColumnName: The column where the final feature will be written.
      • data: The schema of the features, following the same contract as the FeaturesWithType UDF; a single feature is represented by an element in the array, each specifying a keyColumnName, a valueColumnName, and a type (see the example below).
  • Functionality:
    • Converts columns to Feast Features in post-processors. This is required to do aggregation and feature transformation in a single dagger; a conceptual sketch follows the example below.
    • Can be used only on post-processor
  • Example:
    • SQL:
      SELECT
        data1,
        data2
      FROM
        data_stream
    • POST PROCESSOR CONFIG:
      {  "internal_source": [    {      "output_field": "features",      "value": "test",      "type": "constant"    },    {      "output_field": "data1",      "value": "data1",      "type": "sql"    },    {      "output_field": "data2",      "value": "data2",      "type": "sql"    }  ],  "transformers": [    {      "transformation_class": "FeatureTransformer",      "transformation_arguments": {        "outputColumnName": "features",        "data": [          {            "keyColumnName": "data1",            "valueColumnName": "data2",            "type": "StringType"          }        ]      }    }  ]}

HashTransformer

  • Transformation Class:
    • HashTransformer
  • Contract:
    • After selecting columns via SQL, you need to reselect the desired columns with the help of an internal source. The following transformation arguments can be passed:
      • maskColumns: A list of fields that need to be encrypted/masked.
  • Functionality:
    • Enables masking of a set of fields as configured. Used in data-forwarding daggers to clone production data to integration environments while protecting sensitive data fields. SHA-256 hashing is used to mask the data; a before/after sketch follows the example below.
    • Can be used only on post-processor
  • Limitations:
    • Currently supports masking only on non-complex fields of data type Integer, Big Integer, and String. However, you can mask nested fields of complex types using dot (.) notation; for example, test_data.customer_id is a valid argument that will mask the customer_id field inside test_data.
    • All other data types, including arrays, complex fields, and other primitive types such as boolean, are not supported.
  • Example:
    • SQL:
      SELECT
        event_timestamp,
        test_data
      FROM
        data_stream
    • POST PROCESSOR CONFIG:
      {  "internal_source": [    {      "output_field": "*",      "value": "*",      "type": "sql"    }  ],  "transformers": [    {      "transformation_class": "HashTransformer",      "transformation_arguments": {        "maskColumns": [          "test_data.data1"        ]      }    }  ]}

InvalidRecordFilterTransformer

  • Transformation Class:
    • InvalidRecordFilterTransformer
  • Contract:
    • The following transformation arguments can be passed:
      • transformation_arguments: The arguments required by the custom transformation class; for this transformer, the name of the table on which the filter is applied (see the example below).
  • Functionality:
    • Filters out the invalid records produced by the dagger.
    • Can be used only on pre-processor
  • Example:
    • SQL:
      SELECT
        data1,
        data2,
        event_timestamp
      FROM
        data_stream
    • PRE PROCESSOR CONFIG:
      {  "table_transformers": [    {      "table_name": "testtable",      "transformers": [        {          "transformation_class": "InvalidRecordFilterTransformer",          "transformation_arguments": "testtable"        }      ]    }  ]}

SQLTransformer

  • Transformation Class:
    • SQLTransformer
  • Contract:
    • After selecting columns via SQL, you need to reselect the desired columns with the help of an internal source. The following transformation arguments can be passed:
      • sqlQuery: The SQL query for the transformation.
      • tableName (optional): The table name to be used in the above SQL (default: data_stream).
      • allowedLatenessInMs (optional): The allowed lateness for the events streaming in from Kafka (default: 0).
  • Functionality:
    • Enables applying a SQL transformation on top of streaming data in post-processors. Primarily useful if users want to apply SQL transformation/aggregation using fields added via External/Internal Post Processors; a sample output record follows the example below.
    • Can be used only on post-processor
  • Example:
    • SQL:
      SELECT
        data1,
        data2,
        rowtime
      FROM
        data_stream
    • POST PROCESSOR CONFIG:
      {  "internal_source": [    {      "output_field": "data1",      "value": "data1",      "type": "sql"    },    {      "output_field": "rowtime",      "value": "rowtime",      "type": "sql"    },    {      "output_field": "data2",      "value": "data2",      "type": "sql"    }  ],  "transformers": [    {      "transformation_class": "SQLTransformer",      "transformation_arguments": {        "sqlQuery": "SELECT count(distinct data1) AS `count`, data2, TUMBLE_END(rowtime, INTERVAL '60' SECOND) AS event_timestamp FROM data_stream group by TUMBLE (rowtime, INTERVAL '60' SECOND), data2"      }    }  ]}