Links

API Driven Scanning

API driven scanning is the notion of scanning a file and receiving the verdict before it is written anywhere. We see this when your workflow demands a verdict at the time of uploaded. We often hear from customers that they want the file scanned before it resides in Amazon S3. Or they may have aspects of their workflow such that they just need an API driven verdict engine and Amazon S3 may not be in play at all. This is often necessary in applications where users are waiting to be told the upload was successful and the file accepted. APIs allow the application to make a direct handoff of the file to the scanning agent.
Ultimately, API driven scanning provides an API Endpoint verdict engine that can be used inside or out of AWS. You can send files to scan from on-prem or applications residing within AWS or from anywhere you grant access. The API scanning agents sit behind an AWS Load Balancer. You can make the Load Balancer internet-facing or internal depending on your requirements. Learn more about configuring and managing the API endpoint on the API Agent Settings page.

Setup

  1. 1.
    Create a user for API use - How to
  2. 2.
    Setup and configure API Agent Region - How to
  3. 3.
    Integrate HTTP Post calls into your applications - explore samples below
You can use the programming language of your choice as we only require you to leverage HTTP Post to submit the file for scanning. Below are very simple examples of how to submit a file and the results you will see back.
Please note that you can use Sophos, CrowdStrike, or ClamAV for API driven scanning. If you are using ClamAV you can only scan files up to 2GB in size. If you need to scan a file larger than 2GB in size using API driven scanning you'll need to select Sophos. If you are using CrowdStrike you will be limited to files up to 20MB in size and a limited set of file types.

Steps to making the API call:

  1. 1.
    Make request for Auth Token a. Specify content type of JSON in headers b. Capture username and password in JSON c. HTTP Post the data block and headers to <baseURL> + /api/Token
    headers = {'Content-type': 'application/json'}
    json_foo: {"username": "<username here>", "password": "<pw here>"}
    r = session.post("https://<baseURL to load balancer or friendly URL>/api/Token", data=json_foo, headers=headers)
    This will return the following response text:
    {
    "accessToken":"eyJraWQiOiI0Qk41QU1yVXdhWUUrZlBUZ0dhQTZWQUNXUmREMmh2dlMxWFgrUmNmTzd3PSIsImFsZyI6IlJTMjU2In0.eyJzdWIiOiIyMDYyZDQxMC1kMGE0LTRiNTItYjc2Yi03M2FiNWQ5Njk4YWQiLCJjb2duaXRvOmdyb3VwcyI6WyJVc2VycyIsIlByaW1hcnkiXSwiZW1haWxfdmVyaWZpZWQiOnRydWUsImlzcyI6Imh0dHBzOlwvXC9jb2duaXRvLWlkcC51cy1lYXN0LTEuYW1hem9uYXdzLmNvbVwvdXMtZWFzdC0xX1haNWpVNXcwWSIsImN1c3RvbTpoaWRlX3RyaWFsX21zZyI6IjAiLCJjb2duaXRvOnVzZXJuYW1lIjoiZWRjIiwiY3VzdG9tOnVzZXJfZGlzYWJsZWQiOiIwIiwiY3VzdG9tOmF3c19hY2NvdW50X2lkIjoiNzMwMDc5MDI1Njg4IiwiY3VzdG9tOmhpZGVfd2VsY29tZV9tc2ciOiIwIiwiYXVkIjoiNXM2M29raWtodGJxdDR2cTFtMmV1bDk4Z2kiLCJldmVudF9pZCI6IjkwOTJlZjc4LTMzZTMtNDVhNS1hZTlhLTVmN2Y0NGY2NDZmNiIsInRva2VuX3VzZSI6ImlkIiwiYXV0aF90aW1lIjoxNjI1MjgyODU5LCJleHAiOjE2MjUyODY0NTksImlhdCI6MTYyNTI4Mjg1OSwiZW1haWwiOiJzdXBwb3J0QGNsb3Vkc3RvcmFnZXNlYy5jb20ifQ.QehudPO4zTphRq9ch3p6IopzRz7m72D5LquVgnzw8iHfDBbgZLQiAM7uWtkKGQw5fYV5dsB_U0fbcrW6F3ov_U4LcpvLgP88NXk7MR9PprzIQQjvnHRU9z6wy6wavgrK-VdPiqNF7dsKaAJGW6vVZCzFzVIEKaZCThHpqVYbKdiSfVm08nvWsWEM4fxAgCFY8sAr2pNxY5VHydGc_iP4On3H7MSFh1n7ee-lH88Ao8PLWMWQBYlbR6ZFLin7KKi6lhDOE-b4cAGDgPtl4acdw6ha_AWJPxozJILQkSAesl-BbxWquphTJ-oD_jRl7DvJBSbBw3DPNzXcO4w4SMnnLA",
    "tokenType":"Bearer",
    "expiresIn":3600
    }
    Save the access token off for the next call. It is valid for 1 hour if you choose to re-use it.
  2. 2.
    Send the file for scanning a. Specify the headers - big thing here is the accessToken needs to be added, this is the minimum b. Get the file as your language dictates, but should be multipart form upload c. HTTP Post the file and headers <baseURL> + /api/Scan
    headers = {"Prefer": "respond-async", "Content-Type": form.content_type, 'Authorization': 'Bearer ' + accessToken}
    r = session.post("https://<baseURL to load balancer or friendly URL>/api/Scan", headers=headers, data=form, timeout=4000)
    This will return the following response text:
    {
    "dateScanned": "2021-07-02T07:04:18.8896831Z",
    "detectedInfections": [],
    "errorMessage": null,
    "result": "Clean"
    }

