
38. Amazon Web Services (AWS)

AWS is a subsidiary of Amazon that provides various cloud computing services.

38.1. Amazon CloudWatch

Amazon CloudWatch is a set of cloud monitoring services. The CloudWatch Logs service can be used to collect log data from Elastic Compute Cloud (EC2), CloudTrail, Route 53, and other sources. See the CloudWatch documentation for more information about configuring and using CloudWatch Logs.

NXLog can be set up to retrieve CloudWatch log streams in either of two ways:

  • NXLog can connect to the CloudWatch API using the Boto 3 client and poll for logs at regular intervals. This is suitable when a short delay in log collection is acceptable.

  • Or, AWS Lambda can be set up to push log data to NXLog via HTTP. This method offers low latency log collection.
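As a rough illustration of the polling approach, each run only needs to request the events that fall inside the most recent interval. The sketch below computes such a window in milliseconds since the Unix epoch, which is the unit the CloudWatch Logs API uses for timestamps. The helper name `poll_window` and the sample timestamp are assumptions made for this illustration; they are not part of NXLog or Boto 3.

```python
import time

def poll_window(interval_seconds, now=None):
    """Return (start_ms, end_ms) covering the last interval_seconds.

    poll_window is a hypothetical helper used only for this sketch.
    """
    if now is None:
        now = time.time()
    end_ms = int(now) * 1000
    start_ms = int(now - interval_seconds) * 1000
    return start_ms, end_ms

# A 300-second window ending at a fixed, made-up timestamp
start_ms, end_ms = poll_window(300, now=1_600_000_000)
print(start_ms, end_ms)
```

A longer interval means fewer API calls but a longer delay before events reach NXLog.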

38.1.1. Pulling Logs via the CloudWatch API

  1. A service account must be created for accessing the log data. In the AWS web interface, go to Services > IAM.

  2. Click the Users option in the left-side panel and click the Add user button.

  3. Provide a User name, for example nxlog. Tick the checkbox to allow Programmatic access to this account.

    AWS user configuration, screen 1
  4. Choose to Attach existing policies directly and select the CloudWatchLogsReadOnly policy. Click Next: Review and then Create user.

    AWS user configuration, screen 2
  5. Save the access keys for this user and click Close.

  6. Install and configure Boto 3, the AWS SDK for Python. See the Boto 3 Quickstart and Credentials documentation for more details.

  7. Edit the region_name and group_name variables in the script, as necessary.

  8. Configure NXLog to execute the script with the im_python module.

Example 191. Using an Amazon CloudWatch Add-On

This example NXLog configuration uses im_python to execute the CloudWatch add-on script. The xm_json parse_json() procedure is then used to parse the JSON log data into fields.

nxlog.conf
<Extension _json>
    Module      xm_json
</Extension>

<Input py>
    Module      im_python
    Exec        parse_json();
</Input>

import nxlog, boto3, json, time

class LogReader:
    def __init__(self, time_interval):
        client = boto3.client('logs', region_name='eu-central-1')

        self.lines = ""
        all_streams = []
        group_name = '<ENTER GROUP NAME HERE>'

        #query CloudWatch for all log streams in the group
        stream_batch = client.describe_log_streams(logGroupName=group_name)
        all_streams += stream_batch['logStreams']
        start_time = int(time.time()-time_interval)*1000
        end_time = int(time.time())*1000

        while 'nextToken' in stream_batch:
            stream_batch = client.describe_log_streams(
                logGroupName=group_name, nextToken=stream_batch['nextToken'])
            all_streams += stream_batch['logStreams']

        #get log data from all available streams
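        for stream in all_streams:
            stream_name = stream['logStreamName']
            #get first log batch (up to 10,000 log events), oldest first
            logs_batch = client.get_log_events(
                logGroupName=group_name, logStreamName=stream_name,
                startTime=start_time, endTime=end_time, startFromHead=True)
            #write events from the first batch in JSON format
            self.json_dump(logs_batch, group_name, stream_name)

            #get next log batches till all the data is collected;
            #get_log_events returns 'nextForwardToken' and repeats the
            #same token once the end of the stream has been reached
            token = None
            while logs_batch['nextForwardToken'] != token:
                token = logs_batch['nextForwardToken']
                logs_batch = client.get_log_events(
                    logGroupName=group_name, logStreamName=stream_name,
                    startTime=start_time, endTime=end_time,
                    startFromHead=True, nextToken=token)
                self.json_dump(logs_batch, group_name, stream_name)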
        nxlog.log_debug('Pulling logs: ' + gettime(start_time) + ' -  ' +
                        gettime(end_time) +  '\n')

    def json_dump(self, cloudwatch_logs, group_name, stream_name):
        for event in cloudwatch_logs['events']:
            event.update({'group': group_name, 'stream': stream_name })
            self.lines += json.dumps(event) + '\n'

    def getlogs(self):
        if not self.lines:
            return None
        return self.lines

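def gettime(time_miliseconds):
    # convert milliseconds since the epoch to a readable local time
    return time.strftime('%Y-%m-%d %H:%M:%S',
                         time.localtime(time_miliseconds / 1000))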

def read_data(module):
    # log pull time interval in seconds
    time_interval = 300

    module['reader'] = LogReader(time_interval)
    reader = module['reader']
    logdata = module.logdata_new()
    line = reader.getlogs()

    if line:
        logdata.set_field('raw_event', line)
        logdata.post()
        nxlog.log_debug("Data posted")

    # schedule the next poll
    module.set_read_timer(time_interval)

nxlog.log_info("INIT SCRIPT")
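
To make the script's output more concrete, the sketch below reproduces what json_dump() does to a batch: each event gets group and stream keys and is serialized as one JSON object per line. The batch is made up for illustration. Its timestamp, message, and ingestionTime keys follow the event structure returned by get_log_events, while `dump_events` and the group and stream names are hypothetical.

```python
import json

def dump_events(batch, group_name, stream_name):
    """Serialize events as newline-delimited JSON, tagged with their origin."""
    lines = ""
    for event in batch['events']:
        # copy the event and add the same two keys that json_dump() adds
        record = dict(event, group=group_name, stream=stream_name)
        lines += json.dumps(record) + '\n'
    return lines

# Made-up batch for illustration only
sample_batch = {'events': [
    {'timestamp': 1600000000000, 'message': 'first',
     'ingestionTime': 1600000000500},
    {'timestamp': 1600000001000, 'message': 'second',
     'ingestionTime': 1600000001500},
]}

output = dump_events(sample_batch, 'example-group', 'example-stream')
records = [json.loads(line) for line in output.splitlines()]
print(records[0]['message'], records[0]['group'], records[0]['stream'])
```

Each line is a self-contained JSON object, so every record carries the log group and stream it came from.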

38.1.2. Accepting Log Data From Lambda via HTTP

Using a push model follows an event-driven computing approach and allows for low latency. In this scenario, an AWS Lambda function sends log data in JSON format with the HTTP POST method. NXLog listens for connections and accepts log data.

  1. In the AWS web interface, go to Services > Lambda and click the Create function button.

  2. Click the Author from scratch button.

  3. Provide a name for the function and choose to create a new role from template. Enter a name for the role associated with the Lambda function. Then click the Create function button.

    AWS user configuration, screen 3
  4. Choose to Upload a .ZIP file, select the Python runtime, and change the Handler name to lambda_function.lambda_handler.

  5. Set the correct host and port in the Lambda script. Then upload a ZIP archive with that file (and certificates, if needed). Click Save.

    AWS user configuration, screen 4
  6. From the Configuration tab, change to the Triggers tab. Click + Add trigger.

  7. Choose CloudWatch Logs as a trigger for the Lambda function. Select the log group that should be forwarded and provide a Filter Name. Then click Submit.

    AWS user configuration, screen 5
Example 192. Lambda Collection via HTTPS Input

In this example, the im_http module listens for connections from the Lambda script via HTTPS. The xm_json parse_json() procedure is then used to parse the JSON log data into fields.

nxlog.conf
<Extension _json>
    Module  xm_json
</Extension>

<Input http>
    Module              im_http
    Port                8080
    HTTPSCertFile       %CERTDIR%/server-cert.pem
    HTTPSCertKeyFile    %CERTDIR%/server-key.pem
    HTTPSCAFile         %CERTDIR%/ca.pem
    HTTPSRequireCert    TRUE
    HTTPSAllowUntrusted FALSE
    Exec                parse_json();
</Input>

import json, base64, zlib, ssl, http.client

print('Loading function')

def lambda_handler(event, context):
    compressed_logdata = base64.b64decode(event['awslogs']['data'])
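    # 16 + MAX_WBITS tells zlib to expect gzip-compressed data
    logdata = zlib.decompress(compressed_logdata, 16 + zlib.MAX_WBITS)
    # named ssl_context so it does not shadow the handler's context argument
    ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)

    # If NXLog requires a client certificate, load it here with
    # ssl_context.load_cert_chain(). For more details, please refer to
    # Python's ssl module documentation.

    conn = http.client.HTTPSConnection("<HOST>:<PORT>", context=ssl_context)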
    headers = {"Content-type": "application/json"}
    conn.request('POST', "", logdata, headers)
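
The decoding step in the handler can be tried locally without AWS. CloudWatch Logs delivers subscription data to Lambda as a gzip-compressed, Base64-encoded JSON document under event['awslogs']['data']. The sketch below builds such an event from a made-up payload and decodes it the same way lambda_handler does. The helper names `make_test_event` and `decode_event` and all sample values are hypothetical.

```python
import base64
import gzip
import json
import zlib

def make_test_event(payload):
    """Wrap a dict the way CloudWatch Logs wraps subscription data."""
    compressed = gzip.compress(json.dumps(payload).encode('utf-8'))
    return {'awslogs': {'data': base64.b64encode(compressed).decode('ascii')}}

def decode_event(event):
    """Reverse the wrapping, as the handler does before posting."""
    compressed = base64.b64decode(event['awslogs']['data'])
    # 16 + MAX_WBITS tells zlib to expect a gzip header
    return json.loads(zlib.decompress(compressed, 16 + zlib.MAX_WBITS))

# Made-up payload for illustration only
sample_payload = {
    'messageType': 'DATA_MESSAGE',
    'logGroup': 'example-group',
    'logStream': 'example-stream',
    'logEvents': [
        {'id': '1', 'timestamp': 1600000000000, 'message': 'hello'},
    ],
}

decoded = decode_event(make_test_event(sample_payload))
print(decoded['logEvents'][0]['message'])
```

The same test event can be passed to lambda_handler once the host, port, and certificates are in place, which makes it possible to check the NXLog side before adding the CloudWatch Logs trigger.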

38.2. Amazon EC2

Amazon EC2 provides cloud-based virtual computing.

When running NXLog in EC2 instances, it may be helpful to include the current instance ID in the collected logs. For more information about retrieving EC2 instance metadata and adding it to event data, see the Amazon Web Services section of the Cloud Instance Metadata chapter.
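
As a minimal sketch of what retrieving the instance ID involves, the function below queries the EC2 instance metadata service at its link-local address. It assumes the instance still accepts unauthenticated IMDSv1 requests; instances that require IMDSv2 need a session token first, which this sketch does not handle. The function name `get_instance_id` and the `opener` parameter are hypothetical, and this is not necessarily the method described in the Cloud Instance Metadata chapter.

```python
import urllib.request

# Link-local address of the EC2 instance metadata service (IMDSv1 path)
METADATA_URL = 'http://169.254.169.254/latest/meta-data/instance-id'

def get_instance_id(opener=urllib.request.urlopen, timeout=2):
    """Return the instance ID as a string, or None if it cannot be fetched.

    opener is injectable so the function can be tested outside EC2.
    """
    try:
        with opener(METADATA_URL, timeout=timeout) as response:
            return response.read().decode('ascii').strip()
    except OSError:
        # covers connection errors, timeouts, and HTTP errors from urllib
        return None
```

Outside EC2 the request fails or times out and the function returns None, so a caller can simply skip adding the field in that case.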

38.3. Amazon Simple Storage Service (S3)

Amazon S3 is a high-availability, low-latency storage service offered by Amazon. For more information, see the AWS Amazon S3 Overview.

NXLog can be set up to send log data to S3 storage or read log data from S3 storage. For more information, see the Amazon S3 add-on documentation.