• Apache Flink
- This article includes information that was originally written by Bret McGuire on GitHub
Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale. This tutorial will show you step-by-step how to use Astra as a sink for results computed by Flink. These instructions are intended to demonstrate how to enable such support when using a Flink DataStream.
This code is intended as a fairly simple demonstration of how to enable an Apache Flink job to interact with DataStax Astra. There is certainly room for optimization here. A simple example: Flink's CassandraSink will open a new Session on each open() call even though these Session objects are thread-safe. A more robust implementation would be more aggressive about memoizing Sessions, encouraging a minimal number of open sessions for multiple operations on the same JVM. This work may be undertaken in the future, but for the moment it is beyond the scope of what we're aiming for here.
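The memoization idea mentioned above can be sketched in plain Java. This is a minimal illustration, not the sample repository's code and not part of Flink's API: `SharedResourceCache` is a hypothetical name, and the cached value stands in for a thread-safe Session. The key is assumed to identify a connection configuration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical helper: keeps one shared, thread-safe resource per key
// for the lifetime of the JVM instead of opening a new one on every call.
class SharedResourceCache<K, V> {
    private final Map<K, V> cache = new ConcurrentHashMap<>();
    private final Function<K, V> factory;

    SharedResourceCache(Function<K, V> factory) {
        this.factory = factory;
    }

    // Returns the cached resource for the key, creating it on first use.
    // ConcurrentHashMap.computeIfAbsent applies the factory at most once per key.
    V get(K key) {
        return cache.computeIfAbsent(key, factory);
    }

    // Number of distinct resources currently held.
    int size() {
        return cache.size();
    }

    public static void main(String[] args) {
        AtomicInteger opens = new AtomicInteger();
        // The factory here only counts invocations; a real one would open a Session.
        SharedResourceCache<String, Object> sessions =
            new SharedResourceCache<>(key -> {
                opens.incrementAndGet();
                return new Object();
            });

        Object first = sessions.get("example");
        Object second = sessions.get("example");
        System.out.println("same instance: " + (first == second));
        System.out.println("factory calls: " + opens.get());
    }
}
```

The sketch deliberately leaves out closing and reference counting, which a production version would need so that shared sessions are released when the last user finishes.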
- You should have an Astra account
- You should create an Astra database
- You should have an Astra token
- You should clone this GitHub repository
- You should have Apache Flink, Gradle, and Java installed on your system.
This tutorial requires either Java 8 or Java 11. Other versions may throw an exception and cause the build to fail.
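One way to confirm which Java version is active before building is a small standalone check. The sketch below is not part of the sample repository; `JavaVersionCheck` is a hypothetical name. It relies on the standard `java.specification.version` system property, which reports `1.8` on Java 8 and `11` on Java 11.

```java
// Hypothetical helper: reports whether the running JVM is Java 8 or Java 11.
class JavaVersionCheck {

    // Extracts the major version from a specification version string,
    // handling the legacy "1.x" scheme used up to Java 8.
    static int majorVersion(String specVersion) {
        String v = specVersion.startsWith("1.") ? specVersion.substring(2) : specVersion;
        int dot = v.indexOf('.');
        return Integer.parseInt(dot < 0 ? v : v.substring(0, dot));
    }

    // True only for the two versions this tutorial names.
    static boolean isSupported(String specVersion) {
        int major = majorVersion(specVersion);
        return major == 8 || major == 11;
    }

    public static void main(String[] args) {
        String spec = System.getProperty("java.specification.version");
        String verdict = isSupported(spec) ? "is" : "is not";
        System.out.println("Java " + spec + " " + verdict + " one of the versions this tutorial expects (8 or 11)");
    }
}
```

Running `java -version` in a terminal gives the same information; the class is only useful if you want the check inside a build or startup script.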
Installation and Setup
Now that you have gathered all of your prerequisites, you are ready to configure and set up this example.
- Create a keyspace named example in your Astra database. At the moment, this name is hard-coded.
- Download the secure connect bundle (SCB) for your database. You can find this under the "Connect" tab in the UI.
- Once you have downloaded your secure connect bundle, place it in app/src/main/resources in your GitHub directory (you do not have to unzip the file).
- Create a properties file titled app.properties, and place it in
- Add properties specifying your Astra client ID, Astra secret, and SCB file name. These should map to the "astra.clientid", "astra.secret", and "astra.scb" properties respectively. Your app.properties file should look something like this:
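As a minimal sketch of the file's shape, the snippet below embeds placeholder content for the three property names listed above and checks that each one is present. The values in angle brackets are placeholders, and `AstraProps` is a hypothetical helper that is not part of the sample repository; how the sample app itself reads the file may differ.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper: parses app.properties content and validates the required keys.
class AstraProps {
    // Property names taken from the tutorial text.
    static final String[] REQUIRED = {"astra.clientid", "astra.secret", "astra.scb"};

    // Placeholder file content; replace every value with your own.
    static final String SAMPLE =
        "astra.clientid=<your-client-id>\n"
        + "astra.secret=<your-client-secret>\n"
        + "astra.scb=<your-scb-file-name>.zip\n";

    // Loads the properties and fails fast if a required key is missing or empty.
    static Properties load(Reader in) {
        Properties props = new Properties();
        try {
            props.load(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        for (String key : REQUIRED) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                throw new IllegalArgumentException("Missing required property: " + key);
            }
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = load(new StringReader(SAMPLE));
        System.out.println("SCB file: " + props.getProperty("astra.scb"));
    }
}
```

To check a real file, pass a `java.io.FileReader` for your own app.properties to `load` instead of the embedded sample.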
Test and Validate
Once you have completed all of the prerequisites along with the section above, you can move on to this section to run the sample app and validate the connection between Flink and Astra.
- In your cloned flink-astra GitHub directory, run
- Verify that the application runs and exits normally. If this completed successfully you should see the following message:
- Navigate back to the Astra UI to use the CQL Console. You can run this sample query to confirm that the defined data from the sample app has been loaded properly:
token@cqlsh:example> select * from wordcount ;

 word   | count
--------+-------
   dogs |     1
 lazier |     1
  least |     1
  foxes |     1
 jumped |     1
     at |     1
    are |     1
   just |     1
  quick |     1
   than |     1
    fox |     1
    our |     1
    dog |     2
     or |     1
   over |     1
  brown |     1
   lazy |     1
    the |     2

(18 rows)
token@cqlsh:example>