Summarizing Flow captures with Scapy and Python

Introduction

#scapy #python #pcap #netdev #devnet

Most of my activities have me working in brownfield environments setting up or troubleshooting networks.

The major difficulty in these environments is the legacy applications left behind, with little or no documentation from which to derive their network behavior.

So one of my major tools when working on firewalling is the packet capture.

Recently I was challenged by a CyberSecurity team when requesting the documented range of ports for the SAP application. This led me to capture all traffic from the users to the SAP destination subnet. The capture was verbose and, although it contained all the information I needed, I didn't have a tool to quickly summarize it into something more friendly to the eye.

So I built one :)

Since I'm not a Dev but more of a NetDev, I made the most of this side project to learn Test-Driven Development and unit testing with Pytest.

TL;DR :

Source code is here : https://github.com/craigarms/pcap_summary

And the package is available on PyPI: pip install pcap_summary

Sprint #1

Minimum Viable Product

In this first sprint I'll start with a packet capture of some traffic that has too many lines for my liking; the goal is to retain only the source and destination socket information and the protocol.

The PoC for this step relies on reading the pcap capture, iterating over the packets and recording the sockets and protocol into a list.

from scapy.all import *
from scapy.layers.inet import TCP, IP, UDP
from tabulate import tabulate

packets = rdpcap('http.pcap') # This demo pcap contains 95 packets
flows = list()

for packet in packets:
    if packet.haslayer(IP):
        protocol = None  # Reset for each packet so non-TCP/UDP packets are skipped
        if packet.haslayer(TCP):
            protocol = "TCP"
            source_socket = f"{packet[IP].src}:{packet[TCP].sport}"
            destination_socket = f"{packet[IP].dst}:{packet[TCP].dport}"
        if packet.haslayer(UDP):
            protocol = "UDP"
            source_socket = f"{packet[IP].src}:{packet[UDP].sport}"
            destination_socket = f"{packet[IP].dst}:{packet[UDP].dport}"
        if protocol:
            flows.append([protocol, source_socket, destination_socket])

This short code creates a list named flows with one entry for each of the 95 packets in the demo capture.

To summarize this list we only insert a flow into it if it isn't already there, and we also skip it if the reverse flow is already present.

Which gives us this code snippet :

from scapy.all import *
from scapy.layers.inet import TCP, IP, UDP
from tabulate import tabulate

packets = rdpcap('http.pcap') # This demo pcap contains 95 packets
flows = list()

for packet in packets:
    if packet.haslayer(IP):
        protocol = None  # Reset for each packet so non-TCP/UDP packets are skipped
        if packet.haslayer(TCP):
            protocol = "TCP"
            source_socket = f"{packet[IP].src}:{packet[TCP].sport}"
            destination_socket = f"{packet[IP].dst}:{packet[TCP].dport}"
        if packet.haslayer(UDP):
            protocol = "UDP"
            source_socket = f"{packet[IP].src}:{packet[UDP].sport}"
            destination_socket = f"{packet[IP].dst}:{packet[UDP].dport}"

        if protocol:
            forward_socket = [protocol, source_socket, destination_socket]
            reverse_socket = [protocol, destination_socket, source_socket]

            # Only keep the flow if neither it nor its reverse was already recorded
            if forward_socket not in flows:
                if reverse_socket not in flows:
                    flows.append(forward_socket)

The flows list now only contains a summary of the capture which is 20 lines, much better :)

Sprint Conclusion

In a few lines of code we are able to make the capture file much more readable. This MVP now needs to be wrapped in a proper script where we can perform unit testing, and gain some functions that let us lose less information in the summary.

Here we have totally discarded the return portion of the traffic, so we don't know whether the flow got a response or not; we need to add a counter for this.

We also want to retain the TCP flag information, to easily identify flows with no SYN-ACK or with only SYN and RESET flags.

Sprint Backlog

This MVP works and gave me lots of ideas. Yes, sure, I could open the capture in Wireshark and look at the conversation statistics, but that means having the capture on a workstation with a GUI, which most of the time isn't the case when copying captures to a Linux host.

The next sprints, until I'm satisfied enough to release, will address:

  • Unit testing of all the steps
  • Add a counter for packets seen in each direction
  • Add the TCP flags to the summary

Sprint #2

Unit testing

Unit testing is a familiar term for me, and I understand the concepts, but I'd never created a full package with working tests and high test coverage.

To perform the unit testing I'm going to use Pytest and set up the following directory structure:

pcap_summary
├─── pcap_summary
│    ├─── __init__.py           # Empty file
│    └─── pcap_summary.py       # Where the code resides
└─── tests
     ├─── __init__.py           # Empty file
     ├─── test_pcap_summary.py  # Where the unit tests reside
     └─── mock_data
          └─── http.pcap        # Demo capture file

The previous code performs 2 functions:

  1. Reads the Capture file into an iterable object
  2. Filters/summarizes the sockets into a list object

The philosophy of TDD (Test Driven Dev) is to first write the test, and then write the function that produces the desired output.

So we'll define a first test to ensure the capture file was read into an object and that that object contains 95 items :

def test_read_pcap():
    flows = read_pcap('tests/mock_data/http.pcap')
    assert len(flows) == 95        

Next we'll write the test to ensure we can filter the output down to 20 flows as identified in the MVP :

def test_summarize_packets():
    capture = read_pcap('tests/mock_data/http.pcap')
    flows = summarize_packets(capture)
    assert len(flows) == 20        

Without forgetting to import the Python package we are creating:

from pcap_summary.pcap_summary import *        

At the top of the test file. (Running pytest from the project root should be enough for this import to resolve, since the __init__.py files make the pcap_summary package importable from there.)

Now let's refactor the code from the MVP into 2 functions satisfying these tests:

from scapy.all import *
from scapy.layers.inet import TCP, IP, UDP
from tabulate import tabulate


def read_pcap(pcap_file):
    packets = rdpcap(pcap_file)
    flows = list()

    for packet in packets:
        if packet.haslayer(IP):
            protocol = None  # Reset for each packet so non-TCP/UDP packets are skipped
            if packet.haslayer(TCP):
                protocol = "TCP"
                source_socket = f"{packet[IP].src}:{packet[TCP].sport}"
                destination_socket = f"{packet[IP].dst}:{packet[TCP].dport}"
            if packet.haslayer(UDP):
                protocol = "UDP"
                source_socket = f"{packet[IP].src}:{packet[UDP].sport}"
                destination_socket = f"{packet[IP].dst}:{packet[UDP].dport}"
            if protocol:
                forward_socket = [protocol, source_socket, destination_socket]
                flows.append(forward_socket)
    return flows


def summarize_packets(flows):
    summarized_flows = list()
    for flow in flows:
        forward_socket = [flow[0], flow[1], flow[2]]
        reverse_socket = [flow[0], flow[2], flow[1]]

        # Only keep a flow if neither it nor its reverse has been recorded yet
        if forward_socket not in summarized_flows:
            if reverse_socket not in summarized_flows:
                summarized_flows.append(forward_socket)
    return summarized_flows

Now to check if it worked. Full disclosure: the code posted here works, but my unit tests never work the first time around :)

In the command line at the root of the project, we type

pytest -q
..                                                                       [100%]
2 passed in 1.91s        

The dots represent test successes, so the first function returns 95 entries, and the second 20 !

Sprint Conclusion

We've refactored the MVP code into 2 usable and testable functions which pass our defined unit tests; we currently have 100% code coverage.

Sprint Backlog

Now that we have the framework set up, we need to add the rest of our requirements:

  • Add a counter for packets seen in each direction
  • Add the TCP flags to the summary

Sprint #3

In this sprint we want to add packet count information to know how many packets associated with a socket pair are going in one direction and in the other.

We'll also squeeze in a main function to output the results in a nice tabular format, to make this truly useful.

The first step is to set up the test: we want to increment a counter each time we see a socket pair in one direction, and another counter if it's the same socket pair reversed.

This is what a summarized flow currently looks like:

['TCP', '24.6.173.220:42380', '174.137.42.75:80']        

It's a list with 3 items: the protocol, the first-seen source IP and port, and the first-seen destination IP and port.

We are going to append the counters and/or increment them when we call our function. So here is the test for that:


def test_increment_count():
    # Setting up some mock test flows
    socket = ['TCP', '24.6.173.220:42380', '174.137.42.75:80']
    socket2 = ['TCP', '24.6.173.221:4890', '174.137.42.72:80']
    flows = [socket, socket2]

    # The first counter will be at index 3 (the 4th position, since indexes start at 0)
    increment_count(socket, flows, 3)
    increment_count(socket, flows, 4)

    # If all goes well the counters should be initialized to one
    assert flows[0][3] == 1
    assert flows[0][4] == 1

    # Let's add another round to ensure it does increment
    increment_count(socket, flows, 3)
    increment_count(socket, flows, 4)

    # If all goes well the counters should now be at 2
    assert flows[0][3] == 2
    assert flows[0][4] == 2

Now let's create the function that adds and increments the counters in the main script:


def increment_count(socket, flow_list, counter_position):
    for sockets in range(len(flow_list)):
        if flow_list[sockets][0] == socket[0] \
                and flow_list[sockets][1] == socket[1] \
                and flow_list[sockets][2] == socket[2]:
            if len(flow_list[sockets]) == counter_position:
                # The counter position doesn't exist yet: create it at 1
                flow_list[sockets].append(1)
            elif len(flow_list[sockets]) >= counter_position:
                # The counter exists: increment it
                flow_list[sockets][counter_position] += 1

A quick run of pytest reveals this is now working:

pytest -q
...                                                                      [100%]
3 passed in 1.92s        

So now we need to add some code to the summary function so that each time we see a flow we increment the counters. But it isn't that simple: the 3-tuple we created will no longer be found in the summary list, because we are turning it into a 5-tuple by adding the counters.

We thus need to maintain 2 lists: one without the counters, used to discard flows that have already been added, and one with the counters, where we increment the correct counter.


def summarize_packets(flows):
    summarized_flows = list()

    # Create a new list with counters
    flows_with_count = list()

    for flow in flows:
        forward_socket = [flow[0], flow[1], flow[2]]
        reverse_socket = [flow[0], flow[2], flow[1]]

        if forward_socket not in summarized_flows:
            if reverse_socket not in summarized_flows:

                # Copy the contents of the flow to a new list
                forward_socket_with_count = flow.copy()

                # Initialize the forward counter
                forward_socket_with_count.append(1)

                # Add the new flow with the counter to the new list
                flows_with_count.append(forward_socket_with_count)

                # Add the flow without the counter to the
                # list of summarized flows
                summarized_flows.append(forward_socket)
            else:
                # The reverse of the socket pair exists: increment
                # the reverse counter
                increment_count(reverse_socket, flows_with_count, 4)
        else:
            # The socket pair exists: increment the forward counter
            increment_count(forward_socket, flows_with_count, 3)

    # Return the list of flows with the counters
    return flows_with_count

It looks like a lot, but really we just did what we said :)

Now let's close the sprint with a satisfying result by adding a main function to print the output:

def main():
    capture = read_pcap('tests/mock_data/http.pcap')
    summary = summarize_packets(capture)

    print(tabulate(summary, headers=["Proto", "Src", "Dst", "FCount",
                                     "RCount"]))


if __name__ == "__main__":
    main()        

Now we can run the script from the command line with :

python pcap_summary/pcap_summary.py        

And get a shiny result:

[Image: the summarized flows printed as a table with Proto, Src, Dst, FCount and RCount columns]

Sprint Conclusion

That looks good to me, but we are missing the TCP flags, which in the case of the demo file wouldn't tell us much; but IRL we often come across debugging situations where systems are systematically sending a RESET upon a SYN request.

Sprint Backlog

As said in the conclusion, the script currently passes all tests and covers the use case of reducing the number of lines we need to read to see what's flowing on the network. But we want more for debugging:

  • Add the TCP flags to the summary

Sprint #4

This is the penultimate sprint; by now we've gotten the hang of how to manipulate the objects we are creating, and, full disclosure, so has GitHub Copilot, which I've been using for a month :)

So to extract the TCP Flags we are going to need to modify the first function that extracts information from the packets, then add them to the summary list without creating duplicates or breaking anything.

Speaking of breaking things, let's start with the test for the TCP flag addition:

def test_add_tcp_flags():
    # Setting up some mock test flows
    # Notice I added the TCP flags to the end of the list
    # These 3 flows represent the same flow but with different TCP flags
    # So 3 packets of a conversation
    socket = ['TCP', '24.6.173.220:42380', '174.137.42.75:80', 'S']
    socket2 = ['TCP', '24.6.173.220:42380', '174.137.42.75:80', 'SA']
    socket3 = ['TCP', '24.6.173.220:42380', '174.137.42.75:80', 'R']
    flows = [socket, socket2, socket3]
    
    add_tcp_flags(socket, flows)
    assert flows[0][3] == 'S'
    add_tcp_flags(socket2, flows)
    assert flows[0][3] == 'SA'
    add_tcp_flags(socket3, flows)
    assert flows[0][3] == 'SAR'        

In this test we set up 3 packet summaries that represent the same socket pair but with different TCP flags.

The expected result will be that the flags get added to the summary but only if they were not already present.

Also note that, since the counters are added dynamically, their position will be shifted by one, because we are inserting the TCP flags at index 3.

Here is the function that satisfies the test we created :

def add_tcp_flags(socket, flow_list):
    for sockets in range(len(flow_list)):
        # Find the summarized flow matching this socket pair
        if flow_list[sockets][0] == socket[0] \
                and flow_list[sockets][1] == socket[1] \
                and flow_list[sockets][2] == socket[2]:
            # Append each flag character that isn't already recorded
            for flag in socket[3]:
                if flag not in flow_list[sockets][3]:
                    flow_list[sockets][3] = flow_list[sockets][3] + flag

We iterate over all the items in the flow list and, when we find the one containing the same 3-tuple as the input socket, we check whether each character of the TCP flags is already present; if not, we add it.

A quick pytest run to check before modifying the rest of the code, and :

pytest -q
....                                                                     [100%]
4 passed in 2.50s        

So now let's add the TCP flags, and initialize the flag position to a placeholder for UDP flows:

def read_pcap(pcap_file):
    [...]

    for packet in packets:
        if packet.haslayer(IP):
            if packet.haslayer(TCP):
                [...]
                flags = packet.sprintf("%TCP.flags%")
            if packet.haslayer(UDP):
                [...]
                flags = "--"
            if protocol:
                forward_socket = [protocol, source_socket,                
                                  destination_socket, flags]        

I've tried to show only the changes, for readability.

Now let's add the flags in the summarization process :

def summarize_packets(flows):
    [...]
    for flow in flows:
            [...]
            reverse_flow = [flow[0], flow[2], flow[1], flow[3]]
            if forward_socket not in summarized_flows:
                if reverse_socket not in summarized_flows:
                    [...]
                else:
                    # Bump the counter position by one
                    increment_count(reverse_socket, flows_with_count, 5)
                    add_tcp_flags(reverse_flow, flows_with_count)
            else:
                # Bump the counter position by one
                increment_count(flow, flows_with_count, 4)
                add_tcp_flags(flow, flows_with_count)        
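To make the new layout concrete, here is what one summarized row ends up looking like once the flags and both counters are in place (the counter values below are purely illustrative):

# Index 0 = Proto, 1 = Src, 2 = Dst, 3 = Flags, 4 = FCount, 5 = RCount
example_row = ['TCP', '24.6.173.220:42380', '174.137.42.75:80', 'SA', 3, 2]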

Finally, by adding the additional headers to our pretty table :

def main():
   [...]

    print(tabulate(summary, headers=["Proto", "Src", "Dst", "Flags",
                                     "FCount", "RCount"]))

We get a great looking result :

[Image: the summarized flows printed as a table, now with a Flags column]

Conclusion

This is starting to look great: we moved the counters one position and added the flags without breaking anything, thanks to Pytest.

Once we get down this far we realize that using a list of lists wasn't the smartest idea; from the start a dictionary could/would have been better, to avoid having all these loops and indexes floating around. But I guess the key here is learning things and getting the right results.
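As a rough sketch of that dictionary idea (an illustration only, not how the published package is written; summarize_packets_dict and its field names are made up here), the summary could be keyed by the socket pair, assuming the same read_pcap() output of [protocol, source_socket, destination_socket, flags]:

def summarize_packets_dict(flows):
    summary = {}
    for protocol, src, dst, flags in flows:
        if (protocol, src, dst) in summary:
            # Packet in the forward direction of a known flow
            entry = summary[(protocol, src, dst)]
            entry["fcount"] += 1
        elif (protocol, dst, src) in summary:
            # Packet in the reverse direction of a known flow
            entry = summary[(protocol, dst, src)]
            entry["rcount"] += 1
        else:
            # First packet of a new flow
            entry = {"flags": "", "fcount": 1, "rcount": 0}
            summary[(protocol, src, dst)] = entry
        # Accumulate any flag characters not seen yet on this flow
        for flag in flags:
            if flag not in entry["flags"]:
                entry["flags"] += flag
    return summary

The dictionary lookups replace the linear "in" checks and index juggling, and each flow's flags and counters live together in a single record.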

I also added a counter to the final script and published it to PyPI, to make it available in all my environments when I need it, without having to git clone and build it into the path.

The "live" version also enable filtering of flows based on a search string, ideal for big captures with many flows.

In a future version I want to add application suggestions based on the ports used, regex searches, and also JSON & CSV output. This is for a next project where I want to be able to turn a flow into a live ACL on a given firewall :)

https://github.com/craigarms/pcap_summary

Thanks to my teammate Fabien Berarde, who lives by the capture and reverse engineers complex flows in hex format :D. He is my influence to use captures more and more frequently to understand an application or an issue.

Sébastien PRADERES, <freelance> program manager (1 year ago):

Congrats to both you and your team for this exciting innovation. Proud to work with you all during the past two years. I also learned a lot with you.

Fabien Berarde, Network and Security Engineer (1 year ago):

You gave me credit that I do not deserve this time :). Very nice post, thanks a lot. I also liked that you talked about unit testing, which I think is uncommon in network engineering practices.
