Apache Spark - capturing Kafka data on streaming event to trigger workflow

Asked by Tsar Bomba

In a nutshell, I'm a developer attempting to use Spark to move data from one system to another: raw data from one system, crunched into summarized form, into a homegrown analytics system.

I'm very new to Spark - my knowledge is limited to what I've been able to dig up and experiment with over the last week or two.

What I'm picturing is using Spark to watch for an event from Kafka as a trigger: capture that entity/data on the consumer event and use it to determine what needs to be updated in the analytics system. I would then run the relevant Spark queries against the raw Cassandra data and write the results to a different table on the analytics side, which the dashboard metrics use as a data source.

I have a simple Kafka structured streaming query working. I can see the consumed object printed to the console, but I'm unable to retrieve the Kafka record when the consumer event fires:

try {
    SparkSession spark = SparkSession
        .builder()
        .master(this.sparkMasterAddress)
        .appName("StreamingTest2")
        .getOrCreate();

    //THIS -> None of these events seem to give me the data consumed?
    //...thinking I'd trigger the Cassandra write from here?
    spark.streams().addListener(new StreamingQueryListener() {
        @Override
        public void onQueryStarted(QueryStartedEvent queryStarted) {
            System.out.println("Query started: " + queryStarted.id());
        }
        @Override
        public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
            System.out.println("Query terminated: " + queryTerminated.id());
        }
        @Override
        public void onQueryProgress(QueryProgressEvent queryProgress) {
            System.out.println("Query made progress: " + queryProgress.progress());
        }
    });

    Dataset<Row> reader = spark
        .readStream()
        .format("kafka")
        .option("startingOffsets", "latest")
        .option("kafka.bootstrap.servers", "...etc...")
        .option("subscribe", "my_topic")
        .load();

    Dataset<String> lines = reader
        .selectExpr("cast(value as string)")
        .as(Encoders.STRING());

    StreamingQuery query = lines
        .writeStream()
        .format("console")
        .start();
    query.awaitTermination();
} catch (Exception e) {
    e.printStackTrace();
}

I'm also able to query Cassandra just fine w/ Spark SQL:

try {
    SparkSession spark = SparkSession.builder()
        .appName("SparkSqlCassandraTest")
        .master("local[2]")
        .getOrCreate();

    Dataset<Row> reader = spark
        .read()
        .format("org.apache.spark.sql.cassandra")
        .option("host", this.cassandraAddress)
        .option("port", this.cassandraPort)
        .option("keyspace", "my_keyspace")
        .option("table", "my_table")
        .load();

    reader.printSchema();
    reader.show();

    spark.stop();
} catch (Exception e) {
    e.printStackTrace();
}
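Since the plan is to write the crunched results back out, the same connector can be used in the other direction. A minimal sketch, assuming a `summary` Dataset holding the aggregated results and a hypothetical `my_metrics` table that already exists in the keyspace:

```java
// Hedged sketch: write a summarized Dataset back to Cassandra with the
// Spark Cassandra connector. "my_metrics" is a hypothetical table name;
// SaveMode.Append assumes the target table already exists.
summary.write()
    .format("org.apache.spark.sql.cassandra")
    .option("keyspace", "my_keyspace")
    .option("table", "my_metrics")
    .mode(SaveMode.Append)
    .save();
```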

My thought is: trigger the latter with the former, bundle this as a Spark app/package/whatever, and deploy it into Spark. At that point, I'd expect it to continually push updates to the metric table(s).

Will this be a workable, scalable, reasonable solution for what I need? Am I on the right path? I'm not opposed to using Scala if that's easier or better in some way.

Thanks!

EDIT: Here's a diagram of what I'm up against.


Tags: java, scala, apache-spark, cassandra, apache-kafka

Answers

Answer #1 - Tsar Bomba, answered 5 days ago

Got it. Learned about the ForeachWriter. Works great:

```
StreamingQuery query = lines
    .writeStream()
    // .foreach() selects the sink itself; no .format("foreach") call is needed
    .foreach(new ForeachWriter<String>() {
        @Override
        public boolean open(long partitionId, long version) {
            // return true to process this partition/epoch
            return true;
        }

        @Override
        public void process(String value) {
            System.out.println("process() value = " + value);
        }

        @Override
        public void close(Throwable errorOrNull) {}
    })
    .start();
```
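One follow-up worth noting: `ForeachWriter` hands you one record at a time, so the Cassandra writes happen row by row. On Spark 2.4+, `foreachBatch` hands each micro-batch to the ordinary batch APIs instead, which would let the streaming side reuse the same Spark SQL / connector code as the batch query above. A hedged sketch (the keyspace/table names are hypothetical, and writing a single-column `Dataset<String>` straight to a table assumes the schemas line up - in practice you'd transform the batch first):

```java
// Hedged sketch, Spark 2.4+: foreachBatch exposes each micro-batch as a
// regular Dataset, so batch writers like the Cassandra connector work as-is.
StreamingQuery query = lines
    .writeStream()
    .foreachBatch((VoidFunction2<Dataset<String>, Long>) (batch, batchId) -> {
        // run the relevant queries against the raw data here, then
        // write the results out in one connector call per micro-batch
        batch.write()
            .format("org.apache.spark.sql.cassandra")
            .option("keyspace", "my_keyspace")
            .option("table", "my_metrics")
            .mode(SaveMode.Append)
            .save();
    })
    .start();
```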
