90DaysOfDevOps/2023/day2-ops-code/async/requestor/main.go
Alistair Hey 335acd5b82
Add day 85 - Queues Workers and Tasks
Signed-off-by: Alistair Hey <alistair@heyal.co.uk>
2023-03-27 08:48:39 +01:00

109 lines
2.3 KiB
Go

package main
import (
"database/sql"
"errors"
"fmt"
_ "github.com/go-sql-driver/mysql"
nats "github.com/nats-io/nats.go"
"time"
)
// storeString inserts input as a new row in the requestor_async table.
//
// A database handle is opened for the duration of the call and closed before
// returning. On success a confirmation line is printed to stdout and nil is
// returned. A wrapped error is returned if the handle cannot be opened or the
// INSERT fails.
func storeString(input string) error {
	db, err := sql.Open("mysql", "root:password@tcp(mysql:3306)/mysql")
	if err != nil {
		// db is nil when Open fails, so return before deferring Close or
		// calling Exec on it.
		return fmt.Errorf("opening database: %w", err)
	}
	defer db.Close()

	// input is bound through a placeholder rather than spliced into the SQL text.
	if _, err := db.Exec("INSERT INTO requestor_async(random_string) VALUES(?)", input); err != nil {
		return fmt.Errorf("inserting random string: %w", err)
	}

	fmt.Printf("Random string %s has been inserted into the database\n", input)
	return nil
}
// getStringFromDB checks whether input is present in the requestor_async table.
//
// It returns nil when a row whose random_string is exactly equal to input is
// found, and an error with the message "string not found" when no such row
// exists. A wrapped error is returned if the database handle cannot be opened,
// the query fails, or the result set cannot be read.
func getStringFromDB(input string) error {
	db, err := sql.Open("mysql", "root:password@tcp(mysql:3306)/mysql")
	if err != nil {
		// db is nil when Open fails, so return before deferring Close or
		// querying through it.
		return fmt.Errorf("opening database: %w", err)
	}
	defer db.Close()

	// Select the single column we scan so the Scan below keeps working even if
	// the table later gains more columns.
	rows, err := db.Query("SELECT random_string FROM requestor_async WHERE random_string = ?", input)
	if err != nil {
		return fmt.Errorf("querying requestor_async: %w", err)
	}
	// Release the result set on every return path, including the early
	// return when a match is found.
	defer rows.Close()

	for rows.Next() {
		var randomString string
		if err := rows.Scan(&randomString); err != nil {
			return fmt.Errorf("scanning row: %w", err)
		}
		// Re-check in Go for exact equality; presumably the database's own
		// comparison can be looser depending on collation — verify if relied on.
		if randomString == input {
			return nil
		}
	}
	// Distinguish "iteration failed" from "no matching row".
	if err := rows.Err(); err != nil {
		return fmt.Errorf("reading rows: %w", err)
	}
	return errors.New("string not found")
}
// main wires the requestor service together:
//
//   - ensures the requestor_async table exists,
//   - connects to NATS at nats://my-nats:4222,
//   - publishes an empty message on "generator" every 60 seconds,
//   - stores every payload received on "generator_reply" via storeString,
//   - looks up every payload received on "confirmation" via getStringFromDB
//     and echoes the payload on "confirmation_reply".
//
// Start-up failures (table creation, NATS connect, subscribing) panic with the
// error message. Failures while handling an individual message are printed
// and the service keeps running.
func main() {
	if err := createRequestordb(); err != nil {
		panic(err.Error())
	}

	nc, err := nats.Connect("nats://my-nats:4222")
	if err != nil {
		// Without a connection nothing below can work; fail fast like the
		// table-creation step above.
		panic(err.Error())
	}
	defer nc.Close()

	// Ask the generator for a new string once a minute.
	ticker := time.NewTicker(60 * time.Second)
	defer ticker.Stop()
	go func() {
		for range ticker.C {
			if err := nc.Publish("generator", []byte("")); err != nil {
				fmt.Printf("publishing generator request: %v\n", err)
			}
		}
	}()

	// Persist every generated string we are sent.
	_, err = nc.Subscribe("generator_reply", func(msg *nats.Msg) {
		if err := storeString(string(msg.Data)); err != nil {
			fmt.Printf("storing generated string: %v\n", err)
		}
	})
	if err != nil {
		panic(err.Error())
	}

	// Answer confirmation requests by checking the database for the payload.
	_, err = nc.Subscribe("confirmation", func(msg *nats.Msg) {
		if err := getStringFromDB(string(msg.Data)); err != nil {
			fmt.Printf("confirming string: %v\n", err)
		}
		// NOTE(review): the reply is sent whether or not the lookup
		// succeeded, matching the previous behaviour — confirm this is what
		// the consumer of "confirmation_reply" expects.
		if err := nc.Publish("confirmation_reply", msg.Data); err != nil {
			fmt.Printf("publishing confirmation reply: %v\n", err)
		}
	})
	if err != nil {
		panic(err.Error())
	}

	// Block forever; all work happens in the ticker goroutine and the NATS
	// subscription callbacks.
	select {}
}
// createRequestordb makes sure the requestor_async table is present.
//
// It opens a database handle, issues a CREATE TABLE IF NOT EXISTS statement
// for a table with a single random_string VARCHAR(100) column, and closes the
// handle again. The error from opening the handle or from executing the
// statement is returned unchanged; nil means the statement ran successfully.
func createRequestordb() error {
	const (
		dsn = "root:password@tcp(mysql:3306)/mysql"
		ddl = "CREATE TABLE IF NOT EXISTS requestor_async(random_string VARCHAR(100))"
	)

	conn, openErr := sql.Open("mysql", dsn)
	if openErr != nil {
		return openErr
	}
	defer conn.Close()

	if _, execErr := conn.Exec(ddl); execErr != nil {
		return execErr
	}
	return nil
}