Skip to content

Commit 16d82f3

Browse files
authored
Generate ripoff queries asynchronously to help plugin performance (#27)
1 parent cc95417 commit 16d82f3

10 files changed

Lines changed: 277 additions & 111 deletions

File tree

README.md

Lines changed: 15 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -135,42 +135,35 @@ In the future, additional flags may be added to allow you to include tables, add
135135

136136
## Plugins
137137

138-
If you would like to implement your own `valueFuncs`, you can do so by writing a ripoff plugin.
139-
140-
Plugins are local unauthenticated TCP servers that consume and emit newline-separated JSON messages from ripoff.
138+
If you would like to implement your own `valueFuncs`, you can do so by writing a ripoff plugin, which is a local TCP server that sends/receives JSON.
141139

142140
### Writing a plugin
143141

144-
Plugins must listen to a local TCP port and provide a TCP stream (loop of receiving and sending messages) to clients.
142+
Plugins must meet the following requirements:
145143

146-
On startup, plugins must output the string `READY` in their first line of output to indicate to ripoff that they are ready to receive TCP messages.
144+
- Listen on a local TCP port
145+
- Consume newline-separated JSON messages, which come in as a stream
146+
- Output newline-separated JSON responses
147+
- Output `READY` in the first line of standard output when the plugin is ready for TCP connections
147148

148-
Each incoming message will be a single line of JSON in the following types:
149+
Each incoming message will be a single line of JSON in one of the following shapes:
149150

150-
#### Return a value
151+
#### valueFunc
151152

152153
Your plugin must process an arbitrary `valueFunc` and return a string value. You can decide how to handle functions you do not expect/provide, by either returning an empty value or disconnecting the client.
153154

155+
The `id` field is used to support unordered stream messages, so you can return responses at any time and in any order as long as they have the same `id` as the relevant request.
156+
154157
Message from ripoff:
155158

156159
```json
157-
{"type": "valueFunc", "valueFunc": "someFuncName", "args": ["some", "argument", "list"]}
160+
{"id": "some-id", "type": "valueFunc", "valueFunc": "someFuncName", "args": ["some", "argument", "list"]}
158161
```
159162

160163
Response from your TCP server:
161164

162165
```json
163-
{"value": "someString"}
164-
```
165-
166-
#### Exit your process
167-
168-
Ripoff will send a kill signal to your process, but if you'd like to clean up before that an exit message will be sent beforehand.
169-
170-
Request message:
171-
172-
```json
173-
{"type": "exit"}
166+
{"id": "the-same-id-from-the-request", "value": "someString"}
174167
```
175168

176169
#### Example
@@ -179,7 +172,9 @@ An example plugin can be found at `cmd/helloplugin/helloplugin.go`. although TCP
179172

180173
### Using a plugin
181174

182-
Plugins are defined in your ripoff files, which instruct ripoff to spawn a process to start your TCP server, then later connect to it with a single TCP stream. Here's an example from ripoff's tests:
175+
Plugins are defined in your ripoff files, which instruct ripoff to spawn a process to start your TCP server.
176+
177+
Here's an example from ripoff's tests:
183178

184179
```yml
185180
# A list of plugins to register with ripoff.

cmd/helloplugin/helloplugin.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"fmt"
77
"log"
88
"net"
9-
"os"
109
)
1110

1211
func main() {
@@ -33,12 +32,14 @@ func main() {
3332
}
3433

3534
type Request struct {
35+
Id string `json:"id"`
3636
Type string `json:"type"`
3737
ValueFunc string `json:"valueFunc"`
3838
Args []string `json:"args"`
3939
}
4040

4141
type Response struct {
42+
Id string `json:"id"`
4243
Value string `json:"value"`
4344
}
4445

@@ -59,10 +60,6 @@ func handleConnection(conn net.Conn) {
5960
log.Println("Error parsing body:", err)
6061
return
6162
}
62-
if r.Type == "exit" {
63-
os.Exit(0)
64-
return
65-
}
6663
if len(r.Args) == 0 {
6764
log.Println("No args provided")
6865
return
@@ -78,6 +75,7 @@ func handleConnection(conn net.Conn) {
7875
return
7976
}
8077
resp, err := json.Marshal(Response{
78+
Id: r.Id,
8179
Value: value,
8280
})
8381
if err != nil {

cmd/ripoff/ripoff.go

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
package main
22

33
import (
4+
"bufio"
45
"context"
56
"flag"
67
"fmt"
78
"log/slog"
89
"os"
910
"path"
11+
"slices"
12+
"strings"
1013

1114
"github.com/jackc/pgx/v5"
1215

@@ -17,9 +20,54 @@ func errAttr(err error) slog.Attr {
1720
return slog.Any("error", err)
1821
}
1922

23+
func confirmPluginsSafe(plugins map[string]ripoff.RipoffPlugin) {
24+
baseDir, err := os.UserHomeDir()
25+
if err != nil {
26+
baseDir = os.TempDir()
27+
}
28+
consentFilePath := path.Join(baseDir, ".ripoff-consent")
29+
consentFile, err := os.ReadFile(consentFilePath)
30+
if err != nil && !os.IsNotExist(err) {
31+
slog.Error("Could not read from consent file", errAttr(err), slog.String("filepath", consentFilePath))
32+
}
33+
consentFileLines := strings.Split(string(consentFile), "\n")
34+
scanner := bufio.NewScanner(os.Stdin)
35+
newConsentLines := []string{}
36+
for _, plugin := range plugins {
37+
cmdJoined := strings.Join(append([]string{plugin.Address, " -> "}, plugin.Command...), " ")
38+
if !slices.Contains(consentFileLines, cmdJoined) {
39+
newConsentLines = append(newConsentLines, cmdJoined)
40+
}
41+
}
42+
if len(newConsentLines) > 0 {
43+
fmt.Printf("You have not run these ripoff plugins before, please confirm that the following commands are safe to run on your machine: \n")
44+
fmt.Println()
45+
for _, consentLine := range newConsentLines {
46+
fmt.Printf(" %s\n", consentLine)
47+
}
48+
fmt.Println()
49+
fmt.Println("Run the above? (Y/N)")
50+
scanner.Scan()
51+
input := scanner.Text()
52+
if input == "y" || input == "Y" {
53+
consentFileLines = append(consentFileLines, newConsentLines...)
54+
err = os.WriteFile(consentFilePath, []byte(strings.Join(consentFileLines, "\n")), 0644)
55+
if err != nil {
56+
slog.Error("Could not append to the consent file", errAttr(err), slog.String("filepath", consentFilePath))
57+
}
58+
fmt.Println("Proceeding...")
59+
} else {
60+
fmt.Println("ABORT")
61+
os.Exit(1)
62+
}
63+
}
64+
}
65+
2066
func main() {
2167
verbosePtr := flag.Bool("v", false, "enable verbose output")
2268
softPtr := flag.Bool("s", false, "do not commit generated queries")
69+
maxConcurrencyPtr := flag.Int("c", ripoff.DEFAULT_MAX_CONCURRENCY, "maximum number of rows to generate queries for at one time. defaults at 1000")
70+
unsafePluginPtr := flag.Bool("u", false, "execute new plugin commands without prompting. only for use in CI or trusted environments")
2371
flag.Parse()
2472

2573
if *verbosePtr {
@@ -77,7 +125,11 @@ func main() {
77125
os.Exit(1)
78126
}
79127

80-
err = ripoff.RunRipoff(ctx, tx, totalRipoff)
128+
if !*unsafePluginPtr && len(totalRipoff.Plugins) > 0 {
129+
confirmPluginsSafe(totalRipoff.Plugins)
130+
}
131+
132+
err = ripoff.RunRipoff(ctx, tx, totalRipoff, *maxConcurrencyPtr)
81133
if err != nil {
82134
slog.Error("Could not run ripoff", errAttr(err))
83135
os.Exit(1)

db.go

Lines changed: 47 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"regexp"
1111
"slices"
1212
"strings"
13+
"sync"
1314
"time"
1415

1516
"github.com/brianvoe/gofakeit/v7"
@@ -19,9 +20,11 @@ import (
1920
"github.com/tj/go-naturaldate"
2021
)
2122

23+
const DEFAULT_MAX_CONCURRENCY = 1000
24+
2225
// Runs ripoff from start to finish, without committing the transaction.
23-
func RunRipoff(ctx context.Context, tx pgx.Tx, totalRipoff RipoffFile) error {
24-
manager, err := NewPluginManager(totalRipoff.Plugins)
26+
func RunRipoff(ctx context.Context, tx pgx.Tx, totalRipoff RipoffFile, maxConcurrency int) error {
27+
manager, err := NewPluginManager(ctx, totalRipoff.Plugins)
2528
if err != nil {
2629
return err
2730
}
@@ -32,7 +35,7 @@ func RunRipoff(ctx context.Context, tx pgx.Tx, totalRipoff RipoffFile) error {
3235
return err
3336
}
3437

35-
queries, err := buildQueriesForRipoff(manager, primaryKeys, totalRipoff)
38+
queries, err := buildQueriesForRipoff(maxConcurrency, manager, primaryKeys, totalRipoff)
3639
if err != nil {
3740
return err
3841
}
@@ -163,10 +166,11 @@ func prepareValue(manager *PluginManager, rawValue string) (string, error) {
163166
return fakerResult, nil
164167
}
165168

166-
func buildQueryForRow(manager *PluginManager, primaryKeys PrimaryKeysResult, rowId string, row Row, dependencyGraph map[string][]string) (string, error) {
169+
func buildQueryForRow(manager *PluginManager, primaryKeys PrimaryKeysResult, rowId string, row Row) (string, []string, error) {
170+
dependencyResult := []string{}
167171
parts := strings.Split(rowId, ":")
168172
if len(parts) < 2 {
169-
return "", fmt.Errorf("invalid id: %s", rowId)
173+
return "", dependencyResult, fmt.Errorf("invalid id: %s", rowId)
170174
}
171175
table := parts[0]
172176
primaryKeysForTable, hasPrimaryKeysForTable := primaryKeys[table]
@@ -210,10 +214,10 @@ func buildQueryForRow(manager *PluginManager, primaryKeys PrimaryKeysResult, row
210214
case []string:
211215
dependencies = v
212216
default:
213-
return "", fmt.Errorf("cannot parse ~dependencies value in row %s", rowId)
217+
return "", dependencyResult, fmt.Errorf("cannot parse ~dependencies value in row %s", rowId)
214218
}
215-
dependencyGraph[rowId] = append(dependencyGraph[rowId], dependencies...)
216-
dependencyGraph[rowId] = slices.Compact(dependencyGraph[rowId])
219+
dependencyResult = append(dependencyResult, dependencies...)
220+
dependencyResult = slices.Compact(dependencyResult)
217221
continue
218222
}
219223

@@ -230,14 +234,14 @@ func buildQueryForRow(manager *PluginManager, primaryKeys PrimaryKeysResult, row
230234
addEdge := referenceRegex.MatchString(value)
231235
// Don't add edges to and from the same row.
232236
if addEdge && rowId != value {
233-
dependencyGraph[rowId] = append(dependencyGraph[rowId], value)
234-
dependencyGraph[rowId] = slices.Compact(dependencyGraph[rowId])
237+
dependencyResult = append(dependencyResult, value)
238+
dependencyResult = slices.Compact(dependencyResult)
235239
}
236240

237241
columns = append(columns, pq.QuoteIdentifier(column))
238242
valuePrepared, err := prepareValue(manager, value)
239243
if err != nil {
240-
return "", err
244+
return "", dependencyResult, err
241245
}
242246
// Assume this column is the primary key.
243247
if rowId == value && onConflictColumn == "" {
@@ -249,7 +253,7 @@ func buildQueryForRow(manager *PluginManager, primaryKeys PrimaryKeysResult, row
249253
}
250254

251255
if onConflictColumn == "" {
252-
return "", fmt.Errorf("cannot determine column to conflict with for: %s, saw %s", rowId, row)
256+
return "", dependencyResult, fmt.Errorf("cannot determine column to conflict with for: %s, saw %s", rowId, row)
253257
}
254258

255259
// Extremely smart query builder.
@@ -263,11 +267,11 @@ func buildQueryForRow(manager *PluginManager, primaryKeys PrimaryKeysResult, row
263267
strings.Join(values, ","),
264268
onConflictColumn,
265269
strings.Join(setStatements, ","),
266-
), nil
270+
), dependencyResult, nil
267271
}
268272

269273
// Returns a sorted array of queries to run based on a given ripoff file.
270-
func buildQueriesForRipoff(manager *PluginManager, primaryKeys PrimaryKeysResult, totalRipoff RipoffFile) ([]string, error) {
274+
func buildQueriesForRipoff(maxConcurrency int, manager *PluginManager, primaryKeys PrimaryKeysResult, totalRipoff RipoffFile) ([]string, error) {
271275
dependencyGraph := map[string][]string{}
272276
queries := map[string]string{}
273277

@@ -277,12 +281,37 @@ func buildQueriesForRipoff(manager *PluginManager, primaryKeys PrimaryKeysResult
277281
}
278282

279283
// Build queries.
284+
var wg sync.WaitGroup
285+
semaphore := make(chan struct{}, maxConcurrency)
286+
type rowChanItem struct {
287+
rowId string
288+
query string
289+
dependencies []string
290+
err error
291+
}
292+
rowChan := make(chan rowChanItem, len(totalRipoff.Rows))
280293
for rowId, row := range totalRipoff.Rows {
281-
query, err := buildQueryForRow(manager, primaryKeys, rowId, row, dependencyGraph)
282-
if err != nil {
283-
return []string{}, err
294+
semaphore <- struct{}{}
295+
wg.Add(1)
296+
go func(rowId string, row Row) {
297+
defer wg.Done()
298+
defer func() { <-semaphore }()
299+
query, dependencies, err := buildQueryForRow(manager, primaryKeys, rowId, row)
300+
rowChan <- rowChanItem{rowId, query, dependencies, err}
301+
}(rowId, row)
302+
}
303+
304+
go func() {
305+
wg.Wait()
306+
close(rowChan)
307+
}()
308+
309+
for rowItem := range rowChan {
310+
if rowItem.err != nil {
311+
return []string{}, rowItem.err
284312
}
285-
queries[rowId] = query
313+
dependencyGraph[rowItem.rowId] = rowItem.dependencies
314+
queries[rowItem.rowId] = rowItem.query
286315
}
287316

288317
// Sort and reverse the graph, so queries are in order of least (hopefully none) to most dependencies.

db_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,10 @@ func runTestData(t *testing.T, ctx context.Context, tx pgx.Tx, testDir string) {
2323
require.NoError(t, err)
2424
totalRipoff, err := RipoffFromDirectory(testDir, enums)
2525
require.NoError(t, err)
26-
err = RunRipoff(ctx, tx, totalRipoff)
26+
err = RunRipoff(ctx, tx, totalRipoff, DEFAULT_MAX_CONCURRENCY)
2727
require.NoError(t, err)
2828
// Run again to implicitly test upsert behavior.
29-
err = RunRipoff(ctx, tx, totalRipoff)
29+
err = RunRipoff(ctx, tx, totalRipoff, DEFAULT_MAX_CONCURRENCY)
3030
require.NoError(t, err)
3131
// Try to verify that the number of generated rows matches the ripoff.
3232
tableCount := map[string]int{}

export_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ func runExportTestData(t *testing.T, ctx context.Context, tx pgx.Tx, testDir str
4848
_, err = tx.Exec(ctx, string(truncateFile))
4949
require.NoError(t, err)
5050
// Run generated ripoff.
51-
err = RunRipoff(ctx, tx, ripoffFile)
51+
err = RunRipoff(ctx, tx, ripoffFile, DEFAULT_MAX_CONCURRENCY)
5252
require.NoError(t, err)
5353
// Try to verify that the number of generated rows matches the ripoff.
5454
tableCount := map[string]int{}

0 commit comments

Comments
 (0)