In this article you will learn how to create a MapR Streams Consumer that saves all the messages into a MapR-DB JSON Table.
Install and Run the sample MapR Streams application
The steps to install and run the applications are the same as those described in the following article:
Make sure the default producer and consumer run in your environment, using the following commands:
Producer:
$ java -cp $(mapr classpath):./mapr-streams-examples-1.0-SNAPSHOT-jar-with-dependencies.jar com.mapr.examples.Run producer
Consumer:
$ java -cp $(mapr classpath):./mapr-streams-examples-1.0-SNAPSHOT-jar-with-dependencies.jar com.mapr.examples.Run consumer
Save messages into MapR-DB JSON
The DBConsumer class is a copy of the Consumer class with small changes to save the messages coming from the /sample-stream:fast-messages topic into a MapR-DB table named /apps/fast-messages.
1- Add the MapR-DB Maven dependency to your project
Edit the pom.xml file and add the following entry in the dependencies tag:
<dependency>
    <groupId>com.mapr.db</groupId>
    <artifactId>maprdb</artifactId>
    <version>5.1.0-mapr</version>
</dependency>
This adds support for:
- OJAI (Open JSON Application Interface)
- MapR-DB JSON API
2- Create and Get a JSON Table
To save the messages, the application must access a JSON table; to do so, call the MapRDB.getTable(TABLE_PATH) method. If the table does not exist, create it with the MapRDB.createTable(TABLE_PATH) method.
This is what the DBConsumer.getTable(TABLE_PATH) method does:
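import com.mapr.db.MapRDB;
import com.mapr.db.Table;

// Returns the JSON table at tablePath, creating it first if it does not exist.
private static Table getTable(String tablePath) {
    if (!MapRDB.tableExists(tablePath)) {
        return MapRDB.createTable(tablePath);
    } else {
        return MapRDB.getTable(tablePath);
    }
}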
When the DBConsumer starts, the getTable("/apps/fast-messages") method is called:
Table fastMessagesTable = getTable("/apps/fast-messages");
The fastMessagesTable table is now available to the consumer.
3- Save messages into the JSON Table
Messages can be saved into the table using the MapR-DB JSON Java API.
The producer sends each message as a JSON string, which the consumer converts into a JSON object named msg. This object can be used to create an OJAI Document:
Document messageDocument = MapRDB.newDocument(msg);
To be saved into MapR-DB, a document must have an _id field. In this example, let’s use the message number generated by the producer (JSON field k) as the _id:
messageDocument.setId( Integer.toString(messageDocument.getInt("k")));
Let’s now save the document into the table:
fastMessagesTable.insertOrReplace( messageDocument );
Each time the producer is executed, its message counter restarts at 0, so the same _id values are generated again and the existing documents in the table must be replaced; this is why the insertOrReplace method is used.
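The replace behavior can be illustrated with a minimal, JDK-only sketch. It does not use the MapR-DB API: a HashMap keyed by _id is a hypothetical stand-in for the /apps/fast-messages table, and the helpers message, toDocument, insert, insertOrReplace and producerRun are illustrative names that only mimic the semantics described above (a plain insert refuses an existing _id, insertOrReplace overwrites it).

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class UpsertSketch {
    // Hypothetical stand-in: a HashMap keyed by _id plays the role of the JSON table,
    // and each "document" is a Map of field name to value.

    // Builds a message with the same fields the producer sends: type, t, k.
    static Map<String, Object> message(long k, double t) {
        Map<String, Object> msg = new LinkedHashMap<>();
        msg.put("type", "test");
        msg.put("t", t);
        msg.put("k", k);
        return msg;
    }

    // Copies the message into a document and sets _id from k, mirroring
    // messageDocument.setId(Integer.toString(messageDocument.getInt("k"))).
    static Map<String, Object> toDocument(Map<String, Object> msg) {
        Map<String, Object> doc = new LinkedHashMap<>(msg);
        doc.put("_id", Integer.toString(((Number) msg.get("k")).intValue()));
        return doc;
    }

    // Mimics a plain insert: fails if a document with the same _id already exists.
    static void insert(Map<String, Map<String, Object>> table, Map<String, Object> doc) {
        String id = (String) doc.get("_id");
        if (table.containsKey(id)) {
            throw new IllegalStateException("document already exists: " + id);
        }
        table.put(id, doc);
    }

    // Mimics insertOrReplace: stores the document whether or not the _id exists.
    static void insertOrReplace(Map<String, Map<String, Object>> table, Map<String, Object> doc) {
        table.put((String) doc.get("_id"), doc);
    }

    // Simulates one producer run: the counter k restarts at 0 every time.
    static void producerRun(Map<String, Map<String, Object>> table, int count, double tOffset) {
        for (long k = 0; k < count; k++) {
            insertOrReplace(table, toDocument(message(k, tOffset + k)));
        }
    }

    public static void main(String[] args) {
        Map<String, Map<String, Object>> table = new HashMap<>();
        producerRun(table, 5, 0.0);
        producerRun(table, 5, 1000.0); // second run produces _id values "0".."4" again
        System.out.println(table.size());            // 5: documents replaced, not duplicated
        System.out.println(table.get("3").get("t")); // 1003.0: the latest run wins

        try {
            insert(table, toDocument(message(3, 0.0)));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // document already exists: 3
        }
    }
}
```

Running main prints 5, then 1003.0, then the duplicate-_id message: the second producer run replaced the five documents instead of adding five more, while a plain insert of an existing _id is rejected.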
Let’s run the new consumer.
4- Run the DBConsumer
To run the DBConsumer, pass the dbconsumer parameter as follows:
Consumer:
$ java -cp $(mapr classpath):./mapr-streams-examples-1.0-SNAPSHOT-jar-with-dependencies.jar com.mapr.examples.Run dbconsumer
Note that the DBConsumer uses a new consumer group, so that both consumers (Consumer and DBConsumer) read all the messages.
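Why a separate group matters can be sketched with a simplified, in-memory model. This is not the MapR Streams or Kafka consumer API (in that API the group is set through the group.id consumer property): the ConsumerGroupSketch class, its send and poll methods, and the group names are hypothetical, and the model ignores partitions and rebalancing. It only shows that each group keeps its own read position, so two groups each receive every message, while a second poll in the same group returns only what that group has not read yet.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ConsumerGroupSketch {
    // Hypothetical in-memory topic: an append-only list of messages.
    private final List<String> topic = new ArrayList<>();
    // Committed read position per consumer group (group name -> next offset to read).
    private final Map<String, Integer> committedOffsets = new HashMap<>();

    void send(String message) {
        topic.add(message);
    }

    // Returns every message the group has not read yet and advances that group's offset.
    // Other groups are unaffected, which is why each group sees the full topic.
    List<String> poll(String groupId) {
        int from = committedOffsets.getOrDefault(groupId, 0);
        List<String> batch = new ArrayList<>(topic.subList(from, topic.size()));
        committedOffsets.put(groupId, topic.size());
        return batch;
    }

    public static void main(String[] args) {
        ConsumerGroupSketch stream = new ConsumerGroupSketch();
        for (int k = 0; k < 3; k++) {
            stream.send("{\"type\":\"test\",\"k\":" + k + "}");
        }
        // Two different (hypothetical) groups: each receives all three messages.
        System.out.println(stream.poll("consumer-group").size());   // 3
        System.out.println(stream.poll("dbconsumer-group").size()); // 3
        // Polling again in the same group returns nothing new.
        System.out.println(stream.poll("consumer-group").size());   // 0
    }
}
```

Running main prints 3, 3 and 0. If the DBConsumer reused the Consumer's group in this model, it would only get the messages the Consumer had not already read, which is the situation the new group avoids.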
5- Query the messages saved into MapR-DB
Messages are saved into the /apps/fast-messages table; let’s use the MapR DBShell to query the data. On your cluster, run the following commands as the mapr user:
$ mapr dbshell
maprdb mapr:> find /apps/fast-messages --id 100
{"_id":"100","type":"test","t":64986.787,"k":{"$numberLong":100}}
Conclusion
In this very simple example, the DBConsumer takes each message and saves it as a simple JSON document in a MapR-DB JSON table. The table can then be used by any type of application, or queried with Apache Drill (1.6 or later) for analytics.
In a real application, the messages would probably be modified, enriched, and/or aggregated, and the result then saved into a MapR-DB table. The goal of this sample is just to show that it is easy to integrate MapR Streams and MapR-DB.
There are also other ways to achieve the same result, for example:
- Spark Streaming
- 3rd-party ETL and other tools