Kafka Connect Flatten (Confluent) SMT Usage Reference for Confluent Cloud

The following provides usage information for the Confluent Single Message Transforms (SMT) io.confluent.connect.transforms.Flatten.

Description

The Confluent Flatten SMT transforms a record’s key or value by converting complex nested structures (such as structs) into flat, top-level fields using a delimiter. This SMT also handles nested maps encountered within struct schemas. You can configure the behaviour.on.nested.map configuration to either stop processing records (FAIL) if such nested maps are found, or to include them as is (INCLUDE) without flattening their contents. The JSON SMT delimiter is the period . character, which is also the default.

You can use the concrete transformation type designed for the record key (io.confluent.connect.transforms.Flatten$Key) or value (io.confluent.connect.transforms.Flatten$Value).

When using an Avro schema, the default SMT delimiter period . cannot be used. Use an underscore _ for the delimiter. Use the following properties to change the delimiter:

"transforms": "flatten",
"transforms.flatten.type": "io.confluent.connect.transforms.Flatten$Value",
"transforms.flatten.delimiter": "_"

JSON example

This configuration snippet shows how to use Flatten to flatten a record’s value while including a nested map without flattening its internal structure. The default delimiter (.) is used.

"transforms": "flatten",
"transforms.flatten.type": "io.confluent.connect.transforms.Flatten$Value",
"transforms.flatten.behaviour.on.nested.map": "INCLUDE",
"transforms.flatten.delimiter": "."

Before:

{
  "user": {
      "id": "1234",
      "attributes": { "role": "admin", "dept": "engineering" },
      "profile": {
          "address": {
          "city": "Bangalore",
          "metadata": { "zone": "east", "block": "A" }
         },
      "age": 30
      }
     }
 }

After:

{
  "user.id": "1234",
  "user.attributes": { "role": "admin", "dept": "engineering" },
  "user.profile.address.city": "Bangalore",
  "user.profile.address.metadata": { "zone": "east", "block": "A" },
  "user.profile.age": 30
 }

If transforms.flatten.behaviour.on.nested.map is set to FAIL, the connector does not process records with nested maps and fails to launch.

Avro example

The Avro schema specification only allows alphanumeric characters and the underscore _ character in field names. The following configuration snippet shows how to use Flatten to concatenate field names with the underscore _ delimiter character.

"transforms": "flatten",
"transforms.flatten.type": "io.confluent.connect.transforms.Flatten$Value",
"transforms.flatten.behaviour.on.nested.map": "INCLUDE",
"transforms.flatten.delimiter": "_"

Before:

{
  "user": {
      "id": "1234",
      "attributes": { "role": "admin", "dept": "engineering" },
      "profile": {
          "address": {
          "city": "Bangalore",
          "metadata": { "zone": "east", "block": "A" }
         },
      "age": 30
      }
     }
 }

After:

{
  "user_id": "1234",
  "user_attributes": { "role": "admin", "dept": "engineering" },
  "user_profile_address_city": "Bangalore",
  "user_profile_address_metadata": { "zone": "east", "block": "A" },
  "user_profile_age": 30
 }

If transforms.flatten.behaviour.on.nested.map is set to FAIL, the connector does not process records with nested maps and fails to launch.

Tip

For additional examples, see Flatten for managed connectors.

Properties

Name

Description

Type

Default

Valid Values

Importance

delimiter

Delimiter to insert between field names from the input record when generating field names for the output record.

string

.

medium

behaviour.on.nested.map

Behaviour when a nested map is found within a struct schema. FAIL: Fail the transformation if a nested map is encountered in a struct schema. INCLUDE: Include the nested map as is without flattening the nested map. The key for the map must be a string.

string

FAIL

FAIL, INCLUDE

medium

Predicates

Transformations can be configured with predicates so that the transformation is applied only to records which satisfy a condition. You can use predicates in a transformation chain and, when combined with the Kafka Connect Filter (Kafka) SMT Usage Reference for Confluent Cloud, predicates can conditionally filter out specific records. For details and examples, see Predicates.