
Данный Python скрипт создает события в PagerDuty используя APIv2.
За основу был взят следующий скрипт.
Для начала нужно создать "Routing Key", он же "Integration Key", не путать с "API Access Key", который можно использовать для любых API вызовов, нам же нужен только ключ от определенного сервиса.
Переходим в настройки сервиса, в моем случае он называется "AWS", и переходим к вкладке "Integrations"

Добавляем новую интеграцию — "Add a new integration"

Задаем имя и выбираем "Integration Type" -> "Use our API directly" -> "Events API v2" и получаем "Integration Key"

main.py:
import requests
import json
from typing import Any, Dict, List, Optional
routing_key = 'abc123'
api_url = 'https://events.pagerduty.com/v2/enqueue'
def create_event(
summary: str,
severity: str,
source: str = 'aws',
action: str = 'trigger',
dedup_key: Optional[str] = None,
custom_details: Optional[Any] = None,
group: Optional[str] = None,
component: Optional[str] = None,
class_type: Optional[str] = None,
images: Optional[List[Any]] = None,
links: Optional[List[Any]] = None
) -> Dict:
payload = {
"summary": summary,
"severity": severity,
"source": source,
}
if custom_details is not None:
payload["custom_details"] = custom_details
if component:
payload["component"] = component
if group:
payload["group"] = group
if class_type:
payload["class"] = class_type
actions = ('trigger', 'acknowledge', 'resolve')
if action not in actions:
raise ValueError("Event action must be one of: %s" % ', '.join(actions))
data = {
"event_action": action,
"routing_key": routing_key,
"payload": payload,
}
if dedup_key:
data["dedup_key"] = dedup_key
elif action != 'trigger':
raise ValueError(
"The dedup_key property is required for event_action=%s events, and it must \
be a string."
% action
)
if images is not None:
data["images"] = images
if links is not None:
data["links"] = links
response = requests.post(api_url, json = data)
print(response.content)
def main():
create_event(summary = "Instance is down", severity = 'critical', custom_details = "Instance ID: i-12345")
if __name__ == '__main__':
main()
Замените значение переменной "routing_key" на значение "Integration Key"
В данном примере будет создано событие с уровнем "critical", заголовком "Instance is down" и в деталях будет указано "Instance ID: i-12345"
Так же скрипт можно использовать для создания событий на основе AWS CloudWatch Alarms, не используя AWS интеграцию в PagerDuty. Для этого создайте SNS топик, Lambda функцию и укажите SNS топик как источник для Lambda функции. После чего можно создавать CloudWatch Alarm и в качестве действия при срабатывании указываем SNS топик.
main.py (Lambda + SNS):
import requests
import json
from typing import Any, Dict, List, Optional
routing_key = 'abc123'
api_url = 'https://events.pagerduty.com/v2/enqueue'
def get_message_info():
subject = event["Records"][0]["Sns"]["Subject"]
message = event["Records"][0]["Sns"]["Message"]
return subject, message
def create_event(
summary: str,
severity: str,
source: str = 'cloudwatch',
action: str = 'trigger',
dedup_key: Optional[str] = None,
custom_details: Optional[Any] = None,
group: Optional[str] = None,
component: Optional[str] = None,
class_type: Optional[str] = None,
images: Optional[List[Any]] = None,
links: Optional[List[Any]] = None
) -> Dict:
payload = {
"summary": summary,
"severity": severity,
"source": source,
}
if custom_details is not None:
payload["custom_details"] = custom_details
if component:
payload["component"] = component
if group:
payload["group"] = group
if class_type:
payload["class"] = class_type
actions = ('trigger', 'acknowledge', 'resolve')
if action not in actions:
raise ValueError("Event action must be one of: %s" % ', '.join(actions))
data = {
"event_action": action,
"routing_key": routing_key,
"payload": payload,
}
if dedup_key:
data["dedup_key"] = dedup_key
elif action != 'trigger':
raise ValueError(
"The dedup_key property is required for event_action=%s events, and it must \
be a string."
% action
)
if images is not None:
data["images"] = images
if links is not None:
data["links"] = links
response = requests.post(api_url, json = data)
print(response.content)
def main(event, context):
info = get_message_info()
create_event(summary = info[0], severity='critical', custom_details = info[1])
if __name__ == '__main__':
main()
Python пакет "requests" придется упаковывать в zip архив с Lambda функцией, так как в Lambda его нет. Для этого можно воспользоваться "virtualenv".