76. MongoDB

MongoDB is a document-oriented database system.

NXLog can be configured to collect data from a MongoDB database. A proof-of-concept Perl script is shown in the example below.

Example 345. Collecting Data From MongoDB

This configuration uses im_perl to execute a Perl script which reads data from a MongoDB database. The generated events are written to file with om_file.

When new documents are available in the database, the script sorts them by _id and processes them sequentially. Each document is passed to NXLog by calling Log::Nxlog::add_input_data(). The script polls the database repeatedly by rescheduling itself with Log::Nxlog::set_read_timer(), using a delay of 1 second after a successful read. If the MongoDB server is unreachable, the delay is increased to 10 seconds before the next connection attempt.

Warning
After processing, documents are deleted from the collection.
Note
The Perl script shown here is a proof-of-concept only. The script must be modified to correspond with the data to be collected from MongoDB.
nxlog.conf
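<Input perl>
    Module      im_perl
    PerlCode    mongodb-input.pl
</Input>

<Output file>
    Module      om_file
    File        '/tmp/output.log'
</Output>

# Connect the input to the output; without a route no events are forwarded.
<Route perl_to_file>
    Path        perl => file
</Route>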
mongodb-input.pl
#!/usr/bin/perl

use strict;
use warnings;

use FindBin;
use lib $FindBin::Bin;
use Log::Nxlog;
use MongoDB;
use Try::Tiny;

my $counter;
my $client;
my $collection;
my $cur;
my $count;
my $logfile;

sub read_data_int
{
   $counter //= 1;
   # Connect to the server
   $client //= MongoDB::MongoClient->new(host => 'localhost:27017');

   # Select the database and collection
   $collection //= $client->ns('zips.zips');
   # Sort all existing documents by _id.
   $cur = $collection->find()->sort({_id => 1});

   # Do this only the first time around. Make our cursor immortal.
   if ($counter == 1) {
       $cur->immortal(1);
   }
   $counter++;

   # If any new documents exist, process them.
   while ($cur->has_next()) {
       my $event = Log::Nxlog::logdata_new();
       my $obj = $cur->next;
       my $line = "ID: " . $obj->{"_id"} . " City: " . $obj->{"city"} . " Loc: " . $obj->{"loc"}[0] . "," . $obj->{"loc"}[1] . " Pop: " . $obj->{"pop"} . " State: " . $obj->{"state"};
       Log::Nxlog::set_field_string($event, 'raw_event', $line);
       Log::Nxlog::add_input_data($event);
       # Once the document is processed, delete it.
       my $result = $collection->delete_one( { _id => $obj->{"_id"} } );
       #print $logfile "Extracted document with _id: " . $obj->{"_id"} . " Deleting returned: " . $result->deleted_count . "\n";
   }
}

sub read_data
{
   # Use a try/catch block in order to resume when mongodb is unreachable.
   #open ($logfile, '>>', '/tmp/perl-input.log') or die "Could not open log file";
   try {
       read_data_int();
       # Adjust this timer for how often to look for new documents.
       Log::Nxlog::set_read_timer(1);
   } catch {
       #print $logfile "Error thrown: $_ Will retry in 10 seconds.";
       # Adjust this timer for how often to try to reconnect.
       Log::Nxlog::set_read_timer(10);
   };
}
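
The retry behavior of read_data can be tried outside NXLog. The following is a minimal sketch, not part of the original script: next_delay(), $POLL_DELAY, and $RETRY_DELAY are hypothetical names, the callback stands in for the MongoDB query, and core eval is used in place of Try::Tiny so that the sketch runs without extra modules.

```perl
use strict;
use warnings;

# Hypothetical delays (seconds), mirroring the 1 and 10 used in the script.
my $POLL_DELAY  = 1;
my $RETRY_DELAY = 10;

# Run one polling pass and return the delay before the next pass.
# $fetch is a hypothetical stand-in for the code that queries MongoDB.
sub next_delay {
    my ($fetch) = @_;
    # eval returns 1 only if $fetch completed without dying.
    my $ok = eval { $fetch->(); 1 };
    return $ok ? $POLL_DELAY : $RETRY_DELAY;
}

# A pass that succeeds, then a pass that fails as an unreachable server would.
print next_delay(sub { return }), "\n";
print next_delay(sub { die "server unreachable\n" }), "\n";
```

In the real script, the returned value would be the argument passed to Log::Nxlog::set_read_timer().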

For this example, a JSON data set of US ZIP (postal) codes was used. The data set was imported into MongoDB with mongoimport -d zips -c zips --file zips.json.

Input Sample
{ "_id" : "01001", "city" : "AGAWAM", "loc" : [ -72.622739, 42.070206 ], "pop" : 15338, "state" : "MA" }
{ "_id" : "01008", "city" : "BLANDFORD", "loc" : [ -72.936114, 42.182949 ], "pop" : 1240, "state" : "MA" }
{ "_id" : "01010", "city" : "BRIMFIELD", "loc" : [ -72.188455, 42.116543 ], "pop" : 3706, "state" : "MA" }
{ "_id" : "01011", "city" : "CHESTER", "loc" : [ -72.988761, 42.279421 ], "pop" : 1688, "state" : "MA" }
{ "_id" : "01020", "city" : "CHICOPEE", "loc" : [ -72.576142, 42.176443 ], "pop" : 31495, "state" : "MA" }
Output Sample
ID: 01001 City: AGAWAM Loc: -72.622739,42.070206 Pop: 15338 State: MA
ID: 01008 City: BLANDFORD Loc: -72.936114,42.182949 Pop: 1240 State: MA
ID: 01010 City: BRIMFIELD Loc: -72.188455,42.116543 Pop: 3706 State: MA
ID: 01011 City: CHESTER Loc: -72.988761,42.279421 Pop: 1688 State: MA
ID: 01020 City: CHICOPEE Loc: -72.576142,42.176443 Pop: 31495 State: MA
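
When adapting the script to other data, it may help to isolate the mapping from a document to the raw event line. The following is a minimal standalone sketch of that mapping for the ZIP code data. The function name format_zip_document() is hypothetical, and rendering missing fields as empty strings is an assumption of this sketch rather than behavior of the original script.

```perl
use strict;
use warnings;

# Build the raw event line for one document (a hash reference).
# Missing fields become empty strings (an assumption of this sketch)
# instead of triggering "uninitialized value" warnings.
sub format_zip_document {
    my ($doc) = @_;
    # Join all coordinates in loc; the sample data has exactly two.
    my @loc = @{ $doc->{loc} // [] };
    return sprintf(
        'ID: %s City: %s Loc: %s Pop: %s State: %s',
        $doc->{_id}   // '',
        $doc->{city}  // '',
        join(',', @loc),
        $doc->{pop}   // '',
        $doc->{state} // '',
    );
}

# First document from the Input Sample above.
my $doc = {
    _id   => '01001',
    city  => 'AGAWAM',
    loc   => [ -72.622739, 42.070206 ],
    pop   => 15338,
    state => 'MA',
};
print format_zip_document($doc), "\n";
```

For the first input document this prints the first line of the Output Sample. Inside the script, the returned string would be the value passed to Log::Nxlog::set_field_string() for raw_event.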