Available APIs

Currently, there are 4 available APIs: Token, Scan, and Scan/Existing and Scan/URL. The Scan option has two uses: "scan and return" and "scan and upload". Read on to learn more about these APIs. For more technical docs on the APIs, please check out our API Swagger Docs.
api/Token
To execute the api/Token API, you need to pass the API-user username and password in to be
headers = {'Content-type': 'application/json'}
json_foo: {"username": "<username here>", "password": "<pw here>"}
r = session.post("https://<baseURL to load balancer or friendly URL>/api/Token", data=json_foo, headers=headers)
This will return the following response text:
headers = {'Content-type': 'application/json'}
json_foo: {"username": "<username here>", "password": "<pw here>"}
r = session.post("https://<baseURL to load balancer or friendly URL>/api/Token", data=json_foo, headers=headers)
Save the access token off for the next call. It is valid for 1 hour if you choose to re-use it.
api/Scan
Scan and Return only
headers = {"Prefer": "respond-async", "Content-Type": form.content_type, 'Authorization': 'Bearer ' + accessToken}
r = session.post("https://<baseURL to load balancer or friendly URL>/api/Scan", headers=headers, data=form, timeout=4000)
This will return the following response text
{
"dateScanned": "2021-07-02T07:04:18.8896831Z",
"detectedInfections": [],
"errorMessage": null,
"result": "Clean"
}
Scan and Upload To scan and upload (if clean) you need to add the uploadTo attribute to the form data. This is as simple as augmenting the sample code to include one more field as seen below.
with open(sys.argv[4], 'rb') as f:
form = encoder.MultipartEncoder({
"documents": ("my_file", f, "application/octet-stream"),
"composite": "NONE",
"uploadTo": sys.argv[5],
})
headers = {"Prefer": "respond-async", "Content-Type": form.content_type, 'Authorization': 'Bearer ' + accessToken}
r = session.post("https://<baseURL to load balancer or friendly URL>/api/Scan", headers=headers, data=form, timeout=4000)
If wanting to test this, add to the sample code below and use the following line to run the script:
python ./scanWithAPI.py <username> <password> <base-URL> <file-to-scan> <bucket-name/full-path-to-file-including-filename.type>

