Jit: Threads: Add Fence, Wait, and Notify
Add a few test cases with non-zero offsets

Signed-off-by: Máté Tokodi <[email protected]>
matetokodi committed Sep 10, 2024
1 parent 01f5ac2 commit 5bafd6d
Showing 15 changed files with 362 additions and 7 deletions.
1 change: 1 addition & 0 deletions .github/workflows/actions.yml
@@ -271,6 +271,7 @@ jobs:
- name: Run Tests
run: |
$RUNNER --engine="$GITHUB_WORKSPACE/out/extended/walrus" wasm-test-extended
$RUNNER --jit --engine="$GITHUB_WORKSPACE/out/extended/walrus" wasm-test-extended
build-test-performance:
runs-on: ubuntu-latest
19 changes: 18 additions & 1 deletion src/interpreter/ByteCode.h
@@ -586,7 +586,8 @@ class FunctionType;
#define FOR_EACH_BYTECODE_ATOMIC_OTHER(F) \
F(MemoryAtomicNotify) \
F(MemoryAtomicWait32) \
F(MemoryAtomicWait64)
F(MemoryAtomicWait64) \
F(AtomicFence)
#else // Extended Features
#define FOR_EACH_BYTECODE_ATOMIC_LOAD_OP(F)
#define FOR_EACH_BYTECODE_ATOMIC_STORE_OP(F)
@@ -1864,6 +1865,22 @@ class MemoryAtomicNotify : public ByteCode {
ByteCodeStackOffset m_src1Offset;
ByteCodeStackOffset m_dstOffset;
};

class AtomicFence : public ByteCode {
public:
AtomicFence()
: ByteCode(Opcode::AtomicFenceOpcode)
{
}

#if !defined(NDEBUG)
void dump(size_t pos)
{
}
#endif
protected:
uint32_t m_offset;
};
#endif

#if !defined(NDEBUG)
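Note: the FOR_EACH_BYTECODE_* X-macros above feed code generators elsewhere in Walrus, which is how adding F(AtomicFence) produces the Opcode::AtomicFenceOpcode value used by the AtomicFence constructor. A minimal, self-contained sketch of that expansion pattern (the generator macro and enum names here are simplified assumptions, not the actual Walrus definitions):

// Sketch only: shows how an X-macro entry such as F(AtomicFence)
// becomes a <name>Opcode enumerator. Compiles with any C++11 compiler.
#define FOR_EACH_BYTECODE_ATOMIC_OTHER(F) \
    F(MemoryAtomicNotify)                 \
    F(MemoryAtomicWait32)                 \
    F(MemoryAtomicWait64)                 \
    F(AtomicFence)

enum class Opcode {
#define DECLARE_OPCODE(name) name##Opcode,
    FOR_EACH_BYTECODE_ATOMIC_OTHER(DECLARE_OPCODE)
#undef DECLARE_OPCODE
    OpcodeKindEnd
};

int main()
{
    Opcode op = Opcode::AtomicFenceOpcode; // generated by the F(AtomicFence) entry
    return op == Opcode::AtomicFenceOpcode ? 0 : 1;
}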
7 changes: 7 additions & 0 deletions src/interpreter/Interpreter.cpp
@@ -1077,6 +1077,13 @@ ByteCodeStackOffset* Interpreter::interpret(ExecutionState& state,
ADD_PROGRAM_COUNTER(MemoryAtomicNotify);
NEXT_INSTRUCTION();
}
DEFINE_OPCODE(AtomicFence)
:
{
// FIXME do nothing
ADD_PROGRAM_COUNTER(AtomicFence);
NEXT_INSTRUCTION();
}
#endif

// FOR_EACH_BYTECODE_SIMD_ETC_OP
12 changes: 12 additions & 0 deletions src/jit/Backend.cpp
@@ -1243,6 +1243,18 @@ void JITCompiler::compileFunction(JITFunction* jitFunc, bool isExternal)
emitAtomic(m_compiler, item->asInstruction());
break;
}
case Instruction::AtomicFence: {
emitAtomicFence(m_compiler);
break;
}
case Instruction::AtomicWait: {
emitAtomicWait(m_compiler, item->asInstruction());
break;
}
case Instruction::AtomicNotify: {
emitAtomicNotify(m_compiler, item->asInstruction());
break;
}
#endif /* ENABLE_EXTENDED_FEATURES */
default: {
switch (item->asInstruction()->opcode()) {
42 changes: 41 additions & 1 deletion src/jit/ByteCodeParser.cpp
@@ -235,7 +235,10 @@ static bool isFloatGlobal(uint32_t globalIndex, Module* module)
OL5(OTAtomicRmwI32, /* SSDTT */ I32, I32, I32 | TMP, PTR, I32 | S1) \
OL5(OTAtomicRmwI64, /* SSDTT */ I32, I64, I64 | TMP, PTR, I64 | S1) \
OL6(OTAtomicRmwCmpxchgI32, /* SSSDTT */ I32, I32, I32, I32 | TMP, PTR, I32 | S1) \
OL6(OTAtomicRmwCmpxchgI64, /* SSSDTT */ I32, I64, I64, I64 | TMP, PTR, I64 | S1)
OL6(OTAtomicRmwCmpxchgI64, /* SSSDTT */ I32, I64, I64, I64 | TMP, PTR, I64 | S1) \
OL6(OTAtomicWaitI32, /* SSSDTT */ I32, I32, I64, I32 | TMP, PTR, I32 | S0) \
OL6(OTAtomicWaitI64, /* SSSDTT */ I32, I64, I64, I32 | TMP, PTR, I64 | S0) \
OL5(OTAtomicNotify, /* SSDTT */ I32, I32, I32 | TMP, PTR, I32 | S0)
#else /* !ENABLE_EXTENDED_FEATURES */
#define OPERAND_TYPE_LIST_EXTENDED
#endif /* ENABLE_EXTENDED_FEATURES */
@@ -1343,6 +1346,12 @@ static void compileFunction(JITCompiler* compiler)
instr->addInfo(Instruction::kIsCallback);
break;
}
#if defined(ENABLE_EXTENDED_FEATURES)
case ByteCode::AtomicFenceOpcode: {
group = Instruction::AtomicFence;
FALLTHROUGH;
}
#endif /* ENABLE_EXTENDED_FEATURES */
case ByteCode::UnreachableOpcode: {
compiler->append(byteCode, group, opcode, 0, 0);
break;
@@ -1958,6 +1967,37 @@ static void compileFunction(JITCompiler* compiler)
operands[3] = STACK_OFFSET(atomicRmwCmpxchg->dstOffset());
break;
}
case ByteCode::MemoryAtomicWait64Opcode: {
requiredInit = OTAtomicWaitI64;
FALLTHROUGH;
}
case ByteCode::MemoryAtomicWait32Opcode: {
Instruction* instr = compiler->append(byteCode, Instruction::AtomicWait, opcode, 3, 1);
instr->addInfo(Instruction::kIsCallback);

MemoryAtomicWait32* memoryAtomicWait = reinterpret_cast<MemoryAtomicWait32*>(byteCode);
Operand* operands = instr->operands();
instr->setRequiredRegsDescriptor(requiredInit != OTNone ? requiredInit : OTAtomicWaitI32);

operands[0] = STACK_OFFSET(memoryAtomicWait->src0Offset());
operands[1] = STACK_OFFSET(memoryAtomicWait->src1Offset());
operands[2] = STACK_OFFSET(memoryAtomicWait->src2Offset());
operands[3] = STACK_OFFSET(memoryAtomicWait->dstOffset());
break;
}
case ByteCode::MemoryAtomicNotifyOpcode: {
Instruction* instr = compiler->append(byteCode, Instruction::AtomicNotify, opcode, 2, 1);
instr->addInfo(Instruction::kIsCallback);

MemoryAtomicNotify* memoryAtomicWait = reinterpret_cast<MemoryAtomicNotify*>(byteCode);
Operand* operands = instr->operands();
instr->setRequiredRegsDescriptor(OTAtomicNotify);

operands[0] = STACK_OFFSET(memoryAtomicWait->src0Offset());
operands[1] = STACK_OFFSET(memoryAtomicWait->src1Offset());
operands[2] = STACK_OFFSET(memoryAtomicWait->dstOffset());
break;
}
#endif /* ENABLE_EXTENDED_FEATURES */
default: {
ASSERT_NOT_REACHED();
4 changes: 4 additions & 0 deletions src/jit/Compiler.h
@@ -109,6 +109,10 @@ class InstructionListItem {
#if defined(ENABLE_EXTENDED_FEATURES)
// Atomic memory operations (e.g. I32AtomicRmwAdd, I64AtomicRmw16OrU)
Atomic,
// Special types for thread synchronization operations
AtomicFence,
AtomicWait,
AtomicNotify,
#endif /* ENABLE_EXTENDED_FEATURES */
};

116 changes: 116 additions & 0 deletions src/jit/MemoryInl.h
@@ -1543,4 +1543,120 @@ static void emitAtomic(sljit_compiler* compiler, Instruction* instr)
#undef OP_XCHG
#undef OP_CMPXCHG

static sljit_s32 atomicWaitCallback(ExecutionContext* context, uint8_t* address, sljit_s32 size)
{
Instance* instance = context->instance;

if (!instance->memory(0)->isShared()) {
return ExecutionContext::ExpectedSharedMemError;
}

uint32_t result = 0;
int64_t timeout = context->tmp2[0];
int64_t expect = context->tmp1[0];

if (size == 8) {
instance->memory(0)->atomicWait(context->state, instance->module()->store(), address, expect, timeout, &result);
} else {
instance->memory(0)->atomicWait(context->state, instance->module()->store(), address, (int32_t)expect, timeout, &result);
}

context->tmp2[0] = result;
return ExecutionContext::NoError;
}

static void emitAtomicWait(sljit_compiler* compiler, Instruction* instr)
{
CompileContext* context = CompileContext::get(compiler);
sljit_s32 size = (instr->opcode() == ByteCode::MemoryAtomicWait64Opcode ? 8 : 4);

MemoryAtomicWait32* atomicWait32Operation = reinterpret_cast<MemoryAtomicWait32*>(instr->byteCode());
sljit_s32 offset = atomicWait32Operation->offset();

Operand* operands = instr->operands();
MemAddress addr(MemAddress::CheckNaturalAlignment | MemAddress::AbsoluteAddress, instr->requiredReg(0), instr->requiredReg(1), instr->requiredReg(2));
addr.check(compiler, operands, offset, size);

#if (defined SLJIT_32BIT_ARCHITECTURE && SLJIT_32BIT_ARCHITECTURE)
JITArgPair expectedPair;
#endif /* SLJIT_32BIT_ARCHITECTURE */
JITArg expected;

#if (defined SLJIT_32BIT_ARCHITECTURE && SLJIT_32BIT_ARCHITECTURE)
if (instr->opcode() == ByteCode::MemoryAtomicWait64Opcode) {
expectedPair = JITArgPair(operands + 1);
} else {
expected = JITArg(operands + 1);
}
JITArgPair timeout(operands + 2);
#else /* !SLJIT_32BIT_ARCHITECTURE */
expected = JITArg(operands + 1);
JITArg timeout(operands + 2);
#endif /* SLJIT_32BIT_ARCHITECTURE */
JITArg dst(operands + 3);

struct sljit_jump* memoryShared;

#if (defined SLJIT_32BIT_ARCHITECTURE && SLJIT_32BIT_ARCHITECTURE)
if (instr->opcode() == ByteCode::MemoryAtomicWait64Opcode) {
sljit_emit_op1(compiler, SLJIT_MOV, SLJIT_MEM1(kContextReg), OffsetOfContextField(tmp1) + WORD_LOW_OFFSET, expectedPair.arg1, expectedPair.arg1w);
sljit_emit_op1(compiler, SLJIT_MOV, SLJIT_MEM1(kContextReg), OffsetOfContextField(tmp1) + WORD_HIGH_OFFSET, expectedPair.arg2, expectedPair.arg2w);
} else {
sljit_emit_op1(compiler, SLJIT_MOV, SLJIT_MEM1(kContextReg), OffsetOfContextField(tmp1), expected.arg, expected.argw);
}
#else /* !SLJIT_32BIT_ARCHITECTURE */
sljit_emit_op1(compiler, SLJIT_MOV, SLJIT_MEM1(kContextReg), OffsetOfContextField(tmp1), expected.arg, expected.argw);
#endif /* SLJIT_32BIT_ARCHITECTURE */

#if (defined SLJIT_32BIT_ARCHITECTURE && SLJIT_32BIT_ARCHITECTURE)
sljit_emit_op1(compiler, SLJIT_MOV, SLJIT_MEM1(kContextReg), OffsetOfContextField(tmp2) + WORD_LOW_OFFSET, timeout.arg1, timeout.arg1w);
sljit_emit_op1(compiler, SLJIT_MOV, SLJIT_MEM1(kContextReg), OffsetOfContextField(tmp2) + WORD_HIGH_OFFSET, timeout.arg2, timeout.arg2w);
#else /* !SLJIT_32BIT_ARCHITECTURE */
sljit_emit_op1(compiler, SLJIT_MOV, SLJIT_MEM1(kContextReg), OffsetOfContextField(tmp2), timeout.arg, timeout.argw);
#endif /* SLJIT_32BIT_ARCHITECTURE */

sljit_emit_op1(compiler, SLJIT_MOV, SLJIT_R1, 0, addr.baseReg, 0);
sljit_emit_op1(compiler, SLJIT_MOV, SLJIT_R0, 0, kContextReg, 0);
sljit_emit_op1(compiler, SLJIT_MOV, SLJIT_R2, 0, SLJIT_IMM, size);

sljit_emit_icall(compiler, SLJIT_CALL, SLJIT_ARGS3(W, P, W, W), SLJIT_IMM, GET_FUNC_ADDR(sljit_sw, atomicWaitCallback));

memoryShared = sljit_emit_cmp(compiler, SLJIT_EQUAL, SLJIT_IMM, ExecutionContext::NoError, SLJIT_R0, 0);
context->appendTrapJump(ExecutionContext::ExpectedSharedMemError, sljit_emit_jump(compiler, SLJIT_JUMP));
sljit_set_label(memoryShared, sljit_emit_label(compiler));

sljit_emit_op1(compiler, SLJIT_MOV, dst.arg, dst.argw, SLJIT_MEM1(kContextReg), OffsetOfContextField(tmp2));
}

static sljit_s32 atomicNotifyCallback(ExecutionContext* context, uint8_t* address)
{
Instance* instance = context->instance;
uint32_t result = 0;
int32_t count = context->tmp1[0];
instance->memory(0)->atomicNotify(instance->module()->store(), address, count, &result);
return result;
}

static void emitAtomicNotify(sljit_compiler* compiler, Instruction* instr)
{
MemoryAtomicNotify* atomicNotifyOperation = reinterpret_cast<MemoryAtomicNotify*>(instr->byteCode());
sljit_s32 offset = atomicNotifyOperation->offset();

Operand* operands = instr->operands();
MemAddress addr(MemAddress::CheckNaturalAlignment | MemAddress::AbsoluteAddress, instr->requiredReg(0), instr->requiredReg(1), instr->requiredReg(2));
addr.check(compiler, operands, offset, 4);

JITArg count(operands + 1);
JITArg dst(operands + 2);

sljit_emit_op1(compiler, SLJIT_MOV, SLJIT_MEM1(kContextReg), OffsetOfContextField(tmp1), count.arg, count.argw);

sljit_emit_op1(compiler, SLJIT_MOV, SLJIT_R1, 0, addr.baseReg, 0);
sljit_emit_op1(compiler, SLJIT_MOV, SLJIT_R0, 0, kContextReg, 0);

sljit_emit_icall(compiler, SLJIT_CALL, SLJIT_ARGS2(W, P, W), SLJIT_IMM, GET_FUNC_ADDR(sljit_sw, atomicNotifyCallback));

sljit_emit_op1(compiler, SLJIT_MOV, dst.arg, dst.argw, SLJIT_R0, 0);
}

#endif /* ENABLE_EXTENDED_FEATURES */
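Note: in the callback protocol above, the expected value is passed through context->tmp1, the timeout through context->tmp2, and the wait result is read back out of tmp2, while the callback's return value selects the trap path. The result codes come from the WebAssembly threads proposal: 0 = woken by a notify, 1 = the loaded value did not equal the expected value, 2 = timed out. The behaviour being JIT-compiled closely mirrors C++20's std::atomic wait/notify; a small standalone analogy (not Walrus code, and without wasm's distinct 1/2 result codes), requiring a C++20 compiler:

#include <atomic>
#include <cstdint>
#include <cstdio>
#include <thread>

int main()
{
    std::atomic<int32_t> cell{0};

    std::thread waiter([&] {
        // Blocks while cell still holds the expected value 0,
        // analogous to memory.atomic.wait32 with expected = 0.
        cell.wait(0);
        std::printf("woken, cell = %d\n", cell.load());
    });

    cell.store(42);
    cell.notify_one(); // analogous to memory.atomic.notify with count = 1
    waiter.join();
    return 0;
}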
9 changes: 9 additions & 0 deletions src/jit/MemoryUtilInl.h
@@ -147,3 +147,12 @@ static void emitDataDrop(sljit_compiler* compiler, Instruction* instr)
sljit_sw addr = GET_FUNC_ADDR(sljit_sw, dropData);
sljit_emit_icall(compiler, SLJIT_CALL, SLJIT_ARGS2V(32, W), SLJIT_IMM, addr);
}

#if defined(ENABLE_EXTENDED_FEATURES)

static void emitAtomicFence(sljit_compiler* compiler)
{
sljit_emit_op0(compiler, SLJIT_MEMORY_BARRIER);
}

#endif /* ENABLE_EXTENDED_FEATURES */
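Note: SLJIT_MEMORY_BARRIER asks sljit to emit a full hardware memory barrier, so the JIT path gives atomic.fence real ordering semantics, whereas the interpreter handler added above remains a no-op placeholder. At the C++ source level the closest equivalent is a sequentially consistent fence; a minimal sketch (illustrative only, not part of the commit):

#include <atomic>

// Roughly what wasm's atomic.fence promises: memory operations are not
// reordered across this point as observed by other threads.
void fence_equivalent()
{
    std::atomic_thread_fence(std::memory_order_seq_cst);
}

int main()
{
    fence_equivalent();
    return 0;
}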
2 changes: 1 addition & 1 deletion src/parser/WASMParser.cpp
@@ -2275,7 +2275,7 @@ class WASMBinaryReader : public wabt::WASMBinaryReaderDelegate {

virtual void OnAtomicFenceExpr(uint32_t consistency_model) override
{
// FIXME do nothing
pushByteCode(Walrus::AtomicFence(), WASMOpcode::AtomicFenceOpcode);
}

virtual void OnAtomicNotifyExpr(int opcode, Index memidx, Address alignmentLog2, Address offset) override
3 changes: 3 additions & 0 deletions src/runtime/JITExec.cpp
@@ -83,6 +83,9 @@ ByteCodeStackOffset* JITFunction::call(ExecutionState& state, Instance* instance
case ExecutionContext::UnalignedAtomicError:
Trap::throwException(state, "unaligned atomic");
return resultOffsets;
case ExecutionContext::ExpectedSharedMemError:
Trap::throwException(state, "expected shared memory");
return resultOffsets;
#endif /* ENABLE_EXTENDED_FEATURES */
default:
Trap::throwException(state, "unknown exception");
1 change: 1 addition & 0 deletions src/runtime/JITExec.h
@@ -54,6 +54,7 @@ struct ExecutionContext {
UnreachableError,
#if defined(ENABLE_EXTENDED_FEATURES)
UnalignedAtomicError,
ExpectedSharedMemError,
#endif /* ENABLE_EXTENDED_FEATURES */

// These three in this order must be the last items of the list.
17 changes: 14 additions & 3 deletions src/runtime/Memory.h
@@ -211,15 +211,21 @@ class Memory : public Extern {
throwUnsharedMemoryException(state);
}

atomicWait(state, store, m_buffer + (offset + addend), expect, timeOut, out);
}

template <typename T>
void atomicWait(ExecutionState& state, Store* store, uint8_t* absoluteAddress, const T& expect, int64_t timeOut, uint32_t* out) const
{
T read;
atomicLoad(state, offset, addend, &read);
atomicLoad(state, absoluteAddress - m_buffer, 0, &read);
if (read != expect) {
// "not-equal", the loaded value did not match the expected value
*out = 1;
} else {
// wait process
bool notified = false;
Waiter* waiter = store->getWaiter(static_cast<void*>(m_buffer + (offset + addend)));
Waiter* waiter = store->getWaiter(static_cast<void*>(absoluteAddress));

// lock waiter
std::unique_lock<std::mutex> lock(waiter->m_mutex);
@@ -259,7 +265,12 @@ class Memory : public Extern {
return;
}

Waiter* waiter = store->getWaiter(static_cast<void*>(m_buffer + (offset + addend)));
atomicNotify(store, m_buffer + (offset + addend), count, out);
}

void atomicNotify(Store* store, uint8_t* absoluteAddress, const uint32_t& count, uint32_t* out) const
{
Waiter* waiter = store->getWaiter(static_cast<void*>(absoluteAddress));

waiter->m_mutex.lock();
uint32_t realCount = std::min(waiter->m_waiterItemList.size(), (size_t)count);
20 changes: 20 additions & 0 deletions test/extended/threads/atomic_wait_notify_with_offsets.wast
@@ -0,0 +1,20 @@
;; wait/notify with non-zero offsets
(module
(memory 1 1 shared)

(func (export "initOffset") (param $value i64) (param $offset i32) (i64.store (local.get $offset) (local.get $value)))

(func (export "memory.atomic.notify") (param $addr i32) (param $count i32) (result i32)
(memory.atomic.notify (local.get 0) (local.get 1)))
(func (export "memory.atomic.wait32") (param $addr i32) (param $expected i32) (param $timeout i64) (result i32)
(memory.atomic.wait32 (local.get 0) (local.get 1) (local.get 2)))
(func (export "memory.atomic.wait64") (param $addr i32) (param $expected i64) (param $timeout i64) (result i32)
(memory.atomic.wait64 (local.get 0) (local.get 1) (local.get 2)))
)

;; non-zero offsets

(invoke "initOffset" (i64.const 0xffffffffffff) (i32.const 64))
(assert_return (invoke "memory.atomic.wait32" (i32.const 64) (i32.const 0) (i64.const 0)) (i32.const 1))
(assert_return (invoke "memory.atomic.wait64" (i32.const 64) (i64.const 0xffffffffffff) (i64.const 10)) (i32.const 2))
(assert_return (invoke "memory.atomic.notify" (i32.const 64) (i32.const 10)) (i32.const 0))
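Note: the expected results follow from the wait result codes and wasm's little-endian memory layout. initOffset stores 0x0000ffffffffffff at offset 64, so wait32 reads 0xffffffff, which differs from the expected 0 and returns 1 ("not-equal"); wait64 reads the full matching value, blocks, and hits the 10 ns timeout, returning 2 ("timed-out"); notify then finds no remaining waiters and returns 0. A small host-side check of the 32-bit view (illustrative C++, not part of the test suite):

#include <cstdint>
#include <cstdio>
#include <cstring>

int main()
{
    // Mirror of the test: i64.store of 0x0000ffffffffffff at offset 64.
    uint8_t memory[128] = {};
    const uint64_t stored = 0x0000ffffffffffffULL;
    std::memcpy(memory + 64, &stored, sizeof(stored)); // wasm memory is little-endian

    uint32_t low32;
    std::memcpy(&low32, memory + 64, sizeof(low32));

    // memory.atomic.wait32 compares this against its expected value (0),
    // sees 0xffffffff instead, and returns 1 ("not-equal") immediately.
    std::printf("wait32 would read 0x%08x\n", low32);
    return 0;
}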