Microsoft Azure Sentinel 101: Log Source, DataTable & EndPoint Monitoring — Be alerted when a source goes down or stops responding.
One of the most important things you can do is monitor log ingestion and build alerts for when sources go down.
Previously Developed Content
If you have not already, check out the daily report guides:
https://medium.com/@truvis.thornton/microsoft-azure-sentinel-101-using-logic-apps-to-build-dynamic-automated-email-reports-for-75f2f6a92f93
https://medium.com/@truvis.thornton/microsoft-azure-sentinel-101-daily-reports-for-cost-analysis-c17618c2e32a
Be sure to also check out the dashboards that were created in those guides.
With the shameless plugs dropped, let's move on to creating the alerts.
Data Connector Monitoring
There are many different ways you can do this, and Microsoft has attempted to help by offering a way to check Data Connector health. However, this only works for some of the connectors, and typically only when they go down completely; even then, a failed connector can sometimes still show as healthy.
Enable Data Connector Monitoring
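Health monitoring has to be turned on before the SentinelHealth table starts populating (in Sentinel, under Settings). Once it is enabled, a quick sanity check like the following, a minimal sketch with an arbitrary 7-day window, shows which connectors are actually reporting health events:
// Sanity check (assumes health monitoring is already enabled):
// list every connector that has reported health events in the last 7 days
SentinelHealth
| where TimeGenerated > ago(7d)
| summarize LastEvent = max(TimeGenerated), Events = count() by SentinelResourceName, SentinelResourceType
| order by LastEvent desc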
Alert Queries
Detect latest failure events per connector:
SentinelHealth
| where TimeGenerated > ago(3d)
| where OperationName == 'Data fetch status change'
| where Status in ('Success', 'Failure')
// keep only the most recent status event for each connector
| summarize TimeGenerated = arg_max(TimeGenerated, *) by SentinelResourceName, SentinelResourceId
| where Status == 'Failure'
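Saved as a scheduled analytics rule (for example, one that runs every few hours over this 3-day window), this will raise an incident whenever the most recent health event for a connector is a failure.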
Detect connectors with changes from fail to success state:
let latestStatus = SentinelHealth
| where TimeGenerated > ago(12h)
| where OperationName == 'Data fetch status change'
| where Status in ('Success', 'Failure')
| project TimeGenerated, SentinelResourceName, SentinelResourceId, LastStatus = Status
// most recent status event per connector
| summarize TimeGenerated = arg_max(TimeGenerated, *) by SentinelResourceName, SentinelResourceId;
let nextToLatestStatus = SentinelHealth
| where TimeGenerated > ago(12h)
| where OperationName == 'Data fetch status change'
| where Status in ('Success', 'Failure')
// drop the latest event, then take the most recent of what remains
| join kind=leftanti (latestStatus) on SentinelResourceName, SentinelResourceId, TimeGenerated
| project TimeGenerated, SentinelResourceName, SentinelResourceId, NextToLastStatus = Status
| summarize TimeGenerated = arg_max(TimeGenerated, *) by SentinelResourceName, SentinelResourceId;
latestStatus
| join kind=inner (nextToLatestStatus) on SentinelResourceName, SentinelResourceId
| where NextToLastStatus == 'Failure' and LastStatus == 'Success'
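This recovery variant pairs naturally with the failure queries: one rule can open an incident when a connector fails, and this one can drive an automation that closes it once the connector reports success again.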
Detect connectors with changes from success to fail state:
let latestStatus = SentinelHealth
| where TimeGenerated > ago(12h)
| where OperationName == 'Data fetch status change'
| where Status in ('Success', 'Failure')
| project TimeGenerated, SentinelResourceName, SentinelResourceId, LastStatus = Status
// most recent status event per connector
| summarize TimeGenerated = arg_max(TimeGenerated, *) by SentinelResourceName, SentinelResourceId;
let nextToLatestStatus = SentinelHealth
| where TimeGenerated > ago(12h)
| where OperationName == 'Data fetch status change'
| where Status in ('Success', 'Failure')
// drop the latest event, then take the most recent of what remains
| join kind=leftanti (latestStatus) on SentinelResourceName, SentinelResourceId, TimeGenerated
| project TimeGenerated, SentinelResourceName, SentinelResourceId, NextToLastStatus = Status
| summarize TimeGenerated = arg_max(TimeGenerated, *) by SentinelResourceName, SentinelResourceId;
latestStatus
| join kind=inner (nextToLatestStatus) on SentinelResourceName, SentinelResourceId
| where NextToLastStatus == 'Success' and LastStatus == 'Failure'
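Both state-change queries assume the connector is still emitting health events at all. As noted earlier, some connectors simply go quiet instead; a sketch like the following, with arbitrary 7-day/1-day windows, catches connectors that reported health before but have been silent recently:
// Sketch: connectors that emitted health events earlier in the week but none in the last day
let recent = SentinelHealth
| where TimeGenerated > ago(1d)
| distinct SentinelResourceName;
SentinelHealth
| where TimeGenerated between (ago(7d) .. ago(1d))
| where OperationName == 'Data fetch status change'
| summarize LastSeen = max(TimeGenerated) by SentinelResourceName
| where SentinelResourceName !in (recent)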
Log Source Monitoring
There are two main ways to go about this: DataTables and EndPoints.
DataTables are the tables like “SecurityEvent” and “Syslog”.
EndPoints are the actual endpoint sources, like a server or firewall.
It’s important to know that if you monitor a DataTable fed by several endpoints, you won’t know when a single one drops; even spike/drop-style monitoring can miss one endpoint going down.
EndPoint monitoring is ideal, but it can be noisy, especially if you have lots of machines that come up and down, so the best approach is to break endpoints out by type and use a naming convention to decide what to monitor.
DataTable Monitoring
If we wanted to go the spike-detection route, we could do something like this. You will want to play with the settings until you find something that works for your configuration.
Detects a spike in events on a data table
//a score of +3 is the commonly suggested indicator of a strong anomaly, though you can modify it (see https://en.wikipedia.org/wiki/Outlier); this example uses a stricter 30
let UpperThreshold = 30;
Usage
//we are only interested in being notified when a spike is detected in a billable table
| where IsBillable == true
//report only on variations above a volume you deem significant enough to warrant an alert
| where Quantity > 10
//create a time series of the ingestion pattern over the last 30 days
| make-series Qty=sum(Quantity) on TimeGenerated from ago(30d) to now() step 1d by DataType
//extract anomalous points with scores based on predicted values using linear regression; see https://docs.microsoft.com/en-us/azure/data-explorer/kusto/query/series-decompose-anomaliesfunction for a detailed explanation of each argument, and https://en.wikipedia.org/wiki/Outlier for 'ctukey'
| extend (anomalies, score, baseline) = series_decompose_anomalies(Qty, 1.5, 7, 'linefit', 1, 'ctukey', 0.01)
//series_decompose_anomalies returns three things: a ternary series of (+1, -1, 0) marking up/down/no anomaly, the anomaly score, and the predicted value (baseline)
//[-1] indexes the most recent point in each series
| where anomalies[-1] == 1 or anomalies[-1] == -1
| extend Score = todouble(score[-1])
//compare against the strong-anomaly threshold defined above
| where Score >= UpperThreshold
| extend PercentageQtyIncrease = (todouble(Qty[-1]) - todouble(baseline[-1])) / todouble(Qty[-1]) * 100
| project DataType, ExpectedQty = round(todouble(baseline[-1]), 0), ActualQty = round(todouble(Qty[-1]), 0), PercentageQtyIncrease = round(PercentageQtyIncrease, 0)
| order by PercentageQtyIncrease desc
//only alert if the percentage increase exceeds the threshold you wish to be notified at
| where PercentageQtyIncrease > 10
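For reference, the arguments to series_decompose_anomalies above are: the series to analyze (Qty), the anomaly threshold (1.5), the seasonality period (7 daily bins, i.e. weekly), the trend model ('linefit'), the number of trailing test points excluded from the model (1), the detection method ('ctukey'), and the seasonality threshold (0.01).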
Detects a percent change in a table
let lookback = Syslog
| where TimeGenerated between (ago(2h) .. ago(1h))
| summarize count() by bin(TimeGenerated, 1m)
// per-minute counts divided by 60 give events per second
| extend counttemp = todouble(count_) / 60
| summarize preveps = avg(counttemp)
// 'jo' is a constant key whose only purpose is to join the two one-row results
| extend jo = "jo";
Syslog
| where TimeGenerated between (ago(1h) .. now())
| summarize count() by bin(TimeGenerated, 1m)
| extend counttemp = todouble(count_) / 60
| summarize noweps = avg(counttemp)
| extend jo = "jo"
| join lookback on $left.jo == $right.jo
| extend PercentageChange = todouble(preveps) / todouble(noweps) * 100
// no change is 100; tune the threshold up or down to catch a spike or a drop over the timeframe
| where PercentageChange < 95
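The constant-key join works, but the same logic can be expressed more tidily by lifting each average into a scalar with toscalar. A sketch of that variant (same windows, same threshold):
// Same percent-change logic without the constant-key join (sketch)
let preveps = toscalar(
Syslog
| where TimeGenerated between (ago(2h) .. ago(1h))
| summarize avg_eps = todouble(count()) / 3600.0);
let noweps = toscalar(
Syslog
| where TimeGenerated > ago(1h)
| summarize avg_eps = todouble(count()) / 3600.0);
print preveps = preveps, noweps = noweps, PercentageChange = preveps / noweps * 100
| where PercentageChange < 95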
If we want to know when a DataTable went down, we could do something like this:
// Find tables that previously had logs that are now missing logs
let lookback = 2d;
let IgnoreTables = dynamic(['AzureMetrics', 'BehaviorAnalytics', 'ProtectionStatus', 'SecurityAlert', 'SecurityBaseline', 'Update', 'UpdateSummary', 'Usage', 'UserAccessAnalytics', 'UserPeerAnalytics']);
let AllTables = union withsource=tbl *
| where TimeGenerated > ago(lookback)
| where tbl !in (IgnoreTables);
AllTables
| where TimeGenerated < ago(lookback/2)
| summarize Previous = count() by tbl
| join kind=leftanti (
AllTables
| where TimeGenerated > ago(lookback/2)
// Ignore weekends
| where dayofweek(TimeGenerated)/1d between (1 .. 5)
| summarize Current = count() by tbl
) on tbl
| extend
timestamp = now(),
HostCustomEntity = tbl,
Current = 0
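Because union withsource=tbl * scans every table, this can be slow and costly on a large workspace. A lighter-weight sketch of the same idea reads the Usage metadata table instead (it only covers billable ingestion, so treat it as an approximation):
// Sketch: billable tables that ingested data earlier but nothing in the last day
Usage
| where TimeGenerated > ago(2d)
| where IsBillable == true
| summarize LastSeen = max(TimeGenerated) by DataType
| where LastSeen < ago(1d)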
EndPoint Monitoring
Global search across all tables to find a dropped source (EndPoint)
This is a great way to get all EndPoints, but it can be noisy, so you may wish to change it to match a certain naming convention and to set it to a specific table (a sketch of the naming-convention variant follows the query below).
// Find any endpoint anywhere that went down
union withsource = _TableName *
| where isnotempty(Computer)
| summarize last_log = datetime_diff("hour",now(), max(TimeGenerated)) by Computer
| where last_log > 1
| project ['Endpoint Name'] = Computer, ['Last Record Received (Hours)'] = last_log
| order by ['Last Record Received (Hours)'] desc
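For example, here is the same query scoped to a hypothetical “FW-” firewall naming convention (the prefix is made up; substitute your own):
// Same query scoped to an assumed 'FW-' naming convention
union withsource = _TableName *
| where isnotempty(Computer)
| where Computer startswith "FW-"
| summarize last_log = datetime_diff("hour", now(), max(TimeGenerated)) by Computer
| where last_log > 1
| project ['Endpoint Name'] = Computer, ['Last Record Received (Hours)'] = last_log
| order by ['Last Record Received (Hours)'] desc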
Search a specific table by type to find a dropped source (EndPoint)
This can be a great way to monitor primary devices like Firewalls.
// Find end point that went down by table
SecurityEvent
| where isnotempty(Computer)
| summarize last_log = datetime_diff("hour",now(), max(TimeGenerated)) by Computer
| where last_log > 1
| project ['Endpoint Name'] = Computer, ['Last Record Received (Hours)'] = last_log
| order by ['Last Record Received (Hours)'] desc
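For machines running the Log Analytics/Azure Monitor agent, the Heartbeat table is another useful liveness signal. A minimal sketch, assuming your agents report heartbeats and using an arbitrary 15-minute threshold:
// Sketch: agent-based machines whose last heartbeat is older than 15 minutes
Heartbeat
| summarize LastBeat = max(TimeGenerated) by Computer
| where LastBeat < ago(15m)
| order by LastBeat asc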
Conclusion
Hopefully this gives you some ideas and ways to find downed sources and to monitor your environment as a whole.
While this only touched on a few ways and concepts, I advise you to take these ideas and expand on them for your environment.
—
If you are new to my content, be sure to follow/connect with me on all my other social media networks for new ideas and solutions to complicated real-world problems.