The filename chosen must be in the variable uploadTo for it to be considered as it is in the S3 bucket (e.g: uploadTo: bucketname/filename.type). If the full path of the file is not indicated, the file will be considered a Binary data stream, and uploaded as a tmp file.

api/Scan/Existing
To scan a file that already exists within an S3 bucket, you use the api/Scan/Existing API call. This call requires you to pass a block of json as seen below. container and objectPath are required. versionID is optional, but allows you to specify a particular version number to scan. If you do not specify this field, we will scan the latest. uploadedBy is also optional, but allows you to specify who is doing the scanning.
{
"container": "<bucket_containing_the_file>",
"objectPath": "<path_in_bucket_to_file>",
"versionId": "<version_id_to_scan>",
"uploadedBy": "<user_performing_scan>"
}
To execute this call, capture the bucket name and full path to file
json_foo: {"container": "<bucket-name>", "objectPath": "<object-key>"}
headers = {"Content-Type": "application/json", 'Authorization': 'Bearer ' + accessToken}
r = session.post("https://<baseURL to load balancer or friendly URL>/api/Scan/Existing", headers=headers, data=json_foo)
api/Scan/URL
There will be times when scanning a file existing by URL path is desired. Presigned URLs for Amazon S3 objects is a good usecase or anything your applications have access to hanging out on a fully qualified URL. The api/Scan/URL API call will allow you to do this task. You simply need to add the URL field to the form data
To execute this call, capture the bucket name and full path to file
# specify file URL in form data
form = encoder.MultipartEncoder({"url": "some-URL-here", })
headers = {"Content-Type": "application/json", 'Authorization': 'Bearer ' + accessToken}
r = session.post("https://<baseURL to load balancer or friendly URL>/api/Scan/URL", headers=headers, data=form)

Code Samples

