Go Pipe
last modified April 22, 2025
This tutorial demonstrates how to utilize pipes in Golang for seamless interprocess communication.
Pipe
A pipe is a powerful mechanism for redirecting data between processes, enabling efficient interprocess communication through a unidirectional channel. Pipes are commonly used in scenarios where data needs to flow seamlessly between producers and consumers or where processes collaborate to achieve a common task.
The io.Pipe
function in Go creates a synchronous, in-memory pipe
that provides a direct connection between an io.Reader
and an
io.Writer
. This function is particularly useful when you need to
stream data between components in your application, such as reading from a
source while simultaneously writing to a sink.
Since io.Pipe
operates synchronously, writes to the pipe block
until corresponding reads occur, ensuring consistent data transfer without
buffering. This makes io.Pipe
an excellent choice for applications
that require real-time data processing or tightly coupled communication between
components.
$ go version go version go1.22.2 linux/amd64
This tutorial uses Go version 1.22.2 for all examples.
Go pipe simple example
This example illustrates the basic usage of the io.Pipe
function
for data transfer.
package main import ( "fmt" "io" "log" "os" ) func main() { r, w := io.Pipe() go func() { fmt.Fprint(w, "Hello there\n") w.Close() }() _, err := io.Copy(os.Stdout, r) if err != nil { log.Fatal(err) } }
This program creates a pipe using io.Pipe
, writes data to the
pipe's writer within a goroutine, and copies the data from the pipe's reader to
standard output using io.Copy
.
go func() { fmt.Fprint(w, "Hello there\n") w.Close() }()
In a goroutine, data is written to the PipeWriter
. Writes block
until the data is fully consumed by one or more reads from the
PipeReader
.
$ go run simple.go Hello there
Go cmd StdoutPipe
The StdoutPipe
method of a command provides a pipe connected to the
command's standard output once the command is initiated.
package main import ( "bufio" "fmt" "log" "os" "os/exec" ) func main() { cmd := exec.Command("ping", "webcode.me") stdout, err := cmd.StdoutPipe() if err != nil { log.Fatal(err) } cmd.Start() buf := bufio.NewReader(stdout) num := 0 for { line, _, _ := buf.ReadLine() if num > 3 { os.Exit(0) } num += 1 fmt.Println(string(line)) } }
This example executes a ping
command and reads the first four lines
of its output, displaying them on the console.
cmd := exec.Command("ping", "webcode.me")
A command is created to run ping
, testing the availability of the
webcode.me
website.
stdout, err := cmd.StdoutPipe()
The StdoutPipe
method retrieves a pipe for the command's standard
output stream.
buf := bufio.NewReader(stdout)
A buffered reader is created to read data from the command's standard output efficiently.
for { line, _, _ := buf.ReadLine() if num > 3 { os.Exit(0) } num += 1 fmt.Println(string(line)) }
A loop reads four lines from the output, printing each to the console before terminating the program.
$ go run pingcmd.go PING webcode.me (46.101.248.126) 56(84) bytes of data. 64 bytes from 46.101.248.126 (46.101.248.126): icmp_seq=1 ttl=54 time=29.7 ms 64 bytes from 46.101.248.126 (46.101.248.126): icmp_seq=2 ttl=54 time=35.9 ms 64 bytes from 46.101.248.126 (46.101.248.126): icmp_seq=3 ttl=54 time=37.4 ms
Go pipe POST JSON data
This example demonstrates posting JSON data to
https://httpbin.org/post
using a pipe.
package main import ( "encoding/json" "fmt" "io" "io/ioutil" "log" "net/http" ) type PayLoad struct { Content string } func main() { r, w := io.Pipe() go func() { defer w.Close() err := json.NewEncoder(w).Encode(&PayLoad{Content: "Hello there!"}) if err != nil { log.Fatal(err) } }() resp, err := http.Post("https://httpbin.org/post", "application/json", r) if err != nil { log.Fatal(err) } body, err := ioutil.ReadAll(resp.Body) if err != nil { log.Fatal(err) } fmt.Println(string(body)) }
This program sends a JSON payload to a web server via a pipe and prints the response body, illustrating network communication with pipes.
go func() { defer w.Close() err := json.NewEncoder(w).Encode(&PayLoad{Content: "Hello there!"}) if err != nil { log.Fatal(err) } }()
A JSON payload is encoded and written to the PipeWriter
in a
goroutine, ensuring the writer is closed properly after encoding.
resp, err := http.Post("https://httpbin.org/post", "application/json", r)
The http.Post
function sends the JSON data using the
PipeReader
as the request body, specifying the content type as
JSON.
body, err := ioutil.ReadAll(resp.Body) if err != nil { log.Fatal(err) } fmt.Println(string(body))
The response body is read and converted to a string, then printed to the console to display the server's response.
$ go run post_json.go { "args": {}, "data": "{\"Content\":\"Hello there!\"}\n", "files": {}, "form": {}, "headers": { ...
Go read standard input through pipe
This example creates a Go program that reads data from standard input via a pipe, processes it, and displays the results.
package main import ( "bufio" "fmt" "io" "log" "os" ) func main() { nBytes, nChunks := int64(0), int64(0) r := bufio.NewReader(os.Stdin) buf := make([]byte, 0, 4*1024) for { n, err := r.Read(buf[:cap(buf)]) buf = buf[:n] if n == 0 { if err == nil { continue } if err == io.EOF { break } log.Fatal(err) } nChunks++ nBytes += int64(len(buf)) fmt.Println(string(buf)) if err != nil && err != io.EOF { log.Fatal(err) } } fmt.Println("Bytes:", nBytes, "Chunks:", nChunks) }
This program reads data from standard input, prints it, and tracks the number of bytes and chunks processed, demonstrating pipe-based input handling.
r := bufio.NewReader(os.Stdin)
A buffered reader is created to efficiently read data from standard input, typically piped from another command.
buf := make([]byte, 0, 4*1024)
A 4KB buffer is allocated to store data read from standard input, optimizing memory usage for input processing.
n, err := r.Read(buf[:cap(buf)]) buf = buf[:n]
Data is read into the buffer, and the buffer is sliced to the number of bytes read, ensuring accurate data handling.
nChunks++ nBytes += int64(len(buf))
The program increments the chunk counter and accumulates the total bytes read, tracking input statistics.
fmt.Println(string(buf))
The buffer's contents are converted to a string and printed to the console, displaying the piped input data.
$ date | go run read_stdin.go Sun 15 Nov 2020 01:08:13 PM CET Bytes: 32 Chunks: 1
The date
command's output is piped to the program, which reads,
displays, and reports the byte and chunk counts.
Go Stat
The Stat
function returns a FileInfo structure describing a file,
useful for detecting piped input on standard input.
package main import ( "bufio" "fmt" "log" "os" ) func main() { stat, _ := os.Stdin.Stat() if (stat.Mode() & os.ModeCharDevice) == 0 { var buf []byte scanner := bufio.NewScanner(os.Stdin) for scanner.Scan() { buf = append(buf, scanner.Bytes()...) } if err := scanner.Err(); err != nil { log.Fatal(err) } fmt.Printf("Hello %s!\n", buf) } else { fmt.Print("Enter your name: ") var name string fmt.Scanf("%s", &name) fmt.Printf("Hello %s!\n", name) } }
This program accepts input either through a pipe or via a user prompt, greeting the user based on the input source.
stat, _ := os.Stdin.Stat()
The Stat
function retrieves metadata about standard input,
indicating whether it is piped or terminal-based.
if (stat.Mode() & os.ModeCharDevice) == 0 {
This condition checks if standard input is piped, as opposed to coming from a terminal or character device.
var buf []byte scanner := bufio.NewScanner(os.Stdin) for scanner.Scan() { buf = append(buf, scanner.Bytes()...) }
A scanner reads piped input line by line, appending each line to a byte slice for further processing.
} else { fmt.Print("Enter your name: ") var name string fmt.Scanf("%s", &name) fmt.Printf("Hello %s!\n", name) }
If no piped input is detected, the program prompts the user to enter a name and prints a greeting.
$ echo "Peter" | go run hello.go Hello Peter! $ go run hello.go Enter your name: Peter Hello Peter!
The program handles both piped input (via echo
) and interactive
prompt input, demonstrating flexible input processing.
Go pipe in HTTP handler
This example uses a pipe within an HTTP handler to stream command output to a web client.
package main import ( "fmt" "io" "net/http" "os/exec" ) func handler(w http.ResponseWriter, r *http.Request) { cmd := exec.Command("date") pr, pw := io.Pipe() defer pw.Close() cmd.Stdout = pw cmd.Stderr = pw go io.Copy(w, pr) cmd.Run() } func main() { http.HandleFunc("/", handler) fmt.Println("server started on port 8080") http.ListenAndServe(":8080", nil) }
This program runs a web server that executes the date
command and
streams its output to clients via a pipe.
cmd := exec.Command("date")
A command is defined to execute the date
system command, which
outputs the current date and time.
pr, pw := io.Pipe() defer pw.Close()
A pipe is created with io.Pipe
, and the writer is deferred to
ensure proper closure after use.
cmd.Stdout = pw cmd.Stderr = pw
The command's standard output and error streams are directed to the
PipeWriter
for streaming.
go io.Copy(w, pr)
In a goroutine, the PipeReader
streams data to the
http.ResponseWriter
, sending it to the client.
cmd.Run()
The Run
method executes the command, generating output that flows
through the pipe to the client.
$ go run handler.go server started on port 8080
The server starts, listening for HTTP requests on port 8080.
$ curl localhost:8080 Sun 15 Nov 2020 02:18:07 PM CET
A curl
request retrieves the date command's output, streamed via
the pipe from the server.
Go pipe with concurrent writers
This example demonstrates using multiple concurrent writers to a single pipe, showcasing synchronized data aggregation from multiple goroutines.
package main import ( "fmt" "io" "log" "os" "sync" ) func main() { r, w := io.Pipe() var wg sync.WaitGroup for i := 0; i < 3; i++ { wg.Add(1) go func(id int) { defer wg.Done() fmt.Fprintf(w, "Message from writer %d\n", id) }(i) } go func() { wg.Wait() w.Close() }() _, err := io.Copy(os.Stdout, r) if err != nil { log.Fatal(err) } }
This program creates a pipe and spawns three goroutines, each writing a message
to the PipeWriter
. A WaitGroup
ensures the writer is
closed only after all goroutines complete, and the data is read to stdout.
$ go run concurrent_writers.go Message from writer 0 Message from writer 1 Message from writer 2
Go pipe for streaming file processing
This example uses a pipe to stream data from a file to a processing function, demonstrating efficient handling of large files without loading them fully into memory.
package main import ( "bufio" "io" "log" "os" "strings" ) func main() { r, w := io.Pipe() go func() { defer w.Close() file, err := os.Open("input.txt") if err != nil { log.Fatal(err) } defer file.Close() io.Copy(w, file) }() scanner := bufio.NewScanner(r) for scanner.Scan() { line := strings.ToUpper(scanner.Text()) os.Stdout.WriteString(line + "\n") } if err := scanner.Err(); err != nil { log.Fatal(err) } }
This program reads a file through a pipe, streaming its contents to a scanner that converts each line to uppercase and outputs it. The pipe enables processing without loading the entire file into memory.
$ echo -e "hello\nworld" > input.txt $ go run file_stream.go HELLO WORLD
Go pipe with error handling
This example shows how to handle errors in a pipe-based workflow, ensuring robust communication between a writer and reader with proper error propagation.
package main import ( "errors" "fmt" "io" "log" "os" ) func main() { r, w := io.Pipe() go func() { defer w.Close() _, err := fmt.Fprint(w, "Valid data\n") if err != nil { w.CloseWithError(err) return } w.CloseWithError(errors.New("simulated writer error")) }() buf := make([]byte, 1024) n, err := r.Read(buf) if err != nil && err != io.EOF { log.Fatal(err) } fmt.Print(string(buf[:n])) _, err = r.Read(buf) if err != nil { fmt.Println("Reader caught error:", err) } }
This program simulates a writer error using CloseWithError
. The
reader processes initial valid data and then detects the error, demonstrating
robust error handling in pipe communication.
$ go run error_handling.go Valid data Reader caught error: simulated writer error
Go pipe with chained processing
This example illustrates a pipeline where data flows through multiple processing stages via pipes, showcasing a modular approach to data transformation.
package main import ( "bufio" "io" "log" "os" "strings" ) func main() { r1, w1 := io.Pipe() r2, w2 := io.Pipe() go func() { defer w1.Close() w1.Write([]byte("hello there!\n")) }() go func() { defer w2.Close() scanner := bufio.NewScanner(r1) for scanner.Scan() { w2.Write([]byte(strings.ToUpper(scanner.Text()) + "\n")) } if err := scanner.Err(); err != nil { log.Fatal(err) } }() io.Copy(os.Stdout, r2) }
This program creates a two-stage pipeline: the first stage writes data to a pipe, and the second stage reads it, converts it to uppercase, and writes to another pipe. The final output is streamed to stdout.
$ go run chained_pipeline.go HELLO THERE!
Go pipe with real-time log aggregation
This example demonstrates aggregating logs from multiple sources in real-time using a pipe, simulating a centralized logging system with concurrent inputs.
package main import ( "fmt" "io" "log" "os" "sync" "time" ) func main() { r, w := io.Pipe() var wg sync.WaitGroup for i := 0; i < 3; i++ { wg.Add(1) go func(id int) { defer wg.Done() for j := 0; j < 3; j++ { fmt.Fprintf(w, "[Source %d] Log entry %d at %v\n", id, j, time.Now()) time.Sleep(100 * time.Millisecond) } }(i) } go func() { wg.Wait() w.Close() }() scanner := bufio.NewScanner(r) for scanner.Scan() { fmt.Println(scanner.Text()) } if err := scanner.Err(); err != nil { log.Fatal(err) } }
This program simulates three log sources, each writing entries to a pipe in concurrent goroutines. The reader scans the pipe and prints logs in real-time, demonstrating centralized log aggregation.
Go pipe with compressed data streaming
This example shows how to use a pipe to stream and compress data in real-time, illustrating efficient handling of large datasets with compression.
package main import ( "compress/gzip" "fmt" "io" "log" "os" ) func main() { r, w := io.Pipe() go func() { defer w.Close() gw := gzip.NewWriter(w) defer gw.Close() for i := 0; i < 5; i++ { fmt.Fprintf(gw, "Data chunk %d\n", i) } }() file, err := os.Create("output.gz") if err != nil { log.Fatal(err) } defer file.Close() _, err = io.Copy(file, r) if err != nil { log.Fatal(err) } }
This program streams data through a pipe, compressing it with gzip in a goroutine. The compressed data is written to a file, showcasing real-time compression for efficient storage or transmission.
$ go run compress_stream.go $ gunzip -c output.gz Data chunk 0 Data chunk 1 Data chunk 2 Data chunk 3 Data chunk 4
Source
This article explored the use of pipes in Go for interprocess communication and data streaming.
Author
List all Go tutorials.