Add day 85 - Queues Workers and Tasks

Signed-off-by: Alistair Hey <alistair@heyal.co.uk>
This commit is contained in:
Alistair Hey 2023-03-24 15:28:36 +00:00
parent 08dd4c7164
commit 335acd5b82
No known key found for this signature in database
13 changed files with 475 additions and 2 deletions

View File

@ -156,8 +156,8 @@ Or contact us via Twitter, my handle is [@MichaelCade1](https://twitter.com/Mich
### Engineering for Day 2 Ops
- [] 👷🏻‍♀️ 84 > [Writing an API - What is an API?](2023/day84.md)
- [] 👷🏻‍♀️ 85 > [](2023/day85.md)
- [] 👷🏻‍♀️ 84 > [Writing an API - What is an API?](2023/day84.md)
- [] 👷🏻‍♀️ 85 > [Queues, Queue workers and Tasks (Asynchronous architecture)](2023/day85.md)
- [] 👷🏻‍♀️ 86 > [](2023/day86.md)
- [] 👷🏻‍♀️ 87 > [](2023/day87.md)
- [] 👷🏻‍♀️ 88 > [](2023/day88.md)

View File

@ -0,0 +1,17 @@
# Set the base image to use
FROM golang:1.17-alpine
# Set the working directory inside the container
WORKDIR /app
# Copy the source code into the container
COPY . .
# Build the Go application
RUN go build -o main .
# Expose the port that the application will run on
EXPOSE 8080
# Define the command that will run when the container starts
CMD ["/app/main"]

View File

@ -0,0 +1,17 @@
module main
go 1.20
require (
github.com/go-sql-driver/mysql v1.7.0
github.com/nats-io/nats.go v1.24.0
)
require (
github.com/golang/protobuf v1.5.3 // indirect
github.com/nats-io/nats-server/v2 v2.9.15 // indirect
github.com/nats-io/nkeys v0.3.0 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
golang.org/x/crypto v0.6.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
)

View File

@ -0,0 +1,32 @@
github.com/go-sql-driver/mysql v1.7.0 h1:ueSltNNllEqE3qcWBTD0iQd3IpL/6U+mJxLkazJ7YPc=
github.com/go-sql-driver/mysql v1.7.0/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/klauspost/compress v1.16.0 h1:iULayQNOReoYUe+1qtKOqw9CwJv3aNQu8ivo7lw1HU4=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/nats-io/jwt/v2 v2.3.0 h1:z2mA1a7tIf5ShggOFlR1oBPgd6hGqcDYsISxZByUzdI=
github.com/nats-io/nats-server/v2 v2.9.15 h1:MuwEJheIwpvFgqvbs20W8Ish2azcygjf4Z0liVu2I4c=
github.com/nats-io/nats-server/v2 v2.9.15/go.mod h1:QlCTy115fqpx4KSOPFIxSV7DdI6OxtZsGOL1JLdeRlE=
github.com/nats-io/nats.go v1.24.0 h1:CRiD8L5GOQu/DcfkmgBcTTIQORMwizF+rPk6T0RaHVQ=
github.com/nats-io/nats.go v1.24.0/go.mod h1:dVQF+BK3SzUZpwyzHedXsvH3EO38aVKuOPkkHlv5hXA=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.6.0 h1:qfktjS5LUO+fFKeJXZ+ikTRijMmljikvG68fpMMruSc=
golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng=
google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=

View File

@ -0,0 +1,100 @@
package main
import (
"database/sql"
"fmt"
_ "github.com/go-sql-driver/mysql"
nats "github.com/nats-io/nats.go"
"math/rand"
"time"
)
func generateAndStoreString() (string, error) {
// Connect to the database
db, err := sql.Open("mysql", "root:password@tcp(mysql:3306)/mysql")
if err != nil {
return "", err
}
defer db.Close()
// Generate a random string
// Define a string of characters to use
characters := "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
// Generate a random string of length 10
randomString := make([]byte, 64)
for i := range randomString {
randomString[i] = characters[rand.Intn(len(characters))]
}
// Insert the random number into the database
_, err = db.Exec("INSERT INTO generator_async(random_string) VALUES(?)", string(randomString))
if err != nil {
return "", err
}
fmt.Printf("Random string %s has been inserted into the database\n", string(randomString))
return string(randomString), nil
}
func main() {
err := createGeneratordb()
if err != nil {
panic(err.Error())
}
nc, _ := nats.Connect("nats://my-nats:4222")
defer nc.Close()
nc.Subscribe("generator", func(msg *nats.Msg) {
s, err := generateAndStoreString()
if err != nil {
print(err)
}
nc.Publish("generator_reply", []byte(s))
nc.Publish("confirmation", []byte(s))
})
nc.Subscribe("confirmation_reply", func(msg *nats.Msg) {
stringReceived(string(msg.Data))
})
// Subscribe to the queue
// when a message comes in call generateAndStoreString() then put the string on the
// reply queue. also add a message onto the confirmation queue
// subscribe to the confirmation reply queue
// when a message comes in call
for {
time.Sleep(1 * time.Second)
}
}
func createGeneratordb() error {
db, err := sql.Open("mysql", "root:password@tcp(mysql:3306)/mysql")
if err != nil {
return err
}
defer db.Close()
// try to create a table for us
_, err = db.Exec("CREATE TABLE IF NOT EXISTS generator_async(random_string VARCHAR(100), seen BOOLEAN, requested BOOLEAN)")
return err
}
func stringReceived(input string) {
// Connect to the database
db, err := sql.Open("mysql", "root:password@tcp(mysql:3306)/mysql")
if err != nil {
print(err)
}
defer db.Close()
_, err = db.Exec("UPDATE generator_async SET requested = true WHERE random_string = ?", input)
if err != nil {
print(err)
}
}

View File

@ -0,0 +1,69 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: requestor
spec:
replicas: 1
selector:
matchLabels:
app: requestor
template:
metadata:
labels:
app: requestor
spec:
containers:
- name: requestor
image: heyal/requestor:async
imagePullPolicy: Always
ports:
- containerPort: 8080
---
apiVersion: v1
kind: Service
metadata:
name: requestor-service
spec:
selector:
app: requestor
ports:
- name: http
protocol: TCP
port: 8080
targetPort: 8080
type: ClusterIP
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: generator
spec:
replicas: 1
selector:
matchLabels:
app: generator
template:
metadata:
labels:
app: generator
spec:
containers:
- name: generator
image: heyal/generator:async
imagePullPolicy: Always
ports:
- containerPort: 8080
---
apiVersion: v1
kind: Service
metadata:
name: generator-service
spec:
selector:
app: generator
ports:
- name: http
protocol: TCP
port: 8080
targetPort: 8080
type: ClusterIP

View File

@ -0,0 +1,17 @@
# Set the base image to use
FROM golang:1.17-alpine
# Set the working directory inside the container
WORKDIR /app
# Copy the source code into the container
COPY . .
# Build the Go application
RUN go build -o main .
# Expose the port that the application will run on
EXPOSE 8080
# Define the command that will run when the container starts
CMD ["/app/main"]

View File

@ -0,0 +1,17 @@
module main
go 1.20
require (
github.com/go-sql-driver/mysql v1.7.0
github.com/nats-io/nats.go v1.24.0
)
require (
github.com/golang/protobuf v1.5.3 // indirect
github.com/nats-io/nats-server/v2 v2.9.15 // indirect
github.com/nats-io/nkeys v0.3.0 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
golang.org/x/crypto v0.6.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
)

View File

@ -0,0 +1,32 @@
github.com/go-sql-driver/mysql v1.7.0 h1:ueSltNNllEqE3qcWBTD0iQd3IpL/6U+mJxLkazJ7YPc=
github.com/go-sql-driver/mysql v1.7.0/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/klauspost/compress v1.16.0 h1:iULayQNOReoYUe+1qtKOqw9CwJv3aNQu8ivo7lw1HU4=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/nats-io/jwt/v2 v2.3.0 h1:z2mA1a7tIf5ShggOFlR1oBPgd6hGqcDYsISxZByUzdI=
github.com/nats-io/nats-server/v2 v2.9.15 h1:MuwEJheIwpvFgqvbs20W8Ish2azcygjf4Z0liVu2I4c=
github.com/nats-io/nats-server/v2 v2.9.15/go.mod h1:QlCTy115fqpx4KSOPFIxSV7DdI6OxtZsGOL1JLdeRlE=
github.com/nats-io/nats.go v1.24.0 h1:CRiD8L5GOQu/DcfkmgBcTTIQORMwizF+rPk6T0RaHVQ=
github.com/nats-io/nats.go v1.24.0/go.mod h1:dVQF+BK3SzUZpwyzHedXsvH3EO38aVKuOPkkHlv5hXA=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.6.0 h1:qfktjS5LUO+fFKeJXZ+ikTRijMmljikvG68fpMMruSc=
golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng=
google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=

View File

@ -0,0 +1,108 @@
package main
import (
"database/sql"
"errors"
"fmt"
_ "github.com/go-sql-driver/mysql"
nats "github.com/nats-io/nats.go"
"time"
)
func storeString(input string) error {
// Connect to the database
db, err := sql.Open("mysql", "root:password@tcp(mysql:3306)/mysql")
defer db.Close()
// Insert the random number into the database
_, err = db.Exec("INSERT INTO requestor_async(random_string) VALUES(?)", input)
if err != nil {
return err
}
fmt.Printf("Random string %s has been inserted into the database\n", input)
return nil
}
func getStringFromDB(input string) error {
// see if the string exists in the db, if so return nil
// if not, return an error
db, err := sql.Open("mysql", "root:password@tcp(mysql:3306)/mysql")
defer db.Close()
result, err := db.Query("SELECT * FROM requestor_async WHERE random_string = ?", input)
if err != nil {
return err
}
for result.Next() {
var randomString string
err = result.Scan(&randomString)
if err != nil {
return err
}
if randomString == input {
return nil
}
}
return errors.New("string not found")
}
func main() {
err := createRequestordb()
if err != nil {
panic(err.Error())
}
// setup a goroutine loop calling the generator every minute, saving the result in the DB
nc, _ := nats.Connect("nats://my-nats:4222")
defer nc.Close()
ticker := time.NewTicker(60 * time.Second)
quit := make(chan struct{})
go func() {
for {
select {
case <-ticker.C:
nc.Publish("generator", []byte(""))
case <-quit:
ticker.Stop()
return
}
}
}()
nc.Subscribe("generator_reply", func(msg *nats.Msg) {
err := storeString(string(msg.Data))
if err != nil {
print(err)
}
})
nc.Subscribe("confirmation", func(msg *nats.Msg) {
err := getStringFromDB(string(msg.Data))
if err != nil {
print(err)
}
nc.Publish("confirmation_reply", []byte(string(msg.Data)))
})
// create a goroutine here to listen for messages on the queue to check, see if we have them
for {
time.Sleep(10 * time.Second)
}
}
func createRequestordb() error {
db, err := sql.Open("mysql", "root:password@tcp(mysql:3306)/mysql")
if err != nil {
return err
}
defer db.Close()
// try to create a table for us
_, err = db.Exec("CREATE TABLE IF NOT EXISTS requestor_async(random_string VARCHAR(100))")
return err
}

View File

@ -0,0 +1,2 @@
docker build ./async/requestor/ -f async/requestor/Dockerfile -t heyal/requestor:async && docker push heyal/requestor:async
docker build ./async/generator/ -f async/generator/Dockerfile -t heyal/generator:async&& docker push heyal/generator:async

View File

@ -0,0 +1,62 @@
# Queues, Queue workers and Tasks (Asynchronous architecture)
Yesterday we looked at how we can use HTTP based APIs for communication between services. This works well until you need
to scale, release a new version or one of your services goes down. Then we start to see the calling service fail because
its dependency is not working as expected. We have tightly coupled our two services, one cant work without the other.
There are many ways to solve this problem, a light touch approach for existing applications is to use something called a
circuit breaker to buffer failures and retry until we get a successful response. This is explained well in this blog
by [Martin Fowler](https://martinfowler.com/bliki/CircuitBreaker.html). However, this is synchronous, if we were to wrap
our calls in a circuit breaker we would start to block processes and our user could see a slowdown in response times.
Additionally, we cant scale our applications using this approach, the way that the code is currently written every
instance of our `generator` api would be asking
the `requestor for confirmation of receiving the string. This wont scale well when we move to having 2, 5, 10, or 100 instances running. We would quickly see the `
requestor` being overwhelmed with requests from the 100 generator applications.
There is a way to solve these problems which is to use Queues. This is a shift in thinking to using an asynchronous
approach to solving our problem. This can work well when the responses dont need to be immediate between applications.
In this case it doesn't matter if we add some delay in the requests between the applications. As long as the data
eventually flows between them we are happy.
![Queues, producers and Consumers](./images/day84-queues.png)
(https://dashbird.io/knowledge-base/well-architected/queue-and-asynchronous-processing/)
In the drawing above we can see how we can add a Queue in between our applications and the Queue stores the intent of
the message across the bridge. If the Consumer was to fail and stop reacting messages then, providing our Queue software
has sufficient storage, the messages to the consumer would still “succeed” as far as the producer is concerned.
This is a powerful pattern that isolates components of a system from each other and limits the blast radius of failure.
It does however add complexity. The Consumer and Producer must share a common understanding of the shape of a message
otherwise the Consumer will be unable to act on the message.
We are going to implement a Queue in between our two applications in our data flows.
By the end of the section we will have the following data flows
Requestor (asks for a random string) → Queue → Generator (gets the message, generates a string and passes it back to the
Requestor on another Queue) → Requestor (gets the string back on a queue and stores it, then sends a message to the
queue saying it has received it) → Queue → Generator (marks that the message was received)
The last section here, where the Generator needs to know if the Requestor got the message is a simplification of a real
process where an application may pass back an enriched data record or some further information but it allows us to have
a simple two way communication.
Can you see how by the end of this section we will be able to stop, scale, deploy or modify each of the two components
without the other needing to know?
## Modifying our application
We are now going to modify our app to fit the model described above. Where we previously made a HTTP api call to our
partner app we are now going to place a message on a named queue and then subscribe to the corresponding response
queues.
Finally we are going to stop using HTTP endpoints to listen for requests, we are now subscribing to queues and waiting
for messages before we perform any work.
I have picked [NATSio](https://nats.io/) as the queue technology as I have worked with it previously and its very easy
to set up and use.
Head over to the 2023/day2-ops-code/README.md to see how to deploy day 2's application.
It sends messages to a queue and then waits for a response. The response is a message on a different queue. The message
contains the data that we are interested in.

Binary file not shown.

After

Width:  |  Height:  |  Size: 34 KiB