In this article you will learn how to create a MapR Streams Consumer that saves all the messages into a MapR-DB JSON Table.
Install and Run the sample MapR Streams application
The steps to install and run the applications are the same as the ones described in the following article:
Make sure the default producer and consumer are running in your environment, using the following commands:
Producer:
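A typical producer command, assuming the standard MapR Streams sample project layout (the jar name and the com.mapr.examples.Run entry point are assumptions and may differ in your build):

    java -cp target/mapr-streams-examples-1.0-SNAPSHOT-jar-with-dependencies.jar:. com.mapr.examples.Run producer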
Consumer:
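And the consumer, with the same assumptions:

    java -cp target/mapr-streams-examples-1.0-SNAPSHOT-jar-with-dependencies.jar:. com.mapr.examples.Run consumer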
Save messages into MapR-DB JSON
The DBConsumer class is a copy of the Consumer class with small changes to save the messages coming from the /sample-stream:fast-messages topic into a MapR-DB table named /apps/fast-messages.
1- Add MapR-DB Maven dependency to your project
Edit the pom.xml file and add the following entry in the dependencies tag:
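A typical dependency entry looks like the following; the version shown here is only an example and should match your MapR release:

    <dependency>
      <groupId>com.mapr.db</groupId>
      <artifactId>maprdb</artifactId>
      <version>5.1.0-mapr</version>
    </dependency>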
This adds support for:
- OJAI (Open JSON Application Interface)
- MapR-DB JSON API
2- Create and Get a JSON Table
To save the messages, the application must access a JSON table. To do so, simply call the MapRDB.getTable(TABLE_PATH) method; if the table does not exist, create it with MapRDB.createTable(TABLE_PATH). This is what the DBConsumer.getTable(TABLE_PATH) method does.
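A minimal sketch of such a helper, assuming the com.mapr.db.MapRDB and com.mapr.db.Table classes:

    // returns the JSON table, creating it on first use
    private static Table getTable(String tablePath) {
      if (!MapRDB.tableExists(tablePath)) {
        return MapRDB.createTable(tablePath);
      }
      return MapRDB.getTable(tablePath);
    }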
When the DBConsumer starts, the getTable("/apps/fast-messages") method is called.
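Assuming the handle is kept in the fastMessagesTable field mentioned below, the call looks roughly like:

    // get (or create) the table once, when the DBConsumer starts
    private static Table fastMessagesTable = getTable("/apps/fast-messages");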
The table fastMessagesTable is now available to the consumer.
3- Save messages into the JSON Table
Messages can be saved into the table using the MapR-DB JSON Java API.
The producer sends the message as a JSON string that is converted into a JSON object named msg. This object can be used to create an OJAI Document:
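A sketch of this step, assuming msg is the parsed message (for example a Jackson JsonNode) and messageDocument is a name chosen here for illustration:

    // build an OJAI Document from the JSON payload of the message
    Document messageDocument = MapRDB.newDocument(msg.toString());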
To be saved into MapR-DB, a document must have an _id field. In this example, let's use the message number generated by the producer (the JSON field k).
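Assuming the message number is the integer field k of msg, setting the key could look like:

    // use the message number (field "k") as the document _id
    messageDocument.setId(Integer.toString(msg.get("k").asInt()));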
Let’s now save the document into the table:
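Using the table handle and the document built above:

    // insert the document, replacing any existing document with the same _id
    fastMessagesTable.insertOrReplace(messageDocument);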
Each time the producer is executed, the message number counter is initialized to 0. The document _id values will therefore be the same, and the documents already in the table must be replaced; this is why the insertOrReplace method is used.
Let’s run the new consumer.
4- Run the DBConsumer
To run the DBConsumer, just pass the dbconsumer parameter as follows:
Consumer:
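With the same assumptions as the producer and consumer commands above:

    java -cp target/mapr-streams-examples-1.0-SNAPSHOT-jar-with-dependencies.jar:. com.mapr.examples.Run dbconsumer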
Note that a new consumer group is used to make sure the messages are read by both consumers (Consumer and DBConsumer).
5- Query the messages saved into MapR-DB
Messages are saved into the /apps/fast-messages table; let's use the MapR DB Shell to query the data. On your cluster, run the following commands as the mapr user:
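For example, assuming the find command of the MapR DB Shell (options may vary slightly across MapR versions):

    # start the MapR-DB shell as the mapr user
    mapr dbshell
    # inside dbshell, list a few documents from the table
    find /apps/fast-messages --limit 5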
Conclusion
In this very simple example, the DBConsumer takes each message and saves it as a simple JSON document into MapR-DB JSON. The table can be used to build any type of application, or queried with Apache Drill (1.6 or later) for analytics.
In a real application, the messages would probably be modified, enriched, and/or aggregated, and the result then saved into a MapR-DB table. The goal of this sample is just to show that it is easy to integrate MapR Streams and MapR-DB.
You also have other alternatives to achieve the same thing, for example:
- Spark Streaming
- 3rd Party ETL and Tools