SET Statement in Confluent Cloud for Apache Flink
In Confluent Cloud for Apache Flink®, the SET statement assigns values to Flink SQL shell properties.
Syntax
SET 'key' = 'value';
Description
Modify or list the Flink SQL shell configuration.
If no key and value are specified, SET prints all of the properties that you have assigned for the session.
To reset a session property to its default value, use the RESET Statement in Confluent Cloud for Apache Flink.
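As a minimal sketch of the set-then-reset flow described above, the following statements override the session time zone and then restore its default. It assumes that RESET accepts a single quoted key in the same form as SET; consult the RESET statement reference for the exact syntax.

```sql
-- Override the session time zone for subsequent statements.
SET 'sql.local-time-zone' = 'America/Los_Angeles';

-- Assumed form: restore the property to its default value.
RESET 'sql.local-time-zone';
```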
Note
In a Cloud Console workspace, the SET statement can’t be run separately and must be submitted along with another Flink SQL statement, like SELECT, CREATE, or INSERT, for example:
SET 'sql.current-catalog' = 'default';
SET 'sql.current-database' = 'cluster_0';
SELECT * FROM pageviews;
Example
The following examples show how to run a SET statement in the Flink SQL shell.
SET 'sql.local-time-zone' = 'America/Los_Angeles';
Your output should resemble:
Statement successfully submitted.
Statement phase is COMPLETED.
configuration updated successfully.
To list the current session settings, run the SET command with no parameters.
SET;
Your output should resemble:
Statement successfully submitted.
Statement phase is COMPLETED.
+----------------------+--------------------------+
| Key                  | Value                    |
+----------------------+--------------------------+
| sql.current-catalog  | default (default)        |
| sql.current-database | <your_cluster> (default) |
| sql.local-time-zone  | America/Los_Angeles      |
+----------------------+--------------------------+
The SET; operation is not supported in Cloud Console workspaces.
Available SET options
The following configuration options are available for the SET statement in Confluent Cloud for Apache Flink.
Session-level SET options apply to all tables in newly created queries. Many of these options correspond to table-level WITH options that you can specify per-table by using CREATE TABLE … WITH or ALTER TABLE … SET. When both are specified, the table-level WITH option takes precedence.
For a comparison of option names with corresponding options in Apache Flink, see Configuration options.
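The precedence rule between session-level and table-level options can be illustrated with a short sketch. The table name `orders` is hypothetical, and the values 'latest-offset' and 'earliest-offset' are assumed to be valid startup modes; check the table option reference before relying on them.

```sql
-- Table-level option: applies only to the hypothetical `orders` table.
ALTER TABLE orders SET ('scan.startup.mode' = 'latest-offset');

-- Session-level option: applies to tables used in newly created queries.
SET 'sql.tables.scan.startup.mode' = 'earliest-offset';

-- Because `orders` carries its own table-level value, that value takes
-- precedence over the session-level setting for this query.
SELECT * FROM orders;
```

Tables in the same query that don't set `scan.startup.mode` themselves would pick up the session-level value instead.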
Table options
Key | Default | Type | Description |
|---|---|---|---|
sql.current-catalog | (None) | String | Defines the current catalog. Semantically equivalent to USE CATALOG [catalog_name]. Required if object identifiers are not fully qualified. |
sql.current-database | (None) | String | Defines the current database. Semantically equivalent to USE [database_id]. Required if object identifiers are not fully qualified. |
sql.dry-run | | Boolean | If |
sql.inline-result | | Boolean | If |
sql.local-time-zone | “UTC” | String | Specifies the local time zone for TIMESTAMP_LTZ conversions. This time zone is used when converting to data types that don’t include a time zone, for example, TIMESTAMP, TIME, or simply STRING. The value is either a Time Zone Database (TZDB) ID, like “America/Los_Angeles”, or a fixed offset, like “GMT+03:00”. |
sql.snapshot.mode | “off” | String | Specifies the mode for snapshot queries. Valid values are “now” and “off”. If not specified, the default value is “off”. For more information, see Snapshot Queries in Confluent Cloud for Apache Flink. |
sql.snapshot.write-mode | “default” | String | Specifies the write mode for snapshot (batch) queries. Valid values are “default” and “fast-write”. The “fast-write” mode disables exactly-once delivery for improved performance. This option is valid only in batch mode. |
sql.state-ttl | 0 ms | Duration | Specifies a minimum time interval for how long idle state, which is state that hasn’t been updated, is retained. The system decides on actual clearance after this interval. If set to the default value of |
sql.tables.initial-offset-from | (None) | String | Specifies the name of a reference statement from which to carry over topic offsets when creating a new statement. Applies only when replacing an existing statement in the same organization, environment, and region. For details, see Carry Over Offsets. |
sql.tables.scan.bounded.timestamp-millis | (None) | Long | Overwrites scan.bounded.timestamp-millis for Confluent-native tables used in newly created queries. This option is not applied if the table uses a value that differs from the default value. |
sql.tables.scan.bounded.mode | (None) | | Overwrites scan.bounded.mode for Confluent-native tables used in newly created queries. This option is not applied if the table uses a value that differs from the default value. |
sql.tables.scan.idle-timeout | 0 ms | Duration | Specifies the timeout interval for progressive idleness detection. Setting this value to |
sql.tables.scan.watermark-alignment.max-allowed-drift | 5 min | Duration | Specifies the maximum allowed drift for watermark alignment across different splits or partitions to ensure even processing. Setting to |
sql.tables.scan.startup.timestamp-millis | (None) | Long | Overwrites scan.startup.timestamp-millis for Confluent-native tables used in newly created queries. This option is not applied if the table uses a value that differs from the default value. |
sql.tables.scan.source-operator-parallelism | (None) | Integer | Determines the parallelism of the source operator for Confluent-native tables used in newly created queries. This option is not applied if the table uses a value that differs from the default value. |
sql.tables.scan.startup.mode | (None) | | Overwrites scan.startup.mode for Confluent-native tables used in newly created queries. This option is not applied if the table uses a value that differs from the default value. |
sql.tables.scan.startup.specific-offsets | (None) | String | Overwrites scan.startup.specific-offsets for Confluent-native tables used in newly created queries. This option is not applied if the table uses a value that differs from the default value. |
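As a sketch of how several of the table options above might be combined in one session, the following statements set a state retention interval and a timestamp-based starting position before running a query. The duration format follows the defaults shown in the table (for example, "5 min"). The startup mode value 'timestamp' and the epoch value are assumptions for illustration, and `pageviews` is the example table used earlier on this page.

```sql
-- Retain idle state for at least 30 minutes.
SET 'sql.state-ttl' = '30 min';

-- Assumed mode value: start reading from a point in time.
SET 'sql.tables.scan.startup.mode' = 'timestamp';

-- Illustrative epoch milliseconds (2024-01-01T00:00:00Z).
SET 'sql.tables.scan.startup.timestamp-millis' = '1704067200000';

-- The session-level options apply to tables in this newly created query,
-- unless a table already uses a non-default table-level value.
SELECT * FROM pageviews;
```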
Flink SQL shell options
The following SET options are available only in the Flink SQL shell.
In a Cloud Console workspace, the only client option you can set is client.statement-name.
Key | Default | Type | Description |
|---|---|---|---|
client.output-format | standard | String | Output format. Valid values are “standard” or “plain-text”. |
client.results-timeout | 600000 | Long | Maximum time, in milliseconds, to wait for results to be ready before the request times out. |
client.service-account | (None) | String | Service account to use instead of running statements attached to your user account. For more information, see Production workloads (service accounts). |
client.statement-name | (None) | String | Give your Flink statement a meaningful name that can help you identify it more easily. Instead of an autogenerated name, like |
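The client options above might be used together in a Flink SQL shell session as follows. This is a sketch only: the statement name `pageviews-preview` is a hypothetical value, and the timeout is an arbitrary example.

```sql
-- Give the next statement a recognizable name (hypothetical value).
SET 'client.statement-name' = 'pageviews-preview';

-- Wait up to two minutes for results instead of the default 600000 ms.
SET 'client.results-timeout' = '120000';

-- Switch from the "standard" output format to "plain-text".
SET 'client.output-format' = 'plain-text';

SELECT * FROM pageviews;
```

In a Cloud Console workspace, only the `client.statement-name` line applies, and it must be submitted together with the accompanying statement.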