amazon_kinesis_utils package

Submodules

baikonur_logging module

Utilities specific to Baikonur Kinesis/Lambda logging modules.

amazon_kinesis_utils.baikonur_logging.append_to_log_dict(dictionary: dict, log_type: str, log_data: object, log_timestamp=None, log_id=None)
amazon_kinesis_utils.baikonur_logging.parse_payload_to_log_dict(payload, log_dict, failed_dict, log_id_key, log_timestamp_key, log_type_key, log_type_unknown_prefix, log_type_whitelist=None, timestamp_required=False)
amazon_kinesis_utils.baikonur_logging.save_json_logs_to_s3(client, log_dict: dict, reason: str = 'not specified', gzip_compress: bool = True, key_prefix: str = '')

kinesis module

Utilities to work with Kinesis Aggregated records, JSON events coming from CloudWatch Logs with subscription filters, gzipped JSON data and more.

exception amazon_kinesis_utils.kinesis.KinesisException

Bases: Exception

A custom exception raised on put_records_batch failures. Intentionally not catching this exception in Lambda functions (source-mapped to a Kinesis Data Stream) makes Lambda rerun the function until all records are successfully sent.
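The retry behaviour described above can be sketched as follows. This is an illustration only: FakeKinesisException, send_all and handler are hypothetical stand-ins written for this example, not part of amazon_kinesis_utils, and the fail flag exists purely to simulate a delivery failure.

```python
class FakeKinesisException(Exception):
    """Hypothetical stand-in for amazon_kinesis_utils.kinesis.KinesisException."""


def send_all(records, fail=False):
    # Hypothetical sender: raises when some records could not be delivered.
    if fail:
        raise FakeKinesisException(f"{len(records)} record(s) not sent")
    return len(records)


def handler(event, context=None, fail=False):
    # No try/except on purpose: an unhandled exception marks the invocation
    # as failed, so the Kinesis event source mapping retries the same batch.
    return send_all(event["Records"], fail=fail)
```

Wrapping the call in a broad try/except would swallow the failure and let Lambda move on to the next batch, which is the opposite of the behaviour described here.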

amazon_kinesis_utils.kinesis.create_record(data: str) → dict

Create a single Kinesis Record for use with PutRecords API

Parameters: data – A string to convert to record
Returns: Kinesis Record for PutRecords API
amazon_kinesis_utils.kinesis.create_records(data: List[str]) → List[dict]

Create Kinesis Records from multiple str data for use with PutRecords API

Parameters: data – List of strings to convert to records
Returns: List of Kinesis Records for PutRecords API
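As a rough illustration of what a PutRecords entry looks like, the sketch below builds records by hand. Each entry in a PutRecords request needs a Data blob and a PartitionKey string. The names make_record and make_records are hypothetical, and the random partition key is an assumption made for this example; the library's own key strategy is not documented here.

```python
import uuid
from typing import List


def make_record(data: str) -> dict:
    # Data must be bytes-like; PartitionKey decides which shard receives it.
    # Assumption: a random key is used to spread records across shards.
    return {"Data": data.encode("utf-8"), "PartitionKey": uuid.uuid4().hex}


def make_records(data: List[str]) -> List[dict]:
    # One PutRecords entry per input string, in the same order.
    return [make_record(item) for item in data]
```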
amazon_kinesis_utils.kinesis.extract_data_from_json_cwl_message(message: dict) → List[str]

Extract log events from CloudWatch Logs subscription filters JSON messages (parsed to dict). For details, see: https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/SubscriptionFilters.html

Parameters: message – Dictionary representing CloudWatch Logs subscription filters JSON messages
Returns: List of raw log event messages
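For orientation, a subscription filter message carries its log lines in a logEvents list, each entry holding id, timestamp and message. The sketch below extracts the message strings from such a dictionary. extract_messages is a hypothetical name and the sample values are made up for illustration.

```python
import json
from typing import List


def extract_messages(message: dict) -> List[str]:
    # Each entry in "logEvents" carries "id", "timestamp" and "message".
    return [event["message"] for event in message.get("logEvents", [])]


sample = json.loads("""
{
  "messageType": "DATA_MESSAGE",
  "owner": "123456789012",
  "logGroup": "example-group",
  "logStream": "example-stream",
  "subscriptionFilters": ["example-filter"],
  "logEvents": [
    {"id": "1", "timestamp": 1510109208016, "message": "first line"},
    {"id": "2", "timestamp": 1510109208017, "message": "second line"}
  ]
}
""")

print(extract_messages(sample))  # ['first line', 'second line']
```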
amazon_kinesis_utils.kinesis.normalize_cloudwatch_messages(payload: str) → List[str]

Normalize messages from CloudWatch Logs subscription filters and pass through other data

Parameters: payload – A string containing JSON data (decoded payload inside Kinesis records)
Returns: List of normalized raw data (CloudWatch Logs subscription filters may send multiple log events in one payload)
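The normalize-or-pass-through idea can be sketched as below. This is not the library's implementation: normalize is a hypothetical name, the envelope check (a dict with both messageType and logEvents) is an assumed heuristic, and dropping non-DATA_MESSAGE payloads such as CloudWatch's CONTROL_MESSAGE is a design choice made for this example.

```python
import json
from typing import List


def normalize(payload: str) -> List[str]:
    try:
        parsed = json.loads(payload)
    except json.JSONDecodeError:
        return [payload]  # not JSON: pass through untouched

    # Assumption: a dict with both keys is a CloudWatch Logs envelope.
    if isinstance(parsed, dict) and "logEvents" in parsed and "messageType" in parsed:
        if parsed["messageType"] != "DATA_MESSAGE":
            return []  # assumed choice: control messages carry no log data
        return [event["message"] for event in parsed["logEvents"]]

    return [payload]  # ordinary JSON: pass through as the original string
```

One payload can therefore expand into several items, stay as a single item, or disappear entirely, which is why the return type is a list.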
amazon_kinesis_utils.kinesis.parse_records(raw_records: list) → Generator[str, None, None]

Generator that de-aggregates, decodes, and gzip-decompresses Kinesis Records

Parameters: raw_records – Raw Kinesis records (usually event['Records'] in the Lambda handler function)
Returns: Generator yielding the decoded payload of each record as a string
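A simplified sketch of the decode and decompress steps is shown below. It deliberately omits de-aggregation of Kinesis Aggregated records, which needs a dedicated library, so it is not equivalent to parse_records. decode_records is a hypothetical name, and detecting gzip by its two-byte magic number is an assumption made for this example.

```python
import base64
import gzip
from typing import Generator

GZIP_MAGIC = b"\x1f\x8b"  # first two bytes of every gzip stream


def decode_records(raw_records: list) -> Generator[str, None, None]:
    for record in raw_records:
        # Lambda delivers Kinesis record data base64-encoded.
        data = base64.b64decode(record["kinesis"]["data"])
        if data[:2] == GZIP_MAGIC:
            data = gzip.decompress(data)
        yield data.decode("utf-8")
```

Because this is a generator, records are decoded lazily as the caller iterates, so a large batch never has to be held fully decoded in memory.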
amazon_kinesis_utils.kinesis.put_records_batch(client, stream_name: str, records: list, max_retries: int, max_batch_size: int = 500) → List[dict]

Put multiple records to Kinesis Data Streams using PutRecords API in batches.

Parameters:
  • client – Kinesis API client (e.g. boto3.client('kinesis'))
  • stream_name – Kinesis Data Streams stream name
  • records – List of records to send. Each record is serialized with json.dumps
  • max_retries – Maximum retries for resending failed records
  • max_batch_size – Maximum number of records sent in a single PutRecords API call.
Returns:

Records that could not be put to the Kinesis Data Stream after all retries. Each PutRecords API call accepts at most 500 records: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesis.html#Kinesis.Client.put_records
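The batching and retry logic can be sketched as below, under stated assumptions. put_in_batches and StubKinesisClient are hypothetical names; the stub rejects every record on first sight so that the retry path is exercised without AWS access. The sketch relies on the PutRecords response listing one result per request entry in the same order, with failed entries carrying an ErrorCode. It has no backoff between attempts and does not serialize records, so it is not the library's implementation.

```python
from typing import List


class StubKinesisClient:
    """Hypothetical stand-in for boto3.client('kinesis'); rejects each record once."""

    def __init__(self):
        self.seen = set()
        self.calls = 0

    def put_records(self, StreamName: str, Records: List[dict]) -> dict:
        self.calls += 1
        results = []
        for record in Records:
            if record["Data"] in self.seen:
                results.append({"SequenceNumber": "1", "ShardId": "shardId-000000000000"})
            else:
                self.seen.add(record["Data"])
                results.append({"ErrorCode": "ProvisionedThroughputExceededException",
                                "ErrorMessage": "throttled"})
        failed = sum(1 for result in results if "ErrorCode" in result)
        return {"FailedRecordCount": failed, "Records": results}


def put_in_batches(client, stream_name, records, max_retries, max_batch_size=500):
    failed_total = []
    for start in range(0, len(records), max_batch_size):
        pending = records[start:start + max_batch_size]
        # One initial attempt plus up to max_retries resends per batch.
        for _attempt in range(max_retries + 1):
            response = client.put_records(StreamName=stream_name, Records=pending)
            # Keep only the entries whose result reports an error.
            pending = [rec for rec, res in zip(pending, response["Records"])
                       if "ErrorCode" in res]
            if not pending:
                break
        failed_total.extend(pending)
    return failed_total
```

Resending only the failed entries, rather than the whole batch, avoids writing duplicates of records that were already accepted.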

misc module

Various utilities useful when working with Kinesis Data Streams.

amazon_kinesis_utils.misc.dict_get_default(dictionary: dict, key: str, default: any, verbose: bool = False) → Any

Get the value for key from dictionary if key is present, default value otherwise

Parameters:
  • dictionary – dictionary to retrieve key from
  • key – key name in dictionary
  • default – value to return if key is not in dictionary
  • verbose – output detailed warning message when returning default value
Returns:

value for key if key is in dictionary, default value otherwise
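A minimal sketch of this lookup is shown below. get_or_default is a hypothetical name, and using the logging module for the verbose warning is an assumption; the library may report it differently.

```python
import logging
from typing import Any

logger = logging.getLogger(__name__)


def get_or_default(dictionary: dict, key: str, default: Any, verbose: bool = False) -> Any:
    if key in dictionary:
        # A stored None is still returned as-is: only a missing key falls back.
        return dictionary[key]
    if verbose:
        logger.warning("Key %r not found, using default %r", key, default)
    return default
```

Apart from the optional warning, this behaves like dict.get(key, default).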

amazon_kinesis_utils.misc.split_list(lst: list, n: int) → List[list]

Split a list of objects into chunks of size n

Parameters:
  • lst – List to split in chunks
  • n – Size of each chunk (the last chunk may be smaller than n)
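The chunking behaviour can be illustrated with a short sketch. chunk is a hypothetical name, and rejecting non-positive n is an assumption added for this example.

```python
from typing import List


def chunk(lst: list, n: int) -> List[list]:
    if n <= 0:
        raise ValueError("n must be a positive integer")
    # Slice in steps of n; the final slice holds whatever is left over.
    return [lst[i:i + n] for i in range(0, len(lst), n)]


print(chunk([1, 2, 3, 4, 5], 2))  # [[1, 2], [3, 4], [5]]
```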

s3 module

Utilities to save string data to S3 easily.

amazon_kinesis_utils.s3.put_str_data(client, bucket: str, key: str, data: str, gzip_compress: bool = False)

Put str data to S3 bucket with optional gzip compression

Parameters:
  • client – S3 API client (e.g. boto3.client('s3'))
  • bucket – S3 bucket name
  • key – S3 object key
  • data – Data to save
  • gzip_compress – Boolean switch to control gzip compression (default = False)
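The optional compression step can be sketched as below. put_text and StubS3Client are hypothetical names; the stub records uploads in memory so the example runs without AWS access. A real boto3 S3 client exposes put_object with the same Bucket, Key and Body keyword arguments. Encoding the string as UTF-8 is an assumption made for this example.

```python
import gzip


class StubS3Client:
    """Hypothetical stand-in for boto3.client('s3'); stores uploads in memory."""

    def __init__(self):
        self.objects = {}

    def put_object(self, Bucket, Key, Body, **kwargs):
        self.objects[(Bucket, Key)] = Body
        return {}


def put_text(client, bucket: str, key: str, data: str, gzip_compress: bool = False):
    body = data.encode("utf-8")  # assumption: text is stored as UTF-8
    if gzip_compress:
        body = gzip.compress(body)
    return client.put_object(Bucket=bucket, Key=key, Body=body)
```

When compression is enabled, giving the object key a .gz suffix helps downstream consumers recognise the format.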

Module contents