You can download and use our Postman collection and environment JSON files below to assist in your testing of our API Scanning functionality:
Both code samples are simple, but documented so take a closer look here to learn more.
Python /scan
Python /scan with upload
Python /scan/existing
Python /scan/url
C# /scan
This is a simple command line example with the base URL and the file to scan passed in on the command line.
python ./scanWithAPI.py <username> <password> <base-URL> <file-to-scan>
import json
import requests
from requests_toolbelt.multipart import encoder
from requests_toolbelt.multipart.encoder import MultipartEncoder
import sys
# baseURL is the value found on the API Agent Settings page as the Default DNS
# this can also be a friendly URL which you've mapped within your DNS
baseURL = sys.argv[3]
# /api/Token is the API to retrieve the auth token
# the auth token is valid for 1 hour, so you can re-use if your application can manage it
getTokenURL = baseURL + '/api/Token'
# /api/Scan is the API to pass the file to for scanning
scanFileURL = baseURL + '/api/Scan'
# must specify the content type as JSON when retrieving the auth token
headers = {'Content-type': 'application/json'}
# as part of the /api/Token HTTP Post you must pass the username and password for the
# user created and configured inside of the Antivirus for Amazon S3 console
# the data block must be passed in JSON format
uname = sys.argv[1]
pw = sys.argv[2]
foo = {"username": "", "password": ""}
foo["username"] = uname
foo["password"] = pw
json_foo = json.dumps(foo)
# make the HTTP post now passing in the username/pw data block and headers
session = requests.Session()
r = session.post(getTokenURL, data=json_foo, headers=headers)
# pull the auth token from the response to use in the scan call below
# valid for 1 hour if you want to re-use
jsonResponse = json.loads(r.text)
accessToken = jsonResponse["accessToken"]
# read file in from wherever it is coming from: form upload, in file system, etc
with open(sys.argv[4], 'rb') as f:
form = encoder.MultipartEncoder({
"documents": ("my_file", f, "application/octet-stream"),
"composite": "NONE",
})
# setup headers for /api/Scan HTTP Post.
# the only thing you really need is the 'Authorization': 'Bearer ' with the auth token
# assigned to that. Depending on how you are reading the file or the language, you may
# need to pass more values in the header as seen below
headers = {"Prefer": "respond-async", "Content-Type": form.content_type, 'Authorization': 'Bearer ' + accessToken}
r = session.post(scanFileURL, headers=headers, data=form, timeout=4000)
# grab the text from the response and check however you will
# below converts the response to JSON for easy formatting and handling
parsed = json.loads(r.text)
print(json.dumps(parsed, indent=4, sort_keys=True))
# do the next portion of your workflow based on what the scan result is
if parsed['result'] == "Clean":
print("file was clean")
#do more work in the workflow here
session.close()
python ./scanWithAPI-withUpload.py <username> <password> <base-URL> <file-to-scan> <bucket-name/file-name>
import json
import requests
from requests_toolbelt.multipart import encoder
from requests_toolbelt.multipart.encoder import MultipartEncoder
import sys
# baseURL is the value found on the API Agent Settings page as the Default DNS
# this can also be a friendly URL which you've mapped within your DNS
baseURL = sys.argv[3]
# /api/Token is the API to retrieve the auth token
# the auth token is valid for 1 hour, so you can re-use if your application can manage it
getTokenURL = baseURL + '/api/Token'
# /api/Scan is the API to pass the file to for scanning
scanFileURL = baseURL + '/api/Scan'
# must specify the content type as JSON when retrieving the auth token
headers = {'Content-type': 'application/json'}
# as part of the /api/Token HTTP Post you must pass the username and password for the
# user created and configured inside of the Antivirus for Amazon S3 console
# the data block must be passed in JSON format
uname = sys.argv[1]
pw = sys.argv[2]
foo = {"username": "", "password": ""}
foo["username"] = uname
foo["password"] = pw
json_foo = json.dumps(foo)
# make the HTTP post now passing in the username/pw data block and headers
session = requests.Session()
r = session.post(getTokenURL, data=json_foo, headers=headers)
# pull the auth token from the response to use in the scan call below
# valid for 1 hour if you want to re-use
jsonResponse = json.loads(r.text)
accessToken = jsonResponse["accessToken"]
# read file in from wherever it is coming from: form upload, in file system, etc
with open(sys.argv[4], 'rb') as f:
form = encoder.MultipartEncoder({
"documents": ("my_file", f, "application/octet-stream"),
"composite": "NONE",
"uploadTo": sys.argv[5],
})
# setup headers for /api/Scan HTTP Post.
# the only thing you really need is the 'Authorization': 'Bearer ' with the auth token
# assigned to that. Depending on how you are reading the file or the language, you may
# need to pass more values in the header as seen below
headers = {"Prefer": "respond-async", "Content-Type": form.content_type, 'Authorization': 'Bearer ' + accessToken}
r = session.post(scanFileURL, headers=headers, data=form, timeout=4000)
# grab the text from the response and check however you will
# below converts the response to JSON for easy formatting and handling
parsed = json.loads(r.text)
print(json.dumps(parsed, indent=4, sort_keys=True))
# do the next portion of your workflow based on what the scan result is
if parsed['result'] == "Clean":
print("file was clean")
#do more work in the workflow here
session.close()
python ./scanWithAPI-Existing.py <username> <password> <base-URL> <bucket-name> <file-name>
import json
import requests
import sys
# baseURL is the value found on the API Agent Settings page as the Default DNS
# this can also be a friendly URL which you've mapped within your DNS
baseURL = sys.argv[3]
# /api/Token is the API to retrieve the auth token
# the auth token is valid for 1 hour, so you can re-use if your application can manage it
getTokenURL = baseURL + '/api/Token'
# /api/Scan is the API to pass the file to for scanning
scanFileURL = baseURL + '/api/Scan'
scanExistingFileURL = baseURL + '/api/Scan/Existing'
print("scan existing URL = " + scanExistingFileURL)
# must specify the content type as JSON when retrieving the auth token
headers = {'Content-type': 'application/json'}
# as part of the /api/Token HTTP Post you must pass the username and password for the
# user created and configured inside of the Antivirus for Amazon S3 console
# the data block must be passed in JSON format
uname = sys.argv[1]
pw = sys.argv[2]
foo = {"username": "", "password": ""}
foo["username"] = uname
foo["password"] = pw
json_foo = json.dumps(foo)
# make the HTTP post now passing in the username/pw data block and headers
session = requests.Session()
r = session.post(getTokenURL, data=json_foo, headers=headers)
# pull the auth token from the response to use in the scan call below
# valid for 1 hour if you want to re-use
jsonResponse = json.loads(r.text)
accessToken = jsonResponse["accessToken"]
print("access token = " + accessToken)
# Setup existing file
foo = {"container": "", "objectPath": ""}
foo["container"] = sys.argv[4]
foo["objectPath"] = sys.argv[5]
json_foo = json.dumps(foo)
print("json foo = " + json_foo)
# setup headers for /api/Scan HTTP Post.
# the only thing you really need is the 'Authorization': 'Bearer ' with the auth token
# assigned to that. Depending on how you are reading the file or the language, you may
# need to pass more values in the header as seen below
headers = {"Content-Type": 'application/json', 'Authorization': 'Bearer ' + accessToken}
r = session.post(scanExistingFileURL, data=json_foo, headers=headers)
# grab the text from the response and check however you will
# below converts the response to JSON for easy formatting and handling
print("r.status_code = " + str(r.status_code))
parsed = json.loads(r.text)
print(json.dumps(parsed, indent=4, sort_keys=True))
# do the next portion of your workflow based on what the scan result is
if parsed['result'] == "Clean":
print("file was clean")
#do more work in the workflow here
session.close()
python ./scanWithAPI-byURL.py <username> <password> <base-URL> <full-URL>
import json
import requests
from requests_toolbelt.multipart import encoder
from requests_toolbelt.multipart.encoder import MultipartEncoder
import sys
# baseURL is the value found on the API Agent Settings page as the Default DNS
# this can also be a friendly URL which you've mapped within your DNS
baseURL = sys.argv[3]
# /api/Token is the API to retrieve the auth token
# the auth token is valid for 1 hour, so you can re-use if your application can manage it
getTokenURL = baseURL + '/api/Token'
# /api/Scan is the API to pass the file to for scanning
scanFileURL = baseURL + '/api/scan/URL'
# must specify the content type as JSON when retrieving the auth token
headers = {'Content-type': 'application/json'}
# as part of the /api/Token HTTP Post you must pass the username and password for the
# user created and configured inside of the Antivirus for Amazon S3 console
# the data block must be passed in JSON format
uname = sys.argv[1]
pw = sys.argv[2]
foo = {"username": "", "password": ""}
foo["username"] = uname
foo["password"] = pw
json_foo = json.dumps(foo)
print("user stuff: " + json_foo)
# make the HTTP post now passing in the username/pw data block and headers
session = requests.Session()
r = session.post(getTokenURL, data=json_foo, headers=headers)
# pull the auth token from the response to use in the scan call below
# valid for 1 hour if you want to re-use
jsonResponse = json.loads(r.text)
accessToken = jsonResponse["accessToken"]
print("accessToken: " + accessToken)
# specify file URL in form data
form = encoder.MultipartEncoder({"url": sys.argv[4], })
# setup headers for /api/Scan HTTP Post.
# the only thing you really need is the 'Authorization': 'Bearer ' with the auth token
# assigned to that. Depending on how you are reading the file or the language, you may
# need to pass more values in the header as seen below
headers = {"Prefer": "respond-async", "Content-Type": form.content_type, 'Authorization': 'Bearer ' + accessToken}
r = session.post(scanFileURL, headers=headers, data=form, timeout=4000)
# grab the text from the response and check however you will
# below converts the response to JSON for easy formatting and handling
parsed = json.loads(r.text)
print(json.dumps(parsed, indent=4, sort_keys=True))
# do the next portion of your workflow based on what the scan result is
if parsed['result'] == "Clean":
print("file was clean")
#do more work in the workflow here
session.close()
// Host URL will either be the default DNS for your Load Balancer or the DNS CNAME you registered on your own domain
const string hostUrl = "https://your-lb-or-registered-dns-name";
// If you are using this in an on-demand per-user basis, ideally you would have users provide their own username and password
// If you are using this inside an application, it would be recommended to make a console user solely for this application
const string username = "av-console-username";
// Hard-coding the password is not a recommended practice,
// ideally it would be stored in something like AWS Secrets Manager, and retrieved by the application to use here,
// or provided by the user if being done on an on-demand basis
const string password = "av-console-password";
// This handler is only needed if you are using the default load balancer DNS, as your SSL certificate will not be valid for it.
// If you've registered your own DNS CNAME that is valid for your certificate, this is not needed.
HttpClientHandler handler =
new()
{
ClientCertificateOptions = ClientCertificateOption.Manual,
ServerCertificateCustomValidationCallback = (_, _, _, _) => true
};
// Timeout is set to infinite as the file upload may take a long time. This could be lowered if desired, just be aware that
// any file uploads taking longer than that will then fail.
HttpClient httpClient =
new(handler)
{
Timeout = System.Threading.Timeout.InfiniteTimeSpan
};
HttpResponseMessage resp =
await httpClient.PostAsync(
$"{hostUrl}/api/token",
new StringContent(
JsonConvert.SerializeObject(
new Dictionary<string, string>
{
{"username", username},
{"password", password}
}),
Encoding.UTF8,
"application/json"));
string tokenResponse = await response.Content.ReadAsStringAsync();
JObject responseJson = JObject.Parse(tokenResponse);
string accessToken = responseJson["accessToken"]?.Value<string>();
httpClient.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", accessToken);
// This example is using a file that is stored on disk.
// If for example you already have a stream to the file, just pass that to the new StreamContent() constructor
FileStream fileStream = File.OpenRead("path/to/your/file");
MultipartFormDataContent content =
new()
{
{new StreamContent(fileStream), tmpFile, tmpFile}
};
HttpResponseMessage scanResult =
await httpClient.PostAsync(
scanUrl,
content);
Console.WriteLine(await scanResult.Content.ReadAsStringAsync());
fileStream.Dispose();

