# Security
The primary goal of Dagger's security features is to enable secure data access for jobs within a cluster, starting from the source.
## Supported secure data access sources
We currently support secure data access from the Kafka source using the Simple Authentication and Security Layer (SASL) and SSL encryption for data in transit between the source Kafka cluster and the Dagger client.
## Supported SASL authentication methods and mechanisms

SASL (Simple Authentication and Security Layer) is a framework that provides developers of applications and shared libraries with mechanisms for authentication, data integrity checking, and encryption.
Dagger currently supports SASL-based authentication with the `KAFKA_SOURCE` and `KAFKA_CONSUMER` sources using:

- SASL/PLAIN
- SASL/SCRAM (SCRAM-SHA-256 and SCRAM-SHA-512)
Note: You must configure your Kafka cluster to enable SASL authentication. See the Kafka documentation for your Kafka version to learn how to configure SASL authentication.
### Configurations

To consume data from a Kafka cluster with SASL/SCRAM or SASL/PLAIN authentication enabled, the following variables need to be configured as part of the `STREAMS` JSON:
#### SOURCE_KAFKA_CONSUMER_CONFIG_SECURITY_PROTOCOL

Defines the security protocol used to communicate with the ACL-enabled Kafka cluster. Dagger-supported values are:

- `SASL_SSL`: if SSL encryption is enabled
- `SASL_PLAINTEXT`: if SSL encryption is not enabled
- Example value: `SASL_PLAINTEXT`
- Type: `optional` (required only for ACL-enabled `KAFKA_CONSUMER` or `KAFKA_SOURCE`)
#### SOURCE_KAFKA_CONSUMER_CONFIG_SASL_MECHANISM

Defines the Simple Authentication and Security Layer (SASL) mechanism used for client connections to the ACL-enabled Kafka cluster. Dagger-supported values are: `PLAIN`, `SCRAM-SHA-256`, `SCRAM-SHA-512`.

- Configure `PLAIN` if the Kafka cluster is configured with a simple username/password authentication mechanism.
- Configure `SCRAM-SHA-256` or `SCRAM-SHA-512` if the Kafka cluster is configured with the Salted Challenge Response Authentication Mechanism (SCRAM).
- Example value: `SCRAM-SHA-512`
- Type: `optional` (required only for ACL-enabled `KAFKA_CONSUMER` or `KAFKA_SOURCE`)
#### SOURCE_KAFKA_CONSUMER_CONFIG_SASL_JAAS_CONFIG

Defines the SASL Java Authentication and Authorization Service (JAAS) config used for JAAS login context parameters for SASL connections, in the format used by JAAS configuration files. The JAAS configuration file format is described here.
There are two ways to configure `KAFKA_CONSUMER` or `KAFKA_SOURCE` to provide the necessary information for JAAS:

1. Specify the JAAS configuration using the `SOURCE_KAFKA_CONSUMER_CONFIG_SASL_JAAS_CONFIG` configuration property.
2. Pass a static JAAS configuration file into the `flink-conf.yaml` using the `env.java.opts: -Djava.security.auth.login.config=<jaas_file_path>/<jaas_file_name>` property; at runtime, the Dagger job will use the static JAAS config details configured with the Flink TaskManager JVM (a sketch follows this list).
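A minimal sketch of the second approach is shown below. The file path and credentials are illustrative assumptions; the `KafkaClient` login context name and the `ScramLoginModule` class follow the standard Kafka SASL/SCRAM setup.

```
// /etc/dagger/kafka_client_jaas.conf — hypothetical path; credentials are placeholders
KafkaClient {
  org.apache.kafka.common.security.scram.ScramLoginModule required
  username="kafkaclient1"
  password="kafkaclient1-secret";
};
```

```
# flink-conf.yaml — points the TaskManager JVM at the static JAAS file above
env.java.opts: -Djava.security.auth.login.config=/etc/dagger/kafka_client_jaas.conf
```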
If a Dagger specifies both the client property `SOURCE_KAFKA_CONSUMER_CONFIG_SASL_JAAS_CONFIG` and the static JAAS configuration system property `java.security.auth.login.config`, then the client property `SOURCE_KAFKA_CONSUMER_CONFIG_SASL_JAAS_CONFIG` will be used.
- Example value: `org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin";`
- Type: `optional` (required only for ACL-enabled `KAFKA_CONSUMER` or `KAFKA_SOURCE` if the static JAAS configuration system property `java.security.auth.login.config` is not configured in the Flink cluster)
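The login module named in this property should match the configured `SOURCE_KAFKA_CONSUMER_CONFIG_SASL_MECHANISM`. A sketch of both pairings as they would appear inside a `STREAMS` entry; the `PlainLoginModule` and `ScramLoginModule` classes ship with Apache Kafka, while the usernames and passwords are placeholders:

```
"SOURCE_KAFKA_CONSUMER_CONFIG_SASL_MECHANISM": "PLAIN",
"SOURCE_KAFKA_CONSUMER_CONFIG_SASL_JAAS_CONFIG": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\" password=\"pass\";",

"SOURCE_KAFKA_CONSUMER_CONFIG_SASL_MECHANISM": "SCRAM-SHA-512",
"SOURCE_KAFKA_CONSUMER_CONFIG_SASL_JAAS_CONFIG": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"user\" password=\"pass\";",
```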
### Example

`STREAMS` config to consume data from a Kafka cluster with SASL/SCRAM authentication enabled:
STREAMS = [ { "SOURCE_KAFKA_TOPIC_NAMES": "test-topic", "INPUT_SCHEMA_TABLE": "data_stream", "INPUT_SCHEMA_PROTO_CLASS": "com.tests.TestMessage", "INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX": "41", "SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS": "localhost:9092", "SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE": "false", "SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET": "latest", "SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID": "dummy-consumer-group", "SOURCE_KAFKA_CONSUMER_CONFIG_SECURITY_PROTOCOL": "SASL_PLAINTEXT", "SOURCE_KAFKA_CONSUMER_CONFIG_SASL_MECHANISM": "SCRAM-SHA-512", "SOURCE_KAFKA_CONSUMER_CONFIG_SASL_JAAS_CONFIG": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"kafkaclient1\" password=\"kafkaclient1-secret\";", "SOURCE_KAFKA_NAME": "local-kafka-stream", "SOURCE_DETAILS": [ { "SOURCE_TYPE": "UNBOUNDED", "SOURCE_NAME": "KAFKA_CONSUMER" } ] }]
`STREAMS` config to consume data from multiple Kafka sources. Streams can mix security settings; in the example below, the first stream uses `SASL_PLAINTEXT`, the second uses `SASL_SSL`, and the third connects to a cluster without SASL authentication:
STREAMS = [ { "SOURCE_KAFKA_TOPIC_NAMES": "test-topic-0", "INPUT_SCHEMA_TABLE": "data_stream_0", "INPUT_SCHEMA_PROTO_CLASS": "com.tests.TestMessage", "INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX": "41", "SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS": "localhost-1:9092", "SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE": "false", "SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET": "latest", "SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID": "dummy-consumer-group", "SOURCE_KAFKA_CONSUMER_CONFIG_SECURITY_PROTOCOL": "SASL_PLAINTEXT", "SOURCE_KAFKA_CONSUMER_CONFIG_SASL_MECHANISM": "SCRAM-SHA-512", "SOURCE_KAFKA_CONSUMER_CONFIG_SASL_JAAS_CONFIG": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"kafkaclient1\" password=\"kafkaclient1-secret\";""SOURCE_KAFKA_NAME": "local-kafka-stream", "SOURCE_KAFKA_NAME": "local-kafka-stream-0", "SOURCE_DETAILS": [ { "SOURCE_TYPE": "UNBOUNDED", "SOURCE_NAME": "KAFKA_CONSUMER" } ] }, { "SOURCE_KAFKA_TOPIC_NAMES": "test-topic-1", "INPUT_SCHEMA_TABLE": "data_stream_1", "INPUT_SCHEMA_PROTO_CLASS": "com.tests.TestMessage", "INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX": "41", "SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS": "localhost-2:9092", "SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE": "false", "SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET": "latest", "SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID": "dummy-consumer-group", "SOURCE_KAFKA_CONSUMER_CONFIG_SECURITY_PROTOCOL": "SASL_SSL", "SOURCE_KAFKA_CONSUMER_CONFIG_SASL_MECHANISM": "SCRAM-SHA-512", "SOURCE_KAFKA_CONSUMER_CONFIG_SASL_JAAS_CONFIG": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"kafkaclient2\" password=\"kafkaclient2-secret\";", "SOURCE_KAFKA_NAME": "local-kafka-stream-1", "SOURCE_DETAILS": [ { "SOURCE_TYPE": "UNBOUNDED", "SOURCE_NAME": "KAFKA_CONSUMER" } ] }, { "SOURCE_KAFKA_TOPIC_NAMES": "test-topic-2", "INPUT_SCHEMA_TABLE": "data_stream-2", "INPUT_SCHEMA_PROTO_CLASS": "com.tests.TestMessage", "INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX": "41", "SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS": "localhost:9092", "SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE": "false", "SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET": "latest", "SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID": "dummy-consumer-group", "SOURCE_KAFKA_NAME": "local-kafka-stream-2", "SOURCE_DETAILS": [ { "SOURCE_TYPE": "UNBOUNDED", "SOURCE_NAME": "KAFKA_CONSUMER" } ] } ]
## Encryption and Authentication using SSL

SSL is used for encryption of traffic as well as authentication. By default, SSL is disabled in the Dagger Kafka source but can be turned on if needed.
Dagger currently supports SSL-based encryption and authentication with the `KAFKA_SOURCE` and `KAFKA_CONSUMER` sources.
Note: You must configure your Kafka cluster to enable encryption and authentication using SSL. See the Kafka documentation for your Kafka version to learn how to configure SSL encryption of traffic as well as authentication.
### Configurations

To consume data from an SSL/TLS-enabled Kafka cluster, the following variables need to be configured as part of the `STREAMS` JSON:
#### SOURCE_KAFKA_CONSUMER_CONFIG_SECURITY_PROTOCOL

Defines the security protocol used to communicate with the SSL-enabled Kafka cluster. In addition to the SASL values mentioned above, Dagger supports:

- `SSL`: to enable SSL/TLS for encryption and authentication
- Example value: `SSL`
- Type: `optional` (required only for SSL-enabled `KAFKA_CONSUMER` or `KAFKA_SOURCE`)
#### SOURCE_KAFKA_CONSUMER_CONFIG_SSL_PROTOCOL

Defines the SSL/TLS protocol version used to communicate with the SSL-enabled Kafka cluster. Find more details on this config here. Dagger-supported values are: `TLSv1.2`, `TLSv1.3`, `TLS`, `TLSv1.1`, `SSL`, `SSLv2`, and `SSLv3`.

- Example value 1: `SSL`
- Example value 2: `TLS`
- Type: `optional` (required only for SSL/TLS-enabled `KAFKA_CONSUMER` or `KAFKA_SOURCE`)
#### SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEY_PASSWORD

Defines the SSL key password for the Kafka source. Find more details on this config here.

- Example value: `myKeyPass`
- Type: `optional` (required only for SSL-enabled `KAFKA_CONSUMER` or `KAFKA_SOURCE`)
#### SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_LOCATION

Defines the SSL KeyStore location or path for the Kafka source. Find more details on this config here.

- Example value: `/tmp/myKeyStore.jks`
- Type: `optional` (required only for SSL-enabled `KAFKA_CONSUMER` or `KAFKA_SOURCE`)
#### SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_PASSWORD

Defines the SSL KeyStore password for the Kafka source. Find more details on this config here.

- Example value: `myKeyStorePass`
- Type: `optional` (required only for SSL-enabled `KAFKA_CONSUMER` or `KAFKA_SOURCE`)
#### SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_TYPE

Defines the SSL KeyStore type, such as JKS or PKCS12, for the Kafka source. Find more details on this config here. Dagger-supported values are: `JKS`, `PKCS12`, `PEM`.

- Example value: `JKS`
- Type: `optional` (required only for SSL-enabled `KAFKA_CONSUMER` or `KAFKA_SOURCE`)
#### SOURCE_KAFKA_CONSUMER_CONFIG_SSL_TRUSTSTORE_LOCATION

Defines the SSL TrustStore location or path for the Kafka source. Find more details on this config here.

- Example value: `/tmp/myTrustStore.jks`
- Type: `optional` (required only for SSL-enabled `KAFKA_CONSUMER` or `KAFKA_SOURCE`)
#### SOURCE_KAFKA_CONSUMER_CONFIG_SSL_TRUSTSTORE_PASSWORD

Defines the SSL TrustStore password for the Kafka source. Find more details on this config here.

- Example value: `myTrustStorePass`
- Type: `optional` (required only for SSL-enabled `KAFKA_CONSUMER` or `KAFKA_SOURCE`)
#### SOURCE_KAFKA_CONSUMER_CONFIG_SSL_TRUSTSTORE_TYPE

Defines the SSL TrustStore type, such as JKS or PKCS12, for the Kafka source. Find more details on this config here. Dagger-supported values are: `JKS`, `PKCS12`, `PEM`.

- Example value: `JKS`
- Type: `optional` (required only for SSL-enabled `KAFKA_CONSUMER` or `KAFKA_SOURCE`)
### Example

`STREAMS` configuration to consume data from an SSL/TLS-enabled Kafka cluster:
STREAMS = [ { "SOURCE_KAFKA_TOPIC_NAMES": "test-topic", "INPUT_SCHEMA_TABLE": "data_stream", "INPUT_SCHEMA_PROTO_CLASS": "com.tests.TestMessage", "INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX": "41", "SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS": "localhost:9092", "SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE": "false", "SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET": "latest", "SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID": "dummy-consumer-group", "SOURCE_KAFKA_CONSUMER_CONFIG_SECURITY_PROTOCOL": "SSL", "SOURCE_KAFKA_CONSUMER_CONFIG_SSL_PROTOCOL": "TLS", "SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_LOCATION": "my-keystore.jks", "SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_PASSWORD": "test-keystore-pass", "SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_TYPE": "JKS", "SOURCE_KAFKA_CONSUMER_CONFIG_SSL_TRUSTSTORE_LOCATION": "my-truststore.jks", "SOURCE_KAFKA_CONSUMER_CONFIG_SSL_TRUSTSTORE_PASSWORD": "test-truststore-pass", "SOURCE_KAFKA_CONSUMER_CONFIG_SSL_TRUSTSTORE_TYPE": "JKS", "SOURCE_KAFKA_NAME": "local-kafka-stream", "SOURCE_DETAILS": [ { "SOURCE_TYPE": "UNBOUNDED", "SOURCE_NAME": "KAFKA_CONSUMER" } ] }]
`STREAMS` configuration to consume data from multiple Kafka sources; in the example below, the first stream uses SSL while the second connects to a cluster without SSL:
STREAMS = [ { "SOURCE_KAFKA_TOPIC_NAMES": "test-topic", "INPUT_SCHEMA_TABLE": "data_stream", "INPUT_SCHEMA_PROTO_CLASS": "com.tests.TestMessage", "INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX": "41", "SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS": "localhost:9092", "SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE": "false", "SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET": "latest", "SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID": "dummy-consumer-group", "SOURCE_KAFKA_CONSUMER_CONFIG_SECURITY_PROTOCOL": "SSL", "SOURCE_KAFKA_CONSUMER_CONFIG_SSL_PROTOCOL": "TLS", "SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_LOCATION": "my-keystore.jks", "SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_PASSWORD": "test-keystore-pass", "SOURCE_KAFKA_CONSUMER_CONFIG_SSL_KEYSTORE_TYPE": "JKS", "SOURCE_KAFKA_CONSUMER_CONFIG_SSL_TRUSTSTORE_LOCATION": "my-truststore.jks", "SOURCE_KAFKA_CONSUMER_CONFIG_SSL_TRUSTSTORE_PASSWORD": "test-truststore-pass", "SOURCE_KAFKA_CONSUMER_CONFIG_SSL_TRUSTSTORE_TYPE": "JKS", "SOURCE_KAFKA_NAME": "local-kafka-stream", "SOURCE_DETAILS": [ { "SOURCE_TYPE": "UNBOUNDED", "SOURCE_NAME": "KAFKA_CONSUMER" } ] }, { "SOURCE_KAFKA_TOPIC_NAMES": "test-topic-2", "INPUT_SCHEMA_TABLE": "data_stream-2", "INPUT_SCHEMA_PROTO_CLASS": "com.tests.TestMessage", "INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX": "41", "SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS": "localhost:9091", "SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE": "false", "SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET": "latest", "SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID": "dummy-consumer-group", "SOURCE_KAFKA_NAME": "local-kafka-stream-2", "SOURCE_DETAILS": [ { "SOURCE_TYPE": "UNBOUNDED", "SOURCE_NAME": "KAFKA_CONSUMER" } ] }]