amazon_kinesis_utils package
Submodules
baikonur_logging module
Utilities specific to Baikonur Kinesis/Lambda logging modules.
-
amazon_kinesis_utils.baikonur_logging.
append_to_log_dict
(dictionary: dict, log_type: str, log_data: object, log_timestamp=None, log_id=None)
-
amazon_kinesis_utils.baikonur_logging.
parse_payload_to_log_dict
(payload, log_dict, failed_dict, log_id_key, log_timestamp_key, log_type_key, log_type_unknown_prefix, log_type_whitelist=None, timestamp_required=False)
-
amazon_kinesis_utils.baikonur_logging.
save_json_logs_to_s3
(client, log_dict: dict, reason: str = 'not specified', gzip_compress: bool = True, key_prefix: str = '')
kinesis module
Utilities to work with Kinesis Aggregated records, JSON events coming from CloudWatch Logs with subscription filters, gzipped JSON data and more.
-
exception
amazon_kinesis_utils.kinesis.
KinesisException
Bases: Exception
A custom exception raised on put_records_batch failures. Intentionally leaving this exception uncaught in a Lambda function (event source mapped to a Kinesis Data Stream) makes Lambda rerun the function until all records are successfully sent.
-
amazon_kinesis_utils.kinesis.
create_record
(data: str) → dict
Create a single Kinesis Record for use with the PutRecords API
Parameters: data – A string to convert to a record
Returns: Kinesis Record for the PutRecords API
-
amazon_kinesis_utils.kinesis.
create_records
(data: List[str]) → List[dict]
Create Kinesis Records from multiple strings for use with the PutRecords API
Parameters: data – List of strings to convert to records
Returns: List of Kinesis Records for the PutRecords API
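The shape of an entry accepted by PutRecords can be sketched as below. This is an illustration only, not the package's implementation: the helper names make_record and make_records are hypothetical, and using a random UUID as the partition key is an assumption. What is certain is that each PutRecords entry needs a Data blob and a PartitionKey.

```python
import uuid
from typing import List


def make_record(data: str) -> dict:
    """Build one PutRecords entry (hypothetical helper, not the package's code)."""
    return {
        "Data": data.encode("utf-8"),      # PutRecords expects a bytes blob
        "PartitionKey": uuid.uuid4().hex,  # assumption: random key to spread records over shards
    }


def make_records(data: List[str]) -> List[dict]:
    """Build one entry per input string, preserving order."""
    return [make_record(item) for item in data]


records = make_records(['{"a": 1}', '{"b": 2}'])
```

A random partition key distributes records across shards but gives up per-key ordering; a stable key would be the choice when ordering matters.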
-
amazon_kinesis_utils.kinesis.
extract_data_from_json_cwl_message
(message: dict) → List[str]
Extract log events from CloudWatch Logs subscription filter JSON messages (parsed to dict). For details, see: https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/SubscriptionFilters.html
Parameters: message – Dictionary representing a CloudWatch Logs subscription filter JSON message
Returns: List of raw log event messages
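For orientation, a subscription filter message carries its log events under the logEvents key, each with id, timestamp and message fields. The sketch below pulls out the raw messages from a hand-written sample; the function name extract_messages and the sample values are assumptions for illustration, not the package's code.

```python
import json
from typing import List


def extract_messages(message: dict) -> List[str]:
    """Return the raw message of every log event (illustrative helper)."""
    return [event["message"] for event in message.get("logEvents", [])]


# Hand-written sample in the documented subscription filter format.
sample = json.loads("""
{
  "messageType": "DATA_MESSAGE",
  "owner": "123456789012",
  "logGroup": "example-log-group",
  "logStream": "example-log-stream",
  "subscriptionFilters": ["example-filter"],
  "logEvents": [
    {"id": "1", "timestamp": 1510109208016, "message": "first event"},
    {"id": "2", "timestamp": 1510109208017, "message": "second event"}
  ]
}
""")

messages = extract_messages(sample)
```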
-
amazon_kinesis_utils.kinesis.
normalize_cloudwatch_messages
(payload: str) → List[str]
Normalize messages from CloudWatch Logs subscription filters and pass through other data
Parameters: payload – A string containing JSON data (decoded payload inside Kinesis records)
Returns: List of normalized raw data (CloudWatch Logs subscription filters may send multiple log events in one payload)
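One way to read "normalize and pass through" is sketched below. It assumes that a payload counts as a CloudWatch Logs message when it parses to a JSON object whose messageType is DATA_MESSAGE, and that everything else is returned unchanged as a single-item list. The name normalize_payload and these rules are assumptions; the package may treat control messages or invalid JSON differently.

```python
import json
from typing import List


def normalize_payload(payload: str) -> List[str]:
    """Expand CloudWatch Logs data messages; pass any other payload through."""
    try:
        parsed = json.loads(payload)
    except json.JSONDecodeError:
        return [payload]  # assumption: non-JSON data is passed through untouched
    if isinstance(parsed, dict) and parsed.get("messageType") == "DATA_MESSAGE":
        # One payload may carry several log events, hence the list result.
        return [event["message"] for event in parsed.get("logEvents", [])]
    return [payload]


cwl_payload = json.dumps({
    "messageType": "DATA_MESSAGE",
    "logEvents": [
        {"id": "1", "timestamp": 1, "message": "a"},
        {"id": "2", "timestamp": 2, "message": "b"},
    ],
})
normalized = normalize_payload(cwl_payload)
```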
-
amazon_kinesis_utils.kinesis.
parse_records
(raw_records: list) → Generator[str, None, None]
Generator that de-aggregates, decodes, and gzip-decompresses Kinesis Records
Parameters: raw_records – Raw Kinesis records (usually event['Records'] in the Lambda handler function)
Returns: Generator yielding the resulting payloads as strings
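The decode and decompress steps can be sketched with the standard library alone. The example below assumes the Lambda event layout in which each record holds base64 data under record['kinesis']['data'], and it detects gzip by its two-byte magic number. De-aggregation of KPL-aggregated records is deliberately left out, and the name decode_records is hypothetical.

```python
import base64
import gzip
from typing import Generator, List


def decode_records(raw_records: List[dict]) -> Generator[str, None, None]:
    """Yield each record's payload as text, decompressing gzip when present."""
    for record in raw_records:
        data = base64.b64decode(record["kinesis"]["data"])
        if data[:2] == b"\x1f\x8b":  # gzip magic number
            data = gzip.decompress(data)
        yield data.decode("utf-8")


# Simulated Lambda event with one plain and one gzipped payload.
event = {
    "Records": [
        {"kinesis": {"data": base64.b64encode(b'{"plain": true}').decode("ascii")}},
        {"kinesis": {"data": base64.b64encode(gzip.compress(b'{"gzipped": true}')).decode("ascii")}},
    ]
}

decoded = list(decode_records(event["Records"]))
```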
-
amazon_kinesis_utils.kinesis.
put_records_batch
(client, stream_name: str, records: list, max_retries: int, max_batch_size: int = 500) → List[dict]
Put multiple records to Kinesis Data Streams using the PutRecords API in batches.
Parameters: - client – Kinesis API client (e.g. boto3.client('kinesis'))
- stream_name – Kinesis Data Streams stream name
- records – List of records to send. Records will be dumped with json.dumps
- max_retries – Maximum number of retries for resending failed records
- max_batch_size – Maximum number of records sent in a single PutRecords API call.
Returns: Records that could not be put to the Kinesis Data Stream after all retries.
Each PutRecords API call can receive up to 500 records: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesis.html#Kinesis.Client.put_records
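The batching and retry idea can be sketched against a stand-in client. FlakyKinesisClient and put_in_batches are hypothetical names, and the retry policy (immediate resend of only the failed entries, no backoff) is an assumption rather than the package's behaviour. The response shape used here, FailedRecordCount plus a per-entry Records list where failed entries carry ErrorCode, follows the PutRecords API.

```python
import json
from typing import List


class FlakyKinesisClient:
    """Stand-in for boto3.client('kinesis'): rejects every entry on the first call."""

    def __init__(self) -> None:
        self.calls = 0

    def put_records(self, StreamName: str, Records: List[dict]) -> dict:
        self.calls += 1
        if self.calls == 1:
            results = [{"ErrorCode": "ProvisionedThroughputExceededException",
                        "ErrorMessage": "simulated"} for _ in Records]
        else:
            results = [{"SequenceNumber": str(i), "ShardId": "shardId-000000000000"}
                       for i, _ in enumerate(Records)]
        failed = sum(1 for result in results if "ErrorCode" in result)
        return {"FailedRecordCount": failed, "Records": results}


def put_in_batches(client, stream_name: str, records: list,
                   max_retries: int, max_batch_size: int = 500) -> List[dict]:
    """Send records in batches; return entries still failing after all retries."""
    failed: List[dict] = []
    for start in range(0, len(records), max_batch_size):
        pending = records[start:start + max_batch_size]
        for _attempt in range(max_retries + 1):  # first try plus retries
            response = client.put_records(StreamName=stream_name, Records=pending)
            if response["FailedRecordCount"] == 0:
                pending = []
                break
            # Results are positional: keep only the entries that failed.
            pending = [rec for rec, res in zip(pending, response["Records"])
                       if "ErrorCode" in res]
        failed.extend(pending)
    return failed


records = [{"Data": json.dumps({"n": n}).encode("utf-8"), "PartitionKey": str(n)}
           for n in range(3)]
client = FlakyKinesisClient()
leftover = put_in_batches(client, "example-stream", records,
                          max_retries=2, max_batch_size=2)
```

A production version would normally wait between attempts (for example with exponential backoff) so that throttled shards have time to recover.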
misc module
Various utilities useful when working with Kinesis Data Streams.
-
amazon_kinesis_utils.misc.
dict_get_default
(dictionary: dict, key: str, default: any, verbose: bool = False) → Any
Get the value for key from dictionary if the key is present, the default value otherwise
Parameters: - dictionary – dictionary to retrieve key from
- key – key name in dictionary
- default – value to return if key is not in dictionary
- verbose – output detailed warning message when returning default value
Returns: value for key if key is in dictionary, default value otherwise
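The described behaviour can be sketched as follows. The name get_or_default and the use of the logging module for the verbose warning are assumptions; the package may report the fallback differently.

```python
import logging
from typing import Any

logger = logging.getLogger(__name__)


def get_or_default(dictionary: dict, key: str, default: Any,
                   verbose: bool = False) -> Any:
    """Return dictionary[key] when the key exists, otherwise the default."""
    if key in dictionary:
        return dictionary[key]
    if verbose:
        logger.warning("Key %r not found; returning default %r", key, default)
    return default


config = {"stream": "example-stream", "retries": None}
stream = get_or_default(config, "stream", "fallback-stream")
batch_size = get_or_default(config, "batch_size", 500, verbose=True)
```

Testing membership with `in` means a key that is present with a falsy value such as None is returned as-is rather than replaced by the default.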
-
amazon_kinesis_utils.misc.
split_list
(lst: list, n: int) → List[list]
Split a list of objects into chunks of size n
Parameters: - lst – List to split into chunks
- n – Chunk size (the last chunk may hold fewer than n items)
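Chunking of this kind is commonly written with slicing. The sketch below uses the hypothetical name chunk_list and adds a guard against non-positive n, which is an assumption and not documented behaviour of the package.

```python
from typing import List


def chunk_list(lst: list, n: int) -> List[list]:
    """Split lst into consecutive chunks of at most n items."""
    if n < 1:
        raise ValueError("n must be a positive integer")
    return [lst[i:i + n] for i in range(0, len(lst), n)]


chunks = chunk_list([1, 2, 3, 4, 5], 2)
print(chunks)  # [[1, 2], [3, 4], [5]]
```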
s3 module
Utilities to save string data to S3 easily.
-
amazon_kinesis_utils.s3.
put_str_data
(client, bucket: str, key: str, data: str, gzip_compress: bool = False)
Put str data to an S3 bucket with optional gzip compression
Parameters: - client – S3 API client (e.g. boto3.client('s3'))
- bucket – S3 bucket name
- key – S3 object key
- data – Data to save
- gzip_compress – Boolean switch to control gzip compression (default = False)
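The optional compression step can be sketched with a stand-in client so the example runs without AWS access. RecordingS3Client and put_text are hypothetical names; the only real API assumed is the put_object call with Bucket, Key and Body arguments, as offered by boto3's S3 client.

```python
import gzip


class RecordingS3Client:
    """Stand-in for boto3.client('s3') that only records put_object calls."""

    def __init__(self) -> None:
        self.objects = {}

    def put_object(self, Bucket: str, Key: str, Body: bytes, **kwargs) -> dict:
        self.objects[(Bucket, Key)] = Body
        return {}


def put_text(client, bucket: str, key: str, data: str,
             gzip_compress: bool = False) -> None:
    """Encode text as UTF-8, optionally gzip it, and upload it."""
    body = data.encode("utf-8")
    if gzip_compress:
        body = gzip.compress(body)
    client.put_object(Bucket=bucket, Key=key, Body=body)


s3 = RecordingS3Client()
put_text(s3, "example-bucket", "logs/plain.json", '{"a": 1}')
put_text(s3, "example-bucket", "logs/packed.json.gz", '{"a": 1}', gzip_compress=True)
```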