38. Amazon Web Services (AWS)
AWS is a subsidiary of Amazon that provides various cloud computing services.
38.1. Amazon CloudWatch
Amazon CloudWatch is a set of cloud monitoring services. The CloudWatch Logs service can be used to collect log data from Elastic Compute Cloud (EC2), CloudTrail, Route 53, and other sources. See the CloudWatch documentation for more information about configuring and using CloudWatch Logs.
NXLog can be set up to retrieve CloudWatch log streams in either of two ways:
-
NXLog can connect to the CloudWatch API using the Boto 3 client and poll for logs at regular intervals. This is suitable when a short delay in log collection is acceptable.
-
Alternatively, AWS Lambda can be set up to push log data to NXLog via HTTP. This method offers low-latency log collection.
38.1.1. Pulling Logs via the CloudWatch API
-
A service account must be created for accessing the log data. In the AWS web interface, go to Services > IAM.
-
Click the Users option in the left-side panel and click the Add user button.
-
Provide a User name, for example nxlog. Tick the checkbox to allow Programmatic access to this account.
-
Choose to Attach existing policies directly and select the CloudWatchLogsReadOnly policy. Click Next: Review and then Create user.
-
Save the access keys for this user and click Close.
-
Install and configure Boto 3, the AWS SDK for Python. See the Boto 3 Quickstart and Credentials documentation for more details.
-
Edit the region_name and group_name variables in the cloudwatch.py script, as necessary.
-
Configure NXLog to execute the script with the im_python module.
This example NXLog configuration uses im_python to execute the CloudWatch add-on script. The xm_json parse_json() procedure is then used to parse the JSON log data into fields.
<Extension _json>
    Module        xm_json
</Extension>

<Input py>
    Module        im_python
    PythonCode    cloudwatch.py
    Exec          parse_json();
</Input>
# cloudwatch.py
import nxlog, boto3, json, time


class LogReader:
    def __init__(self, time_interval):
        client = boto3.client('logs', region_name='eu-central-1')

        self.lines = ""
        all_streams = []
        group_name = '<ENTER GROUP NAME HERE>'

        # Collection window, in milliseconds since the epoch
        start_time = int(time.time() - time_interval) * 1000
        end_time = int(time.time()) * 1000

        # Query CloudWatch for all log streams in the group
        stream_batch = client.describe_log_streams(logGroupName=group_name)
        all_streams += stream_batch['logStreams']
        while 'nextToken' in stream_batch:
            stream_batch = client.describe_log_streams(
                logGroupName=group_name, nextToken=stream_batch['nextToken'])
            all_streams += stream_batch['logStreams']
        nxlog.log_debug(str(len(all_streams)))

        # Get log data from all available streams
        for stream in all_streams:
            stream_name = stream['logStreamName']
            params = {'logGroupName': group_name,
                      'logStreamName': stream_name,
                      'startTime': start_time,
                      'endTime': end_time,
                      'startFromHead': True}
            # get_log_events() returns up to 10,000 events per call and pages
            # with nextForwardToken (not nextToken). The stream is exhausted
            # when the call returns the same token that was passed in.
            token = None
            while True:
                logs_batch = client.get_log_events(**params)
                # Write the events from this batch in JSON format
                self.json_dump(logs_batch, group_name, stream_name)
                if logs_batch['nextForwardToken'] == token:
                    break
                token = logs_batch['nextForwardToken']
                params['nextToken'] = token

        nxlog.log_debug('Pulling logs: ' + gettime(start_time) + ' - ' +
                        gettime(end_time) + '\n')

    def json_dump(self, cloudwatch_logs, group_name, stream_name):
        # Tag each event with its group and stream, one JSON object per line
        for event in cloudwatch_logs['events']:
            event.update({'group': group_name, 'stream': stream_name})
            self.lines += json.dumps(event) + '\n'

    def getlogs(self):
        if not self.lines:
            return None
        return self.lines


def gettime(time_milliseconds):
    return time.strftime('%Y-%m-%d %H:%M:%S',
                         time.localtime(time_milliseconds / 1000))


def read_data(module):
    # Log pull time interval in seconds
    time_interval = 300

    module['reader'] = LogReader(time_interval)
    reader = module['reader']
    logdata = module.logdata_new()
    line = reader.getlogs()

    if line:
        logdata.set_field('raw_event', line)
        logdata.post()
        nxlog.log_debug("Data posted")

    # Schedule the next poll
    module.set_read_timer(time_interval)


nxlog.log_info("INIT SCRIPT")
38.1.2. Accepting Log Data From Lambda via HTTP
Using a push model follows an event-driven computing approach and allows for low latency. In this scenario, an AWS Lambda function sends log data in JSON format with the HTTP POST method. NXLog listens for connections and accepts log data.
-
In the AWS web interface, go to the Lambda service and click the Create function button.
-
Click the Author from scratch button.
-
Provide a name for the function and choose to create a new role from template. Enter a name for the role associated with the Lambda function. Then click the Create function button.
-
Choose to Upload a .ZIP file, select the Python runtime, and change the Handler name to lambda_function.lambda_handler.
-
Set the correct host and port in lambda_function.py. Then upload a ZIP archive with that file (and certificates, if needed). Click Save.
-
From the Configuration tab, change to the Triggers tab. Click + Add trigger.
-
Choose CloudWatch Logs as a trigger for the Lambda function. Select the log group that should be forwarded and provide a Filter Name. Then click Submit.
In this example, the im_http module listens for connections from the Lambda script via HTTP. The xm_json parse_json() procedure is then used to parse the JSON log data into fields.
<Extension _json>
    Module                 xm_json
</Extension>

<Input http>
    Module                 im_http
    ListenAddr             127.0.0.1
    Port                   8080
    HTTPSCertFile          %CERTDIR%/server-cert.pem
    HTTPSCertKeyFile       %CERTDIR%/server-key.pem
    HTTPSCAFile            %CERTDIR%/ca.pem
    HTTPSRequireCert       TRUE
    HTTPSAllowUntrusted    FALSE
    Exec                   parse_json();
</Input>
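# lambda_function.py
import base64, zlib, ssl, http.client

print('Loading function')


def lambda_handler(event, context):
    # CloudWatch Logs delivers the payload base64-encoded and gzip-compressed
    compressed_logdata = base64.b64decode(event['awslogs']['data'])
    logdata = zlib.decompress(compressed_logdata, 16 + zlib.MAX_WBITS)

    # Use a separate name so the Lambda context argument is not overwritten.
    # PROTOCOL_TLS_CLIENT verifies the server certificate and hostname
    # against the CA certificate loaded below.
    ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    ssl_context.load_verify_locations("ca.pem")
    # For more details regarding the SSLContext.load_cert_chain()
    # function, please refer to Python's ssl module documentation at
    # <https://docs.python.org/3/library/ssl.html#ssl.SSLContext>
    ssl_context.load_cert_chain("client.pem")

    conn = http.client.HTTPSConnection("<HOST>:<PORT>", context=ssl_context)
    conn.set_debuglevel(3)
    headers = {"Content-type": "application/json"}
    conn.request('POST', "/", logdata, headers)
    # Read the response so the request completes before the connection closes
    response = conn.getresponse()
    print('NXLog responded with status', response.status)
    conn.close()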
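To see what the Lambda function receives and forwards, the decoding step can be reproduced locally. The following is a minimal sketch: decode_awslogs, flatten, and the sample values are hypothetical and introduced only for illustration. It assumes the standard CloudWatch Logs subscription format, in which event['awslogs']['data'] holds a base64-encoded, gzip-compressed JSON document with logGroup, logStream, and logEvents keys.

```python
import base64
import gzip
import json
import zlib


def decode_awslogs(event):
    """Return the JSON document carried by a CloudWatch Logs trigger event."""
    compressed = base64.b64decode(event['awslogs']['data'])
    # 16 + MAX_WBITS tells zlib to expect a gzip header and trailer.
    payload = zlib.decompress(compressed, 16 + zlib.MAX_WBITS)
    return json.loads(payload)


def flatten(payload):
    """Produce one flat record per log event, tagged with group and stream."""
    return [{'group': payload['logGroup'],
             'stream': payload['logStream'],
             'timestamp': entry['timestamp'],
             'message': entry['message']}
            for entry in payload['logEvents']]


# Made-up sample data standing in for a real subscription delivery.
sample_payload = {
    'messageType': 'DATA_MESSAGE',
    'logGroup': 'example-group',
    'logStream': 'example-stream',
    'logEvents': [
        {'id': '1', 'timestamp': 1500000000000, 'message': 'first event'},
        {'id': '2', 'timestamp': 1500000001000, 'message': 'second event'},
    ],
}
sample_event = {'awslogs': {'data': base64.b64encode(
    gzip.compress(json.dumps(sample_payload).encode('utf-8'))).decode('ascii')}}

if __name__ == '__main__':
    for record in flatten(decode_awslogs(sample_event)):
        print(json.dumps(record))
```

The Lambda function above forwards the decompressed document as is, so the individual events arrive nested under logEvents. A flattening step like the one sketched here is one way to obtain one record per event, should that be preferred.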
38.2. Amazon EC2
Amazon EC2 provides cloud-based virtual computing.
When running NXLog in EC2 instances, it may be helpful to include the current instance ID in the collected logs. For more information about retrieving EC2 instance metadata and adding it to event data, see the Amazon Web Services section of the Cloud Instance Metadata chapter.
38.3. Amazon Simple Storage Service (S3)
Amazon S3 is a high-availability, low-latency storage service offered by Amazon. For more information, see the AWS Amazon S3 Overview.
NXLog can be set up to send log data to S3 storage or read log data from S3 storage. For more information, see the Amazon S3 add-on documentation.