70.2. Azure Operations Management Suite (OMS)
The Azure Operations Management Suite is a set of Microsoft cloud services providing log management, backup, automation, and high availability features. Azure Log Analytics is the part of OMS used for log collection, correlation, and analysis.
NXLog can be configured to connect to the OMS Log Analytics service and forward or collect log data via its REST API. See the Azure OMS and Log Analytics documentation for more information about configuring and using Azure OMS and its log management service.
70.2.1. Forwarding Data to Log Analytics
A Python script can be used to perform REST API calls to send log data to the Log Analytics service. To configure NXLog, complete the following steps.
-
Log in to the Azure portal and go to the Log Analytics service (for instance by typing the service name into the search bar).
-
Select an existing OMS Workspace or create a new one by clicking the Add button.
-
From the Management section in the main workspace screen, click OMS Portal.
-
In the Microsoft Operations Management Suite, click the settings icon in the top right corner, navigate to Settings > Connected Sources > Linux Servers, and copy the WORKSPACE ID and PRIMARY KEY values. These are needed for API access.
-
Enable Custom Logs. As of this writing it is a preview feature, available under Settings > Preview Features > Custom Logs.
-
Place the
oms-pipe.py
script in a location accessible by NXLog and make sure it is executable by NXLog. -
Set the customer ID, shared key, and log type values in the script.
-
Configure NXLog to execute the script with the om_exec module. The contents of the
$raw_event
field will be forwarded.
This configuration reads raw events from file and forwards them to Azure OMS.
1
2
3
4
5
6
7
8
9
<Input messages>
Module im_file
File '/var/log/messages'
</Input>
<Output azure_oms>
Module om_exec
Command oms-pipe.py
</Output>
#!/usr/bin/env python
# This is a PoF script that can be used with 'om_exec' NXLog module to
# ship logs to Microsoft Azure Cloud (Log Analytics / OMS) via REST API.
# NXLog configuration:
# -------------------
# <Output out>
# Module om_exec
# Command /tmp/samplepy
# </Output>
# -------------------
import requests
import datetime
import hashlib
import hmac
import base64
import fileinput
# Update the customer ID to your Operations Management Suite workspace ID
customer_id = '<cid>'
# For the shared key, use either the primary or the secondary Connected Sources client authentication key
shared_key = "<skey>"
# The log type is the name of the event that is being submitted
log_type = 'STDIN_PY'
# Build the API signature
def build_signature(customer_id, shared_key, date, content_length, method, content_type, resource):
x_headers = 'x-ms-date:' + date
string_to_hash = method + "\n" + str(content_length) + "\n" + content_type + "\n" + x_headers + "\n" + resource
bytes_to_hash = bytes(string_to_hash).encode('utf-8')
decoded_key = base64.b64decode(shared_key)
encoded_hash = base64.b64encode(hmac.new(decoded_key, bytes_to_hash, digestmod=hashlib.sha256).digest())
authorization = "SharedKey {}:{}".format(customer_id,encoded_hash)
return authorization
# Build and send a request to the POST API
def post_data(customer_id, shared_key, body, log_type):
method = 'POST'
content_type = 'application/json'
resource = '/api/logs'
rfc1123date = datetime.datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT')
content_length = len(body)
signature = build_signature(customer_id, shared_key, rfc1123date, content_length, method, content_type, resource)
uri = 'https://' + customer_id + '.ods.opinsights.azure.com' + resource + '?api-version=2016-04-01'
headers = {
'content-type': content_type,
'Authorization': signature,
'Log-Type': log_type,
'x-ms-date': rfc1123date
}
response = requests.post(uri,data=body, headers=headers)
if (response.status_code >= 200 and response.status_code <= 299):
print 'Accepted'
else:
print "Response code: {}".format(response.status_code)
for body in fileinput.input():
post_data(customer_id, shared_key, body, log_type)
With this configuration, NXLog Enterprise Edition reads W3C records with from file with im_file, parses the records with xm_w3c, converts the internal event fields to JSON format with xm_json to_json(), and forwards the result to Azure OMS with om_exec.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<Extension _json>
Module xm_json
</Extension>
<Extension w3c_parser>
Module xm_w3c
</Extension>
<Input messages>
Module im_file
File '/var/log/httpd-log'
InputType w3c_parser
</Input>
<Output azure_oms>
Module om_exec
Command oms-pipe.py
Exec to_json();
</Output>
70.2.2. Downloading Data From Log Analytics
It is also possible to download data from Log Analytics with a Python script. To set this up with NXLog, follow these steps:
-
Register an application in Azure Active Directory and generate an access key for the application.
-
Under your Subscription, go to Access control (IAM) and assign the Log Analytics Reader role to this application.
-
Place the
oms-download.py
script in a location accessible by NXLog. -
Set the resource group, workspace, subscription ID, tenant ID, application ID, and application key values in the script. Adjust the query details as required.
NoteThe Tenant ID can be found as Directory ID under the Azure Active Directory Properties tab. -
Configure NXLog to execute the script with the im_python module.
Detailed instructions on this topic can be found in the Azure documentation.
This configuration uses the im_python module and the
oms-download.py
script to periodically collect log data from the Log
Analytics service.
import datetime
import json
import requests
import adal
import nxlog
class LogReader:
def __init__(self, time_interval):
# Details of workspace. Fill in details for your workspace.
resource_group = '<YOUR_RESOURCE_GROUP>'
workspace = '<YOUR_WORKSPACE>'
# Details of query. Modify these to your requirements.
query = "Type=*"
end_time = datetime.datetime.utcnow()
start_time = end_time - datetime.timedelta(seconds=time_interval)
num_results = 100000 # If not provided, a default of 10 results will be used.
# IDs for authentication. Fill in values for your service principal.
subscription_id = 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx'
tenant_id = 'xxxxxxxx-xxxx-xxxx-xxx-xxxxxxxxxxxx'
application_id = 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxx'
application_key = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
# URLs for authentication
authentication_endpoint = 'https://login.microsoftonline.com/'
resource = 'https://management.core.windows.net/'
# Get access token
context = adal.AuthenticationContext('https://login.microsoftonline.com/' + tenant_id)
token_response = context.acquire_token_with_client_credentials('https://management.core.windows.net/', application_id, application_key)
access_token = token_response.get('accessToken')
# Add token to header
headers = {
"Authorization": 'Bearer ' + access_token,
"Content-Type":'application/json'
}
# URLs for retrieving data
uri_base = 'https://management.azure.com'
uri_api = 'api-version=2015-11-01-preview'
uri_subscription = 'https://management.azure.com/subscriptions/' + subscription_id
uri_resourcegroup = uri_subscription + '/resourcegroups/'+ resource_group
uri_workspace = uri_resourcegroup + '/providers/Microsoft.OperationalInsights/workspaces/' + workspace
uri_search = uri_workspace + '/search'
#store log data for NXLog here
self.lines = ""
# Build search parameters from query details
search_params = {
"query": query,
"top": num_results,
"start": start_time.strftime('%Y-%m-%dT%H:%M:%S'),
"end": end_time.strftime('%Y-%m-%dT%H:%M:%S')
}
# Build URL and send post request
uri = uri_search + '?' + uri_api
response = requests.post(uri,json=search_params,headers=headers)
# Response of 200 if successful
if response.status_code == 200:
# Parse the response to get the ID and status
data = response.json()
search_id = data["id"].split("/")
id = search_id[len(search_id)-1]
status = data["__metadata"]["Status"]
# If status is pending, then keep checking until complete
while status == "Pending":
# Build URL to get search from ID and send request
uri_search = uri_search + '/' + id
uri = uri_search + '?' + uri_api
response = requests.get(uri,headers=headers)
# Parse the response to get the status
data = response.json()
status = data["__metadata"]["Status"]
else:
# Request failed
print (response.status_code)
response.raise_for_status()
print ("Total records:" + str(data["__metadata"]["total"]))
print ("Returned top:" + str(data["__metadata"]["top"]))
#write a JSON dump of all events
for event in data['value']:
self.lines += json.dumps(event) + '\n'
def getlogs(self):
if not self.lines:
return None
return self.lines
def read_data(module):
# log pull time interval in seconds
time_interval = 300
module['reader'] = LogReader(time_interval)
reader = module['reader']
logdata = module.logdata_new()
line = reader.getlogs()
if line:
logdata.set_field('raw_event', line)
logdata.post()
nxlog.log_debug("Data posted")
module.set_read_timer(time_interval)
nxlog.log_info("INIT SCRIPT")