Scan Results - JSON formatted

Clean
Infected
{
"dateScanned": "2021-07-02T07:04:18.8896831Z",
"detectedInfections": [],
"errorMessage": null,
"result": "Clean"
}
{
"dateScanned": "2021-07-02T07:09:53.8972969Z",
"detectedInfections": [
{
"file": "DemoFiles/eicarcom2.zip/eicar_com.zip/eicar.com",
"infection": "EICAR-AV-Test"
},
{
"file": "DemoFiles/eicar_com.zip/eicar.com",
"infection": "EICAR-AV-Test"
},
{
"file": "DemoFiles/infected_bill.pdf",
"infection": "Troj/PDFJs-AIA"
},
{
"file": "DemoFiles/eicar.com",
"infection": "EICAR-AV-Test"
},
{
"file": "DemoFiles/urgent_payment.pdf",
"infection": "Troj/PDFJs-AIA"
},
{
"file": "DemoFiles/eicar.com.txt",
"infection": "EICAR-AV-Test"
},
{
"file": "DemoFiles/eicar-from-vincent/eicar_-_wiki.txt",
"infection": "EICAR-AV-Test"
},
{
"file": "DemoFiles/eicar-from-vincent/eicar_-_milestone.txt",
"infection": "EICAR-AV-Test"
},
{
"file": "DemoFiles/eicar-from-vincent/eicar_-_snippet.txt",
"infection": "EICAR-AV-Test"
}
],
"errorMessage": null,
"result": "Infected"
}