Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions internal/lock/lock.mbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
///|
/// A mutual-exclusion lock.
///
/// `Lock` wraps a binary semaphore: at most one task can hold it at a time.
/// Other tasks calling `acquire` block until the holder calls `release`, and
/// waiters are served first-come-first-serve. Prefer `with_lock`, which pairs
/// the acquire/release for you and releases even when the action fails.
struct Lock(@async/semaphore.Semaphore)

///|
/// Create a new, unlocked `Lock`.
pub fn Lock::new() -> Lock {
Lock(Semaphore(1))
}

///|
/// Acquire the lock, blocking until it is available.
///
/// As a blocking point, `acquire` may be cancelled, in which case the lock is
/// not held and the cancellation is raised. Every successful `acquire` must be
/// matched by exactly one `release`.
pub async fn Lock::acquire(self : Lock) -> Unit {
self.0.acquire()
}

///|
/// Try to acquire the lock without blocking.
///
/// Returns `true` and takes the lock if it was free, otherwise returns `false`
/// immediately. A successful acquisition must be matched by one `release`.
pub fn Lock::try_acquire(self : Lock) -> Bool {
self.0.try_acquire()
}

///|
/// Release the lock, waking the next waiter if any.
pub fn Lock::release(self : Lock) -> Unit {
self.0.release()
}

///|
/// Run `action` while holding the lock, returning its result.
///
/// The lock is released whether `action` returns normally or fails. If
/// `acquire` is cancelled before the lock is taken, `action` does not run.
pub async fn[X] Lock::with_lock(self : Lock, action : async () -> X) -> X {
self.acquire()
defer self.release()
action()
}
44 changes: 44 additions & 0 deletions internal/lock/lock_test.mbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
///|
/// A held lock keeps concurrent critical sections from overlapping, even when
/// the holder suspends.
async test "with_lock serializes concurrent critical sections" {
let lock = @lock.Lock::new()
let mut in_critical = false
let mut overlaps = 0
let order = []
@async.with_task_group(group => {
for id in 0..<5 {
group.spawn_bg(() => {
lock.with_lock(() => {
if in_critical {
overlaps += 1
}
in_critical = true
// Suspend while holding the lock; a missing lock would let another task
// enter here and trip the `overlaps` counter.
@async.sleep(1)
order.push(id)
in_critical = false
})
})
}
})
assert_eq(overlaps, 0)
assert_eq(order.length(), 5)
}

///|
/// `with_lock` releases the lock even when the action fails.
async test "with_lock releases on failure" {
let lock = @lock.Lock::new()
let failed = try {
lock.with_lock(() => fail("boom"))
false
} catch {
_ => true
}
assert_eq(failed, true)
// The lock must be free again, so a follow-up acquire succeeds immediately.
assert_eq(lock.try_acquire(), true)
lock.release()
}
7 changes: 7 additions & 0 deletions internal/lock/moon.pkg
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import {
"moonbitlang/async/semaphore" @async/semaphore,
}

import {
"moonbitlang/async",
} for "test"
19 changes: 19 additions & 0 deletions internal/lock/pkg.generated.mbti
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Generated using `moon info`, DON'T EDIT IT
package "moonbit-community/tty/internal/lock"

// Values

// Errors

// Types and methods
type Lock
pub async fn Lock::acquire(Self) -> Unit
pub fn Lock::new() -> Self
pub fn Lock::release(Self) -> Unit
pub fn Lock::try_acquire(Self) -> Bool
pub async fn[X] Lock::with_lock(Self, async () -> X) -> X

// Type aliases

// Traits

2 changes: 1 addition & 1 deletion moon.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "moonbit-community/tty"
version = "0.2.5"

import {
"moonbitlang/async@0.19.1",
"moonbitlang/async@0.19.4",
}

readme = "README.md"
Expand Down
1 change: 1 addition & 0 deletions moon.pkg
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
"moonbit-community/tty/color",
"moonbit-community/tty/input" @public_input,
"moonbit-community/tty/internal/io" @internal_io,
"moonbit-community/tty/internal/lock",
"moonbit-community/tty/internal/input",
"moonbit-community/tty/internal/win32",
"moonbit-community/tty/internal/vt",
Expand Down
22 changes: 20 additions & 2 deletions tty.mbt
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ struct Tty {
reader : @input.EventReader
input_backend : WindowsInput
pending_input : Array[@public_input.InputEvent]
// Serializes output writes so a single `write`/`write_string` call lands on
// the terminal atomically with respect to other concurrent writers.
write_lock : @lock.Lock
Comment thread
tonyfettes marked this conversation as resolved.
}

///|
Expand All @@ -35,6 +38,9 @@ struct Tty {
output : &Writer
reader : @input.EventReader
pending_input : Array[@public_input.InputEvent]
// Serializes output writes so a single `write`/`write_string` call lands on
// the terminal atomically with respect to other concurrent writers.
write_lock : @lock.Lock
}

///|
Expand All @@ -50,6 +56,7 @@ pub fn[I : Reader, O : Writer] Tty::new(input : I, output : O) -> Tty {
reader: input_backend.event_reader(input as &@async/io.Reader),
input_backend,
pending_input: [],
write_lock: @lock.Lock::new(),
}
}

Expand All @@ -64,6 +71,7 @@ pub fn[I : Reader, O : Writer] Tty::new(input : I, output : O) -> Tty {
output,
reader: @input.EventReader::new(input as &@async/io.Reader),
pending_input: [],
write_lock: @lock.Lock::new(),
}
}

Expand Down Expand Up @@ -132,14 +140,24 @@ async fn Tty::read_internal_event(

///|
/// Write bytes to the terminal output handle.
///
/// The write is serialized through an internal lock, so a single call lands on
/// the terminal atomically even when multiple tasks write concurrently. To make
/// a multi-call sequence atomic, assemble it into one buffer and issue a single
/// `write`.
pub async fn Tty::write(self : Self, data : &@async/io.Data) -> Unit {
self.output.write(data)
self.write_lock.with_lock(() => self.output.write(data))
}

///|
/// Write a string to the terminal output handle.
///
/// The write is serialized through an internal lock, so a single call lands on
/// the terminal atomically even when multiple tasks write concurrently. To make
/// a multi-call sequence atomic, assemble it into one string and issue a single
/// `write_string`.
pub async fn Tty::write_string(self : Self, string : String) -> Unit {
self.output.write(string)
self.write_lock.with_lock(() => self.output.write(string))
}

///|
Expand Down
Loading