
70.3. Azure SQL Database

Azure SQL Database is a managed cloud database service based on the SQL Server 2016 engine. It provides scalability, high availability, data protection, and other features. For more information, see the Azure SQL Database documentation on Microsoft Docs.

Azure SQL Database includes auditing features that generate events according to configured audit policies. NXLog can be used as a collector for audit data from an Azure SQL Database instance.

Note
It is also possible to send SQL audit logs directly to OMS Log Analytics. This can be configured on the Azure portal; see Get started with SQL database auditing on Microsoft Docs. In this case, see Azure Operations Management Suite (OMS) for information about integrating NXLog with OMS Log Analytics.

To start with, auditing must be enabled for the instance; see Get started with SQL database auditing in the Azure documentation for detailed steps. Once this is done, NXLog can be configured to periodically download the audit logs using either PowerShell or Python.
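Both collection methods below follow the same poll-and-resume pattern: the timestamp marking the end of the last processed range is saved to a cache file, and each run queries only the interval since that timestamp. The following Python sketch illustrates the pattern in isolation; the file name and interval are illustrative, and both example scripts define their own equivalents.

```python
import datetime
import os

# Illustrative values; the example scripts use their own settings.
CACHE_FILE = 'position.txt'
POLL_INTERVAL = 3600  # seconds

def read_position():
    """Return the saved timestamp, or None on the first run."""
    if not os.path.exists(CACHE_FILE):
        return None
    with open(CACHE_FILE) as f:
        line = f.readline().strip()
    return datetime.datetime.fromisoformat(line) if line else None

def save_position(timestamp):
    """Persist the end of the processed range for the next run."""
    with open(CACHE_FILE, 'w') as f:
        f.write(timestamp.isoformat())

def next_range(now):
    """Compute the [start, end] range to query on this run."""
    start = read_position()
    if start is None:
        # First run: look back one polling interval.
        start = now - datetime.timedelta(seconds=POLL_INTERVAL)
    return start, now
```

On each cycle the collector calls `next_range()`, fetches events in that range, and then calls `save_position()` so that a restart resumes where the previous run stopped.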

70.3.1. Using a PowerShell Script

The im_exec module can be used with the azure-sql.ps1 PowerShell script to download Azure SQL audit logs. The script logs in to the Azure account and downloads the latest audit file from the Blob storage container. It then reads the file from disk and prints the events for the latest time period (one hour by default). Finally, the script waits until the next scheduled execution.

  • The script requires Microsoft.SqlServer.XE.Core.dll and Microsoft.SqlServer.XEvent.Linq.dll to run. These libraries are distributed with Microsoft SQL Server installations (including the Express edition).

  • Azure PowerShell needs to be installed as well; this can be done by executing Install-Module AzureRM -AllowClobber in PowerShell. For detailed documentation about installing Azure PowerShell, see Install and configure Azure PowerShell in the Azure documentation.

  • There are several variables in the script header that need to be set.

Note
The procedure for non-interactive Azure authentication might vary, depending on the account type. This example assumes that a service principal to access resources has been created. For detailed information about creating an identity for unattended script execution, see Use Azure PowerShell to create a service principal with a certificate in the Azure documentation. Alternatively, Save-AzureRmContext can be used to store account information in a JSON file and it can be loaded later with Import-AzureRmContext.
Example 307. Collecting Azure SQL Audit Logs With PowerShell

This configuration uses im_exec to run the azure-sql.ps1 PowerShell script. The xm_json module is used to parse the JSON event data into NXLog fields.

nxlog.conf [Download file]
<Extension _json>
    Module  xm_json
</Extension>

envvar systemroot
<Input azure_sql>
    Module  im_exec
    Command "%systemroot%\System32\WindowsPowerShell\v1.0\powershell.exe"
    # Bypass the system execution policy for this session only.
    Arg     "-ExecutionPolicy"
    Arg     "Bypass"
    # Skip loading the local PowerShell profile.
    Arg     "-NoProfile"
    # This specifies the path to the PowerShell script.
    Arg     "-File"
    Arg     "%systemroot%\azure-sql.ps1"
    <Exec>
        # Parse JSON
        parse_json();

        # Convert $EventTime field to datetime
        $EventTime = parsedate($event_time);
    </Exec>
</Input>
azure-sql.ps1 [Download file]
# If running 32-bit on a 64-bit system, run 64-bit PowerShell instead.
if ( $env:PROCESSOR_ARCHITEW6432 -eq "AMD64" ) {
  Write-Output "Running 64-bit PowerShell."
  &"$env:SYSTEMROOT\SysNative\WindowsPowerShell\v1.0\powershell.exe" `
  -NonInteractive -NoProfile -ExecutionPolicy Bypass `
  -File "$($myInvocation.InvocationName)" $args
  exit $LASTEXITCODE
}
################################################################################

# Update these parameters.

# The path to MSSQL Server DLLs
$SharedPath = "C:\Program Files\Microsoft SQL Server\140\Shared";

# The path to local working directory
$localTargetDirectory = "C:\temp\"

# Azure details: credentials, resource group, storage account, container
$accountName ="<YOUR ACCOUNT NAME>"
$password = ConvertTo-SecureString "<YOUR PASSWORD>" -AsPlainText -Force
$resourceGroup = "<YOUR RESOURCE GROUP>"
$storageAccountName = "<YOUR STORAGE ACCOUNT NAME>"
$containerName = "sqldbauditlogs"

# The timestamp is saved to this file for resuming.
$CacheFile = 'C:\nxlog_mssql_auditlog_position.txt'

# The audit file is downloaded and read at this interval in seconds.
$PollInterval = 3600

# Allow this many seconds for new logs to be written to the audit file.
$ReadDelay = 30

################################################################################
# Get audit data from $containerName container in $storageAccountName Azure storage
# in range $start to $end.
function Get-Audit-Data {
    param( $accountName, $password, $resourceGroup, $storageAccountName,
           $containerName, $start, $end )

    $xeCore = [System.IO.Path]::Combine($SharedPath,
        "Microsoft.SqlServer.XE.Core.dll");
    $xeLinq = [System.IO.Path]::Combine($SharedPath,
        "Microsoft.SqlServer.XEvent.Linq.dll");
    Add-Type -Path $xeLinq;

    # Notes on "Microsoft.SqlServer.XE.Core.dll":
    #  • For SQL 2014, it is a dependency of "Microsoft.SqlServer.XEvent.Linq.dll".
    #  • For SQL 2012, the file does not exist.
    if( [System.IO.File]::Exists($xeCore) ) { Add-Type -Path $xeCore; }

    # Log in to the Azure account
    $credential = New-Object System.Management.Automation.PSCredential($accountName, $password)
    Login-AzureRmAccount -Credential $credential

    # Get the Azure storage account
    $storageAccount = Get-AzureRmStorageAccount -ResourceGroupName $resourceGroup -AccountName $storageAccountName
    $ctx = $storageAccount.Context

    # Download the latest audit file
    $lastAuditFile = Get-AzureStorageBlob -Container $containerName -Context $ctx |
        Select-Object Name | Select-Object -Last 1
    $blobName = $lastAuditFile.Name
    Get-AzureStorageBlobContent -Blob $blobName `
        -Container $containerName `
        -Destination $localTargetDirectory `
        -Context $ctx

    # Get the path to the local file
    $blobName = $localTargetDirectory + $blobName |
        ForEach-Object { [System.IO.Path]::GetFullPath($_) }

    [Microsoft.SqlServer.XEvent.Linq.QueryableXEventData] $xEvents = $null;

    Try {
        # Get events from the audit file
        $xEvents =
            New-Object -TypeName Microsoft.SqlServer.XEvent.Linq.QueryableXEventData(
                $blobName
            )

        # These fields contain binary arrays, so they need special handling
        $binaryValues = @("permission_bitmask","server_principal_sid","target_server_principal_sid")

        ForEach ($publishedEvent in $xEvents) {
            # Read only events from the latest time period
            if (($publishedEvent.Timestamp -ge $start) -and ($publishedEvent.Timestamp -le $end)) {

                # Create a hash table
                $record = @{}

                ForEach ($fld in $publishedEvent.Fields) {
                    # Check for binary arrays
                    if ($binaryValues.Contains($fld.Name)) {
                        # Convert binary arrays to hex strings
                        $value = ($fld.Value | ForEach-Object ToString X2) -join ''
                    }
                    else {
                        # Convert to string
                        $value = $fld.Value.ToString();
                    }
                    $record += @{ $fld.Name = $value }
                }
                # Write the record as JSON
                $record | ConvertTo-Json -Compress | Write-Output
            }
        }
    }
    Catch {
        Write-Host "Exception Message: $($_.Exception.Message)"
    }
    Finally {
        if ($xEvents -is [IDisposable]) {
            $xEvents.Dispose();
        }
    }

    # Remove the downloaded XEL file
    Remove-Item $blobName
}

# Get position timestamp from cache file. On first run, create file using
# current time.
function Get-Position {
    param( $file )
    Try {
        if (Test-Path $file) {
            $timestamp = (Get-Date (Get-Content $file -First 1))
            $timestamp = $timestamp.ToUniversalTime()
            $start = $timestamp.AddTicks(-($timestamp.Ticks % 10000000))
        }
        else {
            $timestamp = [System.DateTime]::UtcNow
            $start = $timestamp.AddTicks(-($timestamp.Ticks % 10000000))
            Save-Position $file $start
        }
        return $start
    }
    Catch {
        Write-Error "Failed to read timestamp from position file."
        exit 1
    }
}

# Save position timestamp to cache file.
function Save-Position {
    param( $file, $time )
    Try { Out-File -FilePath $file -InputObject $time.ToString('o') }
    Catch {
        Write-Error "Failed to write timestamp to position file."
        exit 1
    }
}

# Main
Try {
    $start = Get-Position $CacheFile
    Write-Debug "Got start time of $($start.ToString('o'))."
    $now = [System.DateTime]::UtcNow
    $now = $now.AddTicks(-($now.Ticks % 10000000))
    Write-Debug "Got current time of $($now.ToString('o'))."
    $diff = ($now - $start).TotalSeconds
    # Check whether waiting is required to comply with $ReadDelay.
    if (($diff - $PollInterval) -lt $ReadDelay) {
        $wait = $ReadDelay - $diff + $PollInterval
        Write-Debug "Waiting $wait seconds to start collecting logs."
        Start-Sleep -Seconds $wait
    }
    # Repeatedly read from the audit log
    while($true) {
        Write-Debug "Using range start time of $($start.ToString('o'))."
        $now = [System.DateTime]::UtcNow
        $now = $now.AddTicks(-($now.Ticks % 10000000))
        $end = $now.AddSeconds(-($ReadDelay))
        Write-Debug "Using range end time of $($end.ToString('o'))."
        Get-Audit-Data $accountName $password $resourceGroup $storageAccountName $containerName $start $end
        Write-Debug "Saving position timestamp to cache file."
        Save-Position $CacheFile $end
        Write-Debug "Waiting $PollInterval seconds before reading again."
        Start-Sleep -Seconds $PollInterval
        $start = $end
    }
}
Catch {
    Write-Error "An unhandled exception occurred!"
    exit 1
}
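Both the PowerShell script above and the Python script below convert binary audit fields (such as server_principal_sid) to hexadecimal strings before emitting them: PowerShell joins the output of `ToString X2`, while the Python script uses `binascii.hexlify`. The two conversions differ only in letter case, as this short Python check illustrates (the sample byte string is made up):

```python
import binascii

# A made-up binary SID value, standing in for an audit row field
sid = bytes([0x01, 0x05, 0x00, 0x00, 0x7F])

# The Python script's conversion (lowercase hex)
hex_py = binascii.hexlify(sid).decode()

# The PowerShell conversion reproduced in Python:
# ($fld.Value | ForEach-Object ToString X2) -join '' yields uppercase hex
hex_ps = ''.join('{:02X}'.format(b) for b in sid)

assert hex_py.upper() == hex_ps
print(hex_py)  # → 010500007f
```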

70.3.2. Using a Python Script

The azure-sql.py script can be used with the im_python module to query the audit file at the database level with the sys.fn_get_audit_file function, convert each returned row into an event, and pass the events to NXLog.

Example 308. Collecting Azure SQL Audit Logs With Python

This configuration uses the im_python module to execute the azure-sql.py Python script. The script logs in to Azure, collects audit logs, and creates NXLog events.

nxlog.conf [Download file]
<Input sql>
    Module      im_python
    PythonCode  azure-sql.py
    Exec        $EventTime = parsedate($EventTime);
</Input>
azure-sql.py [Download file]
import binascii, collections, datetime, nxlog, pyodbc
from azure.storage.blob import PageBlobService

################################################################################

# Update these parameters.

# MSSQL details
DRIVER = "{ODBC Driver 13 for SQL Server}"
SERVER = 'tcp:XXXXXXXX.database.windows.net'
DATABASE = 'XXXXXXXX'
USERNAME = 'XXXXXXXX@XXXXXXXX'
PASSWORD = 'XXXXXXXX'

# Azure Storage details
STORAGE_ACCOUNT = 'XXXXXXXX'
STORAGE_KEY = 'XXXXXXXX=='
CONTAINER_NAME = 'sqldbauditlogs'

# Log pull time interval in seconds
TIME_INTERVAL = 300

# The timestamp is saved to this file for resuming.
CACHE_FILE = '/opt/nxlog/var/spool/nxlog/nxlog_mssql_auditlog_position.txt'

################################################################################

# Save position to the cache file
def save_position(timestamp):
    # Truncate temp file and open for writing
    with open(CACHE_FILE, 'w+') as f:
        f.write(str(timestamp))

# Read the last time of log query, return None if cache file is empty
def read_position():
    # Open or create a new empty temp file
    with open(CACHE_FILE, 'a+') as f:
        line = f.readline()
        if line != "":
            position = datetime.datetime.strptime(line, "%Y-%m-%d %H:%M:%S.%f")
        else:
            position = None
    return position

# Get name of a Blob with the latest audit file from Azure Storage
def get_blob():
    # Connect to the Blob service
    page_blob_service = PageBlobService(account_name=STORAGE_ACCOUNT,
                                        account_key=STORAGE_KEY)
    # List all blobs in the container and take the latest one
    last_blob = list(page_blob_service.list_blobs(CONTAINER_NAME))[-1]
    latest_audit_log = last_blob.name
    # Construct the blob URL
    blob = "https://{}.blob.core.windows.net/{}/{}".format(STORAGE_ACCOUNT,
                                                           CONTAINER_NAME,
                                                           latest_audit_log)
    return blob


# Build an ordered dictionary from a database row
def build_event(row):
    event = [
        ('EventTime', row[0]),
        ('SequenceNumber', row[1]),
        ('ActionId', row[2]),
        ('Succeeded', row[3]),
        ('PermissionBitmask', binascii.hexlify(row[4])),
        ('IsColumnPermission', row[5]),
        ('SessionId', row[6]),
        ('ServerPrincipalId', row[7]),
        ('DatabasePrincipalId', row[8]),
        ('TargetServerPrincipalId', row[9]),
        ('TargetDatabasePrincipalId', row[10]),
        ('ObjectId', row[11]),
        ('ClassType', row[12]),
        ('SessionServerPrincipalName', row[13]),
        ('ServerPrincipalName', row[14]),
        ('ServerPrincipalSid', binascii.hexlify(row[15])),
        ('DatabasePrincipalName', row[16]),
        ('TargetServerPrincipalName', row[17]),
        ('TargetServerPrincipalSid', row[18]),
        ('TargetDatabasePrincipalName', row[19]),
        ('ServerInstanceName', row[20]),
        ('DatabaseName', row[21]),
        ('SchemaName', row[22]),
        ('ObjectName', row[23]),
        ('Statement', row[24]),
        ('AdditionalInformation', row[25]),
        ('FileName', row[26]),
        ('AuditFileOffset', row[27]),
        ('UserDefinedEventId', row[28]),
        ('UserDefinedInformation', row[29]),
        ('AuditSchemaVersion', row[30]),
        ('SequenceGroupId', binascii.hexlify(row[31])),
        ('TransactionId', row[32]),
        ('ClientIp', row[33]),
        ('ApplicationName', row[34]),
        ('DurationMilliseconds', row[35]),
        ('ResponseRows', row[36]),
        ('AffectedRows', row[37])
    ]
    return collections.OrderedDict(event)

# Collect log data
def read_logs(query_start, query_end, module):

    # Get Blob name from Azure Cloud
    blob = get_blob()

    # Connect to MSSQL database
    cnxn = pyodbc.connect(
        "DRIVER={};SERVER={};DATABASE={};UID={};PWD={}".format(DRIVER,
                                                               SERVER,
                                                               DATABASE,
                                                               USERNAME,
                                                               PASSWORD))
    cursor = cnxn.cursor()

    # Query Blob file
    cursor.execute(
        """SELECT * FROM sys.fn_get_audit_file(?, default,default)
        WHERE event_time > ? AND event_time <= ?""",
        blob, str(query_start),str(query_end))
    row = cursor.fetchone()

    while row:
        event = build_event(row)
        kv_pairs = ', '.join(['{}={}'.format(k, event[k]) for k in event])
        logdata = module.logdata_new()
        logdata.set_field('raw_event', kv_pairs)
        for k in event:
            logdata.set_field(k, str(event[k]))
        logdata.post()
        row = cursor.fetchone()
    cursor.close()

    # Save end time
    module['end'] = query_end
    save_position(query_end)

def read_data(module):
    # Get last run time
    if 'end' in module:
        nxlog.log_info("Last run finished at "+ str(module['end']))
    else:
        module['end'] = read_position()

    # Get start time for new query
    if module['end'] is None:
        query_start = datetime.datetime.now() - datetime.timedelta(seconds=TIME_INTERVAL)
    else:
        query_start = module['end']

    # Get end time for new query; use a delay in case of time differences
    query_end = datetime.datetime.now() - datetime.timedelta(seconds=5)

    # Run the query for logs
    read_logs(query_start, query_end, module)
    # Set next run of the module
    module.set_read_timer(TIME_INTERVAL)

nxlog.log_info("INIT SCRIPT")
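For reference, the raw_event field produced by the join expression in read_logs() is a flat, comma-separated string of key=value pairs built from the ordered dictionary returned by build_event(). A small illustration with a made-up subset of fields:

```python
import collections

# Made-up subset of audit fields, mirroring build_event()'s output shape
event = collections.OrderedDict([
    ('EventTime', '2020-01-01 00:00:00'),
    ('ActionId', 'LGIS'),
    ('Succeeded', 1),
])

# The same join expression used in read_logs()
kv_pairs = ', '.join(['{}={}'.format(k, event[k]) for k in event])
print(kv_pairs)  # → EventTime=2020-01-01 00:00:00, ActionId=LGIS, Succeeded=1
```

Each field is also set individually on the log data record, so downstream processing can use either the structured fields or the raw_event summary.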