Clients can connect to Apache Kafka for BigQuery clusters by using the open source Apache Kafka API. Applications must communicate with Apache Kafka for BigQuery over TLS by using the credentials of an authorized Google Cloud service account. Apache Kafka for BigQuery clusters support only TLS connections; plaintext communication is not supported. Apache Kafka for BigQuery manages certificate generation and maintenance.
Apache Kafka for BigQuery supports SASL authentication with the OAUTHBEARER and PLAIN mechanisms to convey Google Cloud principal identifiers. The Google Cloud principal is used within Apache Kafka for BigQuery as the client principal. An example of a Google Cloud principal is a service account email.
Grant the Managed Kafka client role to the service account
You must grant the roles/managedkafka.client role on the project that contains the cluster to the service account that you use to connect to the cluster. The Managed Kafka client role includes the managedkafka.clusters.connect permission, which is required for all connections.
To grant the Managed Kafka client role to the service account, follow these steps:
Console
- In the Google Cloud console, go to the IAM page.
- Check that the project is set to the consumer project that the Apache Kafka for BigQuery client accesses.
- Click Grant access.
- On the new page, for Add principals, enter the email address of the service account that you are using.
- For Assign roles, select the Managed Kafka client role.
- Click Save.
gcloud CLI
- In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
- Run the gcloud projects add-iam-policy-binding command:

```
gcloud projects add-iam-policy-binding PROJECT_ID \
  --member serviceAccount:SERVICE_ACCOUNT_EMAIL \
  --role roles/managedkafka.client
```
Replace the following:

- PROJECT_ID: the project ID.
- SERVICE_ACCOUNT_EMAIL: the email address of the service account.
Configure the Kafka client for authenticating to Google Cloud
You can authenticate Kafka clients to Google Cloud by using one of the following mechanisms:
OAUTHBEARER (Recommended): This mechanism requires using Application Default Credentials (ADC). ADC is a strategy used by the authentication libraries to automatically find credentials based on the application environment. For more information about where ADC looks for credentials and in what order, see How Application Default Credentials works.
SASL_PLAIN: This mechanism requires a username and password that are derived from a service account key.
In general, OAUTHBEARER is the recommended option. However, SASL_PLAIN might be a more convenient mechanism for testing.
OAuthBearer authentication
This section outlines the OAuthBearer authentication methods for Kafka clients connecting to Google Cloud by using ADC. This approach leverages the automated and secure distribution of credentials already present in Compute Engine virtual machines and local development environments.
For OAuthBearer authentication, choose one of the following methods:
- Use a custom login handler for Java clients: This method uses a custom Java library in the kafka-java-auth package to handle authentication. It is suitable for environments where you can add new Java libraries to your client's Java classpath.
- Run a local authentication server: This method is suitable for all clients.
Kafka Java auth client handler
The managedkafka repository contains an implementation of Kafka's AuthenticateCallbackHandler that is suited to authenticating your Kafka clients to Google Cloud.
To complete your client setup, perform the following steps:
Check out the following GitHub repository:

```
git clone https://github.com/googleapis/managedkafka.git
```
Navigate to the kafka-java-auth directory and run the mvn command:

```
cd kafka-java-auth
mvn install
```
This command generates all the relevant artifacts and places them in your local Maven repository.
Include the following dependency in your build:

```
<dependency>
  <groupId>com.google.cloud</groupId>
  <artifactId>google-managed-kafka-auth-login-handler</artifactId>
  <version>1.0.0</version>
</dependency>
```
Modify your Kafka client configuration file to include the following properties:

```
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
```
This configuration instructs the client to use the Google Auth Java library for authentication. In Google Cloud environments, the library automatically uses the environment default credentials. To override this behavior and specify different credentials, set the GOOGLE_APPLICATION_CREDENTIALS environment variable as described in the Getting Application Default Credentials section.
Local authentication server
This method uses a local Python server for authentication, offering broader client compatibility. The kafka-auth-local-server directory contains a Python script that runs a local auth server, which lets Kafka clients authenticate to Google Cloud by using the environment default credentials.
Complete the following steps:
Check out the following GitHub repository:

```
git clone https://github.com/googleapis/managedkafka.git
```
Create a virtual Python environment and install the server dependencies:

```
pip install virtualenv
virtualenv <your-env>
source <your-env>/bin/activate
<your-env>/bin/pip install packaging
<your-env>/bin/pip install urllib3
<your-env>/bin/pip install google-auth
```

These commands create a virtual Python environment named <your-env>, activate it, and then install the necessary dependencies for the authentication server.
Run the local authentication server:

```
kafka-auth-local-server/kafka_gcp_credentials_server.py
```
This command starts the local authentication server, which listens for incoming connections from Kafka clients. The server prints a message indicating its listening address and port. An example of this message is:

```
Serving on localhost:14293. This is not accessible outside of the current machine.
```
Configure your Kafka client with the following properties:

```
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.oauthbearer.token.endpoint.url=http://localhost:14293
sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule \
  required clientId="admin" clientSecret="unused";
```
Ensure that the sasl.oauthbearer.token.endpoint.url parameter points to the address and port of your running local authentication server.
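The token endpoint flow can be sketched with a minimal stand-in server. The real kafka_gcp_credentials_server.py script mints an access token for the environment's Application Default Credentials through the google-auth library; the sketch below serves a placeholder token instead so that it stays standard-library only, and mirrors only the response shape (a JSON body with an access_token field) that Kafka's OAuthBearer login callback expects from a token endpoint. Everything beyond that shape is an assumption for illustration.

```python
# Minimal stand-in for a local OAuth token endpoint. A real server would
# return a token obtained from Application Default Credentials; this sketch
# returns a placeholder so it has no external dependencies.
import json
from http.server import BaseHTTPRequestHandler, HTTPServer

PLACEHOLDER_TOKEN = "ya29.placeholder-access-token"  # a real server returns an ADC token

class TokenHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        # Drain the request body (the Kafka client sends a client_credentials
        # grant); this sketch ignores it because the token is server-side.
        length = int(self.headers.get("Content-Length", 0))
        self.rfile.read(length)
        body = json.dumps({
            "access_token": PLACEHOLDER_TOKEN,
            "token_type": "Bearer",
            "expires_in": 3600,
        }).encode("utf-8")
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, fmt, *args):
        pass  # keep the sketch quiet

def serve(port=14293):
    # Same default port as the documented server; call serve() to run (blocks).
    server = HTTPServer(("localhost", port), TokenHandler)
    print(f"Serving on localhost:{port}. "
          "This is not accessible outside of the current machine.")
    server.serve_forever()
```

A client pointed at this endpoint via sasl.oauthbearer.token.endpoint.url would receive the placeholder token; only a server backed by real Google Cloud credentials produces tokens the cluster accepts.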
SASL_PLAIN authentication
This method is applicable to all Kafka clients.
Service account keys are a security risk if not managed correctly. You are responsible for the security of the private key and for other operations described in Best practices for managing service account keys. If you are prevented from creating a service account key, service account key creation might be disabled for your organization. For more information, see Managing secure-by-default organization resources.

Download a service account key JSON file for the service account that you intend to use for your client.
Base64-encode the service account key file to use as your authentication string. Assume that the filename is my_service_account.json. On Linux or macOS systems, use the base64 command (often installed by default) as follows:

```
base64 < my_service_account.json > password.txt
```
This command performs the following actions:

- base64 < my_service_account.json: Reads the contents of the file my_service_account.json and encodes them using base64 encoding. Base64 encoding is a way to represent binary data (such as the JSON data in your service account file) as ASCII text. This is often used for transmitting data over channels that are designed for text.
- > password.txt: Redirects the output of the base64 command (the base64-encoded version of your service account file) into a new file named password.txt.
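Where the base64 command isn't available (for example, on Windows), the same encoding can be produced with Python's standard library. This is a sketch; the encode_key_file helper and the stand-in key contents are illustrative, not part of the product, and a real key file contains more fields.

```python
# Standard-library equivalent of: base64 < my_service_account.json > password.txt
import base64
import json
import tempfile
from pathlib import Path

def encode_key_file(key_path):
    """Return the base64 text of a key file, usable as the SASL password."""
    return base64.b64encode(Path(key_path).read_bytes()).decode("ascii")

# Write a stand-in key file so the example is self-contained.
with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as f:
    json.dump({"type": "service_account",
               "client_email": "sa@example.iam.gserviceaccount.com"}, f)
    key_path = f.name

password = encode_key_file(key_path)
# Note: GNU base64 wraps its output at 76 columns by default, while
# b64encode returns a single unwrapped line; both decode to the same bytes.
assert base64.b64decode(password) == Path(key_path).read_bytes()
```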
You can use the contents of the password file for authentication with the following parameters:

```
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="service account email" \
  password="contents of base64 encoded password file";
```
When authenticating incoming connections to the cluster, Apache Kafka for BigQuery checks the following:
- The provided service account principal has the managedkafka.clusters.connect permission (included in roles/managedkafka.client) on the cluster.
- The provided username matches the service account whose key is used in the password.
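The second check can be illustrated with a short sketch of the username/password consistency rule: the password is a base64-encoded service account key JSON, and the username must equal that key's client_email. This models the documented behavior only; it is not the service's actual implementation, and the helper name and sample values are made up.

```python
# Model of the SASL_PLAIN consistency rule: username must match the
# client_email of the key encoded in the password.
import base64
import json

def credentials_consistent(username, password_b64):
    try:
        key = json.loads(base64.b64decode(password_b64))
    except ValueError:
        return False  # malformed password: not valid base64-encoded JSON
    return isinstance(key, dict) and key.get("client_email") == username

sa_email = "kafka-client@example-project.iam.gserviceaccount.com"  # sample value
key_json = json.dumps({"type": "service_account", "client_email": sa_email})
password = base64.b64encode(key_json.encode("utf-8")).decode("ascii")

assert credentials_consistent(sa_email, password)                     # matching pair
assert not credentials_consistent("other@example.com", password)      # wrong username
assert not credentials_consistent(sa_email, "not-a-valid-password!")  # malformed password
```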
Troubleshoot authentication errors
If an Apache Kafka for BigQuery client cannot authenticate to Apache Kafka for BigQuery, you see an error message similar to the following:
```
Exception in thread "main" java.util.concurrent.ExecutionException:
org.apache.kafka.common.errors.SaslAuthenticationException:
Authentication failed: Invalid username or password
```
To resolve the issue, check for the following causes:
- The password is malformed and doesn't represent a valid service account key JSON blob when base64-decoded.
- The principal in the service account key doesn't have IAM permissions on the cluster.
- The provided username doesn't match the credential's principal.