Examples¶
Threat Intelligence¶
Create Indicator to Group Association 1¶
1 2 3 4 5 6 7 8 9 10 11 12 13 | # Creating an Indicator and a Group to ensure that an association can be made.
indicator_kwargs = {'ip': '1.1.1.1', 'rating': 0, 'confidence': 0 }
group_kwargs = { 'fileName': 'rand_filename', 'name': 'rand_name' }
indicator = self.ti.indicator('address', self.owner, **indicator_kwargs)
group = self.ti.group('document', self.owner, **group_kwargs)
# Can be done this way as well:
# indicator = self.ti.address(self.owner, **indicator_kwargs)
# group = self.ti.document(self.owner, **group_kwargs)
indicator.create()
group.create()
# indicator to associate
indicator.add_association(group)
|
Create Indicator to Group Association 2¶
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | # Creating an Indicator and a Group to ensure that an association can be made.
indicator_kwargs = {'ip': '1.1.1.1', 'rating': 0, 'confidence': 0 }
group_kwargs = { 'fileName': 'rand_filename', 'name': 'rand_name' }
indicator = self.ti.indicator('address', self.owner, **indicator_kwargs)
group = self.ti.group('document', self.owner, **group_kwargs)
indicator.create()
group.create()
# Create indicators/groups representing the indicators/groups in TC
indicator = self.ti.indicator('address', self.owner, ip='1.1.1.1')
group = self.ti.group('document', self.owner, id=group.unique_id)
# Can be done this way as well:
# indicator = self.ti.address(self.owner, '1.1.1.1')
# group = self.ti.document(self.owner, id=group.unique_id)
# indicator to associate
indicator.add_association(group)
|
Retrieve Indicator to Group Associations¶
1 2 3 4 5 6 7 | # Creating an Indicator to make sure group associations can be retrieved.
indicator_kwargs = {'ip': '1.1.1.1', 'rating': 0, 'confidence': 0 }
indicator = self.ti.address(owner=self.owner, **indicator_kwargs)
indicator.create()
for association in indicator.group_associations():
print(association)
|
Retrieve Group to Indicator Associations¶
1 2 3 4 5 6 7 | # Creating a Group to make sure indicator associations can be retrieved.
group_kwargs = { 'fileName': 'rand_filename', 'name': 'rand_name' }
group = self.ti.document(owner=self.owner, **group_kwargs)
group.create()
for association in group.indicator_associations():
print(association)
|
Enable DNS Whois Group¶
1 2 3 4 5 6 7 8 9 10 11 12 | group = self.tcex.ti.group(group_type=group_type, owner=owner, unique_id=group_id)
# get indicator associations
for indicator in group.indicator_associations():
if indicator.get('type') == 'Host':
ti_host = self.ti.host(
hostName=indicator.get('summary'),
owner=self.owner,
dns_active=True,
whois_active=True
)
r = ti_host.update()
ti_host_data = r.json().get('data', {}).get('host', {})
|
Retrieve Inactive Indicators¶
1 2 3 4 5 | ti = self.ti.address()
filters = self.ti.filters()
filters.add_filter('active', '=', 'false')
for address in ti.many(filters=filters):
print(address)
|
Retrieve Active and Inactive Indicators¶
1 2 3 4 5 6 7 | ti = self.ti.address()
filters = self.ti.filters()
filters.add_filter('active', '=', 'true')
filters.add_filter('active', '=', 'false')
parameters = {'orParams': 'true'}
for address in ti.many(filters=filters, params=parameters):
print(address)
|
Retrieve Indicators with Date Added filters¶
1 2 3 4 5 6 | filters = self.tcex.ti.filters()
filters.add_filter('dateAdded', '>', '2020-01-01')
filters.add_filter('dateAdded', '<', '2020-01-02')
indicators = self.tcex.ti.indicator(owner='example')
for indicator in indicators.many(filters=filters):
print(indicator)
|
Retrieve Indicator Owners¶
1 2 3 4 5 6 7 8 | # Create an example indicator
indicator_kwargs = {'ip': '1.1.1.1', 'rating': randint(0, 5), 'confidence': randint(0, 100)}
indicator = self.ti.indicator('address', self.owner, **indicator_kwargs)
indicator.create()
# Get the owners that the indicator is in.
owner = indicator.owners()
# indicator.owners() returns the response; extract the owner data from it.
owner_data = owner.get('data', {}).get('owner', [])
|
Snippets¶
Feature¶
Batch¶
Initialize Batch Create¶
1 | batch = self.tcex.batch(owner="MyOwner")
|
Initialize Batch Delete¶
1 | batch = self.tcex.batch(owner="MyOwner", action="Delete")
|
Add Group Interface 1¶
1 | ti = batch.campaign(name="camp-1", first_seen="12-12-08", xid="my-unique-external-id")
|
Add Group Interface 2¶
1 | ti = batch.group(group_type="Campaign", name="camp-2", date_added="12-12-2008", first_seen="12-12-08", xid="my-unique-external-id")
|
Add Attribute¶
1 | ti.attribute(attr_type="Description", attr_value="Example Description", displayed=True)
|
Add Tag¶
1 | ti.tag(name="Crimeware")
|
Add Security Label¶
1 | ti.security_label(name="My Custom Label", description="My Label Description", color="ffffff")
|
Add Association¶
1 | ti.association("my-unique-adversary-external-id")
|
Save¶
1 2 | # temporarily save group to disk to preserve memory
batch.save(ti)
|
Add Indicator Interface 1¶
1 | ti = batch.file(md5="43c3609411c83f363e051d455ade78a6", rating="5.0", confidence="100")
|
Add Indicator Interface 2¶
1 2 3 4 5 | ti = batch.indicator(indicator_type="File", summary="43c3609411c83f363e051d455ade78a6")
ti.confidence = "50"
ti.rating = "3.2"
ti.occurrence(file_name="drop1.exe", path="C:\\\\test\\\\", date="2017-02-02")
ti.occurrence(file_name="drop2.exe", path="C:\\\\test2\\\\", date="2017-01-01")
|
Submit¶
1 | batch_status = batch.submit_all()
|
Case Management¶
Create Case¶
1 2 3 4 5 6 7 | case_data = {
"name": "Case Name",
"severity": "Low",
"status": "Open",
}
case = self.cm.case(**case_data)
case.submit()
|
Update Case¶
1 2 3 | case = self.cm.case(id=2)
case.name = "Updated Name"
case.submit()
|
Delete Case¶
1 2 | case = self.cm.case(id=2)
case.delete()
|
Get Case¶
1 2 | case = self.cm.case(id=2)
case.get()
|
Get Sub Artifacts¶
1 2 3 4 | case = self.cm.case(id=2)
case.get(all_available_fields=True)
for artifact in case.artifacts:
self.tcex.log.debug(f"artifact: {artifact}")
|
Create Case with Artifacts¶
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | case_data = {
"name": "Case Name",
"severity": "Low",
"status": "Open",
}
case = self.cm.case(**case_data)
# artifact data
artifact_data = [
{"summary": "asn4455", "intel_type": "indicator-ASN", "type": "ASN"},
{"summary": "asn5544", "intel_type": "indicator-ASN", "type": "ASN"},
]
# add artifacts
for artifact in artifact_data:
case.add_artifact(**artifact)
case.submit()
|
Get Cases with TQL¶
1 2 3 4 5 | cases = self.tcex.cm.cases()
cases.filter.id(TQL.Operator.EQ, case.id)
for case in cases:
self.tcex.log.debug(f"case: {case}")
|
Get Cases with Linked TQL¶
1 2 3 4 5 | cases = self.tcex.cm.cases()
cases.filter.has_artifact.id(TQL.Operator.EQ, artifact.id)
for case in cases:
self.tcex.log.debug(f"case: {case}")
|
Create Artifact¶
1 2 3 4 5 6 7 8 9 | artifact_data = {
"case_id": 1,
"summary": f"asn2342",
"type": "ASN",
}
# create artifact
artifact = self.tcex.cm.artifact(**artifact_data)
artifact.submit()
|
Create Note¶
1 2 3 4 5 6 7 8 | note_data = {
"case_id": 1,
"text": f"sample note for test case."
}
# create note
note = self.tcex.cm.note(**note_data)
note.submit()
|
Add Tag¶
1 2 3 | case = self.tcex.cm.case(id=1)
case.add_tag("sample tag")
case.submit()
|
Create Task¶
1 2 3 4 5 6 7 8 9 10 | task_data = {
"case_id": 1,
"description": f"a description for new task",
"name": f"new task",
"xid": "unique-task-xid"
}
# create task
task = self.tcex.cm.task(**task_data)
task.submit()
|
Create Workflow Event¶
1 2 3 4 5 6 7 8 | workflow_event_data = {
"case_id": 1,
"summary": "workflow event summary"
}
# create workflow_event
workflow_event = self.tcex.cm.workflow_event(**workflow_event_data)
workflow_event.submit()
|
DataStore¶
Initialize Local¶
1 2 | # when using a value of "local" the scope of the datastore is limited to this App in the current Playbook
ds = self.tcex.datastore("local", "myDnsData")
|
Initialize Organization¶
1 | ds = self.tcex.datastore("organization", "myDnsData")
|
Get¶
1 | response = ds.get(rid="one")
|
Add¶
1 | response = ds.add(rid="one", data={"one": 1})
|
Add (dynamic id)¶
1 | response = ds.add(rid=None, data={"one": 1})
|
Put¶
1 | response = ds.put(rid="one", data={"one": 1})
|
Delete¶
1 | response = ds.delete(rid="one")
|
Search¶
1 2 | search = {"query": {"match_all": {}}}
response = ds.get(rid="_search", data=search)
|
Exit¶
Set exit message¶
1 | self.exit_message = f"Created {indicator_count} indicators."
|
Set exit code¶
1 2 | # set the exit code and allow App to continue to process
self.tcex.playbook.exit_code = 1
|
Exit with error¶
1 2 | # exit the App immediately with the provided exit message
self.tcex.playbook.exit(code=1, msg="Failed to add indicators to Owner.")
|
General¶
Action Method¶
1 2 3 4 5 6 7 | @IterateOnArg(arg="input_arg")
@OnException(exit_msg='Failed to run "do something" operation.')
@OnSuccess(exit_msg='Successfully ran "do something" operation.')
@Output(attribute="return_outputs")
def do_action(self, input_arg):
"""Perform an action on input_arg and append return value to self.return_outputs."""
return input_arg
|
Get variable type¶
1 | var_type = self.tcex.playbook.variable_type(variable=self.args.input)
|
Logging¶
Debug¶
1 | self.tcex.log.debug("debug logging")
|
Info¶
1 | self.tcex.log.info("info logging")
|
Warning¶
1 | self.tcex.log.warning("warning logging")
|
Error¶
1 | self.tcex.log.error("error logging")
|
Metrics¶
Add Metrics¶
1 2 | metrics = self.tcex.metric(name="My Metric", description="Indicator Count", data_type="Sum", interval="Daily", keyed=False)
metrics.add(value=42, date="2008-12-12T12:12:12")
|
Add Keyed Metrics¶
1 2 | metrics = self.tcex.metric(name="My Metric By Owner", description="Indicator Count by Owner", data_type="Sum", interval="Daily", keyed=True)
metrics.add_keyed(value=42, key="MyOrg", date="2008-12-12T12:12:12", return_value=True)
|
Notifications¶
Send to Recipients¶
1 2 3 | notification = self.tcex.notification()
notification.recipients(notification_type="My notification", recipients="user@example.com", priority="High")
status = notification.send(message="High alert send to recipients.")
|
Send to Organization¶
1 2 3 | notification = self.tcex.notification()
notification.org(notification_type="My notification", priority="High")
status = notification.send(message="High alert send to organization.")
|
Threat Intelligence¶
Get Group by Id¶
1 2 3 4 5 | parameters = {
"includes": ["additional", "attributes", "labels", "tags"]
}
ti = self.tcex.ti.group(group_type="Adversary", owner="MyOrg", unique_id=416)
response = ti.single(params=parameters)
|
Get Groups¶
1 2 3 4 5 6 | parameters = {
"includes": ["additional", "attributes", "labels", "tags"]
}
groups = self.tcex.ti.group(group_type="Adversary", owner="MyOrg")
for group in groups.many(params=parameters):
self.tcex.log.debug(f"group: {group}")
|
Get Tags¶
1 2 | for tag in ti.tags():
self.tcex.log.debug(f"tag: {tag}")
|
Get Attributes¶
1 2 | for attribute in ti.attributes():
self.tcex.log.debug(f"attribute: {attribute}")
|
Get Associations¶
1 2 | for association in ti.indicator_associations():
self.tcex.log.debug(f"association: {association}")
|
Create Group¶
1 2 | ti = self.tcex.ti.group(group_type="Campaign", name="camp-3", owner="MyOrg", first_seen="2019-04-02")
response = ti.create()
|
Add Tag¶
1 | response = ti.add_tag(name="Crimeware")
|
Add Attribute¶
1 | response = ti.add_attribute(attribute_type="Description", attribute_value="Example Description.")
|
Add Security Label¶
1 | response = ti.add_label(label="TLP:GREEN")
|
Add Association¶
1 2 | group_assoc = self.tcex.ti.group(group_type="Campaign", unique_id=417)
response = ti.add_association(target=group_assoc)
|
Update Group¶
1 2 | ti = self.tcex.ti.group(group_type="Campaign", first_seen="2019-04-03", unique_id=417)
response = ti.update()
|
Delete Group¶
1 2 | ti = self.tcex.ti.group(group_type="Campaign", unique_id=419)
response = ti.delete()
|
Get Indicator by Value¶
1 2 3 4 5 | parameters = {
"includes": ["additional", "attributes", "labels", "tags"]
}
ti = self.tcex.ti.indicator(indicator_type="Address", owner="MyOrg", unique_id="127.0.0.1")
response = ti.single(params=parameters)
|
Get Indicators¶
1 2 3 4 5 6 | parameters = {
"includes": ["additional", "attributes", "labels", "tags"]
}
indicators = self.tcex.ti.indicator(indicator_type="Address", owner="MyOrg")
for indicator in indicators.many(params=parameters):
self.tcex.log.debug(f"indicator: {indicator}")
|
Create Indicator¶
1 2 | ti = self.tcex.ti.indicator(indicator_type="Address", owner="MyOrg", ip="12.13.14.15")
response = ti.create()
|