SQS Events
Jets supports SQS Events as a Lambda trigger. When you send a message to an SQS queue, it triggers a Lambda function to run. The Lambda function has access to the message data via event.
There are a few ways to connect an SQS queue to a Lambda function with Jets.
- Existing SQS Queue
- Generated Function SQS Queue
- Generated Shared SQS Queue
We’ll cover each of them:
Existing SQS Queue
Here is an example connecting an existing SQS queue to a Lambda function in an Event.
Generate the code:
jets generate:event waiter --trigger sqs --method order
It looks something like this.
app/events/waiter_event.rb
class WaiterEvent < ApplicationEvent
  class_timeout 15.minutes # Lambda Function timeout must be less than or equal to the SQS Visibility timeout
  sqs_event "hello-queue"
  def order
    puts "event #{JSON.dump(event)}"
    puts "sqs_events #{JSON.dump(sqs_events)}"
  end
end
Ultimately, the sqs_event declaration generates a Lambda::EventSourceMapping. The properties of the mapping can be set with an additional Hash argument:
sqs_event("hello-queue", batch_size: 10)
Generated Function SQS Queue
Jets can create and manage an SQS queue for a specific function. This is done with a special :generate_queue argument.
class CoolEvent < ApplicationEvent
  class_timeout 30 # must be less than or equal to the SQS queue default timeout
  sqs_event :generate_queue
  def lift
    puts "lift event #{JSON.dump(event)}"
  end
end
A special :QueueProperties key sets the SQS::Queue properties. Other keys set the Lambda::EventSourceMapping properties. Example:
sqs_event(:generate_queue,
  BatchSize: 10, # property of EventSourceMapping
  QueueProperties: {
    MessageRetentionPeriod: 345600, # 4 days in seconds
  })
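The MessageRetentionPeriod value is given in seconds, so it helps to derive it rather than hard-code it. Here is a minimal sketch in plain Ruby. The retention_seconds helper and both constants are hypothetical names for illustration and are not part of Jets. The range check reflects the SQS limits of 60 seconds to 14 days.

```ruby
# Hypothetical helper, not part of Jets: converts days to the seconds value
# that MessageRetentionPeriod expects and checks it against the SQS limits.
SECONDS_PER_DAY = 24 * 60 * 60

# SQS accepts a retention period from 60 seconds up to 14 days.
RETENTION_RANGE = (60..14 * SECONDS_PER_DAY)

def retention_seconds(days)
  seconds = days * SECONDS_PER_DAY
  unless RETENTION_RANGE.cover?(seconds)
    raise ArgumentError, "retention must be between 60 seconds and 14 days"
  end
  seconds
end

puts retention_seconds(4) # 345600, the value used in the example above
```

You could then pass retention_seconds(4) as the MessageRetentionPeriod value instead of the literal number.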
Here’s an example screenshot of a generated SQS queue:
Note: SQS queues managed by Jets are deleted when you delete the Jets application.
Generated Shared SQS Queue
Jets also supports creating a shared SQS queue via a Shared Resource. Here’s how you create the SQS queue as a shared resource:
shared/resources/list.rb
class List < Jets::Stack
  sqs_queue(:waitlist)
end
You can reference the Shared Queue like so:
app/events/cool_event.rb
class CoolEvent < ApplicationEvent
  class_timeout 30 # must be less than or equal to the SQS queue default timeout
  depends_on :list # so we can reference list shared resources
  sqs_event ref(:waitlist) # reference sqs queue in shared resource
  def fix
    puts "fix #{JSON.dump(event)}"
  end
end
Under the hood, Jets provisions resources via CloudFormation. The use of depends_on ensures that Jets passes the List shared resource stack outputs to the CoolEvent stack as input parameters. This allows CoolEvent to reference resources from the separate child List stack.
For those learning CloudFormation, these resources might help:
- AWS CloudFormation Declarative Infrastructure Code Tutorial
- A Simple Introduction to AWS CloudFormation Part 1: EC2 Instance
- Working with Nested Stacks
Accessing the SQS Url
You can access the SQS URL with the lookup method. The method is available to Jets::Stack subclasses like the List class here. Here’s an example:
app/events/postman_event.rb
class PostmanEvent < ApplicationEvent
  include Jets::AwsServices
  iam_policy "sqs"
  def deliver
    puts "queue arn: #{List.lookup(:waitlist)}"
    puts "queue url: #{List.lookup(:waitlist_url)}"
    queue_url = List.lookup(:waitlist_url)
    message_body = JSON.dump({"test": "hello world"})
    sqs.send_message(
      queue_url: queue_url,
      message_body: message_body,
    )
  end
end
Send Test Message
Here’s an example of sending a message to an SQS queue via the aws sqs send-message CLI:
aws sqs send-message --queue-url https://sqs.us-west-2.amazonaws.com/112233445566/hello-queue --message-body '{"test": "hello world"}'
You can also send a message via the SQS Console, an SDK, and so on.
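The send-message command needs a queue URL, but sometimes all you have at hand is the queue ARN, for example from an event payload. Here is a rough sketch in plain Ruby that builds the URL from the ARN. The queue_url_from_arn helper is hypothetical and is not part of Jets or the AWS SDK. It assumes the standard endpoint layout used in the example URL above, which may not hold for other AWS partitions or custom endpoints.

```ruby
# Hypothetical helper, not part of Jets or the AWS SDK.
# Assumes the standard endpoint layout:
#   https://sqs.<region>.amazonaws.com/<account-id>/<queue-name>
def queue_url_from_arn(arn)
  prefix, _partition, service, region, account_id, queue_name = arn.split(":", 6)
  unless prefix == "arn" && service == "sqs" && queue_name
    raise ArgumentError, "not an SQS queue ARN: #{arn}"
  end
  "https://sqs.#{region}.amazonaws.com/#{account_id}/#{queue_name}"
end

puts queue_url_from_arn("arn:aws:sqs:us-west-2:112233445566:hello-queue")
```

Inside a Jets app, looking up the URL from the shared resource is the more reliable route when the queue is defined there.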
Tailing Logs
It helps to tail the logs and watch the event as it comes through.
jets logs -f -n waiter_event-order
Event Payloads
Here’s an example of the event payload.
{
"Records": [
{
"messageId": "1e0bfe01-f9df-46c0-8d86-2fd898e4dee9",
"receiptHandle": "AQEBgxVw0hjHeNKB1brir4hr0Fxvz4ERJIqd7bP/iHw82/+UUx/r4W0KG3FSiEA4A+Vk0oS8dT6W8be/Bn7eJjKspZfW2KzC0xzsCmS+BihySk1SX9FM5SW1rFd3bFWYtT6s7pOX2inaU/THtn7Envp5Rs+zehmNIspnLPZkf9h3RFSQk12xaVaOmCQnHtz9o8uKIXwMEwn5IhlJgC0DIuM1v8NZK8Hc65b4xpf09vf01LEA/XdXm24SjfJ0fl7ev2rBXtkMitAfNmKd8x0fcbG3O7H7wB+CIKR4+QvGcI6u9QuAdPU5MpIJ46niJmrtnIx70S5Go1paUYMa77ABBjFWoJkJHvHouuiohEQHdMrH1QSyabNBS2Nw2dikhBcXVtLQW4iH+xNXwLIVUxarAk9EHokh1iGWZsG91whmPaAl0t2Vdfo6Dcm0/6IgXhKcLFIw",
"body": "{\"test\": \"hello world\"}",
"attributes": {
"ApproximateReceiveCount": "1",
"SentTimestamp": "1550605918693",
"SenderId": "AIDAJTCD6O457Q7BMTLYM",
"ApproximateFirstReceiveTimestamp": "1550605918704"
},
"messageAttributes": {},
"md5OfBody": "3d635e69eb93fd184b47a31d460ca2b6",
"eventSource": "aws:sqs",
"eventSourceARN": "arn:aws:sqs:us-west-2:112233445566:demo-dev-List-3VJ13ADFT5VZ-Waitlist-X35N8JKWZTL3",
"awsRegion": "us-west-2"
}
]
}
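Besides the body, each record carries metadata that can be handy in a handler. The sketch below, in plain Ruby, pulls a few fields out of one record. The summarize_record helper is a hypothetical name, not a Jets method, and it assumes string keys as in the raw JSON payload. Note that the attribute values arrive as strings and that SentTimestamp is in epoch milliseconds.

```ruby
# Hypothetical helper, not part of Jets. Assumes string keys, as in the raw
# JSON payload shown above.
def summarize_record(record)
  attributes = record.fetch("attributes", {})
  {
    message_id: record["messageId"],
    # The queue name is the last segment of the ARN.
    queue_name: record["eventSourceARN"].split(":").last,
    # Attribute values arrive as strings, so convert before comparing.
    receive_count: attributes["ApproximateReceiveCount"].to_i,
    # SentTimestamp is epoch time in milliseconds.
    sent_at: Time.at(attributes["SentTimestamp"].to_i / 1000.0).utc,
  }
end

# A trimmed copy of the record from the payload above.
record = {
  "messageId" => "1e0bfe01-f9df-46c0-8d86-2fd898e4dee9",
  "attributes" => {
    "ApproximateReceiveCount" => "1",
    "SentTimestamp" => "1550605918693",
  },
  "eventSourceARN" => "arn:aws:sqs:us-west-2:112233445566:demo-dev-List-3VJ13ADFT5VZ-Waitlist-X35N8JKWZTL3",
}

summary = summarize_record(record)
puts "message #{summary[:message_id]} was redelivered" if summary[:receive_count] > 1
p summary
```

A receive count above 1 means the message has been delivered before, which is a useful signal when a handler is not idempotent.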
sqs_events
The sqs_events helper method unravels the data and provides the SQS message bodies.
[{
"test": "hello world"
}]
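To make the unraveling concrete, here is an approximation in plain Ruby of how the bodies could be extracted from the event payload. This is not the actual Jets implementation of the helper. The message_bodies name is hypothetical, and the sketch assumes string keys and that every message body is valid JSON.

```ruby
require "json"

# Approximation only: NOT the Jets implementation of the helper.
# Assumes string keys and that every message body is valid JSON.
def message_bodies(event)
  event.fetch("Records", []).map { |record| JSON.parse(record["body"]) }
end

event = { "Records" => [{ "body" => "{\"test\": \"hello world\"}" }] }
p message_bodies(event)
```

If a queue can carry non-JSON messages, the parse step would need a rescue for JSON::ParserError.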
IAM Policy
An IAM policy that grants the needed permissions is generated for the Lambda function associated with the SQS event. If needed, you can control and override it with normal IAM Policies.
Troubleshooting
If you get an error message about Invalid request provided: Queue visibility timeout like this:
05:50:35PM CREATE_FAILED AWS::Lambda::EventSourceMapping HandleEventSourceMapping Resource handler returned message: "Invalid request provided: Queue visibility timeout: 30 seconds is less than Function timeout: 60 seconds (Service: Lambda, Status Code: 400, Request ID: 2a7e4692-70fe-45eb-8fd3-06f20692af1e)" (RequestToken: 414a433a-21a3-da18-676e-fd00001fded2, HandlerErrorCode: InvalidRequest)
This means the default visibility timeout of the SQS queue is less than the Lambda function timeout. Either decrease the Lambda function timeout or increase the SQS queue visibility timeout.
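The rule behind this error is simple enough to check before deploying. Here is a minimal sketch in plain Ruby. The check_timeouts! helper is hypothetical and is not part of Jets. Both values are in seconds.

```ruby
# Hypothetical pre-deploy check, not part of Jets. Both values are in seconds.
# Lambda rejects the event source mapping when the function timeout is
# greater than the queue visibility timeout.
def check_timeouts!(function_timeout:, visibility_timeout:)
  return true if function_timeout <= visibility_timeout

  raise ArgumentError,
        "Queue visibility timeout: #{visibility_timeout} seconds is less than " \
        "Function timeout: #{function_timeout} seconds"
end

check_timeouts!(function_timeout: 30, visibility_timeout: 30) # passes

begin
  # The combination from the error message above.
  check_timeouts!(function_timeout: 60, visibility_timeout: 30)
rescue ArgumentError => e
  puts e.message
end
```

For a queue generated by Jets, one way to raise the queue side would be a VisibilityTimeout entry under QueueProperties, assuming it is passed through to the SQS::Queue resource the same way as MessageRetentionPeriod.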