70.3. Azure SQL Database
Azure SQL Database is a managed cloud database service based on the SQL Server 2016 engine. Azure SQL Database includes scalability, high availability, data protection, and other features. For more information, see the Azure SQL Database Documentation on Microsoft Docs.
Azure SQL Database includes auditing features that can generate events based on audit policies. NXLog can be used as a collector for audit data from an Azure SQL Database instance.
Note
It is also possible to send SQL audit logs directly to OMS Log Analytics. This can be configured in the Azure portal; see Get started with SQL database auditing on Microsoft Docs. In that case, see Azure Operations Management Suite (OMS) for information about integrating NXLog with OMS Log Analytics.
To begin, auditing must be enabled for the instance; see Get started with SQL database auditing in the Azure documentation for detailed steps. Once auditing is enabled, NXLog can be configured to periodically download the audit logs using either PowerShell or Python.
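Both collection approaches below poll on an interval and persist the timestamp of the last interval they read, so that collection resumes where it left off after a restart. That resume pattern can be sketched in Python as follows; the cache file path here is illustrative, not the one the scripts use:

```python
import datetime
import os
import tempfile

# Illustrative cache file path (the actual scripts use their own locations).
CACHE_FILE = os.path.join(tempfile.gettempdir(), "auditlog_position.txt")

def save_position(ts):
    # Persist the end timestamp of the interval that was just read.
    with open(CACHE_FILE, "w") as f:
        f.write(ts.isoformat())

def read_position():
    # Return the saved timestamp, or None on the first run.
    if not os.path.exists(CACHE_FILE):
        return None
    with open(CACHE_FILE) as f:
        line = f.readline().strip()
    return datetime.datetime.fromisoformat(line) if line else None

# Each poll reads the window [start, now], then advances the position.
start = read_position() or datetime.datetime.utcnow()
now = datetime.datetime.utcnow()
save_position(now)
```

On the next poll, `read_position()` returns the saved end time, which becomes the new start of the query window, so no events are read twice and none are skipped.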
70.3.1. Using a PowerShell Script
The im_exec module can be used with the azure-sql.ps1 PowerShell script to download Azure SQL audit logs. The script logs in to the Azure account and downloads the latest audit file from the Blob storage container. It then reads the file from disk and prints the events for the latest time period (one hour by default). Finally, the script waits until the next execution.
- The script requires Microsoft.SqlServer.XE.Core.dll and Microsoft.SqlServer.XEvent.Linq.dll to run. These libraries are distributed with Microsoft SQL Server installations (including XE edition).
- Azure PowerShell needs to be installed as well; this can be done by executing Install-Module AzureRM -AllowClobber in PowerShell. For detailed documentation about installing Azure PowerShell, see Install and configure Azure PowerShell in the Azure documentation.
- There are several variables in the script header that need to be set.
Note
The procedure for non-interactive Azure authentication might vary, depending on the account type. This example assumes that a service principal has been created to access resources. For detailed information about creating an identity for unattended script execution, see Use Azure PowerShell to create a service principal with a certificate in the Azure documentation. Alternatively, Save-AzureRmContext can be used to store account information in a JSON file, which can later be loaded with Import-AzureRmContext.
This configuration uses im_exec to run the azure-sql.ps1 PowerShell script. The xm_json module is used to parse the JSON event data into NXLog fields.
<Extension _json>
    Module  xm_json
</Extension>

envvar systemroot

<Input azure_sql>
    Module  im_exec
    Command "%systemroot%\System32\WindowsPowerShell\v1.0\powershell.exe"

    # Bypass the system execution policy for this session only.
    Arg     "-ExecutionPolicy"
    Arg     "Bypass"

    # Skip loading the local PowerShell profile.
    Arg     "-NoProfile"

    # This specifies the path to the PowerShell script.
    Arg     "-File"
    Arg     "%systemroot%\azure_sql.ps1"

    <Exec>
        # Parse the JSON event data
        parse_json();
        # Convert the $event_time value to a datetime in $EventTime
        $EventTime = parsedate($event_time);
    </Exec>
</Input>
# If running 32-bit on a 64-bit system, run 64-bit PowerShell instead.
if ( $env:PROCESSOR_ARCHITEW6432 -eq "AMD64" ) {
Write-Output "Running 64-bit PowerShell."
&"$env:SYSTEMROOT\SysNative\WindowsPowerShell\v1.0\powershell.exe" `
-NonInteractive -NoProfile -ExecutionPolicy Bypass `
-File "$($myInvocation.InvocationName)" $args
exit $LASTEXITCODE
}
################################################################################
# Update these parameters.
# The path to MSSQL Server DLLs
$SharedPath = "C:\Program Files\Microsoft SQL Server\140\Shared";
# The path to local working directory
$localTargetDirectory = "C:\temp\"
# Azure details: credentials, resource group, storage account, container
$accountName ="<YOUR ACCOUNT NAME>"
$password = ConvertTo-SecureString "<YOUR PASSWORD>" -AsPlainText -Force
$resourceGroup = "<YOUR RESOURCE GROUP>"
$storageAccountName = "<YOUR STORAGE ACCOUNT NAME>"
$containerName = "sqldbauditlogs"
# The timestamp is saved to this file for resuming.
$CacheFile = 'C:\nxlog_mssql_auditlog_position.txt'
# The audit file is downloaded and read at this interval in seconds.
$PollInterval = 3600
# Allow this many seconds for new logs to be written to the audit file.
$ReadDelay = 30
################################################################################
# Get audit data from $containerName container in $storageAccountName Azure storage
# in range $start to $end.
function Get-Audit-Data {
param( $accountName, $password, $resourceGroup, $storageAccountName, $containerName, $start, $end )
$xeCore = [System.IO.Path]::Combine($SharedPath,
"Microsoft.SqlServer.XE.Core.dll");
$xeLinq = [System.IO.Path]::Combine($SharedPath,
"Microsoft.SqlServer.XEvent.Linq.dll");
Add-Type -Path $xeLinq;
# Notes on "Microsoft.SqlServer.XE.Core.dll":
# • For SQL 2014, it is a dependency of "Microsoft.SqlServer.XEvent.Linq.dll".
# • For SQL 2012, the file does not exist.
if( [System.IO.File]::Exists($xeCore) ) { Add-Type -Path $xeCore; }
# Log in to Azure account
$credential = New-Object System.Management.Automation.PSCredential($accountName, $password)
Login-AzureRMAccount -Credential $credential
# Get Azure storage account
$storageAccount = Get-AzureRmStorageAccount -ResourceGroupName $resourceGroup -AccountName $storageAccountName
$ctx = $storageAccount.Context
# Download latest audit file
$lastAuditFile = Get-AzureStorageBlob -Container $containerName -Context $ctx | select Name | select -Last 1
$blobName = $lastAuditFile.Name
Get-AzureStorageBlobContent -Blob $blobName `
-Container $containerName `
-Destination $localTargetDirectory `
-Context $ctx
# Get path to the local file
$blobName = $localTargetDirectory+$blobName | % { [System.Io.Path]::GetFullPath($_) }
[Microsoft.SqlServer.XEvent.Linq.QueryableXEventData] $xEvents = $null;
Try {
# Get events from audit file
$xEvents =
New-Object -TypeName Microsoft.SqlServer.XEvent.Linq.QueryableXEventData(
$blobName
)
# These fields contain binary arrays, so they need special handling
$binaryValues = @("permission_bitmask","server_principal_sid","target_server_principal_sid")
ForEach($publishedEvent in $xEvents) {
# Read only events from latest time period
if(($publishedEvent.Timestamp -ge $start) -and ($publishedEvent.Timestamp -le $end)) {
# Create hash table
$record = @{}
ForEach ($fld in $publishedEvent.Fields) {
# Check for binary arrays
if($binaryValues.Contains($fld.Name)){
# Convert binary arrays to hex
$value = ($fld.Value |ForEach-Object ToString X2) -join ''
}
else {
# Convert to string
$value = $fld.Value.ToString();
}
$record += @{$fld.Name = $value }
}
# Return record as JSON
$record | ConvertTo-Json -Compress | Write-Output
}
}
}
Catch {
Write-Host "Exception Message: $($_.Exception.Message)"
}
Finally {
if ($xEvents -is [IDisposable]) {
$xEvents.Dispose();
}
}
# Remove downloaded XEL file
Remove-Item $blobName
}
# Get position timestamp from cache file. On first run, create file using
# current time.
function Get-Position {
param( $file )
Try {
if (Test-Path $file) {
$timestamp = (Get-Date (Get-Content $file -First 1))
$timestamp = $timestamp.ToUniversalTime()
$start = $timestamp.AddTicks(-($timestamp.Ticks % 10000000))
}
else {
$timestamp = [System.DateTime]::UtcNow
$timestamp = $timestamp.AddTicks(-($timestamp.Ticks % 10000000))
Save-Position $file $timestamp
$start = $timestamp
}
return $start
}
Catch {
Write-Error "Failed to read timestamp from position file."
exit 1
}
}
# Save position timestamp to cache file.
function Save-Position {
param( $file, $time )
Try { Out-File -FilePath $file -InputObject $time.ToString('o') }
Catch {
Write-Error "Failed to write timestamp to position file."
exit 1
}
}
# Main
Try {
$start = Get-Position $CacheFile
Write-Debug "Got start time of $($start.ToString('o'))."
$now = [System.DateTime]::UtcNow
$now = $now.AddTicks(-($now.Ticks % 10000000))
Write-Debug "Got current time of $($now.ToString('o'))."
$diff = ($now - $start).TotalSeconds
# Check whether waiting is required to comply with $ReadDelay.
if (($diff - $PollInterval) -lt $ReadDelay) {
$wait = $ReadDelay - $diff + $PollInterval
Write-Debug "Waiting $wait seconds to start collecting logs."
Start-Sleep -Seconds $wait
}
# Repeatedly read from the audit log
while($true) {
Write-Debug "Using range start time of $($start.ToString('o'))."
$now = [System.DateTime]::UtcNow
$now = $now.AddTicks(-($now.Ticks % 10000000))
$end = $now.AddSeconds(-($ReadDelay))
Write-Debug "Using range end time of $($end.ToString('o'))."
Get-Audit-Data $accountName $password $resourceGroup $storageAccountName $containerName $start $end
Write-Debug "Saving position timestamp to cache file."
Save-Position $CacheFile $end
Write-Debug "Waiting $PollInterval seconds before reading again."
Start-Sleep -Seconds $PollInterval
$start = $end
}
}
Catch {
Write-Error "An unhandled exception occurred!"
exit 1
}
70.3.2. Using a Python Script
The azure-sql.py script can be used with the im_python module to query the audit file at the database level, save rows into objects, and pass them to NXLog as events.
- The script requires installation of the Microsoft ODBC Driver; see Installing the Microsoft ODBC Driver for SQL Server on Linux and macOS on Microsoft Docs.
- The azure-storage and pyodbc Python packages are also required.
- There are several variables in the script header that need to be set.
This configuration uses the im_python module to execute the azure-sql.py Python script. The script logs in to Azure, collects the audit logs, and creates NXLog events.
<Input sql>
    Module      im_python
    PythonCode  azure_sql.py
    Exec        $EventTime = parsedate($EventTime);
</Input>
import binascii, collections, datetime, nxlog, pyodbc
from azure.storage.blob import PageBlobService

################################################################################
# Update these parameters.

# MSSQL details
DRIVER = "{ODBC Driver 13 for SQL Server}"
SERVER = 'tcp:XXXXXXXX.database.windows.net'
DATABASE = 'XXXXXXXX'
USERNAME = 'XXXXXXXX@XXXXXXXX'
PASSWORD = 'XXXXXXXX'

# Azure Storage details
STORAGE_ACCOUNT = 'XXXXXXXX'
STORAGE_KEY = 'XXXXXXXX=='
CONTAINER_NAME = 'sqldbauditlogs'

# Log pull time interval in seconds
TIME_INTERVAL = 300

# The timestamp is saved to this file for resuming.
CACHE_FILE = '/opt/nxlog/var/spool/nxlog/nxlog_mssql_auditlog_position.txt'
################################################################################

# Save position to the cache file
def save_position(timestamp):
    # Truncate the cache file and open it for writing
    with open(CACHE_FILE, 'w+') as f:
        f.write(str(timestamp))

# Read the time of the last log query; return None if the cache file is empty
def read_position():
    # Open or create the cache file
    with open(CACHE_FILE, 'a+') as f:
        # 'a+' positions the stream at the end of the file, so seek back to
        # the beginning before reading
        f.seek(0)
        line = f.readline()
    if line != "":
        position = datetime.datetime.strptime(line, "%Y-%m-%d %H:%M:%S.%f")
    else:
        position = None
    return position

# Get the name of the blob with the latest audit file from Azure Storage
def get_blob():
    # Connect to the Blob service
    page_blob_service = PageBlobService(account_name=STORAGE_ACCOUNT,
                                        account_key=STORAGE_KEY)
    # List all blobs in the container and take the latest one
    last_blob = list(page_blob_service.list_blobs(CONTAINER_NAME))[-1]
    latest_audit_log = last_blob.name
    # Construct the blob URL
    blob = "https://{}.blob.core.windows.net/{}/{}".format(STORAGE_ACCOUNT,
                                                           CONTAINER_NAME,
                                                           latest_audit_log)
    return blob

# Build an ordered dictionary from a database row
def build_event(row):
    event = [
        ('EventTime', row[0]),
        ('SequenceNumber', row[1]),
        ('ActionId', row[2]),
        ('Succeeded', row[3]),
        ('PermissionBitmask', binascii.hexlify(row[4])),
        ('IsColumnPermission', row[5]),
        ('SessionId', row[6]),
        ('ServerPrincipalId', row[7]),
        ('DatabasePrincipalId', row[8]),
        ('TargetServerPrincipalId', row[9]),
        ('TargetDatabasePrincipalId', row[10]),
        ('ObjectId', row[11]),
        ('ClassType', row[12]),
        ('SessionServerPrincipalName', row[13]),
        ('ServerPrincipalName', row[14]),
        ('ServerPrincipalSid', binascii.hexlify(row[15])),
        ('DatabasePrincipalName', row[16]),
        ('TargetServerPrincipalName', row[17]),
        ('TargetServerPrincipalSid', row[18]),
        ('TargetDatabasePrincipalName', row[19]),
        ('ServerInstanceName', row[20]),
        ('DatabaseName', row[21]),
        ('SchemaName', row[22]),
        ('ObjectName', row[23]),
        ('Statement', row[24]),
        ('AdditionalInformation', row[25]),
        ('FileName', row[26]),
        ('AuditFileOffset', row[27]),
        ('UserDefinedEventId', row[28]),
        ('UserDefinedInformation', row[29]),
        ('AuditSchemaVersion', row[30]),
        ('SequenceGroupId', binascii.hexlify(row[31])),
        ('TransactionId', row[32]),
        ('ClientIp', row[33]),
        ('ApplicationName', row[34]),
        ('DurationMilliseconds', row[35]),
        ('ResponseRows', row[36]),
        ('AffectedRows', row[37])
    ]
    return collections.OrderedDict(event)

# Collect log data
def read_logs(query_start, query_end, module):
    # Get the blob name from Azure
    blob = get_blob()
    # Connect to the MSSQL database
    cnxn = pyodbc.connect(
        "DRIVER={};SERVER={};DATABASE={};UID={};PWD={}".format(DRIVER,
                                                               SERVER,
                                                               DATABASE,
                                                               USERNAME,
                                                               PASSWORD))
    cursor = cnxn.cursor()
    # Query the blob file
    cursor.execute(
        """SELECT * FROM sys.fn_get_audit_file(?, default, default)
           WHERE event_time > ? AND event_time <= ?""",
        blob, str(query_start), str(query_end))
    row = cursor.fetchone()
    while row:
        event = build_event(row)
        kv_pairs = ', '.join(['{}={}'.format(k, event[k]) for k in event])
        logdata = module.logdata_new()
        logdata.set_field('raw_event', kv_pairs)
        for k in event:
            logdata.set_field(k, str(event[k]))
        logdata.post()
        row = cursor.fetchone()
    cursor.close()
    # Save the end time
    module['end'] = query_end
    save_position(query_end)

def read_data(module):
    # Get the last run time
    if 'end' in module:
        nxlog.log_info("Last run finished at " + str(module['end']))
    else:
        module['end'] = read_position()
    # Get the start time for the new query
    if module['end'] is None:
        query_start = datetime.datetime.now() - datetime.timedelta(seconds=TIME_INTERVAL)
    else:
        query_start = module['end']
    # Get the end time for the new query; use a delay in case of time differences
    query_end = datetime.datetime.now() - datetime.timedelta(seconds=5)
    # Run the query for logs
    read_logs(query_start, query_end, module)
    # Schedule the next run of the module
    module.set_read_timer(TIME_INTERVAL)

nxlog.log_info("INIT SCRIPT")
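The raw_event string that read_logs posts is a comma-separated list of key-value pairs built from the ordered dictionary. The formatting can be reproduced in isolation; the two-field row below is a made-up stand-in for a full 38-column audit row:

```python
import collections

# Hypothetical miniature audit row: (event_time, action_id, succeeded).
row = ("2018-05-04 09:26:29.5600000", "RCM", 1)

# Same OrderedDict construction pattern as build_event, with fewer fields.
event = collections.OrderedDict(
    [("EventTime", row[0]), ("ActionId", row[1]), ("Succeeded", row[2])]
)

# Same formatting as the script's raw_event field.
kv_pairs = ", ".join("{}={}".format(k, event[k]) for k in event)
print(kv_pairs)  # → EventTime=2018-05-04 09:26:29.5600000, ActionId=RCM, Succeeded=1
```

An OrderedDict is used so the fields appear in raw_event in the same order as the columns returned by sys.fn_get_audit_file.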