How K.F.C can eat your data one day…

High-Level Overview:

Tools & Versions Used:

  1. Scala 2.11.12
  2. IntelliJ
  3. Flink 1.13.5
  4. Maven Build
  5. Flink-Kafka_2.11 Connector
  6. Flink-Cassandra_2.11 Connector
  7. Flink Single Node Cluster

What will this framework do?

  1. Check whether the attributes in the JSON are as expected.
  2. Check whether the data types of those attributes match the expectations.

The framework is built from the following components:

  1. Flink-Kafka Producer
  2. Flink-Kafka Consumer
  3. Low-level transformation operators, i.e., Process Functions
  4. Sink Interfaces
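The two checks above could be implemented as a ProcessFunction that passes valid records through and routes rejected ones to a side output as (raw JSON, description) pairs, the same shape as the data and description columns used by the Cassandra sink later on. This is a minimal sketch, not the author's implementation: it assumes jackson-databind is on the classpath, and the expected schema (id, name, active), the tag name bad-records, and the names JsonValidation and JsonValidator are all hypothetical.

```scala
import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

import scala.util.{Failure, Success, Try}

object JsonValidation {
  private val mapper = new ObjectMapper()

  // Hypothetical expected schema: attribute name -> data-type check
  private val expected: Seq[(String, JsonNode => Boolean)] = Seq(
    "id" -> ((n: JsonNode) => n.isIntegralNumber),
    "name" -> ((n: JsonNode) => n.isTextual),
    "active" -> ((n: JsonNode) => n.isBoolean)
  )

  // Side output for rejected records: (raw JSON, failure description)
  val badRecordsTag: OutputTag[(String, String)] = OutputTag[(String, String)]("bad-records")

  // Returns an empty Seq when the record passes both checks
  def problems(raw: String): Seq[String] = Try(mapper.readTree(raw)) match {
    case Failure(e) => Seq(s"unparseable JSON: ${e.getMessage}")
    case Success(node) if node == null || !node.isObject => Seq("not a JSON object")
    case Success(node) => expected.collect {
      case (field, _) if !node.has(field) => s"missing attribute '$field'"
      case (field, check) if !check(node.get(field)) => s"unexpected type for '$field'"
    }
  }
}

class JsonValidator extends ProcessFunction[String, String] {
  override def processElement(raw: String,
                              ctx: ProcessFunction[String, String]#Context,
                              out: Collector[String]): Unit = {
    val found = JsonValidation.problems(raw)
    if (found.isEmpty) out.collect(raw) // valid record: main output
    else ctx.output(JsonValidation.badRecordsTag, (raw, found.mkString("; "))) // invalid: side output
  }
}
```

Wiring it up might look like `val valid = rawJson.process(new JsonValidator)` followed by `val badRecords = valid.getSideOutput(JsonValidation.badRecordsTag)`, where rawJson is a hypothetical DataStream[String] read from Kafka. Keeping the checks in a plain function makes them testable without a running cluster.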

Flink-Kafka Producer:

  1. Upgrading the Flink version (applicable only if the classes currently in use are about to be deprecated and lose support; for example, FlinkKafkaProducer and FlinkKafkaConsumer are deprecated as of 1.14.2, and KafkaSink and KafkaSource should be used instead).
  2. Migrating from one cluster to another.
  1. Add a state.
  2. Remove a state.
  3. Change the data type of the existing state.
env.setDefaultSavepointDirectory("file:///tmp/flink-savepoints") // hypothetical path; use durable storage in production
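Savepoints only help if the job's state can be matched back to its operators after an upgrade, migration, or state change. A minimal producer-side sketch, assuming the exactly-once mode of the FlinkKafkaProducer class from the connector listed above; the names ProducerFactory and StringRecordSchema and the timeout value are illustrative assumptions, not the author's code.

```scala
import java.lang.{Long => JLong}
import java.nio.charset.StandardCharsets
import java.util.Properties

import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer, KafkaSerializationSchema}
import org.apache.kafka.clients.producer.ProducerRecord

// Hypothetical serializer: writes each String record as UTF-8 bytes to a fixed topic
class StringRecordSchema(topic: String) extends KafkaSerializationSchema[String] {
  override def serialize(element: String, timestamp: JLong): ProducerRecord[Array[Byte], Array[Byte]] =
    new ProducerRecord[Array[Byte], Array[Byte]](topic, element.getBytes(StandardCharsets.UTF_8))
}

object ProducerFactory {
  // Hypothetical helper: builds a transactional (exactly-once) producer for String records
  def exactlyOnceProducer(topic: String, bootstrapServers: String): FlinkKafkaProducer[String] = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", bootstrapServers)
    // Flink's default transaction timeout (1 hour) exceeds the broker's default
    // transaction.max.timeout.ms (15 minutes), so lower it explicitly
    props.setProperty("transaction.timeout.ms", "900000")

    new FlinkKafkaProducer[String](
      topic, new StringRecordSchema(topic), props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE)
  }
}
```

In the job, the sink would be attached with a stable operator ID, e.g. `validRecords.addSink(ProducerFactory.exactlyOnceProducer("validated-records", "localhost:9092")).uid("kafka-producer")` (stream, topic, and broker address hypothetical). Explicit uid values let Flink map savepoint state back to operators after changes such as adding or removing state. Exactly-once transactions are only committed when a checkpoint completes, so checkpointing must also be enabled, for example with `env.enableCheckpointing(60000)`.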

Flink-Kafka Consumer:

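import java.util.concurrent.TimeUnit
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time

// Only read records from committed Kafka transactions
properties.setProperty("isolation.level", "read_committed")

env.setRestartStrategy(RestartStrategies.failureRateRestart(
  3,                             // max failures per interval
  Time.of(5, TimeUnit.MINUTES),  // interval for measuring the failure rate
  Time.of(10, TimeUnit.SECONDS)  // delay between restart attempts
))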
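read_committed matters when the upstream topic is written transactionally: the consumer then only returns records whose transaction has committed and skips aborted ones. With the failure-rate strategy above, the job is restarted 10 seconds after a failure and is failed for good once more than 3 failures occur within 5 minutes. A minimal consumer-side sketch; the helper name ConsumerFactory and all argument values are hypothetical.

```scala
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object ConsumerFactory {
  // Hypothetical helper: a consumer that only sees records from committed Kafka transactions
  def committedOnlyConsumer(topic: String, bootstrapServers: String, groupId: String): FlinkKafkaConsumer[String] = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", bootstrapServers)
    props.setProperty("group.id", groupId)
    // Records of aborted transactions are skipped; open transactions are not read until they commit
    props.setProperty("isolation.level", "read_committed")

    val consumer = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), props)
    // With checkpointing enabled, offsets are committed back to Kafka on completed checkpoints
    consumer.setCommitOffsetsOnCheckpoints(true)
    // Start from the group's committed offsets when no checkpoint or savepoint is restored
    consumer.setStartFromGroupOffsets()
    consumer
  }
}
```

It would be added as a source with its own stable ID, e.g. `env.addSource(ConsumerFactory.committedOnlyConsumer("raw-json", "localhost:9092", "kfc-validator")).uid("kafka-source")`, so its partition offsets can also be restored from a savepoint.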

Process Functions:

Sink Functions:

  1. RichSinkFunction
  2. RichAsyncFunction
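import com.datastax.driver.core.{Cluster, Session}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.slf4j.LoggerFactory

class CassandraSink extends RichSinkFunction[(String, String)] {
  // The logger and driver objects are not serializable, so they are marked @transient
  @transient private lazy val logger = LoggerFactory.getLogger(classOf[CassandraSink])
  @transient private var cluster: Cluster = _
  @transient private var session: Session = _

  // Called once per parallel sink instance, on the task manager
  override def open(parameters: Configuration): Unit = {
    cluster = Cluster.builder().addContactPoint("localhost").build()
    session = cluster.connect("example") // keyspace "example"
    logger.info("Connected to Cassandra")
  }

  // Each value is (raw JSON record, failure description)
  override def invoke(value: (String, String)): Unit = {
    session.execute(
      "INSERT INTO example.failedJsonRecords (data, description) VALUES (?, ?)",
      value._1, value._2)
  }

  // Release driver resources when the task shuts down
  override def close(): Unit = {
    if (session != null) session.close()
    if (cluster != null) cluster.close()
  }
}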
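// Flink's built-in Cassandra connector (flink-connector-cassandra), renamed on import
// so it does not clash with the custom CassandraSink class
import org.apache.flink.streaming.connectors.cassandra.{CassandraSink => FlinkCassandraSink}

FlinkCassandraSink.addSink(badRecords) // badRecords: DataStream[(String, String)]
  .setQuery("INSERT INTO example.failedJsonRecords(data, description) VALUES (?, ?);")
  .setHost("localhost")
  .setMaxConcurrentRequests(10)
  .build()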
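RichAsyncFunction, the second option listed under Sink Functions, is strictly an async I/O operator rather than a SinkFunction: it is applied through AsyncDataStream, and each write completes a ResultFuture once Cassandra acknowledges it. A minimal sketch assuming the DataStax 3.x driver that the custom sink already uses; AsyncCassandraWriter is a hypothetical name, and a production version would likely use a prepared statement instead of re-sending the CQL string.

```scala
import java.util.concurrent.Executor

import com.datastax.driver.core.{Cluster, ResultSetFuture, Session}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.async.{ResultFuture, RichAsyncFunction}

import scala.util.{Failure, Success, Try}

// Hypothetical async writer: emits each record downstream once Cassandra acknowledges the insert
class AsyncCassandraWriter extends RichAsyncFunction[(String, String), (String, String)] {
  @transient private var cluster: Cluster = _
  @transient private var session: Session = _

  // Runs listener callbacks on the thread that completes the future (a "direct" executor)
  @transient private lazy val direct: Executor = new Executor {
    override def execute(command: Runnable): Unit = command.run()
  }

  override def open(parameters: Configuration): Unit = {
    cluster = Cluster.builder().addContactPoint("localhost").build()
    session = cluster.connect("example")
  }

  override def asyncInvoke(value: (String, String), resultFuture: ResultFuture[(String, String)]): Unit = {
    // Non-blocking write; the driver's Session is thread-safe, so concurrent calls share it
    val future: ResultSetFuture = session.executeAsync(
      "INSERT INTO example.failedJsonRecords (data, description) VALUES (?, ?)",
      value._1, value._2)
    future.addListener(new Runnable {
      override def run(): Unit = Try(future.getUninterruptibly()) match {
        case Success(_) => resultFuture.complete(Iterable(value))
        case Failure(e) => resultFuture.completeExceptionally(e)
      }
    }, direct)
  }

  override def close(): Unit = {
    if (session != null) session.close()
    if (cluster != null) cluster.close()
  }
}
```

It could be applied with `AsyncDataStream.unorderedWait(badRecords, new AsyncCassandraWriter, 5, TimeUnit.SECONDS, 10)`, which caps in-flight writes at 10 and, with the default timeout handling, treats a write that takes longer than 5 seconds as a failure.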

Running The Framework:

  1. Older (for older versions)
  2. Latest (for newer versions)

Conclusion:

Siddhesh K
Data | FCBarcelona | JackieChan