How K.F.C can eat your data one day…

Siddhesh K
8 min read · Jan 14, 2022


Data Quality Framework using Kafka, Flink, and Cassandra.

High-Level Overview:

Here is the high-level architecture of the framework developed for validating incoming JSON records.

The whole idea is to compare the expected JSON schema with the schema of each incoming record. If they match, the record is inserted into a Cassandra table (or can be passed as a JSON object to downstream classes/applications for further processing). If they don’t match, the record is inserted into another Cassandra table (used for this use case). So, it is a kind of auditing that is being done here.

This way, the people on the source side can refer to that table, see exactly what went wrong with the records the framework rejected, fix them, and resend them.

P.S.: This framework is something that sits at the start of the pipeline, and this is a sample design architecture. In a real scenario, Cassandra might be replaced by Kafka for moving records, or Cassandra CDC can be implemented to read the stream of changes from Cassandra while still storing data in it.

Tools & Versions Used:

  1. Scala 2.11.12
  2. IntelliJ
  3. Flink 1.13.5
  4. Maven Build
  5. Flink-Kafka_2.11 Connector
  6. Flink-Cassandra_2.11 Connector
  7. Flink Single Node Cluster

What will this framework do?

  1. Check whether attributes in the JSON are as expected.
  2. Check whether the data types of those attributes are according to the expectations.

If either of the above conditions is not met, the record is considered a bad record.

Let's look at the components:

  1. Flink-Kafka Producer
  2. Flink-Kafka Consumer
  3. Low-level conversion operators, i.e., Process Functions
  4. Sink Interfaces

Flink-Kafka Producer:

This is the EXACTLY_ONCE Flink-Kafka producer developed for the framework. The whole idea of using this producer is to generate a few JSON records, since we are testing the framework on a local single-node setup and don’t have a real-time source that would generate this dataset for us.

Since we are generating the records in a loop, our module won’t produce any bad records for now, but this can be tested by passing incorrect/invalid records manually over the terminal.

On the producer side, Flink uses a two-phase commit to achieve EXACTLY_ONCE. It relies on Kafka transactions, so data becomes visible only once the transaction is committed. This is enabled by choosing the EXACTLY_ONCE semantic.
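To make that concrete, here is a minimal sketch of what such a producer can look like. It is only an illustration, not the exact code of the framework: the topic name (incoming-json), the sample records, the class names, and the checkpoint interval are all assumptions.

import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer, KafkaSerializationSchema}
import org.apache.kafka.clients.producer.ProducerRecord

// turns each JSON string into a Kafka record for the given topic
class JsonKafkaSerializationSchema(topic: String) extends KafkaSerializationSchema[String] {
  override def serialize(element: String, timestamp: java.lang.Long): ProducerRecord[Array[Byte], Array[Byte]] =
    new ProducerRecord[Array[Byte], Array[Byte]](topic, element.getBytes("UTF-8"))
}

object GenerateRecordsSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(10000) // transactions are committed when a checkpoint completes

    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092")
    // keep this below the broker's transaction.max.timeout.ms
    props.setProperty("transaction.timeout.ms", "600000")

    // a few JSON records generated in code, since there is no real-time source yet
    val records: DataStream[String] = env.fromElements(
      """{"id": 1, "name": "abc"}""",
      """{"id": 2, "name": "xyz"}"""
    )

    val producer = new FlinkKafkaProducer[String](
      "incoming-json",                                    // hypothetical topic name
      new JsonKafkaSerializationSchema("incoming-json"),
      props,
      FlinkKafkaProducer.Semantic.EXACTLY_ONCE            // two-phase commit backed by Kafka transactions
    )

    records.addSink(producer).uid("json-producer-sink")
    env.execute("GenerateRecords")
  }
}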

Relevant comments have been added in the code to explain why each piece exists. However, I would like to point out a few things, starting with the one below:

.uid("some-identifier")

This assigns a unique identifier to the operator. It is used when starting your application from a savepoint. From the Flink Book,

If you do not assign unique IDs to your operators with the uid() method, Flink assigns default identifiers, which are hash values that depend on the type of the operator and all previous operators. Since it is not possible to change the identifiers in a savepoint, you will have fewer options to update and evolve your application if you do not manually assign operator identifiers using uid().

Savepoints are important in scenarios such as:

  1. Upgrading the Flink version (applicable when the existing classes in use will be deprecated shortly and lose support. For example, FlinkKafkaProducer and FlinkKafkaConsumer are deprecated since 1.14.2, and KafkaSource and KafkaSink should be used instead.)
  2. Migrating from one cluster to another.

However, you won’t be able to resume from a Savepoint state if you tamper with any of the below:

  1. Add a state.
  2. Remove a state.
  3. Change the data type of an existing state.

You can set a default directory for savepoints in the code like below (the path here is just an example):

env.setDefaultSavepointDirectory("file:///tmp/flink-savepoints")

If a FlinkKafkaProducer is stopped with a savepoint, all open transactions are finalized. The newer classes have their own way of tracking state.

Another important functionality of Flink is Checkpointing.

It is used for failure recovery. On the producer side, if the producer task fails, it restarts from the latest checkpoint and re-emits from the offset recorded in that checkpoint. For example, suppose the latest checkpoint records offset A, Flink then continues to emit B and C, and a failover happens; Flink would continue emitting records from B. Note that this does not cause duplication, since the state of all operators also falls back to the state after record A was processed.
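Enabling checkpointing is a one-liner on the environment. A minimal configuration could look like this, assuming env is the StreamExecutionEnvironment from above; the 10-second interval and the pause value are just illustrative:

import org.apache.flink.streaming.api.CheckpointingMode

// checkpoint every 10 seconds with exactly-once guarantees
env.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE)

// at most one checkpoint in flight, with at least 5 seconds between two checkpoints
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(5000)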

This is how EXACTLY_ONCE works…

Flink-Kafka Consumer:

It is also an EXACTLY_ONCE consumer. It has the same savepoint and checkpointing features as the Flink-Kafka producer. Here, EXACTLY_ONCE is achieved by reading only the messages committed by the producer, using the property below:

properties.setProperty("isolation.level", "read_committed")

It collects the elements of a 10-second window and passes them to the process function for further validation. While consuming data, use the method below:

.rebalance()

It redistributes the data across the downstream parallel partitions in a round-robin fashion, which helps avoid data skew.
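A sketch of the consumer side could look like the following. The topic name and group id are assumptions that match the producer sketch above, and the windowing step is only hinted at in a comment:

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(10000)

val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092")
props.setProperty("group.id", "dq-framework")            // hypothetical consumer group
props.setProperty("isolation.level", "read_committed")   // read only committed messages

val consumer = new FlinkKafkaConsumer[String]("incoming-json", new SimpleStringSchema(), props)

val records: DataStream[String] = env
  .addSource(consumer)
  .uid("json-consumer-source")
  .rebalance // round-robin redistribution across parallel subtasks to avoid skew

// records are then collected into 10-second windows and handed to the validation process function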

A restart strategy called failure rate has been implemented for both the producer and the consumer:

env.setRestartStrategy(RestartStrategies.failureRateRestart(
  3,                             // max failures per measuring interval
  Time.of(5, TimeUnit.MINUTES),  // time interval for measuring the failure rate
  Time.of(10, TimeUnit.SECONDS)  // delay between restart attempts
))

For example, suppose you already have a pipeline in place and at some point the source DB connection fails because somebody stopped the DB server by mistake. With the failure rate restart strategy, you get a buffer: if the DB comes back up within that window, your pipeline is not affected; if it is still down and the failures exceed the configured rate within the time interval, only then does the job fail, and you have to restart it once the DB is up and running again.

If I use the other strategy, Fixed Delay Restart, which is the default strategy used by Flink, it will try to restart for the configured number of attempts with the configured delay between them. If the DB is still not up by then, the job will eventually fail.
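For comparison, the fixed delay strategy is configured like this (the attempt count and delay below are placeholders):

env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
  3,                             // number of restart attempts
  Time.of(10, TimeUnit.SECONDS)  // delay between attempts
))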

WE SHOULD NEVER USE A FIXED DELAY RESTART STRATEGY!!!

The reason is that you should give the pipeline time to recover from a failure by allowing some extra buffer time.

This module also includes the Process Function and Flink's way of error handling, i.e., Side Outputs, both of which are described below.

Process Functions:

From the Flink Book,

The DataStream API provides a family of low-level transformations, the process functions, which can also access record timestamps and watermarks and register timers that trigger at a specific time in the future. Moreover, process functions feature side outputs to emit records to multiple output streams. Process functions are commonly used to build event-driven applications and to implement custom logic for which predefined windows and transformations might not be suitable. For example, most operators for Flink's SQL support are implemented using process functions.

The function is similar to the flatMap function when it comes to behavior.

It gives you the most flexibility when dealing with time in streaming applications.

Access to keyed state and timers (which are also scoped to a key) is only available if the ProcessFunction is applied on a KeyedStream.

Process functions can also handle late data. For example, a process function can easily filter out late events by comparing their timestamps with the current watermark.

In our use case, we use a process function that contains the logic for validating the incoming JSON record: we define the expected schema and compare it with the record's schema. We raise a Failure case if the parsed record does not match the expected schema. This is handled using Side Outputs, which are one of the features of process functions and Flink's way of error handling.

Side outputs are used to collect data that can be passed to another data stream. In our case, we pass both good and bad records to side streams, create data streams out of them, and apply sink functions.

Usually, side outputs are used for corrupted data and to keep jobs out of a fail-restart-fail cycle.
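To make the idea concrete, here is a sketch of how such a validation process function with a side output could look. It is only an illustration, not the framework's actual code: the expected schema, the class names, the choice to send good records to the main output and bad records to a side output, and the use of Jackson for JSON parsing are all assumptions, and it assumes the consumed stream from the consumer sketch is called records:

import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// side-output tag for records that fail validation: (raw record, failure description)
object ValidationTags {
  val badRecords: OutputTag[(String, String)] = OutputTag[(String, String)]("bad-records")
}

class JsonValidationFunction extends ProcessFunction[String, String] {

  // expected schema: attribute name -> expected JSON type (illustrative values)
  private val expectedSchema: Map[String, String] = Map("id" -> "number", "name" -> "string")

  @transient private lazy val mapper = new ObjectMapper()

  override def processElement(raw: String,
                              ctx: ProcessFunction[String, String]#Context,
                              out: Collector[String]): Unit = {
    try {
      val node: JsonNode = mapper.readTree(raw)
      val errors = expectedSchema.flatMap { case (field, expectedType) =>
        if (!node.has(field)) Some(s"missing attribute '$field'")
        else expectedType match {
          case "number" if !node.get(field).isNumber  => Some(s"'$field' is not a number")
          case "string" if !node.get(field).isTextual => Some(s"'$field' is not a string")
          case _                                      => None
        }
      }
      if (errors.isEmpty) out.collect(raw)                                     // good record -> main output
      else ctx.output(ValidationTags.badRecords, (raw, errors.mkString("; "))) // bad record -> side output
    } catch {
      case e: Exception =>
        ctx.output(ValidationTags.badRecords, (raw, s"unparsable JSON: ${e.getMessage}"))
    }
  }
}

Reading the side output back into its own stream then looks like this, which is the kind of badRecords stream that the CassandraSink snippet further down consumes:

val goodRecords: DataStream[String] = records.process(new JsonValidationFunction).uid("json-validator")
val badRecords: DataStream[(String, String)] = goodRecords.getSideOutput(ValidationTags.badRecords)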

Sink Functions:

If I define a plain class with a few methods and call those methods to insert records, it will create a connection per record, which is not good practice. For this purpose, Flink provides interfaces like the ones below:

  1. RichSinkFunction
  2. RichAsyncFunction

They let you set up the connection (or a connection pool) once in open() and reuse it, instead of establishing a connection per record.

RichSinkFunction and RichAsyncFunction follow the template below:

import com.datastax.driver.core.{Cluster, Session}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.slf4j.LoggerFactory

class CassandraSink extends RichSinkFunction[(String, String)] {

  private val logger = LoggerFactory.getLogger(classOf[CassandraSink])
  var cluster: Cluster = _
  var session: Session = _

  // called once per parallel instance: open the connection here, not per record
  override def open(parameters: Configuration): Unit = {
    cluster = Cluster.builder().addContactPoint("localhost").build()
    session = cluster.connect("example")
    logger.info("Connected to Cassandra")
  }

  // called for every record: reuse the session created in open()
  // (the query mirrors the bad-records insert used later in the article)
  override def invoke(value: (String, String)): Unit = {
    session.execute(
      "INSERT INTO example.failedJsonRecords(data, description) VALUES (?, ?);",
      value._1, value._2)
  }

  // called on shutdown: release the connection
  override def close(): Unit = {
    if (session != null) session.close()
    if (cluster != null) cluster.close()
  }
}

The only difference is that RichAsyncFunction takes an output parameter as well. RichAsyncFunction / AsyncIO runs in synchronous mode by default. We can also use an ExecutorService to create separate threads when we need to read/write data asynchronously rather than sequentially.
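As a rough sketch of that pattern (the class name, the thread-pool size, and the pretend write are all placeholders; a real version would issue the write via the Cassandra async API and complete the future from its callback):

import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.async.{ResultFuture, RichAsyncFunction}

// takes the (data, description) tuple as input and emits a status string as output
class AsyncCassandraWriter extends RichAsyncFunction[(String, String), String] {

  @transient private var executor: ExecutorService = _

  override def open(parameters: Configuration): Unit = {
    // separate threads so writes do not run sequentially on the task thread
    executor = Executors.newFixedThreadPool(4)
  }

  override def asyncInvoke(input: (String, String), resultFuture: ResultFuture[String]): Unit = {
    executor.submit(new Runnable {
      override def run(): Unit = {
        // a real implementation would call the Cassandra async API here
        resultFuture.complete(Iterable(s"stored: ${input._1}"))
      }
    })
  }

  override def close(): Unit = if (executor != null) executor.shutdown()
}

// usage: AsyncDataStream.unorderedWait(badRecords, new AsyncCassandraWriter, 5, TimeUnit.SECONDS, 100)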

These interfaces can be used to implement your own custom database sink classes. But there is a fair amount of work involved, like initializing a client and handling the DB logic inside the invoke() method.

For our use case, we are going to use the CassandraSink provided by Flink, configured with a few properties. Internally it uses a RichSinkFunction together with the Cassandra client's async API. It also provides properties such as .setMaxConcurrentRequests(), which limits the maximum number of in-flight requests from the same session.

Snippet for it looks like below:

CassandraSink.addSink(badRecords)
.setQuery("INSERT INTO example.failedJsonRecords(data,description) values (?, ?);")
.setHost("localhost")
.setMaxConcurrentRequests(10)
.build()

Running The Framework:

You can go through the framework using this link. GenerateRecords and ConsumeRecords/SideOutput are the main entry points of this framework. You can create a JAR file and use the CLI to run parallel sessions for the producer and the consumer.

You can get the commands from the below official documentation links:

  1. Older (for older versions)
  2. Latest (for newer versions)

You can try various options like triggering Savepoints for a job and so on.

You can also run these objects in IDE by manually producing records via Kafka Producer and using ConsumeRecords/SideOutput for consuming records.

Note: You will have to start the Flink, Kafka, and Cassandra clusters when testing/using this framework.

Conclusion:

This was my very first attempt with Flink, and I learned a lot. I am thankful to the Flink community for developing such an amazing streaming framework.

This framework might help someone integrate it with existing data pipelines that deal with unbounded data streams. Since we are dealing with unbounded streams, we can validate records as they arrive in Flink, unlike bounded datasets, where identifying a single bad record means reading the entire dataset and writing logic accordingly.

I hope it helps someone someday!
