How K.F.C can eat your data one day…

High-Level Overview:

Tools & Versions Used:

  1. Scala 2.11.12
  2. IntelliJ
  3. Flink 1.13.5
  4. Maven Build
  5. Flink-Kafka_2.11 Connector
  6. Flink-Cassandra_2.11 Connector
  7. Flink Single Node Cluster

What will this framework do?

  1. Check whether the attributes in the JSON are as expected.
  2. Check whether the data types of those attributes match expectations.

Components covered:

  1. Flink-Kafka Producer
  2. Flink-Kafka Consumer
  3. Low-level conversion operators, i.e., Process Functions
  4. Sink Interfaces
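
The two validation checks listed above (expected attributes, expected data types) could look roughly like the following. This is only a minimal sketch, not the framework's actual code: it assumes the JSON has already been parsed into a Map[String, Any], and the object name JsonSchemaCheck, the expectedSchema contents, and the validate function are all hypothetical names chosen for illustration.

```scala
object JsonSchemaCheck {
  // Hypothetical expected schema: attribute name -> expected type name.
  val expectedSchema: Map[String, String] = Map(
    "id"     -> "int",
    "name"   -> "string",
    "active" -> "boolean"
  )

  // Map an already-parsed JSON value to a coarse type name.
  private def typeName(value: Any): String = value match {
    case null       => "null"
    case _: String  => "string"
    case _: Int     => "int"
    case _: Long    => "long"
    case _: Double  => "double"
    case _: Boolean => "boolean"
    case _          => "unknown"
  }

  // Right(record) when the record passes both checks,
  // Left(description) when an attribute is missing, unexpected, or wrongly typed.
  def validate(record: Map[String, Any]): Either[String, Map[String, Any]] = {
    val missing    = (expectedSchema.keySet -- record.keySet).toSeq.sorted
    val unexpected = (record.keySet -- expectedSchema.keySet).toSeq.sorted
    val wrongType  = expectedSchema.toSeq.collect {
      case (key, expected) if record.contains(key) && typeName(record(key)) != expected => key
    }.sorted

    val problems = Seq(
      if (missing.nonEmpty) Some("missing attributes: " + missing.mkString(", ")) else None,
      if (unexpected.nonEmpty) Some("unexpected attributes: " + unexpected.mkString(", ")) else None,
      if (wrongType.nonEmpty) Some("wrong data type for: " + wrongType.mkString(", ")) else None
    ).flatten

    if (problems.isEmpty) Right(record) else Left(problems.mkString("; "))
  }
}
```

Returning a description on the failure side keeps the rejected record and the reason together, which is the shape a (data, description) failure table would need.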

Flink-Kafka Producer:

  1. Upgrading the Flink version (applicable only if the classes in use are deprecated and will soon lose support. For example, FlinkKafkaProducer and FlinkKafkaConsumer are deprecated as of Flink 1.14, and KafkaSource and KafkaSink should be used instead).
  2. Migrating from one cluster to another.
  1. Add a state.
  2. Remove a state.
  3. Change the data type of the existing state.

Flink-Kafka Consumer:

properties.setProperty("isolation.level", "read_committed")
env.setRestartStrategy(RestartStrategies.failureRateRestart(
  3, // max failures per interval
  Time.of(5, TimeUnit.MINUTES), // time interval for measuring failure rate
  Time.of(10, TimeUnit.SECONDS) // delay between restart attempts
))
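
For context on the isolation.level setting above: with read_committed, a Kafka consumer only returns transactional messages whose transaction has been committed, whereas the default, read_uncommitted, also returns messages from open or aborted transactions. A minimal sketch of assembling the consumer properties follows. The object name ConsumerConfigSketch, the function name, and the broker address and group id used in the usage note are assumptions for illustration only.

```scala
import java.util.Properties

object ConsumerConfigSketch {
  // Builds Kafka consumer properties; the caller supplies the broker list and group id.
  def consumerProperties(bootstrapServers: String, groupId: String): Properties = {
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", bootstrapServers)
    properties.setProperty("group.id", groupId)
    // Only read messages from committed transactions.
    properties.setProperty("isolation.level", "read_committed")
    properties
  }
}
```

A call such as ConsumerConfigSketch.consumerProperties("localhost:9092", "json-validator") would produce a Properties object that can be handed to the FlinkKafkaConsumer constructor along with the topic name and a deserialization schema.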

Process Functions:

Sink Functions:

  1. RichSinkFunction
  2. RichAsyncFunction
class CassandraSink extends RichSinkFunction[(String, String)] {
  val logger = LoggerFactory.getLogger(classOf[CassandraSink])
  var cluster: Cluster = _
  var session: Session = _
  var rawData: String = _

  // Open the Cassandra connection once per sink instance
  override def open(parameters: Configuration): Unit = {
    cluster = Cluster.builder().addContactPoint("localhost").build()
    session = cluster.connect("example")
  }

  override def invoke(value: (String, String)): Unit = {

  }

  override def close(): Unit = {

  }
}

.setQuery("INSERT INTO example.failedJsonRecords(data,description) values (?, ?);")

Running The Framework:

  1. Older (for older versions)
  2. Latest (for newer versions)





Data | FCBarcelona | JackieChan

Siddhesh K


